Skip to content

[skstore/sql] A few rewrites to help generic directories #930

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jun 25, 2025
43 changes: 11 additions & 32 deletions skiplang/prelude/src/skstore/EagerDir.sk
Original file line number Diff line number Diff line change
Expand Up @@ -712,28 +712,10 @@ class EagerDir protected {
}
}

fun copy{
timeStack: TimeStack,
dirName: DirName,
input: Bool,
creator: ?ArrowKey,
}: this {
static{
timeStack,
dirName,
input,
creator,
fixedData => this.fixedData,
data => this.data,
totalSize => this.totalSize,
}
}

fun reset(
protected fun reset(
context: mutable Context,
writer: Path,
keep: SortedSet<Key>,
wasSeen: (Tick, Path) ~> Bool = (_, _) ~> true,
): this {
this.unsafeGetFileIter().each(kv -> {
(key, valueIter) = kv;
Expand All @@ -742,18 +724,15 @@ class EagerDir protected {
| None() -> void
| Some _ ->
this.unsafeGetDataIterWithoutTombs(key).each(kv -> {
(tick, source, _values) = kv;
// do not wipe out data that has not yet been seen
if (wasSeen(tick, source)) {
!this = this.writeEntry(
context,
source,
writer,
key,
Array[],
true, // force: reset is low-level and we wish to ignore any safeguards
)
}
(_tick, source, _values) = kv;
!this = this.writeEntry(
context,
source,
writer,
key,
Array[],
true, // force: reset is low-level and we wish to ignore any safeguards
)
})
}
}
Expand All @@ -762,7 +741,7 @@ class EagerDir protected {
this
}

fun writeDiff(
protected fun writeDiff(
isReset: Bool,
changes: mutable Iterator<Key>,
writer: mutable Debug.BufferedWriter,
Expand Down
38 changes: 37 additions & 1 deletion sql/src/SqlCsv.sk
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ fun applyDiffStrategy(
isEligibleForTomb = (tick, source) ~>
tick.value < snapshot.default(0) || source == writer;

!dir = dir.reset(newRoot, writer, SortedSet[], isEligibleForTomb);
!dir = dir.resetForRebuild(newRoot, writer, isEligibleForTomb);

SKDB.resetWatermark(newRoot, table.name.lower, identity);

Expand Down Expand Up @@ -1026,3 +1026,39 @@ fun replayStdin(): Map<String, (Int, Int)> {
}

module end;

module SKStore;

extension class EagerDir {
fun resetForRebuild(
context: mutable Context,
writer: Path,
wasSeen: (Tick, Path) ~> Bool,
): this {
this.unsafeGetFileIter().each(kv -> {
(key, valueIter) = kv;
valueIter.next() match {
| None() -> void
| Some _ ->
this.unsafeGetDataIterWithoutTombs(key).each(kv -> {
(tick, source, _values) = kv;
// do not wipe out data that has not yet been seen
if (wasSeen(tick, source)) {
!this = this.writeEntry(
context,
source,
writer,
key,
Array[],
true, // force: reset is low-level and we wish to ignore any safeguards
)
}
})
}
});

this
}
}

module end;
100 changes: 57 additions & 43 deletions sql/src/SqlEval.sk
Original file line number Diff line number Diff line change
Expand Up @@ -2056,44 +2056,15 @@ fun importNext(
(targetDirName, version) = makeInputDirName(dirName);
!hasMultipleVersions = hasMultipleVersions || version != 0;
updates.maybeGet(targetDirName) match {
| Some((_, _, _, iversion, _)) if (iversion > version) -> void
| _ ->
(isReset, changedKeys) = dir.getChangesAfter(tick);
targetCtx.unsafeMaybeGetDir(targetDirName) match {
| None() ->
time = targetCtx.timeStamp();
targetDir = SKStore.EagerDir::create{
timeStack => SKStore.TimeStack::createInput(time),
input => true,
dirName => targetDirName,
creator => None(),
};
targetCtx.setDir(targetDir)
| Some(_) -> void
};
entries = mutable Vector[];
for (key in changedKeys) {
for (srcValue in dir.unsafeGetAllDataIterAfter(tick, key)) {
(_, source, writer, values) = srcValue;
entries.push((key, (source, writer, values)));
}
};
updates![targetDirName] = (isReset, changedKeys, dir, version, entries)
| Some((iversion, _)) if (iversion > version) -> void
| _ -> updates![targetDirName] = (version, dir)
}
| _ -> void
}
};
for (targetDirName => src in updates) {
(isReset, changedKeys, dir, _, entries) = src;
targetCtx.unsafeMaybeGetEagerDir(targetDirName) match {
| None() -> invariant_violation("Directory should have been created")
| Some(inputDir) ->
inputDir.writeArraySourceMany(
targetCtx,
entries.iterator(),
if (isReset) Some((dir.dirName, changedKeys)) else None(),
)
}
(_version, dir) = src;
dir.importNext{targetCtx, tick, targetDirName}
};
hasMultipleVersions
}
Expand Down Expand Up @@ -2211,16 +2182,7 @@ fun printContext(context: mutable SKStore.Context): void {
producedAnyOutput = false;
for ((dirsub, tick) in spec) {
edir = context.unsafeGetEagerDir(dirsub.dirName);
(shouldRebuild, changes) = edir.getChangesAfter(tick);
producedOutput = edir.writeDiff(
shouldRebuild,
changes.iterator(),
writer,
dirsub.entity,
dirsub.format,
dirsub.filter(context.clone(), false, dirsub.dirName),
dirsub.getDestinationWatermark(context),
);
producedOutput = edir.writeChangesAfter(context, dirsub, tick, writer);
!producedAnyOutput = producedAnyOutput || producedOutput
};
if (producedAnyOutput) {
Expand Down Expand Up @@ -2679,6 +2641,58 @@ extension class EagerDir {
!this.timeStack = this.timeStack.maxMinus();
this
}

fun importNext{
targetCtx: mutable Context,
tick: Tick,
targetDirName: DirName,
}: void {
(isReset, changedKeys) = this.getChangesAfter(tick);
inputDir = targetCtx.unsafeMaybeGetEagerDir(targetDirName) match {
| None() ->
time = targetCtx.timeStamp();
targetDir = EagerDir::create{
timeStack => TimeStack::createInput(time),
input => true,
dirName => targetDirName,
creator => None(),
};
targetCtx.setDir(targetDir);
targetDir
| Some(d) -> d
};
entries = mutable Vector[];
for (key in changedKeys) {
for (srcValue in this.unsafeGetAllDataIterAfter(tick, key)) {
(_, source, writer, values) = srcValue;
entries.push((key, (source, writer, values)));
}
};
inputDir.writeArraySourceMany(
targetCtx,
entries.iterator(),
if (isReset) Some((this.dirName, changedKeys)) else None(),
)
}

fun writeChangesAfter(
context: readonly Context,
dirsub: DirSub,
tick: SKStore.Tick,
writer: mutable Debug.BufferedWriter,
): Bool {
(shouldRebuild, changes) = this.getChangesAfter(tick);
producedOutput = this.writeDiff(
shouldRebuild,
changes.iterator(),
writer,
dirsub.entity,
dirsub.format,
dirsub.filter(context, false, dirsub.dirName),
dirsub.getDestinationWatermark(context),
);
producedOutput
}
}

@cpp_extern("SKIP_physEq")
Expand Down
23 changes: 23 additions & 0 deletions sql/src/SqlTables.sk
Original file line number Diff line number Diff line change
Expand Up @@ -998,3 +998,26 @@ class NamedIDs(
}

module end;

module SKStore;

extension class EagerDir {
fun copy{
timeStack: TimeStack,
dirName: DirName,
input: Bool,
creator: ?ArrowKey,
}: this {
static{
timeStack,
dirName,
input,
creator,
fixedData => this.fixedData,
data => this.data,
totalSize => this.totalSize,
}
}
}

module end;
Loading