Diff view — 1 file changed (+8 −4 lines): src/main/scala/com/oreilly/learningsparkexamples/scala/BasicParseJsonWithJackson.scala
@@ -27,7 +27,7 @@ object BasicParseJsonWithJackson {
2727 val outputFile = args(2 )
2828 val sc = new SparkContext (master, " BasicParseJsonWithJackson" , System .getenv(" SPARK_HOME" ))
2929 val input = sc.textFile(inputFile)
30-
30+
3131 // Parse it into a specific case class. We use mapPartitions because:
3232 // (a) ObjectMapper is not serializable so we either create a singleton object encapsulating ObjectMapper
3333 // on the driver and have to send data back to the driver to go through the singleton object.
@@ -44,13 +44,17 @@ object BasicParseJsonWithJackson {
4444 // list with one element if everything is ok (Some(_)).
4545 records.flatMap(record => {
4646 try {
47- Some (mapper.readValue(record, classOf [ioRecord ]))
47+ Some (mapper.readValue(record, classOf [Person ]))
4848 } catch {
4949 case e : Exception => None
5050 }
5151 })
52- }, true )
53- result.filter(_.lovesPandas).map(mapper.writeValueAsString(_))
52+ }, true )
53+ result.filter(_.lovesPandas).mapPartitions(records => {
54+ val mapper = new ObjectMapper with ScalaObjectMapper
55+ mapper.registerModule(DefaultScalaModule )
56+ records.map(mapper.writeValueAsString(_))
57+ })
5458 .saveAsTextFile(outputFile)
5559 }
5660}
GitHub notice: "You can't perform that action at this time." — 0 commit comments.