Skip to content

Commit 1e54f05

Browse files
author
Guy Bedford
authored
StarlingMonkey port body streaming and event loop lifetime fixes (#795)
1 parent 4dfa8d7 commit 1e54f05

File tree

10 files changed

+124
-65
lines changed

10 files changed

+124
-65
lines changed

integration-tests/js-compute/fixtures/app/tests-starlingmonkey.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
"GET /async-select/hello",
33
"GET /btoa",
44
"GET /byob",
5+
"GET /byte-repeater",
56
"GET /cache-override/constructor/called-as-regular-function",
67
"GET /cache-override/constructor/parameter-calls-7.1.17-ToString",
78
"GET /cache-override/constructor/empty-parameter",
@@ -61,6 +62,7 @@
6162
"GET /simple-cache-entry/json/valid",
6263
"GET /simple-cache-entry/json/invalid",
6364
"GET /simple-cache-entry/arrayBuffer/valid",
65+
"GET /simple-cache-entry/body",
6466
"GET /simple-cache-entry/bodyUsed",
6567
"GET /simple-cache-entry/readablestream",
6668
"GET /simple-cache/getOrSet/called-as-constructor",
@@ -368,6 +370,7 @@
368370
"GET /kv-store-entry/json/valid",
369371
"GET /kv-store-entry/json/invalid",
370372
"GET /kv-store-entry/arrayBuffer/valid",
373+
"GET /kv-store-entry/body",
371374
"GET /kv-store-entry/bodyUsed",
372375
"GET /logger",
373376
"GET /missing-backend",
@@ -441,6 +444,7 @@
441444
"GET /urlsearchparams/sort",
442445
"GET /random",
443446
"GET /error",
447+
"GET /react-byob",
444448
"GET /tee/error",
445449
"GET /override-content-length/request/init/object-literal/true",
446450
"GET /override-content-length/request/init/object-literal/false",

runtime/fastly/builtins/fetch-event.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,9 @@ void inc_pending_promise_count(JSObject *self) {
315315
auto count =
316316
JS::GetReservedSlot(self, static_cast<uint32_t>(FetchEvent::Slots::PendingPromiseCount))
317317
.toInt32();
318+
if (count == 0) {
319+
ENGINE->incr_event_loop_interest();
320+
}
318321
count++;
319322
MOZ_ASSERT(count > 0);
320323
JS::SetReservedSlot(self, static_cast<uint32_t>(FetchEvent::Slots::PendingPromiseCount),
@@ -328,8 +331,9 @@ void dec_pending_promise_count(JSObject *self) {
328331
.toInt32();
329332
MOZ_ASSERT(count > 0);
330333
count--;
331-
if (count == 0)
334+
if (count == 0) {
332335
ENGINE->decr_event_loop_interest();
336+
}
333337
JS::SetReservedSlot(self, static_cast<uint32_t>(FetchEvent::Slots::PendingPromiseCount),
334338
JS::Int32Value(count));
335339
}
@@ -371,7 +375,6 @@ bool FetchEvent::client_get(JSContext *cx, unsigned argc, JS::Value *vp) {
371375

372376
void dispatch_fetch_event(HandleObject event, double *total_compute) {
373377
MOZ_ASSERT(FetchEvent::is_instance(event));
374-
ENGINE->incr_event_loop_interest();
375378
auto pre_handler = system_clock::now();
376379

377380
RootedValue result(ENGINE->cx());
@@ -566,6 +569,9 @@ bool response_promise_then_handler(JSContext *cx, JS::HandleObject event, JS::Ha
566569
return false;
567570
}
568571

572+
if (streaming) {
573+
ENGINE->incr_event_loop_interest();
574+
}
569575
FetchEvent::set_state(event, streaming ? FetchEvent::State::responseStreaming
570576
: FetchEvent::State::responseDone);
571577
return start_response(cx, response_obj, streaming);
@@ -776,11 +782,13 @@ bool FetchEvent::is_dispatching(JSObject *self) {
776782

777783
void FetchEvent::start_dispatching(JSObject *self) {
778784
MOZ_ASSERT(!is_dispatching(self));
785+
ENGINE->incr_event_loop_interest();
779786
JS::SetReservedSlot(self, static_cast<uint32_t>(Slots::Dispatch), JS::TrueValue());
780787
}
781788

782789
void FetchEvent::stop_dispatching(JSObject *self) {
783790
MOZ_ASSERT(is_dispatching(self));
791+
ENGINE->decr_event_loop_interest();
784792
JS::SetReservedSlot(self, static_cast<uint32_t>(Slots::Dispatch), JS::FalseValue());
785793
}
786794

runtime/fastly/builtins/fetch/fetch.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ class FetchTask final : public api::AsyncTask {
3030
}
3131

3232
[[nodiscard]] bool run(api::Engine *engine) override {
33-
3433
JSContext *cx = engine->cx();
3534

3635
const RootedObject request(cx, request_);

runtime/fastly/builtins/fetch/request-response.cpp

Lines changed: 68 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,69 @@ bool NativeStreamSource::stream_is_body(JSContext *cx, JS::HandleObject stream)
5858
namespace fastly::fetch {
5959

6060
namespace {
61+
bool error_stream_controller_with_pending_exception(JSContext *cx, JS::HandleObject controller) {
62+
JS::RootedValue exn(cx);
63+
if (!JS_GetPendingException(cx, &exn))
64+
return false;
65+
JS_ClearPendingException(cx);
66+
67+
JS::RootedValueArray<1> args(cx);
68+
args[0].set(exn);
69+
JS::RootedValue r(cx);
70+
return JS::Call(cx, controller, "error", args, &r);
71+
}
6172

6273
constexpr size_t HANDLE_READ_CHUNK_SIZE = 8192;
6374

75+
bool process_body_read(JSContext *cx, FastlyHandle handle, JS::HandleObject context,
76+
JS::HandleObject promise) {
77+
MOZ_ASSERT(context);
78+
JS::RootedObject streamSource(cx, context);
79+
MOZ_ASSERT(NativeStreamSource::is_instance(streamSource));
80+
host_api::HttpBody body(handle);
81+
JS::RootedObject owner(cx, NativeStreamSource::owner(streamSource));
82+
JS::RootedObject controller(cx, NativeStreamSource::controller(streamSource));
83+
84+
auto read_res = body.read(HANDLE_READ_CHUNK_SIZE);
85+
if (auto *err = read_res.to_err()) {
86+
HANDLE_ERROR(cx, *err);
87+
return error_stream_controller_with_pending_exception(cx, controller);
88+
}
89+
90+
auto &chunk = read_res.unwrap();
91+
if (chunk.len == 0) {
92+
JS::RootedValue r(cx);
93+
return JS::Call(cx, controller, "close", JS::HandleValueArray::empty(), &r);
94+
}
95+
96+
// We don't release control of chunk's data until after we've checked that the array buffer
97+
// allocation has been successful, as that ensures that the return path frees chunk automatically
98+
// when necessary.
99+
JS::RootedObject buffer(
100+
cx, JS::NewArrayBufferWithContents(cx, chunk.len, chunk.ptr.get(),
101+
JS::NewArrayBufferOutOfMemory::CallerMustFreeMemory));
102+
if (!buffer) {
103+
return error_stream_controller_with_pending_exception(cx, controller);
104+
}
105+
106+
// At this point `buffer` has taken full ownership of the chunk's data.
107+
std::ignore = chunk.ptr.release();
108+
109+
JS::RootedObject byte_array(cx, JS_NewUint8ArrayWithBuffer(cx, buffer, 0, chunk.len));
110+
if (!byte_array) {
111+
return false;
112+
}
113+
114+
JS::RootedValueArray<1> enqueue_args(cx);
115+
enqueue_args[0].setObject(*byte_array);
116+
JS::RootedValue r(cx);
117+
if (!JS::Call(cx, controller, "enqueue", enqueue_args, &r)) {
118+
return error_stream_controller_with_pending_exception(cx, controller);
119+
}
120+
121+
return true;
122+
}
123+
64124
// https://fetch.spec.whatwg.org/#concept-method-normalize
65125
// Returns `true` if the method name was normalized, `false` otherwise.
66126
bool normalize_http_method(char *method) {
@@ -131,15 +191,9 @@ ReadResult read_from_handle_all(JSContext *cx, host_api::HttpBody body) {
131191
return {std::move(buf), bytes_read};
132192
}
133193

134-
// host_api::HttpReq request_handle(JSObject *obj) {
135-
// MOZ_ASSERT(Request::is_instance(obj));
136-
// return host_api::HttpReq(
137-
// JS::GetReservedSlot(obj, static_cast<uint32_t>(Request::Slots::Request)).toInt32());
138-
// }
139-
140194
} // namespace
141195

142-
bool RequestOrResponse::process_pending_request(JSContext *cx, int32_t handle,
196+
bool RequestOrResponse::process_pending_request(JSContext *cx, FastlyHandle handle,
143197
JS::HandleObject context,
144198
JS::HandleObject promise) {
145199
MOZ_ASSERT(Request::is_instance(context));
@@ -869,8 +923,8 @@ bool RequestOrResponse::body_source_pull_algorithm(JSContext *cx, JS::CallArgs a
869923
JS::RootedObject self(cx, &args.thisv().toObject());
870924
JS::RootedObject owner(cx, NativeStreamSource::owner(self));
871925

872-
ENGINE->queue_async_task(
873-
new FastlyAsyncTask(RequestOrResponse::body_handle(owner).async_handle()));
926+
ENGINE->queue_async_task(new FastlyAsyncTask(RequestOrResponse::body_handle(owner).async_handle(),
927+
source, nullptr, process_body_read));
874928

875929
args.rval().setUndefined();
876930
return true;
@@ -907,6 +961,7 @@ bool RequestOrResponse::body_reader_then_handler(JSContext *cx, JS::HandleObject
907961
// certain that if we have a response here, we can advance the FetchState to
908962
// `responseDone`.
909963
if (Response::is_instance(body_owner)) {
964+
ENGINE->decr_event_loop_interest();
910965
FetchEvent::set_state(FetchEvent::instance(), FetchEvent::State::responseDone);
911966
}
912967

@@ -917,7 +972,9 @@ bool RequestOrResponse::body_reader_then_handler(JSContext *cx, JS::HandleObject
917972
}
918973

919974
if (Request::is_instance(body_owner)) {
920-
ENGINE->queue_async_task(new FastlyAsyncTask(body.async_handle()));
975+
JS::RootedObject promise(cx, Request::response_promise(body_owner));
976+
ENGINE->queue_async_task(
977+
new FastlyAsyncTask(body.async_handle(), body_owner, promise, process_pending_request));
921978
}
922979

923980
return true;
@@ -993,6 +1050,7 @@ bool RequestOrResponse::body_reader_catch_handler(JSContext *cx, JS::HandleObjec
9931050
// `responseDone` is the right state: `responsedWithError` is for when sending
9941051
// a response at all failed.)
9951052
if (Response::is_instance(body_owner)) {
1053+
ENGINE->decr_event_loop_interest();
9961054
FetchEvent::set_state(FetchEvent::instance(), FetchEvent::State::responseDone);
9971055
}
9981056
return true;

runtime/fastly/builtins/fetch/request-response.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class RequestOrResponse final {
3535
static void set_manual_framing_headers(JSContext *cx, JSObject *obj, JS::HandleValue url);
3636
static bool body_unusable(JSContext *cx, JS::HandleObject body);
3737
static bool extract_body(JSContext *cx, JS::HandleObject self, JS::HandleValue body_val);
38-
static bool process_pending_request(JSContext *cx, int32_t handle, JS::HandleObject context,
38+
static bool process_pending_request(JSContext *cx, FastlyHandle handle, JS::HandleObject context,
3939
JS::HandleObject promise);
4040

4141
/**

runtime/fastly/builtins/kv-store.cpp

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -186,22 +186,22 @@ bool parse_and_validate_key(JSContext *cx, const char *key, size_t len) {
186186

187187
} // namespace
188188

189-
bool KVStore::process_pending_kv_store_delete(FastlyHandle handle, JS::HandleObject context,
190-
JS::HandleObject promise) {
189+
bool KVStore::process_pending_kv_store_delete(JSContext *cx, FastlyHandle handle,
190+
JS::HandleObject context, JS::HandleObject promise) {
191191
host_api::ObjectStorePendingDelete pending_delete(handle);
192192

193193
auto res = pending_delete.wait();
194194
if (auto *err = res.to_err()) {
195195
if (host_api::error_is_invalid_argument(*err)) {
196-
JS_ReportErrorNumberASCII(ENGINE->cx(), FastlyGetErrorMessage, nullptr,
196+
JS_ReportErrorNumberASCII(cx, FastlyGetErrorMessage, nullptr,
197197
JSMSG_KV_STORE_DELETE_KEY_DOES_NOT_EXIST);
198198
} else {
199-
HANDLE_ERROR(ENGINE->cx(), *err);
199+
HANDLE_ERROR(cx, *err);
200200
}
201-
return RejectPromiseWithPendingError(ENGINE->cx(), promise);
201+
return RejectPromiseWithPendingError(cx, promise);
202202
}
203203

204-
JS::ResolvePromise(ENGINE->cx(), promise, JS::UndefinedHandleValue);
204+
JS::ResolvePromise(cx, promise, JS::UndefinedHandleValue);
205205
return true;
206206
}
207207

@@ -233,40 +233,39 @@ bool KVStore::delete_(JSContext *cx, unsigned argc, JS::Value *vp) {
233233
}
234234
auto handle = res.unwrap();
235235

236-
auto task = new FastlyAsyncTask(handle, ENGINE->cx(), self, result_promise,
237-
KVStore::process_pending_kv_store_delete);
238-
ENGINE->queue_async_task(task);
236+
ENGINE->queue_async_task(
237+
new FastlyAsyncTask(handle, self, result_promise, KVStore::process_pending_kv_store_delete));
239238

240239
args.rval().setObject(*result_promise);
241240
return true;
242241
}
243242

244-
bool KVStore::process_pending_kv_store_lookup(FastlyHandle handle, JS::HandleObject context,
245-
JS::HandleObject promise) {
243+
bool KVStore::process_pending_kv_store_lookup(JSContext *cx, FastlyHandle handle,
244+
JS::HandleObject context, JS::HandleObject promise) {
246245
host_api::ObjectStorePendingLookup pending_lookup(handle);
247246

248247
auto res = pending_lookup.wait();
249248

250249
if (auto *err = res.to_err()) {
251-
HANDLE_ERROR(ENGINE->cx(), *err);
252-
return RejectPromiseWithPendingError(ENGINE->cx(), promise);
250+
HANDLE_ERROR(cx, *err);
251+
return RejectPromiseWithPendingError(cx, promise);
253252
}
254253

255254
auto ret = res.unwrap();
256255

257256
// When no entry is found, we are going to resolve the Promise with `null`.
258257
if (!ret.has_value()) {
259-
JS::RootedValue result(ENGINE->cx());
258+
JS::RootedValue result(cx);
260259
result.setNull();
261-
JS::ResolvePromise(ENGINE->cx(), promise, result);
260+
JS::ResolvePromise(cx, promise, result);
262261
} else {
263-
JS::RootedObject entry(ENGINE->cx(), KVStoreEntry::create(ENGINE->cx(), ret.value()));
262+
JS::RootedObject entry(cx, KVStoreEntry::create(cx, ret.value()));
264263
if (!entry) {
265264
return false;
266265
}
267-
JS::RootedValue result(ENGINE->cx());
266+
JS::RootedValue result(cx);
268267
result.setObject(*entry);
269-
JS::ResolvePromise(ENGINE->cx(), promise, result);
268+
JS::ResolvePromise(cx, promise, result);
270269
}
271270

272271
return true;
@@ -300,8 +299,8 @@ bool KVStore::get(JSContext *cx, unsigned argc, JS::Value *vp) {
300299
}
301300
auto handle = res.unwrap();
302301

303-
auto task = new FastlyAsyncTask(handle, ENGINE->cx(), self, result_promise,
304-
KVStore::process_pending_kv_store_lookup);
302+
auto task =
303+
new FastlyAsyncTask(handle, self, result_promise, KVStore::process_pending_kv_store_lookup);
305304
ENGINE->queue_async_task(task);
306305

307306
args.rval().setObject(*result_promise);

runtime/fastly/builtins/kv-store.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,10 @@ class KVStore final : public builtins::BuiltinImpl<KVStore> {
4848
static const unsigned ctor_length = 1;
4949

5050
static bool constructor(JSContext *cx, unsigned argc, JS::Value *vp);
51-
static bool process_pending_kv_store_lookup(FastlyHandle handle, JS::HandleObject context,
52-
JS::HandleObject promise);
53-
static bool process_pending_kv_store_delete(FastlyHandle handle, JS::HandleObject context,
54-
JS::HandleObject promise);
51+
static bool process_pending_kv_store_lookup(JSContext *cx, FastlyHandle handle,
52+
JS::HandleObject context, JS::HandleObject promise);
53+
static bool process_pending_kv_store_delete(JSContext *cx, FastlyHandle handle,
54+
JS::HandleObject context, JS::HandleObject promise);
5555
};
5656

5757
} // namespace fastly::kv_store

runtime/fastly/host-api/host_api.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ Result<Void> HttpBody::close() {
401401
return res;
402402
}
403403

404-
FastlyAsyncTask HttpBody::async_handle() const { return FastlyAsyncTask{this->handle}; }
404+
FastlyHandle HttpBody::async_handle() const { return FastlyHandle{this->handle}; }
405405

406406
namespace {
407407

@@ -521,7 +521,7 @@ api::FastlyResult<Response, FastlySendError> HttpPendingReq::wait() {
521521
return res;
522522
}
523523

524-
FastlyAsyncTask HttpPendingReq::async_handle() const { return FastlyAsyncTask{this->handle}; }
524+
FastlyHandle HttpPendingReq::async_handle() const { return FastlyHandle{this->handle}; }
525525

526526
void CacheOverrideTag::set_pass() {
527527
this->value |= FASTLY_COMPUTE_AT_EDGE_HTTP_REQ_CACHE_OVERRIDE_TAG_PASS;

0 commit comments

Comments
 (0)