From 29dc59db747264c5eeab6aec1cd09bedb82c81c5 Mon Sep 17 00:00:00 2001 From: Vitali Obukh Date: Thu, 15 Jun 2023 16:13:13 +0300 Subject: [PATCH] SPARKC-701 --- .../spark/connector/writer/TableWriter.scala | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala b/connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala index 5c4c255d5..abe942e68 100644 --- a/connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala +++ b/connector/src/main/scala/com/datastax/spark/connector/writer/TableWriter.scala @@ -73,7 +73,13 @@ class TableWriter[T] private ( val deleteColumnsClause = deleteColumnNames.map(quote).mkString(", ") val whereClause = quotedColumnNames(primaryKey).map(c => s"$c = :$c").mkString(" AND ") - s"DELETE ${deleteColumnsClause} FROM ${quote(keyspaceName)}.${quote(tableName)} WHERE $whereClause" + val timestampSpec = writeConf.timestamp match { + case TimestampOption(PerRowWriteOptionValue(placeholder)) => s""" USING TIMESTAMP :$placeholder""" + case TimestampOption(StaticWriteOptionValue(value)) => s" USING TIMESTAMP $value" + case _ => "" + } + + s"DELETE ${deleteColumnsClause} FROM ${quote(keyspaceName)}.${quote(tableName)}$timestampSpec WHERE $whereClause" } private lazy val queryTemplateUsingUpdate: String = { val (primaryKey, regularColumns) = columns.partition(_.isPrimaryKeyColumn) @@ -100,7 +106,22 @@ class TableWriter[T] private ( val setClause = (setNonCounterColumnsClause ++ setCounterColumnsClause).mkString(", ") val whereClause = quotedColumnNames(primaryKey).map(c => s"$c = :$c").mkString(" AND ") - s"UPDATE ${quote(keyspaceName)}.${quote(tableName)} SET $setClause WHERE $whereClause" + val ttlSpec = writeConf.ttl match { + case TTLOption(PerRowWriteOptionValue(placeholder)) => Some(s"""TTL :$placeholder""") + case TTLOption(StaticWriteOptionValue(value)) => Some(s"TTL $value") + case _ => None + } + + val timestampSpec = writeConf.timestamp match { + case TimestampOption(PerRowWriteOptionValue(placeholder)) => Some(s"""TIMESTAMP :$placeholder""") + case TimestampOption(StaticWriteOptionValue(value)) => Some(s"TIMESTAMP $value") + case _ => None + } + + val options = List(ttlSpec, timestampSpec).flatten + val optionsSpec = if (options.nonEmpty) s" USING ${options.mkString(" AND ")}" else "" + + s"UPDATE ${quote(keyspaceName)}.${quote(tableName)}$optionsSpec SET $setClause WHERE $whereClause" } private val isCounterUpdate =