diff --git a/Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift b/Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift index aad956734..166a79e16 100644 --- a/Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift +++ b/Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift @@ -18,6 +18,7 @@ import class Foundation.RunLoop import class Foundation.Timer import struct Foundation.TimeInterval import protocol Foundation.StreamDelegate +import class Foundation.DispatchWorkItem /// Reads data from a smithy-swift native `ReadableStream` and streams the data through to a Foundation `InputStream`. /// @@ -38,12 +39,17 @@ class FoundationStreamBridge: NSObject, StreamDelegate { /// A buffer to hold data that has been read from the `ReadableStream` but not yet written to the /// Foundation `OutputStream`. At most, it will contain `bridgeBufferSize` bytes. + /// + /// Only access this buffer from the serial queue. private var buffer: Data /// The `ReadableStream` that will serve as the input to this bridge. + /// /// The bridge will read bytes from this stream and dump them to the Foundation stream /// pair as they become available. - let readableStream: ReadableStream + /// + /// Only access this stream from the serial queue. + var readableStream: ReadableStream /// A Foundation stream that will carry the bytes read from the readableStream as they become available. /// @@ -95,7 +101,7 @@ class FoundationStreamBridge: NSObject, StreamDelegate { /// `true` if the readable stream has been closed, `false` otherwise. Will be flipped to `true` once the readable stream is read, /// and `nil` is returned. /// - /// Access this variable only during a write operation to ensure exclusive access. + /// Only access this variable from the serial queue. private var readableStreamIsClosed = false // MARK: - init & deinit @@ -126,20 +132,30 @@ class FoundationStreamBridge: NSObject, StreamDelegate { (inputStream, outputStream) = Self.makeStreams(boundStreamBufferSize: self.boundStreamBufferSize, queue: queue) } - func replaceStreams(completion: @escaping (InputStream?) -> Void) async { - // Close the current output stream, since it will accept no more data and is about to - // be replaced. - await close() + func replaceStreams(completion: @escaping (InputStream?) -> Void) { + queue.async { [self] in + // Close the current output stream, since it will accept no more data and is about to + // be replaced. + outputStream.close() + outputStream.delegate = nil - // Replace the bound stream pair with new bound streams. - (inputStream, outputStream) = Self.makeStreams(boundStreamBufferSize: boundStreamBufferSize, queue: queue) + // Replace the bound stream pair with new bound streams. + (inputStream, outputStream) = Self.makeStreams(boundStreamBufferSize: boundStreamBufferSize, queue: queue) - // Call the completion block. When this method is called from `urlSession(_:task:needNewBodyStream:)`, - // the completion block will be that method's completion handler. - completion(inputStream) + if readableStream.isSeekable { + try? readableStream.seek(toOffset: 0) + } else if let bufferedStream = readableStream as? BufferedStream { + readableStream = BufferedStream(data: bufferedStream.originalData, isClosed: readableStreamIsClosed) + } + + // Call the completion block. When this method is called from `urlSession(_:task:needNewBodyStream:)`, + // the completion block will be that method's completion handler. + completion(inputStream) - // Re-open the `OutputStream` for writing. - await open() + // Re-open the `OutputStream` for writing. + outputStream.delegate = self + outputStream.open() + } } private static func makeStreams(boundStreamBufferSize: Int, queue: DispatchQueue) -> (InputStream, OutputStream) { @@ -182,6 +198,9 @@ class FoundationStreamBridge: NSObject, StreamDelegate { continuation.resume() } } + if readableStreamTask == nil { + readableStreamTask = Task { try await readFromReadableStream() } + } } /// Close the output stream and unschedule this bridge from receiving stream delegate callbacks. @@ -200,12 +219,23 @@ class FoundationStreamBridge: NSObject, StreamDelegate { // MARK: - Writing to bridge + var readableStreamTask: Task? + /// Writes buffered data to the output stream. /// If the buffer is empty, the `ReadableStream` will be read first to replenish the buffer. /// /// If the buffer is empty and the readable stream is closed, there is no more data to bridge, and the output stream is closed. - private func writeToOutput() async throws { - + private func readFromReadableStream() async throws { + replaceWatchdogWorkItem() + + var keepGoing = true + while keepGoing { + let data = try await readableStream.readAsync(upToCount: bridgeBufferSize) + if Task.isCancelled { keepGoing = false; continue } + let streamError = await writeToOutputStream(data: data) + if let streamError { throw streamError } + if data == nil || Task.isCancelled { keepGoing = false } + } // Perform the write on the `WriteCoordinator` to ensure that writes happen in-order // and one at a time. // @@ -213,42 +243,43 @@ class FoundationStreamBridge: NSObject, StreamDelegate { // from inside the block passed to `perform()` because this is the only place // these instance vars are accessed, and the code in the `perform()` block runs // in series with any other calls to `perform()`. - try await writeCoordinator.perform { [self] in +// try await writeCoordinator.perform { [self] in - // If there is no data in the buffer and the `ReadableStream` is still open, - // attempt to read the stream. Otherwise, skip reading the `ReadableStream` and + // Attempt to read the stream. Otherwise, skip reading the `ReadableStream` and // write what's in the buffer immediately. - if !readableStreamIsClosed && buffer.isEmpty { - if let newData = try await readableStream.readAsync(upToCount: bridgeBufferSize - buffer.count) { - buffer.append(newData) - } else { - readableStreamIsClosed = true - } - } +// let data = try await readableStream.readAsync(upToCount: bridgeBufferSize - buffer.count) // Write the previously buffered data and/or newly read data, if any, to the Foundation `OutputStream`. // Capture the error from the stream write, if any. - var streamError: Error? - if !buffer.isEmpty { - streamError = await writeToOutputStream() - } - - // If the readable stream has closed and there is no data in the buffer, - // there is nothing left to forward to the output stream, so close it. - if readableStreamIsClosed && buffer.isEmpty { - await close() - } +// let streamError = await writeToOutputStream(data: data) // If the output stream write produced an error, throw it now, else just return. - if let streamError { throw streamError } +// if let streamError { throw streamError } +// } + } + + var watchdogWorkItem: DispatchWorkItem? + + func newWatchdogWorkItem() -> DispatchWorkItem { + return DispatchWorkItem { [weak self] in + _ = self?.writeToOutputOnQueue() } } + func replaceWatchdogWorkItem() { + watchdogWorkItem?.cancel() + let newWorkItem = newWatchdogWorkItem() + queue.asyncAfter(deadline: .now().advanced(by: .milliseconds(100)), execute: newWorkItem) + watchdogWorkItem = newWorkItem + } + /// Using the output stream's callback queue, write the buffered data to the Foundation `OutputStream`. - /// + /// /// After writing, remove the written data from the buffer. /// - Returns: The error resulting from the write to the Foundation `OutputStream`, or `nil` if no error occurred. - private func writeToOutputStream() async -> Error? { + /// - Parameters: + /// - data: The data that was read from the readable stream, or `nil` if the readable stream is closed. + private func writeToOutputStream(data: Data?) async -> Error? { // Suspend the caller while the write is performed on the Foundation `OutputStream`'s queue. await withCheckedContinuation { continuation in @@ -256,27 +287,72 @@ class FoundationStreamBridge: NSObject, StreamDelegate { // Perform the write to the Foundation `OutputStream` on its queue. queue.async { [self] in - // Write to the output stream. It may not accept all data, so get the number of bytes - // it accepted in `writeCount`. - var writeCount = 0 - buffer.withUnsafeBytes { bufferPtr in - guard let bytePtr = bufferPtr.bindMemory(to: UInt8.self).baseAddress else { return } - writeCount = outputStream.write(bytePtr, maxLength: buffer.count) + // Add the read data to the buffer, or set the stream closed flag if necessary. + if let data { + buffer.append(data) + } else { + readableStreamIsClosed = true } - // `writeCount` will be a positive number if bytes were written. - // Remove the written bytes from the front of the buffer. - if writeCount > 0 { - logger.info("FoundationStreamBridge: wrote \(writeCount) bytes to request body") - buffer.removeFirst(writeCount) - } + // Attempt to write buffered data to the output stream. + let error = writeToOutputOnQueue() // Resume the caller now that the write is complete, returning the stream error, if any. - continuation.resume(returning: outputStream.streamError) + continuation.resume(returning: error) } } } + /// Writes the contents of the buffer to the Foundation output stream. + /// + /// If the buffer is emptied by the write, attempt to read more from the `ReadableStream`. + /// + /// If the buffer is empty and the `ReadableStream` is closed, close the Foundation output stream. + /// - Returns: <#description#> + private func writeToOutputOnQueue() -> Error? { + // Call this function only from the output stream's serial queue. + // + // If there are any bytes to be written currently in the buffer, then write them to the output stream. + // It may not accept all data, so get the number of bytes it accepted in `writeCount`. + let bufferCount = buffer.count + if bufferCount > 0 && outputStream.hasSpaceAvailable { + var writeCount = 0 + buffer.withUnsafeBytes { bufferPtr in + guard let bytePtr = bufferPtr.bindMemory(to: UInt8.self).baseAddress else { return } + writeCount = outputStream.write(bytePtr, maxLength: bufferCount) + } + + // `writeCount` will be a positive number if bytes were written. + // Remove the written bytes from the front of the buffer. + if writeCount > 0 { + logger.info("FoundationStreamBridge: wrote \(writeCount) bytes to request body") + buffer.removeFirst(writeCount) + } + } + + // If the buffer is empty after the write, either read more data from the readable stream, + // or if the readable stream is closed, close the output stream. + // + // If the buffer is not empty after writing, then take no action. The output stream will post a + // `.hasSpaceAvailable` event once it can accept more data. + if buffer.isEmpty { + if readableStreamIsClosed { + // Close the output stream and unschedule this bridge from receiving stream delegate callbacks. + self.outputStream.close() + self.outputStream.delegate = nil + } else { + // If the buffer is now emptied and the readable stream has not already been closed, + // try to read from the readable stream again. +// Task { try await writeToOutput() } + } + } + + replaceWatchdogWorkItem() + + // Return any stream error to the caller. + return outputStream.streamError + } + // MARK: - StreamDelegate protocol /// The stream places this callback when an event happens. @@ -298,10 +374,8 @@ class FoundationStreamBridge: NSObject, StreamDelegate { case .hasBytesAvailable: break case .hasSpaceAvailable: - // Since space is available, try and read from the ReadableStream and - // transfer the data to the Foundation stream pair. - // Use a `Task` to perform the operation within Swift concurrency. - Task { try await writeToOutput() } + // Since space is available, try and write buffered data to the output queue. + _ = writeToOutputOnQueue() case .errorOccurred: logger.info("FoundationStreamBridge: .errorOccurred event") logger.info("FoundationStreamBridge: Stream error: \(aStream.streamError.debugDescription)") diff --git a/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift b/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift index 9a683cee3..9678e174f 100644 --- a/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift +++ b/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift @@ -17,6 +17,7 @@ extension URLSessionConfiguration { config.httpShouldSetCookies = false config.httpCookieAcceptPolicy = .never config.httpCookieStorage = nil +// config.tlsMaximumSupportedProtocolVersion = .TLSv12 return config } } diff --git a/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift b/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift index 433409b50..1ec2709f7 100644 --- a/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift +++ b/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift @@ -285,7 +285,7 @@ public final class URLSessionHTTPClient: HTTPClient { ) { storage.modify(task) { connection in guard let streamBridge = connection.streamBridge else { completionHandler(nil); return } - Task { await streamBridge.replaceStreams(completion: completionHandler) } + streamBridge.replaceStreams(completion: completionHandler) } } @@ -416,27 +416,28 @@ public final class URLSessionHTTPClient: HTTPClient { public func send(request: SdkHttpRequest) async throws -> HttpResponse { return try await withCheckedThrowingContinuation { continuation in - // Get the request stream to use for the body, if any. - let requestStream: ReadableStream? + // Get the in-memory data or request stream to use for the body, if any. + // Keep a reference to the stream bridge for a streaming request. + let body: Body + var streamBridge: FoundationStreamBridge? + switch request.body { case .data(let data): - requestStream = BufferedStream(data: data, isClosed: true) + body = .data(data) case .stream(let stream): - requestStream = stream + // Create a stream bridge that streams data from a SDK stream to a Foundation InputStream + // that URLSession can stream its request body from. + // Allow 16kb of in-memory buffer for request body streaming + let bridge = FoundationStreamBridge(readableStream: stream, bridgeBufferSize: 16_384, logger: logger) + streamBridge = bridge + body = .stream(bridge) case .noStream: - requestStream = nil - } - - // If needed, create a stream bridge that streams data from a SDK stream to a Foundation InputStream - // that URLSession can stream its request body from. - // Allow 16kb of in-memory buffer for request body streaming - let streamBridge = requestStream.map { - FoundationStreamBridge(readableStream: $0, bridgeBufferSize: 16_384, logger: logger) + body = .data(nil) } - // Create the request (with a streaming body when needed.) do { - let urlRequest = try self.makeURLRequest(from: request, httpBodyStream: streamBridge?.inputStream) + // Create a data task for the request, and store it as a Connection along with its continuation. + let urlRequest = try self.makeURLRequest(from: request, body: body) // Create the data task and associated connection object, then place them in storage. let dataTask = session.dataTask(with: urlRequest) let connection = Connection(streamBridge: streamBridge, continuation: continuation) @@ -445,22 +446,29 @@ public final class URLSessionHTTPClient: HTTPClient { // Start the HTTP connection and start streaming the request body data dataTask.resume() logger.info("start URLRequest(\(urlRequest.url?.absoluteString ?? "")) called") - Task { await streamBridge?.open() } + Task { [streamBridge] in + await streamBridge?.open() + } } catch { continuation.resume(throwing: error) } - } } - // MARK: - Private methods + // MARK: - Private methods & types + + /// A private type used to encapsulate the body to be used for a URLRequest. + private enum Body { + case stream(FoundationStreamBridge) + case data(Data?) + } /// Create a `URLRequest` for the Smithy operation to be performed. /// - Parameters: /// - request: The SDK-native, signed `SdkHttpRequest` ready to be transmitted. /// - httpBodyStream: A Foundation `InputStream` carrying the HTTP body for this request. /// - Returns: A `URLRequest` ready to be transmitted by `URLSession` for this operation. - private func makeURLRequest(from request: SdkHttpRequest, httpBodyStream: InputStream?) throws -> URLRequest { + private func makeURLRequest(from request: SdkHttpRequest, body: Body) throws -> URLRequest { var components = URLComponents() components.scheme = config.protocolType?.rawValue ?? request.endpoint.protocolType?.rawValue ?? "https" components.host = request.endpoint.host @@ -474,7 +482,12 @@ public final class URLSessionHTTPClient: HTTPClient { guard let url = components.url else { throw URLSessionHTTPClientError.incompleteHTTPRequest } var urlRequest = URLRequest(url: url, timeoutInterval: self.connectionTimeout) urlRequest.httpMethod = request.method.rawValue - urlRequest.httpBodyStream = httpBodyStream + switch body { + case .stream(let bridge): + urlRequest.httpBodyStream = bridge.inputStream + case .data(let data): + urlRequest.httpBody = data + } for header in request.headers.headers + config.defaultHeaders.headers { for value in header.value { urlRequest.addValue(value, forHTTPHeaderField: header.name) diff --git a/Sources/ClientRuntime/Networking/Streaming/BufferedStream.swift b/Sources/ClientRuntime/Networking/Streaming/BufferedStream.swift index 88e487ce2..6db822f69 100644 --- a/Sources/ClientRuntime/Networking/Streaming/BufferedStream.swift +++ b/Sources/ClientRuntime/Networking/Streaming/BufferedStream.swift @@ -15,6 +15,8 @@ import class Foundation.NSRecursiveLock /// or reach the maximum size of a `Data` object. public class BufferedStream: Stream { + var originalData: Data + /// Returns the cumulative length of all data so far written to the stream, if known. /// For a buffered stream, the length will only be known if the stream has closed. public var length: Int? { @@ -107,6 +109,7 @@ public class BufferedStream: Stream { /// - isClosed: Whether the stream is closed. public init(data: Data? = nil, isClosed: Bool = false) { self._buffer = data ?? Data() + self.originalData = data ?? Data() self._position = _buffer.startIndex self._dataCount = _buffer.count self._isClosed = isClosed @@ -202,6 +205,7 @@ public class BufferedStream: Stream { /// - Parameter data: The data to write. public func write(contentsOf data: Data) throws { lock.withLockingClosure { + originalData.append(data) // append the data to the buffer // this will increase the in-memory size of the buffer _buffer.append(data)