Skip to content

Commit 73b24aa

Browse files
2 parents 7390fa7 + 25ded6d commit 73b24aa

File tree

3 files changed

+206
-21
lines changed

3 files changed

+206
-21
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
1+
import {Channel} from '@grpc/grpc-js';
2+
13
export interface CloseableGrpcClient {
24
close(): void;
35
}
46

57
export interface GrpcClientWrapper<T extends CloseableGrpcClient> {
68
getClient(): T;
79
}
10+
11+
export interface GrpcClientWithChannel extends CloseableGrpcClient {
12+
getChannel(): Channel;
13+
}

packages/client-sdk-nodejs/src/internal/grpc/idle-grpc-client-wrapper.ts

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1-
import {CloseableGrpcClient, GrpcClientWrapper} from './grpc-client-wrapper';
1+
import {
2+
CloseableGrpcClient,
3+
GrpcClientWithChannel,
4+
GrpcClientWrapper,
5+
} from './grpc-client-wrapper';
26
import {MomentoLogger, MomentoLoggerFactory} from '@gomomento/sdk-core';
7+
import {ConnectivityState} from '@grpc/grpc-js/build/src/connectivity-state';
38

49
export interface IdleGrpcClientWrapperProps<T extends CloseableGrpcClient> {
510
clientFactoryFn: () => T;
@@ -48,32 +53,60 @@ export class IdleGrpcClientWrapper<T extends CloseableGrpcClient>
4853
}
4954

