From 36856c9fe8172b85c9ffa135828640544016b492 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Sun, 18 Apr 2021 19:35:46 +0200 Subject: [PATCH 01/22] async/await prototype Signed-off-by: David Nadoba --- Package.resolved | 4 +- Package.swift | 28 ++- .../Examples/AsyncTwitterClient/main.swift | 79 ++++++++ Sources/RSocketAsync/AsyncAwait.swift | 177 ++++++++++++++++++ 4 files changed, 285 insertions(+), 3 deletions(-) create mode 100644 Sources/Examples/AsyncTwitterClient/main.swift create mode 100644 Sources/RSocketAsync/AsyncAwait.swift diff --git a/Package.resolved b/Package.resolved index f9ca2ad3..8fbc2800 100644 --- a/Package.resolved +++ b/Package.resolved @@ -24,8 +24,8 @@ "repositoryURL": "https://github.com/apple/swift-nio", "state": { "branch": null, - "revision": "3be4e0980075de10a4bc8dee07491d49175cfd7a", - "version": "2.27.0" + "revision": "4220c7a16a5ee0abb7da150bd3d4444940a20cc2", + "version": null } }, { diff --git a/Package.swift b/Package.swift index ec3d9342..5c31fe8e 100644 --- a/Package.swift +++ b/Package.swift @@ -19,15 +19,17 @@ let package = Package( // Transport protocol .library(name: "RSocketWSTransport", targets: ["RSocketWSTransport"]), .library(name: "RSocketTCPTransport", targets: ["RSocketTCPTransport"]), + .library(name: "RSocketAsync", targets: ["RSocketAsync"]), // Examples .executable(name: "timer-client-example", targets: ["TimerClientExample"]), .executable(name: "twitter-client-example", targets: ["TwitterClientExample"]), .executable(name: "vanilla-client-example", targets: ["VanillaClientExample"]), + .executable(name: "async-twitter-client-example", targets: ["AsyncTwitterClientExample"]), ], dependencies: [ .package(url: "https://github.com/ReactiveCocoa/ReactiveSwift.git", from: "6.6.0"), - .package(url: "https://github.com/apple/swift-nio", from: "2.26.0"), + .package(url: "https://github.com/apple/swift-nio", .revision("4220c7a16a5ee0abb7da150bd3d4444940a20cc2")), .package(url: "https://github.com/apple/swift-nio-extras", from: "1.8.0"), .package(url: "https://github.com/apple/swift-nio-transport-services", from: "1.9.2"), .package(url: "https://github.com/apple/swift-nio-ssl", from: "2.10.4"), @@ -46,6 +48,11 @@ let package = Package( "RSocketCore", .product(name: "ReactiveSwift", package: "ReactiveSwift") ]), + .target(name: "RSocketAsync", dependencies: [ + "RSocketCore", + .product(name: "NIO", package: "swift-nio"), + .product(name: "_NIOConcurrency", package: "swift-nio"), + ]), // Channel .target(name: "RSocketTSChannel", dependencies: [ @@ -135,6 +142,25 @@ let package = Package( ], path: "Sources/Examples/VanillaClient" ), + .target( + name: "AsyncTwitterClientExample", + dependencies: [ + "RSocketCore", + "RSocketNIOChannel", + "RSocketWSTransport", + "RSocketAsync", + .product(name: "ArgumentParser", package: "swift-argument-parser"), + .product(name: "NIO", package: "swift-nio"), + .product(name: "_NIOConcurrency", package: "swift-nio"), + ], + path: "Sources/Examples/AsyncTwitterClient", + swiftSettings: [ + .unsafeFlags([ + "-Xfrontend", + "-enable-experimental-concurrency" + ]) + ] + ), ], swiftLanguageVersions: [.v5] ) diff --git a/Sources/Examples/AsyncTwitterClient/main.swift b/Sources/Examples/AsyncTwitterClient/main.swift new file mode 100644 index 00000000..85fa06f3 --- /dev/null +++ b/Sources/Examples/AsyncTwitterClient/main.swift @@ -0,0 +1,79 @@ +#if compiler(>=5.4) && $AsyncAwait +import ArgumentParser +import Foundation +import NIO +import RSocketAsync +import RSocketCore +import RSocketNIOChannel +import RSocketReactiveSwift +import RSocketWSTransport + +func route(_ route: String) -> Data { + let encodedRoute = Data(route.utf8) + precondition(encodedRoute.count <= Int(UInt8.max), "route is to long to be encoded") + let encodedRouteLength = Data([UInt8(encodedRoute.count)]) + + return encodedRouteLength + encodedRoute +} + +extension URL: ExpressibleByArgument { + public init?(argument: String) { + guard let url = URL(string: argument) else { return nil } + self = url + } + public var defaultValueDescription: String { description } +} + +/// the server-side code can be found here -> https://github.com/rsocket/rsocket-demo/tree/master/src/main/kotlin/io/rsocket/demo/twitter +struct TwitterClientExample: ParsableCommand { + static var configuration = CommandConfiguration( + abstract: "connects to an RSocket endpoint using WebSocket transport, requests a stream at the route `searchTweets` to search for tweets that match the `searchString` and logs all events." + ) + + @Argument(help: "used to find tweets that match the given string") + var searchString = "spring" + + @Option + var url = URL(string: "wss://demo.rsocket.io/rsocket")! + + @Option(help: "maximum number of tweets that are taken before it cancels the stream") + var limit = 1000 + + func run() throws { + let eventLoop = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { try! eventLoop.syncShutdownGracefully() } + let promise = eventLoop.next().makePromise(of: Void.self) + promise.completeWithAsync { + try await self.runAsync() + } + try promise.futureResult.wait() + } + func runAsync() async throws { + let bootstrap = ClientBootstrap( + config: ClientSetupConfig( + timeBetweenKeepaliveFrames: 0, + maxLifetime: 30_000, + metadataEncodingMimeType: "message/x.rsocket.routing.v0", + dataEncodingMimeType: "application/json" + ), + transport: WSTransport(), + timeout: .seconds(30) + ) + let client = try await bootstrap.connect(to: .init(url: url)) + + let stream = client.requester.requestStream(payload: Payload( + metadata: route("searchTweets"), + data: Data(searchString.utf8) + )) + + for try await payload in stream.prefix(limit) { + let json = try JSONSerialization.jsonObject(with: payload.data, options: []) + let data = try JSONSerialization.data(withJSONObject: json, options: [.prettyPrinted]) + let string = String(decoding: data, as: UTF8.self) + print(string) + } + } +} + +TwitterClientExample.main() +#endif diff --git a/Sources/RSocketAsync/AsyncAwait.swift b/Sources/RSocketAsync/AsyncAwait.swift new file mode 100644 index 00000000..40b5e92c --- /dev/null +++ b/Sources/RSocketAsync/AsyncAwait.swift @@ -0,0 +1,177 @@ +#if compiler(>=5.4) && $AsyncAwait +import RSocketCore +import _Concurrency +import NIO +import _NIOConcurrency + +public protocol RSocket { + func requestResponse(payload: Payload) async throws -> Payload + func requestStream(payload: Payload) -> AsyncStreamSequence +} + +public struct RequesterAdapter: RSocket { + private let requester: RSocketCore.RSocket + private let eventLoop = MultiThreadedEventLoopGroup(numberOfThreads: 1) + public init(requester: RSocketCore.RSocket) { + self.requester = requester + } + public func requestResponse(payload: Payload) async throws -> Payload { + struct RequestResponseOperator: UnidirectionalStream { + var promise: EventLoopPromise + func onNext(_ payload: Payload, isCompletion: Bool) { + assert(isCompletion) + promise.succeed(payload) + } + + func onComplete() { + assertionFailure("request response does not support \(#function)") + } + + func onRequestN(_ requestN: Int32) { + assertionFailure("request response does not support \(#function)") + } + + func onCancel() { + promise.fail(Error.canceled(message: "onCancel")) + } + + func onError(_ error: Error) { + promise.fail(error) + } + + func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) { + assertionFailure("request response does not support \(#function)") + } + } + let promise = eventLoop.next().makePromise(of: Payload.self) + let stream = RequestResponseOperator(promise: promise) + _ = requester.requestResponse(payload: payload, responderStream: stream) + return try await promise.futureResult.get() + } + + public func requestStream(payload: Payload) -> AsyncStreamSequence { + AsyncStreamSequence(payload: payload, requester: requester, eventLoop: eventLoop.next()) + } +} + +public struct AsyncStreamSequence: AsyncSequence { + public typealias AsyncIterator = AsyncStreamIterator + + public typealias Element = Payload + + fileprivate init(payload: Payload, requester: RSocketCore.RSocket, eventLoop: EventLoop) { + self.payload = payload + self.requester = requester + self.eventLoop = eventLoop + } + private var payload: Payload + private var requester: RSocketCore.RSocket + private var eventLoop: EventLoop + public func makeAsyncIterator() -> AsyncStreamIterator { + let stream = AsyncStreamIterator(eventLoop: eventLoop) + stream.subscription = requester.stream(payload: payload, initialRequestN: 0, responderStream: stream) + return stream + } +} + +public final class AsyncStreamIterator: AsyncIteratorProtocol, UnidirectionalStream { + fileprivate init( + eventLoop: EventLoop + ) { + self.eventLoop = eventLoop + } + + private enum Event { + case next(Payload, isCompletion: Bool) + case error(Error) + case complete + case cancel + } + private var eventLoop: EventLoop + private var event: EventLoopPromise? = nil + private var isCompleted: Bool = false + fileprivate var subscription: Subscription! = nil + public func onNext(_ payload: Payload, isCompletion: Bool) { + eventLoop.execute { [self] in + assert(event != nil) + event?.succeed(.next(payload, isCompletion: isCompletion)) + } + + } + + public func onComplete() { + eventLoop.execute { [self] in + assert(event != nil) + event?.succeed(.complete) + } + } + + public func onRequestN(_ requestN: Int32) { + assertionFailure("request response does not support \(#function)") + } + + public func onCancel() { + eventLoop.execute { [self] in + assert(event != nil) + event?.succeed(.cancel) + } + } + + public func onError(_ error: Error) { + eventLoop.execute { [self] in + assert(event != nil) + event?.succeed(.error(error)) + } + } + + public func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) { + assertionFailure("request response does not support \(#function)") + } + public func next() async throws -> Payload? { + let p = eventLoop.makePromise(of: Optional.self) + p.completeWithAsync { [self] in + guard !isCompleted else { return nil } + assert(event == nil) + let promise = eventLoop.makePromise(of: Event.self) + event = promise + subscription.onRequestN(1) + let result = try await promise.futureResult.get() + event = nil + switch result { + case let .next(payload, isCompletion): + self.isCompleted = isCompletion + return payload + case .complete: + self.isCompleted = true + return nil + case .cancel: + self.isCompleted = true + return nil + case let .error(error): + self.isCompleted = true + throw error + } + } + return try await p.futureResult.get() + } + deinit { + subscription.onCancel() + } +} + +public struct AsyncClient { + private let coreClient: RSocketCore.CoreClient + + public var requester: RSocket { RequesterAdapter(requester: coreClient.requester) } + + public init(_ coreClient: RSocketCore.CoreClient) { + self.coreClient = coreClient + } +} + +extension RSocketCore.ClientBootstrap where Client == CoreClient, Responder == RSocketCore.RSocket { + public func connect(to endpoint: Transport.Endpoint) async throws -> AsyncClient { + AsyncClient(try await connect(to: endpoint, responder: nil).get()) + } +} +#endif From 61bbfc111af81e8473fc958c1c9fa29caafe3930 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Sun, 18 Apr 2021 21:36:06 +0200 Subject: [PATCH 02/22] use @available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) Signed-off-by: David Nadoba --- Package.resolved | 6 +++--- Package.swift | 4 ++-- Sources/Examples/AsyncTwitterClient/main.swift | 6 ++++-- Sources/RSocketAsync/AsyncAwait.swift | 6 ++++++ 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/Package.resolved b/Package.resolved index 8fbc2800..c057a030 100644 --- a/Package.resolved +++ b/Package.resolved @@ -21,10 +21,10 @@ }, { "package": "swift-nio", - "repositoryURL": "https://github.com/apple/swift-nio", + "repositoryURL": "https://github.com/adam-fowler/swift-nio", "state": { - "branch": null, - "revision": "4220c7a16a5ee0abb7da150bd3d4444940a20cc2", + "branch": "async-available", + "revision": "b5d6a06204d87cfe24ed2f19e59936d5875d543b", "version": null } }, diff --git a/Package.swift b/Package.swift index 5c31fe8e..885a0b50 100644 --- a/Package.swift +++ b/Package.swift @@ -29,7 +29,7 @@ let package = Package( ], dependencies: [ .package(url: "https://github.com/ReactiveCocoa/ReactiveSwift.git", from: "6.6.0"), - .package(url: "https://github.com/apple/swift-nio", .revision("4220c7a16a5ee0abb7da150bd3d4444940a20cc2")), + .package(url: "https://github.com/adam-fowler/swift-nio", .branch("async-available")), .package(url: "https://github.com/apple/swift-nio-extras", from: "1.8.0"), .package(url: "https://github.com/apple/swift-nio-transport-services", from: "1.9.2"), .package(url: "https://github.com/apple/swift-nio-ssl", from: "2.10.4"), @@ -157,7 +157,7 @@ let package = Package( swiftSettings: [ .unsafeFlags([ "-Xfrontend", - "-enable-experimental-concurrency" + "-enable-experimental-concurrency", ]) ] ), diff --git a/Sources/Examples/AsyncTwitterClient/main.swift b/Sources/Examples/AsyncTwitterClient/main.swift index 85fa06f3..8e773dd6 100644 --- a/Sources/Examples/AsyncTwitterClient/main.swift +++ b/Sources/Examples/AsyncTwitterClient/main.swift @@ -25,6 +25,7 @@ extension URL: ExpressibleByArgument { } /// the server-side code can be found here -> https://github.com/rsocket/rsocket-demo/tree/master/src/main/kotlin/io/rsocket/demo/twitter +@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) struct TwitterClientExample: ParsableCommand { static var configuration = CommandConfiguration( abstract: "connects to an RSocket endpoint using WebSocket transport, requests a stream at the route `searchTweets` to search for tweets that match the `searchString` and logs all events." @@ -74,6 +75,7 @@ struct TwitterClientExample: ParsableCommand { } } } - -TwitterClientExample.main() +if #available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) { + TwitterClientExample.main() +} #endif diff --git a/Sources/RSocketAsync/AsyncAwait.swift b/Sources/RSocketAsync/AsyncAwait.swift index 40b5e92c..4049f7ad 100644 --- a/Sources/RSocketAsync/AsyncAwait.swift +++ b/Sources/RSocketAsync/AsyncAwait.swift @@ -4,11 +4,13 @@ import _Concurrency import NIO import _NIOConcurrency +@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) public protocol RSocket { func requestResponse(payload: Payload) async throws -> Payload func requestStream(payload: Payload) -> AsyncStreamSequence } +@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) public struct RequesterAdapter: RSocket { private let requester: RSocketCore.RSocket private let eventLoop = MultiThreadedEventLoopGroup(numberOfThreads: 1) @@ -54,6 +56,7 @@ public struct RequesterAdapter: RSocket { } } +@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) public struct AsyncStreamSequence: AsyncSequence { public typealias AsyncIterator = AsyncStreamIterator @@ -74,6 +77,7 @@ public struct AsyncStreamSequence: AsyncSequence { } } +@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) public final class AsyncStreamIterator: AsyncIteratorProtocol, UnidirectionalStream { fileprivate init( eventLoop: EventLoop @@ -159,6 +163,7 @@ public final class AsyncStreamIterator: AsyncIteratorProtocol, UnidirectionalStr } } +@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) public struct AsyncClient { private let coreClient: RSocketCore.CoreClient @@ -169,6 +174,7 @@ public struct AsyncClient { } } +@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) extension RSocketCore.ClientBootstrap where Client == CoreClient, Responder == RSocketCore.RSocket { public func connect(to endpoint: Transport.Endpoint) async throws -> AsyncClient { AsyncClient(try await connect(to: endpoint, responder: nil).get()) From 8af659cf3ac6dd93c02476c549031c66570a3d71 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Sun, 18 Apr 2021 21:42:14 +0200 Subject: [PATCH 03/22] concurrency features are now enabled by default and do no longer need unsafe flag to compile on latest toolchain by removing this flag, this should compile on older compilers too Signed-off-by: David Nadoba --- Package.swift | 8 +------- Sources/Examples/AsyncTwitterClient/main.swift | 2 +- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/Package.swift b/Package.swift index 885a0b50..1217c525 100644 --- a/Package.swift +++ b/Package.swift @@ -153,13 +153,7 @@ let package = Package( .product(name: "NIO", package: "swift-nio"), .product(name: "_NIOConcurrency", package: "swift-nio"), ], - path: "Sources/Examples/AsyncTwitterClient", - swiftSettings: [ - .unsafeFlags([ - "-Xfrontend", - "-enable-experimental-concurrency", - ]) - ] + path: "Sources/Examples/AsyncTwitterClient" ), ], swiftLanguageVersions: [.v5] diff --git a/Sources/Examples/AsyncTwitterClient/main.swift b/Sources/Examples/AsyncTwitterClient/main.swift index 8e773dd6..87a2f98e 100644 --- a/Sources/Examples/AsyncTwitterClient/main.swift +++ b/Sources/Examples/AsyncTwitterClient/main.swift @@ -1,4 +1,4 @@ -#if compiler(>=5.4) && $AsyncAwait +#if compiler(>=5.5) && $AsyncAwait import ArgumentParser import Foundation import NIO From cc1ef4b324e5bbc52035c7419097dde63a228792 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Sun, 18 Apr 2021 22:20:05 +0200 Subject: [PATCH 04/22] use YieldingContinuation and UnsafeContinuation instead of promises Signed-off-by: David Nadoba --- Sources/RSocketAsync/AsyncAwait.swift | 97 +++++++-------------------- 1 file changed, 25 insertions(+), 72 deletions(-) diff --git a/Sources/RSocketAsync/AsyncAwait.swift b/Sources/RSocketAsync/AsyncAwait.swift index 4049f7ad..d96bc6e2 100644 --- a/Sources/RSocketAsync/AsyncAwait.swift +++ b/Sources/RSocketAsync/AsyncAwait.swift @@ -13,16 +13,15 @@ public protocol RSocket { @available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) public struct RequesterAdapter: RSocket { private let requester: RSocketCore.RSocket - private let eventLoop = MultiThreadedEventLoopGroup(numberOfThreads: 1) public init(requester: RSocketCore.RSocket) { self.requester = requester } public func requestResponse(payload: Payload) async throws -> Payload { struct RequestResponseOperator: UnidirectionalStream { - var promise: EventLoopPromise + var continuation: UnsafeContinuation func onNext(_ payload: Payload, isCompletion: Bool) { assert(isCompletion) - promise.succeed(payload) + continuation.resume(returning: payload) } func onComplete() { @@ -34,25 +33,27 @@ public struct RequesterAdapter: RSocket { } func onCancel() { - promise.fail(Error.canceled(message: "onCancel")) + continuation.resume(throwing: Error.canceled(message: "onCancel")) } func onError(_ error: Error) { - promise.fail(error) + continuation.resume(throwing: error) } func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) { assertionFailure("request response does not support \(#function)") } } - let promise = eventLoop.next().makePromise(of: Payload.self) - let stream = RequestResponseOperator(promise: promise) - _ = requester.requestResponse(payload: payload, responderStream: stream) - return try await promise.futureResult.get() + var cancelable: Cancellable? + defer { cancelable?.onCancel() } + return try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation) in + let stream = RequestResponseOperator(continuation: continuation) + cancelable = requester.requestResponse(payload: payload, responderStream: stream) + } } public func requestStream(payload: Payload) -> AsyncStreamSequence { - AsyncStreamSequence(payload: payload, requester: requester, eventLoop: eventLoop.next()) + AsyncStreamSequence(payload: payload, requester: requester) } } @@ -62,16 +63,14 @@ public struct AsyncStreamSequence: AsyncSequence { public typealias Element = Payload - fileprivate init(payload: Payload, requester: RSocketCore.RSocket, eventLoop: EventLoop) { + fileprivate init(payload: Payload, requester: RSocketCore.RSocket) { self.payload = payload self.requester = requester - self.eventLoop = eventLoop } private var payload: Payload private var requester: RSocketCore.RSocket - private var eventLoop: EventLoop public func makeAsyncIterator() -> AsyncStreamIterator { - let stream = AsyncStreamIterator(eventLoop: eventLoop) + let stream = AsyncStreamIterator() stream.subscription = requester.stream(payload: payload, initialRequestN: 0, responderStream: stream) return stream } @@ -79,84 +78,38 @@ public struct AsyncStreamSequence: AsyncSequence { @available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) public final class AsyncStreamIterator: AsyncIteratorProtocol, UnidirectionalStream { - fileprivate init( - eventLoop: EventLoop - ) { - self.eventLoop = eventLoop - } - - private enum Event { - case next(Payload, isCompletion: Bool) - case error(Error) - case complete - case cancel - } - private var eventLoop: EventLoop - private var event: EventLoopPromise? = nil - private var isCompleted: Bool = false fileprivate var subscription: Subscription! = nil + private var yieldingContinuation = YieldingContinuation() + public func onNext(_ payload: Payload, isCompletion: Bool) { - eventLoop.execute { [self] in - assert(event != nil) - event?.succeed(.next(payload, isCompletion: isCompletion)) + _ = yieldingContinuation.yield(payload) + if isCompletion { + _ = yieldingContinuation.yield(nil) } - } public func onComplete() { - eventLoop.execute { [self] in - assert(event != nil) - event?.succeed(.complete) - } + _ = yieldingContinuation.yield(nil) } public func onRequestN(_ requestN: Int32) { - assertionFailure("request response does not support \(#function)") + assertionFailure("request stream does not support \(#function)") } public func onCancel() { - eventLoop.execute { [self] in - assert(event != nil) - event?.succeed(.cancel) - } + _ = yieldingContinuation.yield(nil) } public func onError(_ error: Error) { - eventLoop.execute { [self] in - assert(event != nil) - event?.succeed(.error(error)) - } + _ = yieldingContinuation.yield(throwing: error) } public func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) { - assertionFailure("request response does not support \(#function)") + assertionFailure("request stream does not support \(#function)") } public func next() async throws -> Payload? { - let p = eventLoop.makePromise(of: Optional.self) - p.completeWithAsync { [self] in - guard !isCompleted else { return nil } - assert(event == nil) - let promise = eventLoop.makePromise(of: Event.self) - event = promise - subscription.onRequestN(1) - let result = try await promise.futureResult.get() - event = nil - switch result { - case let .next(payload, isCompletion): - self.isCompleted = isCompletion - return payload - case .complete: - self.isCompleted = true - return nil - case .cancel: - self.isCompleted = true - return nil - case let .error(error): - self.isCompleted = true - throw error - } - } - return try await p.futureResult.get() + subscription.onRequestN(1) + return try await yieldingContinuation.next() } deinit { subscription.onCancel() From f8295e8a3a15df0f38896c2d45aac3c2be5a568d Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Sun, 18 Apr 2021 22:59:31 +0200 Subject: [PATCH 05/22] use buffered continuation Signed-off-by: David Nadoba --- Package.swift | 13 ++++- Sources/RSocketAsync/AsyncAwait.swift | 79 +++++++++++++++++++++++---- 2 files changed, 80 insertions(+), 12 deletions(-) diff --git a/Package.swift b/Package.swift index 1217c525..d09c95c9 100644 --- a/Package.swift +++ b/Package.swift @@ -52,6 +52,11 @@ let package = Package( "RSocketCore", .product(name: "NIO", package: "swift-nio"), .product(name: "_NIOConcurrency", package: "swift-nio"), + ], swiftSettings: [ + .unsafeFlags([ + "-Xfrontend", + "-enable-experimental-concurrency", + ]) ]), // Channel @@ -153,7 +158,13 @@ let package = Package( .product(name: "NIO", package: "swift-nio"), .product(name: "_NIOConcurrency", package: "swift-nio"), ], - path: "Sources/Examples/AsyncTwitterClient" + path: "Sources/Examples/AsyncTwitterClient", + swiftSettings: [ + .unsafeFlags([ + "-Xfrontend", + "-enable-experimental-concurrency", + ]) + ] ), ], swiftLanguageVersions: [.v5] diff --git a/Sources/RSocketAsync/AsyncAwait.swift b/Sources/RSocketAsync/AsyncAwait.swift index d96bc6e2..03ec1046 100644 --- a/Sources/RSocketAsync/AsyncAwait.swift +++ b/Sources/RSocketAsync/AsyncAwait.swift @@ -1,11 +1,14 @@ -#if compiler(>=5.4) && $AsyncAwait +#if compiler(>=5.5) && $AsyncAwait +import Foundation +import NIO import RSocketCore import _Concurrency -import NIO import _NIOConcurrency @available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) public protocol RSocket { + func metadataPush(metadata: Data) + func fireAndForget(payload: Payload) func requestResponse(payload: Payload) async throws -> Payload func requestStream(payload: Payload) -> AsyncStreamSequence } @@ -16,6 +19,12 @@ public struct RequesterAdapter: RSocket { public init(requester: RSocketCore.RSocket) { self.requester = requester } + public func metadataPush(metadata: Data) { + requester.metadataPush(metadata: metadata) + } + public func fireAndForget(payload: Payload) { + requester.fireAndForget(payload: payload) + } public func requestResponse(payload: Payload) async throws -> Payload { struct RequestResponseOperator: UnidirectionalStream { var continuation: UnsafeContinuation @@ -76,20 +85,57 @@ public struct AsyncStreamSequence: AsyncSequence { } } +@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) +actor BufferedContinuation { + typealias Item = Result + let continuation = YieldingContinuation(yielding: Item.self) + var buffer = [Item]() + + public func push(result: Item) async { + if !continuation.yield(result) { + buffer.append(result) + } + } + public func push(_ element: Element) async { + await push(result: .success(element)) + } + public func push(throwing error: Error) async { + await push(result: .failure(error)) + } + + public func pop() async throws -> Element { + if buffer.count > 0 { + return try buffer.removeFirst().get() + } + return try await continuation.next().get() + } + + public func popAsResult() async -> Item { + if buffer.count > 0 { + return buffer.removeFirst() + } + return await continuation.next() + } +} + @available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) public final class AsyncStreamIterator: AsyncIteratorProtocol, UnidirectionalStream { fileprivate var subscription: Subscription! = nil - private var yieldingContinuation = YieldingContinuation() - + private var yieldingContinuation = BufferedContinuation() + private var isCompleted = false public func onNext(_ payload: Payload, isCompletion: Bool) { - _ = yieldingContinuation.yield(payload) - if isCompletion { - _ = yieldingContinuation.yield(nil) + detach { [self] in + await yieldingContinuation.push(payload) + if isCompletion { + await yieldingContinuation.push(nil) + } } } public func onComplete() { - _ = yieldingContinuation.yield(nil) + detach { [self] in + await yieldingContinuation.push(nil) + } } public func onRequestN(_ requestN: Int32) { @@ -97,19 +143,30 @@ public final class AsyncStreamIterator: AsyncIteratorProtocol, UnidirectionalStr } public func onCancel() { - _ = yieldingContinuation.yield(nil) + detach { [self] in + await yieldingContinuation.push(nil) + } } public func onError(_ error: Error) { - _ = yieldingContinuation.yield(throwing: error) + detach { [self] in + await yieldingContinuation.push(throwing: error) + } } public func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) { assertionFailure("request stream does not support \(#function)") } public func next() async throws -> Payload? { + guard !isCompleted else { return nil } subscription.onRequestN(1) - return try await yieldingContinuation.next() + let value = await yieldingContinuation.popAsResult() + switch value { + case .failure, .success(.none): + isCompleted = true + default: break + } + return try value.get() } deinit { subscription.onCancel() From dd5d64c962bfee6c02dc44b6603c1bcf274cac78 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 20 Apr 2021 09:41:25 +0200 Subject: [PATCH 06/22] use `CheckedContinuation` instead of `UnsafeContinuation` Signed-off-by: David Nadoba --- Package.resolved | 6 +++--- Package.swift | 2 +- Sources/RSocketAsync/AsyncAwait.swift | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Package.resolved b/Package.resolved index c057a030..370f4d3c 100644 --- a/Package.resolved +++ b/Package.resolved @@ -21,10 +21,10 @@ }, { "package": "swift-nio", - "repositoryURL": "https://github.com/adam-fowler/swift-nio", + "repositoryURL": "https://github.com/apple/swift-nio", "state": { - "branch": "async-available", - "revision": "b5d6a06204d87cfe24ed2f19e59936d5875d543b", + "branch": "main", + "revision": "f6936ae8132e14c64ed971764065e6842358fde0", "version": null } }, diff --git a/Package.swift b/Package.swift index d09c95c9..d7960bdd 100644 --- a/Package.swift +++ b/Package.swift @@ -29,7 +29,7 @@ let package = Package( ], dependencies: [ .package(url: "https://github.com/ReactiveCocoa/ReactiveSwift.git", from: "6.6.0"), - .package(url: "https://github.com/adam-fowler/swift-nio", .branch("async-available")), + .package(url: "https://github.com/apple/swift-nio", .branch("main")), .package(url: "https://github.com/apple/swift-nio-extras", from: "1.8.0"), .package(url: "https://github.com/apple/swift-nio-transport-services", from: "1.9.2"), .package(url: "https://github.com/apple/swift-nio-ssl", from: "2.10.4"), diff --git a/Sources/RSocketAsync/AsyncAwait.swift b/Sources/RSocketAsync/AsyncAwait.swift index 03ec1046..6d139c9d 100644 --- a/Sources/RSocketAsync/AsyncAwait.swift +++ b/Sources/RSocketAsync/AsyncAwait.swift @@ -27,7 +27,7 @@ public struct RequesterAdapter: RSocket { } public func requestResponse(payload: Payload) async throws -> Payload { struct RequestResponseOperator: UnidirectionalStream { - var continuation: UnsafeContinuation + var continuation: CheckedContinuation func onNext(_ payload: Payload, isCompletion: Bool) { assert(isCompletion) continuation.resume(returning: payload) @@ -55,7 +55,7 @@ public struct RequesterAdapter: RSocket { } var cancelable: Cancellable? defer { cancelable?.onCancel() } - return try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation) in + return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in let stream = RequestResponseOperator(continuation: continuation) cancelable = requester.requestResponse(payload: payload, responderStream: stream) } From b436abf2ac776db0030aaa0e9e5058678f1257af Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 13 Jul 2021 18:51:22 +0200 Subject: [PATCH 07/22] fix merge conflicts Signed-off-by: David Nadoba --- Package.resolved | 6 +++--- Package.swift | 2 +- Sources/Examples/AsyncTwitterClient/main.swift | 15 ++++++--------- Sources/RSocketAsync/AsyncAwait.swift | 4 ++-- 4 files changed, 12 insertions(+), 15 deletions(-) diff --git a/Package.resolved b/Package.resolved index 370f4d3c..3b50e047 100644 --- a/Package.resolved +++ b/Package.resolved @@ -23,9 +23,9 @@ "package": "swift-nio", "repositoryURL": "https://github.com/apple/swift-nio", "state": { - "branch": "main", - "revision": "f6936ae8132e14c64ed971764065e6842358fde0", - "version": null + "branch": null, + "revision": "d79e33308b0ac83326b0ead0ea6446e604b8162d", + "version": "2.30.0" } }, { diff --git a/Package.swift b/Package.swift index d7960bdd..dc0dddfb 100644 --- a/Package.swift +++ b/Package.swift @@ -29,7 +29,7 @@ let package = Package( ], dependencies: [ .package(url: "https://github.com/ReactiveCocoa/ReactiveSwift.git", from: "6.6.0"), - .package(url: "https://github.com/apple/swift-nio", .branch("main")), + .package(url: "https://github.com/apple/swift-nio", from: "2.30.0"), .package(url: "https://github.com/apple/swift-nio-extras", from: "1.8.0"), .package(url: "https://github.com/apple/swift-nio-transport-services", from: "1.9.2"), .package(url: "https://github.com/apple/swift-nio-ssl", from: "2.10.4"), diff --git a/Sources/Examples/AsyncTwitterClient/main.swift b/Sources/Examples/AsyncTwitterClient/main.swift index 87a2f98e..425acc17 100644 --- a/Sources/Examples/AsyncTwitterClient/main.swift +++ b/Sources/Examples/AsyncTwitterClient/main.swift @@ -51,16 +51,13 @@ struct TwitterClientExample: ParsableCommand { } func runAsync() async throws { let bootstrap = ClientBootstrap( - config: ClientSetupConfig( - timeBetweenKeepaliveFrames: 0, - maxLifetime: 30_000, - metadataEncodingMimeType: "message/x.rsocket.routing.v0", - dataEncodingMimeType: "application/json" - ), - transport: WSTransport(), - timeout: .seconds(30) + transport: WSTransport(), + config: .mobileToServer + .set(\.encoding.metadata, to: .rsocketRoutingV0) + .set(\.encoding.data, to: .json), + timeout: .seconds(30) ) - let client = try await bootstrap.connect(to: .init(url: url)) + let client = try await bootstrap.connect(to: .init(url: url), payload: .empty) let stream = client.requester.requestStream(payload: Payload( metadata: route("searchTweets"), diff --git a/Sources/RSocketAsync/AsyncAwait.swift b/Sources/RSocketAsync/AsyncAwait.swift index 6d139c9d..30ba0362 100644 --- a/Sources/RSocketAsync/AsyncAwait.swift +++ b/Sources/RSocketAsync/AsyncAwait.swift @@ -186,8 +186,8 @@ public struct AsyncClient { @available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) extension RSocketCore.ClientBootstrap where Client == CoreClient, Responder == RSocketCore.RSocket { - public func connect(to endpoint: Transport.Endpoint) async throws -> AsyncClient { - AsyncClient(try await connect(to: endpoint, responder: nil).get()) + public func connect(to endpoint: Transport.Endpoint, payload: Payload) async throws -> AsyncClient { + AsyncClient(try await connect(to: endpoint, payload: payload, responder: nil).get()) } } #endif From 178bd54eb71970be385078512103e6963f76fe9f Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 13 Jul 2021 18:53:43 +0200 Subject: [PATCH 08/22] remove unsafe flags because they are no longer needed Signed-off-by: David Nadoba --- Package.swift | 5 ----- Sources/Examples/AsyncTwitterClient/main.swift | 2 +- Sources/RSocketAsync/AsyncAwait.swift | 2 +- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/Package.swift b/Package.swift index dc0dddfb..fa128f4d 100644 --- a/Package.swift +++ b/Package.swift @@ -52,11 +52,6 @@ let package = Package( "RSocketCore", .product(name: "NIO", package: "swift-nio"), .product(name: "_NIOConcurrency", package: "swift-nio"), - ], swiftSettings: [ - .unsafeFlags([ - "-Xfrontend", - "-enable-experimental-concurrency", - ]) ]), // Channel diff --git a/Sources/Examples/AsyncTwitterClient/main.swift b/Sources/Examples/AsyncTwitterClient/main.swift index 425acc17..e7b078e9 100644 --- a/Sources/Examples/AsyncTwitterClient/main.swift +++ b/Sources/Examples/AsyncTwitterClient/main.swift @@ -1,4 +1,4 @@ -#if compiler(>=5.5) && $AsyncAwait +#if compiler(>=5.5) import ArgumentParser import Foundation import NIO diff --git a/Sources/RSocketAsync/AsyncAwait.swift b/Sources/RSocketAsync/AsyncAwait.swift index 30ba0362..1dcb0bc1 100644 --- a/Sources/RSocketAsync/AsyncAwait.swift +++ b/Sources/RSocketAsync/AsyncAwait.swift @@ -1,4 +1,4 @@ -#if compiler(>=5.5) && $AsyncAwait +#if compiler(>=5.5) import Foundation import NIO import RSocketCore From baa4ff3564e5bdb1bedcb3d6de5298db50e4cbd3 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 13 Jul 2021 19:01:08 +0200 Subject: [PATCH 09/22] run tests on Xcode beta too Signed-off-by: David Nadoba --- .github/workflows/swift.yml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/.github/workflows/swift.yml b/.github/workflows/swift.yml index cbae9e50..e51dfacf 100644 --- a/.github/workflows/swift.yml +++ b/.github/workflows/swift.yml @@ -9,11 +9,19 @@ on: jobs: test-on-macOS-and-iOS: runs-on: macos-latest + strategy: + matrix: + xcode: [ + '/Applications/Xcode_12.5.1.app/Contents/Developer', + '/Applications/Xcode_13.0.app/Contents/Developer', + ] steps: - uses: actions/checkout@v2 - name: Test on iOS Simulator + env: + DEVELOPER_DIR: ${{ matrix.xcode }} run: > xcodebuild test -scheme RSocket-Package @@ -24,6 +32,8 @@ jobs: -destination:'platform=iOS Simulator,name=iPhone 12' - name: Test on macOS + env: + DEVELOPER_DIR: ${{ matrix.xcode }} run: > xcodebuild test -scheme RSocket-Package @@ -34,10 +44,18 @@ jobs: performance-tests-on-macOS: runs-on: macos-latest + strategy: + matrix: + xcode: [ + '/Applications/Xcode_12.5.1.app/Contents/Developer', + '/Applications/Xcode_13.0.app/Contents/Developer', + ] steps: - uses: actions/checkout@v2 - name: Build & Run Performance Tests on macOS + env: + DEVELOPER_DIR: ${{ matrix.xcode }} run: > swift test --configuration=release From f10526736f014d0a2d51a3a66df769560e335dd9 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 13 Jul 2021 20:25:38 +0200 Subject: [PATCH 10/22] remove unsafe flags Signed-off-by: David Nadoba --- Package.swift | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/Package.swift b/Package.swift index fa128f4d..43fb1c54 100644 --- a/Package.swift +++ b/Package.swift @@ -153,13 +153,7 @@ let package = Package( .product(name: "NIO", package: "swift-nio"), .product(name: "_NIOConcurrency", package: "swift-nio"), ], - path: "Sources/Examples/AsyncTwitterClient", - swiftSettings: [ - .unsafeFlags([ - "-Xfrontend", - "-enable-experimental-concurrency", - ]) - ] + path: "Sources/Examples/AsyncTwitterClient" ), ], swiftLanguageVersions: [.v5] From 5b21af98ae3422223126e005d489d772711686e3 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 13 Jul 2021 20:29:20 +0200 Subject: [PATCH 11/22] use maxim-lobanov/setup-xcode@v1 to select Xcode version Signed-off-by: David Nadoba --- .github/workflows/swift.yml | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/.github/workflows/swift.yml b/.github/workflows/swift.yml index e51dfacf..b6f98b7d 100644 --- a/.github/workflows/swift.yml +++ b/.github/workflows/swift.yml @@ -11,17 +11,15 @@ jobs: runs-on: macos-latest strategy: matrix: - xcode: [ - '/Applications/Xcode_12.5.1.app/Contents/Developer', - '/Applications/Xcode_13.0.app/Contents/Developer', - ] + xcode: ['12.5.1', '13.0'] steps: - uses: actions/checkout@v2 - + - uses: maxim-lobanov/setup-xcode@v1 + with: + xcode-version: ${{ matrix.xcode }} - name: Test on iOS Simulator - env: - DEVELOPER_DIR: ${{ matrix.xcode }} + run: > xcodebuild test -scheme RSocket-Package @@ -32,8 +30,6 @@ jobs: -destination:'platform=iOS Simulator,name=iPhone 12' - name: Test on macOS - env: - DEVELOPER_DIR: ${{ matrix.xcode }} run: > xcodebuild test -scheme RSocket-Package @@ -46,13 +42,13 @@ jobs: runs-on: macos-latest strategy: matrix: - xcode: [ - '/Applications/Xcode_12.5.1.app/Contents/Developer', - '/Applications/Xcode_13.0.app/Contents/Developer', - ] + xcode: ['12.5.1', '13.0'] steps: - uses: actions/checkout@v2 + - uses: maxim-lobanov/setup-xcode@v1 + with: + xcode-version: ${{ matrix.xcode }} - name: Build & Run Performance Tests on macOS env: DEVELOPER_DIR: ${{ matrix.xcode }} From 36160ed8eafa8c4ddd089f01250dee43248b7261 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Wed, 14 Jul 2021 10:44:46 +0200 Subject: [PATCH 12/22] use macOS 11 Signed-off-by: David Nadoba --- .github/workflows/swift.yml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/.github/workflows/swift.yml b/.github/workflows/swift.yml index b6f98b7d..c5317d15 100644 --- a/.github/workflows/swift.yml +++ b/.github/workflows/swift.yml @@ -8,7 +8,7 @@ on: jobs: test-on-macOS-and-iOS: - runs-on: macos-latest + runs-on: macos-11 strategy: matrix: xcode: ['12.5.1', '13.0'] @@ -39,7 +39,7 @@ jobs: -destination 'platform=macOS' performance-tests-on-macOS: - runs-on: macos-latest + runs-on: macos-11 strategy: matrix: xcode: ['12.5.1', '13.0'] @@ -50,8 +50,6 @@ jobs: with: xcode-version: ${{ matrix.xcode }} - name: Build & Run Performance Tests on macOS - env: - DEVELOPER_DIR: ${{ matrix.xcode }} run: > swift test --configuration=release From 4a710715cdf1fff21c548d33b4d956ce1bcd9aca Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Wed, 14 Jul 2021 15:54:30 +0200 Subject: [PATCH 13/22] use new AsyncThrowingStream instead of YieldingContinuation Signed-off-by: David Nadoba --- Package.swift | 18 ++-- Sources/RSocketAsync/AsyncAwait.swift | 121 +++++++------------------- 2 files changed, 42 insertions(+), 97 deletions(-) diff --git a/Package.swift b/Package.swift index 43fb1c54..cc8fa24d 100644 --- a/Package.swift +++ b/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version:5.3 +// swift-tools-version:5.4 import PackageDescription @@ -106,7 +106,7 @@ let package = Package( ]), // Examples - .target( + .executableTarget( name: "TimerClientExample", dependencies: [ "RSocketCore", @@ -118,7 +118,7 @@ let package = Package( ], path: "Sources/Examples/TimerClient" ), - .target( + .executableTarget( name: "TwitterClientExample", dependencies: [ "RSocketCore", @@ -130,7 +130,7 @@ let package = Package( ], path: "Sources/Examples/TwitterClient" ), - .target( + .executableTarget( name: "VanillaClientExample", dependencies: [ "RSocketCore", @@ -142,7 +142,7 @@ let package = Package( ], path: "Sources/Examples/VanillaClient" ), - .target( + .executableTarget( name: "AsyncTwitterClientExample", dependencies: [ "RSocketCore", @@ -153,7 +153,13 @@ let package = Package( .product(name: "NIO", package: "swift-nio"), .product(name: "_NIOConcurrency", package: "swift-nio"), ], - path: "Sources/Examples/AsyncTwitterClient" + path: "Sources/Examples/AsyncTwitterClient", + swiftSettings: [ + .unsafeFlags([ + "-Xfrontend", + "-enable-experimental-concurrency" + ]) + ] ), ], swiftLanguageVersions: [.v5] diff --git a/Sources/RSocketAsync/AsyncAwait.swift b/Sources/RSocketAsync/AsyncAwait.swift index 1dcb0bc1..92d80448 100644 --- a/Sources/RSocketAsync/AsyncAwait.swift +++ b/Sources/RSocketAsync/AsyncAwait.swift @@ -10,7 +10,7 @@ public protocol RSocket { func metadataPush(metadata: Data) func fireAndForget(payload: Payload) func requestResponse(payload: Payload) async throws -> Payload - func requestStream(payload: Payload) -> AsyncStreamSequence + func requestStream(payload: Payload) -> AsyncThrowingStream } @available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) @@ -61,116 +61,55 @@ public struct RequesterAdapter: RSocket { } } - public func requestStream(payload: Payload) -> AsyncStreamSequence { - AsyncStreamSequence(payload: payload, requester: requester) - } -} - -@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) -public struct AsyncStreamSequence: AsyncSequence { - public typealias AsyncIterator = AsyncStreamIterator - - public typealias Element = Payload - - fileprivate init(payload: Payload, requester: RSocketCore.RSocket) { - self.payload = payload - self.requester = requester - } - private var payload: Payload - private var requester: RSocketCore.RSocket - public func makeAsyncIterator() -> AsyncStreamIterator { - let stream = AsyncStreamIterator() - stream.subscription = requester.stream(payload: payload, initialRequestN: 0, responderStream: stream) - return stream - } -} - -@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) -actor BufferedContinuation { - typealias Item = Result - let continuation = YieldingContinuation(yielding: Item.self) - var buffer = [Item]() - - public func push(result: Item) async { - if !continuation.yield(result) { - buffer.append(result) - } - } - public func push(_ element: Element) async { - await push(result: .success(element)) - } - public func push(throwing error: Error) async { - await push(result: .failure(error)) - } - - public func pop() async throws -> Element { - if buffer.count > 0 { - return try buffer.removeFirst().get() - } - return try await continuation.next().get() - } - - public func popAsResult() async -> Item { - if buffer.count > 0 { - return buffer.removeFirst() + public func requestStream(payload: Payload) -> AsyncThrowingStream { + AsyncThrowingStream(Payload.self, bufferingPolicy: .unbounded) { continuation in + let adapter = AsyncStreamAdapter(continuation: continuation) + let subscription = requester.stream(payload: payload, initialRequestN: .max, responderStream: adapter) + continuation.onTermination = { @Sendable (reason: AsyncThrowingStream.Continuation.Termination) -> Void in + switch reason { + case .cancelled: + subscription.onCancel() + case .finished: break + // TODO: `Termination` should probably be @frozen so we do not have to deal with the @unknown default case + @unknown default: break + } + } } - return await continuation.next() } } @available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) -public final class AsyncStreamIterator: AsyncIteratorProtocol, UnidirectionalStream { - fileprivate var subscription: Subscription! = nil - private var yieldingContinuation = BufferedContinuation() - private var isCompleted = false +public final class AsyncStreamAdapter: UnidirectionalStream { + private var continuation: AsyncThrowingStream.Continuation + init(continuation: AsyncThrowingStream.Continuation) { + self.continuation = continuation + } public func onNext(_ payload: Payload, isCompletion: Bool) { - detach { [self] in - await yieldingContinuation.push(payload) - if isCompletion { - await yieldingContinuation.push(nil) - } + continuation.yield(payload) + if isCompletion { + continuation.finish() } } - + public func onComplete() { - detach { [self] in - await yieldingContinuation.push(nil) - } + continuation.finish() } - + public func onRequestN(_ requestN: Int32) { assertionFailure("request stream does not support \(#function)") } - + public func onCancel() { - detach { [self] in - await yieldingContinuation.push(nil) - } + continuation.finish() } - + public func onError(_ error: Error) { - detach { [self] in - await yieldingContinuation.push(throwing: error) - } + continuation.yield(with: .failure(error)) } - + public func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) { assertionFailure("request stream does not support \(#function)") } - public func next() async throws -> Payload? { - guard !isCompleted else { return nil } - subscription.onRequestN(1) - let value = await yieldingContinuation.popAsResult() - switch value { - case .failure, .success(.none): - isCompleted = true - default: break - } - return try value.get() - } - deinit { - subscription.onCancel() - } } @available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) From 2ccd242837153b04ff2ed9eb1e514ade7f8dd7b6 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Thu, 15 Jul 2021 16:29:54 +0200 Subject: [PATCH 14/22] remove unsafe flags Signed-off-by: David Nadoba --- Package.swift | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/Package.swift b/Package.swift index cc8fa24d..81129546 100644 --- a/Package.swift +++ b/Package.swift @@ -153,13 +153,7 @@ let package = Package( .product(name: "NIO", package: "swift-nio"), .product(name: "_NIOConcurrency", package: "swift-nio"), ], - path: "Sources/Examples/AsyncTwitterClient", - swiftSettings: [ - .unsafeFlags([ - "-Xfrontend", - "-enable-experimental-concurrency" - ]) - ] + path: "Sources/Examples/AsyncTwitterClient" ), ], swiftLanguageVersions: [.v5] From 603353ec08472c6772afb8301c0519376e074cf6 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 20 Jul 2021 22:24:48 +0200 Subject: [PATCH 15/22] implement requestChannel Signed-off-by: David Nadoba --- Sources/RSocketAsync/AsyncAwait.swift | 58 +++++++++++++++++++++++---- 1 file changed, 51 insertions(+), 7 deletions(-) diff --git a/Sources/RSocketAsync/AsyncAwait.swift b/Sources/RSocketAsync/AsyncAwait.swift index 92d80448..b26f4672 100644 --- a/Sources/RSocketAsync/AsyncAwait.swift +++ b/Sources/RSocketAsync/AsyncAwait.swift @@ -11,6 +11,11 @@ public protocol RSocket { func fireAndForget(payload: Payload) func requestResponse(payload: Payload) async throws -> Payload func requestStream(payload: Payload) -> AsyncThrowingStream + func requestChannel( + initialPayload: Payload, + payloadStream: PayloadSequence + ) -> AsyncThrowingStream + where PayloadSequence: AsyncSequence, PayloadSequence.Element == Payload } @available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) @@ -76,38 +81,77 @@ public struct RequesterAdapter: RSocket { } } } + + public func requestChannel( + initialPayload: Payload, + payloadStream: PayloadSequence + ) -> AsyncThrowingStream where PayloadSequence: AsyncSequence, PayloadSequence.Element == Payload { + AsyncThrowingStream(Payload.self, bufferingPolicy: .unbounded) { continuation in + let adapter = AsyncStreamAdapter(continuation: continuation) + let channel = requester.channel( + payload: initialPayload, + initialRequestN: .max, + isCompleted: false, + responderStream: adapter + ) + + let task = Task.detached { + do { + for try await payload in payloadStream { + channel.onNext(payload, isCompletion: false) + } + channel.onComplete() + } catch is CancellationError { + channel.onCancel() + } catch { + channel.onError(Error.applicationError(message: error.localizedDescription)) + } + } + + continuation.onTermination = { @Sendable (reason: AsyncThrowingStream.Continuation.Termination) -> Void in + switch reason { + case .cancelled: + channel.onCancel() + task.cancel() + case .finished: break + // TODO: `Termination` should probably be @frozen so we do not have to deal with the @unknown default case + @unknown default: break + } + } + } + } } @available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) -public final class AsyncStreamAdapter: UnidirectionalStream { +internal final class AsyncStreamAdapter: UnidirectionalStream { private var continuation: AsyncThrowingStream.Continuation init(continuation: AsyncThrowingStream.Continuation) { self.continuation = continuation } - public func onNext(_ payload: Payload, isCompletion: Bool) { + internal func onNext(_ payload: Payload, isCompletion: Bool) { continuation.yield(payload) if isCompletion { continuation.finish() } } - public func onComplete() { + internal func onComplete() { continuation.finish() } - public func onRequestN(_ requestN: Int32) { + internal func onRequestN(_ requestN: Int32) { assertionFailure("request stream does not support \(#function)") } - public func onCancel() { + internal func onCancel() { continuation.finish() } - public func onError(_ error: Error) { + internal func onError(_ error: Error) { continuation.yield(with: .failure(error)) } - public func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) { + internal func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) { assertionFailure("request stream does not support \(#function)") } } From affca722787e098c7a56dfeeb3f313aa08951ae3 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Mon, 26 Jul 2021 21:29:47 +0200 Subject: [PATCH 16/22] split into multiple files Signed-off-by: David Nadoba --- Sources/RSocketAsync/Client.swift | 38 +++++++++++++++ Sources/RSocketAsync/RSocket.swift | 34 ++++++++++++++ .../{AsyncAwait.swift => Requester.swift} | 47 +++++++------------ 3 files changed, 88 insertions(+), 31 deletions(-) create mode 100644 Sources/RSocketAsync/Client.swift create mode 100644 Sources/RSocketAsync/RSocket.swift rename Sources/RSocketAsync/{AsyncAwait.swift => Requester.swift} (81%) diff --git a/Sources/RSocketAsync/Client.swift b/Sources/RSocketAsync/Client.swift new file mode 100644 index 00000000..fb2d9b7c --- /dev/null +++ b/Sources/RSocketAsync/Client.swift @@ -0,0 +1,38 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if compiler(>=5.5) +import RSocketCore + +@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) +public struct AsyncClient { + private let coreClient: RSocketCore.CoreClient + + public var requester: RSocket { RequesterAdapter(requester: coreClient.requester) } + + public init(_ coreClient: RSocketCore.CoreClient) { + self.coreClient = coreClient + } +} + +@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) +extension RSocketCore.ClientBootstrap where Client == CoreClient, Responder == RSocketCore.RSocket { + public func connect(to endpoint: Transport.Endpoint, payload: Payload) async throws -> AsyncClient { + AsyncClient(try await connect(to: endpoint, payload: payload, responder: nil).get()) + } +} + +#endif diff --git a/Sources/RSocketAsync/RSocket.swift b/Sources/RSocketAsync/RSocket.swift new file mode 100644 index 00000000..76627f9a --- /dev/null +++ b/Sources/RSocketAsync/RSocket.swift @@ -0,0 +1,34 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if compiler(>=5.5) +import Foundation +import RSocketCore + +@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) +public protocol RSocket { + func metadataPush(metadata: Data) + func fireAndForget(payload: Payload) + func requestResponse(payload: Payload) async throws -> Payload + func requestStream(payload: Payload) -> AsyncThrowingStream + func requestChannel( + initialPayload: Payload, + payloadStream: PayloadSequence + ) -> AsyncThrowingStream + where PayloadSequence: AsyncSequence, PayloadSequence.Element == Payload +} + +#endif diff --git a/Sources/RSocketAsync/AsyncAwait.swift b/Sources/RSocketAsync/Requester.swift similarity index 81% rename from Sources/RSocketAsync/AsyncAwait.swift rename to Sources/RSocketAsync/Requester.swift index b26f4672..d35aed46 100644 --- a/Sources/RSocketAsync/AsyncAwait.swift +++ b/Sources/RSocketAsync/Requester.swift @@ -1,23 +1,25 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #if compiler(>=5.5) import Foundation import NIO import RSocketCore -import _Concurrency import _NIOConcurrency -@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) -public protocol RSocket { - func metadataPush(metadata: Data) - func fireAndForget(payload: Payload) - func requestResponse(payload: Payload) async throws -> Payload - func requestStream(payload: Payload) -> AsyncThrowingStream - func requestChannel( - initialPayload: Payload, - payloadStream: PayloadSequence - ) -> AsyncThrowingStream - where PayloadSequence: AsyncSequence, PayloadSequence.Element == Payload -} - @available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) public struct RequesterAdapter: RSocket { private let requester: RSocketCore.RSocket @@ -156,21 +158,4 @@ internal final class AsyncStreamAdapter: UnidirectionalStream { } } -@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) -public struct AsyncClient { - private let coreClient: RSocketCore.CoreClient - - public var requester: RSocket { RequesterAdapter(requester: coreClient.requester) } - - public init(_ coreClient: RSocketCore.CoreClient) { - self.coreClient = coreClient - } -} - -@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) -extension RSocketCore.ClientBootstrap where Client == CoreClient, Responder == RSocketCore.RSocket { - public func connect(to endpoint: Transport.Endpoint, payload: Payload) async throws -> AsyncClient { - AsyncClient(try await connect(to: endpoint, payload: payload, responder: nil).get()) - } -} #endif From 34f7042aa9956e7824013da3426050b8a8085d51 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 27 Jul 2021 00:00:22 +0200 Subject: [PATCH 17/22] async/await responder prototype Signed-off-by: David Nadoba --- Sources/RSocketAsync/Responder.swift | 219 +++++++++++++++++++++++++++ 1 file changed, 219 insertions(+) create mode 100644 Sources/RSocketAsync/Responder.swift diff --git a/Sources/RSocketAsync/Responder.swift b/Sources/RSocketAsync/Responder.swift new file mode 100644 index 00000000..45d21bef --- /dev/null +++ b/Sources/RSocketAsync/Responder.swift @@ -0,0 +1,219 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if compiler(>=5.5) +import RSocketCore +import Foundation + +@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) +struct ResponderAdapter: RSocketCore.RSocket { + var responder: RSocket + + func metadataPush(metadata: Data) { + responder.metadataPush(metadata: metadata) + } + + func fireAndForget(payload: Payload) { + responder.fireAndForget(payload: payload) + } + + func requestResponse( + payload: Payload, + responderStream: UnidirectionalStream + ) -> Cancellable { + let task = Task.init(priority: nil) { + do { + let response = try await responder.requestResponse(payload: payload) + responderStream.onNext(response, isCompletion: true) + } catch { + responderStream.onError(Error.applicationError(message: error.localizedDescription)) + } + } + return RequestResponseResponder(task: task) + } + + func stream( + payload: Payload, + initialRequestN: Int32, + responderStream: UnidirectionalStream + ) -> Subscription { + let task = Task.init(priority: nil) { + do { + let stream = responder.requestStream(payload: payload) + for try await responderPayload in stream { + responderStream.onNext(responderPayload, isCompletion: false) + } + responderStream.onComplete() + } catch is CancellationError { + responderStream.onCancel() + } catch { + responderStream.onError(Error.applicationError(message: error.localizedDescription)) + } + } + + return RequestStreamResponder(task: task) + } + + func channel(payload: Payload, initialRequestN: Int32, isCompleted: Bool, responderStream: UnidirectionalStream) -> UnidirectionalStream { + let requesterStream = RequestChannelAsyncSequence() + + let task = Task.init(priority: nil) { + do { + let responderPayloads = responder.requestChannel(initialPayload: payload, payloadStream: requesterStream) + for try await responderPayload in responderPayloads { + responderStream.onNext(responderPayload, isCompletion: false) + } + responderStream.onComplete() + } catch is CancellationError { + responderStream.onCancel() + } catch { + responderStream.onError(Error.applicationError(message: error.localizedDescription)) + } + } + requesterStream.task = task + return requesterStream + } +} + +@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) +fileprivate class RequestResponseResponder: Cancellable { + private let task: Task + + internal init(task: Task) { + self.task = task + } + + deinit { + task.cancel() + } + + func onCancel() { + task.cancel() + } + + func onError(_ error: Error) { + // TODO: Can a request actually send an error? If yes, we should probably do something with the error + task.cancel() + } + + func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) { + assertionFailure("\(Self.self) does not support \(#function)") + } +} + +@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) +fileprivate class RequestStreamResponder: Subscription { + private let task: Task + + internal init(task: Task) { + self.task = task + } + + deinit { + task.cancel() + } + + func onCancel() { + task.cancel() + } + + func onError(_ error: Error) { + // TODO: Can a stream actually send an error? If yes, we should probably do something with the error + task.cancel() + } + + func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) { + assertionFailure("\(Self.self) does not support \(#function)") + } + + func onRequestN(_ requestN: Int32) { + assertionFailure("\(Self.self) does not support \(#function)") + } +} + +@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) +fileprivate class RequestChannelAsyncSequence: AsyncSequence, UnidirectionalStream { + typealias AsyncIterator = AsyncThrowingStream.AsyncIterator + typealias Element = Payload + + private var iterator: AsyncThrowingStream.AsyncIterator! + private var continuation: AsyncThrowingStream.Continuation! + + internal var task: Task? + + internal init() { + let sequence = AsyncThrowingStream(Payload.self, bufferingPolicy: .unbounded) { continuation in + self.continuation = continuation + continuation.onTermination = { @Sendable [weak self] (reason) in + // TODO: `task` is not safe to access here because we set it late, after we give `self` to user code + // but just adding a lock is not enough because we could be terminated before `task` is even set and we then need to cancel `task` after it is set + // I hope we find a better solution. Maybe we can access the current task from within the `Task.init` closure which would solve both problems mentioned above + // UPDATE: Looks like it will be possible. The documentation of `withUnsafeCurrentTask(body:)` says that `UnsafeCurrentTask` has get a `task` property. But it does currently (Xcode 12 Beta 3) not have it. + switch reason { + case let .finished(.some(error)): + if error is CancellationError { + return + } + // only in the error case we cancel task + self?.task?.cancel() + case .finished(nil): break + case .cancelled: break + @unknown default: break + } + } + } + iterator = sequence.makeAsyncIterator() + } + + deinit { + task?.cancel() + continuation.finish(throwing: CancellationError()) + } + + func makeAsyncIterator() -> AsyncIterator { + iterator + } + + func onNext(_ payload: Payload, isCompletion: Bool) { + continuation.yield(payload) + if isCompletion { + continuation.finish() + } + } + + func onComplete() { + continuation.finish() + } + + func onCancel() { + continuation.finish(throwing: CancellationError()) + } + + func onError(_ error: Error) { + continuation.finish(throwing: error) + task?.cancel() + } + + func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) { + assertionFailure("\(Self.self) does not support \(#function)") + } + + func onRequestN(_ requestN: Int32) { + assertionFailure("\(Self.self) does not support \(#function)") + } +} + +#endif From bf8d21fa739bfb1acfa771ef39363c21f29a7f13 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 27 Jul 2021 00:15:27 +0200 Subject: [PATCH 18/22] fix merge conflict Signed-off-by: David Nadoba --- Sources/Examples/AsyncTwitterClient/main.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/Examples/AsyncTwitterClient/main.swift b/Sources/Examples/AsyncTwitterClient/main.swift index e7b078e9..0eddeea4 100644 --- a/Sources/Examples/AsyncTwitterClient/main.swift +++ b/Sources/Examples/AsyncTwitterClient/main.swift @@ -53,8 +53,8 @@ struct TwitterClientExample: ParsableCommand { let bootstrap = ClientBootstrap( transport: WSTransport(), config: .mobileToServer - .set(\.encoding.metadata, to: .rsocketRoutingV0) - .set(\.encoding.data, to: .json), + .set(\.encoding.metadata, to: .messageXRSocketRoutingV0) + .set(\.encoding.data, to: .applicationJson), timeout: .seconds(30) ) let client = try await bootstrap.connect(to: .init(url: url), payload: .empty) From 2c800d8897952ba47ad601b77e0907a834a76ab1 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 27 Jul 2021 00:20:35 +0200 Subject: [PATCH 19/22] use SwiftPM directly with Xcode 13 Signed-off-by: David Nadoba --- .github/workflows/swift.yml | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/.github/workflows/swift.yml b/.github/workflows/swift.yml index c5317d15..67d1f43c 100644 --- a/.github/workflows/swift.yml +++ b/.github/workflows/swift.yml @@ -9,17 +9,38 @@ on: jobs: test-on-macOS-and-iOS: runs-on: macos-11 - strategy: - matrix: - xcode: ['12.5.1', '13.0'] - steps: - uses: actions/checkout@v2 - uses: maxim-lobanov/setup-xcode@v1 with: - xcode-version: ${{ matrix.xcode }} + xcode-version: 12.5.1 - name: Test on iOS Simulator + run: > + xcodebuild test + -scheme RSocket-Package + -parallelizeTargets + -skip-testing:RSocketCorePerformanceTests + -parallel-testing-enabled + -sdk:iphonesimulator + -destination:'platform=iOS Simulator,name=iPhone 12' + + - name: Test on macOS + run: > + xcodebuild test + -scheme RSocket-Package + -parallelizeTargets + -skip-testing:RSocketCorePerformanceTests + -parallel-testing-enabled + -destination 'platform=macOS' + test-on-macOS-with-Xcode-13-Beta: + runs-on: macos-11 + steps: + - uses: actions/checkout@v2 + - uses: maxim-lobanov/setup-xcode@v1 + with: + xcode-version: 13.0 + - name: Test on iOS Simulator run: > xcodebuild test -scheme RSocket-Package From a5d9b4ad1d38ae94d012ecb17227765680b2f396 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 27 Jul 2021 00:23:09 +0200 Subject: [PATCH 20/22] use `swift test` with Xcode 13 Signed-off-by: David Nadoba --- .github/workflows/swift.yml | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/.github/workflows/swift.yml b/.github/workflows/swift.yml index 67d1f43c..be1a6b3e 100644 --- a/.github/workflows/swift.yml +++ b/.github/workflows/swift.yml @@ -40,24 +40,9 @@ jobs: - uses: maxim-lobanov/setup-xcode@v1 with: xcode-version: 13.0 - - name: Test on iOS Simulator - run: > - xcodebuild test - -scheme RSocket-Package - -parallelizeTargets - -skip-testing:RSocketCorePerformanceTests - -parallel-testing-enabled - -sdk:iphonesimulator - -destination:'platform=iOS Simulator,name=iPhone 12' - name: Test on macOS - run: > - xcodebuild test - -scheme RSocket-Package - -parallelizeTargets - -skip-testing:RSocketCorePerformanceTests - -parallel-testing-enabled - -destination 'platform=macOS' + run: swift test performance-tests-on-macOS: runs-on: macos-11 From 9524acf07cb28be7456f1e16305cf47e66218ffd Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Wed, 4 Aug 2021 21:49:24 +0200 Subject: [PATCH 21/22] new requester API for ReactiveSwift Signed-off-by: David Nadoba --- Sources/Examples/TimerClient/main.swift | 20 ++---- Sources/Examples/TwitterClient/main.swift | 35 ++++------ Sources/Examples/VanillaClient/main.swift | 7 +- .../ConnectionEstablishment.swift | 28 ++------ Sources/RSocketCore/ChannelPipeline.swift | 12 +++- .../RSocketCore/Client/ClientBootstrap.swift | 2 + .../Client/ClientConfiguration.swift | 25 +------ Sources/RSocketCore/DefaultRSocket.swift | 1 + .../Extensions/Coder/Decoder/Decoder.swift | 10 +++ .../Extensions/Coder/Encoder/Encoder.swift | 10 +++ .../Encoder/OctetStreamMetadataEncoder.swift | 39 +++++++++++ .../RootCompositeMetadataDecoder.swift | 4 +- .../Extensions/RoutingDecoder.swift | 5 +- .../Extensions/RoutingEncoder.swift | 3 + Sources/RSocketCore/RSocket.swift | 1 + Sources/RSocketCore/RequestDescription.swift | 29 +++++++- Sources/RSocketCore/Streams/Requester.swift | 3 + Sources/RSocketCore/Streams/Responder.swift | 4 +- .../RSocketNIOChannel/ClientBootstrap.swift | 2 +- .../Client/ReactiveSwiftClient.swift | 13 ++-- Sources/RSocketReactiveSwift/Requester.swift | 68 ++++++++++++++++++- .../RequesterRSocket.swift | 40 +++++++++++ Sources/RSocketReactiveSwift/Responder.swift | 9 ++- .../{RSocket.swift => ResponderRSocket.swift} | 2 +- .../RSocketTSChannel/ClientBootstrap.swift | 2 +- .../RSocketTestUtilities/TestRSocket.swift | 3 + Tests/RSocketCoreTests/EndToEndTests.swift | 4 +- .../RSocketReactiveSwiftTests.swift | 51 ++++++++------ .../TestDemultiplexer.swift | 6 +- .../TestRSocket.swift | 2 +- 30 files changed, 309 insertions(+), 131 deletions(-) create mode 100644 Sources/RSocketCore/Extensions/Coder/Encoder/OctetStreamMetadataEncoder.swift create mode 100644 Sources/RSocketReactiveSwift/RequesterRSocket.swift rename Sources/RSocketReactiveSwift/{RSocket.swift => ResponderRSocket.swift} (97%) diff --git a/Sources/Examples/TimerClient/main.swift b/Sources/Examples/TimerClient/main.swift index 038364ca..e8a3eeac 100644 --- a/Sources/Examples/TimerClient/main.swift +++ b/Sources/Examples/TimerClient/main.swift @@ -6,14 +6,6 @@ import RSocketNIOChannel import RSocketReactiveSwift import RSocketWSTransport -func route(_ route: String) -> Data { - let encodedRoute = Data(route.utf8) - precondition(encodedRoute.count <= Int(UInt8.max), "route is to long to be encoded") - let encodedRouteLength = Data([UInt8(encodedRoute.count)]) - - return encodedRouteLength + encodedRoute -} - extension URL: ExpressibleByArgument { public init?(argument: String) { guard let url = URL(string: argument) else { return nil } @@ -43,12 +35,12 @@ struct TimerClientExample: ParsableCommand { ) let client = try bootstrap.connect(to: .init(url: url)).first()!.get() - - try client.requester.requestStream(payload: Payload( - metadata: route("timer"), - data: Data() - )) - .map() { String.init(decoding: $0.data, as: UTF8.self) } + try client.requester(RequestStream { + Encoder() + .encodeStaticMetadata("timer", using: RoutingEncoder()) + Decoder() + .mapData { String(decoding: $0, as: UTF8.self) } + }, request: Data()) .logEvents(identifier: "route.timer") .take(first: limit) .wait() diff --git a/Sources/Examples/TwitterClient/main.swift b/Sources/Examples/TwitterClient/main.swift index 58fd340f..8fa0bc3e 100644 --- a/Sources/Examples/TwitterClient/main.swift +++ b/Sources/Examples/TwitterClient/main.swift @@ -6,14 +6,6 @@ import RSocketNIOChannel import RSocketReactiveSwift import RSocketWSTransport -func route(_ route: String) -> Data { - let encodedRoute = Data(route.utf8) - precondition(encodedRoute.count <= Int(UInt8.max), "route is to long to be encoded") - let encodedRouteLength = Data([UInt8(encodedRoute.count)]) - - return encodedRouteLength + encodedRoute -} - extension URL: ExpressibleByArgument { public init?(argument: String) { guard let url = URL(string: argument) else { return nil } @@ -40,23 +32,26 @@ struct TwitterClientExample: ParsableCommand { func run() throws { let bootstrap = ClientBootstrap( transport: WSTransport(), - config: ClientConfiguration.mobileToServer + config: .mobileToServer .set(\.encoding.metadata, to: .messageXRSocketRoutingV0) .set(\.encoding.data, to: .applicationJson) ) let client = try bootstrap.connect(to: .init(url: url)).first()!.get() - - try client.requester.requestStream(payload: Payload( - metadata: route("searchTweets"), - data: Data(searchString.utf8) - )) - .attemptMap { payload -> String in - // pretty print json - let json = try JSONSerialization.jsonObject(with: payload.data, options: []) - let data = try JSONSerialization.data(withJSONObject: json, options: [.prettyPrinted]) - return String(decoding: data, as: UTF8.self) - } + try client.requester(RequestStream { + Encoder() + .encodeStaticMetadata("searchTweets", using: RoutingEncoder()) + .mapData { (string: String) in + Data(string.utf8) + } + Decoder() + .mapData { data -> String in + // pretty print json + let json = try JSONSerialization.jsonObject(with: data, options: []) + let data = try JSONSerialization.data(withJSONObject: json, options: [.prettyPrinted]) + return String(decoding: data, as: UTF8.self) + } + }, request: searchString) .logEvents(identifier: "route.searchTweets") .take(first: limit) .wait() diff --git a/Sources/Examples/VanillaClient/main.swift b/Sources/Examples/VanillaClient/main.swift index 5563c42e..352456b1 100644 --- a/Sources/Examples/VanillaClient/main.swift +++ b/Sources/Examples/VanillaClient/main.swift @@ -22,8 +22,11 @@ struct VanillaClientExample: ParsableCommand { let client = try bootstrap.connect(to: .init(host: host, port: port)).first()!.get() - let streamProducer = client.requester.requestStream(payload: .empty) - let requestProducer = client.requester.requestResponse(payload: Payload(data: Data("HelloWorld".utf8))) + let streamProducer = client.requester( + RequestStream(), + request: Data() + ) + let requestProducer = client.requester(RequestResponse(), request: Data("HelloWorld".utf8)) streamProducer.logEvents(identifier: "stream1").take(first: 1).start() streamProducer.logEvents(identifier: "stream3").take(first: 10).start() diff --git a/Sources/RSocketCore/Channel Handler/ConnectionEstablishment.swift b/Sources/RSocketCore/Channel Handler/ConnectionEstablishment.swift index c5be7722..4ffd6455 100644 --- a/Sources/RSocketCore/Channel Handler/ConnectionEstablishment.swift +++ b/Sources/RSocketCore/Channel Handler/ConnectionEstablishment.swift @@ -44,27 +44,7 @@ public struct SetupInfo { /// Token used for client resume identification public let resumeIdentificationToken: Data? - /** - MIME Type for encoding of Metadata - - This SHOULD be a US-ASCII string that includes the Internet media type specified in RFC 2045. - Many are registered with IANA such as CBOR. - Suffix rules MAY be used for handling layout. - For example, `application/x.netflix+cbor` or `application/x.reactivesocket+cbor` or `application/x.netflix+json`. - The string MUST NOT be null terminated. - */ - public let metadataEncodingMimeType: String - - /** - MIME Type for encoding of Data - - This SHOULD be a US-ASCII string that includes the Internet media type specified in RFC 2045. - Many are registered with IANA such as CBOR. - Suffix rules MAY be used for handling layout. - For example, `application/x.netflix+cbor` or `application/x.reactivesocket+cbor` or `application/x.netflix+json`. - The string MUST NOT be null terminated. - */ - public let dataEncodingMimeType: String + public let encoding: ConnectionEncoding /// Payload of this frame describing connection capabilities of the endpoint sending the Setup header public let payload: Payload @@ -114,8 +94,10 @@ extension SetupInfo { self.timeBetweenKeepaliveFrames = setup.timeBetweenKeepaliveFrames self.maxLifetime = setup.maxLifetime self.resumeIdentificationToken = setup.resumeIdentificationToken - self.metadataEncodingMimeType = setup.metadataEncodingMimeType - self.dataEncodingMimeType = setup.dataEncodingMimeType + self.encoding = .init( + metadata: .init(rawValue: setup.metadataEncodingMimeType), + data: .init(rawValue: setup.dataEncodingMimeType) + ) self.payload = setup.payload } } diff --git a/Sources/RSocketCore/ChannelPipeline.swift b/Sources/RSocketCore/ChannelPipeline.swift index cda37057..21e20845 100644 --- a/Sources/RSocketCore/ChannelPipeline.swift +++ b/Sources/RSocketCore/ChannelPipeline.swift @@ -46,7 +46,13 @@ extension ChannelPipeline { self?.writeAndFlush(NIOAny(frame), promise: nil) } let promise = eventLoop.makePromise(of: Void.self) - let requester = Requester(streamIdGenerator: .client, eventLoop: eventLoop, sendFrame: sendFrame) + let requester = Requester( + streamIdGenerator: .client, + encoding: config.encoding, + eventLoop: eventLoop, + sendFrame: sendFrame + ) + let responder = responder ?? DefaultRSocket(encoding: config.encoding) promise.futureResult.map { requester as RSocket }.cascade(to: connectedPromise) let (timeBetweenKeepaliveFrames, maxLifetime): (Int32, Int32) do { @@ -108,14 +114,14 @@ extension ChannelPipeline { FrameEncoderHandler(maximumFrameSize: maximumFrameSize), ConnectionStateHandler(), ConnectionEstablishmentHandler(initializeConnection: { [unowned self] (info, channel) in - let responder = makeResponder?(info) + let responder = makeResponder?(info) ?? DefaultRSocket(encoding: info.encoding) let sendFrame: (Frame) -> () = { [weak self] frame in self?.writeAndFlush(NIOAny(frame), promise: nil) } return channel.pipeline.addHandlers([ DemultiplexerHandler( connectionSide: .server, - requester: Requester(streamIdGenerator: .server, eventLoop: eventLoop, sendFrame: sendFrame), + requester: Requester(streamIdGenerator: .server, encoding: info.encoding, eventLoop: eventLoop, sendFrame: sendFrame), responder: Responder(responderSocket: responder, eventLoop: eventLoop, sendFrame: sendFrame) ), KeepaliveHandler(timeBetweenKeepaliveFrames: info.timeBetweenKeepaliveFrames, maxLifetime: info.maxLifetime, connectionSide: ConnectionRole.server), diff --git a/Sources/RSocketCore/Client/ClientBootstrap.swift b/Sources/RSocketCore/Client/ClientBootstrap.swift index 8e9af665..1704333c 100644 --- a/Sources/RSocketCore/Client/ClientBootstrap.swift +++ b/Sources/RSocketCore/Client/ClientBootstrap.swift @@ -21,6 +21,8 @@ public protocol ClientBootstrap { associatedtype Responder associatedtype Transport: TransportChannelHandler + var config: ClientConfiguration { get } + /// Creates a new connection to the given `endpoint`. /// - Parameters: /// - endpoint: endpoint to connect to diff --git a/Sources/RSocketCore/Client/ClientConfiguration.swift b/Sources/RSocketCore/Client/ClientConfiguration.swift index eca3e704..85f46d21 100644 --- a/Sources/RSocketCore/Client/ClientConfiguration.swift +++ b/Sources/RSocketCore/Client/ClientConfiguration.swift @@ -57,27 +57,6 @@ public struct ClientConfiguration { } } - /// encoding configuration of metadata and data which is send to the server during setup - public struct Encoding { - - /// default encoding uses `.octetStream` for metadata and data - public static let `default` = Encoding() - - /// MIME Type for encoding of Metadata - public var metadata: MIMEType - - /// MIME Type for encoding of Data - public var data: MIMEType - - public init( - metadata: MIMEType = .default, - data: MIMEType = .default - ) { - self.metadata = metadata - self.data = data - } - } - /// local fragmentation configuration which are **not** send to the server public struct Fragmentation { @@ -109,14 +88,14 @@ public struct ClientConfiguration { public var timeout: Timeout /// encoding configuration of metadata and data which is send to the server during setup - public var encoding: Encoding + public var encoding: ConnectionEncoding /// local fragmentation configuration which are **not** send to the server public var fragmentation: Fragmentation public init( timeout: Timeout, - encoding: Encoding = .default, + encoding: ConnectionEncoding = .default, fragmentation: Fragmentation = .default ) { self.timeout = timeout diff --git a/Sources/RSocketCore/DefaultRSocket.swift b/Sources/RSocketCore/DefaultRSocket.swift index 94200882..87a6235d 100644 --- a/Sources/RSocketCore/DefaultRSocket.swift +++ b/Sources/RSocketCore/DefaultRSocket.swift @@ -29,6 +29,7 @@ fileprivate final class NoOpStream: UnidirectionalStream { /// An RSocket which rejects all incoming requests (requestResponse, stream and channel) and ignores metadataPush and fireAndForget events. internal struct DefaultRSocket: RSocket { + let encoding: ConnectionEncoding func metadataPush(metadata: Data) {} func fireAndForget(payload: Payload) {} func requestResponse(payload: Payload, responderStream: UnidirectionalStream) -> Cancellable { diff --git a/Sources/RSocketCore/Extensions/Coder/Decoder/Decoder.swift b/Sources/RSocketCore/Extensions/Coder/Decoder/Decoder.swift index fb577d97..4e4a1d11 100644 --- a/Sources/RSocketCore/Extensions/Coder/Decoder/Decoder.swift +++ b/Sources/RSocketCore/Extensions/Coder/Decoder/Decoder.swift @@ -38,5 +38,15 @@ public struct Decoder: DecoderProtocol { } } +extension DecoderProtocol where Metadata == Void { + @inlinable + public mutating func decode( + _ payload: Payload, + encoding: ConnectionEncoding + ) throws -> Data { + try decode(payload, encoding: encoding).1 + } +} + /// Namespace for types conforming to the ``DecoderProtocol`` protocol public enum Decoders {} diff --git a/Sources/RSocketCore/Extensions/Coder/Encoder/Encoder.swift b/Sources/RSocketCore/Extensions/Coder/Encoder/Encoder.swift index ada5ddb1..22e50819 100644 --- a/Sources/RSocketCore/Extensions/Coder/Encoder/Encoder.swift +++ b/Sources/RSocketCore/Extensions/Coder/Encoder/Encoder.swift @@ -36,5 +36,15 @@ public struct Encoder: EncoderProtocol { } } +extension EncoderProtocol where Metadata == Void { + @inlinable + public mutating func encode( + _ data: Data, + encoding: ConnectionEncoding + ) throws -> Payload { + try encode(metadata: (), data: data, encoding: encoding) + } +} + /// Namespace for types conforming to the ``EncoderProtocol`` protocol public enum Encoders {} diff --git a/Sources/RSocketCore/Extensions/Coder/Encoder/OctetStreamMetadataEncoder.swift b/Sources/RSocketCore/Extensions/Coder/Encoder/OctetStreamMetadataEncoder.swift new file mode 100644 index 00000000..68a18937 --- /dev/null +++ b/Sources/RSocketCore/Extensions/Coder/Encoder/OctetStreamMetadataEncoder.swift @@ -0,0 +1,39 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation +import NIO + +public struct OctetStreamMetadataEncoder: MetadataEncoder { + public typealias Metadata = Data? + + @inlinable + public init() {} + + @inlinable + public var mimeType: MIMEType { .applicationOctetStream } + + @inlinable + public func encode(_ metadata: Data?, into buffer: inout ByteBuffer) throws { + guard let metadata = metadata else { return } + buffer.writeData(metadata) + } +} + +extension MetadataEncoder where Self == OctetStreamMetadataEncoder { + @inlinable + public static var octetStream: Self { .init() } +} diff --git a/Sources/RSocketCore/Extensions/RootCompositeMetadataDecoder.swift b/Sources/RSocketCore/Extensions/RootCompositeMetadataDecoder.swift index 5fb5e1e7..944ca267 100644 --- a/Sources/RSocketCore/Extensions/RootCompositeMetadataDecoder.swift +++ b/Sources/RSocketCore/Extensions/RootCompositeMetadataDecoder.swift @@ -25,10 +25,10 @@ public struct RootCompositeMetadataDecoder: MetadataDecoder { public var mimeType: MIMEType { .messageXRSocketCompositeMetadataV0 } @usableFromInline - internal let mimeTypeDecoder: MIMETypeEncoder + internal let mimeTypeDecoder: MIMETypeDecoder @inlinable - public init(mimeTypeDecoder: MIMETypeEncoder = MIMETypeEncoder()) { + public init(mimeTypeDecoder: MIMETypeDecoder = MIMETypeDecoder()) { self.mimeTypeDecoder = mimeTypeDecoder } diff --git a/Sources/RSocketCore/Extensions/RoutingDecoder.swift b/Sources/RSocketCore/Extensions/RoutingDecoder.swift index a6421e9f..0a54336b 100644 --- a/Sources/RSocketCore/Extensions/RoutingDecoder.swift +++ b/Sources/RSocketCore/Extensions/RoutingDecoder.swift @@ -18,7 +18,10 @@ import NIO public struct RoutingDecoder: MetadataDecoder { public typealias Metadata = RouteMetadata - + + @inlinable + public init() {} + @inlinable public var mimeType: MIMEType { .messageXRSocketRoutingV0 } diff --git a/Sources/RSocketCore/Extensions/RoutingEncoder.swift b/Sources/RSocketCore/Extensions/RoutingEncoder.swift index 985ac6dd..7dad0ea8 100644 --- a/Sources/RSocketCore/Extensions/RoutingEncoder.swift +++ b/Sources/RSocketCore/Extensions/RoutingEncoder.swift @@ -18,6 +18,9 @@ import NIO public struct RoutingEncoder: MetadataEncoder { public typealias Metadata = RouteMetadata + + @inlinable + public init() {} @inlinable public var mimeType: MIMEType { .messageXRSocketRoutingV0 } diff --git a/Sources/RSocketCore/RSocket.swift b/Sources/RSocketCore/RSocket.swift index bb8d297a..dd004eeb 100644 --- a/Sources/RSocketCore/RSocket.swift +++ b/Sources/RSocketCore/RSocket.swift @@ -17,6 +17,7 @@ import Foundation public protocol RSocket { + var encoding: ConnectionEncoding { get } func metadataPush(metadata: Data) func fireAndForget(payload: Payload) diff --git a/Sources/RSocketCore/RequestDescription.swift b/Sources/RSocketCore/RequestDescription.swift index a4b1df5c..6de34f93 100644 --- a/Sources/RSocketCore/RequestDescription.swift +++ b/Sources/RSocketCore/RequestDescription.swift @@ -14,11 +14,17 @@ * limitations under the License. */ +import Foundation + public struct MetadataPush { public let encoder: AnyMetadataEncoder } extension MetadataPush { + @inlinable + public init() where Metadata == Data? { + self.init(using: OctetStreamMetadataEncoder()) + } @inlinable public init( using metadataEncoder: Encoder @@ -30,7 +36,7 @@ extension MetadataPush { public init( @CompositeMetadataEncoderBuilder _ makeEncoder: () -> Encoder ) where - Encoder: MetadataEncoder, + Encoder: MetadataEncoder, Encoder.Metadata == Metadata { encoder = makeEncoder().eraseToAnyMetadataEncoder() @@ -42,6 +48,12 @@ public struct FireAndForget { } extension FireAndForget { + @inlinable + public init() where Request == Data { + self.init { Encoder() } + } + + @inlinable public init( @EncoderBuilder _ makeEncoder: () -> Encoder ) where @@ -59,6 +71,11 @@ public struct RequestResponse { } extension RequestResponse { + @inlinable + public init() where Request == Data, Response == Data { + self.init { Coder() } + } + @inlinable public init( @CoderBuilder _ makeCoder: () -> Coder @@ -78,6 +95,11 @@ public struct RequestStream { } extension RequestStream { + @inlinable + public init() where Request == Data, Response == Data { + self.init { Coder() } + } + @inlinable public init( @CoderBuilder _ makeCoder: () -> Coder @@ -97,6 +119,11 @@ public struct RequestChannel { } extension RequestChannel { + @inlinable + public init() where Request == Data, Response == Data { + self.init { Coder() } + } + @inlinable public init( @CoderBuilder _ makeCoder: () -> Coder diff --git a/Sources/RSocketCore/Streams/Requester.swift b/Sources/RSocketCore/Streams/Requester.swift index 51cfe3ce..73bb0a4d 100644 --- a/Sources/RSocketCore/Streams/Requester.swift +++ b/Sources/RSocketCore/Streams/Requester.swift @@ -18,6 +18,7 @@ import Foundation import NIO internal final class Requester { + internal let encoding: ConnectionEncoding private let sendFrame: (Frame) -> Void private let eventLoop: EventLoop private var streamIdGenerator: StreamIDGenerator @@ -26,11 +27,13 @@ internal final class Requester { internal init( streamIdGenerator: StreamIDGenerator, + encoding: ConnectionEncoding, eventLoop: EventLoop, sendFrame: @escaping (Frame) -> Void, lateFrameHandler: ((Frame) -> ())? = nil ) { self.streamIdGenerator = streamIdGenerator + self.encoding = encoding self.eventLoop = eventLoop self.sendFrame = sendFrame self.lateFrameHandler = lateFrameHandler diff --git a/Sources/RSocketCore/Streams/Responder.swift b/Sources/RSocketCore/Streams/Responder.swift index 371263ee..b215a1e6 100644 --- a/Sources/RSocketCore/Streams/Responder.swift +++ b/Sources/RSocketCore/Streams/Responder.swift @@ -23,12 +23,12 @@ internal final class Responder { private let eventLoop: EventLoop private let lateFrameHandler: ((Frame) -> ())? internal init( - responderSocket: RSocket? = nil, + responderSocket: RSocket, eventLoop: EventLoop, sendFrame: @escaping (Frame) -> Void, lateFrameHandler: ((Frame) -> ())? = nil ) { - self.responderSocket = responderSocket ?? DefaultRSocket() + self.responderSocket = responderSocket self.sendFrame = sendFrame self.eventLoop = eventLoop self.lateFrameHandler = lateFrameHandler diff --git a/Sources/RSocketNIOChannel/ClientBootstrap.swift b/Sources/RSocketNIOChannel/ClientBootstrap.swift index 46a52e6e..d23cabb0 100644 --- a/Sources/RSocketNIOChannel/ClientBootstrap.swift +++ b/Sources/RSocketNIOChannel/ClientBootstrap.swift @@ -21,7 +21,7 @@ import RSocketCore final public class ClientBootstrap { private let group: EventLoopGroup private let bootstrap: NIO.ClientBootstrap - private let config: ClientConfiguration + public let config: ClientConfiguration private let transport: Transport private let sslContext: NIOSSLContext? diff --git a/Sources/RSocketReactiveSwift/Client/ReactiveSwiftClient.swift b/Sources/RSocketReactiveSwift/Client/ReactiveSwiftClient.swift index a70a8e75..b66a42a2 100644 --- a/Sources/RSocketReactiveSwift/Client/ReactiveSwiftClient.swift +++ b/Sources/RSocketReactiveSwift/Client/ReactiveSwiftClient.swift @@ -20,7 +20,7 @@ import ReactiveSwift public struct ReactiveSwiftClient: Client { private let coreClient: CoreClient - public var requester: RSocketReactiveSwift.RSocket { RequesterAdapter(requester: coreClient.requester) } + public var requester: RSocketReactiveSwift.RequesterRSocket { RequesterAdapter(requester: coreClient.requester) } public init(_ coreClient: CoreClient) { self.coreClient = coreClient @@ -31,10 +31,11 @@ extension ClientBootstrap where Client == CoreClient, Responder == RSocketCore.R public func connect( to endpoint: Transport.Endpoint, payload: Payload = .empty, - responder: RSocketReactiveSwift.RSocket? = nil + responder: RSocketReactiveSwift.ResponderRSocket? = nil ) -> SignalProducer { SignalProducer { observer, lifetime in - let future = connect(to: endpoint, payload: payload, responder: responder?.coreAdapter) + let responder = responder.map { ResponderAdapter(responder: $0, encoding: config.encoding) } + let future = connect(to: endpoint, payload: payload, responder: responder) .map(ReactiveSwiftClient.init) future.whenComplete { result in switch result { @@ -48,9 +49,3 @@ extension ClientBootstrap where Client == CoreClient, Responder == RSocketCore.R } } } - -private extension RSocketReactiveSwift.RSocket { - var coreAdapter: RSocketCore.RSocket { - ResponderAdapter(responder: self) - } -} diff --git a/Sources/RSocketReactiveSwift/Requester.swift b/Sources/RSocketReactiveSwift/Requester.swift index 0e47577c..80de6e53 100644 --- a/Sources/RSocketReactiveSwift/Requester.swift +++ b/Sources/RSocketReactiveSwift/Requester.swift @@ -18,12 +18,78 @@ import ReactiveSwift import RSocketCore import Foundation -internal struct RequesterAdapter: RSocket { +internal struct RequesterAdapter { internal let requester: RSocketCore.RSocket internal init(requester: RSocketCore.RSocket) { self.requester = requester } +} + +extension RequesterAdapter: RequesterRSocket { + func callAsFunction(_ metadataPush: MetadataPush, metadata: Metadata) throws { + let metadata = try metadataPush.encoder.encode(metadata) + self.metadataPush(metadata: metadata) + } + + func callAsFunction(_ fireAndForget: FireAndForget, request: Request) throws { + var encoder = fireAndForget.encoder + let payload = try encoder.encode(request, encoding: encoding) + self.fireAndForget(payload: payload) + } + + func callAsFunction( + _ requestResponse: RequestResponse, + request: Request + ) -> SignalProducer { + SignalProducer { () throws -> SignalProducer in + var encoder = requestResponse.encoder + var decoder = requestResponse.decoder + let payload = try encoder.encode(request, encoding: encoding) + return self.requestResponse(payload: payload).attemptMap { response in + try decoder.decode(response, encoding: encoding) + } + }.flatten(.latest) + } + + func callAsFunction( + _ requestStream: RequestStream, + request: Request + ) -> SignalProducer { + SignalProducer { () throws -> SignalProducer in + var encoder = requestStream.encoder + var decoder = requestStream.decoder + let payload = try encoder.encode(request, encoding: encoding) + return self.requestStream(payload: payload).attemptMap { response in + try decoder.decode(response, encoding: encoding) + } + }.flatten(.latest) + } + + func callAsFunction( + _ requestChannel: RequestChannel, + initialRequest: Request, + producer: SignalProducer? + ) -> SignalProducer { + SignalProducer { () throws -> SignalProducer in + var encoder = requestChannel.encoder + var decoder = requestChannel.decoder + let payload = try encoder.encode(initialRequest, encoding: encoding) + let payloadProducer = producer?.attemptMap { data in + try encoder.encode(data, encoding: encoding) + } + return self.requestChannel(payload: payload, payloadProducer: payloadProducer).attemptMap{ response in + try decoder.decode(response, encoding: encoding) + } + }.flatten(.latest) + } +} + + +extension RequesterAdapter { + internal var encoding: ConnectionEncoding { + requester.encoding + } internal func metadataPush(metadata: Data) { requester.metadataPush(metadata: metadata) diff --git a/Sources/RSocketReactiveSwift/RequesterRSocket.swift b/Sources/RSocketReactiveSwift/RequesterRSocket.swift new file mode 100644 index 00000000..3fb5c49e --- /dev/null +++ b/Sources/RSocketReactiveSwift/RequesterRSocket.swift @@ -0,0 +1,40 @@ +/* + * Copyright 2015-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import ReactiveSwift +import RSocketCore + +public protocol RequesterRSocket { + func callAsFunction(_ metadataPush: MetadataPush, metadata: Metadata) throws + + func callAsFunction(_ fireAndForget: FireAndForget, request: Request) throws + + func callAsFunction( + _ requestResponse: RequestResponse, + request: Request + ) -> SignalProducer + + func callAsFunction( + _ requestStream: RequestStream, + request: Request + ) -> SignalProducer + + func callAsFunction( + _ requestChannel: RequestChannel, + initialRequest: Request, + producer: SignalProducer? + ) -> SignalProducer +} diff --git a/Sources/RSocketReactiveSwift/Responder.swift b/Sources/RSocketReactiveSwift/Responder.swift index 769f58dc..c201532b 100644 --- a/Sources/RSocketReactiveSwift/Responder.swift +++ b/Sources/RSocketReactiveSwift/Responder.swift @@ -18,11 +18,14 @@ import ReactiveSwift import RSocketCore import Foundation -internal struct ResponderAdapter: RSocketCore.RSocket { - private let responder: RSocket +internal struct ResponderAdapter: RSocketCore.RSocket { + private let responder: ResponderRSocket + + internal var encoding: ConnectionEncoding - internal init(responder: RSocket) { + internal init(responder: ResponderRSocket, encoding: ConnectionEncoding) { self.responder = responder + self.encoding = encoding } func metadataPush(metadata: Data) { diff --git a/Sources/RSocketReactiveSwift/RSocket.swift b/Sources/RSocketReactiveSwift/ResponderRSocket.swift similarity index 97% rename from Sources/RSocketReactiveSwift/RSocket.swift rename to Sources/RSocketReactiveSwift/ResponderRSocket.swift index f570d5de..200fe7f9 100644 --- a/Sources/RSocketReactiveSwift/RSocket.swift +++ b/Sources/RSocketReactiveSwift/ResponderRSocket.swift @@ -19,7 +19,7 @@ import ReactiveSwift import RSocketCore import Foundation -public protocol RSocket { +public protocol ResponderRSocket { func metadataPush(metadata: Data) func fireAndForget(payload: Payload) func requestResponse(payload: Payload) -> SignalProducer diff --git a/Sources/RSocketTSChannel/ClientBootstrap.swift b/Sources/RSocketTSChannel/ClientBootstrap.swift index b4b036c9..b47c424c 100644 --- a/Sources/RSocketTSChannel/ClientBootstrap.swift +++ b/Sources/RSocketTSChannel/ClientBootstrap.swift @@ -25,7 +25,7 @@ import RSocketCore final public class ClientBootstrap { private let group = NIOTSEventLoopGroup() private let bootstrap: NIOTSConnectionBootstrap - private let config: ClientConfiguration + public let config: ClientConfiguration private let transport: Transport private let tlsOptions: NWProtocolTLS.Options? public init( diff --git a/Sources/RSocketTestUtilities/TestRSocket.swift b/Sources/RSocketTestUtilities/TestRSocket.swift index 8591ec2f..38e5d4f9 100644 --- a/Sources/RSocketTestUtilities/TestRSocket.swift +++ b/Sources/RSocketTestUtilities/TestRSocket.swift @@ -19,6 +19,7 @@ import NIO import RSocketCore public final class TestRSocket: RSocket { + public var encoding: ConnectionEncoding public var metadataPush: ((Data) -> ())? = nil public var fireAndForget: ((_ payload: Payload) -> ())? = nil public var requestResponse: ((_ payload: Payload, _ responderOutput: UnidirectionalStream) -> Cancellable)? = nil @@ -34,9 +35,11 @@ public final class TestRSocket: RSocket { requestResponse: ((Payload, UnidirectionalStream) -> Cancellable)? = nil, stream: ((Payload, Int32, UnidirectionalStream) -> Subscription)? = nil, channel: ((Payload, Int32, Bool, UnidirectionalStream) -> UnidirectionalStream)? = nil, + encoding: ConnectionEncoding = .default, file: StaticString = #file, line: UInt = #line ) { + self.encoding = encoding self.metadataPush = metadataPush self.fireAndForget = fireAndForget self.requestResponse = requestResponse diff --git a/Tests/RSocketCoreTests/EndToEndTests.swift b/Tests/RSocketCoreTests/EndToEndTests.swift index ae45e5b8..c3ac16ee 100644 --- a/Tests/RSocketCoreTests/EndToEndTests.swift +++ b/Tests/RSocketCoreTests/EndToEndTests.swift @@ -157,8 +157,8 @@ class EndToEndTests: XCTestCase { let server = makeServerBootstrap(shouldAcceptClient: { clientInfo in XCTAssertEqual(clientInfo.timeBetweenKeepaliveFrames, Int32(setup.timeout.timeBetweenKeepaliveFrames)) XCTAssertEqual(clientInfo.maxLifetime, Int32(setup.timeout.maxLifetime)) - XCTAssertEqual(clientInfo.metadataEncodingMimeType, setup.encoding.metadata.rawValue) - XCTAssertEqual(clientInfo.dataEncodingMimeType, setup.encoding.data.rawValue) + XCTAssertEqual(clientInfo.encoding.metadata, setup.encoding.metadata) + XCTAssertEqual(clientInfo.encoding.data, setup.encoding.data) clientDidConnect.fulfill() return .accept }) diff --git a/Tests/RSocketReactiveSwiftTests/RSocketReactiveSwiftTests.swift b/Tests/RSocketReactiveSwiftTests/RSocketReactiveSwiftTests.swift index 94d3a7db..3f9dc9f7 100644 --- a/Tests/RSocketReactiveSwiftTests/RSocketReactiveSwiftTests.swift +++ b/Tests/RSocketReactiveSwiftTests/RSocketReactiveSwiftTests.swift @@ -20,13 +20,20 @@ import RSocketCore import RSocketTestUtilities @testable import RSocketReactiveSwift +extension Data: ExpressibleByStringLiteral { + public init(stringLiteral value: String) { + self.init(value.utf8) + } +} + func setup( - server: RSocketReactiveSwift.RSocket? = nil, - client: RSocketReactiveSwift.RSocket? = nil + server: RSocketReactiveSwift.ResponderRSocket? = nil, + client: RSocketReactiveSwift.ResponderRSocket? = nil ) -> (server: ReactiveSwiftClient, client: ReactiveSwiftClient) { let (server, client) = TestDemultiplexer.pipe( - serverResponder: server.map(ResponderAdapter.init(responder:)), - clientResponder: client.map(ResponderAdapter.init(responder:))) + serverResponder: server.map { ResponderAdapter(responder:$0, encoding: .default) }, + clientResponder: client.map { ResponderAdapter(responder:$0, encoding: .default) } + ) return ( ReactiveSwiftClient(CoreClient(requester: server.requester)), ReactiveSwiftClient(CoreClient(requester: client.requester)) @@ -34,25 +41,25 @@ func setup( } final class RSocketReactiveSwiftTests: XCTestCase { - func testMetadataPush() { - let metadata = Data(String("Hello World").utf8) + func testMetadataPush() throws { + let metadata: Data = "Hello World" let didReceiveRequest = expectation(description: "did receive request") let serverResponder = TestRSocket(metadataPush: { data in didReceiveRequest.fulfill() XCTAssertEqual(data, metadata) }) let (_, client) = setup(server: serverResponder) - client.requester.metadataPush(metadata: metadata) + try client.requester(MetadataPush(), metadata: metadata) self.wait(for: [didReceiveRequest], timeout: 0.1) } - func testFireAndForget() { + func testFireAndForget() throws { let didReceiveRequest = expectation(description: "did receive request") let serverResponder = TestRSocket(fireAndForget: { payload in didReceiveRequest.fulfill() XCTAssertEqual(payload, "Hello World") }) let (_, client) = setup(server: serverResponder) - client.requester.fireAndForget(payload: "Hello World") + try client.requester(FireAndForget(), request: "Hello World") self.wait(for: [didReceiveRequest], timeout: 0.1) } func testRequestResponse() { @@ -69,7 +76,10 @@ final class RSocketReactiveSwiftTests: XCTestCase { } }) let (_, client) = setup(server: serverResponder) - let disposable = client.requester.requestResponse(payload: "Hello World").startWithSignal { signal, _ in + let disposable = client.requester( + RequestResponse(), + request: "Hello World" + ).startWithSignal { signal, _ in signal.flatMapError({ error in XCTFail("\(error)") return .empty @@ -95,7 +105,10 @@ final class RSocketReactiveSwiftTests: XCTestCase { } }) let (_, client) = setup(server: serverResponder) - let disposable = client.requester.requestResponse(payload: "Hello World").startWithSignal { signal, _ in + let disposable = client.requester( + RequestResponse(), + request: "Hello World" + ).startWithSignal { signal, _ in signal.flatMapError({ error in XCTFail("\(error)") return .empty @@ -126,7 +139,7 @@ final class RSocketReactiveSwiftTests: XCTestCase { } }) let (_, client) = setup(server: serverResponder) - let disposable = client.requester.requestStream(payload: "Hello World").startWithSignal { signal, _ in + let disposable = client.requester(RequestStream(), request: "Hello World").startWithSignal { signal, _ in signal.flatMapError({ error in XCTFail("\(error)") return .empty @@ -172,7 +185,7 @@ final class RSocketReactiveSwiftTests: XCTestCase { } }) let (_, client) = setup(server: serverResponder) - let disposable = client.requester.requestChannel(payload: "Hello Responder", payloadProducer: .init({ observer, _ in + let disposable = client.requester(RequestChannel(), initialRequest: "Hello Responder", producer: .init({ observer, _ in requesterDidSendChannelMessages.fulfill() observer.send(value: "Hello") observer.send(value: "from") @@ -216,9 +229,9 @@ final class RSocketReactiveSwiftTests: XCTestCase { } }) let (_, client) = setup(server: serverResponder) - let disposable = client.requester.requestResponse(payload: "Hello World").startWithSignal { signal, _ -> Disposable? in + let disposable = client.requester(RequestResponse(), request: "Hello World").startWithSignal { signal, _ -> Disposable? in didStartRequestSignal.fulfill() - return signal.flatMapError({ error -> Signal in + return signal.flatMapError({ error -> Signal in XCTFail("\(error)") return .empty }).materialize().collect().observeValues { values in @@ -245,9 +258,9 @@ final class RSocketReactiveSwiftTests: XCTestCase { } }) let (_, client) = setup(server: serverResponder) - let disposable = client.requester.requestStream(payload: "Hello World").startWithSignal { signal, _ -> Disposable? in + let disposable = client.requester(RequestStream(), request: "Hello World").startWithSignal { signal, _ -> Disposable? in didStartRequestSignal.fulfill() - return signal.flatMapError({ error -> Signal in + return signal.flatMapError({ error -> Signal in XCTFail("\(error)") return .empty }).materialize().collect().observeValues { values in @@ -290,7 +303,7 @@ final class RSocketReactiveSwiftTests: XCTestCase { let requesterDidStartListeningChannelMessages = expectation(description: "responder did start listening to channel messages") let payloadProducerLifetimeEnded = expectation(description: "payload producer lifetime ended") let requesterDidStartPayloadProducer = expectation(description: "requester did start payload producer") - let disposable = client.requester.requestChannel(payload: "Hello", payloadProducer: .init({ observer, lifetime in + let disposable = client.requester(RequestChannel(), initialRequest: "Hello", producer: .init({ observer, lifetime in requesterDidStartPayloadProducer.fulfill() lifetime.observeEnded { _ = observer @@ -298,7 +311,7 @@ final class RSocketReactiveSwiftTests: XCTestCase { } })).startWithSignal { signal, _ -> Disposable? in requesterDidStartListeningChannelMessages.fulfill() - return signal.flatMapError({ error -> Signal in + return signal.flatMapError({ error -> Signal in XCTFail("\(error)") return .empty }).materialize().collect().observeValues { values in diff --git a/Tests/RSocketReactiveSwiftTests/TestDemultiplexer.swift b/Tests/RSocketReactiveSwiftTests/TestDemultiplexer.swift index a9718ae6..f6b7b007 100644 --- a/Tests/RSocketReactiveSwiftTests/TestDemultiplexer.swift +++ b/Tests/RSocketReactiveSwiftTests/TestDemultiplexer.swift @@ -48,11 +48,13 @@ extension TestDemultiplexer { serverResponder: RSocketCore.RSocket?, clientResponder: RSocketCore.RSocket? ) -> (server: TestDemultiplexer, client: TestDemultiplexer) { + let serverResponder = serverResponder ?? DefaultRSocket(encoding: .default) + let clientResponder = clientResponder ?? DefaultRSocket(encoding: .default) var client: TestDemultiplexer! let eventLoop = EmbeddedEventLoop() let server = TestDemultiplexer( connectionSide: .server, - requester: .init(streamIdGenerator: .server, eventLoop: eventLoop, sendFrame: { frame in + requester: .init(streamIdGenerator: .server, encoding: .default, eventLoop: eventLoop, sendFrame: { frame in client.receiveFrame(frame: frame) }), responder: .init(responderSocket: serverResponder, eventLoop: eventLoop, sendFrame: { frame in @@ -60,7 +62,7 @@ extension TestDemultiplexer { })) client = TestDemultiplexer( connectionSide: .client, - requester: .init(streamIdGenerator: .client, eventLoop: eventLoop, sendFrame: { frame in + requester: .init(streamIdGenerator: .client, encoding: .default, eventLoop: eventLoop, sendFrame: { frame in server.receiveFrame(frame: frame) }), responder: .init(responderSocket: clientResponder, eventLoop: eventLoop, sendFrame: { frame in diff --git a/Tests/RSocketReactiveSwiftTests/TestRSocket.swift b/Tests/RSocketReactiveSwiftTests/TestRSocket.swift index dffb9edc..cff46bba 100644 --- a/Tests/RSocketReactiveSwiftTests/TestRSocket.swift +++ b/Tests/RSocketReactiveSwiftTests/TestRSocket.swift @@ -19,7 +19,7 @@ import ReactiveSwift import Foundation import RSocketReactiveSwift -final class TestRSocket: RSocketReactiveSwift.RSocket { +final class TestRSocket: RSocketReactiveSwift.ResponderRSocket { var metadataPushCallback: (Data) -> () var fireAndForgetCallback: (Payload) -> () var requestResponseCallback: (Payload) -> SignalProducer From 66f4fab411447f5ee237973f289b607ba5a222f7 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Wed, 4 Aug 2021 23:10:07 +0200 Subject: [PATCH 22/22] implement new requester API for async/await Signed-off-by: David Nadoba --- .../Examples/AsyncTwitterClient/main.swift | 38 +++++----- Sources/RSocketAsync/Client.swift | 2 +- Sources/RSocketAsync/RSocket.swift | 2 +- Sources/RSocketAsync/Requester.swift | 74 +++++++++++++++++-- Sources/RSocketAsync/Responder.swift | 1 + .../Coder/Decoder/MultiDataDecoder.swift | 6 +- .../Coder/Encoder/DataEncoder.swift | 6 +- .../Coder/Encoder/MetadataEncoder.swift | 2 +- 8 files changed, 93 insertions(+), 38 deletions(-) diff --git a/Sources/Examples/AsyncTwitterClient/main.swift b/Sources/Examples/AsyncTwitterClient/main.swift index 0eddeea4..62368180 100644 --- a/Sources/Examples/AsyncTwitterClient/main.swift +++ b/Sources/Examples/AsyncTwitterClient/main.swift @@ -5,15 +5,16 @@ import NIO import RSocketAsync import RSocketCore import RSocketNIOChannel -import RSocketReactiveSwift import RSocketWSTransport -func route(_ route: String) -> Data { - let encodedRoute = Data(route.utf8) - precondition(encodedRoute.count <= Int(UInt8.max), "route is to long to be encoded") - let encodedRouteLength = Data([UInt8(encodedRoute.count)]) - - return encodedRouteLength + encodedRoute +struct Tweet: Decodable { + struct User: Decodable { + let screen_name, name: String + let followers_count: Int + } + let user: User + let text: String + let reply_count, retweet_count, favorite_count: Int } extension URL: ExpressibleByArgument { @@ -32,7 +33,7 @@ struct TwitterClientExample: ParsableCommand { ) @Argument(help: "used to find tweets that match the given string") - var searchString = "spring" + var searchString = "swift" @Option var url = URL(string: "wss://demo.rsocket.io/rsocket")! @@ -59,16 +60,19 @@ struct TwitterClientExample: ParsableCommand { ) let client = try await bootstrap.connect(to: .init(url: url), payload: .empty) - let stream = client.requester.requestStream(payload: Payload( - metadata: route("searchTweets"), - data: Data(searchString.utf8) - )) + let stream = try client.requester( + RequestStream { + Encoder() + .encodeStaticMetadata("searchTweets", using: .routing) + .mapData { (string: String) in Data(string.utf8) } + Decoder() + .decodeData(using: JSONDataDecoder(type: Tweet.self)) + }, + request: searchString + ) - for try await payload in stream.prefix(limit) { - let json = try JSONSerialization.jsonObject(with: payload.data, options: []) - let data = try JSONSerialization.data(withJSONObject: json, options: [.prettyPrinted]) - let string = String(decoding: data, as: UTF8.self) - print(string) + for try await tweet in stream.prefix(limit) { + dump(tweet) } } } diff --git a/Sources/RSocketAsync/Client.swift b/Sources/RSocketAsync/Client.swift index fb2d9b7c..65ed51e7 100644 --- a/Sources/RSocketAsync/Client.swift +++ b/Sources/RSocketAsync/Client.swift @@ -21,7 +21,7 @@ import RSocketCore public struct AsyncClient { private let coreClient: RSocketCore.CoreClient - public var requester: RSocket { RequesterAdapter(requester: coreClient.requester) } + public var requester: RequesterRSocket { RequesterRSocket(requester: coreClient.requester) } public init(_ coreClient: RSocketCore.CoreClient) { self.coreClient = coreClient diff --git a/Sources/RSocketAsync/RSocket.swift b/Sources/RSocketAsync/RSocket.swift index 76627f9a..01894796 100644 --- a/Sources/RSocketAsync/RSocket.swift +++ b/Sources/RSocketAsync/RSocket.swift @@ -26,7 +26,7 @@ public protocol RSocket { func requestStream(payload: Payload) -> AsyncThrowingStream func requestChannel( initialPayload: Payload, - payloadStream: PayloadSequence + payloadStream: PayloadSequence? ) -> AsyncThrowingStream where PayloadSequence: AsyncSequence, PayloadSequence.Element == Payload } diff --git a/Sources/RSocketAsync/Requester.swift b/Sources/RSocketAsync/Requester.swift index d35aed46..04b58e52 100644 --- a/Sources/RSocketAsync/Requester.swift +++ b/Sources/RSocketAsync/Requester.swift @@ -21,18 +21,75 @@ import RSocketCore import _NIOConcurrency @available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) -public struct RequesterAdapter: RSocket { +extension RequesterRSocket { + public func callAsFunction(_ metadataPush: MetadataPush, metadata: Metadata) throws { + self.metadataPush(metadata: try metadataPush.encoder.encode(metadata)) + } + + public func callAsFunction(_ fireAndForget: FireAndForget, request: Request) throws { + var encoder = fireAndForget.encoder + self.fireAndForget(payload: try encoder.encode(request, encoding: encoding)) + } + + public func callAsFunction( + _ requestResponse: RequestResponse, + request: Request + ) async throws -> Response { + var encoder = requestResponse.encoder + let response = try await self.requestResponse(payload: encoder.encode(request, encoding: encoding)) + var decoder = requestResponse.decoder + return try decoder.decode(response, encoding: encoding) + } + + public func callAsFunction( + _ requestStream: RequestStream, + request: Request + ) throws -> AsyncThrowingMapSequence, Response> { + /// TODO: this method should not throw but rather the async sequence should throw an error + /// TODO: result type of this method should be an opaque result type with where clause (e.g. `some AsyncSequence where _.Element == Response`) once they are available in Swift + var encoder = requestStream.encoder + var decoder = requestStream.decoder + let a = self.requestStream(payload: try encoder.encode(request, encoding: encoding)).map { response throws -> Response in + try decoder.decode(response, encoding: encoding) + } + return a + } + + public func callAsFunction( + _ requestChannel: RequestChannel, + initialRequest: Request, + producer: Producer? + ) throws -> AsyncThrowingMapSequence, Response> + where Producer: AsyncSequence, Producer.Element == Request { + /// TODO: this method should not throw but rather the async sequence should throw an error + /// TODO: result type of this method should be an opaque result type with where clause (e.g. `some AsyncSequence where _.Element == Response`) once they are available in Swift + var encoder = requestChannel.encoder + var decoder = requestChannel.decoder + + return self.requestChannel( + initialPayload: try encoder.encode(initialRequest, encoding: encoding), + payloadStream: producer?.map { try encoder.encode($0, encoding: encoding) } + ).map { + try decoder.decode($0, encoding: encoding) + } + } +} + +@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) +public struct RequesterRSocket { private let requester: RSocketCore.RSocket + + internal var encoding: ConnectionEncoding { requester.encoding } public init(requester: RSocketCore.RSocket) { self.requester = requester } - public func metadataPush(metadata: Data) { + internal func metadataPush(metadata: Data) { requester.metadataPush(metadata: metadata) } - public func fireAndForget(payload: Payload) { + internal func fireAndForget(payload: Payload) { requester.fireAndForget(payload: payload) } - public func requestResponse(payload: Payload) async throws -> Payload { + internal func requestResponse(payload: Payload) async throws -> Payload { struct RequestResponseOperator: UnidirectionalStream { var continuation: CheckedContinuation func onNext(_ payload: Payload, isCompletion: Bool) { @@ -68,7 +125,7 @@ public struct RequesterAdapter: RSocket { } } - public func requestStream(payload: Payload) -> AsyncThrowingStream { + internal func requestStream(payload: Payload) -> AsyncThrowingStream { AsyncThrowingStream(Payload.self, bufferingPolicy: .unbounded) { continuation in let adapter = AsyncStreamAdapter(continuation: continuation) let subscription = requester.stream(payload: payload, initialRequestN: .max, responderStream: adapter) @@ -84,20 +141,21 @@ public struct RequesterAdapter: RSocket { } } - public func requestChannel( + internal func requestChannel( initialPayload: Payload, - payloadStream: PayloadSequence + payloadStream: PayloadSequence? ) -> AsyncThrowingStream where PayloadSequence: AsyncSequence, PayloadSequence.Element == Payload { AsyncThrowingStream(Payload.self, bufferingPolicy: .unbounded) { continuation in let adapter = AsyncStreamAdapter(continuation: continuation) let channel = requester.channel( payload: initialPayload, initialRequestN: .max, - isCompleted: false, + isCompleted: payloadStream == nil, responderStream: adapter ) let task = Task.detached { + guard let payloadStream = payloadStream else { return } do { for try await payload in payloadStream { channel.onNext(payload, isCompletion: false) diff --git a/Sources/RSocketAsync/Responder.swift b/Sources/RSocketAsync/Responder.swift index 45d21bef..fa724ee2 100644 --- a/Sources/RSocketAsync/Responder.swift +++ b/Sources/RSocketAsync/Responder.swift @@ -21,6 +21,7 @@ import Foundation @available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) struct ResponderAdapter: RSocketCore.RSocket { var responder: RSocket + let encoding: ConnectionEncoding func metadataPush(metadata: Data) { responder.metadataPush(metadata: metadata) diff --git a/Sources/RSocketCore/Extensions/Coder/Decoder/MultiDataDecoder.swift b/Sources/RSocketCore/Extensions/Coder/Decoder/MultiDataDecoder.swift index b3755278..321f3fe1 100644 --- a/Sources/RSocketCore/Extensions/Coder/Decoder/MultiDataDecoder.swift +++ b/Sources/RSocketCore/Extensions/Coder/Decoder/MultiDataDecoder.swift @@ -38,11 +38,7 @@ extension MultiDataDecoderProtocol { @inlinable internal func decodeMIMEType(_ mimeType: MIMEType, from data: Foundation.Data) throws -> Data { var buffer = ByteBuffer(data: data) - let data = try self.decodeMIMEType(mimeType, from: &buffer) - guard buffer.readableBytes == 0 else { - throw Error.invalid(message: "\(Decoder.self) did not read all bytes") - } - return data + return try self.decodeMIMEType(mimeType, from: &buffer) } } diff --git a/Sources/RSocketCore/Extensions/Coder/Encoder/DataEncoder.swift b/Sources/RSocketCore/Extensions/Coder/Encoder/DataEncoder.swift index 78dc0c58..6ee2da2b 100644 --- a/Sources/RSocketCore/Extensions/Coder/Encoder/DataEncoder.swift +++ b/Sources/RSocketCore/Extensions/Coder/Encoder/DataEncoder.swift @@ -39,11 +39,7 @@ extension DataDecoderProtocol { @inlinable internal func decode(from data: Foundation.Data) throws -> Data { var buffer = ByteBuffer(data: data) - let data = try self.decode(from: &buffer) - guard buffer.readableBytes == 0 else { - throw Error.invalid(message: "\(Decoder.self) did not read all bytes") - } - return data + return try self.decode(from: &buffer) } } diff --git a/Sources/RSocketCore/Extensions/Coder/Encoder/MetadataEncoder.swift b/Sources/RSocketCore/Extensions/Coder/Encoder/MetadataEncoder.swift index 98347198..9b6e2844 100644 --- a/Sources/RSocketCore/Extensions/Coder/Encoder/MetadataEncoder.swift +++ b/Sources/RSocketCore/Extensions/Coder/Encoder/MetadataEncoder.swift @@ -29,7 +29,7 @@ extension MetadataDecoder { var buffer = ByteBuffer(data: data) let metadata = try self.decode(from: &buffer) guard buffer.readableBytes == 0 else { - throw Error.invalid(message: "\(Decoder.self) did not read all bytes") + throw Error.invalid(message: "\(Self.self) did not read all bytes") } return metadata }