Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,24 @@
<td>Boolean</td>
<td>Set whether to compact the changes sent downstream in row-time mini-batch. If true, Flink will compact changes and send only the latest change downstream. Note that if the downstream needs the details of versioned data, this optimization cannot be applied. If false, Flink will send all changes to downstream just like when the mini-batch is not enabled.</td>
</tr>
<tr>
<td><h5>table.exec.delta-join.cache-enabled</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether to enable the cache of delta join. If enabled, the delta join caches the records from remote dim table. Default is true.</td>
</tr>
<tr>
<td><h5>table.exec.delta-join.left.cache-size</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">10000</td>
<td>Long</td>
<td>The cache size used to cache the lookup results of the left table in delta join. This value must be positive when enabling cache. Default is 10000.</td>
</tr>
<tr>
<td><h5>table.exec.delta-join.right.cache-size</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">10000</td>
<td>Long</td>
<td>The cache size used to cache the lookup results of the right table in delta join. This value must be positive when enabling cache. Default is 10000.</td>
</tr>
<tr>
<td><h5>table.exec.disabled-operators</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,33 @@ public class ExecutionConfigOptions {
"Set whether to use the SQL/Table operators based on the asynchronous state api. "
+ "Default value is false.");

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Boolean> TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED =
key("table.exec.delta-join.cache-enabled")
.booleanType()
.defaultValue(true)
Copy link
Contributor

@davidradl davidradl Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is your thinking to have a the default as true, I assume this is changing the existing non caching behaviour. It would make more sense to me to have the users opt into the caching and having the default as false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explained earlier.

.withDescription(
"Whether to enable the cache of delta join. If enabled, the delta join caches the "
+ "records from remote dim table. Default is true.");

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Long> TABLE_EXEC_DELTA_JOIN_LEFT_CACHE_SIZE =
key("table.exec.delta-join.left.cache-size")
.longType()
.defaultValue(10000L)
.withDescription(
"The cache size used to cache the lookup results of the left table in delta join. "
+ "This value must be positive when enabling cache. Default is 10000.");

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Long> TABLE_EXEC_DELTA_JOIN_RIGHT_CACHE_SIZE =
key("table.exec.delta-join.right.cache-size")
.longType()
.defaultValue(10000L)
.withDescription(
"The cache size used to cache the lookup results of the right table in delta join. "
+ "This value must be positive when enabling cache. Default is 10000.");

