From 4e61420a40666e8fafe9e7dba6a9f3c3a5cb39ec Mon Sep 17 00:00:00 2001
From: Xinyuan Yang
Date: Wed, 17 May 2023 14:54:38 +0800
Subject: [PATCH 1/3] Spark 3.4: Support distribute by any predefined transform

---
 .../xenon/clickhouse/spec/NodeSpec.scala      |   2 +
 .../ClusterShardByTransformSuite.scala        | 117 ++++++++++++++++++
 .../spark/sql/clickhouse/ExprUtils.scala      |   5 +-
 .../scala/xenon/clickhouse/func/Days.scala    |  52 ++++++++
 .../clickhouse/func/FunctionRegistry.scala    |   7 +-
 .../scala/xenon/clickhouse/func/Hours.scala   |  51 ++++++++
 .../scala/xenon/clickhouse/func/Mod.scala     |  63 ++++++++++
 .../scala/xenon/clickhouse/func/Months.scala  |  52 ++++++++
 .../scala/xenon/clickhouse/func/Years.scala   |  52 ++++++++
 .../write/WriteJobDescription.scala           |  10 +-
 10 files changed, 407 insertions(+), 4 deletions(-)
 create mode 100644 spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClusterShardByTransformSuite.scala
 create mode 100644 spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala
 create mode 100644 spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Hours.scala
 create mode 100644 spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Mod.scala
 create mode 100644 spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala
 create mode 100644 spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala

diff --git a/clickhouse-core/src/main/scala/xenon/clickhouse/spec/NodeSpec.scala b/clickhouse-core/src/main/scala/xenon/clickhouse/spec/NodeSpec.scala
index 454312df..eb809169 100644
--- a/clickhouse-core/src/main/scala/xenon/clickhouse/spec/NodeSpec.scala
+++ b/clickhouse-core/src/main/scala/xenon/clickhouse/spec/NodeSpec.scala
@@ -97,4 +97,6 @@ case class ClusterSpec(
   override def toString: String = s"cluster: $name, shards: [${shards.mkString(", ")}]"
 
   @JsonIgnore @transient override lazy val nodes: Array[NodeSpec] = shards.sorted.flatMap(_.nodes)
+
+  def totalWeight: Int = shards.map(_.weight).sum
 }
diff --git a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClusterShardByTransformSuite.scala b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClusterShardByTransformSuite.scala
new file mode 100644
index 00000000..e02dad11
--- /dev/null
+++ b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClusterShardByTransformSuite.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.clickhouse.cluster
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+
+class ClusterShardByTransformSuite extends SparkClickHouseClusterTest {
+  override protected def sparkConf: SparkConf = {
+    val _conf = super.sparkConf
+      .set("spark.clickhouse.write.distributed.convertLocal", "true")
+    _conf
+  }
+
+  def runTest(func_name: String, func_args: Array[String]): Unit = {
+    val func_expr = s"$func_name(${func_args.mkString(",")})"
+    val cluster = "single_replica"
+    val db = s"db_${func_name}_shard_transform"
+    val tbl_dist = s"tbl_${func_name}_shard"
+    val tbl_local = s"${tbl_dist}_local"
+
+    try {
+      runClickHouseSQL(s"CREATE DATABASE IF NOT EXISTS $db ON CLUSTER $cluster")
+
+      spark.sql(
+        s"""CREATE TABLE $db.$tbl_local (
+           | create_time TIMESTAMP NOT NULL,
+           | create_date DATE NOT NULL,
+           | value STRING NOT NULL
+           |) USING ClickHouse
+           |TBLPROPERTIES (
+           | cluster = '$cluster',
+           | engine = 'MergeTree()',
+           | order_by = 'create_time'
+           |)
+           |""".stripMargin
+      )
+
+      runClickHouseSQL(
+        s"""CREATE TABLE $db.$tbl_dist ON CLUSTER $cluster
+           |AS $db.$tbl_local
+           |ENGINE = Distributed($cluster, '$db', '$tbl_local', $func_expr)
+           |""".stripMargin
+      )
+      spark.sql(
+        s"""INSERT INTO `$db`.`$tbl_dist`
+           |VALUES
+           | (timestamp'2021-01-01 10:10:10', date'2021-01-01', '1'),
+           | (timestamp'2022-02-02 11:10:10', date'2022-02-02', '2'),
+           | (timestamp'2023-03-03 12:10:10', date'2023-03-03', '3'),
+           | (timestamp'2024-04-04 13:10:10', date'2024-04-04', '4')
+           | AS tab(create_time, create_date, value)
+           |""".stripMargin
+      )
+      // check that data is indeed written
+      checkAnswer(
+        spark.table(s"$db.$tbl_dist").select("value").orderBy("create_time"),
+        Seq(Row("1"), Row("2"), Row("3"), Row("4"))
+      )
+
+      // check that identical rows land on the same shard as ClickHouse native sharding
+      runClickHouseSQL(
+        s"""INSERT INTO `$db`.`$tbl_dist`
+           |VALUES
+           | (timestamp'2021-01-01 10:10:10', date'2021-01-01', '1'),
+           | (timestamp'2022-02-02 11:10:10', date'2022-02-02', '2'),
+           | (timestamp'2023-03-03 12:10:10', date'2023-03-03', '3'),
+           | (timestamp'2024-04-04 13:10:10', date'2024-04-04', '4')
+           |""".stripMargin
+      )
+      checkAnswer(
+        spark.table(s"$db.$tbl_local")
+          .groupBy("value").count().filter("count != 2"),
+        Seq.empty
+      )
+
+    } finally {
+      runClickHouseSQL(s"DROP TABLE IF EXISTS $db.$tbl_dist ON CLUSTER $cluster")
+      runClickHouseSQL(s"DROP TABLE IF EXISTS $db.$tbl_local ON CLUSTER $cluster")
+      runClickHouseSQL(s"DROP DATABASE IF EXISTS $db ON CLUSTER $cluster")
+    }
+  }
+
+  Seq(
+    // wait for SPARK-44180 to be fixed, then add implicit cast test cases
+    ("toYear", Array("create_date")),
+//    ("toYear", Array("create_time")),
+    ("toYYYYMM", Array("create_date")),
+//    ("toYYYYMM", Array("create_time")),
+    ("toYYYYMMDD", Array("create_date")),
+//    ("toYYYYMMDD", Array("create_time")),
+    ("toHour", Array("create_time")),
+    ("xxHash64", Array("value")),
+    ("murmurHash2_64", Array("value")),
+    ("murmurHash2_32", Array("value")),
+    ("murmurHash3_64", Array("value")),
+    ("murmurHash3_32", Array("value")),
+    ("cityHash64", Array("value")),
+    ("modulo", Array("toYYYYMM(create_date)", "10"))
+  ).foreach {
+    case (func_name: String, func_args: Array[String]) =>
+      test(s"shard by $func_name(${func_args.mkString(",")})")(runTest(func_name, func_args))
+  }
+
+}
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ExprUtils.scala b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ExprUtils.scala
index 3aaa3a69..ed6a5cb3 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ExprUtils.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ExprUtils.scala
@@ -58,7 +58,7 @@ object ExprUtils extends SQLConfHelper with Serializable {
     functionRegistry: FunctionRegistry
   ): Array[V2SortOrder] =
     toSparkSplits(
-      shardingKeyIgnoreRand,
+      shardingKeyIgnoreRand.map(k => ExprUtils.toSplitWithModulo(k, cluster.get.totalWeight)),
       partitionKey,
       functionRegistry
    ).map(Expressions.sort(_, SortDirection.ASCENDING)) ++:
@@ -211,4 +211,7 @@ object ExprUtils extends SQLConfHelper with Serializable {
     case bucket: BucketTransform => throw CHClientException(s"Bucket transform not support yet: $bucket")
     case other: Transform => throw CHClientException(s"Unsupported transform: $other")
   }
+
+  def toSplitWithModulo(shardingKey: Expr, weight: Int): FuncExpr =
+    FuncExpr("modulo", List(shardingKey, StringLiteral(weight.toString)))
 }
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala
new file mode 100644
index 00000000..3008d7fd
--- /dev/null
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package xenon.clickhouse.func
+
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
+import org.apache.spark.sql.types._
+
+import java.time.LocalDate
+import java.time.format.DateTimeFormatter
+
+object Days extends UnboundFunction with ScalarFunction[Int] with ClickhouseEquivFunction {
+
+  override def name: String = "clickhouse_days"
+
+  override def canonicalName: String = s"clickhouse.$name"
+
+  override def toString: String = name
+
+  override val ckFuncNames: Array[String] = Array("toYYYYMMDD")
+
+  override def description: String = s"$name: (date: Date) => shard_num: int"
+
+  override def bind(inputType: StructType): BoundFunction = inputType.fields match {
+    case Array(StructField(_, DateType, _, _)) => this
+//    case Array(StructField(_, TimestampType, _, _)) | Array(StructField(_, TimestampNTZType, _, _)) => this
+    case _ => throw new UnsupportedOperationException(s"Expect 1 DATE argument. $description")
+  }
+
+  override def inputTypes: Array[DataType] = Array(DateType)
+
+  override def resultType: DataType = IntegerType
+
+  override def isResultNullable: Boolean = false
+
+  def invoke(days: Int): Int = {
+    val date = LocalDate.ofEpochDay(days)
+    val formatter = DateTimeFormatter.ofPattern("yyyyMMdd")
+    date.format(formatter).toInt
+  }
+}
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala
index 8faf6564..dd74635d 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala
@@ -53,7 +53,12 @@ object StaticFunctionRegistry extends FunctionRegistry {
     "clickhouse_murmurHash2_64" -> MurmurHash2_64,
     "clickhouse_murmurHash3_32" -> MurmurHash3_32,
     "clickhouse_murmurHash3_64" -> MurmurHash3_64,
-    "clickhouse_cityHash64" -> CityHash64
+    "clickhouse_cityHash64" -> CityHash64,
+    "clickhouse_years" -> Years,
+    "clickhouse_months" -> Months,
+    "clickhouse_days" -> Days,
+    "clickhouse_hours" -> Hours,
+    "sharding_mod" -> Mod
   )
 
   override def list: Array[String] = functions.keys.toArray
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Hours.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Hours.scala
new file mode 100644
index 00000000..e88907be
--- /dev/null
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Hours.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package xenon.clickhouse.func
+
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
+import org.apache.spark.sql.types._
+
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+
+object Hours extends UnboundFunction with ScalarFunction[Int] with ClickhouseEquivFunction {
+
+  override def name: String = "clickhouse_hours"
+
+  override def canonicalName: String = s"clickhouse.$name"
+
+  override def toString: String = name
+
+  override val ckFuncNames: Array[String] = Array("toHour", "HOUR")
+
+  override def description: String = s"$name: (time: timestamp) => shard_num: int"
+
+  override def bind(inputType: StructType): BoundFunction = inputType.fields match {
+    case Array(StructField(_, TimestampType, _, _)) | Array(StructField(_, TimestampNTZType, _, _)) => this
+    case _ => throw new UnsupportedOperationException(s"Expect 1 TIMESTAMP argument. $description")
+  }
+
+  override def inputTypes: Array[DataType] = Array(TimestampType)
+
+  override def resultType: DataType = IntegerType
+
+  override def isResultNullable: Boolean = false
+
+  def invoke(time: Long): Int = {
+    val ts = new Timestamp(time / 1000)
+    // "HH" is the 24-hour-of-day pattern (00-23), matching ClickHouse toHour;
+    // "hh" would yield the 12-hour clock (01-12).
+    val formatter: SimpleDateFormat = new SimpleDateFormat("HH")
+    formatter.format(ts).toInt
+  }
+}
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Mod.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Mod.scala
new file mode 100644
index 00000000..69fdedc9
--- /dev/null
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Mod.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package xenon.clickhouse.func
+
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
+import org.apache.spark.sql.types._
+
+object Mod extends UnboundFunction with ScalarFunction[Long] with ClickhouseEquivFunction {
+
+  override def name: String = "sharding_mod"
+
+  override def canonicalName: String = s"clickhouse.$name"
+
+  override def toString: String = name
+
+  // remainder is not a Clickhouse function, but modulo will be parsed to remainder in the connector.
+  // Added remainder as a synonym.
+  override val ckFuncNames: Array[String] = Array("modulo", "remainder")
+
+  override def description: String = s"$name: (a: long, b: long) => mod: long"
+
+  override def bind(inputType: StructType): BoundFunction = inputType.fields match {
+    case Array(a, b) if
+        (a match {
+          case StructField(_, LongType, _, _) => true
+          case StructField(_, IntegerType, _, _) => true
+          case StructField(_, ShortType, _, _) => true
+          case StructField(_, ByteType, _, _) => true
+          case StructField(_, StringType, _, _) => true
+          case _ => false
+        }) &&
+          (b match {
+            case StructField(_, LongType, _, _) => true
+            case StructField(_, IntegerType, _, _) => true
+            case StructField(_, ShortType, _, _) => true
+            case StructField(_, ByteType, _, _) => true
+            case StructField(_, StringType, _, _) => true
+            case _ => false
+          }) =>
+      this
+    case _ => throw new UnsupportedOperationException(s"Expect 2 integer arguments. $description")
+  }
+
+  override def inputTypes: Array[DataType] = Array(LongType, LongType)
+
+  override def resultType: DataType = LongType
+
+  override def isResultNullable: Boolean = false
+
+  def invoke(a: Long, b: Long): Long = a % b
+}
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala
new file mode 100644
index 00000000..13e06d88
--- /dev/null
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package xenon.clickhouse.func
+
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
+import org.apache.spark.sql.types._
+
+import java.time.LocalDate
+import java.time.format.DateTimeFormatter
+
+object Months extends UnboundFunction with ScalarFunction[Int] with ClickhouseEquivFunction {
+
+  override def name: String = "clickhouse_months"
+
+  override def canonicalName: String = s"clickhouse.$name"
+
+  override def toString: String = name
+
+  override val ckFuncNames: Array[String] = Array("toYYYYMM")
+
+  override def description: String = s"$name: (date: Date) => shard_num: int"
+
+  override def bind(inputType: StructType): BoundFunction = inputType.fields match {
+    case Array(StructField(_, DateType, _, _)) => this
+//    case Array(StructField(_, TimestampType, _, _)) | Array(StructField(_, TimestampNTZType, _, _)) => this
+    case _ => throw new UnsupportedOperationException(s"Expect 1 DATE argument. $description")
+  }
+
+  override def inputTypes: Array[DataType] = Array(DateType)
+
+  override def resultType: DataType = IntegerType
+
+  override def isResultNullable: Boolean = false
+
+  def invoke(days: Int): Int = {
+    val date = LocalDate.ofEpochDay(days)
+    val formatter = DateTimeFormatter.ofPattern("yyyyMM")
+    date.format(formatter).toInt
+  }
+}
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala
new file mode 100644
index 00000000..6bf987fb
--- /dev/null
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package xenon.clickhouse.func
+
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
+import org.apache.spark.sql.types._
+
+import java.time.LocalDate
+import java.time.format.DateTimeFormatter
+
+object Years extends UnboundFunction with ScalarFunction[Int] with ClickhouseEquivFunction {
+
+  override def name: String = "clickhouse_years"
+
+  override def canonicalName: String = s"clickhouse.$name"
+
+  override def toString: String = name
+
+  override val ckFuncNames: Array[String] = Array("toYear", "YEAR")
+
+  override def description: String = s"$name: (date: Date) => shard_num: int"
+
+  override def bind(inputType: StructType): BoundFunction = inputType.fields match {
+    case Array(StructField(_, DateType, _, _)) => this
+//    case Array(StructField(_, TimestampType, _, _)) | Array(StructField(_, TimestampNTZType, _, _)) => this
+    case _ => throw new UnsupportedOperationException(s"Expect 1 DATE argument. $description")
+  }
+
+  override def inputTypes: Array[DataType] = Array(DateType)
+
+  override def resultType: DataType = IntegerType
+
+  override def isResultNullable: Boolean = false
+
+  def invoke(days: Int): Int = {
+    val date = LocalDate.ofEpochDay(days)
+    val formatter = DateTimeFormatter.ofPattern("yyyy")
+    date.format(formatter).toInt
+  }
+}
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/write/WriteJobDescription.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/write/WriteJobDescription.scala
index 411f08a4..646d6ca5 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/write/WriteJobDescription.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/write/WriteJobDescription.scala
@@ -62,15 +62,21 @@ case class WriteJobDescription(
   }
 
   def sparkSplits: Array[Transform] =
+    // Pmod by total weight * constant. Note that this key will be further hashed by Spark. Reasons for doing this:
+    // - It enlarges the range of the modulo, which avoids hash collisions when the number of shards is small and
+    //   thus mitigates the data skew they would cause.
+    // - It still distributes data from one shard to only a subset of executors. If we did not apply the modulo
+    //   here (and instead applied it during sorting in `toSparkSortOrders`), data belonging to shard 1 would be
+    //   sorted to the front in every task, putting instant high pressure on shard 1 when the stage starts.
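+    // Illustration with hypothetical numbers: given 4 shards of weight 1 and the constant 5, the split key
+    // becomes modulo(shardingKey, 20); since (k % 20) % 4 == k % 4, each of the 20 buckets maps to one shard.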
     if (writeOptions.repartitionByPartition) {
       ExprUtils.toSparkSplits(
-        shardingKeyIgnoreRand,
+        shardingKeyIgnoreRand.map(k => ExprUtils.toSplitWithModulo(k, cluster.get.totalWeight * 5)),
         partitionKey,
         functionRegistry
       )
     } else {
       ExprUtils.toSparkSplits(
-        shardingKeyIgnoreRand,
+        shardingKeyIgnoreRand.map(k => ExprUtils.toSplitWithModulo(k, cluster.get.totalWeight * 5)),
         None,
         functionRegistry
       )

From 5d45838036abdf781840c96fd5a9664078f25b76 Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Mon, 28 Aug 2023 17:13:22 +0800
Subject: [PATCH 2/3] nit

---
 .../cluster/ClusterShardByTransformSuite.scala    | 15 ++++++---------
 .../main/scala/xenon/clickhouse/func/Days.scala   |  3 ++-
 .../main/scala/xenon/clickhouse/func/Months.scala |  3 ++-
 .../main/scala/xenon/clickhouse/func/Years.scala  |  3 ++-
 4 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClusterShardByTransformSuite.scala b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClusterShardByTransformSuite.scala
index e02dad11..21d7da59 100644
--- a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClusterShardByTransformSuite.scala
+++ b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClusterShardByTransformSuite.scala
@@ -18,11 +18,8 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 
 class ClusterShardByTransformSuite extends SparkClickHouseClusterTest {
-  override protected def sparkConf: SparkConf = {
-    val _conf = super.sparkConf
-      .set("spark.clickhouse.write.distributed.convertLocal", "true")
-    _conf
-  }
+  override protected def sparkConf: SparkConf = super.sparkConf
+    .set("spark.clickhouse.write.distributed.convertLocal", "true")
 
   def runTest(func_name: String, func_args: Array[String]): Unit = {
     val func_expr = s"$func_name(${func_args.mkString(",")})"
@@ -94,13 +91,13 @@ class ClusterShardByTransformSuite extends SparkClickHouseClusterTest {
   }
 
   Seq(
-    // wait for SPARK-44180 to be fixed, then add implicit cast test cases
+    // TODO timezone aware implicit cast requires SPARK-44180
     ("toYear", Array("create_date")),
-//    ("toYear", Array("create_time")),
+    // ("toYear", Array("create_time")),
     ("toYYYYMM", Array("create_date")),
-//    ("toYYYYMM", Array("create_time")),
+    // ("toYYYYMM", Array("create_time")),
     ("toYYYYMMDD", Array("create_date")),
-//    ("toYYYYMMDD", Array("create_time")),
+    // ("toYYYYMMDD", Array("create_time")),
     ("toHour", Array("create_time")),
     ("xxHash64", Array("value")),
     ("murmurHash2_64", Array("value")),
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala
index 3008d7fd..4a540d3e 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala
@@ -34,7 +34,8 @@ object Days extends UnboundFunction with ScalarFunction[Int] with ClickhouseEqui
 
   override def bind(inputType: StructType): BoundFunction = inputType.fields match {
     case Array(StructField(_, DateType, _, _)) => this
-//    case Array(StructField(_, TimestampType, _, _)) | Array(StructField(_, TimestampNTZType, _, _)) => this
+    // TODO timezone aware implicit cast requires SPARK-44180
+    // case Array(StructField(_, TimestampType, _, _)) | Array(StructField(_, TimestampNTZType, _, _)) => this
     case _ => throw new UnsupportedOperationException(s"Expect 1 DATE argument. $description")
   }
 
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala
index 13e06d88..d9532cca 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala
@@ -34,7 +34,8 @@ object Months extends UnboundFunction with ScalarFunction[Int] with ClickhouseEq
 
   override def bind(inputType: StructType): BoundFunction = inputType.fields match {
     case Array(StructField(_, DateType, _, _)) => this
-//    case Array(StructField(_, TimestampType, _, _)) | Array(StructField(_, TimestampNTZType, _, _)) => this
+    // TODO timezone aware implicit cast requires SPARK-44180
+    // case Array(StructField(_, TimestampType, _, _)) | Array(StructField(_, TimestampNTZType, _, _)) => this
     case _ => throw new UnsupportedOperationException(s"Expect 1 DATE argument. $description")
   }
 
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala
index 6bf987fb..4eb3f30e 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala
@@ -34,7 +34,8 @@ object Years extends UnboundFunction with ScalarFunction[Int] with ClickhouseEqu
 
   override def bind(inputType: StructType): BoundFunction = inputType.fields match {
     case Array(StructField(_, DateType, _, _)) => this
-//    case Array(StructField(_, TimestampType, _, _)) | Array(StructField(_, TimestampNTZType, _, _)) => this
+    // TODO timezone aware implicit cast requires SPARK-44180
+    // case Array(StructField(_, TimestampType, _, _)) | Array(StructField(_, TimestampNTZType, _, _)) => this
     case _ => throw new UnsupportedOperationException(s"Expect 1 DATE argument. $description")
   }
 

From bc64afb372e2368c88ad9271a744288a7174e66e Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Mon, 30 Oct 2023 10:55:04 +0800
Subject: [PATCH 3/3] nit

---
 .../scala/xenon/clickhouse/func/Days.scala    |  4 ++--
 .../clickhouse/func/FunctionRegistry.scala    | 22 +++++++++----------
 .../scala/xenon/clickhouse/func/Hours.scala   |  4 ++--
 .../scala/xenon/clickhouse/func/Mod.scala     |  4 ++--
 .../scala/xenon/clickhouse/func/Months.scala  |  4 ++--
 .../clickhouse/func/MultiStringArgsHash.scala |  2 +-
 .../xenon/clickhouse/func/XxHash64.scala      |  4 ++--
 .../scala/xenon/clickhouse/func/Years.scala   |  4 ++--
 8 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala
index 4a540d3e..d2d1ca11 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala
@@ -22,7 +22,7 @@ import java.time.format.DateTimeFormatter
 
 object Days extends UnboundFunction with ScalarFunction[Int] with ClickhouseEquivFunction {
 
-  override def name: String = "clickhouse_days"
+  override def name: String = "clickhouse_toYYYYMMDD"
 
   override def canonicalName: String = s"clickhouse.$name"
 
@@ -30,7 +30,7 @@ object Days extends UnboundFunction with ScalarFunction[Int] with ClickhouseEqui
 
   override val ckFuncNames: Array[String] = Array("toYYYYMMDD")
 
-  override def description: String = s"$name: (date: Date) => shard_num: int"
+  override def description: String = s"$name: (date: DATE) => YYYYMMDD: INT"
 
   override def bind(inputType: StructType): BoundFunction = inputType.fields match {
     case Array(StructField(_, DateType, _, _)) => this
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala
index dd74635d..25f5061c 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala
@@ -48,17 +48,17 @@ object StaticFunctionRegistry extends FunctionRegistry {
 
   private val functions = Map[String, UnboundFunction](
     "ck_xx_hash64" -> ClickHouseXxHash64, // for compatible
-    "clickhouse_xxHash64" -> ClickHouseXxHash64,
-    "clickhouse_murmurHash2_32" -> MurmurHash2_32,
-    "clickhouse_murmurHash2_64" -> MurmurHash2_64,
-    "clickhouse_murmurHash3_32" -> MurmurHash3_32,
-    "clickhouse_murmurHash3_64" -> MurmurHash3_64,
-    "clickhouse_cityHash64" -> CityHash64,
-    "clickhouse_years" -> Years,
-    "clickhouse_months" -> Months,
-    "clickhouse_days" -> Days,
-    "clickhouse_hours" -> Hours,
-    "sharding_mod" -> Mod
+    ClickHouseXxHash64.name -> ClickHouseXxHash64,
+    MurmurHash2_32.name -> MurmurHash2_32,
+    MurmurHash2_64.name -> MurmurHash2_64,
+    MurmurHash3_32.name -> MurmurHash3_32,
+    MurmurHash3_64.name -> MurmurHash3_64,
+    CityHash64.name -> CityHash64,
+    Years.name -> Years,
+    Months.name -> Months,
+    Days.name -> Days,
+    Hours.name -> Hours,
+    Mod.name -> Mod
   )
 
   override def list: Array[String] = functions.keys.toArray
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Hours.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Hours.scala
index e88907be..8d7bf1ed 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Hours.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Hours.scala
@@ -22,7 +22,7 @@ import java.text.SimpleDateFormat
 
 object Hours extends UnboundFunction with ScalarFunction[Int] with ClickhouseEquivFunction {
 
-  override def name: String = "clickhouse_hours"
+  override def name: String = "clickhouse_toHour"
 
   override def canonicalName: String = s"clickhouse.$name"
 
@@ -30,7 +30,7 @@ object Hours extends UnboundFunction with ScalarFunction[Int] with ClickhouseEqu
 
   override val ckFuncNames: Array[String] = Array("toHour", "HOUR")
 
-  override def description: String = s"$name: (time: timestamp) => shard_num: int"
+  override def description: String = s"$name: (time: TIMESTAMP) => HH: INT"
 
   override def bind(inputType: StructType): BoundFunction = inputType.fields match {
     case Array(StructField(_, TimestampType, _, _)) | Array(StructField(_, TimestampNTZType, _, _)) => this
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Mod.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Mod.scala
index 69fdedc9..80fc22c7 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Mod.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Mod.scala
@@ -19,7 +19,7 @@ import org.apache.spark.sql.types._
 
 object Mod extends UnboundFunction with ScalarFunction[Long] with ClickhouseEquivFunction {
 
-  override def name: String = "sharding_mod"
+  override def name: String = "clickhouse_modulo"
 
   override def canonicalName: String = s"clickhouse.$name"
 
@@ -29,7 +29,7 @@ object Mod extends UnboundFunction with ScalarFunction[Long] with ClickhouseEqui
   // Added remainder as a synonym.
   override val ckFuncNames: Array[String] = Array("modulo", "remainder")
 
-  override def description: String = s"$name: (a: long, b: long) => mod: long"
+  override def description: String = s"$name: (a: LONG, b: LONG) => a % b: LONG"
 
   override def bind(inputType: StructType): BoundFunction = inputType.fields match {
     case Array(a, b) if
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala
index d9532cca..96dc0578 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala
@@ -22,7 +22,7 @@ import java.time.format.DateTimeFormatter
 
 object Months extends UnboundFunction with ScalarFunction[Int] with ClickhouseEquivFunction {
 
-  override def name: String = "clickhouse_months"
+  override def name: String = "clickhouse_toYYYYMM"
 
   override def canonicalName: String = s"clickhouse.$name"
 
@@ -30,7 +30,7 @@ object Months extends UnboundFunction with ScalarFunction[Int] with ClickhouseEq
 
   override val ckFuncNames: Array[String] = Array("toYYYYMM")
 
-  override def description: String = s"$name: (date: Date) => shard_num: int"
+  override def description: String = s"$name: (date: DATE) => YYYYMM: INT"
 
   override def bind(inputType: StructType): BoundFunction = inputType.fields match {
     case Array(StructField(_, DateType, _, _)) => this
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala
index c99bf247..5665a253 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala
@@ -27,7 +27,7 @@ abstract class MultiStringArgsHash extends UnboundFunction with ClickhouseEquivF
 
   override val ckFuncNames: Array[String]
 
-  override def description: String = s"$name: (value: string, ...) => hash_value: long"
+  override def description: String = s"$name: (value: STRING, ...) => hash_value: LONG"
 
   private def isExceptedType(dt: DataType): Boolean =
     dt.isInstanceOf[StringType]
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala
index 3c4a5b1a..fc2f4951 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala
@@ -36,7 +36,7 @@ object ClickHouseXxHash64 extends UnboundFunction with ScalarFunction[Long] with
 
   override val ckFuncNames: Array[String] = Array("xxHash64")
 
-  override def description: String = s"$name: (value: string) => hash_value: long"
+  override def description: String = s"$name: (value: STRING) => hash_value: LONG"
 
   override def bind(inputType: StructType): BoundFunction = inputType.fields match {
     case Array(StructField(_, StringType, _, _)) => this
@@ -75,7 +75,7 @@ class ClickHouseXxHash64Shard(clusters: Seq[ClusterSpec]) extends UnboundFunctio
 
   override def canonicalName: String = s"clickhouse.$name"
 
-  override def description: String = s"$name: (cluster_name: string, value: string) => shard_num: int"
+  override def description: String = s"$name: (cluster_name: STRING, value: STRING) => shard_num: INT"
 
   override def bind(inputType: StructType): BoundFunction = inputType.fields match {
     case Array(StructField(_, StringType, _, _), StructField(_, StringType, _, _)) => this
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala
index 4eb3f30e..d7d198e8 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala
@@ -22,7 +22,7 @@ import java.time.format.DateTimeFormatter
 
 object Years extends UnboundFunction with ScalarFunction[Int] with ClickhouseEquivFunction {
 
-  override def name: String = "clickhouse_years"
+  override def name: String = "clickhouse_toYear"
 
   override def canonicalName: String = s"clickhouse.$name"
 
@@ -30,7 +30,7 @@ object Years extends UnboundFunction with ScalarFunction[Int] with ClickhouseEqu
 
   override val ckFuncNames: Array[String] = Array("toYear", "YEAR")
 
-  override def description: String = s"$name: (date: Date) => shard_num: int"
+  override def description: String = s"$name: (date: DATE) => YYYY: INT"
 
   override def bind(inputType: StructType): BoundFunction = inputType.fields match {
     case Array(StructField(_, DateType, _, _)) => this
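
The standalone sketch below is an editorial illustration, not part of the patch series. It shows how the pieces above compose: the Spark-side transform mirrors ClickHouse's toYYYYMMDD (as in Days.invoke), and the modulo split (as in Mod.invoke and ExprUtils.toSplitWithModulo) buckets rows so that every bucket maps to exactly one shard slot. The object name, the 4-shard/weight-1 cluster, and the * 5 constant are assumptions taken from the patches' comments.

import java.time.LocalDate
import java.time.format.DateTimeFormatter

object ShardBucketSketch {
  // Mirrors Days.invoke: Spark passes DATE values as days since the epoch.
  def toYYYYMMDD(epochDays: Int): Int =
    LocalDate.ofEpochDay(epochDays.toLong).format(DateTimeFormatter.ofPattern("yyyyMMdd")).toInt

  // Mirrors Mod.invoke, the Spark-side equivalent of ClickHouse modulo().
  def modulo(a: Long, b: Long): Long = a % b

  def main(args: Array[String]): Unit = {
    val totalWeight = 4 // assumed: four shards, each with weight 1
    val days = LocalDate.of(2024, 4, 4).toEpochDay.toInt
    // Split key as built by ExprUtils.toSplitWithModulo with the * 5 constant:
    val bucket = modulo(toYYYYMMDD(days).toLong, (totalWeight * 5).toLong)
    // Slot ClickHouse itself computes for Distributed(..., toYYYYMMDD(create_date)):
    val shardSlot = modulo(toYYYYMMDD(days).toLong, totalWeight.toLong)
    println(s"yyyymmdd=${toYYYYMMDD(days)} bucket=$bucket shardSlot=$shardSlot") // 20240404 4 0
  }
}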