diff --git a/skiplang/prelude/src/skstore/EagerDir.sk b/skiplang/prelude/src/skstore/EagerDir.sk index a8a5cef73..ea0fb8ae3 100644 --- a/skiplang/prelude/src/skstore/EagerDir.sk +++ b/skiplang/prelude/src/skstore/EagerDir.sk @@ -712,28 +712,10 @@ class EagerDir protected { } } - fun copy{ - timeStack: TimeStack, - dirName: DirName, - input: Bool, - creator: ?ArrowKey, - }: this { - static{ - timeStack, - dirName, - input, - creator, - fixedData => this.fixedData, - data => this.data, - totalSize => this.totalSize, - } - } - - fun reset( + protected fun reset( context: mutable Context, writer: Path, keep: SortedSet, - wasSeen: (Tick, Path) ~> Bool = (_, _) ~> true, ): this { this.unsafeGetFileIter().each(kv -> { (key, valueIter) = kv; @@ -742,18 +724,15 @@ class EagerDir protected { | None() -> void | Some _ -> this.unsafeGetDataIterWithoutTombs(key).each(kv -> { - (tick, source, _values) = kv; - // do not wipe out data that has not yet been seen - if (wasSeen(tick, source)) { - !this = this.writeEntry( - context, - source, - writer, - key, - Array[], - true, // force: reset is low-level and we wish to ignore any safeguards - ) - } + (_tick, source, _values) = kv; + !this = this.writeEntry( + context, + source, + writer, + key, + Array[], + true, // force: reset is low-level and we wish to ignore any safeguards + ) }) } } @@ -762,7 +741,7 @@ class EagerDir protected { this } - fun writeDiff( + protected fun writeDiff( isReset: Bool, changes: mutable Iterator, writer: mutable Debug.BufferedWriter, diff --git a/sql/src/SqlCsv.sk b/sql/src/SqlCsv.sk index 0662c06e0..e1ae77982 100644 --- a/sql/src/SqlCsv.sk +++ b/sql/src/SqlCsv.sk @@ -624,7 +624,7 @@ fun applyDiffStrategy( isEligibleForTomb = (tick, source) ~> tick.value < snapshot.default(0) || source == writer; - !dir = dir.reset(newRoot, writer, SortedSet[], isEligibleForTomb); + !dir = dir.resetForRebuild(newRoot, writer, isEligibleForTomb); SKDB.resetWatermark(newRoot, table.name.lower, identity); @@ -1026,3 +1026,39 @@ fun replayStdin(): Map { } module end; + +module SKStore; + +extension class EagerDir { + fun resetForRebuild( + context: mutable Context, + writer: Path, + wasSeen: (Tick, Path) ~> Bool, + ): this { + this.unsafeGetFileIter().each(kv -> { + (key, valueIter) = kv; + valueIter.next() match { + | None() -> void + | Some _ -> + this.unsafeGetDataIterWithoutTombs(key).each(kv -> { + (tick, source, _values) = kv; + // do not wipe out data that has not yet been seen + if (wasSeen(tick, source)) { + !this = this.writeEntry( + context, + source, + writer, + key, + Array[], + true, // force: reset is low-level and we wish to ignore any safeguards + ) + } + }) + } + }); + + this + } +} + +module end; diff --git a/sql/src/SqlEval.sk b/sql/src/SqlEval.sk index 59b611661..dd61cf49b 100644 --- a/sql/src/SqlEval.sk +++ b/sql/src/SqlEval.sk @@ -2056,44 +2056,15 @@ fun importNext( (targetDirName, version) = makeInputDirName(dirName); !hasMultipleVersions = hasMultipleVersions || version != 0; updates.maybeGet(targetDirName) match { - | Some((_, _, _, iversion, _)) if (iversion > version) -> void - | _ -> - (isReset, changedKeys) = dir.getChangesAfter(tick); - targetCtx.unsafeMaybeGetDir(targetDirName) match { - | None() -> - time = targetCtx.timeStamp(); - targetDir = SKStore.EagerDir::create{ - timeStack => SKStore.TimeStack::createInput(time), - input => true, - dirName => targetDirName, - creator => None(), - }; - targetCtx.setDir(targetDir) - | Some(_) -> void - }; - entries = mutable Vector[]; - for (key in changedKeys) { - for 
(srcValue in dir.unsafeGetAllDataIterAfter(tick, key)) { - (_, source, writer, values) = srcValue; - entries.push((key, (source, writer, values))); - } - }; - updates![targetDirName] = (isReset, changedKeys, dir, version, entries) + | Some((iversion, _)) if (iversion > version) -> void + | _ -> updates![targetDirName] = (version, dir) } | _ -> void } }; for (targetDirName => src in updates) { - (isReset, changedKeys, dir, _, entries) = src; - targetCtx.unsafeMaybeGetEagerDir(targetDirName) match { - | None() -> invariant_violation("Directory should have been created") - | Some(inputDir) -> - inputDir.writeArraySourceMany( - targetCtx, - entries.iterator(), - if (isReset) Some((dir.dirName, changedKeys)) else None(), - ) - } + (_version, dir) = src; + dir.importNext{targetCtx, tick, targetDirName} }; hasMultipleVersions } @@ -2211,16 +2182,7 @@ fun printContext(context: mutable SKStore.Context): void { producedAnyOutput = false; for ((dirsub, tick) in spec) { edir = context.unsafeGetEagerDir(dirsub.dirName); - (shouldRebuild, changes) = edir.getChangesAfter(tick); - producedOutput = edir.writeDiff( - shouldRebuild, - changes.iterator(), - writer, - dirsub.entity, - dirsub.format, - dirsub.filter(context.clone(), false, dirsub.dirName), - dirsub.getDestinationWatermark(context), - ); + producedOutput = edir.writeChangesAfter(context, dirsub, tick, writer); !producedAnyOutput = producedAnyOutput || producedOutput }; if (producedAnyOutput) { @@ -2679,6 +2641,58 @@ extension class EagerDir { !this.timeStack = this.timeStack.maxMinus(); this } + + fun importNext{ + targetCtx: mutable Context, + tick: Tick, + targetDirName: DirName, + }: void { + (isReset, changedKeys) = this.getChangesAfter(tick); + inputDir = targetCtx.unsafeMaybeGetEagerDir(targetDirName) match { + | None() -> + time = targetCtx.timeStamp(); + targetDir = EagerDir::create{ + timeStack => TimeStack::createInput(time), + input => true, + dirName => targetDirName, + creator => None(), + }; + targetCtx.setDir(targetDir); + targetDir + | Some(d) -> d + }; + entries = mutable Vector[]; + for (key in changedKeys) { + for (srcValue in this.unsafeGetAllDataIterAfter(tick, key)) { + (_, source, writer, values) = srcValue; + entries.push((key, (source, writer, values))); + } + }; + inputDir.writeArraySourceMany( + targetCtx, + entries.iterator(), + if (isReset) Some((this.dirName, changedKeys)) else None(), + ) + } + + fun writeChangesAfter( + context: readonly Context, + dirsub: DirSub, + tick: SKStore.Tick, + writer: mutable Debug.BufferedWriter, + ): Bool { + (shouldRebuild, changes) = this.getChangesAfter(tick); + producedOutput = this.writeDiff( + shouldRebuild, + changes.iterator(), + writer, + dirsub.entity, + dirsub.format, + dirsub.filter(context, false, dirsub.dirName), + dirsub.getDestinationWatermark(context), + ); + producedOutput + } } @cpp_extern("SKIP_physEq") diff --git a/sql/src/SqlTables.sk b/sql/src/SqlTables.sk index d1c42f110..3b6215cb9 100644 --- a/sql/src/SqlTables.sk +++ b/sql/src/SqlTables.sk @@ -998,3 +998,26 @@ class NamedIDs( } module end; + +module SKStore; + +extension class EagerDir { + fun copy{ + timeStack: TimeStack, + dirName: DirName, + input: Bool, + creator: ?ArrowKey, + }: this { + static{ + timeStack, + dirName, + input, + creator, + fixedData => this.fixedData, + data => this.data, + totalSize => this.totalSize, + } + } +} + +module end; diff --git a/sql/src/SqlTailer.sk b/sql/src/SqlTailer.sk index fb924fec0..3dfbf4abc 100644 --- a/sql/src/SqlTailer.sk +++ b/sql/src/SqlTailer.sk @@ -143,8 +143,7 
@@ fun tailShouldWake( context.unsafeMaybeGetEagerDir(dirName) match { | None() -> void | Some(dir) -> - (isReset, changes) = dir.getChangesAfter(tick); - if (isReset || !changes.isEmpty()) { + if (dir.changesAfterIsResetOrNonEmpty(tick)) { return true } } @@ -291,7 +290,6 @@ fun tailSub( for (dirSub in sub.dirSubs()) { dirName = dirSub.dirName; - edir = context.unsafeGetEagerDir(dirName); format = options.format match { | SKDB.OFK_CSV() -> assert(dirSub.format is SKStore.OCSV(_)); @@ -303,52 +301,22 @@ fun tailSub( }; since = tailFrom(lookupSpec, dirName, tailWatermark); - (shouldRebuild, changes) = if (forceCompleteRebuild) { - (true, Array[].iterator()) - } else if (since.value <= 0) { - // if since is zero, the receiver is specifying that they - // have no data to tail since. to avoid a likely rebuild and - // allow chunking, we stream the current state of the table - // and begin tailing from here. - (false, edir.unsafeGetFileIterNoReducer(None()).map(pair -> pair.i0)) - } else if (since.value > 1 && !established) { - // we have a client that we've never seen before asking to - // tail from a point in time that they cannot understand, as - // they have no reference for our clock. we assume that this - // indicates they've moved from a different server to us and - // so need to reset their concept of the server's time. - (true, Array[].iterator()) - } else { - // we can only chunk while tables are since 0 as we'll emit - // zero checkpoints. once a nonzero request is handled, we must - // make sure that it doesn't ultimately get sent back as part of a - // zero-checkpoint chunk - otherwise it won't be applied - // TODO: optimisation, request all 0 tables first - !chunkingDisabled = true; - (shouldRebuild, changes) = edir.getChangesAfter(since); - (shouldRebuild, changes.iterator()) - }; - - // Write checkpoints out at least every `checkpointInterval` rows during - // initialization, so that the client can start processing without waiting - // for the full changeset. - // this value is chosen based on some unscientific local testing - checkpointInterval = if (since.value == 0 && !chunkingDisabled) { - 512 - } else { - Int::max - }; - - producedOutput = edir.writeDiff( + edir = context.unsafeGetEagerDir(dirName); + ( + producedOutput, shouldRebuild, - changes, + !chunkingDisabled, + ) = edir.writeChangesTailSub{ + context, + dirSub, + since, writer, - dirSub.entity, format, - dirSub.filter(context, since.value < 1, dirName), //send everything if client has nothing, i.e. since.value == 0 - dirSub.getDestinationWatermark(context), - checkpointInterval, - ); + forceCompleteRebuild, + established, + chunkingDisabled, + }; + !producedAnyOutput = producedAnyOutput || producedOutput; !couldFollowChanges = couldFollowChanges && !shouldRebuild; }; @@ -444,4 +412,70 @@ extension class Sub { } } +extension class EagerDir { + fun changesAfterIsResetOrNonEmpty(tick: Tick): Bool { + (isReset, changes) = this.getChangesAfter(tick); + isReset || !changes.isEmpty() + } + + fun writeChangesTailSub{ + context: readonly Context, + dirSub: DirSub, + since: SKStore.Tick, + writer: mutable Debug.BufferedWriter, + format: OutputFormat, + forceCompleteRebuild: Bool, + established: Bool, + chunkingDisabled: Bool, + }: (Bool, Bool, Bool) { + (shouldRebuild, changes) = if (forceCompleteRebuild) { + (true, Array[].iterator()) + } else if (since.value <= 0) { + // if since is zero, the receiver is specifying that they + // have no data to tail since. 
to avoid a likely rebuild and + // allow chunking, we stream the current state of the table + // and begin tailing from here. + (false, this.unsafeGetFileIterNoReducer(None()).map(pair -> pair.i0)) + } else if (since.value > 1 && !established) { + // we have a client that we've never seen before asking to + // tail from a point in time that they cannot understand, as + // they have no reference for our clock. we assume that this + // indicates they've moved from a different server to us and + // so need to reset their concept of the server's time. + (true, Array[].iterator()) + } else { + // we can only chunk while tables are since 0 as we'll emit + // zero checkpoints. once a nonzero request is handled, we must + // make sure that it doesn't ultimately get sent back as part of a + // zero-checkpoint chunk - otherwise it won't be applied + // TODO: optimisation, request all 0 tables first + !chunkingDisabled = true; + (shouldRebuild, changes) = this.getChangesAfter(since); + (shouldRebuild, changes.iterator()) + }; + + // Write checkpoints out at least every `checkpointInterval` rows during + // initialization, so that the client can start processing without waiting + // for the full changeset. + // this value is chosen based on some unscientific local testing + checkpointInterval = if (since.value == 0 && !chunkingDisabled) { + 512 + } else { + Int::max + }; + + producedOutput = this.writeDiff( + shouldRebuild, + changes, + writer, + dirSub.entity, + format, + dirSub.filter(context, since.value < 1, this.dirName), //send everything if client has nothing, i.e. since.value == 0 + dirSub.getDestinationWatermark(context), + checkpointInterval, + ); + (producedOutput, shouldRebuild, chunkingDisabled) + } +} + module end;
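
Note on the relocated reset API: the predicate-taking reset moves out of the prelude's EagerDir (whose reset is now protected and unconditional) and into the extension class EagerDir in sql/src/SqlCsv.sk as resetForRebuild. The sketch below is only a restatement of the call-site change visible in the SqlCsv.sk hunk above; the names newRoot, writer, and snapshot are taken from that hunk, not introduced here.

    // Previously, the caller handed its wasSeen predicate to the core reset:
    isEligibleForTomb = (tick, source) ~>
      tick.value < snapshot.default(0) || source == writer;
    !dir = dir.reset(newRoot, writer, SortedSet[], isEligibleForTomb);

    // With this change, the same predicate goes to the SQL-side extension,
    // and the (now protected) core reset no longer accepts one:
    !dir = dir.resetForRebuild(newRoot, writer, isEligibleForTomb);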
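The change-tracking helpers follow the same pattern: tailShouldWake now asks the directory directly whether anything changed, and printContext delegates diff emission to the directory. A sketch of the two call shapes, assuming only the signatures added in this diff (changesAfterIsResetOrNonEmpty(tick) and writeChangesAfter(context, dirsub, tick, writer)):

    // tailShouldWake: wake as soon as a directory reports a reset or any change.
    if (dir.changesAfterIsResetOrNonEmpty(tick)) {
      return true
    };

    // printContext: the directory computes and writes its own diff.
    producedOutput = edir.writeChangesAfter(context, dirsub, tick, writer);
    !producedAnyOutput = producedAnyOutput || producedOutput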