5055
getClient(): T {
51-
this.logger.trace(
52-
`Checking to see if client has been idle for more than ${this.maxIdleMillis} ms`
53-
);
54-
if (Date.now() - this.lastAccessTime > this.maxIdleMillis) {
55-
this.logger.info(
56-
`Client has been idle for more than ${this.maxIdleMillis} ms; reconnecting.`
56+
const now = Date.now();
57+
58+
// Reconnect if channel is in a bad state.
59+
// Although the generic type `T` only extends `CloseableGrpcClient` (which doesn't define `getChannel()`),
60+
// we know that in practice, the client returned by `clientFactoryFn()` is a gRPC client that inherits from
61+
// `grpc.Client`: https://grpc.github.io/grpc/node/grpc.Client.html
62+
//
63+
// Since `grpc.Client` defines a public `getChannel()` method, we can safely assume it's present on the client.
64+
// To make TypeScript accept this, we cast the client to `unknown` first and then to `GrpcClientWithChannel`.
65+
// This double-cast is necessary to override structural type checking and is safe in our controlled use case.
66+
const clientWithChannel = this.client as unknown as GrpcClientWithChannel;
67+
const channel = clientWithChannel.getChannel?.();
68+
if (channel) {
69+
const state = channel.getConnectivityState(true);
70+
if (
71+
state === ConnectivityState.TRANSIENT_FAILURE ||
72+
state === ConnectivityState.SHUTDOWN
73+
) {
74+
return this.recreateClient(
75+
`gRPC channel is in bad state: ${
76+
state === ConnectivityState.TRANSIENT_FAILURE
77+
? 'TRANSIENT_FAILURE'
78+
: 'SHUTDOWN'
79+
}`
80+
);
81+
}
82+
}
83+
84+
if (now - this.lastAccessTime > this.maxIdleMillis) {
85+
return this.recreateClient(
86+
`Client has been idle for more than ${this.maxIdleMillis} ms`
5787
);
58-
this.client.close();
59-
this.client = this.clientFactoryFn();
6088
}
6189

62-
if (this.maxClientAgeMillis !== undefined) {
63-
this.logger.trace(
64-
`Checking to see if client was created more than ${this.maxClientAgeMillis} ms`
90+
if (
91+
this.maxClientAgeMillis !== undefined &&
92+
now - this.clientCreatedTime > this.maxClientAgeMillis
93+
) {
94+
return this.recreateClient(
95+
`Client was created more than ${this.maxClientAgeMillis} ms ago`
6596
);
66-
if (Date.now() - this.clientCreatedTime > this.maxClientAgeMillis) {
67-
this.logger.info(
68-
`Client was created more than ${this.maxClientAgeMillis} millis ago; recreating as asked.`
69-
);
70-
this.client.close();
71-
this.client = this.clientFactoryFn();
72-
this.clientCreatedTime = Date.now();
73-
}
7497
}
7598

76-
this.lastAccessTime = Date.now();
99+
this.lastAccessTime = now;
100+
return this.client;
101+
}
102+
103+
private recreateClient(reason: string): T {
104+
this.logger.info(`${reason}; reconnecting client.`);
105+
this.client.close();
106+
this.client = this.clientFactoryFn();
107+
const now = Date.now();
108+
this.clientCreatedTime = now;
109+
this.lastAccessTime = now;
77110
return this.client;
78111
}
79112
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
import {Channel} from '@grpc/grpc-js';
2+
import {ConnectivityState} from '@grpc/grpc-js/build/src/connectivity-state';
3+
import {GrpcClientWithChannel} from '../../../src/internal/grpc/grpc-client-wrapper';
4+
import {IdleGrpcClientWrapper} from '../../../src/internal/grpc/idle-grpc-client-wrapper';
5+
import {
6+
DefaultMomentoLoggerFactory,
7+
DefaultMomentoLoggerLevel,
8+
} from '../../../src';
9+
10+
const createMockChannel = (state: ConnectivityState): jest.Mocked<Channel> => {
11+
return {
12+
getConnectivityState: jest.fn(() => state),
13+
} as unknown as jest.Mocked<Channel>;
14+
};
15+
16+
const createMockGrpcClient = (
17+
channel: jest.Mocked<Channel>
18+
): jest.Mocked<GrpcClientWithChannel> => ({
19+
close: jest.fn(),
20+
getChannel: jest.fn(() => channel),
21+
});
22+
23+
describe('IdleGrpcClientWrapper', () => {
24+
let now: number;
25+
const loggerFactory = new DefaultMomentoLoggerFactory(
26+
DefaultMomentoLoggerLevel.INFO
27+
);
28+
29+
beforeEach(() => {
30+
jest.useFakeTimers();
31+
now = Date.now();
32+
jest.setSystemTime(now);
33+
});
34+
35+
afterEach(() => {
36+
jest.useRealTimers();
37+
jest.clearAllMocks();
38+
});
39+
40+
it('returns the same client if not idle and channel is healthy', () => {
41+
const channel = createMockChannel(ConnectivityState.READY);
42+
const client = createMockGrpcClient(channel);
43+
44+
const wrapper = new IdleGrpcClientWrapper<GrpcClientWithChannel>({
45+
clientFactoryFn: () => client,
46+
loggerFactory: loggerFactory,
47+
maxIdleMillis: 10000,
48+
});
49+
50+
wrapper.getClient(); // first call to getClient() initializes the client
51+
jest.advanceTimersByTime(500); // simulate 500ms
52+
wrapper.getClient(); // second call should return the same client
53+
// eslint-disable-next-line @typescript-eslint/unbound-method
54+
expect(client.close).not.toHaveBeenCalled();
55+
});
56+
57+
it('recreates the client if the gRPC channel is in TRANSIENT_FAILURE', () => {
58+
// create a mock client whose channel is in a bad state
59+
const badChannel = createMockChannel(ConnectivityState.TRANSIENT_FAILURE);
60+
const badClient = createMockGrpcClient(badChannel);
61+
62+
const factory = jest
63+
.fn<GrpcClientWithChannel, []>()
64+
.mockReturnValue(badClient);
65+
66+
const wrapper = new IdleGrpcClientWrapper<GrpcClientWithChannel>({
67+
clientFactoryFn: factory,
68+
loggerFactory,
69+
maxIdleMillis: 10000,
70+
});
71+
72+
// call getClient(), which should recreate the client due to bad channel
73+
wrapper.getClient();
74+
// factory should be called twice (initial + reconnection)
75+
expect(factory).toHaveBeenCalledTimes(2);
76+
// the first client should be closed
77+
// eslint-disable-next-line @typescript-eslint/unbound-method
78+
expect(badClient.close).toHaveBeenCalledTimes(1);
79+
});
80+
81+
it('recreates the client if it exceeds maxIdleMillis', () => {
82+
const initialChannel = createMockChannel(ConnectivityState.READY);
83+
const newChannel = createMockChannel(ConnectivityState.READY);
84+
85+
const initialClient = createMockGrpcClient(initialChannel);
86+
const newClient = createMockGrpcClient(newChannel);
87+
88+
const factory = jest
89+
.fn<GrpcClientWithChannel, []>()
90+
.mockReturnValueOnce(initialClient)
91+
.mockReturnValueOnce(newClient);
92+
93+
const wrapper = new IdleGrpcClientWrapper<GrpcClientWithChannel>({
94+
clientFactoryFn: factory,
95+
loggerFactory,
96+
maxIdleMillis: 10_000,
97+
maxClientAgeMillis: 60_000, // longer, so it won’t interfere
98+
});
99+
100+
// Initial client use
101+
const c1 = wrapper.getClient();
102+
expect(c1).toBe(initialClient);
103+
104+
// Advance past idle threshold only
105+
jest.advanceTimersByTime(15_000);
106+
107+
const c2 = wrapper.getClient();
108+
expect(c2).toBe(newClient);
109+
expect(factory).toHaveBeenCalledTimes(2);
110+
// eslint-disable-next-line @typescript-eslint/unbound-method
111+
expect(initialClient.close).toHaveBeenCalledTimes(1);
112+
});
113+
114+
it('recreates the client if it exceeds maxClientAgeMillis', () => {
115+
const initialChannel = createMockChannel(ConnectivityState.READY);
116+
const newChannel = createMockChannel(ConnectivityState.READY);
117+
118+
const initialClient = createMockGrpcClient(initialChannel);
119+
const newClient = createMockGrpcClient(newChannel);
120+
121+
const factory = jest
122+
.fn<GrpcClientWithChannel, []>()
123+
.mockReturnValueOnce(initialClient)
124+
.mockReturnValueOnce(newClient);
125+
126+
const wrapper = new IdleGrpcClientWrapper<GrpcClientWithChannel>({
127+
clientFactoryFn: factory,
128+
loggerFactory,
129+
maxIdleMillis: 60_000, // longer, so it won’t interfere
130+
maxClientAgeMillis: 10_000,
131+
});
132+
133+
// Initial use
134+
const c1 = wrapper.getClient();
135+
expect(c1).toBe(initialClient);
136+
137+
// Regular usage, but pass age threshold
138+
jest.advanceTimersByTime(12_000);
139+
140+
const c2 = wrapper.getClient();
141+
expect(c2).toBe(newClient);
142+
expect(factory).toHaveBeenCalledTimes(2);
143+
// eslint-disable-next-line @typescript-eslint/unbound-method
144+
expect(initialClient.close).toHaveBeenCalledTimes(1);
145+
});
146+
});

0 commit comments

Comments
 (0)