Skip to content

fix: Fix CRT HTTP client continuation bug #711

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 48 additions & 21 deletions Sources/ClientRuntime/Networking/Http/CRT/CRTClientEngine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,11 @@ public class CRTClientEngine: HTTPClient {
self.logger.debug("Using HTTP/1.1 connection")
let crtRequest = try request.toHttpRequest()
return try await withCheckedThrowingContinuation { (continuation: StreamContinuation) in
let requestOptions = makeHttpRequestStreamOptions(request: crtRequest,
continuation: continuation)
let wrappedContinuation = ContinuationWrapper(continuation)
let requestOptions = makeHttpRequestStreamOptions(
request: crtRequest,
continuation: wrappedContinuation
)
do {
let stream = try connection.makeRequest(requestOptions: requestOptions)
try stream.activate()
Expand Down Expand Up @@ -217,28 +220,34 @@ public class CRTClientEngine: HTTPClient {
}
}
} catch {
continuation.resume(throwing: error)
logger.error(error.localizedDescription)
wrappedContinuation.safeResume(error: error)
}
}
}
} catch {
continuation.resume(throwing: error)
logger.error(error.localizedDescription)
wrappedContinuation.safeResume(error: error)
}
}
case .version_2:
self.logger.debug("Using HTTP/2 connection")
let crtRequest = try request.toHttp2Request()
return try await withCheckedThrowingContinuation { (continuation: StreamContinuation) in
let requestOptions = makeHttpRequestStreamOptions(request: crtRequest,
continuation: continuation,
http2ManualDataWrites: true)
let wrappedContinuation = ContinuationWrapper(continuation)
let requestOptions = makeHttpRequestStreamOptions(
request: crtRequest,
continuation: wrappedContinuation,
http2ManualDataWrites: true
)
let stream: HTTP2Stream
do {
// swiftlint:disable:next force_cast
stream = try connection.makeRequest(requestOptions: requestOptions) as! HTTP2Stream
try stream.activate()
} catch {
continuation.resume(throwing: error)
logger.error(error.localizedDescription)
wrappedContinuation.safeResume(error: error)
return
}

Expand Down Expand Up @@ -267,29 +276,29 @@ public class CRTClientEngine: HTTPClient {
let crtRequest = try request.toHttp2Request()

return try await withCheckedThrowingContinuation { (continuation: StreamContinuation) in
let wrappedContinuation = ContinuationWrapper(continuation)
let requestOptions = makeHttpRequestStreamOptions(
request: crtRequest,
continuation: continuation,
continuation: wrappedContinuation,
http2ManualDataWrites: true
)
Task {
Task { [logger] in
let stream: HTTP2Stream
do {
stream = try await connectionMgr.acquireStream(requestOptions: requestOptions)
} catch {
continuation.resume(throwing: error)
logger.error(error.localizedDescription)
wrappedContinuation.safeResume(error: error)
return
}

// At this point, continuation is resumed when the initial headers are received
// it is now safe to write the body
// writing is done in a separate task to avoid blocking the continuation
Task { [logger] in
do {
try await stream.write(body: request.body)
} catch {
logger.error(error.localizedDescription)
}
do {
try await stream.write(body: request.body)
} catch {
logger.error(error.localizedDescription)
}
}
}
Expand All @@ -298,7 +307,7 @@ public class CRTClientEngine: HTTPClient {
/// Creates a `HTTPRequestOptions` object that can be used to make a HTTP request
/// - Parameters:
/// - request: The `HTTPRequestBase` object that contains the request information
/// - continuation: The continuation that will be resumed when the request is complete
/// - continuation: The wrapped continuation that will be resumed when the request is complete
/// - http2ManualDataWrites: Whether or not the request is using HTTP/2 manual data writes, defaults to `false`
/// If set to false, HTTP/2 manual data writes will be disabled and result in a runtime error on writing on the
/// HTTP/2 stream
Expand All @@ -308,7 +317,7 @@ public class CRTClientEngine: HTTPClient {
/// - Returns: A `HTTPRequestOptions` object that can be used to make a HTTP request
private func makeHttpRequestStreamOptions(
request: HTTPRequestBase,
continuation: StreamContinuation,
continuation: ContinuationWrapper,
http2ManualDataWrites: Bool = false
) -> HTTPRequestOptions {
let response = HttpResponse()
Expand All @@ -330,7 +339,7 @@ public class CRTClientEngine: HTTPClient {
// resume the continuation as soon as we have all the initial headers
// this allows callers to start reading the response as it comes in
// instead of waiting for the entire response to be received
continuation.resume(returning: response)
continuation.safeResume(response: response)
} onIncomingBody: { bodyChunk in
self.logger.debug("Body chunk received")
do {
Expand All @@ -348,7 +357,7 @@ public class CRTClientEngine: HTTPClient {
response.statusCode = makeStatusCode(statusCode)
case .failure(let error):
self.logger.error("Response encountered an error: \(error)")
continuation.resume(throwing: error)
continuation.safeResume(error: error)
}

// closing the stream is required to signal to the caller that the response is complete
Expand All @@ -361,4 +370,22 @@ public class CRTClientEngine: HTTPClient {
response.body = .stream(stream)
return requestOptions
}

class ContinuationWrapper {
private var continuation: StreamContinuation?

public init(_ continuation: StreamContinuation) {
self.continuation = continuation
}

public func safeResume(response: HttpResponse) {
continuation?.resume(returning: response)
self.continuation = nil
}

public func safeResume(error: Error) {
continuation?.resume(throwing: error)
self.continuation = nil
}
}
}
Loading