Skip to content

URLSessionHTTPClient test branch #730

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// SPDX-License-Identifier: Apache-2.0
//

#if os(iOS) || os(macOS) || os(watchOS) || os(tvOS) || os(visionOS)

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=tvOS Simulator,OS=16.1,name=Apple TV 4K (3rd ge...

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=tvOS Simulator,OS=16.1,name=Apple TV 4K (3rd ge...

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=tvOS Simulator,OS=16.1,name=Apple TV 4K (3rd ge...

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=OS X)

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=OS X)

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=OS X)

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=iOS Simulator,OS=16.1,name=iPhone 14)

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=iOS Simulator,OS=16.1,name=iPhone 14)

unknown operating system for build configuration 'os'

import func Foundation.CFWriteStreamSetDispatchQueue
import class Foundation.DispatchQueue
Expand All @@ -18,6 +18,7 @@
import class Foundation.Timer
import struct Foundation.TimeInterval
import protocol Foundation.StreamDelegate
import class Foundation.DispatchWorkItem

/// Reads data from a smithy-swift native `ReadableStream` and streams the data through to a Foundation `InputStream`.
///
Expand All @@ -38,12 +39,17 @@

/// A buffer to hold data that has been read from the `ReadableStream` but not yet written to the
/// Foundation `OutputStream`. At most, it will contain `bridgeBufferSize` bytes.
///
/// Only access this buffer from the serial queue.
private var buffer: Data

/// The `ReadableStream` that will serve as the input to this bridge.
///
/// The bridge will read bytes from this stream and dump them to the Foundation stream
/// pair as they become available.
let readableStream: ReadableStream
///
/// Only access this stream from the serial queue.
var readableStream: ReadableStream

/// A Foundation stream that will carry the bytes read from the readableStream as they become available.
///
Expand Down Expand Up @@ -95,7 +101,7 @@
/// `true` if the readable stream has been closed, `false` otherwise. Will be flipped to `true` once the readable stream is read,
/// and `nil` is returned.
///
/// Access this variable only during a write operation to ensure exclusive access.
/// Only access this variable from the serial queue.
private var readableStreamIsClosed = false

// MARK: - init & deinit
Expand Down Expand Up @@ -126,20 +132,30 @@
(inputStream, outputStream) = Self.makeStreams(boundStreamBufferSize: self.boundStreamBufferSize, queue: queue)
}

func replaceStreams(completion: @escaping (InputStream?) -> Void) async {
// Close the current output stream, since it will accept no more data and is about to
// be replaced.
await close()
func replaceStreams(completion: @escaping (InputStream?) -> Void) {
queue.async { [self] in
// Close the current output stream, since it will accept no more data and is about to
// be replaced.
outputStream.close()
outputStream.delegate = nil

// Replace the bound stream pair with new bound streams.
(inputStream, outputStream) = Self.makeStreams(boundStreamBufferSize: boundStreamBufferSize, queue: queue)
// Replace the bound stream pair with new bound streams.
(inputStream, outputStream) = Self.makeStreams(boundStreamBufferSize: boundStreamBufferSize, queue: queue)

// Call the completion block. When this method is called from `urlSession(_:task:needNewBodyStream:)`,
// the completion block will be that method's completion handler.
completion(inputStream)
if readableStream.isSeekable {
try? readableStream.seek(toOffset: 0)
} else if let bufferedStream = readableStream as? BufferedStream {
readableStream = BufferedStream(data: bufferedStream.originalData, isClosed: readableStreamIsClosed)
}

// Call the completion block. When this method is called from `urlSession(_:task:needNewBodyStream:)`,
// the completion block will be that method's completion handler.
completion(inputStream)

// Re-open the `OutputStream` for writing.
await open()
// Re-open the `OutputStream` for writing.
outputStream.delegate = self
outputStream.open()
}
}

private static func makeStreams(boundStreamBufferSize: Int, queue: DispatchQueue) -> (InputStream, OutputStream) {
Expand Down Expand Up @@ -182,6 +198,9 @@
continuation.resume()
}
}
if readableStreamTask == nil {
readableStreamTask = Task<Void, Error> { try await readFromReadableStream() }
}
}

/// Close the output stream and unschedule this bridge from receiving stream delegate callbacks.
Expand All @@ -200,83 +219,140 @@

