Skip to content

SharedWorker refactor #471

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 216 additions & 45 deletions dist/web/pubnub.js

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions dist/web/pubnub.min.js

Large diffs are not rendered by default.

6,412 changes: 4,520 additions & 1,892 deletions dist/web/pubnub.worker.js

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions dist/web/pubnub.worker.min.js

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions lib/core/components/configuration.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ const makeConfiguration = (base, setupCryptoModule) => {
getUseRandomIVs() {
return base.useRandomIVs;
},
isSharedWorkerEnabled() {
// @ts-expect-error: Access field from web-based SDK configuration.
return base.sdkFamily === 'Web' && base['subscriptionWorkerUrl'];
},
getKeepPresenceChannelsInPresenceRequests() {
// @ts-expect-error: Access field from web-based SDK configuration.
return base.sdkFamily === 'Web' && base['subscriptionWorkerUrl'];
Expand Down
18 changes: 17 additions & 1 deletion lib/core/components/logger-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ class LoggerManager {
* @internal
*/
constructor(pubNubId, minLogLevel, loggers) {
/**
* Keeping track of previous entry timestamp.
*
* This information will be used to make sure that multiple sequential entries doesn't have same timestamp. Adjustment
* on .001 will be done to make it possible to properly stort entries.
*
* @internal
*/
this.previousEntryTimestamp = 0;
this.pubNubId = pubNubId;
this.minLogLevel = minLogLevel;
this.loggers = loggers;
Expand Down Expand Up @@ -100,8 +109,15 @@ class LoggerManager {
// Check whether a log message should be handled at all or not.
if (logLevel < this.minLogLevel || this.loggers.length === 0)
return;
const date = new Date();
if (date.getTime() <= this.previousEntryTimestamp) {
this.previousEntryTimestamp++;
date.setTime(this.previousEntryTimestamp);
}
else
this.previousEntryTimestamp = date.getTime();
const level = logger_1.LogLevel[logLevel].toLowerCase();
const message = Object.assign({ timestamp: new Date(), pubNubId: this.pubNubId, level: logLevel, minimumLevel: this.minLogLevel, location }, (typeof messageFactory === 'function' ? messageFactory() : { messageType: 'text', message: messageFactory }));
const message = Object.assign({ timestamp: date, pubNubId: this.pubNubId, level: logLevel, minimumLevel: this.minLogLevel, location }, (typeof messageFactory === 'function' ? messageFactory() : { messageType: 'text', message: messageFactory }));
this.loggers.forEach((logger) => logger[level](message));
}
}
Expand Down
54 changes: 37 additions & 17 deletions lib/core/components/subscription-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ var __importDefault = (this && this.__importDefault) || function (mod) {
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.SubscriptionManager = void 0;
const utils_1 = require("../utils");
const subscribe_1 = require("../endpoints/subscribe");
const utils_1 = require("../utils");
const reconnection_manager_1 = require("./reconnection_manager");
const categories_1 = __importDefault(require("../constants/categories"));
const deduping_manager_1 = require("./deduping_manager");
Expand All @@ -38,6 +38,13 @@ class SubscriptionManager {
this.subscribeCall = subscribeCall;
this.heartbeatCall = heartbeatCall;
this.leaveCall = leaveCall;
/**
* Whether user code in event handlers requested disconnection or not.
*
* Won't continue subscription loop if user requested disconnection/unsubscribe from all in response to received
* event.
*/
this.disconnectedWhileHandledEvent = false;
configuration.logger().trace('SubscriptionManager', 'Create manager.');
this.reconnectionManager = new reconnection_manager_1.ReconnectionManager(time);
this.dedupingManager = new deduping_manager_1.DedupingManager(this.configuration);
Expand Down Expand Up @@ -83,6 +90,9 @@ class SubscriptionManager {
// endregion
// region Subscription
disconnect() {
// Potentially called during received events handling.
// Mark to prevent subscription loop continuation in subscribe response handler.
this.disconnectedWhileHandledEvent = true;
this.stopSubscribeLoop();
this.stopHeartbeatTimer();
this.reconnectionManager.stopPolling();
Expand Down Expand Up @@ -168,6 +178,22 @@ class SubscriptionManager {
// There is no need to unsubscribe to empty list of data sources.
if (actualChannels.size === 0 && actualChannelGroups.size === 0)
return;
const lastTimetoken = this.lastTimetoken;
const currentTimetoken = this.currentTimetoken;
if (Object.keys(this.channels).length === 0 &&
Object.keys(this.presenceChannels).length === 0 &&
Object.keys(this.channelGroups).length === 0 &&
Object.keys(this.presenceChannelGroups).length === 0) {
this.lastTimetoken = '0';
this.currentTimetoken = '0';
this.referenceTimetoken = null;
this.storedTimetoken = null;
this.region = null;
this.reconnectionManager.stopPolling();
}
this.reconnect(true);
// Send leave request after long-poll connection closed and loop restarted (the same way as it happens in new
// subscription flow).
if (this.configuration.suppressLeaveEvents === false && !isOffline) {
channelGroups = Array.from(actualChannelGroups);
channels = Array.from(actualChannels);
Expand All @@ -183,23 +209,13 @@ class SubscriptionManager {
else if ('message' in status && typeof status.message === 'string')
errorMessage = status.message;
}
this.emitStatus(Object.assign(Object.assign({}, restOfStatus), { error: errorMessage !== null && errorMessage !== void 0 ? errorMessage : false, affectedChannels: channels, affectedChannelGroups: channelGroups, currentTimetoken: this.currentTimetoken, lastTimetoken: this.lastTimetoken }));
this.emitStatus(Object.assign(Object.assign({}, restOfStatus), { error: errorMessage !== null && errorMessage !== void 0 ? errorMessage : false, affectedChannels: channels, affectedChannelGroups: channelGroups, currentTimetoken,
lastTimetoken }));
});
}
if (Object.keys(this.channels).length === 0 &&
Object.keys(this.presenceChannels).length === 0 &&
Object.keys(this.channelGroups).length === 0 &&
Object.keys(this.presenceChannelGroups).length === 0) {
this.lastTimetoken = '0';
this.currentTimetoken = '0';
this.referenceTimetoken = null;
this.storedTimetoken = null;
this.region = null;
this.reconnectionManager.stopPolling();
}
this.reconnect(true);
}
unsubscribeAll(isOffline = false) {
this.disconnectedWhileHandledEvent = true;
this.unsubscribe({
channels: this.subscribedChannels,
channelGroups: this.subscribedChannelGroups,
Expand All @@ -214,6 +230,7 @@ class SubscriptionManager {
* @internal
*/
startSubscribeLoop(restartOnUnsubscribe = false) {
this.disconnectedWhileHandledEvent = false;
this.stopSubscribeLoop();
const channelGroups = [...Object.keys(this.channelGroups)];
const channels = [...Object.keys(this.channels)];
Expand All @@ -222,8 +239,8 @@ class SubscriptionManager {
// There is no need to start subscription loop for an empty list of data sources.
if (channels.length === 0 && channelGroups.length === 0)
return;
this.subscribeCall(Object.assign(Object.assign({ channels,
channelGroups, state: this.presenceState, heartbeat: this.configuration.getPresenceTimeout(), timetoken: this.currentTimetoken }, (this.region !== null ? { region: this.region } : {})), (this.configuration.filterExpression ? { filterExpression: this.configuration.filterExpression } : {})), (status, result) => {
this.subscribeCall(Object.assign(Object.assign(Object.assign({ channels,
channelGroups, state: this.presenceState, heartbeat: this.configuration.getPresenceTimeout(), timetoken: this.currentTimetoken }, (this.region !== null ? { region: this.region } : {})), (this.configuration.filterExpression ? { filterExpression: this.configuration.filterExpression } : {})), { onDemand: !this.subscriptionStatusAnnounced || restartOnUnsubscribe }), (status, result) => {
this.processSubscribeResponse(status, result);
});
if (!restartOnUnsubscribe && this.configuration.useSmartHeartbeat)
Expand Down Expand Up @@ -354,7 +371,10 @@ class SubscriptionManager {
this.emitStatus(errorStatus);
}
this.region = result.cursor.region;
this.startSubscribeLoop();
if (!this.disconnectedWhileHandledEvent)
this.startSubscribeLoop();
else
this.disconnectedWhileHandledEvent = false;
}
// endregion
// region Presence
Expand Down
7 changes: 7 additions & 0 deletions lib/core/constants/categories.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,12 @@ var StatusCategory;
* PubNub client unexpectedly disconnected from the real-time updates streams.
*/
StatusCategory["PNDisconnectedUnexpectedlyCategory"] = "PNDisconnectedUnexpectedlyCategory";
// --------------------------------------------------------
// ------------------ Shared worker events ----------------
// --------------------------------------------------------
/**
* SDK will announce when newer shared worker will be 'noticed'.
*/
StatusCategory["PNSharedWorkerUpdatedCategory"] = "PNSharedWorkerUpdatedCategory";
})(StatusCategory || (StatusCategory = {}));
exports.default = StatusCategory;
4 changes: 3 additions & 1 deletion lib/core/endpoints/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,10 @@ class SubscribeRequest extends BaseSubscribeRequest {
return `/v2/subscribe/${subscribeKey}/${(0, utils_1.encodeNames)((_a = channels === null || channels === void 0 ? void 0 : channels.sort()) !== null && _a !== void 0 ? _a : [], ',')}/0`;
}
get queryParameters() {
const { channelGroups, filterExpression, heartbeat, state, timetoken, region } = this.parameters;
const { channelGroups, filterExpression, heartbeat, state, timetoken, region, onDemand } = this.parameters;
const query = {};
if (onDemand)
query['on-demand'] = 1;
if (channelGroups && channelGroups.length > 0)
query['channel-group'] = channelGroups.sort().join(',');
if (filterExpression && filterExpression.length > 0)
Expand Down
5 changes: 4 additions & 1 deletion lib/core/endpoints/subscriptionUtils/handshake.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ class HandshakeSubscribeRequest extends subscribe_1.BaseSubscribeRequest {
return `/v2/subscribe/${subscribeKey}/${(0, utils_1.encodeNames)(channels.sort(), ',')}/0`;
}
get queryParameters() {
const { channelGroups, filterExpression, state } = this.parameters;
const { channelGroups, filterExpression, state, onDemand } = this
.parameters;
const query = { ee: '' };
if (onDemand)
query['on-demand'] = 1;
if (channelGroups && channelGroups.length > 0)
query['channel-group'] = channelGroups.sort().join(',');
if (filterExpression && filterExpression.length > 0)
Expand Down
5 changes: 4 additions & 1 deletion lib/core/endpoints/subscriptionUtils/receiveMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ class ReceiveMessagesSubscribeRequest extends subscribe_1.BaseSubscribeRequest {
return `/v2/subscribe/${subscribeKey}/${(0, utils_1.encodeNames)(channels.sort(), ',')}/0`;
}
get queryParameters() {
const { channelGroups, filterExpression, timetoken, region } = this.parameters;
const { channelGroups, filterExpression, timetoken, region, onDemand } = this
.parameters;
const query = { ee: '' };
if (onDemand)
query['on-demand'] = 1;
if (channelGroups && channelGroups.length > 0)
query['channel-group'] = channelGroups.sort().join(',');
if (filterExpression && filterExpression.length > 0)
Expand Down
22 changes: 16 additions & 6 deletions lib/core/pubnub-common.js
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,9 @@ class PubNubCore {
* Change the current PubNub client user identifier.
*
* **Important:** Change won't affect ongoing REST API calls.
* **Warning:** Because ongoing REST API calls won't be canceled there could happen unexpected events like implicit
* `join` event for the previous `userId` after a long-poll subscribe request will receive a response. To avoid this
* it is advised to unsubscribe from all/disconnect before changing `userId`.
*
* @param value - New PubNub client user identifier.
*
Expand Down Expand Up @@ -1197,6 +1200,9 @@ class PubNubCore {
*/
makeSubscribe(parameters, callback) {
if (process.env.SUBSCRIBE_MANAGER_MODULE !== 'disabled') {
// `on-demand` query parameter not required for non-SharedWorker environment.
if (!this._configuration.isSharedWorkerEnabled())
parameters.onDemand = false;
const request = new subscribe_1.SubscribeRequest(Object.assign(Object.assign({}, parameters), { keySet: this._configuration.keySet, crypto: this._configuration.getCryptoModule(), getFileUrl: this.getFileUrl.bind(this) }));
this.sendRequest(request, (status, result) => {
var _a;
Expand Down Expand Up @@ -1358,6 +1364,9 @@ class PubNubCore {
subscribeHandshake(parameters) {
return __awaiter(this, void 0, void 0, function* () {
if (process.env.SUBSCRIBE_EVENT_ENGINE_MODULE !== 'disabled') {
// `on-demand` query parameter not required for non-SharedWorker environment.
if (!this._configuration.isSharedWorkerEnabled())
parameters.onDemand = false;
const request = new handshake_1.HandshakeSubscribeRequest(Object.assign(Object.assign({}, parameters), { keySet: this._configuration.keySet, crypto: this._configuration.getCryptoModule(), getFileUrl: this.getFileUrl.bind(this) }));
const abortUnsubscribe = parameters.abortSignal.subscribe((err) => {
request.abort('Cancel subscribe handshake request');
Expand Down Expand Up @@ -1388,6 +1397,9 @@ class PubNubCore {
subscribeReceiveMessages(parameters) {
return __awaiter(this, void 0, void 0, function* () {
if (process.env.SUBSCRIBE_EVENT_ENGINE_MODULE !== 'disabled') {
// `on-demand` query parameter not required for non-SharedWorker environment.
if (!this._configuration.isSharedWorkerEnabled())
parameters.onDemand = false;
const request = new receiveMessages_1.ReceiveMessagesSubscribeRequest(Object.assign(Object.assign({}, parameters), { keySet: this._configuration.keySet, crypto: this._configuration.getCryptoModule(), getFileUrl: this.getFileUrl.bind(this) }));
const abortUnsubscribe = parameters.abortSignal.subscribe((err) => {
request.abort('Cancel long-poll subscribe request');
Expand Down Expand Up @@ -1879,12 +1891,10 @@ class PubNubCore {
// Filtering out presence channels and groups.
let { channels, channelGroups } = parameters;
// Remove `-pnpres` channels / groups if they not acceptable in the current PubNub client configuration.
if (!this._configuration.getKeepPresenceChannelsInPresenceRequests()) {
if (channelGroups)
channelGroups = channelGroups.filter((channelGroup) => !channelGroup.endsWith('-pnpres'));
if (channels)
channels = channels.filter((channel) => !channel.endsWith('-pnpres'));
}
if (channelGroups)
channelGroups = channelGroups.filter((channelGroup) => !channelGroup.endsWith('-pnpres'));
if (channels)
channels = channels.filter((channel) => !channel.endsWith('-pnpres'));
// Complete immediately request only for presence channels.
if ((channelGroups !== null && channelGroups !== void 0 ? channelGroups : []).length === 0 && (channels !== null && channels !== void 0 ? channels : []).length === 0) {
const responseStatus = {
Expand Down
3 changes: 2 additions & 1 deletion lib/event-engine/dispatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class EventEngineDispatcher extends core_1.Dispatcher {
this.on(effects.handshake.type, (0, core_1.asyncHandler)((payload_1, abortSignal_1, _a) => __awaiter(this, [payload_1, abortSignal_1, _a], void 0, function* (payload, abortSignal, { handshake, presenceState, config }) {
abortSignal.throwIfAborted();
try {
const result = yield handshake(Object.assign({ abortSignal: abortSignal, channels: payload.channels, channelGroups: payload.groups, filterExpression: config.filterExpression }, (config.maintainPresenceState && { state: presenceState })));
const result = yield handshake(Object.assign(Object.assign({ abortSignal: abortSignal, channels: payload.channels, channelGroups: payload.groups, filterExpression: config.filterExpression }, (config.maintainPresenceState && { state: presenceState })), { onDemand: payload.onDemand }));
return engine.transition(events.handshakeSuccess(result));
}
catch (e) {
Expand All @@ -90,6 +90,7 @@ class EventEngineDispatcher extends core_1.Dispatcher {
timetoken: payload.cursor.timetoken,
region: payload.cursor.region,
filterExpression: config.filterExpression,
onDemand: payload.onDemand,
});
engine.transition(events.receiveSuccess(result.cursor, result.messages));
}
Expand Down
10 changes: 8 additions & 2 deletions lib/event-engine/effects.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ const core_1 = require("./core");
*
* @internal
*/
exports.handshake = (0, core_1.createManagedEffect)('HANDSHAKE', (channels, groups) => ({
exports.handshake = (0, core_1.createManagedEffect)('HANDSHAKE', (channels, groups, onDemand) => ({
channels,
groups,
onDemand,
}));
/**
* Real-time updates receive effect.
Expand All @@ -26,7 +27,12 @@ exports.handshake = (0, core_1.createManagedEffect)('HANDSHAKE', (channels, grou
*
* @internal
*/
exports.receiveMessages = (0, core_1.createManagedEffect)('RECEIVE_MESSAGES', (channels, groups, cursor) => ({ channels, groups, cursor }));
exports.receiveMessages = (0, core_1.createManagedEffect)('RECEIVE_MESSAGES', (channels, groups, cursor, onDemand) => ({
channels,
groups,
cursor,
onDemand,
}));
/**
* Emit real-time updates effect.
*
Expand Down
10 changes: 8 additions & 2 deletions lib/event-engine/states/handshake_failed.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ exports.HandshakeFailedState = new state_1.State('HANDSHAKE_FAILED');
exports.HandshakeFailedState.on(events_1.subscriptionChange.type, (context, { payload }) => {
if (payload.channels.length === 0 && payload.groups.length === 0)
return unsubscribed_1.UnsubscribedState.with(undefined);
return handshaking_1.HandshakingState.with({ channels: payload.channels, groups: payload.groups, cursor: context.cursor });
return handshaking_1.HandshakingState.with({
channels: payload.channels,
groups: payload.groups,
cursor: context.cursor,
onDemand: true,
});
});
exports.HandshakeFailedState.on(events_1.reconnect.type, (context, { payload }) => handshaking_1.HandshakingState.with(Object.assign(Object.assign({}, context), { cursor: payload.cursor || context.cursor })));
exports.HandshakeFailedState.on(events_1.reconnect.type, (context, { payload }) => handshaking_1.HandshakingState.with(Object.assign(Object.assign({}, context), { cursor: payload.cursor || context.cursor, onDemand: true })));
exports.HandshakeFailedState.on(events_1.restore.type, (context, { payload }) => {
var _a, _b;
if (payload.channels.length === 0 && payload.groups.length === 0)
Expand All @@ -36,6 +41,7 @@ exports.HandshakeFailedState.on(events_1.restore.type, (context, { payload }) =>
timetoken: `${payload.cursor.timetoken}`,
region: payload.cursor.region ? payload.cursor.region : ((_b = (_a = context === null || context === void 0 ? void 0 : context.cursor) === null || _a === void 0 ? void 0 : _a.region) !== null && _b !== void 0 ? _b : 0),
},
onDemand: true,
});
});
exports.HandshakeFailedState.on(events_1.unsubscribeAll.type, (_) => unsubscribed_1.UnsubscribedState.with());
Loading