diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 3b45a08b0673d..44b28a1f07e4a 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -1658,7 +1658,7 @@ class Partition(val topicPartition: TopicPartition, def deleteRecordsOnLeader(offset: Long): LogDeleteRecordsResult = inReadLock(leaderIsrUpdateLock) { leaderLogIfLocal match { case Some(leaderLog) => - if (!leaderLog.config.delete) + if (!leaderLog.config.delete && leaderLog.config.compact) throw new PolicyViolationException(s"Records of partition $topicPartition can not be deleted due to the configured policy") val convertedOffset = if (offset == DeleteRecordsRequest.HIGH_WATERMARK) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 8e512ad4d0128..5662c2d227636 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -21,7 +21,7 @@ import com.yammer.metrics.core.Metric import kafka.log.LogManager import kafka.server._ import kafka.utils._ -import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, UnknownLeaderEpochException} +import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, PolicyViolationException, UnknownLeaderEpochException} import org.apache.kafka.common.message.{AlterPartitionResponseData, FetchResponseData} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.FileRecords.TimestampAndOffset @@ -61,7 +61,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, Unexpec import org.apache.kafka.server.util.{KafkaScheduler, MockTime} import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource @@ -4030,4 +4030,46 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager) partition.tryCompleteDelayedRequests() } + + @Test + def testDeleteRecordsOnLeaderWithEmptyPolicy(): Unit = { + val leaderEpoch = 5 + val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) + + val emptyPolicyConfig = new LogConfig(util.Map.of( + TopicConfig.CLEANUP_POLICY_CONFIG, "" + )) + + val mockLog = mock(classOf[UnifiedLog]) + when(mockLog.config).thenReturn(emptyPolicyConfig) + when(mockLog.logEndOffset).thenReturn(2L) + when(mockLog.logStartOffset).thenReturn(0L) + when(mockLog.highWatermark).thenReturn(2L) + when(mockLog.maybeIncrementLogStartOffset(any(), any())).thenReturn(true) + + partition.setLog(mockLog, false) + + val result = partition.deleteRecordsOnLeader(1L) + assertEquals(1L, result.requestedOffset) + } + + @Test + def testDeleteRecordsOnLeaderWithCompactPolicy(): Unit = { + val leaderEpoch = 5 + val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) + + val emptyPolicyConfig = new LogConfig(util.Map.of( + TopicConfig.CLEANUP_POLICY_CONFIG, "compact" + )) + + val mockLog = mock(classOf[UnifiedLog]) + when(mockLog.config).thenReturn(emptyPolicyConfig) + when(mockLog.logEndOffset).thenReturn(2L) + when(mockLog.logStartOffset).thenReturn(0L) + when(mockLog.highWatermark).thenReturn(2L) + when(mockLog.maybeIncrementLogStartOffset(any(), any())).thenReturn(true) + + partition.setLog(mockLog, false) + assertThrows(classOf[PolicyViolationException], () => partition.deleteRecordsOnLeader(1L)) + } } diff --git a/docs/upgrade.html b/docs/upgrade.html index c7af07e0411ab..0b596a711bc33 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -136,9 +136,12 @@
Notable changes in 4 settings.
  • - The cleanup.policy is empty and remote.storage.enable is set to true, the + If cleanup.policy is empty and remote.storage.enable is set to true, the local log segments will be cleaned based on the values of log.local.retention.bytes and log.local.retention.ms. +
    + Note that cleanup.policy supports empty values, which means infinite retention. + This is equivalent to setting retention.ms=-1 and retention.bytes=-1.
  • diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index 61b4b9d0edb2f..b6f0a685b4c45 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -1925,8 +1925,8 @@ public int deleteOldSegments() throws IOException { deleteRetentionSizeBreachedSegments() + deleteRetentionMsBreachedSegments(); } else { - // If cleanup.policy is empty and remote storage is disabled, we should not delete any local - // log segments + // If cleanup.policy is empty and remote storage is disabled, we should not delete any local log segments + // unless the log start offset advances through deleteRecords return deleteLogStartOffsetBreachedSegments(); } }