diff --git a/clickhouse-core/src/main/scala/xenon/clickhouse/spec/NodeSpec.scala b/clickhouse-core/src/main/scala/xenon/clickhouse/spec/NodeSpec.scala
index 454312df..eb809169 100644
--- a/clickhouse-core/src/main/scala/xenon/clickhouse/spec/NodeSpec.scala
+++ b/clickhouse-core/src/main/scala/xenon/clickhouse/spec/NodeSpec.scala
@@ -97,4 +97,6 @@ case class ClusterSpec(
   override def toString: String = s"cluster: $name, shards: [${shards.mkString(", ")}]"
 
   @JsonIgnore @transient override lazy val nodes: Array[NodeSpec] = shards.sorted.flatMap(_.nodes)
+
+  def totalWeight: Int = shards.map(_.weight).sum
 }
diff --git a/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClusterShardByTransformSuite.scala b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClusterShardByTransformSuite.scala
new file mode 100644
index 00000000..21d7da59
--- /dev/null
+++ b/spark-3.4/clickhouse-spark-it/src/test/scala/org/apache/spark/sql/clickhouse/cluster/ClusterShardByTransformSuite.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.clickhouse.cluster
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+
+class ClusterShardByTransformSuite extends SparkClickHouseClusterTest {
+  override protected def sparkConf: SparkConf = super.sparkConf
+    .set("spark.clickhouse.write.distributed.convertLocal", "true")
+
+  def runTest(func_name: String, func_args: Array[String]): Unit = {
+    val func_expr = s"$func_name(${func_args.mkString(",")})"
+    val cluster = "single_replica"
+    val db = s"db_${func_name}_shard_transform"
+    val tbl_dist = s"tbl_${func_name}_shard"
+    val tbl_local = s"${tbl_dist}_local"
+
+    try {
+      runClickHouseSQL(s"CREATE DATABASE IF NOT EXISTS $db ON CLUSTER $cluster")
+
+      spark.sql(
+        s"""CREATE TABLE $db.$tbl_local (
+           |  create_time TIMESTAMP NOT NULL,
+           |  create_date DATE NOT NULL,
+           |  value STRING NOT NULL
+           |) USING ClickHouse
+           |TBLPROPERTIES (
+           |  cluster = '$cluster',
+           |  engine = 'MergeTree()',
+           |  order_by = 'create_time'
+           |)
+           |""".stripMargin
+      )
+
+      runClickHouseSQL(
+        s"""CREATE TABLE $db.$tbl_dist ON CLUSTER $cluster
+           |AS $db.$tbl_local
+           |ENGINE = Distributed($cluster, '$db', '$tbl_local', $func_expr)
+           |""".stripMargin
+      )
+      spark.sql(
+        s"""INSERT INTO `$db`.`$tbl_dist`
+           |VALUES
+           |  (timestamp'2021-01-01 10:10:10', date'2021-01-01', '1'),
+           |  (timestamp'2022-02-02 11:10:10', date'2022-02-02', '2'),
+           |  (timestamp'2023-03-03 12:10:10', date'2023-03-03', '3'),
+           |  (timestamp'2024-04-04 13:10:10', date'2024-04-04', '4')
+           |  AS tab(create_time, create_date, value)
+           |""".stripMargin
+      )
+      // check that the data is indeed written
+      checkAnswer(
+        spark.table(s"$db.$tbl_dist").select("value").orderBy("create_time"),
+        Seq(Row("1"), Row("2"), Row("3"), Row("4"))
+      )
+
+      // check that the same rows are sharded to the same server as ClickHouse native sharding
+      runClickHouseSQL(
+        s"""INSERT INTO `$db`.`$tbl_dist`
+           |VALUES
+           |  (timestamp'2021-01-01 10:10:10', date'2021-01-01', '1'),
+           |  (timestamp'2022-02-02 11:10:10', date'2022-02-02', '2'),
+           |  (timestamp'2023-03-03 12:10:10', date'2023-03-03', '3'),
+           |  (timestamp'2024-04-04 13:10:10', date'2024-04-04', '4')
+           |""".stripMargin
+      )
+      checkAnswer(
+        spark.table(s"$db.$tbl_local")
+          .groupBy("value").count().filter("count != 2"),
+        Seq.empty
+      )
+
+    } finally {
+      runClickHouseSQL(s"DROP TABLE IF EXISTS $db.$tbl_dist ON CLUSTER $cluster")
+      runClickHouseSQL(s"DROP TABLE IF EXISTS $db.$tbl_local ON CLUSTER $cluster")
+      runClickHouseSQL(s"DROP DATABASE IF EXISTS $db ON CLUSTER $cluster")
+    }
+  }
+
+  Seq(
+    // TODO timezone aware implicit cast requires SPARK-44180
+    ("toYear", Array("create_date")),
+    // ("toYear", Array("create_time")),
+    ("toYYYYMM", Array("create_date")),
+    // ("toYYYYMM", Array("create_time")),
+    ("toYYYYMMDD", Array("create_date")),
+    // ("toYYYYMMDD", Array("create_time")),
+    ("toHour", Array("create_time")),
+    ("xxHash64", Array("value")),
+    ("murmurHash2_64", Array("value")),
+    ("murmurHash2_32", Array("value")),
+    ("murmurHash3_64", Array("value")),
+    ("murmurHash3_32", Array("value")),
+    ("cityHash64", Array("value")),
+    ("modulo", Array("toYYYYMM(create_date)", "10"))
+  ).foreach {
+    case (func_name: String, func_args: Array[String]) =>
+      test(s"shard by $func_name(${func_args.mkString(",")})")(runTest(func_name, func_args))
+  }
+
+}
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ExprUtils.scala b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ExprUtils.scala
index 3aaa3a69..ed6a5cb3 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ExprUtils.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/ExprUtils.scala
@@ -58,7 +58,7 @@ object ExprUtils extends SQLConfHelper with Serializable {
     functionRegistry: FunctionRegistry
   ): Array[V2SortOrder] =
     toSparkSplits(
-      shardingKeyIgnoreRand,
+      shardingKeyIgnoreRand.map(k => ExprUtils.toSplitWithModulo(k, cluster.get.totalWeight)),
       partitionKey,
       functionRegistry
     ).map(Expressions.sort(_, SortDirection.ASCENDING)) ++:
@@ -211,4 +211,7 @@ object ExprUtils extends SQLConfHelper with Serializable {
     case bucket: BucketTransform => throw CHClientException(s"Bucket transform not support yet: $bucket")
     case other: Transform => throw CHClientException(s"Unsupported transform: $other")
   }
+
+  def toSplitWithModulo(shardingKey: Expr, weight: Int): FuncExpr =
+    FuncExpr("modulo", List(shardingKey, StringLiteral(weight.toString)))
 }
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala
new file mode 100644
index 00000000..d2d1ca11
--- /dev/null
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Days.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package xenon.clickhouse.func
+
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
+import org.apache.spark.sql.types._
+
+import java.time.LocalDate
+import java.time.format.DateTimeFormatter
+
+object Days extends UnboundFunction with ScalarFunction[Int] with ClickhouseEquivFunction {
+
+  override def name: String = "clickhouse_toYYYYMMDD"
+
+  override def canonicalName: String = s"clickhouse.$name"
+
+  override def toString: String = name
+
+  override val ckFuncNames: Array[String] = Array("toYYYYMMDD")
+
+  override def description: String = s"$name: (date: DATE) => YYYYMMDD: INT"
+
+  override def bind(inputType: StructType): BoundFunction = inputType.fields match {
+    case Array(StructField(_, DateType, _, _)) => this
+    // TODO timezone aware implicit cast requires SPARK-44180
+    // case Array(StructField(_, TimestampType, _, _)) | Array(StructField(_, TimestampNTZType, _, _)) => this
+    case _ => throw new UnsupportedOperationException(s"Expect 1 DATE argument. $description")
+  }
+
+  override def inputTypes: Array[DataType] = Array(DateType)
+
+  override def resultType: DataType = IntegerType
+
+  override def isResultNullable: Boolean = false
+
+  def invoke(days: Int): Int = {
+    val date = LocalDate.ofEpochDay(days) // Spark passes DATE as days since epoch
+    val formatter = DateTimeFormatter.ofPattern("yyyyMMdd")
+    date.format(formatter).toInt
+  }
+}
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala
index 8faf6564..25f5061c 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/FunctionRegistry.scala
@@ -48,12 +48,17 @@ object StaticFunctionRegistry extends FunctionRegistry {
 
   private val functions = Map[String, UnboundFunction](
     "ck_xx_hash64" -> ClickHouseXxHash64, // for compatible
-    "clickhouse_xxHash64" -> ClickHouseXxHash64,
-    "clickhouse_murmurHash2_32" -> MurmurHash2_32,
-    "clickhouse_murmurHash2_64" -> MurmurHash2_64,
-    "clickhouse_murmurHash3_32" -> MurmurHash3_32,
-    "clickhouse_murmurHash3_64" -> MurmurHash3_64,
-    "clickhouse_cityHash64" -> CityHash64
+    ClickHouseXxHash64.name -> ClickHouseXxHash64,
+    MurmurHash2_32.name -> MurmurHash2_32,
+    MurmurHash2_64.name -> MurmurHash2_64,
+    MurmurHash3_32.name -> MurmurHash3_32,
+    MurmurHash3_64.name -> MurmurHash3_64,
+    CityHash64.name -> CityHash64,
+    Years.name -> Years,
+    Months.name -> Months,
+    Days.name -> Days,
+    Hours.name -> Hours,
+    Mod.name -> Mod
   )
 
   override def list: Array[String] = functions.keys.toArray
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Hours.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Hours.scala
new file mode 100644
index 00000000..8d7bf1ed
--- /dev/null
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Hours.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package xenon.clickhouse.func
+
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
+import org.apache.spark.sql.types._
+
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+
+object Hours extends UnboundFunction with ScalarFunction[Int] with ClickhouseEquivFunction {
+
+  override def name: String = "clickhouse_toHour"
+
+  override def canonicalName: String = s"clickhouse.$name"
+
+  override def toString: String = name
+
+  override val ckFuncNames: Array[String] = Array("toHour", "HOUR")
+
+  override def description: String = s"$name: (time: TIMESTAMP) => HH: INT"
+
+  override def bind(inputType: StructType): BoundFunction = inputType.fields match {
+    case Array(StructField(_, TimestampType, _, _)) | Array(StructField(_, TimestampNTZType, _, _)) => this
+    case _ => throw new UnsupportedOperationException(s"Expect 1 TIMESTAMP argument. $description")
+  }
+
+  override def inputTypes: Array[DataType] = Array(TimestampType)
+
+  override def resultType: DataType = IntegerType
+
+  override def isResultNullable: Boolean = false
+
+  def invoke(time: Long): Int = {
+    val ts = new Timestamp(time / 1000) // Spark passes TIMESTAMP as micros since epoch; Timestamp takes millis
+    val formatter: SimpleDateFormat = new SimpleDateFormat("HH") // 24-hour clock (00-23), matching ClickHouse toHour
+    formatter.format(ts).toInt
+  }
+}
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Mod.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Mod.scala
new file mode 100644
index 00000000..80fc22c7
--- /dev/null
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Mod.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package xenon.clickhouse.func
+
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
+import org.apache.spark.sql.types._
+
+object Mod extends UnboundFunction with ScalarFunction[Long] with ClickhouseEquivFunction {
+
+  override def name: String = "clickhouse_modulo"
+
+  override def canonicalName: String = s"clickhouse.$name"
+
+  override def toString: String = name
+
+  // `remainder` is not a ClickHouse function, but `modulo` gets parsed to `remainder` in the connector,
+  // so `remainder` is registered as a synonym.
+  override val ckFuncNames: Array[String] = Array("modulo", "remainder")
+
+  override def description: String = s"$name: (a: LONG, b: LONG) => a % b: LONG"
+
+  // StringType is accepted because the connector passes the shard weight as a string literal,
+  // see ExprUtils.toSplitWithModulo
+  private def isSupportedType(dt: DataType): Boolean = dt match {
+    case LongType | IntegerType | ShortType | ByteType | StringType => true
+    case _ => false
+  }
+
+  override def bind(inputType: StructType): BoundFunction = inputType.fields match {
+    case Array(a, b) if isSupportedType(a.dataType) && isSupportedType(b.dataType) => this
+    case _ => throw new UnsupportedOperationException(s"Expect 2 integer arguments. $description")
+  }
+
+  override def inputTypes: Array[DataType] = Array(LongType, LongType)
+
+  override def resultType: DataType = LongType
+
+  override def isResultNullable: Boolean = false
+
+  def invoke(a: Long, b: Long): Long = a % b
+}
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala
new file mode 100644
index 00000000..96dc0578
--- /dev/null
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Months.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package xenon.clickhouse.func
+
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
+import org.apache.spark.sql.types._
+
+import java.time.LocalDate
+import java.time.format.DateTimeFormatter
+
+object Months extends UnboundFunction with ScalarFunction[Int] with ClickhouseEquivFunction {
+
+  override def name: String = "clickhouse_toYYYYMM"
+
+  override def canonicalName: String = s"clickhouse.$name"
+
+  override def toString: String = name
+
+  override val ckFuncNames: Array[String] = Array("toYYYYMM")
+
+  override def description: String = s"$name: (date: DATE) => YYYYMM: INT"
+
+  override def bind(inputType: StructType): BoundFunction = inputType.fields match {
+    case Array(StructField(_, DateType, _, _)) => this
+    // TODO timezone aware implicit cast requires SPARK-44180
+    // case Array(StructField(_, TimestampType, _, _)) | Array(StructField(_, TimestampNTZType, _, _)) => this
+    case _ => throw new UnsupportedOperationException(s"Expect 1 DATE argument. $description")
$description") + } + + override def inputTypes: Array[DataType] = Array(DateType) + + override def resultType: DataType = IntegerType + + override def isResultNullable: Boolean = false + + def invoke(days: Int): Int = { + val date = LocalDate.ofEpochDay(days) + val formatter = DateTimeFormatter.ofPattern("yyyyMM") + date.format(formatter).toInt + } +} diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala index c99bf247..5665a253 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/MultiStringArgsHash.scala @@ -27,7 +27,7 @@ abstract class MultiStringArgsHash extends UnboundFunction with ClickhouseEquivF override val ckFuncNames: Array[String] - override def description: String = s"$name: (value: string, ...) => hash_value: long" + override def description: String = s"$name: (value: STRING, ...) => hash_value: LONG" private def isExceptedType(dt: DataType): Boolean = dt.isInstanceOf[StringType] diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala index 3c4a5b1a..fc2f4951 100644 --- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/XxHash64.scala @@ -36,7 +36,7 @@ object ClickHouseXxHash64 extends UnboundFunction with ScalarFunction[Long] with override val ckFuncNames: Array[String] = Array("xxHash64") - override def description: String = s"$name: (value: string) => hash_value: long" + override def description: String = s"$name: (value: STRING) => hash_value: LONG" override def bind(inputType: StructType): BoundFunction = inputType.fields match { case Array(StructField(_, StringType, _, _)) => this @@ -75,7 +75,7 @@ class ClickHouseXxHash64Shard(clusters: Seq[ClusterSpec]) extends UnboundFunctio override def canonicalName: String = s"clickhouse.$name" - override def description: String = s"$name: (cluster_name: string, value: string) => shard_num: int" + override def description: String = s"$name: (cluster_name: STRING, value: STRING) => shard_num: INT" override def bind(inputType: StructType): BoundFunction = inputType.fields match { case Array(StructField(_, StringType, _, _), StructField(_, StringType, _, _)) => this diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala new file mode 100644 index 00000000..d7d198e8 --- /dev/null +++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/func/Years.scala @@ -0,0 +1,53 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package xenon.clickhouse.func
+
+import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
+import org.apache.spark.sql.types._
+
+import java.time.LocalDate
+import java.time.format.DateTimeFormatter
+
+object Years extends UnboundFunction with ScalarFunction[Int] with ClickhouseEquivFunction {
+
+  override def name: String = "clickhouse_toYear"
+
+  override def canonicalName: String = s"clickhouse.$name"
+
+  override def toString: String = name
+
+  override val ckFuncNames: Array[String] = Array("toYear", "YEAR")
+
+  override def description: String = s"$name: (date: DATE) => YYYY: INT"
+
+  override def bind(inputType: StructType): BoundFunction = inputType.fields match {
+    case Array(StructField(_, DateType, _, _)) => this
+    // TODO timezone aware implicit cast requires SPARK-44180
+    // case Array(StructField(_, TimestampType, _, _)) | Array(StructField(_, TimestampNTZType, _, _)) => this
+    case _ => throw new UnsupportedOperationException(s"Expect 1 DATE argument. $description")
+  }
+
+  override def inputTypes: Array[DataType] = Array(DateType)
+
+  override def resultType: DataType = IntegerType
+
+  override def isResultNullable: Boolean = false
+
+  def invoke(days: Int): Int = {
+    val date = LocalDate.ofEpochDay(days)
+    val formatter = DateTimeFormatter.ofPattern("yyyy")
+    date.format(formatter).toInt
+  }
+}
diff --git a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/write/WriteJobDescription.scala b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/write/WriteJobDescription.scala
index 411f08a4..646d6ca5 100644
--- a/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/write/WriteJobDescription.scala
+++ b/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/write/WriteJobDescription.scala
@@ -62,15 +62,21 @@ case class WriteJobDescription(
   }
 
   def sparkSplits: Array[Transform] =
+    // Pmod by (total weight * constant). Note that Spark will further hash this key. Reasons for doing this:
+    // - Enlarge the modulo range, so that a small number of shards does not lead to hash collisions and the
+    //   data skew those collisions cause.
+    // - Still distribute data from one shard to only a subset of executors. If we did not apply modulo here
+    //   (and instead applied it during sorting in `toSparkSortOrders`), data belonging to shard 1 would be
+    //   sorted to the front in every task, putting instant high pressure on shard 1 when the stage starts.
    if (writeOptions.repartitionByPartition) {
      ExprUtils.toSparkSplits(
-        shardingKeyIgnoreRand,
+        shardingKeyIgnoreRand.map(k => ExprUtils.toSplitWithModulo(k, cluster.get.totalWeight * 5)),
        partitionKey,
        functionRegistry
      )
    } else {
      ExprUtils.toSparkSplits(
-        shardingKeyIgnoreRand,
+        shardingKeyIgnoreRand.map(k => ExprUtils.toSplitWithModulo(k, cluster.get.totalWeight * 5)),
        None,
        functionRegistry
      )
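A note on the `toSplitWithModulo` / `totalWeight * 5` design above. The trick relies on one invariant: because `totalWeight` divides `totalWeight * 5`, wrapping the sharding key in `modulo(key, totalWeight * 5)` can never move a row to a different shard, while leaving Spark only `totalWeight * 5` distinct split values to hash. The sketch below is illustrative only and not part of the patch: the 2-shard cluster, the 4 write tasks, and the use of `MurmurHash3` as a stand-in for Spark's internal hash partitioning are all assumptions.

```scala
import scala.util.hashing.MurmurHash3

// Minimal sketch of the widened-modulo split. Assumes 2 shards of equal weight and
// 4 Spark write tasks; MurmurHash3 stands in for Spark's real hash partitioning.
object ModuloSplitSketch extends App {
  val totalWeight = 2               // sum of shard weights (assumed cluster)
  val enlarged    = totalWeight * 5 // the widened modulo the connector applies
  val parallelism = 4               // Spark write tasks (assumed)

  def shardOf(key: Int): Int = Math.floorMod(key, totalWeight) // ClickHouse shard pick (equal weights)
  def splitOf(key: Int): Int = Math.floorMod(key, enlarged)    // key as Spark repartitions it
  def taskOf(split: Int): Int = Math.floorMod(MurmurHash3.stringHash(split.toString), parallelism)

  (0 until 20).foreach { key =>
    val split = splitOf(key)
    // Invariant: the wrapped key never crosses shards, since totalWeight divides enlarged.
    assert(shardOf(split) == shardOf(key))
    println(f"key=$key%2d  split=$split  shard=${shardOf(key)}  sparkTask=${taskOf(split)}")
  }
}
```

With only `totalWeight * 5` split values, each shard's rows are confined to at most five hash buckets of Spark tasks, instead of appearing in every task sorted to the front, which is exactly the pressure pattern the `sparkSplits` comment describes.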