diff --git a/.haxerc b/.haxerc index 9bb4f16..3ee99dd 100644 --- a/.haxerc +++ b/.haxerc @@ -1,4 +1,4 @@ { - "version": "3.4.7", + "version": "4.3.3", "resolveLibs": "scoped" } \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index e6b5570..dba47bf 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,21 +6,21 @@ stages: - deploy language: node_js -node_js: 8 +node_js: 14 os: - linux # - osx env: - - HAXE_VERSION=3.4.7 + - HAXE_VERSION=stable - HAXE_VERSION=nightly - + install: - - npm i -g lix@15.3.13 + - npm i -g lix@15.9.0 - lix install haxe $HAXE_VERSION - lix download - + script: # - lix run travix interp # runtime stack overflow - lix run travix neko @@ -28,10 +28,9 @@ script: - lix run travix node - lix run travix js # - lix run travix flash - - lix run travix java - - if [[ "$(haxe -version)" =~ ^4.* ]]; then lix run travix java -D jvm; fi + - lix run travix java -D jvm - lix run travix cpp - # - lix run travix cs # gencs stack overflow, to be investigated + - lix run travix cs -D erase-generics - lix run travix php - lix run travix lua diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 6a40fe7..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "haxe.displayConfigurations": [ - ["dev.hxml"] - ] -} diff --git a/.vscode/tasks.json b/.vscode/tasks.json index f56a473..99242b8 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -1,7 +1,14 @@ -{ - "version": "0.1.0", - "command": "npm", - "args": ["run","--silent","travix","node"], - "isShellCommand": true, - "problemMatcher": "$haxe" -} +{ + "version": "2.0.0", + "tasks": [ + { + "type": "hxml", + "file": "dev.hxml", + "group": { + "kind": "build", + "isDefault": true + }, + "label": "hxml: dev.hxml" + } + ] +} \ No newline at end of file diff --git a/bench.hxml b/bench.hxml new file mode 100644 index 0000000..ec78333 --- /dev/null +++ b/bench.hxml @@ -0,0 +1,7 @@ +-lib tink_streams +-lib hxnodejs +-cp tests +-main Benchmark +--dce full +-D analyzer-optimize +-js bin/bench.js \ No newline at end of file diff --git a/extraParams.hxml b/extraParams.hxml deleted file mode 100644 index f06da6b..0000000 --- a/extraParams.hxml +++ /dev/null @@ -1,2 +0,0 @@ -# temp for development, delete this file when pure branch merged --D pure \ No newline at end of file diff --git a/haxe_libraries/hxcpp.hxml b/haxe_libraries/hxcpp.hxml index b09fbe6..09ac2ce 100644 --- a/haxe_libraries/hxcpp.hxml +++ b/haxe_libraries/hxcpp.hxml @@ -1,4 +1,4 @@ --D hxcpp=4.0.8 -# @install: lix --silent download "haxelib:/hxcpp#4.0.8" into hxcpp/4.0.8/haxelib -# @run: haxelib run-dir hxcpp ${HAXE_LIBCACHE}/hxcpp/4.0.8/haxelib --cp ${HAXE_LIBCACHE}/hxcpp/4.0.8/haxelib/ +# @install: lix --silent download "haxelib:/hxcpp#4.2.1" into hxcpp/4.2.1/haxelib +# @run: haxelib run-dir hxcpp ${HAXE_LIBCACHE}/hxcpp/4.2.1/haxelib +-cp ${HAXE_LIBCACHE}/hxcpp/4.2.1/haxelib/ +-D hxcpp=4.2.1 \ No newline at end of file diff --git a/haxe_libraries/hxcs.hxml b/haxe_libraries/hxcs.hxml new file mode 100644 index 0000000..72f2a5b --- /dev/null +++ b/haxe_libraries/hxcs.hxml @@ -0,0 +1,4 @@ +# @install: lix --silent download "haxelib:/hxcs#4.2.0" into hxcs/4.2.0/haxelib +# @run: haxelib run-dir hxcs ${HAXE_LIBCACHE}/hxcs/4.2.0/haxelib +-cp ${HAXE_LIBCACHE}/hxcs/4.2.0/haxelib/ +-D hxcs=4.2.0 \ No newline at end of file diff --git a/haxe_libraries/hxjava.hxml b/haxe_libraries/hxjava.hxml index 0942a5a..bf7270e 100644 --- a/haxe_libraries/hxjava.hxml +++ b/haxe_libraries/hxjava.hxml @@ -1,5 +1,5 @@ --D hxjava=3.2.0 -# @install: lix --silent download "haxelib:/hxjava#3.2.0" into hxjava/3.2.0/haxelib -# @run: haxelib run-dir hxjava ${HAXE_LIBCACHE}/hxjava/3.2.0/haxelib --cp ${HAXE_LIBCACHE}/hxjava/3.2.0/haxelib/ +# @install: lix --silent download "haxelib:/hxjava#4.2.0" into hxjava/4.2.0/haxelib +# @run: haxelib run-dir hxjava ${HAXE_LIBCACHE}/hxjava/4.2.0/haxelib +-cp ${HAXE_LIBCACHE}/hxjava/4.2.0/haxelib/ +-D hxjava=4.2.0 -java-lib lib/hxjava-std.jar diff --git a/haxe_libraries/hxnodejs.hxml b/haxe_libraries/hxnodejs.hxml index 21bc573..7772815 100644 --- a/haxe_libraries/hxnodejs.hxml +++ b/haxe_libraries/hxnodejs.hxml @@ -1,6 +1,7 @@ -# @install: lix --silent download haxelib:hxnodejs#4.0.9 into hxnodejs/4.0.9/haxelib --D hxnodejs=4.0.9 --cp ${HAXESHIM_LIBCACHE}/hxnodejs/4.0.9/haxelib/src --D nodejs +# @install: lix --silent download "gh://github.com/HaxeFoundation/hxnodejs#504066dc1ba5ad543afa5f6c3ea019f06136a82b" into hxnodejs/12.1.0/github/504066dc1ba5ad543afa5f6c3ea019f06136a82b +-cp ${HAXE_LIBCACHE}/hxnodejs/12.1.0/github/504066dc1ba5ad543afa5f6c3ea019f06136a82b/src +-D hxnodejs=12.1.0 --macro allowPackage('sys') ---macro _hxnodejs.VersionWarning.include() +# should behave like other target defines and not be defined in macro context +--macro define('nodejs') +--macro _internal.SuppressDeprecated.run() diff --git a/haxe_libraries/tink_core.hxml b/haxe_libraries/tink_core.hxml index ac2b1c6..2354244 100644 --- a/haxe_libraries/tink_core.hxml +++ b/haxe_libraries/tink_core.hxml @@ -1,3 +1,3 @@ --D tink_core=1.22.0 -# @install: lix --silent download "gh://github.com/haxetink/tink_core#fa752b88f6757c18da92998aeab5523fe4f28853" into tink_core/1.22.0/github/fa752b88f6757c18da92998aeab5523fe4f28853 --cp ${HAXE_LIBCACHE}/tink_core/1.22.0/github/fa752b88f6757c18da92998aeab5523fe4f28853/src +# @install: lix --silent download "gh://github.com/haxetink/tink_core#670bc1d256a657cee2e78e3554d7effe31c3682d" into tink_core/2.1.1/github/670bc1d256a657cee2e78e3554d7effe31c3682d +-cp ${HAXE_LIBCACHE}/tink_core/2.1.1/github/670bc1d256a657cee2e78e3554d7effe31c3682d/src +-D tink_core=2.1.1 \ No newline at end of file diff --git a/haxe_libraries/tink_testrunner.hxml b/haxe_libraries/tink_testrunner.hxml index 01a09d1..d5a2d6f 100644 --- a/haxe_libraries/tink_testrunner.hxml +++ b/haxe_libraries/tink_testrunner.hxml @@ -1,6 +1,6 @@ --D tink_testrunner=0.7.2 -# @install: lix --silent download "gh://github.com/haxetink/tink_testrunner#9a2e3cbb9ddff7269e08584f30fc425226f10aae" into tink_testrunner/0.7.2/github/9a2e3cbb9ddff7269e08584f30fc425226f10aae +# @install: lix --silent download "gh://github.com/haxetink/tink_testrunner#866de8b991be89b969825b0c0f5565d51f96a6f7" into tink_testrunner/0.8.0/github/866de8b991be89b969825b0c0f5565d51f96a6f7 -lib ansi -lib tink_macro -lib tink_streams --cp ${HAXE_LIBCACHE}/tink_testrunner/0.7.2/github/9a2e3cbb9ddff7269e08584f30fc425226f10aae/src +-cp ${HAXE_LIBCACHE}/tink_testrunner/0.8.0/github/866de8b991be89b969825b0c0f5565d51f96a6f7/src +-D tink_testrunner=0.8.0 \ No newline at end of file diff --git a/haxe_libraries/tink_unittest.hxml b/haxe_libraries/tink_unittest.hxml index 454e37b..b00d844 100644 --- a/haxe_libraries/tink_unittest.hxml +++ b/haxe_libraries/tink_unittest.hxml @@ -1,6 +1,6 @@ --D tink_unittest=0.6.2 -# @install: lix --silent download "gh://github.com/haxetink/tink_unittest#0b0c7de647e522ca42662e2cdfc59e21ed8d4eb4" into tink_unittest/0.6.2/github/0b0c7de647e522ca42662e2cdfc59e21ed8d4eb4 +# @install: lix --silent download "gh://github.com/haxetink/tink_unittest#1c26b50064855d3e7810d4d871103964d5ac9fba" into tink_unittest/0.7.0/github/1c26b50064855d3e7810d4d871103964d5ac9fba -lib tink_syntaxhub -lib tink_testrunner --cp ${HAXE_LIBCACHE}/tink_unittest/0.6.2/github/0b0c7de647e522ca42662e2cdfc59e21ed8d4eb4/src +-cp ${HAXE_LIBCACHE}/tink_unittest/0.7.0/github/1c26b50064855d3e7810d4d871103964d5ac9fba/src +-D tink_unittest=0.7.0 --macro tink.unit.AssertionBufferInjector.use() \ No newline at end of file diff --git a/haxe_libraries/travix.hxml b/haxe_libraries/travix.hxml index f0ede93..a7e0a62 100644 --- a/haxe_libraries/travix.hxml +++ b/haxe_libraries/travix.hxml @@ -1,6 +1,7 @@ -# @install: lix --silent download "gh://github.com/back2dos/travix#90624892ef6bd5b7bb02d359959d1b3d47553999" into travix/0.14.0/github/90624892ef6bd5b7bb02d359959d1b3d47553999 -# @post-install: cd ${HAXE_LIBCACHE}/travix/0.14.0/github/90624892ef6bd5b7bb02d359959d1b3d47553999 && haxe -cp src --run travix.PostDownload -# @run: haxelib run-dir travix ${HAXE_LIBCACHE}/travix/0.14.0/github/90624892ef6bd5b7bb02d359959d1b3d47553999 +# @install: lix --silent download "haxelib:/travix#0.15.3" into travix/0.15.3/haxelib +# @post-install: cd ${HAXE_LIBCACHE}/travix/0.15.3/haxelib && haxe -cp src --run travix.PostDownload +# @run: haxelib run-dir travix "${HAXE_LIBCACHE}/travix/0.15.3/haxelib" -lib tink_cli --cp ${HAXE_LIBCACHE}/travix/0.14.0/github/90624892ef6bd5b7bb02d359959d1b3d47553999/src --D travix=0.14.0 +-cp ${HAXE_LIBCACHE}/travix/0.15.3/haxelib/src +-D travix=0.15.3 +--macro travix.Macro.setup() \ No newline at end of file diff --git a/src/tink/streams/IdealStream.hx b/src/tink/streams/IdealStream.hx index e7c7e36..f312bbd 100644 --- a/src/tink/streams/IdealStream.hx +++ b/src/tink/streams/IdealStream.hx @@ -1,31 +1,3 @@ package tink.streams; -import tink.streams.Stream; - -using tink.CoreApi; - -@:forward @:transitive -abstract IdealStream(Stream) from Stream to Stream { - @:from - public static inline function promiseOfIdealStream(p:Promise>):IdealStream - return cast Stream.promise(p); - - @:from - public static inline function promiseOfStreamNoise(p:Promise>):IdealStream - return cast Stream.promise(p); - - public function collect():Future> { - var buf = []; - return this.forEach(function(x) { - buf.push(x); - return Resume; - }).map(function(c) return buf); - } -} - -typedef IdealStreamObject = StreamObject; - -class IdealStreamBase extends StreamBase { - override public function idealize(rescue:Error->Stream):IdealStream - return this; -} \ No newline at end of file +typedef IdealStream = Stream; \ No newline at end of file diff --git a/src/tink/streams/RealStream.hx b/src/tink/streams/RealStream.hx index c17239b..50268ff 100644 --- a/src/tink/streams/RealStream.hx +++ b/src/tink/streams/RealStream.hx @@ -1,38 +1,27 @@ package tink.streams; -import tink.streams.Stream; +import tink.streams.Stream.StreamObject; using tink.CoreApi; -@:forward @:transitive -abstract RealStream(Stream) from Stream to Stream { - @:from - public static inline function promiseOfIdealStream(p:Promise>):RealStream - return cast Stream.promise(p); - - @:from - public static inline function promiseOfStreamNoise(p:Promise>):RealStream - return cast Stream.promise(p); - - @:from - public static inline function promiseOfRealStream(p:Promise>):RealStream - return cast Stream.promise(p); - - @:from - public static inline function promiseOfStreamError(p:Promise>):RealStream - return cast Stream.promise(p); - - public function collect():Promise> { - var buf = []; - return this.forEach(function(x) { - buf.push(x); - return Resume; - }).map(function(c) return switch c { - case Depleted: Success(buf); - case Failed(e): Failure(e); - case Halted(_): throw 'unreachable'; - }); - } +typedef RealStream = Stream; + +class RealStreamTools { + static public function idealize(s:RealStream, rescue:(error:Error)->RealStream):IdealStream + return cast s; } -typedef RealStreamObject = StreamObject; -typedef RealStreamBase = StreamBase; \ No newline at end of file + +// private class IdealizedStream implements StreamObject { + +// final stream:RealStream; +// final rescue:Error->RealStream; + +// public function new(stream, rescue) { +// this.stream = stream; +// this.rescue = rescue; +// } + + // public function forEach(f:Consumer):Future> + // return + // stream.forEach() +// } \ No newline at end of file diff --git a/src/tink/streams/Return.hx b/src/tink/streams/Return.hx new file mode 100644 index 0000000..30c6358 --- /dev/null +++ b/src/tink/streams/Return.hx @@ -0,0 +1,29 @@ +package tink.streams; + +using tink.CoreApi; + +@:forward +abstract Return(Surprise) from Surprise { + + inline function new(v) + this = v; + + public inline function asFuture() + return this; + + @:from static function ofError(e:Error):Return + return ofPromise(e); + + @:from static function ofOutcome(o:Outcome):Return + return new Return(Future.sync(o)); + + @:from static function ofPromise(f:Promise):Return + return new Return(f); + + @:from static function ofFuture(f:Future):Return + return new Return(f.map(Success)); + + @:from static function ofConst(v:T):Return + return ofFuture(v); + +} \ No newline at end of file diff --git a/src/tink/streams/Stream.hx b/src/tink/streams/Stream.hx index 5632bab..a894752 100644 --- a/src/tink/streams/Stream.hx +++ b/src/tink/streams/Stream.hx @@ -1,743 +1,550 @@ package tink.streams; -import tink.streams.IdealStream; - +import tink.core.Callback; using tink.CoreApi; -@:forward @:transitive +@:transitive +@:using(tink.streams.RealStream) abstract Stream(StreamObject) from StreamObject to StreamObject { - public var depleted(get, never):Bool; - inline function get_depleted() - return this.depleted; + static public function generate(generate:()->Future>):Stream { + return new Generator(generate); + } - @:to function dirty():Stream - return cast this; + static public function single(item:Item):Stream + return new SingleItem(item); - static public function single(i:Item):Stream - return new Single(i); + public function next():Future> + return this.forEach(i -> Some(i)).map(function (r):Step return switch r { + case Done: End; + case Stopped(rest, result): Link(result, rest); + case Failed(_, e): Fail(e); + }); - @:from static public function ofIterator(i:Iterator):Stream { - return Generator.stream(function next(step) step(if(i.hasNext()) Link(i.next(), Generator.stream(next)) else End)); - } - - static public function flatten(stream:Stream, Quality>):Stream { - return stream.regroup(function(arr) return Converted(arr[0])); - } + public function blend(other:Stream):Stream + return new BlendStream(this, other); - #if cs - // This is to mitigate an error in the c# generator that it generates paramterized calls - // with type parameters which is not defined in scope - // similar to https://github.com/HaxeFoundation/haxe/issues/6833 - @:from static public function dirtyFuture(f:Future>):Stream - return new FutureStream(f); - #end - - @:from static public function future(f:Future>):Stream - return new FutureStream(f); - - #if cs - // This is to mitigate an error in the c# generator that it generates paramterized calls - // with type parameters which is not defined in scope - // similar to https://github.com/HaxeFoundation/haxe/issues/6833 - @:from static public function dirtyPromise(f:Promise>):Stream - return dirtyFuture(f.map(function (o) return switch o { - case Success(s): s; - case Failure(e): ofError(e); + public function select(selector):Stream + return + if (Type.getClass(this) == SelectStream) + SelectStream.chain(cast this, selector); + else + new SelectStream(this, selector); + + public function forEach(f:(item:Item)->Future>):Future> + return this.forEach(f); + + public function filter(f:Item->Return):Stream + return select(i -> f(i).map(o -> switch o { + case Success(matched): Success(if (matched) Some(i) else None); + case Failure(failure): Failure(failure); })); - #end - - @:from static inline function promiseIdeal(f:Promise>):Stream - return cast promise(f); - - @:from static inline function promiseReal(f:Promise>):Stream - return cast promise(f); - @:from static public function promise(f:Promise>):Stream - return future(f.map(function (o) return switch o { - case Success(s): s.dirty(); - case Failure(e): ofError(e); + public function map(f:Item->Return):Stream + return select(i -> f(i).map(r -> switch r { + case Success(data): Success(Some(data));// BUG: Success(data) compiles + case Failure(failure): Failure(failure); })); - @:from static public function ofError(e:Error):Stream - return new ErrorStream(e); + static public inline function empty():Stream + return @:privateAccess + #if cs + new Empty(); + #else + cast Empty.INST; + #end - #if (nodejs && !macro) - @:noUsing static public inline function ofNodeStream(name:String, r:js.node.stream.Readable.IReadable, ?options:{ ?onEnd:Void->Void }):RealStream { - return tink.streams.nodejs.NodejsStream.wrap(name, r, options == null ? null : options.onEnd); - } - #end -} + @:op(a...b) public function append(b:Stream):Stream + return new Compound([this, b]); -enum RegroupStatus { - Flowing:RegroupStatus; - Errored(e:Error):RegroupStatus; - Ended:RegroupStatus; -} + @:from static public function ofIterator(t:Iterator):Stream + return SyncLinkStream.ofIterator(t); -enum RegroupResult { - Converted(data:Stream, ?untouched:Array):RegroupResult; - Terminated(data:Option>):RegroupResult; - Untouched:RegroupResult; - Errored(e:Error):RegroupResult; -} + @:from static public function future(p:Future>):Stream + return new FutureStream(p); -@:forward -abstract Regrouper(RegrouperBase) from RegrouperBase to RegrouperBase { - @:from - public static function ofIgnorance(f:Array->Future>):Regrouper - return {apply: function(i, _) return f(i)}; - @:from - public static function ofIgnoranceSync(f:Array->RegroupResult):Regrouper - return {apply: function(i, _) return Future.sync(f(i))}; - @:from - public static function ofFunc(f:Array->RegroupStatus->Future>):Regrouper - return {apply: f}; - @:from - public static function ofFuncSync(f:Array->RegroupStatus->RegroupResult):Regrouper - return {apply: function(i, s) return Future.sync(f(i, s))}; -} + @:from static public function promise(p:Promise>):Stream + return new PromiseStream(p); -private typedef RegrouperBase = { - function apply(input:Array, status:RegroupStatus):Future>; -} + @:from static public function ofError(e:Error):Stream + return promise(e); -private class RegroupStream extends CompoundStream { - public function new(source:Stream, f:Regrouper, ?prev, ?buf) { - if(prev == null) prev = Empty.make(); - if(buf == null) buf = []; - - var ret = null; - var terminated = false; - var next = Stream.future(source.forEach(function(item) { - buf.push(item); - return f.apply(buf, Flowing).map(function (o):Handled return switch o { - case Converted(v, untouched): - ret = v; - buf = untouched; - Finish; - case Terminated(v): - ret = v.or(Empty.make); - terminated = true; - Finish; - case Untouched: - Resume; - case Errored(e): - Clog(e); - }); - }).map(function(o):Stream return switch o { - case Failed(e): Stream.ofError(e); - case Depleted if(buf.length == 0): Empty.make(); - case Depleted: - Stream.future(f.apply(buf, Ended).map(function(o) return switch o { - case Converted(v): v; - case Terminated(v): v.or(Empty.make); - case Untouched: Empty.make(); - case Errored(e): cast Stream.ofError(e); - })); - case Halted(_) if(terminated): ret; - case Halted(rest): new RegroupStream(rest, f, ret, buf); - case Clogged(e, _): cast new ErrorStream(e); // the regroup stream should terminate when an error occurs during the regroup process - })); - // TODO: get rid of those casts in this function + static public function flatten(s:Stream, Error>):Stream + return new FlattenStream(s); - super([prev, next]); - } + static public function ofSignal(s):Stream + return new SignalStream(s); } -enum Handled { - BackOff:Handled; - Finish:Handled; - Resume:Handled; - Clog(e:Error):Handled; +enum Step { + Link(value:Item, next:Stream):Step; + Fail(e:Error):Step; + End:Step; } -enum Conclusion { - Halted(rest:Stream):Conclusion; - Clogged(error:Error, at:Stream):Conclusion; - Failed(error:Error):Conclusion; - Depleted:Conclusion; -} +private class FlattenStream implements StreamObject { + final source:Stream, Quality>; -enum ReductionStep { - Progress(result:Result):ReductionStep; - Crash(e:Error):ReductionStep; -} + public function new(source) + this.source = source; -enum Reduction { - Crashed(error:Error, at:Stream):Reduction; - Failed(error:Error):Reduction; - Reduced(result:Result):Reduction; + public function forEach(f:(item:Item)->Future>):Future> + return + source.forEach(child -> child.forEach(f).map(r -> switch r { + case Done: None; + case Stopped(rest, result): Some(new Pair(rest, Success(result))); + case Failed(rest, e): Some(new Pair(cast rest, Failure(e))); + })).map(r -> switch r { + case Done: Done; + case Stopped(rest, result): + var rest = result.a.append(new FlattenStream(rest)); + switch result.b { + case Success(data): Stopped(rest, data); + case Failure(failure): cast Failed(cast rest, failure); + } + case Failed(rest, e): + cast Failed(new FlattenStream(rest), e); + }); } -private class CloggedStream extends StreamBase { - - var rest:Stream; - var error:Error; +private class FutureStream implements StreamObject { + final stream:Future>; - public function new(rest, error) { - this.rest = rest; - this.error = error; - } + public function new(stream) + this.stream = stream; - override function next():Future> - return Future.sync(Step.Fail(error)); - - override public function forEach(handler:Handler):Future> - return Future.sync(cast Conclusion.Clogged(error, rest)); + public function forEach(f:(item:Item)->Future>):Future> + return stream.flatMap(s -> s.forEach(f)); } +private class PromiseStream implements StreamObject { + final stream:Promise>; + + public function new(stream) + this.stream = stream; + + public function forEach(f:(item:Item)->Future>):Future> + return stream.flatMap(o -> switch o { + case Success(s): + s.forEach(f); + case Failure(e): + Failed(Stream.empty(), e); + }); +} -private class ErrorStream extends StreamBase { - - var error:Error; - - public function new(error) - this.error = error; - - override function next():Future> - return Future.sync(Step.Fail(error)); - - override public function forEach(handler:Handler):Future> - return Future.sync(Conclusion.Failed(error)); +private class SingleItem implements StreamObject { + final item:Item; + public function new(item) + this.item = item; + + public function forEach(f:(item:Item)->Future>) + return new Future>( + trigger -> Helper.trySync( + f(item), + (s, _) -> trigger(switch s { + case Some(v): + Stopped(Stream.empty(), v); + case None: + Done; + }) + ) + ); +} +@:using(Stream.IterationResultTools) +enum IterationResult { + Done:IterationResult; + Failed(rest:Stream, e:Error):IterationResult; + Stopped(rest:Stream, result:Result):IterationResult; } interface StreamObject { - /** - * `true` if there is no data in this stream - */ - var depleted(get, never):Bool; - function next():Future>; - /** - * Create a new stream by performing an N-to-M mapping - */ - function regroup(f:Regrouper):Stream; - /** - * Create a new stream by performing an 1-to-1 mapping - */ - function map(f:Mapping):Stream; - /** - * Create a filtered stream - */ - function filter(f:Filter):Stream; - function retain():Void->Void; - /** - * Create an IdealStream. - * The stream returned from the `rescue` function will be recursively rescued by the same `rescue` function - */ - function idealize(rescue:Error->Stream):IdealStream; - /** - * Append another stream after this - */ - function append(other:Stream):Stream; - /** - * Prepend another stream before this - */ - function prepend(other:Stream):Stream; - function blend(other:Stream):Stream; - function decompose(into:Array>):Void; - /** - * Iterate this stream. - * The handler should return one of the following values (or a `Future` of it) - * - Backoff: stop the iteration before the current item - * - Finish: stop the iteration after the current item - * - Resume: continue the iteration - * - Clog(error): produce an error - * @return A conclusion that indicates how the iteration was ended - * - Depleted: there are no more data in the stream - * - Failed(err): the stream produced an error - * - Halted(rest): the iteration was halted by `Backoff` or `Finish` - * - Clogged(err): the iteration was halted by `Clog(err)` - */ - function forEach(handle:Handler):Future>; - /** - * Think Lambda.fold() - */ - function reduce(initial:Result, reducer:Reducer):Future>; + function forEach(f:(item:Item)->Future>):Future>; } -class Empty extends StreamBase { - - function new() {} - - override function get_depleted() - return true; - - override function next():Future> - return Future.sync(Step.End); - - override public function forEach(handler:Handler):Future> - return Future.sync(Depleted); - - static var inst = new Empty(); - - static public inline function make():Stream - return (cast inst : Stream); - +private enum LinkKind { + Fin(error:Null); + Cons(head:Item, tail:Tail); } -abstract Mapping(Regrouper) to Regrouper { - - inline function new(o) - this = o; - - @:from static function ofNext(n:Next):Mapping - return new Mapping({ - apply: function (i:Array, _) return n(i[0]).next(function(o) return Converted(Stream.single(o))).recover(Errored), - }); - - @:from static function ofAsync(f:In->Future):Mapping - return new Mapping({ - apply: function (i:Array, _) return f(i[0]).map(function(o) return Converted(Stream.single(o))), - }); +private class Empty implements StreamObject { - @:from static function ofSync(f:In->Outcome):Mapping - return new Mapping({ - apply: function (i:Array, _) return Future.sync(switch f(i[0]) { - case Success(v): Converted(Stream.single(v)); - case Failure(e): Errored(e); - }), - }); + static final INST:StreamObject = new Empty(); - @:from static function ofPlain(f:In->Out):Mapping - return new Mapping({ - apply: function (i:Array, _) return Future.sync(Converted(Stream.single(f(i[0])))), - }); + function new() {} + public function forEach(f:(item:Item)->Future>):Future> + return Done; } -abstract Filter(Regrouper) to Regrouper { - - inline function new(o) - this = o; +private class Compound implements StreamObject { + final parts:Array>; - @:from static function ofNext(n:Next):Filter - return new Filter({ - apply: function (i:Array, _) return n(i[0]).next(function (matched) return Converted(if (matched) Stream.single(i[0]) else Empty.make())).recover(Errored), - }); + public function new(parts) { + this.parts = parts; + } - @:from static function ofAsync(f:T->Future):Filter - return new Filter({ - apply: function (i:Array, _) return f(i[0]).map(function (matched) return Converted(if (matched) Stream.single(i[0]) else Empty.make())), - }); + public function forEach(f:(item:Item)->Future>) { + var index = 0, + cur = Future.sync(Done); + return new Future>(trigger -> { + final wait = new CallbackLinkRef(); - @:from static function ofSync(f:T->Outcome):Filter - return new Filter({ - apply: function (i:Array, _) return Future.sync(switch f(i[0]) { - case Success(v): Converted(if(v)Stream.single(i[0]) else Empty.make()); - case Failure(e): Errored(e); - }), - }); + var streaming = true; + function yield(v) { + streaming = false; + trigger(v); + } - @:from static function ofPlain(f:T->Bool):Filter - return new Filter({ - apply: function (i:Array, _) return Future.sync(Converted(if (f(i[0])) Stream.single(i[0]) else Empty.make())), + function loop() + while (streaming) { + wait.link = Helper.trySync(cur, (val, sync) -> switch val { + case Done: + if (index < parts.length) + cur = parts[index++].forEach(f); + else + yield(Done); + + if (!sync) loop(); + case v: + yield(v.withStream(s -> s.append(new Compound(parts.slice(index))))); + }); + if (wait.link != null) break; + } + loop(); + return wait; }); - + } } -class StreamBase implements StreamObject { +private typedef Selector = In->Return, Quality>; - public var depleted(get, never):Bool; - function get_depleted() return false; +private class SelectStream implements StreamObject { - var retainCount = 0; + final source:Stream; + final selector:Selector; - public function retain() { - retainCount++; - var retained = true; - return function () { - if (retained) { - retained = false; - if (--retainCount == 0) - destroy(); - } - } + public function new(source, selector) { + this.source = source; + this.selector = selector; } - public function next():Future> { - throw 'not implemented'; - // var item = null; - // return this.forEach(function(i) { - // item = i; - // return Finish; - // }).map(function(o):Step return switch o { - // case Depleted: End; - // case Halted(rest): Link(item, rest); - // case Failed(e): Fail(e); - // }); - } + function continued(source):Stream + return new SelectStream(source, selector); - public function regroup(f:Regrouper):Stream - return new RegroupStream(this, f); - - public function map(f:Mapping):Stream - return regroup(f); - - public function filter(f:Filter):Stream - return regroup(f); - - function destroy() {} - - public function append(other:Stream):Stream - return - if (depleted) other; - else CompoundStream.of([this, other]); - - public function prepend(other:Stream):Stream - return - if (depleted) other; - else CompoundStream.of([other, this]); - - public function blend(other:Stream):Stream - return - if (depleted) other; - else new BlendStream(this, other); - - public function decompose(into:Array>) - if (!depleted) - into.push(this); - - public function idealize(rescue:Error->Stream):IdealStream + public function forEach(f:(item:Out)->Future>):Future> return - if (depleted) Empty.make(); - else new IdealizeStream(this, rescue); - - public function reduce(initial:Result, reducer:Reducer):Future> - return Future.async(function (cb:Reduction->Void) { - forEach(function (item) - return reducer.apply(initial, item).map( - function (o):Handled return switch o { - case Progress(v): initial = v; Resume; - case Crash(e): Clog(e); - }) - ).handle(function (c) switch c { - case Failed(e): cb(Failed(e)); - case Depleted: cb(Reduced(initial)); - case Halted(_): throw "assert"; - case Clogged(e, rest): cb(Crashed(e, rest)); + source.forEach( + item -> { + var selected = selector(item).asFuture(); + new Future(trigger -> { + return + Helper.trySync(selected, (val, sync) -> switch val { + case Success(None): trigger(None); + case Success(Some(v)): + Helper.trySync(f(v), (val, sync) -> switch val { + case Some(v): trigger(Some(Success(v))); + case None: trigger(None); + }); + case Failure(e): trigger(Some(Failure(e))); + }); + }); + } + ).map(res -> switch res { + case Done: Done; + case Stopped(rest, Success(result)): + Stopped(continued(rest), result); + case Failed(rest, e) | Stopped(rest, Failure(e)): + cast Failed(cast continued(cast rest), e);// GADT bug }); - }); - public function forEach(handler:Handler):Future> - return throw 'not implemented'; + static public function chain( + a:SelectStream, + b:Selector + ) + return new SelectStream(a.source, chainSelectors(a.selector, b)); + + static function chainSelectors( + a:Selector, + b:Selector + ):Selector + return v -> new Future( + trigger -> { + final inner = new CallbackLinkRef(); + + a(v).handle(o -> switch o { + case Success(None): + trigger(Success(None)); + case Success(Some(v)): + inner.link = b(v).handle(trigger); + case Failure(e): + trigger(Failure(e)); + }).join(inner); + } + ); } -class IdealizeStream extends IdealStreamBase { - var target:Stream; - var rescue:Error->Stream; +private class Grouped implements StreamObject { + final source:Stream, Quality>; - public function new(target, rescue) { - this.target = target; - this.rescue = rescue; - } + public function new(source) + this.source = source; - override function get_depleted() - return target.depleted; - - override function next():Future> - return target.next().flatMap(function(v) return switch v { - case Fail(e): rescue(e).idealize(rescue).next(); - default: Future.sync(cast v); - }); - - override public function forEach(handler:Handler):Future> + public function forEach(f:(item:Item)->Future>):Future> return - Future.async(function (cb:Conclusion->Void) - target.forEach(handler).handle(function (end) switch end { - case Depleted: - cb(Depleted); - case Halted(rest): - cb(Halted(rest.idealize(rescue))); - case Clogged(e, at): - cb(Clogged(e, at.idealize(rescue))); - case Failed(e): - rescue(e).idealize(rescue).forEach(handler).handle(cb); - }) - ); - -} - -class Single extends StreamBase { - var value:Lazy; - - public function new(value) - this.value = value; - - override function next():Future> - return Future.sync(Link(value.get(), Empty.make())); - - override public function forEach(handle:Handler) - return handle.apply(value).map(function (step):Conclusion return switch step { - case BackOff: - Halted(this); - case Finish: - Halted(Empty.make()); - case Resume: - Depleted; - case Clog(e): - Clogged(e, this); - }); -} - -abstract Handler(Item->Future>) { - inline function new(f) - this = f; - - public inline function apply(item):Future> - return this(item); - - @:from static function ofSafeSync(f:Item->Handled):Handler - return new Handler(function (i) return Future.sync(f(i))); - - @:from static function ofUnknownSync(f:Item->Handled):Handler - return new Handler(function (i) return Future.sync(f(i))); - - @:from static function ofSafe(f:Item->Future>):Handler - return new Handler(f); - - @:from static function ofUnknown(f:Item->Future>):Handler - return new Handler(f); -} - -abstract Reducer(Result->Item->Future>) { - inline function new(f) - this = f; - - public inline function apply(res, item):Future> - return this(res, item); - - @:from static function ofSafeSync(f:Result->Item->ReductionStep):Reducer - return new Reducer(function (res, cur) return Future.sync(f(res, cur))); - - @:from static function ofUnknownSync(f:Result->Item->ReductionStep):Reducer - return new Reducer(function (res, cur) return Future.sync(f(res, cur))); - - @:from static function ofSafe(f:Result->Item->Future>):Reducer - return new Reducer(f); - - @:from static function ofPlainSync(f:Result->Item->Result):Reducer - return new Reducer(function (res, cur) return Future.sync(Progress(f(res, cur)))); - - @:from static function ofUnknown(f:Result->Item->Future>):Reducer - return new Reducer(f); - - @:from static function ofPromiseBased(f:Result->Item->Promise) - return new Reducer(function (res, cur) return f(res, cur).map(function (s) return switch s { - case Success(r): Progress(r); - case Failure(e): Crash(e); - })); - + source.forEach( + group -> switch group { + case []: None; + case [item]: + f(item).map(o -> switch o { + case Some(v): Some(new Pair(Stream.empty(), Success(v))); + case None: None; + }); + default: + SyncLinkStream.ofIterator(group.iterator()) + .forEach(f).map(res -> switch res { + case Done: None; + case Stopped(rest, result): Some(new Pair(rest, Success(result))); + case Failed(rest, e): Some(new Pair(rest, Failure(e))); + }); + } + ).map(function (o):IterationResult return switch o { + case Done: Done; + case Failed(rest, e): Failed(new Grouped(rest), e); + case Stopped(rest, { a: left, b: res }): + var rest = left.append(new Grouped(cast rest)); + switch res { + case Success(data): Stopped(cast rest, data); + case Failure(e): cast Failed(rest, e); + } + }); } -#if (java || cs) -private abstract Parts(Array) { - public var length(get, never):Int; - inline function get_length() return this.length; - - public function new(parts:Array>) - this = parts; - - @:arrayAccess function get(index:Int):Stream - return this[index]; - - @:arrayAccess function set(index:Int, value:Stream):Stream - return this[index] = value; - - public function copy():Parts - return new Parts(cast this.copy()); - - public function slice(start:Int, ?end:Int):Parts - return new Parts(cast this.slice(start, end)); - - @:from static function ofArray(a:Array>) - return new Parts(a); +class IterationResultTools { + static public function withStream(i:IterationResult, f:Stream->Stream):IterationResult + return switch i { + case Done: Done; + case Failed(rest, e): Failed(f(rest), e); + case Stopped(rest, result): Stopped(f(rest), result); + } } -#else -private typedef Parts = Array>; -#end - -private class CompoundStream extends StreamBase { - - var parts:Parts; - - function new(parts) - this.parts = parts; - override function get_depleted() - return switch parts.length { - case 0: true; - case 1: parts[0].depleted; - default: false; - } +private class Helper { - override function next():Future> { - return if(parts.length == 0) Future.sync(Step.End); - else parts[0].next().flatMap(function(v) return switch v { - case End if(parts.length > 1): parts[1].next(); - case Link(v, rest): - var copy = parts.copy(); - copy[0] = rest; - Future.sync(Link(v, new CompoundStream(copy))); - default: Future.sync(v); - }); + static public function noop(_:Dynamic) {} + static public inline function trySync(f:Future, cb:(val:X, sync:Bool)->Void) { + var tmp = f.handle(Helper.noop); + return + switch f.status { + case Ready(result): + cb(result.get(), true); + null; + default: + swapHandler(f, tmp, cb.bind(_, false)); + } } + static public function swapHandler(f:Future, prev:CallbackLink, cb) { + var ret = f.handle(cb); + prev.cancel(); + return ret; + } +} - override public function decompose(into:Array>):Void - for (p in parts) - p.decompose(into); - - override public function forEach(handler:Handler):Future> - return Future.async(consumeParts.bind(cast parts, handler, _)); - - static function consumeParts(parts:Parts, handler:Handler, cb:Conclusion->Void) - if (parts.length == 0) - cb(Depleted); - else - (parts[0]:Stream).forEach(handler).handle(function (o) switch o { - case Depleted: - - consumeParts(parts.slice(1), handler, cb); - - case Halted(rest): +private typedef AsyncLink = Future>; +private typedef AsyncLinkKind = LinkKind>; - parts = parts.copy(); - parts[0] = rest; - cb(Halted(new CompoundStream(parts))); +private class AsyncLinkStream implements StreamObject { + final link:AsyncLink; - case Clogged(e, at): + public function new(link) + this.link = link; - if (at.depleted) - parts = parts.slice(1); - else { - parts = parts.copy(); - parts[0] = at; + public function forEach(f:(item:Item)->Future>) { + var pos = link; + return new Future>(trigger -> { + final wait = new CallbackLinkRef(); + var streaming = true; + function yield(v) { + streaming = false; + trigger(v); + } + function loop() { + while (streaming) { + switch pos.status { + case Ready(_.get() => result): + switch result { + case Fin(v): + yield(switch v { + case null: Done; + case error: + cast Failed(Stream.empty(), cast error);// GADT bug + }); + case Cons(item, tail): + wait.link = Helper.trySync(f(item), (val, sync) -> switch val { + case Some(v): + yield(Stopped(new AsyncLinkStream(tail), v)); + case None: + pos = tail; + if (!sync) loop(); + }); + if (wait.link == null) continue; + } + default: + wait.link = pos.handle(Helper.noop); + if (pos.status.match(Ready(_))) + continue; + else + wait.link = Helper.swapHandler(pos, wait, loop);// this is very lazy } - - cb(Clogged(e, new CompoundStream(parts))); - - case Failed(e): - - cb(Failed(e)); - - }); - - static public function of(streams:Array>):Stream { - - var ret = []; - - for (s in streams) - s.decompose(ret); - - return - if (ret.length == 0) Empty.make(); - else new CompoundStream(ret); + break; + } + } + loop(); + return wait; + }); } - } -class FutureStream extends StreamBase { - var f:Future>; - public function new(f) - this.f = f; - - override function next():Future> - return f.flatMap(function(s) return s.next()); - - override public function forEach(handler:Handler) { - return Future.async(function (cb) { - f.handle(function (s) s.forEach(handler).handle(cb)); - }); +private typedef SyncLink = Lazy>>; + +class Generator extends AsyncLinkStream { + public function new(generate:()->Future>) { + function rec():AsyncLink + return Future.irreversible(yield -> + generate().handle(o -> yield(switch o { + case Data(data): Cons(data, rec()); + case Fail(e): Fin(cast e); + case End: Fin(null); + })) + ); + // TODO: in theory, this should work too + // ... but suspending the step makes StreamTest.laziness fail (stream becomes too lazy) + // ... which has something to do with suspension within compoundstream ... go figure + // { + // var step = generate(); + // new Future(yield -> + // step.handle(o -> yield(switch o { + // case Data(data): Cons(data, rec()); + // case Fail(e): Fin(cast e); + // case End: Fin(null); + // })); + // ) + // } + + super(rec()); } } class BlendStream extends Generator { public function new(a:Stream, b:Stream) { - var first = null; + var aDone = false, + bDone = false, + flipped = false; + + super( + () -> new Future>(yield -> { + var yielded = false; + var ret = []; + + function progress(s:Stream, onNext, onDone) + ret.push( + s.next().handle(v -> { + switch v { + case Link(value, next): + if (!yielded) { + yielded = true; + onNext(next); + // trace('yielding $value'); + yield(Data(value)); + } + case Fail(e): + yield(Fail(e)); + case End: + onDone(); + if (aDone && bDone) yield(End); + } + }) + ); + + if (!flipped && !aDone) progress(a, s -> a = s, () -> aDone = true); + + if (!bDone) progress(b, s -> b = s, () -> bDone = true); + + if (flipped && !aDone) progress(a, s -> a = s, () -> aDone = true); + + flipped = !flipped; + + ret; + }) + ); + } +} - function wait(s:Stream) { - return s.next().map(function(o) { - if(first == null) first = s; - return o; - }); - } +private class SyncLinkStream implements StreamObject { + final link:SyncLink; - var n1 = wait(a); - var n2 = wait(b); - - super(Future.async(function(cb) { - n1.first(n2).handle(function(o) switch o { - case Link(item, rest): - cb(Link(item, new BlendStream(rest, first == a ? b : a))); - case End: - (first == a ? n2 : n1).handle(cb); - case Fail(e): - cb(Fail(e)); - }); - })); + public function new(link) + this.link = link; - } -} + public function forEach(f:(item:Item)->Future>) { + var pos = link; + return new Future>(trigger -> { + final wait = new CallbackLinkRef(); + var streaming = true; + function yield(v) { + streaming = false; + trigger(v); + } -class Generator extends StreamBase { - var upcoming:Future>; - - function new(upcoming) - this.upcoming = upcoming; - - override function next():Future> - return upcoming; - - override public function forEach(handler:Handler) - return Future.async(function (cb:Conclusion->Void) - upcoming.handle(function (e) switch e { - case Link(v, then): - handler.apply(v).handle(function (s) switch s { - case BackOff: - cb(Halted(this)); - case Finish: - cb(Halted(then)); - case Resume: - then.forEach(handler).handle(cb); - case Clog(e): - cb(Clogged(e, this)); - }); - case Fail(e): - cb(Failed(e)); - case End: - cb(Depleted); - }) - ); + function loop() + while (streaming) + switch pos.get() { + case Fin(error): + yield(switch error { + case null: Done; + case e: cast Failed(Stream.empty(), cast e); + }); + case Cons(item, tail): + wait.link = Helper.trySync(f(item), (val, sync) -> switch val { + case Some(v): + yield(Stopped(new SyncLinkStream(tail), v)); + case None: + pos = tail; + if (!sync) loop(); + }); + } - static public function stream(step:(Step->Void)->Void) { - return new Generator(Future.async(step)); + loop(); + + return wait; + }); } -} + static function iteratorLink(i:Iterator):SyncLink + return () -> if (i.hasNext()) Cons(i.next(), iteratorLink(i)) else Fin(null); -enum Step { - Link(value:Item, next:Stream):Step; - Fail(e:Error):Step; - End:Step; + static public function ofIterator(i:Iterator):Stream + return new SyncLinkStream(iteratorLink(i)); } -class SignalStream extends Generator { - public function new(signal:Signal>) - super( - signal.nextTime().map(function(o):Step return switch o { - case Data(data): Link(data, new SignalStream(signal)); - case Fail(e): Fail(e); - case End: End; - }).eager() // this must be eager, otherwise the signal will "run away" if there's no consumer for this stream - ); +class SignalStream extends AsyncLinkStream { + public function new(signal:Signal>) + super(makeLink(signal)); + + static function makeLink(signal:Signal>):AsyncLink + return + signal.nextTime().map(function(o):AsyncLinkKind return switch o { + case Data(data): Cons(data, makeLink(signal)); + case Fail(e): Fin(e); + case End: Fin(null); + }).eager(); // this must be eager, otherwise the signal will "run away" if there's no consumer for this stream } enum Yield { - Data(data:Item):Yield; - Fail(e:Error):Yield; - End:Yield; -} + Data(data:Item):Yield; + Fail(e:Error):Yield; + End:Yield; +} \ No newline at end of file diff --git a/src/tink/streams/nodejs/NodejsStream.hx b/src/tink/streams/nodejs/NodejsStream.hx index 70beccb..0c17b2d 100644 --- a/src/tink/streams/nodejs/NodejsStream.hx +++ b/src/tink/streams/nodejs/NodejsStream.hx @@ -1,21 +1,79 @@ package tink.streams.nodejs; +import js.node.stream.Writable; +import js.node.stream.Readable; +import tink.core.Callback; import tink.streams.Stream; using tink.CoreApi; -class NodejsStream extends Generator { - - function new(target:WrappedReadable) { - super(Future.async(function (cb) { - target.read().handle(function (o) cb(switch o { - case Success(null): End; - case Success(data): Link(data, new NodejsStream(target)); - case Failure(e): Fail(e); - })); - })); +class NodejsStream { + + static public function pipe(s:Stream, name:String, dest:IWritable):Future> { + return s.forEach(item -> Future.irreversible( + yield -> + dest.write(item, (?e:js.lib.Error) -> yield(switch e { + case null: None; + case e: Some(e); + })) + )).map(o -> switch o { + case Done: Done; + case Stopped(rest, e): Failed(cast rest, tink.core.Error.withData('Failed to write to $name', e)); + }); + } + + static public function wrap(name:String, native:IReadable) { + + function failure(e:Dynamic) + return Yield.Fail(tink.core.Error.withData('failed to read from $name', e)); + + final ended = new Future(yield -> { + function end(_) + yield(Yield.End); + + function fail(e:Dynamic) + yield(failure(e)); + + native.on('end', end); + native.on('close', end); + native.on('error', fail); + + () -> { + native.off('end', end); + native.off('close', end); + native.off('error', fail); + } + }); + + final becameReadable = new Signal(fire -> { + native.on('readable', fire); + () -> native.off('readable', fire); + }); + + return Stream.generate(() -> ended || new Future>( + yield -> { + if (native.readableEnded) { + yield(End); + return null; + } + + final ret = new CallbackLinkRef(); + + function attempt() + try switch native.read() { + case null: + ret.link = becameReadable.nextTime().handle(attempt); + case v: + yield(Data(v)); + } + catch (e:Dynamic) yield(failure(e)); + + attempt(); + + return ret; + } + )); + } - static public function wrap(name, native, onEnd) - return new NodejsStream(new WrappedReadable(name, native, onEnd)); - + } \ No newline at end of file diff --git a/src/tink/streams/nodejs/WrappedReadable.hx b/src/tink/streams/nodejs/WrappedReadable.hx deleted file mode 100644 index 5a41004..0000000 --- a/src/tink/streams/nodejs/WrappedReadable.hx +++ /dev/null @@ -1,49 +0,0 @@ -package tink.streams.nodejs; - -import js.node.Buffer; -import js.node.stream.Readable.IReadable; - -using tink.CoreApi; - -class WrappedReadable { - - var native:IReadable; - var name:String; - var end:Surprise, Error>; - - public function new(name, native, onEnd) { - this.name = name; - this.native = native; - - end = Future.async(function (cb) { - native.once('end', function () cb(Success(null))); - native.once('close', function () cb(Success(null))); - native.once('error', function (e:{ code:String, message:String }) cb(Failure(new Error('${e.code} - Failed reading from $name because ${e.message}')))); - }) - .eager(); // async laziness fix for tink_core v2 - if (onEnd != null) - end.handle(function () - js.Node.process.nextTick(onEnd) - ); - } - - public function read():Promise>{ - return Future.async(function (cb) { - function attempt() { - try - switch native.read() { - case null: - native.once('readable', attempt); - case object: - cb(Success((cast object:T))); - } - catch (e:Dynamic) { - cb(Failure(Error.withData('Error while reading from $name', e))); - } - } - - attempt(); - //end.handle(cb); - }).first(end); - } -} \ No newline at end of file diff --git a/tests/Benchmark.hx b/tests/Benchmark.hx new file mode 100644 index 0000000..5cc7f91 --- /dev/null +++ b/tests/Benchmark.hx @@ -0,0 +1,208 @@ +// import js.node.stream.Readable; +import tink.streams.Stream.Yield; +import js.node.stream.Readable; +import js.node.Buffer; +import tink.streams.*; +import js.node.events.EventEmitter; +import js.node.Fs; + +using sys.io.File; +using sys.FileSystem; +using tink.CoreApi; + +class Iter extends Readable> { + final iterator:Iterator; + public function new(iterator) { + super({ objectMode: true }); + this.iterator = iterator; + } + override function _read(size:Int) { + for (i in 0...size) + if (iterator.hasNext()) { + if (!push(iterator.next())) break; + } + else { + push(null); + break; + } + } +} +class Benchmark { + static function measure(a:Array>>) + switch a { + case []: + case _[0] => task: + var start = haxe.Timer.stamp(); + task.value.handle(function (x) { + if (task.name != null) + trace('${task.name} took ${haxe.Timer.stamp() - start}s producing ${x}'); + measure(a.slice(1)); + }); + } + + static function main() { + numbers(); + files(); + } + + static function files() { + final dummy = 'bin/dummy.txt'; + if (!dummy.exists()) { + var s = 'a'; + for (i in 0...28) + s += s; + dummy.saveContent(s); + } + + final copy = 'bin/dummy_copy.txt'; + + function delete():Future + return Future.irreversible(yield -> { + if (copy.exists()) + copy.deleteFile(); + yield(null); + }); + + function tinkRead() + return new Named>('tink read', { + var len = 0; + readStream(dummy).forEach(item -> { + len += item.length; + None; + }).map(_ -> len); + }); + + function tinkCopy() + return new Named>('tink copy', { + writeStream(copy, cast readStream(dummy)).next(_ -> 42); + }); + + function nodeRead() + return new Named>('node read', Future.irreversible(yield -> { + var len = 0; + Fs.createReadStream(dummy) + .on('data', (b:Buffer) -> len += b.length) + .on('end', () -> yield(len)); + // .pipe(Fs.createWriteStream(copy), { end: true }).on('close', () -> yield(Noise)); + })); + + function nodeCopy() + return new Named>('node copy', Future.irreversible(yield -> { + Fs.createReadStream(dummy) + .pipe(Fs.createWriteStream(copy), { end: true }).on('close', () -> yield(Success(42))); + })); + measure([ + nodeRead(), + tinkRead(), + nodeRead(), + tinkRead(), + new Named(null, delete()), + nodeCopy(), + new Named(null, delete()), + tinkCopy(), + new Named(null, delete()), + nodeCopy(), + new Named(null, delete()), + tinkCopy(), + ]); + } + + static function writeStream(path:String, s:Stream) + return + open(path, WriteCreate).next( + fd -> s.forEach(buf -> { + Future.irreversible(trigger -> { + function flush(start:Int) + Fs.write(fd, buf, start, buf.length - start, (error, written, buf) -> { + if (error != null) trigger(Some(Failure(error))) + else if (written + start == buf.length) trigger(None); + else flush(start + written); + }); + flush(0); + }); + }).next(res -> switch res { + case Done: Noise; + case Stopped(rest, Failure(e)): + tink.core.Error.withData('failed writing to $path', e); + case Stopped(_): + throw 'unreachable'; + }).map(x -> { + Fs.closeSync(fd); + x; + }).eager() + ); + + static function open(path:String, flags) + return new Promise((resolve, reject) -> { + Fs.open(path, flags, null, (?error, fd) -> { + if (error == null) resolve(fd); + else reject(tink.core.Error.withData('failed to open $path', error)); + }); + return null; + }); + + static function readStream(path:String, chunSize:Int = 0x40000) { + var file = open(path, Read); + + function read(fd) + return Future.irreversible(trigger -> { + Fs.read(fd, Buffer.alloc(chunSize), 0, chunSize, null, (error, bytesRead, buffer) -> trigger( + if (error != null) Yield.Fail(null) + else if (bytesRead == 0) { + Fs.closeSync(fd); + Yield.End; + } + else Yield.Data(buffer.slice(0, bytesRead)) + )); + }); + + return Stream.promise(file.next(fd -> Stream.generate(() -> read(fd)))); + } + + static function numbers() { + final total = 100000; + var s = Stream.ofIterator(0...total); + + measure([ + new Named('tink', { + var x = 0; + s.forEach(_ -> { + x += 1; + None; + }).map(_ -> x); + }), + new Named('node', new Future(yield -> { + var i = new Iter(0...total); + var x = 0; + i.on('data', v -> { + x += 1; + }); + i.on('end', () -> yield(x)); + return null; + })), + new Named('tink', { + var s = tink.streams.Stream.ofIterator(0...total); + var x = 0; + s.forEach(_ -> { + x += 1; + None; + }).map(_ -> x); + }), + new Named('node', Future.irreversible(yield -> { + var i = new Iter(0...total); + var x = 0; + i.on('data', v -> { + x += 1; + }); + i.on('end', () -> yield(x)); + })), + new Named('tink repeat', { + var x = 0; + s.forEach(_ -> { + x += 1; + None; + }).map(_ -> x); + }), + ]); + } +} \ No newline at end of file diff --git a/tests/BlendTest.hx b/tests/BlendTest.hx index ed77113..2d4c9dd 100644 --- a/tests/BlendTest.hx +++ b/tests/BlendTest.hx @@ -8,116 +8,123 @@ using tink.CoreApi; @:asserts class BlendTest { public function new() {} - + public function testBlend() { var done = false; var a = Signal.trigger(); var b = Signal.trigger(); - var blended = new SignalStream(a.asSignal()).blend(new SignalStream(b.asSignal())); + var blended = Stream.ofSignal(a.asSignal()).blend(new SignalStream(b.asSignal())); + a.trigger(Data(1)); b.trigger(Data(2)); a.trigger(Data(3)); - + var i = 0; var sum = 0; + var result = blended.forEach(function (v) { asserts.assert(++i == v); sum += v; - return Resume; + return None; + }); + + result.handle(function (x) { + asserts.assert(Done == x); + asserts.assert(15 == sum); + done = true; }); - + a.trigger(Data(4)); a.trigger(End); b.trigger(Data(5)); b.trigger(End); b.trigger(Data(6)); a.trigger(Data(7)); - - result.handle(function (x) { - asserts.assert(Depleted == x); - asserts.assert(15 == sum); - done = true; - }); + asserts.assert(done); return asserts.done(); } - + public function testCompound() { var done = false; var a = Signal.trigger(); var b = Signal.trigger(); var c = Signal.trigger(); - var blended = new SignalStream(a).append(new SignalStream(c)).blend(new SignalStream(b)); + var blended = Stream.ofSignal(a).append(Stream.ofSignal(c)).blend(Stream.ofSignal(b)); a.trigger(Data(1)); b.trigger(Data(2)); a.trigger(End); c.trigger(Data(3)); - + var i = 0; var sum = 0; var result = blended.forEach(function (v) { asserts.assert(++i == v); sum += v; - return Resume; + return None; + }); + + result.handle(function (x) { + asserts.assert(Done == x); + asserts.assert(15 == sum); + done = true; }); - + c.trigger(Data(4)); c.trigger(End); b.trigger(Data(5)); b.trigger(End); b.trigger(Data(6)); a.trigger(Data(7)); - - result.handle(function (x) { - asserts.assert(Depleted == x); - asserts.assert(15 == sum); - done = true; - }); + asserts.assert(done); return asserts.done(); } - + public function testError() { var done = false; var a = Signal.trigger(); var b = Signal.trigger(); - var blended = new SignalStream(a.asSignal()).blend(new SignalStream(b.asSignal())); + var blended = Stream.ofSignal(a).blend(Stream.ofSignal(b)); a.trigger(Data(1)); b.trigger(Data(2)); a.trigger(Data(3)); - + var i = 0; var sum = 0; var result = blended.forEach(function (v) { asserts.assert(++i == v); sum += v; - return Resume; + return None; }); - - a.trigger(Data(4)); - a.trigger(Data(5)); - b.trigger(Fail(new Error('Failed'))); - a.trigger(End); - + result.handle(function (x) { asserts.assert(x.match(Failed(_))); asserts.assert(15 == sum); - done = true; + done = true; }); + + a.trigger(Data(4)); + a.trigger(Data(5)); + b.trigger(Fail(new Error('Failed'))); + a.trigger(End); + asserts.assert(done); return asserts.done(); } - + public function testReuse() { var a = Signal.trigger(); var b = Signal.trigger(); - var blended = new SignalStream(a.asSignal()).blend(new SignalStream(b.asSignal())); + + var blended = Stream.ofSignal(a).blend(Stream.ofSignal(b)); + a.trigger(Data(1)); b.trigger(Data(2)); b.trigger(End); a.trigger(Data(3)); a.trigger(End); - + var count = 0; function iterate() { var i = 0; @@ -125,14 +132,14 @@ class BlendTest { blended.forEach(function (v) { asserts.assert(++i == v); sum += v; - return Resume; + return None; }).handle(function (x) { - asserts.assert(Depleted == x); + asserts.assert(Done == x); asserts.assert(6 == sum); count++; }); } - + iterate(); iterate(); iterate(); diff --git a/tests/NextTest.hx b/tests/NextTest.hx index 320d04d..b1f7d8d 100644 --- a/tests/NextTest.hx +++ b/tests/NextTest.hx @@ -15,35 +15,35 @@ class NextTest { check(asserts, b, [1,2,3]); return asserts.done(); } - + public function testFilter() { var a = Stream.ofIterator(0...6); var b = a.filter(function(v) return v % 2 == 1); check(asserts, b, [1,3,5]); return asserts.done(); } - + public function testCompound() { var a = Signal.trigger(); var b = Signal.trigger(); - var compound = new SignalStream(a).append(new SignalStream(b)); + var compound = Stream.ofSignal(a).append(new SignalStream(b)); a.trigger(Data(1)); b.trigger(Data(3)); a.trigger(Data(2)); a.trigger(End); check(asserts, compound, [1,2,3]); - + var a = Stream.ofIterator(0...3); var b = Stream.ofIterator(0...3); var compound = a.append(b); check(asserts, compound, [0,1,2,0,1,2]); return asserts.done(); } - + public function testBlend() { var a = Signal.trigger(); var b = Signal.trigger(); - var compound = new SignalStream(a).blend(new SignalStream(b)); + var compound = Stream.ofSignal(a).blend(Stream.ofSignal(b)); a.trigger(Data(1)); b.trigger(Data(2)); a.trigger(Data(3)); @@ -52,13 +52,13 @@ class NextTest { check(asserts, compound, [1,2,3]); return asserts.done(); } - + function check(asserts:AssertionBuffer, stream:Stream, values:Array, ?pos:haxe.PosInfos) { var current = stream; for(i in 0...values.length) { current.next().handle(function(v) switch v { case Link(v, rest): asserts.assert(values[i] == v, pos); current = rest; - default: asserts.fail('Expected Link(_)', pos); + default: asserts.fail('Expected Link(_), got ${v.getName()} @ $i', pos); }); } current.next().handle(function(v) asserts.assert(v.match(End), pos)); diff --git a/tests/RunTests.hx b/tests/RunTests.hx index 34cb93f..c6b5b0f 100644 --- a/tests/RunTests.hx +++ b/tests/RunTests.hx @@ -8,11 +8,6 @@ using tink.CoreApi; class RunTests { static function main() { - - #if python - (cast python.lib.Sys).setrecursionlimit(9999); - #end - Runner.run(TestBatch.make([ new StreamTest(), new BlendTest(), @@ -20,5 +15,5 @@ class RunTests { new SignalStreamTest(), ])).handle(Runner.exit); } - + } diff --git a/tests/SignalStreamTest.hx b/tests/SignalStreamTest.hx index a572a0b..f5ecf42 100644 --- a/tests/SignalStreamTest.hx +++ b/tests/SignalStreamTest.hx @@ -15,30 +15,30 @@ class SignalStreamTest { a.trigger(Data(1)); a.trigger(Data(2)); a.trigger(Data(3)); - + var i = 0; var sum = 0; - var result = stream.forEach(function (v) { + var result = stream.forEach(v -> { asserts.assert(++i == v); sum += v; - return Resume; + None; }); - + a.trigger(Data(4)); a.trigger(Data(5)); a.trigger(End); a.trigger(Data(6)); a.trigger(Data(7)); - + result.handle(function (x) { - asserts.assert(Depleted == x); - asserts.assert(15 == sum); + asserts.assert(x == Done); + asserts.assert(sum == 15); done = true; }); asserts.assert(done); return asserts.done(); } - + public function testError() { var done = false; var a = Signal.trigger(); @@ -46,28 +46,28 @@ class SignalStreamTest { a.trigger(Data(1)); a.trigger(Data(2)); a.trigger(Data(3)); - + var i = 0; var sum = 0; - var result = stream.forEach(function (v) { + var result = stream.forEach(v -> { asserts.assert(++i == v); sum += v; - return Resume; + None; }); - + a.trigger(Data(4)); a.trigger(Data(5)); a.trigger(Fail(new Error('Failed'))); - + result.handle(function (x) { asserts.assert(x.match(Failed(_))); asserts.assert(15 == sum); - done = true; + done = true; }); asserts.assert(done); return asserts.done(); } - + public function testReuse() { var a = Signal.trigger(); var stream = new SignalStream(a.asSignal()); @@ -75,22 +75,22 @@ class SignalStreamTest { a.trigger(Data(2)); a.trigger(Data(3)); a.trigger(End); - + var count = 0; function iterate() { var i = 0; var sum = 0; - stream.forEach(function (v) { + stream.forEach(v -> { asserts.assert(++i == v); sum += v; - return Resume; + None; }).handle(function (x) { - asserts.assert(Depleted == x); - asserts.assert(6 == sum); + asserts.assert(x == Done); + asserts.assert(sum == 6); count++; }); } - + iterate(); iterate(); iterate(); diff --git a/tests/StreamTest.hx b/tests/StreamTest.hx index aa25bad..6232e1b 100644 --- a/tests/StreamTest.hx +++ b/tests/StreamTest.hx @@ -13,166 +13,164 @@ class StreamTest { public function testIterator() { var s = Stream.ofIterator(0...100); var sum = 0; - s.forEach(function (v) { + s.forEach(v -> { sum += v; - return Resume; + None; }).handle(function (x) { - asserts.assert(Depleted == x); + asserts.assert(Done == x); asserts.assert(4950 == sum); asserts.done(); }); return asserts; } - + public function testMapFilter() { - + var s = Stream.ofIterator(0...100); - - s = s.filter(function (i) return i % 2 == 0); - s = s.filter(function (i) return Success(i % 3 == 0)); - s = s.map(function (i) return i * 3); - s = s.filter(function (i) return Future.sync(i % 5 == 0)); - s = s.filter(function (i) return Promise.lift(i > 100)); - s = s.map(function (i) return Success(i << 1)); - s = s.map(function (i) return Promise.lift(i + 13)); - s = s.map(function (i) return Future.sync(i - 3)); - s = s.map(function (i) return i * 2); - + + s = s.filter(i -> i % 2 == 0); + s = s.filter(i -> Success(i % 3 == 0)); + s = s.map(i -> i * 3); + s = s.filter(i -> Future.sync(i % 5 == 0)); + s = s.filter(i -> Promise.lift(i > 100)); + s = s.map(i -> Success(i << 1)); + s = s.map(i -> Promise.lift(i + 13)); + s = s.map(i -> Future.sync(i - 3)); + s = s.map(i -> i * 2); + var sum = 0; - - s.forEach(function (v) return Future.sync(Resume)).handle(function (c) switch c { + + s.forEach(v -> None).handle(function (c) switch c { case Failed(_): default: }); - - s.idealize(null).forEach(function (v) { + + s.idealize(null).forEach(v -> { sum += v; - return Future.sync(Resume); - }).handle(function (x) switch x { - case Depleted: + return None; + }).handle(x -> switch x { + case Done: asserts.assert(1840 == sum); asserts.done(); - case Halted(_): + case Stopped(_): asserts.fail('Expected "Depleted'); }); return asserts; } - + public function testMapError() { var s = Stream.ofIterator(0...100); var mapped = s.map(function(v) return v % 5 == 4 ? Failure(new Error('Fail $v')) : Success(v)); var sum = 0; - - mapped.forEach(function(v) { + + mapped.forEach(v -> { sum += v; - return Resume; + return None; }).handle(function(o) switch o { - case Depleted: - asserts.fail('Expected "Failed'); - case Failed(e): + case Failed(_, e): asserts.assert(e.message == 'Fail 4'); asserts.assert(sum == 6); asserts.done(); - case Halted(_): + default: asserts.fail('Expected "Failed'); }); - + return asserts; } - - public function testRegroup() { - - var s = Stream.ofIterator(0...100); - - var sum = 0; - s.regroup(function (i:Array) return i.length == 5 ? Converted(Stream.single(i[0] + i[4])) : Untouched) - .idealize(null).forEach(function (v) { - sum += v; - return Resume; - }) - .handle(function (x) switch x { - case Depleted: - asserts.assert(1980 == sum); - case Halted(_): - asserts.fail('Expected "Depleted"'); - }); - - var sum = 0; - s.regroup(function (i:Array, s) { - return if(s == Flowing) - i.length == 3 ? Converted(Stream.single(i[0] + i[2])) : Untouched - else - Converted(Stream.single(i[0])); // TODO: test backoff / clog at last step - }) - .idealize(null).forEach(function (v) { - sum += v; - return Resume; - }) - .handle(function (x) switch x { - case Depleted: - asserts.assert(3333 == sum); - case Halted(_): - asserts.fail('Expected "Depleted"'); - }); - - var sum = 0; - s.regroup(function (i:Array) return Converted([i[0], i[0]].iterator())) - .idealize(null).forEach(function (v) { - sum += v; - return Resume; - }) - .handle(function (x) switch x { - case Depleted: - asserts.assert(9900 == sum); - case Halted(_): - asserts.fail('Expected "Depleted"'); - }); - - var sum = 0; - s.regroup(function (i:Array, status:RegroupStatus) { - var batch = null; - - if(status == Ended) - batch = i; - else if(i.length > 3) - batch = i.splice(0, 3); // leave one item in the buf - - return if(batch != null) - Converted(batch.iterator(), i) - else - Untouched; - }) - .idealize(null).forEach(function (v) { - sum += v; - return Resume; - }) - .handle(function (x) switch x { - case Depleted: - asserts.assert(4950 == sum); - case Halted(_): - asserts.fail('Expected "Depleted"'); - }); - - return asserts.done(); - } - + + // public function testRegroup() { + + // var s = Stream.ofIterator(0...100); + + // var sum = 0; + // s.regroup(function (i:Array) return i.length == 5 ? Converted(Stream.single(i[0] + i[4])) : Untouched) + // .idealize(null).forEach(function (v) { + // sum += v; + // return Resume; + // }) + // .handle(function (x) switch x { + // case Depleted: + // asserts.assert(1980 == sum); + // case Halted(_): + // asserts.fail('Expected "Depleted"'); + // }); + + // var sum = 0; + // s.regroup(function (i:Array, s) { + // return if(s == Flowing) + // i.length == 3 ? Converted(Stream.single(i[0] + i[2])) : Untouched + // else + // Converted(Stream.single(i[0])); // TODO: test backoff / clog at last step + // }) + // .idealize(null).forEach(function (v) { + // sum += v; + // return Resume; + // }) + // .handle(function (x) switch x { + // case Depleted: + // asserts.assert(3333 == sum); + // case Halted(_): + // asserts.fail('Expected "Depleted"'); + // }); + + // var sum = 0; + // s.regroup(function (i:Array) return Converted([i[0], i[0]].iterator())) + // .idealize(null).forEach(function (v) { + // sum += v; + // return Resume; + // }) + // .handle(function (x) switch x { + // case Depleted: + // asserts.assert(9900 == sum); + // case Halted(_): + // asserts.fail('Expected "Depleted"'); + // }); + + // var sum = 0; + // s.regroup(function (i:Array, status:RegroupStatus) { + // var batch = null; + + // if(status == Ended) + // batch = i; + // else if(i.length > 3) + // batch = i.splice(0, 3); // leave one item in the buf + + // return if(batch != null) + // Converted(batch.iterator(), i) + // else + // Untouched; + // }) + // .idealize(null).forEach(function (v) { + // sum += v; + // return Resume; + // }) + // .handle(function (x) switch x { + // case Depleted: + // asserts.assert(4950 == sum); + // case Halted(_): + // asserts.fail('Expected "Depleted"'); + // }); + + // return asserts.done(); + // } + public function testNested() { var n = Stream.ofIterator([Stream.ofIterator(0...3), Stream.ofIterator(3...6)].iterator()); var s = Stream.flatten(n); var sum = 0; - + s.forEach(function (v) { sum += v; - return Resume; + return None; }).handle(function (x) { - asserts.assert(Depleted == x); + asserts.assert(Done == x); asserts.assert(15 == sum); asserts.done(); }); - + return asserts; } - + public function testNestedWithInnerError() { var n = Stream.ofIterator([ Stream.ofIterator(0...3), @@ -181,65 +179,179 @@ class StreamTest { ].iterator()); var s = Stream.flatten(n); var sum = 0; - + s.forEach(function (v) { sum += v; - return Resume; + return None; }).handle(function (x) { asserts.assert(x.match(Failed(_))); asserts.assert(6 == sum); asserts.done(); }); - + + return asserts; + } + + public function depthTest() { + var s = Stream.single(1); + for (i in 0...10000) + s = s.map(f -> f + 1); + s.forEach(v -> Some(v)).eager().handle(res -> switch res { + case Stopped(rest, result): + asserts.assert(result == 10001); + asserts.done(); + default: + asserts.fail('Expected `Stopped`'); + }); return asserts; } - + public function testNestedWithOuterError() { var n = ofOutcomes([ Success(Stream.ofIterator(0...3)), Failure(new Error('dummy')), Success(Stream.ofIterator(6...9)), ].iterator()); - + var s = Stream.flatten(n); var sum = 0; - + s.forEach(function (v) { sum += v; - return Resume; + return None; }).handle(function (x) { asserts.assert(x.match(Failed(_))); asserts.assert(3 == sum); asserts.done(); }); - + return asserts; } - - #if !java - public function casts() { - var pi1:Promise> = Promise.NEVER; - var pi2:Promise> = Promise.NEVER; - var pr1:Promise> = Promise.NEVER; - var pr2:Promise> = Promise.NEVER; - var r1:RealStream; - var r2:Stream; - - r1 = pi1; - r2 = pi1; - r1 = pi2; - r2 = pi2; - - r1 = pr1; - r2 = pr1; - r1 = pr2; - r2 = pr2; - + + // #if !java + // public function casts() { + // var pi1:Promise> = Promise.NEVER; + // var pi2:Promise> = Promise.NEVER; + // var pr1:Promise> = Promise.NEVER; + // var pr2:Promise> = Promise.NEVER; + // var r1:RealStream; + // var r2:Stream; + + // r1 = pi1; + // r2 = pi1; + // r1 = pi2; + // r2 = pi2; + + // r1 = pr1; + // r2 = pr1; + // r1 = pr2; + // r2 = pr2; + + // return asserts.done(); + // } + // #end + + public function suspend() { + + var triggers = []; + var s = Stream.generate(() -> { + + var t = new FutureTrigger>(); + triggers.push(t); + t.asFuture().map(v -> if (v == null) Yield.End else Data(v)); + }); + + var log = []; + var res = s.forEach(v -> { log.push(v); None; }); + var active = res.handle(function () {}); + + function progress() + triggers[triggers.length - 1].trigger(triggers.length - 1); + + for (i in 0...5) + progress(); + + active.cancel(); + + progress(); + + res.eager(); + triggers[triggers.length - 1].trigger(null); + asserts.assert(res.status.match(Ready(_.get() => Done))); + + asserts.assert(log.join(',') == [for (i in 0...triggers.length - 1) i].join(',')); + return asserts.done(); } - #end - - + public function laziness() { + + var triggers = [], + counter = 0; + var s = Stream.generate(() -> { + + var t = switch triggers[counter] { + case null: triggers[counter] = new FutureTrigger>(); + case v: v; + } + + counter++; + + t.asFuture().map(v -> if (v == null) Yield.End else Data(v)); + }); + + var res = s.forEach(t -> if (t < 20) None else Some(t)).eager(); + + for (i in 0...21) { + asserts.assert(triggers.length == i + 1); + asserts.assert(res.status.match(EagerlyAwaited)); + triggers[i].trigger(i); + } + + asserts.assert(res.status.match(Ready(_.get() => Stopped(_, 20)))); + + var res = s.forEach(t -> if (t < 40) None else Some(t)).eager(); + + for (i in 21...41) { + asserts.assert(triggers.length == i + 1); + asserts.assert(res.status.match(EagerlyAwaited)); + triggers[i].trigger(i); + } + + asserts.assert(res.status.match(Ready(_.get() => Stopped(_, 40)))); + + var log = []; + var res = (s...s).forEach(t -> { log.push(t); None; }); + var active:CallbackLink = null; + + active = res.handle(function () {}); + + for (i in 0...5) { + asserts.assert(triggers.length == 42 + i); + triggers[triggers.length - 1].trigger(triggers.length - 1); + } + + active.cancel(); + + for (i in 0...5) { + var t = new FutureTrigger(); + t.trigger(triggers.length - 1); + triggers.push(t); + } + + var t = new FutureTrigger(); + t.trigger(null); + triggers.push(t); + + res.eager(); + + asserts.assert(res.status.match(Ready(_.get() => Done))); + var expected = [for (i in 0...triggers.length - 2) i].join(','); + expected = '$expected,$expected'; + asserts.assert(log.join(',') == expected); + + return asserts.done(); + } + // maybe useful to be moved to Stream itself inline function ofOutcomes(i:Iterator>) { return Stream.ofIterator(i).map(function(v:Outcome) return v);