// MARK: - Writing to bridge

var readableStreamTask: Task<Void, Error>?

/// Writes buffered data to the output stream.
/// If the buffer is empty, the `ReadableStream` will be read first to replenish the buffer.
///
/// If the buffer is empty and the readable stream is closed, there is no more data to bridge, and the output stream is closed.
private func writeToOutput() async throws {

private func readFromReadableStream() async throws {
replaceWatchdogWorkItem()

var keepGoing = true
while keepGoing {
let data = try await readableStream.readAsync(upToCount: bridgeBufferSize)
if Task.isCancelled { keepGoing = false; continue }
let streamError = await writeToOutputStream(data: data)
if let streamError { throw streamError }
if data == nil || Task.isCancelled { keepGoing = false }
}
// Perform the write on the `WriteCoordinator` to ensure that writes happen in-order
// and one at a time.
//
// Note that it is safe to access `buffer` and `readableStreamIsClosed` instance vars
// from inside the block passed to `perform()` because this is the only place
// these instance vars are accessed, and the code in the `perform()` block runs
// in series with any other calls to `perform()`.
try await writeCoordinator.perform { [self] in
// try await writeCoordinator.perform { [self] in

// If there is no data in the buffer and the `ReadableStream` is still open,
// attempt to read the stream. Otherwise, skip reading the `ReadableStream` and
// Attempt to read the stream. Otherwise, skip reading the `ReadableStream` and
// write what's in the buffer immediately.
if !readableStreamIsClosed && buffer.isEmpty {
if let newData = try await readableStream.readAsync(upToCount: bridgeBufferSize - buffer.count) {
buffer.append(newData)
} else {
readableStreamIsClosed = true
}
}
// let data = try await readableStream.readAsync(upToCount: bridgeBufferSize - buffer.count)

// Write the previously buffered data and/or newly read data, if any, to the Foundation `OutputStream`.
// Capture the error from the stream write, if any.
var streamError: Error?
if !buffer.isEmpty {
streamError = await writeToOutputStream()
}

// If the readable stream has closed and there is no data in the buffer,
// there is nothing left to forward to the output stream, so close it.
if readableStreamIsClosed && buffer.isEmpty {
await close()
}
// let streamError = await writeToOutputStream(data: data)

// If the output stream write produced an error, throw it now, else just return.
if let streamError { throw streamError }
// if let streamError { throw streamError }
// }
}

var watchdogWorkItem: DispatchWorkItem?

func newWatchdogWorkItem() -> DispatchWorkItem {
return DispatchWorkItem { [weak self] in
_ = self?.writeToOutputOnQueue()
}
}

func replaceWatchdogWorkItem() {
watchdogWorkItem?.cancel()
let newWorkItem = newWatchdogWorkItem()
queue.asyncAfter(deadline: .now().advanced(by: .milliseconds(100)), execute: newWorkItem)
watchdogWorkItem = newWorkItem
}

/// Using the output stream's callback queue, write the buffered data to the Foundation `OutputStream`.
///
///
/// After writing, remove the written data from the buffer.
/// - Returns: The error resulting from the write to the Foundation `OutputStream`, or `nil` if no error occurred.
private func writeToOutputStream() async -> Error? {
/// - Parameters:
/// - data: The data that was read from the readable stream, or `nil` if the readable stream is closed.
private func writeToOutputStream(data: Data?) async -> Error? {

// Suspend the caller while the write is performed on the Foundation `OutputStream`'s queue.
await withCheckedContinuation { continuation in

// Perform the write to the Foundation `OutputStream` on its queue.
queue.async { [self] in

// Write to the output stream. It may not accept all data, so get the number of bytes
// it accepted in `writeCount`.
var writeCount = 0
buffer.withUnsafeBytes { bufferPtr in
guard let bytePtr = bufferPtr.bindMemory(to: UInt8.self).baseAddress else { return }
writeCount = outputStream.write(bytePtr, maxLength: buffer.count)
// Add the read data to the buffer, or set the stream closed flag if necessary.
if let data {
buffer.append(data)
} else {
readableStreamIsClosed = true
}

// `writeCount` will be a positive number if bytes were written.
// Remove the written bytes from the front of the buffer.
if writeCount > 0 {
logger.info("FoundationStreamBridge: wrote \(writeCount) bytes to request body")
buffer.removeFirst(writeCount)
}
// Attempt to write buffered data to the output stream.
let error = writeToOutputOnQueue()

// Resume the caller now that the write is complete, returning the stream error, if any.
continuation.resume(returning: outputStream.streamError)
continuation.resume(returning: error)
}
}
}

/// Writes the contents of the buffer to the Foundation output stream.
///
/// If the buffer is emptied by the write, attempt to read more from the `ReadableStream`.
///
/// If the buffer is empty and the `ReadableStream` is closed, close the Foundation output stream.
/// - Returns: <#description#>
private func writeToOutputOnQueue() -> Error? {
// Call this function only from the output stream's serial queue.
//
// If there are any bytes to be written currently in the buffer, then write them to the output stream.
// It may not accept all data, so get the number of bytes it accepted in `writeCount`.
let bufferCount = buffer.count
if bufferCount > 0 && outputStream.hasSpaceAvailable {
var writeCount = 0
buffer.withUnsafeBytes { bufferPtr in
guard let bytePtr = bufferPtr.bindMemory(to: UInt8.self).baseAddress else { return }
writeCount = outputStream.write(bytePtr, maxLength: bufferCount)
}

// `writeCount` will be a positive number if bytes were written.
// Remove the written bytes from the front of the buffer.
if writeCount > 0 {
logger.info("FoundationStreamBridge: wrote \(writeCount) bytes to request body")
buffer.removeFirst(writeCount)
}
}

// If the buffer is empty after the write, either read more data from the readable stream,
// or if the readable stream is closed, close the output stream.
//
// If the buffer is not empty after writing, then take no action. The output stream will post a
// `.hasSpaceAvailable` event once it can accept more data.
if buffer.isEmpty {
if readableStreamIsClosed {
// Close the output stream and unschedule this bridge from receiving stream delegate callbacks.
self.outputStream.close()
self.outputStream.delegate = nil
} else {
// If the buffer is now emptied and the readable stream has not already been closed,
// try to read from the readable stream again.
// Task { try await writeToOutput() }
}
}

replaceWatchdogWorkItem()

// Return any stream error to the caller.
return outputStream.streamError
}

// MARK: - StreamDelegate protocol

/// The stream places this callback when an event happens.
Expand All @@ -298,10 +374,8 @@
case .hasBytesAvailable:
break
case .hasSpaceAvailable:
// Since space is available, try and read from the ReadableStream and
// transfer the data to the Foundation stream pair.
// Use a `Task` to perform the operation within Swift concurrency.
Task { try await writeToOutput() }
// Since space is available, try and write buffered data to the output queue.
_ = writeToOutputOnQueue()
case .errorOccurred:
logger.info("FoundationStreamBridge: .errorOccurred event")
logger.info("FoundationStreamBridge: Stream error: \(aStream.streamError.debugDescription)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// SPDX-License-Identifier: Apache-2.0
//

#if os(iOS) || os(macOS) || os(watchOS) || os(tvOS) || os(visionOS)

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=tvOS Simulator,OS=16.1,name=Apple TV 4K (3rd ge...

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=tvOS Simulator,OS=16.1,name=Apple TV 4K (3rd ge...

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=tvOS Simulator,OS=16.1,name=Apple TV 4K (3rd ge...

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=OS X)

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=OS X)

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=OS X)

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=iOS Simulator,OS=16.1,name=iPhone 14)

unknown operating system for build configuration 'os'

import class Foundation.URLSessionConfiguration

Expand All @@ -17,6 +17,7 @@
config.httpShouldSetCookies = false
config.httpCookieAcceptPolicy = .never
config.httpCookieStorage = nil
// config.tlsMaximumSupportedProtocolVersion = .TLSv12
return config
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// SPDX-License-Identifier: Apache-2.0
//

#if os(iOS) || os(macOS) || os(watchOS) || os(tvOS) || os(visionOS)

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=tvOS Simulator,OS=16.1,name=Apple TV 4K (3rd ge...

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=tvOS Simulator,OS=16.1,name=Apple TV 4K (3rd ge...

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=OS X)

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=OS X)

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=OS X)

unknown operating system for build configuration 'os'

Check warning on line 8 in Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift

View workflow job for this annotation

GitHub Actions / downstream (macos-13-xlarge, Xcode_14.1, platform=iOS Simulator,OS=16.1,name=iPhone 14)

unknown operating system for build configuration 'os'

import class Foundation.Bundle
import class Foundation.InputStream
Expand Down Expand Up @@ -285,7 +285,7 @@
) {
storage.modify(task) { connection in
guard let streamBridge = connection.streamBridge else { completionHandler(nil); return }
Task { await streamBridge.replaceStreams(completion: completionHandler) }
streamBridge.replaceStreams(completion: completionHandler)
}
}

Expand Down Expand Up @@ -416,27 +416,28 @@
public func send(request: SdkHttpRequest) async throws -> HttpResponse {
return try await withCheckedThrowingContinuation { continuation in

// Get the request stream to use for the body, if any.
let requestStream: ReadableStream?
// Get the in-memory data or request stream to use for the body, if any.
// Keep a reference to the stream bridge for a streaming request.
let body: Body
var streamBridge: FoundationStreamBridge?

switch request.body {
case .data(let data):
requestStream = BufferedStream(data: data, isClosed: true)
body = .data(data)
case .stream(let stream):
requestStream = stream
// Create a stream bridge that streams data from a SDK stream to a Foundation InputStream
// that URLSession can stream its request body from.
// Allow 16kb of in-memory buffer for request body streaming
let bridge = FoundationStreamBridge(readableStream: stream, bridgeBufferSize: 16_384, logger: logger)
streamBridge = bridge
body = .stream(bridge)
case .noStream:
requestStream = nil
}

// If needed, create a stream bridge that streams data from a SDK stream to a Foundation InputStream
// that URLSession can stream its request body from.
// Allow 16kb of in-memory buffer for request body streaming
let streamBridge = requestStream.map {
FoundationStreamBridge(readableStream: $0, bridgeBufferSize: 16_384, logger: logger)
body = .data(nil)
}

// Create the request (with a streaming body when needed.)
do {
let urlRequest = try self.makeURLRequest(from: request, httpBodyStream: streamBridge?.inputStream)
// Create a data task for the request, and store it as a Connection along with its continuation.
let urlRequest = try self.makeURLRequest(from: request, body: body)
// Create the data task and associated connection object, then place them in storage.
let dataTask = session.dataTask(with: urlRequest)
let connection = Connection(streamBridge: streamBridge, continuation: continuation)
Expand All @@ -445,22 +446,29 @@
// Start the HTTP connection and start streaming the request body data
dataTask.resume()
logger.info("start URLRequest(\(urlRequest.url?.absoluteString ?? "")) called")
Task { await streamBridge?.open() }
Task { [streamBridge] in
await streamBridge?.open()
}
} catch {
continuation.resume(throwing: error)
}

}
}

// MARK: - Private methods
// MARK: - Private methods & types

/// A private type used to encapsulate the body to be used for a URLRequest.
private enum Body {
case stream(FoundationStreamBridge)
case data(Data?)
}

/// Create a `URLRequest` for the Smithy operation to be performed.
/// - Parameters:
/// - request: The SDK-native, signed `SdkHttpRequest` ready to be transmitted.
/// - httpBodyStream: A Foundation `InputStream` carrying the HTTP body for this request.
/// - Returns: A `URLRequest` ready to be transmitted by `URLSession` for this operation.
private func makeURLRequest(from request: SdkHttpRequest, httpBodyStream: InputStream?) throws -> URLRequest {
private func makeURLRequest(from request: SdkHttpRequest, body: Body) throws -> URLRequest {
var components = URLComponents()
components.scheme = config.protocolType?.rawValue ?? request.endpoint.protocolType?.rawValue ?? "https"
components.host = request.endpoint.host
Expand All @@ -474,7 +482,12 @@
guard let url = components.url else { throw URLSessionHTTPClientError.incompleteHTTPRequest }
var urlRequest = URLRequest(url: url, timeoutInterval: self.connectionTimeout)
urlRequest.httpMethod = request.method.rawValue
urlRequest.httpBodyStream = httpBodyStream
switch body {
case .stream(let bridge):
urlRequest.httpBodyStream = bridge.inputStream
case .data(let data):
urlRequest.httpBody = data
}
for header in request.headers.headers + config.defaultHeaders.headers {
for value in header.value {
urlRequest.addValue(value, forHTTPHeaderField: header.name)
Expand Down
Loading
Loading