From 11abbaff60291f85a060e74f76cb05fd9488a646 Mon Sep 17 00:00:00 2001
From: Mahdi Malverdi
Date: Sat, 21 Dec 2024 16:40:20 +0330
Subject: [PATCH] Feat/add settings to write (#3)

* Support ClickHouse insert settings for table writes (#369)

- Enable custom ClickHouse insert settings when writing to tables.
- Add support for the `spark.clickhouse.write.settings` configuration.
- Update documentation to describe usage of write settings.

Closes #369

* Reformat the code

Closes #369

---------

Co-authored-by: Mahdi Malverdi
---
 docs/configurations/02_sql_configurations.md |  1 +
 .../spark/write/ClickHouseWriter.scala       | 17 ++++++++++++++++-
 .../sql/clickhouse/ClickHouseSQLConf.scala   |  8 ++++++++
 .../spark/sql/clickhouse/SparkOptions.scala  |  3 +++
 .../spark/write/ClickHouseWriter.scala       | 17 ++++++++++++++++-
 .../sql/clickhouse/ClickHouseSQLConf.scala   |  8 ++++++++
 .../spark/sql/clickhouse/SparkOptions.scala  |  3 +++
 .../spark/write/ClickHouseWriter.scala       | 17 ++++++++++++++++-
 .../sql/clickhouse/ClickHouseSQLConf.scala   |  8 ++++++++
 .../spark/sql/clickhouse/SparkOptions.scala  |  3 +++
 10 files changed, 82 insertions(+), 3 deletions(-)

diff --git a/docs/configurations/02_sql_configurations.md b/docs/configurations/02_sql_configurations.md
index 3328cd21..26d066b6 100644
--- a/docs/configurations/02_sql_configurations.md
+++ b/docs/configurations/02_sql_configurations.md
@@ -38,4 +38,5 @@ spark.clickhouse.write.repartitionNum|0|Repartition data to meet the distributio
 spark.clickhouse.write.repartitionStrictly|false|If `true`, Spark will strictly distribute incoming records across partitions to satisfy the required distribution before passing the records to the data source table on write. Otherwise, Spark may apply certain optimizations to speed up the query but break the distribution requirement. Note, this configuration requires SPARK-37523(available in Spark 3.4), w/o this patch, it always acts as `true`.|0.3.0
 spark.clickhouse.write.retryInterval|10s|The interval in seconds between write retry.|0.1.0
 spark.clickhouse.write.retryableErrorCodes|241|The retryable error codes returned by ClickHouse server when write failing.|0.1.0
+spark.clickhouse.write.settings||Settings to apply when writing into ClickHouse, e.g. `final=1, max_execution_time=5`.|0.9.0
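For illustration, a minimal sketch (not part of the patch) of setting the new conf session-wide, assuming the comma-separated `key=value` format documented above; the app name and setting values are placeholders. Note the patch lowercases the conf value via `.transform(_.toLowerCase)`, which is harmless for numeric settings like these.

    import org.apache.spark.sql.SparkSession

    // Hypothetical session-level setup; settings apply to every write in this session.
    val spark = SparkSession.builder()
      .appName("clickhouse-write-settings-demo")
      .config("spark.clickhouse.write.settings", "final=1, max_execution_time=5")
      .getOrCreate()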
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
index 13953a2a..6383c1f1 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
@@ -220,12 +220,27 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
     val client = nodeClient(shardNum)
     val data = serialize()
     var writeTime = 0L
+
+    val settings = writeJob.writeOptions.settings
+      .getOrElse("")
+      .split(",")
+      .map(_.trim.split("=", 2))
+      .collect { case Array(key, value) => key -> value }
+      .toMap
+
     Utils.retry[Unit, RetryableCHException](
       writeJob.writeOptions.maxRetry,
       writeJob.writeOptions.retryInterval
     ) {
       var startWriteTime = System.currentTimeMillis
-      client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
+      client.syncInsertOutputJSONEachRow(
+        database,
+        table,
+        format,
+        codec,
+        new ByteArrayInputStream(data),
+        settings
+      ) match {
         case Right(_) =>
           writeTime = System.currentTimeMillis - startWriteTime
           _totalWriteTime.add(writeTime)
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
index 39e2bc4a..a794d56f 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
@@ -218,4 +218,12 @@ object ClickHouseSQLConf {
       .transform(_.toLowerCase)
       .createOptional
 
+  val WRITE_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.write.settings")
+      .doc("Settings to apply when writing into ClickHouse, e.g. `final=1, max_execution_time=5`.")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
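As a standalone sketch of what the parsing added to ClickHouseWriter does (mirroring the hunk above): the option string splits on commas, each entry splits on its first `=`, and entries without a `=` are dropped, so an unset or empty conf yields an empty settings map.

    // Runnable rendering of the parsing logic added above.
    val raw = "final=1, max_execution_time=5"
    val settings: Map[String, String] = raw
      .split(",")
      .map(_.trim.split("=", 2))
      .collect { case Array(key, value) => key -> value }
      .toMap
    assert(settings == Map("final" -> "1", "max_execution_time" -> "5"))
    // "".split(",") yields Array(""), which the collect discards,
    // so writes without any configured settings are unaffected.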
diff --git a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
index b473d7db..9ceff1eb 100644
--- a/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
+++ b/spark-3.3/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
@@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {
 
   def format: String =
     eval(WRITE_FORMAT.key, WRITE_FORMAT)
+
+  def settings: Option[String] =
+    eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
 }
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
index bedd827c..c3d7d106 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
@@ -246,12 +246,27 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
     val client = nodeClient(shardNum)
     val data = serialize()
     var writeTime = 0L
+
+    val settings = writeJob.writeOptions.settings
+      .getOrElse("")
+      .split(",")
+      .map(_.trim.split("=", 2))
+      .collect { case Array(key, value) => key -> value }
+      .toMap
+
     Utils.retry[Unit, RetryableCHException](
       writeJob.writeOptions.maxRetry,
       writeJob.writeOptions.retryInterval
     ) {
       var startWriteTime = System.currentTimeMillis
-      client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
+      client.syncInsertOutputJSONEachRow(
+        database,
+        table,
+        format,
+        codec,
+        new ByteArrayInputStream(data),
+        settings
+      ) match {
         case Right(_) =>
           writeTime = System.currentTimeMillis - startWriteTime
           _totalWriteTime.add(writeTime)
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
index 39e2bc4a..a794d56f 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
@@ -218,4 +218,12 @@ object ClickHouseSQLConf {
       .transform(_.toLowerCase)
       .createOptional
 
+  val WRITE_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.write.settings")
+      .doc("Settings to apply when writing into ClickHouse, e.g. `final=1, max_execution_time=5`.")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
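Because WriteOptions resolves `settings` through `eval(WRITE_SETTINGS.key, WRITE_SETTINGS)`, the value can presumably also be supplied per write as a data source option instead of session-wide. A hedged sketch of that usage (the catalog/table name and data are placeholders, not from this patch):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().getOrCreate()
    val df = spark.range(10).toDF("id")

    // Hypothetical per-write override; `clickhouse.db.tbl` is a placeholder table.
    df.writeTo("clickhouse.db.tbl")
      .option("spark.clickhouse.write.settings", "max_execution_time=5")
      .append()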
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
index b473d7db..9ceff1eb 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
@@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {
 
   def format: String =
     eval(WRITE_FORMAT.key, WRITE_FORMAT)
+
+  def settings: Option[String] =
+    eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
 }
diff --git a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
index 6f9b267b..e0c8a622 100644
--- a/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
+++ b/spark-3.5/clickhouse-spark/src/main/scala/com/clickhouse/spark/write/ClickHouseWriter.scala
@@ -248,12 +248,27 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
     val client = nodeClient(shardNum)
     val data = serialize()
     var writeTime = 0L
+
+    val settings = writeJob.writeOptions.settings
+      .getOrElse("")
+      .split(",")
+      .map(_.trim.split("=", 2))
+      .collect { case Array(key, value) => key -> value }
+      .toMap
+
     Utils.retry[Unit, RetryableCHException](
       writeJob.writeOptions.maxRetry,
       writeJob.writeOptions.retryInterval
     ) {
       var startWriteTime = System.currentTimeMillis
-      client.syncInsertOutputJSONEachRow(database, table, format, codec, new ByteArrayInputStream(data)) match {
+      client.syncInsertOutputJSONEachRow(
+        database,
+        table,
+        format,
+        codec,
+        new ByteArrayInputStream(data),
+        settings
+      ) match {
         case Right(_) =>
           writeTime = System.currentTimeMillis - startWriteTime
           _totalWriteTime.add(writeTime)
diff --git a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
index 39e2bc4a..a794d56f 100644
--- a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
+++ b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ClickHouseSQLConf.scala
@@ -218,4 +218,12 @@ object ClickHouseSQLConf {
       .transform(_.toLowerCase)
       .createOptional
 
+  val WRITE_SETTINGS: OptionalConfigEntry[String] =
+    buildConf("spark.clickhouse.write.settings")
+      .doc("Settings to apply when writing into ClickHouse, e.g. `final=1, max_execution_time=5`.")
+      .version("0.9.0")
+      .stringConf
+      .transform(_.toLowerCase)
+      .createOptional
+
 }
diff --git a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
index b473d7db..9ceff1eb 100644
--- a/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
+++ b/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SparkOptions.scala
@@ -91,4 +91,7 @@ class WriteOptions(_options: JMap[String, String]) extends SparkOptions {
 
   def format: String =
     eval(WRITE_FORMAT.key, WRITE_FORMAT)
+
+  def settings: Option[String] =
+    eval(WRITE_SETTINGS.key, WRITE_SETTINGS)
 }
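Since the conf goes through Spark's standard SQL conf machinery, it should likewise be settable from Spark SQL before an INSERT; a sketch under that assumption (not verified against this patch, and the table names are placeholders):

    // `spark` is an active SparkSession; everything after the first `=` is the conf value.
    spark.sql("SET spark.clickhouse.write.settings=final=1, max_execution_time=5")
    spark.sql("INSERT INTO clickhouse.db.tbl SELECT id FROM source_view")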