Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/heapsnapshot/src/HeapSnapshotLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*/

import { MessagePort } from 'node:worker_threads';
import { fakePromise } from '@whatwg-node/promise-helpers';
import {
HeapSnapshotProgress,
JSHeapSnapshot,
Expand Down Expand Up @@ -166,7 +167,7 @@ export class HeapSnapshotLoader {
// sequentially. This means it's fine to stash away a single #dataCallback
// instead of an array of them.
if (this.#buffer.length > 0) {
return Promise.resolve(this.#buffer.shift() as string);
return fakePromise(this.#buffer.shift() as string);
}

const { promise, resolve } =
Expand Down
4 changes: 2 additions & 2 deletions internal/proc/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import childProcess from 'child_process';
import fs from 'fs/promises';
import path from 'path';
import { setTimeout } from 'timers/promises';
import { createDeferred } from '@graphql-tools/utils';
import { createDeferred, fakePromise } from '@graphql-tools/utils';
import { hostnames, isDebug, trimError } from '@internal/testing';
import { DisposableSymbols } from '@whatwg-node/disposablestack';
import { fetch } from '@whatwg-node/fetch';
Expand Down Expand Up @@ -127,7 +127,7 @@ export function spawn(
[DisposableSymbols.asyncDispose]: async () => {
if (exited) {
// there's nothing to dispose since the process already exitted (error or not)
return Promise.resolve();
return fakePromise();
}
if (child.pid) {
await terminate(child.pid);
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"@yarnpkg/cli": "4.10.3",
"@yarnpkg/core": "4.4.4",
"@yarnpkg/plugin-pack": "4.0.3",
"bun": "1.2.23",
"bun": "1.3.0",
"cross-env": "10.1.0",
"eslint": "9.38.0",
"eslint-plugin-import": "2.32.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ describe('handleEventStreamResponse', () => {
);
const iterator = asyncIterable[Symbol.asyncIterator]();

Promise.resolve().then(() => {
queueMicrotask(() => {
ctrl.abort(); // we abort
readableStream.cancel(); // then cancel
// so that the error reported is the abort error
Expand Down Expand Up @@ -188,7 +188,7 @@ describe('handleEventStreamResponse', () => {
const iterator = asyncIterable[Symbol.asyncIterator]();

const originalError = new Error('Oops!');
Promise.resolve().then(() => {
queueMicrotask(() => {
readableStream.cancel(originalError); // this will throw in reader.read()
});

Expand Down
2 changes: 1 addition & 1 deletion packages/fusion-runtime/tests/polling.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ describe('Polling', () => {
);
}
makeQuery(10_000);
await advanceTimersByTimeAsync(10_000);
await advanceTimersByTimeAsync(10_500);
makeQuery(0);
expect(callTimes).toHaveLength(2);
// It can be 0 or 1 or any one-digit number
Expand Down
4 changes: 2 additions & 2 deletions packages/gateway/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@
"@rollup/plugin-sucrase": "^5.0.2",
"@tsconfig/node18": "^18.2.4",
"@types/adm-zip": "^0.5.5",
"@types/bun": "1.2.23",
"@types/bun": "1.3.0",
"@types/ws": "^8.5.12",
"@whatwg-node/fetch": "^0.10.11",
"adm-zip": "^0.5.15",
"bun": "^1.2.23",
"bun": "^1.3.0",
"graphql": "^16.9.0",
"parse-duration": "^2.0.0",
"pkgroll": "2.20.1",
Expand Down
27 changes: 16 additions & 11 deletions packages/gateway/src/servers/bun.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { getGraphQLWSOptions } from '@graphql-hive/gateway-runtime';
import type { Server, TLSServeOptions, WebSocketServeOptions } from 'bun';
import {
DisposableSymbols,
getGraphQLWSOptions,
} from '@graphql-hive/gateway-runtime';
import type { Server, WebSocketOptions } from 'bun';
import type { Extra } from 'graphql-ws/use/bun';
import { defaultOptions, GatewayRuntime } from '..';
import type { ServerForRuntimeOptions } from './types';
Expand All @@ -8,36 +11,38 @@ export async function startBunServer<TContext extends Record<string, any>>(
gwRuntime: GatewayRuntime<TContext>,
opts: ServerForRuntimeOptions,
): Promise<void> {
const serverOptions: TLSServeOptions & Partial<WebSocketServeOptions> = {
const serverOptions: Bun.Serve.Options<{}> & Partial<WebSocketOptions> = {
fetch: gwRuntime,
port: opts.port || defaultOptions.port,
hostname: opts.host || defaultOptions.host,
reusePort: true,
idleTimeout: opts.requestTimeout,
};
if (opts.sslCredentials) {
const tlsOptions: Bun.TLSOptions = {};
if (opts.sslCredentials.ca_file_name) {
serverOptions.ca = Bun.file(opts.sslCredentials.ca_file_name);
tlsOptions.ca = Bun.file(opts.sslCredentials.ca_file_name);
}
if (opts.sslCredentials.cert_file_name) {
serverOptions.cert = Bun.file(opts.sslCredentials.cert_file_name);
tlsOptions.cert = Bun.file(opts.sslCredentials.cert_file_name);
}
if (opts.sslCredentials.dh_params_file_name) {
serverOptions.dhParamsFile = opts.sslCredentials.dh_params_file_name;
tlsOptions.dhParamsFile = opts.sslCredentials.dh_params_file_name;
}
if (opts.sslCredentials.key_file_name) {
serverOptions.key = Bun.file(opts.sslCredentials.key_file_name);
tlsOptions.key = Bun.file(opts.sslCredentials.key_file_name);
}
if (opts.sslCredentials.passphrase) {
serverOptions.passphrase = opts.sslCredentials.passphrase;
tlsOptions.passphrase = opts.sslCredentials.passphrase;
}
if (opts.sslCredentials.ssl_ciphers) {
// TODO: Check if there is a correct way to set ciphers
}
if (opts.sslCredentials.ssl_prefer_low_memory_usage) {
serverOptions.lowMemoryMode =
tlsOptions.lowMemoryMode =
opts.sslCredentials.ssl_prefer_low_memory_usage;
}
serverOptions.tls = tlsOptions;
}
if (!opts.disableWebsockets) {
const { makeHandler } = await import('graphql-ws/use/bun');
Expand All @@ -47,7 +52,7 @@ export async function startBunServer<TContext extends Record<string, any>>(
...(ctx.extra.socket.data || {}),
})),
);
serverOptions.fetch = function (request: Request, server: Server) {
serverOptions.fetch = function (request: Request, server: Server<{}>) {
// header to check if websocket
if (
request.headers.has('Sec-WebSocket-Key') &&
Expand All @@ -65,5 +70,5 @@ export async function startBunServer<TContext extends Record<string, any>>(
}
const server = Bun.serve(serverOptions);
opts.log.info(`Listening on ${server.url}`);
gwRuntime.disposableStack.use(server);
gwRuntime.disposableStack.defer(() => server[DisposableSymbols.dispose]());
}
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,6 @@ describe('useOpenTelemetry', () => {
const operationSpan = spanExporter.spans.find(({ name }) =>
name.startsWith('graphql.operation'),
);

expect(httpSpan.attributes['gateway.cache.response_cache']).toBe(
attrs.http,
);
Expand Down Expand Up @@ -1198,7 +1197,7 @@ describe('useOpenTelemetry', () => {

it('should have all attributes required by Hive Tracing', async () => {
await using gateway = await buildTestGateway({
fetch: () => () => Promise.resolve(new Response(null, { status: 500 })),
fetch: () => () => new Response(null, { status: 500 }),
});
await gateway.query({
shouldReturnErrors: true,
Expand Down
9 changes: 5 additions & 4 deletions packages/plugins/opentelemetry/tests/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
type TracerConfig,
} from '@opentelemetry/sdk-trace-base';
import { AsyncDisposableStack } from '@whatwg-node/disposablestack';
import { fakePromise } from '@whatwg-node/promise-helpers';
import { createSchema, createYoga, type GraphQLParams } from 'graphql-yoga';
import { expect } from 'vitest';
import { hive } from '../src/api';
Expand Down Expand Up @@ -182,11 +183,11 @@ export class MockSpanExporter implements SpanExporter {
}
shutdown() {
this.reset();
return Promise.resolve();
return fakePromise();
}
forceFlush() {
this.reset();
return Promise.resolve();
return fakePromise();
}
reset() {
this.spans = [];
Expand Down Expand Up @@ -352,12 +353,12 @@ export class MockLogRecordExporter implements LogRecordExporter {

shutdown(): Promise<void> {
this.reset();
return Promise.resolve();
return fakePromise();
}

forceFlush(): Promise<void> {
this.reset();
return Promise.resolve();
return fakePromise();
}

reset() {
Expand Down
17 changes: 9 additions & 8 deletions packages/pubsub/src/mem.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Repeater } from '@repeaterjs/repeater';
import { DisposableSymbols } from '@whatwg-node/disposablestack';
import { type MaybePromise } from '@whatwg-node/promise-helpers';
import { fakePromise, type MaybePromise } from '@whatwg-node/promise-helpers';
import { PubSub, PubSubListener, TopicDataMap } from './pubsub';

/** In-memory {@link PubSub} implementation. */
Expand Down Expand Up @@ -53,13 +53,14 @@ export class MemPubSub<M extends TopicDataMap = TopicDataMap>
}

if (!listener) {
return new Repeater<M[Topic], any, any>(async (push, stop) => {
return new Repeater<M[Topic], any, any>((push, stop) => {
listeners.set(push, stop);
await stop;
listeners.delete(push);
if (listeners.size === 0) {
this.#subscribers.delete(topic);
}
return stop.then(() => {
listeners.delete(push);
if (listeners.size === 0) {
this.#subscribers.delete(topic);
}
});
});
}

Expand Down Expand Up @@ -88,6 +89,6 @@ export class MemPubSub<M extends TopicDataMap = TopicDataMap>
}

[DisposableSymbols.asyncDispose]() {
return Promise.resolve(this.dispose());
return fakePromise(this.dispose());
}
}
7 changes: 5 additions & 2 deletions packages/pubsub/tests/pubsub.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ import { setTimeout } from 'timers/promises';
import { Container, createTenv } from '@internal/e2e';
import { connect as natsConnect } from '@nats-io/transport-node';
import { crypto } from '@whatwg-node/fetch';
import { createDeferredPromise } from '@whatwg-node/promise-helpers';
import {
createDeferredPromise,
fakePromise,
} from '@whatwg-node/promise-helpers';
import Redis from 'ioredis';
import LeakDetector from 'jest-leak-detector';
import { beforeAll, describe, expect, it, vi } from 'vitest';
Expand Down Expand Up @@ -59,7 +62,7 @@ for (const PubSub of PubSubCtors) {
function flush(ms: number = 100) {
if (PubSub === MemPubSub) {
// MemPubSub is synchronous, no need to wait
return Promise.resolve();
return fakePromise();
}
return setTimeout(ms);
}
Expand Down
11 changes: 6 additions & 5 deletions packages/runtime/tests/gateway-runtime.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ import restTransport from '@graphql-mesh/transport-rest';
import { KeyValueCache, Logger } from '@graphql-mesh/types';
import {
createDeferred,
fakePromise,
printSchemaWithDirectives,
} from '@graphql-tools/utils';
import { isDebug } from '@internal/testing';
import { fakeRejectPromise, isDebug } from '@internal/testing';
import { loadOpenAPISubgraph } from '@omnigraph/openapi';
import { DisposableSymbols } from '@whatwg-node/disposablestack';
import { createRouter, Response, Type } from 'fets';
Expand Down Expand Up @@ -310,16 +311,16 @@ describe('Gateway Runtime', () => {
function createCache(cachedSupergraph?: string) {
return {
get: vi.fn((_key) => {
return Promise.resolve(cachedSupergraph);
return fakePromise(cachedSupergraph);
}),
set: vi.fn((_key, _value, _options) => {
return Promise.resolve();
return fakePromise();
}),
delete() {
return Promise.reject('noop');
return fakeRejectPromise('noop');
},
getKeysByPrefix() {
return Promise.reject('noop');
return fakeRejectPromise('noop');
},
} satisfies KeyValueCache;
}
Expand Down
25 changes: 13 additions & 12 deletions packages/runtime/tests/graphos.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
import { LegacyLogger, Logger } from '@graphql-hive/logger';
import { TransportContext } from '@graphql-mesh/transport-common';
import { Response } from '@whatwg-node/fetch';
import { fakePromise } from '@whatwg-node/promise-helpers';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { createGraphOSFetcher } from '../src/fetchers/graphos';

Expand All @@ -21,7 +22,7 @@ describe('GraphOS', () => {
it('should fetch the supergraph SDL', async () => {
const { unifiedGraphFetcher } = createTestFetcher({ fetch: mockSDL });

const result = Promise.resolve().then(() => unifiedGraphFetcher());
const result = unifiedGraphFetcher();
await advanceTimersByTimeAsync(1_000);
expect(await result).toBe(supergraphSdl);
});
Expand All @@ -38,7 +39,7 @@ describe('GraphOS', () => {
},
});

const result = Promise.resolve().then(() => unifiedGraphFetcher());
const result = unifiedGraphFetcher();
for (let i = 0; i < 3; i++) {
await advanceTimersByTimeAsync(1_000);
}
Expand All @@ -52,7 +53,7 @@ describe('GraphOS', () => {
{ maxRetries: 3 },
);

const result = Promise.resolve()
const result = fakePromise()
.then(() => unifiedGraphFetcher())
.catch((err) => err);
for (let i = 0; i < 3; i++) {
Expand All @@ -68,7 +69,7 @@ describe('GraphOS', () => {
{ maxRetries: 3 },
);

const result = Promise.resolve()
const result = fakePromise()
.then(() => unifiedGraphFetcher())
.catch(() => {});
await advanceTimersByTimeAsync(25);
Expand All @@ -85,12 +86,12 @@ describe('GraphOS', () => {
it('should respect min-delay between polls', async () => {
const { unifiedGraphFetcher } = createTestFetcher({ fetch: mockSDL });

Promise.resolve().then(() => unifiedGraphFetcher());
unifiedGraphFetcher();
await advanceTimersByTimeAsync(20);
expect(mockSDL).toHaveBeenCalledTimes(1);
await advanceTimersByTimeAsync(20);
expect(mockSDL).toHaveBeenCalledTimes(1);
Promise.resolve().then(() => unifiedGraphFetcher());
unifiedGraphFetcher();
await advanceTimersByTimeAsync(20);
expect(mockSDL).toHaveBeenCalledTimes(1);
await advanceTimersByTimeAsync(50);
Expand All @@ -108,19 +109,19 @@ describe('GraphOS', () => {
return mockSDL();
},
});
const result1 = Promise.resolve().then(() => unifiedGraphFetcher());
const result1 = unifiedGraphFetcher();
await advanceTimersByTimeAsync(1_000);
const result2 = Promise.resolve().then(() => unifiedGraphFetcher());
const result2 = unifiedGraphFetcher();
await advanceTimersByTimeAsync(1_000);
expect(await result1).toBe(await result2);
}, 30_000);

it('should not wait if min delay is superior to polling interval', async () => {
const { unifiedGraphFetcher } = createTestFetcher({ fetch: mockSDL });
const result = Promise.resolve().then(() => unifiedGraphFetcher());
const result = unifiedGraphFetcher();
await advanceTimersByTimeAsync(1_000);
await result;
const result2 = Promise.resolve().then(() => unifiedGraphFetcher());
const result2 = unifiedGraphFetcher();
await advanceTimersByTimeAsync(1_000);
expect(await result).toBe(await result2);
});
Expand All @@ -147,9 +148,9 @@ describe('GraphOS', () => {
},
});

const result = Promise.resolve().then(() => unifiedGraphFetcher());
const result = unifiedGraphFetcher();
await advanceTimersByTimeAsync(1_000);
const result2 = Promise.resolve().then(() => unifiedGraphFetcher());
const result2 = unifiedGraphFetcher();
await advanceTimersByTimeAsync(1_000);
expect(await result).toBe(await result2);
});
Expand Down
Loading
Loading