From ccd42f6fb464fa11e2fd5487a41cf4dac1dc0e09 Mon Sep 17 00:00:00 2001
From: Jan De Bleser
Date: Fri, 24 May 2019 15:11:37 +0200
Subject: [PATCH 01/10] Saving dates as the iso standard

---
 .../org/splink/cpipe/JsonColumnParser.scala   | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/org/splink/cpipe/JsonColumnParser.scala b/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
index 2841718..27bfab4 100644
--- a/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
+++ b/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
@@ -1,5 +1,9 @@
 package org.splink.cpipe
 
+import java.time.format.DateTimeFormatter
+import java.time.{ZoneId, ZonedDateTime}
+import java.util.Date
+
 import com.datastax.driver.core.{DataType, Row}
 import play.api.libs.json._
 
@@ -8,10 +12,16 @@ import scala.util.{Failure, Success, Try}
 
 object JsonColumnParser {
 
-  case class Column(name: String, value: String, typ: DataType)
+  case class Column(name: String, value: Object, typ: DataType)
+
+  private val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
 
   def column2Json(column: Column) = {
-    val sanitized = stripControlChars(column.value)
+    val sanitized: String = column.value match {
+      case date: Date => dateFormatter.format(ZonedDateTime.ofInstant(date.toInstant, ZoneId.of("UTC")))
+      case _ => stripControlChars(column.value.toString)
+    }
+
     Try(Json.parse(sanitized)) match {
       case Success(json) =>
         val r = json match {
@@ -28,7 +38,7 @@ object JsonColumnParser {
 
   def row2Json(row: Row) =
     row.getColumnDefinitions.iterator.asScala.flatMap { definition =>
-      Try(row.getObject(definition.getName).toString) match {
+      Try(row.getObject(definition.getName)) match {
        case Success(value) =>
          column2Json {
            Column(definition.getName, value, definition.getType)

From 9647a0293503edafb6f687745a85b22ceb863dae Mon Sep 17 00:00:00 2001
From: Jan De Bleser
Date: Fri, 24 May 2019 15:11:45 +0200
Subject: [PATCH 02/10] Improving load speed

---
 .../splink/cpipe/processors/Importer.scala    | 28 +++++++++++++------
 1 file changed, 19 insertions(+), 9 deletions(-)

diff --git a/src/main/scala/org/splink/cpipe/processors/Importer.scala b/src/main/scala/org/splink/cpipe/processors/Importer.scala
index 1b7de0e..a256c44 100644
--- a/src/main/scala/org/splink/cpipe/processors/Importer.scala
+++ b/src/main/scala/org/splink/cpipe/processors/Importer.scala
@@ -1,11 +1,17 @@
 package org.splink.cpipe.processors
 
+import java.util.concurrent.TimeUnit
+
 import com.datastax.driver.core.Session
-import org.splink.cpipe.{JsonFrame, Output, Rps}
 import org.splink.cpipe.config.Config
+import org.splink.cpipe.{JsonFrame, Output, Rps}
 
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
 import scala.io.Source
 
+
 class Importer extends Processor {
 
   import org.splink.cpipe.JsonColumnParser._
 
   val rps = new Rps()
 
   override def process(session: Session, config: Config): Int = {
     val frame = new JsonFrame()
 
-    Source.stdin.getLines().foreach { line =>
-      frame.push(line.toCharArray).foreach { result =>
-        string2Json(result).map { json =>
-
-          rps.compute()
-          if (config.flags.showProgress) Output.update(s"${rps.count} rows at $rps rows/sec.")
-
-          session.execute(json2Query(json, config.selection.table))
+    Source.stdin.getLines().flatMap { line =>
+      frame.push(line.toCharArray)
+    }.grouped(500).foreach { group: Iterable[String] =>
+      group.map { jsonStr =>
+        Future {
+          string2Json(jsonStr).map { json =>
+            rps.compute()
+            if (config.flags.showProgress) Output.update(s"${rps.count} rows at $rps rows/sec.")
+            session.execute(json2Query(json, config.selection.table))
+          }.get
         }
+      }.foreach { future =>
+        Await.ready(future, Duration(10, TimeUnit.SECONDS)).recover{case e: Exception =>println(e)}
       }
     }
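
Note: PATCH 02 chunks stdin into groups of 500 rows, starts one Future per row, then blocks on each Future; the `.get` inside the Future rethrows parse failures so they surface as failed Futures. A minimal, self-contained sketch of that group-then-await pattern, where `save` is a hypothetical stand-in for `session.execute(json2Query(...))`:

    import java.util.concurrent.TimeUnit

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, Future}

    object GroupedFutures extends App {
      // Hypothetical stand-in for the Cassandra insert.
      def save(row: String): Unit = println(s"saved $row")

      Iterator.tabulate(2000)(i => s"row-$i").grouped(500).foreach { group =>
        // Start one Future per row, so the whole group runs concurrently...
        val futures = group.map(row => Future(save(row)))
        // ...then block on each one (Await.ready throws a TimeoutException
        // if a Future is still pending after 10 seconds).
        futures.foreach(f => Await.ready(f, Duration(10, TimeUnit.SECONDS)))
      }
    }

One caveat, and possibly a reason the next patch reverts this: the default global ExecutionContext is sized for CPU-bound work, roughly one thread per core, so the 500 blocking driver calls of a group are squeezed through a small thread pool rather than truly running 500-wide.
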
From 9d8e720923a0758dfc7616a525b26e2b6f670b54 Mon Sep 17 00:00:00 2001
From: Jan De Bleser
Date: Fri, 24 May 2019 17:42:22 +0200
Subject: [PATCH 03/10] Revert "Improving load speed"

This reverts commit 9647a029
---
 .../splink/cpipe/processors/Importer.scala    | 28 ++++++-------------
 1 file changed, 9 insertions(+), 19 deletions(-)

diff --git a/src/main/scala/org/splink/cpipe/processors/Importer.scala b/src/main/scala/org/splink/cpipe/processors/Importer.scala
index a256c44..1b7de0e 100644
--- a/src/main/scala/org/splink/cpipe/processors/Importer.scala
+++ b/src/main/scala/org/splink/cpipe/processors/Importer.scala
@@ -1,17 +1,11 @@
 package org.splink.cpipe.processors
 
-import java.util.concurrent.TimeUnit
-
 import com.datastax.driver.core.Session
-import org.splink.cpipe.config.Config
 import org.splink.cpipe.{JsonFrame, Output, Rps}
+import org.splink.cpipe.config.Config
 
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
 import scala.io.Source
 
-
 class Importer extends Processor {
 
   import org.splink.cpipe.JsonColumnParser._
 
   val rps = new Rps()
 
   override def process(session: Session, config: Config): Int = {
     val frame = new JsonFrame()
 
-    Source.stdin.getLines().flatMap { line =>
-      frame.push(line.toCharArray)
-    }.grouped(500).foreach { group: Iterable[String] =>
-      group.map { jsonStr =>
-        Future {
-          string2Json(jsonStr).map { json =>
-            rps.compute()
-            if (config.flags.showProgress) Output.update(s"${rps.count} rows at $rps rows/sec.")
-            session.execute(json2Query(json, config.selection.table))
-          }.get
+    Source.stdin.getLines().foreach { line =>
+      frame.push(line.toCharArray).foreach { result =>
+        string2Json(result).map { json =>
+
+          rps.compute()
+          if (config.flags.showProgress) Output.update(s"${rps.count} rows at $rps rows/sec.")
+
+          session.execute(json2Query(json, config.selection.table))
         }
-      }.foreach { future =>
-        Await.ready(future, Duration(10, TimeUnit.SECONDS)).recover{case e: Exception =>println(e)}
       }
     }
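
Note: the revert restores one blocking `session.execute` per row. The 3.x driver that this project imports also offers a non-blocking path: `session.executeAsync` returns a `ResultSetFuture`, which is a Guava `ListenableFuture`. A sketch of throttled asynchronous inserts; the permit count of 64 is an arbitrary assumption, not a tuned value:

    import java.util.concurrent.{Executor, Semaphore}

    import com.datastax.driver.core.Session

    object AsyncInserts {
      private val sameThread = new Executor { def execute(r: Runnable): Unit = r.run() }

      def run(session: Session, queries: Iterator[String]): Unit = {
        val permits = new Semaphore(64) // cap on in-flight requests (arbitrary)
        queries.foreach { cql =>
          permits.acquire() // blocks while 64 inserts are already in flight
          val future = session.executeAsync(cql)
          // The listener fires when the request completes, freeing a slot.
          future.addListener(new Runnable { def run(): Unit = permits.release() }, sameThread)
        }
        permits.acquire(64) // drain: wait for the last in-flight inserts
        permits.release(64)
      }
    }
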
From fd3dff9cbeceda721cd37d07594377ac408a49a5 Mon Sep 17 00:00:00 2001
From: Jan De Bleser
Date: Fri, 24 May 2019 17:48:05 +0200
Subject: [PATCH 04/10] Added importer that uses batches of prepared
 statements.

---
 src/main/scala/org/splink/cpipe/CPipe.scala   |  7 ++-
 .../org/splink/cpipe/JsonColumnParser.scala   | 43 ++++++++++++++++---
 .../org/splink/cpipe/config/Arguments.scala   |  5 ++-
 .../splink/cpipe/processors/Importer2.scala   | 38 ++++++++++++++++
 4 files changed, 84 insertions(+), 9 deletions(-)
 create mode 100644 src/main/scala/org/splink/cpipe/processors/Importer2.scala

diff --git a/src/main/scala/org/splink/cpipe/CPipe.scala b/src/main/scala/org/splink/cpipe/CPipe.scala
index ac3d654..0149ed2 100644
--- a/src/main/scala/org/splink/cpipe/CPipe.scala
+++ b/src/main/scala/org/splink/cpipe/CPipe.scala
@@ -1,7 +1,8 @@
 package org.splink.cpipe
 
-import org.splink.cpipe.processors.{Exporter, Exporter2, Importer}
-import org.splink.cpipe.config.{Config, Arguments}
+import org.splink.cpipe.processors.{Exporter, Exporter2, Importer, Importer2}
+import org.splink.cpipe.config.{Arguments, Config}
+
 import scala.language.implicitConversions
 import scala.util.{Failure, Success, Try}
 
@@ -23,6 +24,8 @@ object CPipe {
     config.mode match {
       case "import" =>
        new Importer().process(session, config)
+      case "import2" =>
+        new Importer2().process(session, config)
       case "export" =>
        new Exporter().process(session, config)
       case "export2" =>
diff --git a/src/main/scala/org/splink/cpipe/JsonColumnParser.scala b/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
index 27bfab4..b573976 100644
--- a/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
+++ b/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
@@ -1,10 +1,9 @@
 package org.splink.cpipe
 
-import java.time.format.DateTimeFormatter
-import java.time.{ZoneId, ZonedDateTime}
+import java.lang.{Double, Boolean}
 import java.util.Date
 
-import com.datastax.driver.core.{DataType, Row}
+import com.datastax.driver.core.{BatchStatement, DataType, PreparedStatement, Row, Session}
 import play.api.libs.json._
 
 import scala.collection.JavaConverters._
@@ -14,11 +13,11 @@ object JsonColumnParser {
 
   case class Column(name: String, value: Object, typ: DataType)
 
-  private val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+  private val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
 
   def column2Json(column: Column) = {
     val sanitized: String = column.value match {
-      case date: Date => dateFormatter.format(ZonedDateTime.ofInstant(date.toInstant, ZoneId.of("UTC")))
+      case date: Date => dateFormat.format(date)
       case _ => stripControlChars(column.value.toString)
     }
 
@@ -83,6 +82,40 @@ object JsonColumnParser {
 
   }
 
+  def json2PreparedStatement(table: String, json: JsObject, session: Session): PreparedStatement = {
+    val str = s"INSERT INTO $table ( ${json.fields.map(_._1).mkString(", ")} ) VALUES ( ${json.fields.map(_ => "?").mkString(", ")} );"
+    session.prepare(str)
+  }
+
+  def getStringToObjectMappingForTable(session: Session, table: String): Map[String, String => Object] = {
+    val queryResult = session.execute(s"select * from $table limit 1")
+    queryResult.getColumnDefinitions.asScala.map{
+      definition => definition.getName -> getStringToObjectConversionMethod(definition.getType)
+    }.toMap
+  }
+
+  def getStringToObjectConversionMethod(dataType: DataType): String => Object = (s: String) => {
+    dataType.getName match {
+      case DataType.Name.DATE => dateFormat.parse(s)
+      case DataType.Name.TIMESTAMP => dateFormat.parse(s)
+      case DataType.Name.DOUBLE => new Double(s.toDouble)
+      case DataType.Name.INT => new Integer(s.toInt)
+      case DataType.Name.VARCHAR => s
+      case DataType.Name.BOOLEAN => new Boolean(s == "true")
+      case _ => throw new IllegalArgumentException(s"Please add a mapping for the '${dataType.getName}' type")
+    }
+  }
+
+  def jsValueToScalaObject(name: String, jsValue: JsValue, objectMapping: Map[String, String => Object]) : Object = {
+    val v = jsValue.toString.stripPrefix("\"").stripSuffix("\"")
+    objectMapping.get(name).getOrElse(throw new IllegalArgumentException(s"$name was not found in the map $objectMapping"))(v)
+  }
+
+  def addJsonToBatch(json: JsObject, preparedStatement: PreparedStatement, batch: BatchStatement, objectMapping: Map[String, String => Object]): Unit = {
+    val values = json.fields.map { v => jsValueToScalaObject(v._1, v._2, objectMapping) }
+    batch.add(preparedStatement.bind(values : _*))
+  }
+
   import java.util.regex.Pattern
 
   val pattern = Pattern.compile("[\\u0000-\\u001f]")
diff --git a/src/main/scala/org/splink/cpipe/config/Arguments.scala b/src/main/scala/org/splink/cpipe/config/Arguments.scala
index ce94919..10b8542 100644
--- a/src/main/scala/org/splink/cpipe/config/Arguments.scala
+++ b/src/main/scala/org/splink/cpipe/config/Arguments.scala
@@ -65,8 +65,9 @@ class Arguments(arguments: Seq[String]) extends ScallopConf(arguments) {
   val compression = choice(Seq("ON", "OFF"), default = Some("ON"),
     descr = "Use LZ4 compression and trade reduced network traffic for CPU cycles. Defaults to ON")
 
-  val mode = choice(choices = Seq("import", "export", "export2"), required = true,
+  val mode = choice(choices = Seq("import", "import2", "export", "export2"), required = true,
     descr = "Select the mode. Choose mode 'import' to import data. " +
+      "Choose mode 'import2' to import data with a prepared statement (faster, but only for tables with fixed columns); " +
       "Choose mode 'export' to export data (optional with a filter); " +
       "Choose mode 'export2' to export data using token ranges to increase performance and reduce load on the cluster. " +
       "'export2' mode cannot be combined with a filter and it requires that the cluster uses Murmur3Partitioner. " +
@@ -79,4 +80,4 @@
   }
 
   verify()
-}
\ No newline at end of file
+}
diff --git a/src/main/scala/org/splink/cpipe/processors/Importer2.scala b/src/main/scala/org/splink/cpipe/processors/Importer2.scala
new file mode 100644
index 0000000..889095c
--- /dev/null
+++ b/src/main/scala/org/splink/cpipe/processors/Importer2.scala
@@ -0,0 +1,38 @@
+package org.splink.cpipe.processors
+
+import com.datastax.driver.core.{BatchStatement, PreparedStatement, Session}
+import org.splink.cpipe.config.Config
+import org.splink.cpipe.{JsonFrame, Output, Rps}
+
+import scala.io.Source
+
+class Importer2 extends Processor {
+
+  import org.splink.cpipe.JsonColumnParser._
+
+  val rps = new Rps()
+
+  override def process(session: Session, config: Config): Int = {
+    val frame = new JsonFrame()
+    var statement: PreparedStatement = null
+    val dataTypeMapping = getStringToObjectMappingForTable(session, config.selection.table)
+
+    Source.stdin.getLines().flatMap { line =>
+      frame.push(line.toCharArray)
+    }.grouped(500).foreach { group =>
+      val batch = new BatchStatement
+      group.foreach { str =>
+        string2Json(str).foreach { json =>
+          if (statement == null) {
+            statement = json2PreparedStatement(config.selection.table, json, session)
+          }
+          addJsonToBatch(json, statement, batch, dataTypeMapping)
+          rps.compute()
+        }
+      }
+      if (config.flags.showProgress) Output.update(s"${rps.count} rows at $rps rows/sec.")
+      session.execute(batch)
+    }
+    rps.count
+  }
+}
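
Note: Importer2 derives the INSERT text from the field names of the first row it sees, prepares it once, and from then on ships only bound values per row, which is why the help text warns that every row must carry the same fixed set of columns. What `json2PreparedStatement` hands to `session.prepare` can be reproduced without a cluster; the `users` table and its columns below are hypothetical:

    import play.api.libs.json.{JsObject, Json}

    object PreparedInsertSketch extends App {
      // Same string construction as json2PreparedStatement, minus session.prepare.
      def insertCql(table: String, json: JsObject): String =
        s"INSERT INTO $table ( ${json.fields.map(_._1).mkString(", ")} ) VALUES ( ${json.fields.map(_ => "?").mkString(", ")} );"

      val row = Json.obj("id" -> 42, "name" -> "jo")
      println(insertCql("users", row))
      // prints: INSERT INTO users ( id, name ) VALUES ( ?, ? );
    }
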
From 4d8efe877b24898ccf8fabc9180c536b6cbfbe50 Mon Sep 17 00:00:00 2001
From: Jan De Bleser
Date: Mon, 27 May 2019 10:31:12 +0200
Subject: [PATCH 05/10] Added --batch-size argument

---
 src/main/scala/org/splink/cpipe/config/Arguments.scala     | 3 +++
 src/main/scala/org/splink/cpipe/config/Config.scala        | 5 +++--
 src/main/scala/org/splink/cpipe/processors/Importer2.scala | 2 +-
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/org/splink/cpipe/config/Arguments.scala b/src/main/scala/org/splink/cpipe/config/Arguments.scala
index 10b8542..7e82f15 100644
--- a/src/main/scala/org/splink/cpipe/config/Arguments.scala
+++ b/src/main/scala/org/splink/cpipe/config/Arguments.scala
@@ -54,6 +54,9 @@ class Arguments(arguments: Seq[String]) extends ScallopConf(arguments) {
   val fetchSize = opt[Int](default = Some(5000),
     descr = "The amount of rows which is retrieved simultaneously. Defaults to 5000.")
 
+  val batchSize = opt[Int](default = Some(500),
+    descr = "The amount of rows which is saved simultaneously when using mode import2. Defaults to 500.")
+
   val threads = opt[Int](default = Some(32),
     descr = "The amount of parallelism used in export2 mode. Defaults to 32 parallel requests.")
 
diff --git a/src/main/scala/org/splink/cpipe/config/Config.scala b/src/main/scala/org/splink/cpipe/config/Config.scala
index 9778b66..df99f37 100644
--- a/src/main/scala/org/splink/cpipe/config/Config.scala
+++ b/src/main/scala/org/splink/cpipe/config/Config.scala
@@ -18,6 +18,7 @@ case object Config {
       verbose <- args.verbose.toOption
       threads <- args.threads.toOption
       fetchSize <- args.fetchSize.toOption
+      batchSize <- args.batchSize.toOption
       useCompression <- args.compression.toOption.map {
         case c if c == "ON" => true
         case _ => false
@@ -42,7 +43,7 @@ case object Config {
         Selection(keyspace, table, filter),
         Credentials(username, password),
         Flags(!beQuiet, useCompression, verbose),
-        Settings(fetchSize, consistencyLevel, threads))
+        Settings(fetchSize, batchSize, consistencyLevel, threads))
     }
   }
 
@@ -54,4 +55,4 @@ final case class Credentials(username: String, password: String)
 
 final case class Flags(showProgress: Boolean, useCompression: Boolean, verbose: Boolean)
 
-final case class Settings(fetchSize: Int, consistencyLevel: ConsistencyLevel, threads: Int)
+final case class Settings(fetchSize: Int, batchSize: Int, consistencyLevel: ConsistencyLevel, threads: Int)
diff --git a/src/main/scala/org/splink/cpipe/processors/Importer2.scala b/src/main/scala/org/splink/cpipe/processors/Importer2.scala
index 889095c..7695188 100644
--- a/src/main/scala/org/splink/cpipe/processors/Importer2.scala
+++ b/src/main/scala/org/splink/cpipe/processors/Importer2.scala
@@ -19,7 +19,7 @@ class Importer2 extends Processor {
 
     Source.stdin.getLines().flatMap { line =>
       frame.push(line.toCharArray)
-    }.grouped(500).foreach { group =>
+    }.grouped(config.settings.batchSize).foreach { group =>
       val batch = new BatchStatement
       group.foreach { str =>
         string2Json(str).foreach { json =>
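
Note: each group of --batch-size rows ends up in one `new BatchStatement`, which in the 3.x driver defaults to the LOGGED batch type: atomicity is bought via the distributed batch log, at a throughput cost. When the rows of a batch span many partitions, as a bulk load usually does, an unlogged batch is the common alternative. A sketch of that variant, shown as a deliberate trade-off rather than what the importer above does:

    object UnloggedBatch {
      import com.datastax.driver.core.BatchStatement

      // Skip the batch log: no atomicity guarantee across partitions,
      // but noticeably better bulk-load throughput.
      val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
    }
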
" + @@ -79,4 +80,4 @@ class Arguments(arguments: Seq[String]) extends ScallopConf(arguments) { } verify() -} \ No newline at end of file +} diff --git a/src/main/scala/org/splink/cpipe/processors/Importer2.scala b/src/main/scala/org/splink/cpipe/processors/Importer2.scala new file mode 100644 index 0000000..889095c --- /dev/null +++ b/src/main/scala/org/splink/cpipe/processors/Importer2.scala @@ -0,0 +1,38 @@ +package org.splink.cpipe.processors + +import com.datastax.driver.core.{BatchStatement, PreparedStatement, Session} +import org.splink.cpipe.config.Config +import org.splink.cpipe.{JsonFrame, Output, Rps} + +import scala.io.Source + +class Importer2 extends Processor { + + import org.splink.cpipe.JsonColumnParser._ + + val rps = new Rps() + + override def process(session: Session, config: Config): Int = { + val frame = new JsonFrame() + var statement: PreparedStatement = null + val dataTypeMapping = getStringToObjectMappingForTable(session, config.selection.table) + + Source.stdin.getLines().flatMap { line => + frame.push(line.toCharArray) + }.grouped(500).foreach { group => + val batch = new BatchStatement + group.foreach { str => + string2Json(str).foreach { json => + if (statement == null) { + statement = json2PreparedStatement(config.selection.table, json, session) + } + addJsonToBatch(json, statement, batch, dataTypeMapping) + rps.compute() + } + } + if (config.flags.showProgress) Output.update(s"${rps.count} rows at $rps rows/sec.") + session.execute(batch) + } + rps.count + } +} From 4d8efe877b24898ccf8fabc9180c536b6cbfbe50 Mon Sep 17 00:00:00 2001 From: Jan De Bleser Date: Mon, 27 May 2019 10:31:12 +0200 Subject: [PATCH 05/10] Added --batch-size argument --- src/main/scala/org/splink/cpipe/config/Arguments.scala | 3 +++ src/main/scala/org/splink/cpipe/config/Config.scala | 5 +++-- src/main/scala/org/splink/cpipe/processors/Importer2.scala | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main/scala/org/splink/cpipe/config/Arguments.scala b/src/main/scala/org/splink/cpipe/config/Arguments.scala index 10b8542..7e82f15 100644 --- a/src/main/scala/org/splink/cpipe/config/Arguments.scala +++ b/src/main/scala/org/splink/cpipe/config/Arguments.scala @@ -54,6 +54,9 @@ class Arguments(arguments: Seq[String]) extends ScallopConf(arguments) { val fetchSize = opt[Int](default = Some(5000), descr = "The amount of rows which is retrieved simultaneously. Defaults to 5000.") + val batchSize = opt[Int](default = Some(500), + descr = "The amount of rows which is saved simultaneously when using mode import2. Defaults to 500.") + val threads = opt[Int](default = Some(32), descr = "The amount of parallelism used in export2 mode. 
From 172470cd620097e8299e5477512fdc46848dc21e Mon Sep 17 00:00:00 2001
From: Wim Verreydt
Date: Tue, 4 Feb 2020 09:29:42 +0100
Subject: [PATCH 07/10] Adding parsing logic for null and NaN

---
 .../org/splink/cpipe/JsonColumnParser.scala   | 94 ++++++++++---------
 1 file changed, 52 insertions(+), 42 deletions(-)

diff --git a/src/main/scala/org/splink/cpipe/JsonColumnParser.scala b/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
index b573976..750e36b 100644
--- a/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
+++ b/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
@@ -16,23 +16,29 @@ object JsonColumnParser {
   private val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
 
   def column2Json(column: Column) = {
-    val sanitized: String = column.value match {
-      case date: Date => dateFormat.format(date)
-      case _ => stripControlChars(column.value.toString)
-    }
-
-    Try(Json.parse(sanitized)) match {
-      case Success(json) =>
-        val r = json match {
-          case o: JsObject => o
-          case _ => parseCassandraDataType(sanitized, column.typ)
-        }
-
-        Some(JsObject(Map(column.name -> r)))
-
-      case Failure(_) =>
-        Some(JsObject(Map(column.name -> parseCassandraDataType(sanitized, column.typ))))
-    }
+    val value = column.value
+
+    if (value == null) {
+      Some(JsObject(Map(column.name -> JsNull)))
+    } else {
+      val sanitized: String = value match {
+        case date: Date => dateFormat.format(date)
+        case _ => stripControlChars(value.toString)
+      }
+
+      Try(Json.parse(sanitized)) match {
+        case Success(json) =>
+          val r = json match {
+            case o: JsObject => o
+            case _ => parseCassandraDataType(value, sanitized, column.typ)
+          }
+
+          Some(JsObject(Map(column.name -> r)))
+
+        case Failure(_) =>
+          Some(JsObject(Map(column.name -> parseCassandraDataType(value, sanitized, column.typ))))
+      }
+    }
   }
 
   def row2Json(row: Row) =
@@ -89,7 +95,7 @@ object JsonColumnParser {
 
   def getStringToObjectMappingForTable(session: Session, table: String): Map[String, String => Object] = {
     val queryResult = session.execute(s"select * from $table limit 1")
-    queryResult.getColumnDefinitions.asScala.map{
+    queryResult.getColumnDefinitions.asScala.map {
       definition => definition.getName -> getStringToObjectConversionMethod(definition.getType)
     }.toMap
   }
@@ -123,32 +129,36 @@ object JsonColumnParser {
 
   def stripControlChars(s: String) = pattern.matcher(s).replaceAll("")
 
-  def parseCassandraDataType(a: String, dt: DataType) =
-    dt.getName match {
-      case DataType.Name.ASCII => JsString(a)
-      case DataType.Name.BLOB => JsString(a)
-      case DataType.Name.DATE => JsString(a)
-      case DataType.Name.INET => JsString(a)
-      case DataType.Name.TEXT => JsString(a)
-      case DataType.Name.TIME => JsString(a)
-      case DataType.Name.TIMESTAMP => JsString(a)
-      case DataType.Name.TIMEUUID => JsString(a)
-      case DataType.Name.UUID => JsString(a)
-      case DataType.Name.VARCHAR => JsString(a)
-      case DataType.Name.BOOLEAN => JsBoolean(a == "true")
-      case DataType.Name.BIGINT => JsNumber(BigDecimal(a))
-      case DataType.Name.DECIMAL => JsNumber(BigDecimal(a))
-      case DataType.Name.DOUBLE => JsNumber(BigDecimal(a))
-      case DataType.Name.FLOAT => JsNumber(BigDecimal(a))
-      case DataType.Name.INT => JsNumber(BigDecimal(a))
-      case DataType.Name.SMALLINT => JsNumber(BigDecimal(a))
-      case DataType.Name.TINYINT => JsNumber(BigDecimal(a))
-      case DataType.Name.VARINT => JsNumber(BigDecimal(a))
-      case DataType.Name.LIST => Json.parse(a)
-      case DataType.Name.MAP => Json.parse(a)
-      case DataType.Name.SET => Json.parse(a)
-      case DataType.Name.TUPLE => Json.parse(a)
-      case DataType.Name.UDT => Json.parse(a)
-      case _ => Json.parse(a)
-    }
+  def parseCassandraDataType(v: Object, a: String, dt: DataType) = {
+    dt.getName match {
+      case DataType.Name.ASCII => JsString(a)
+      case DataType.Name.BLOB => JsString(a)
+      case DataType.Name.DATE => JsString(a)
+      case DataType.Name.INET => JsString(a)
+      case DataType.Name.TEXT => JsString(a)
+      case DataType.Name.TIME => JsString(a)
+      case DataType.Name.TIMESTAMP => JsString(a)
+      case DataType.Name.TIMEUUID => JsString(a)
+      case DataType.Name.UUID => JsString(a)
+      case DataType.Name.VARCHAR => JsString(a)
+      case DataType.Name.BOOLEAN => JsBoolean(a == "true")
+      case DataType.Name.BIGINT => JsNumber(BigDecimal(a))
+      case DataType.Name.DECIMAL => JsNumber(BigDecimal(a))
+      case DataType.Name.DOUBLE => v match {
+        case d: Double if Double.isNaN(d) => JsNull
+        case _ => JsNumber(BigDecimal(a))
+      }
+      case DataType.Name.FLOAT => JsNumber(BigDecimal(a))
+      case DataType.Name.INT => JsNumber(BigDecimal(a))
+      case DataType.Name.SMALLINT => JsNumber(BigDecimal(a))
+      case DataType.Name.TINYINT => JsNumber(BigDecimal(a))
+      case DataType.Name.VARINT => JsNumber(BigDecimal(a))
+      case DataType.Name.LIST => Json.parse(a)
+      case DataType.Name.MAP => Json.parse(a)
+      case DataType.Name.SET => Json.parse(a)
+      case DataType.Name.TUPLE => Json.parse(a)
+      case DataType.Name.UDT => Json.parse(a)
+      case _ => Json.parse(a)
+    }
+  }
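
Note: the special case for DOUBLE exists because Cassandra can store NaN but JSON cannot express it. The generic numeric path feeds the stringified value to `JsNumber(BigDecimal(a))`, and BigDecimal has no NaN representation, so before this patch a NaN cell aborted the export. A small demonstration of that failure mode:

    import scala.util.Try

    object NaNDemo extends App {
      println(java.lang.Double.NaN.toString)                  // "NaN"
      // BigDecimal cannot parse "NaN", so the pre-patch path would throw.
      println(Try(BigDecimal(java.lang.Double.NaN.toString))) // Failure(NumberFormatException)
    }
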
From 91bdcc50eb003183f694813034a6abc7613e7f2a Mon Sep 17 00:00:00 2001
From: Wim Verreydt
Date: Tue, 4 Feb 2020 09:30:18 +0100
Subject: [PATCH 08/10] Make SimpleDateFormat threadsafe

---
 src/main/scala/org/splink/cpipe/JsonColumnParser.scala | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/org/splink/cpipe/JsonColumnParser.scala b/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
index 750e36b..4f73d77 100644
--- a/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
+++ b/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
@@ -13,7 +13,15 @@ object JsonColumnParser {
 
   case class Column(name: String, value: Object, typ: DataType)
 
-  private val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+  // SimpleDateFormat is not thread safe
+  private val tlDateFormat = new ThreadLocal[java.text.SimpleDateFormat]
+
+  private def dateFormat = {
+    if (tlDateFormat.get() == null) {
+      tlDateFormat.set(new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
+    }
+    tlDateFormat.get()
+  }
 
   def column2Json(column: Column) = {
     val value = column.value
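
Note: the get-or-set dance above can be folded into the JDK 8 factory method, which keeps one SimpleDateFormat per thread without the explicit null check. A sketch, assuming Scala 2.12+ so the function literal converts to a java.util.function.Supplier:

    object ThreadSafeDateFormat {
      private val tl: ThreadLocal[java.text.SimpleDateFormat] =
        ThreadLocal.withInitial(() => new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))

      // Each thread lazily gets, and then reuses, its own format instance.
      def format(date: java.util.Date): String = tl.get().format(date)
    }
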
From 7294170790ec72a4c4deb223f7d524a1ea62f671 Mon Sep 17 00:00:00 2001
From: Wim Verreydt
Date: Thu, 20 Feb 2020 10:38:22 +0100
Subject: [PATCH 09/10] Support null during data import

---
 .../org/splink/cpipe/JsonColumnParser.scala   | 21 +++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/src/main/scala/org/splink/cpipe/JsonColumnParser.scala b/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
index 4f73d77..05f2833 100644
--- a/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
+++ b/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
@@ -110,12 +110,12 @@ object JsonColumnParser {
 
   def getStringToObjectConversionMethod(dataType: DataType): String => Object = (s: String) => {
     dataType.getName match {
-      case DataType.Name.DATE => dateFormat.parse(s)
-      case DataType.Name.TIMESTAMP => dateFormat.parse(s)
-      case DataType.Name.DOUBLE => new Double(s.toDouble)
-      case DataType.Name.INT => new Integer(s.toInt)
-      case DataType.Name.VARCHAR => s
-      case DataType.Name.BOOLEAN => new Boolean(s == "true")
+      case DataType.Name.DATE => nullOr(dateFormat.parse)(s)
+      case DataType.Name.TIMESTAMP => nullOr(dateFormat.parse)(s)
+      case DataType.Name.DOUBLE => nullOr{x: String => new Double(x.toDouble)}(s)
+      case DataType.Name.INT => nullOr{x: String => new Integer(x.toInt)}(s)
+      case DataType.Name.VARCHAR => nullOr(identity)(s)
+      case DataType.Name.BOOLEAN => nullOr{x: String => new Boolean(x == "true")}(s)
       case _ => throw new IllegalArgumentException(s"Please add a mapping for the '${dataType.getName}' type")
     }
   }
@@ -130,6 +130,15 @@ object JsonColumnParser {
     batch.add(preparedStatement.bind(values : _*))
   }
 
+  def nullOr(parser: String => Object): String => Object = (s: String) => {
+    if (s.equals("null")) {
+      null
+    } else {
+      parser(s)
+    }
+  }
+
+
   import java.util.regex.Pattern
 
   val pattern = Pattern.compile("[\\u0000-\\u001f]")
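
Note: `nullOr` works because `jsValueToScalaObject` renders every value with `jsValue.toString`, and play-json prints JsNull as the bare string "null"; the combinator intercepts that marker before the type-specific parser runs. The flip side is that a genuine text value "null" becomes indistinguishable from a JSON null. A standalone usage sketch, with the combinator copied from the patch:

    object NullOrDemo extends App {
      def nullOr(parser: String => Object): String => Object = (s: String) =>
        if (s.equals("null")) null else parser(s)

      val toInt = nullOr { x: String => Integer.valueOf(x.toInt) }
      println(toInt("42"))   // 42
      println(toInt("null")) // null, without ever calling the parser
    }
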
From 81f68716bf1021ebea7e3c5dcd22699febe7e442 Mon Sep 17 00:00:00 2001
From: Jan De Bleser
Date: Fri, 22 May 2020 18:36:27 +0200
Subject: [PATCH 10/10] Added support for smallint columns

---
 src/main/scala/org/splink/cpipe/JsonColumnParser.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/org/splink/cpipe/JsonColumnParser.scala b/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
index b573976..5cefe80 100644
--- a/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
+++ b/src/main/scala/org/splink/cpipe/JsonColumnParser.scala
@@ -1,6 +1,6 @@
 package org.splink.cpipe
 
-import java.lang.{Double, Boolean}
+import java.lang.{Boolean, Double, Short}
 import java.util.Date
 
 import com.datastax.driver.core.{BatchStatement, DataType, PreparedStatement, Row, Session}
@@ -102,6 +102,7 @@ object JsonColumnParser {
       case DataType.Name.INT => new Integer(s.toInt)
       case DataType.Name.VARCHAR => s
       case DataType.Name.BOOLEAN => new Boolean(s == "true")
+      case DataType.Name.SMALLINT => new Short(s.toShort)
       case _ => throw new IllegalArgumentException(s"Please add a mapping for the '${dataType.getName}' type")
     }
   }