// ------------------------------------------------------------------------------------------
// Enum option types
// ------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
Expand Down Expand Up @@ -82,6 +84,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeltaJoin.DELTA_JOIN_TRANSFORMATION;
import static org.apache.flink.table.planner.plan.utils.DeltaJoinUtil.getUnwrappedAsyncLookupFunction;
Expand Down Expand Up @@ -233,11 +236,17 @@ protected Transformation<RowData> translateToPlanInternal(
RowDataKeySelector leftJoinKeySelector =
KeySelectorUtil.getRowDataSelector(
classLoader, leftJoinKeys, InternalTypeInfo.of(leftStreamType));
// currently, delta join only supports consuming INSERT-ONLY stream
RowDataKeySelector leftUpsertKeySelector =
getUpsertKeySelector(new int[0], leftStreamType, classLoader);

// right side selector
RowDataKeySelector rightJoinKeySelector =
KeySelectorUtil.getRowDataSelector(
classLoader, rightJoinKeys, InternalTypeInfo.of(rightStreamType));
// currently, delta join only supports consuming INSERT-ONLY stream
RowDataKeySelector rightUpsertKeySelector =
getUpsertKeySelector(new int[0], rightStreamType, classLoader);

StreamOperatorFactory<RowData> operatorFactory =
createAsyncLookupDeltaJoin(
Expand All @@ -251,7 +260,9 @@ protected Transformation<RowData> translateToPlanInternal(
leftStreamType,
rightStreamType,
leftJoinKeySelector,
leftUpsertKeySelector,
rightJoinKeySelector,
rightUpsertKeySelector,
classLoader);

final TwoInputTransformation<RowData, RowData, RowData> transform =
Expand Down Expand Up @@ -281,7 +292,9 @@ private StreamOperatorFactory<RowData> createAsyncLookupDeltaJoin(
RowType leftStreamType,
RowType rightStreamType,
RowDataKeySelector leftJoinKeySelector,
RowDataKeySelector leftUpsertKeySelector,
RowDataKeySelector rightJoinKeySelector,
RowDataKeySelector rightUpsertKeySelector,
ClassLoader classLoader) {

DataTypeFactory dataTypeFactory =
Expand All @@ -298,6 +311,10 @@ private StreamOperatorFactory<RowData> createAsyncLookupDeltaJoin(
leftStreamType,
rightStreamType,
leftLookupKeys,
leftJoinKeySelector,
leftUpsertKeySelector,
rightJoinKeySelector,
rightUpsertKeySelector,
false);

AsyncDeltaJoinRunner rightLookupTableAsyncFunction =
Expand All @@ -311,15 +328,23 @@ private StreamOperatorFactory<RowData> createAsyncLookupDeltaJoin(
leftStreamType,
rightStreamType,
rightLookupKeys,
leftJoinKeySelector,
leftUpsertKeySelector,
rightJoinKeySelector,
rightUpsertKeySelector,
true);

Tuple2<Long, Long> leftRightCacheSize = getCacheSize(config);

return new StreamingDeltaJoinOperatorFactory(
rightLookupTableAsyncFunction,
leftLookupTableAsyncFunction,
leftJoinKeySelector,
rightJoinKeySelector,
asyncLookupOptions.asyncTimeout,
asyncLookupOptions.asyncBufferCapacity,
leftRightCacheSize.f0,
leftRightCacheSize.f1,
leftStreamType,
rightStreamType);
}
Expand All @@ -335,6 +360,10 @@ private AsyncDeltaJoinRunner createAsyncDeltaJoinRunner(
RowType leftStreamSideType,
RowType rightStreamSideType,
Map<Integer, FunctionParam> lookupKeys,
RowDataKeySelector leftJoinKeySelector,
RowDataKeySelector leftUpsertKeySelector,
RowDataKeySelector rightJoinKeySelector,
RowDataKeySelector rightUpsertKeySelector,
boolean treatRightAsLookupTable) {
RelOptTable lookupTable = treatRightAsLookupTable ? rightTempTable : leftTempTable;
RowType streamSideType = treatRightAsLookupTable ? leftStreamSideType : rightStreamSideType;
Expand Down Expand Up @@ -408,8 +437,13 @@ private AsyncDeltaJoinRunner createAsyncDeltaJoinRunner(
(DataStructureConverter<RowData, Object>) lookupSideFetcherConverter,
lookupSideGeneratedResultFuture,
InternalSerializers.create(lookupTableSourceRowType),
leftJoinKeySelector,
leftUpsertKeySelector,
rightJoinKeySelector,
rightUpsertKeySelector,
asyncLookupOptions.asyncBufferCapacity,
treatRightAsLookupTable);
treatRightAsLookupTable,
enableCache(config));
}

/**
Expand Down Expand Up @@ -448,4 +482,33 @@ public RexNode visitInputRef(RexInputRef inputRef) {

return condition.accept(converter);
}

private RowDataKeySelector getUpsertKeySelector(
int[] upsertKey, RowType rowType, ClassLoader classLoader) {
final int[] rightUpsertKeys;
if (upsertKey.length > 0) {
rightUpsertKeys = upsertKey;
} else {
rightUpsertKeys = IntStream.range(0, rowType.getFields().size()).toArray();
}
return KeySelectorUtil.getRowDataSelector(
classLoader, rightUpsertKeys, InternalTypeInfo.of(rowType));
}

private boolean enableCache(ReadableConfig config) {
return config.get(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED);
}

/** Get the left cache size and right size. */
private Tuple2<Long, Long> getCacheSize(ReadableConfig config) {
long leftCacheSize =
config.get(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_LEFT_CACHE_SIZE);
long rightCacheSize =
config.get(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_RIGHT_CACHE_SIZE);
if ((leftCacheSize <= 0 || rightCacheSize <= 0) && enableCache(config)) {
throw new IllegalArgumentException(
"Cache size in delta join must be positive when enabling cache.");
}
return Tuple2.of(leftCacheSize, rightCacheSize);
}
}
Loading