Skip to content

PoC: Wall Clock Window Expiration (no integration with triggers) #990

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: feature/wall-clock
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 70 additions & 4 deletions quixstreams/dataframe/windows/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
from typing_extensions import TypeAlias

from quixstreams.context import message_context
from quixstreams.core.stream import TransformExpandedCallback
from quixstreams.core.stream import (
Stream,
TransformExpandedCallback,
TransformFunction,
TransformWallClockExpandedCallback,
)
from quixstreams.core.stream.exceptions import InvalidOperation
from quixstreams.models.topics.manager import TopicManager
from quixstreams.state import WindowedPartitionTransaction
Expand All @@ -42,6 +47,8 @@
Iterable[Message],
]

WallClockCallback = Callable[[int, WindowedPartitionTransaction], Iterable[Message]]


class Window(abc.ABC):
def __init__(
Expand Down Expand Up @@ -69,6 +76,14 @@ def process_window(
) -> tuple[Iterable[WindowKeyResult], Iterable[WindowKeyResult]]:
pass

@abstractmethod
def process_wall_clock(
self,
timestamp_ms: int,
transaction: WindowedPartitionTransaction,
) -> Iterable[WindowKeyResult]:
pass

def register_store(self) -> None:
TopicManager.ensure_topics_copartitioned(*self._dataframe.topics)
# Create a config for the changelog topic based on the underlying SDF topics
Expand All @@ -83,6 +98,7 @@ def _apply_window(
self,
func: TransformRecordCallbackExpandedWindowed,
name: str,
wall_clock_func: WallClockCallback,
) -> "StreamingDataFrame":
self.register_store()

Expand All @@ -92,12 +108,24 @@ def _apply_window(
processing_context=self._dataframe.processing_context,
store_name=name,
)
wall_clock_transform_func = _as_wall_clock(
func=wall_clock_func,
stream_id=self._dataframe.stream_id,
processing_context=self._dataframe.processing_context,
store_name=name,
)
# Manually modify the Stream and clone the source StreamingDataFrame
# to avoid adding "transform" API to it.
# Transform callbacks can modify record key and timestamp,
# and it's prone to misuse.
stream = self._dataframe.stream.add_transform(func=windowed_func, expand=True)
return self._dataframe.__dataframe_clone__(stream=stream)
windowed_stream = self._dataframe.stream.add_transform(
func=windowed_func, expand=True
)
wall_clock_stream = Stream(
TransformFunction(wall_clock_transform_func, expand=True, wall_clock=True)
)
sdf = self._dataframe.__dataframe_clone__(stream=windowed_stream)
return sdf.concat_wall_clock(wall_clock_stream)

def final(self) -> "StreamingDataFrame":
"""
Expand Down Expand Up @@ -140,9 +168,17 @@ def window_callback(
for key, window in expired_windows:
yield (window, key, window["start"], None)

def wall_clock_callback(
timestamp: int, transaction: WindowedPartitionTransaction
) -> Iterable[Message]:
# TODO: Check if this will work for sliding windows
for key, window in self.process_wall_clock(timestamp, transaction):
yield (window, key, window["start"], None)

return self._apply_window(
func=window_callback,
name=self._name,
wall_clock_func=wall_clock_callback,
)

def current(self) -> "StreamingDataFrame":
Expand Down Expand Up @@ -188,7 +224,17 @@ def window_callback(
for key, window in updated_windows:
yield (window, key, window["start"], None)

return self._apply_window(func=window_callback, name=self._name)
def wall_clock_callback(
timestamp: int, transaction: WindowedPartitionTransaction
) -> Iterable[Message]:
# TODO: Implement wall_clock callback
return []

return self._apply_window(
func=window_callback,
name=self._name,
wall_clock_func=wall_clock_callback,
)

# Implemented by SingleAggregationWindowMixin and MultiAggregationWindowMixin
# Single aggregation and multi aggregation windows store aggregations and collections
Expand Down Expand Up @@ -424,6 +470,26 @@ def wrapper(
return wrapper


def _as_wall_clock(
func: WallClockCallback,
processing_context: "ProcessingContext",
store_name: str,
stream_id: str,
) -> TransformWallClockExpandedCallback:
@functools.wraps(func)
def wrapper(timestamp: int) -> Iterable[Message]:
ctx = message_context()
transaction = cast(
WindowedPartitionTransaction,
processing_context.checkpoint.get_store_transaction(
stream_id=stream_id, partition=ctx.partition, store_name=store_name
),
)
return func(timestamp, transaction)

return wrapper


class WindowOnLateCallback(Protocol):
def __call__(
self,
Expand Down
7 changes: 7 additions & 0 deletions quixstreams/dataframe/windows/count_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,13 @@ def process_window(
state.set(key=self.STATE_KEY, value=data)
return updated_windows, expired_windows

def process_wall_clock(
self,
timestamp_ms: int,
transaction: WindowedPartitionTransaction,
) -> Iterable[WindowKeyResult]:
return []

def _get_collection_start_id(self, window: CountWindowData) -> int:
start_id = window.get("collection_start_id", _MISSING)
if start_id is _MISSING:
Expand Down
17 changes: 17 additions & 0 deletions quixstreams/dataframe/windows/time_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,27 @@ def process_window(

return updated_windows, expired_windows

def process_wall_clock(
self,
timestamp_ms: int,
transaction: WindowedPartitionTransaction,
) -> Iterable[WindowKeyResult]:
latest_expired_window_end = transaction.get_latest_expired(prefix=b"")
latest_timestamp = max(timestamp_ms, latest_expired_window_end)
max_expired_window_end = latest_timestamp - self._grace_ms
return self.expire_by_partition(
transaction,
max_expired_window_end,
self.collect,
advance_last_expired_timestamp=False,
)

def expire_by_partition(
self,
transaction: WindowedPartitionTransaction,
max_expired_end: int,
collect: bool,
advance_last_expired_timestamp: bool = True,
) -> Iterable[WindowKeyResult]:
for (
window_start,
Expand All @@ -214,6 +230,7 @@ def expire_by_partition(
step_ms=self._step_ms if self._step_ms else self._duration_ms,
collect=collect,
delete=True,
advance_last_expired_timestamp=advance_last_expired_timestamp,
):
yield key, self._results(aggregated, collected, window_start, window_end)

Expand Down
10 changes: 7 additions & 3 deletions quixstreams/state/rocksdb/windowed/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ def expire_all_windows(
step_ms: int,
delete: bool = True,
collect: bool = False,
advance_last_expired_timestamp: bool = True,
) -> Iterable[ExpiredWindowDetail]:
"""
Get all expired windows for all prefix from RocksDB up to the specified `max_end_time` timestamp.
Expand Down Expand Up @@ -360,9 +361,12 @@ def expire_all_windows(
if collect:
self.delete_from_collection(end=start, prefix=prefix)

self._set_timestamp(
prefix=b"", cache=self._last_expired_timestamps, timestamp_ms=last_expired
)
if advance_last_expired_timestamp:
self._set_timestamp(
prefix=b"",
cache=self._last_expired_timestamps,
timestamp_ms=last_expired,
)

def delete_windows(
self, max_start_time: int, delete_values: bool, prefix: bytes
Expand Down
2 changes: 2 additions & 0 deletions quixstreams/state/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ def expire_all_windows(
step_ms: int,
delete: bool = True,
collect: bool = False,
advance_last_expired_timestamp: bool = True,
) -> Iterable[ExpiredWindowDetail[V]]:
"""
Get all expired windows for all prefix from RocksDB up to the specified `max_start_time` timestamp.
Expand All @@ -388,6 +389,7 @@ def expire_all_windows(
:param max_end_time: The timestamp up to which windows are considered expired, inclusive.
:param delete: If True, expired windows will be deleted.
:param collect: If True, values will be collected into windows.
:param advance_last_expired_timestamp: If True, the last expired timestamp will be persisted.
"""
...

Expand Down