diff --git a/config/config.devnet.yaml b/config/config.devnet.yaml index 94a330119..de8480d00 100644 --- a/config/config.devnet.yaml +++ b/config/config.devnet.yaml @@ -8,7 +8,7 @@ api: websocket: true cron: cacheWarmer: true - fastWarm: true + fastWarm: false queueWorker: true elasticUpdater: false flags: @@ -20,6 +20,9 @@ flags: processNfts: true collectionPropertiesFromGateway: false features: + websocketSubscription: + enabled: false + port: 6002 eventsNotifier: enabled: false port: 5674 @@ -65,7 +68,7 @@ features: cronExpression: '*/5 * * * * *' activationEpoch: 1043 chainAndromeda: - enabled: false + enabled: true activationEpoch: 4 nodeEpochsLeft: enabled: false diff --git a/config/config.e2e-mocked.mainnet.yaml b/config/config.e2e-mocked.mainnet.yaml index ef1cd3eed..83d35ceac 100644 --- a/config/config.e2e-mocked.mainnet.yaml +++ b/config/config.e2e-mocked.mainnet.yaml @@ -5,6 +5,9 @@ api: private: true graphql: true features: + websocketSubscription: + enabled: false + port: 6002 dataApi: enabled: false serviceUrl: 'https://data-api.multiversx.com' diff --git a/config/config.e2e.mainnet.yaml b/config/config.e2e.mainnet.yaml index b4c431ec0..9cdb71bc6 100644 --- a/config/config.e2e.mainnet.yaml +++ b/config/config.e2e.mainnet.yaml @@ -20,6 +20,9 @@ flags: processNfts: true collectionPropertiesFromGateway: false features: + websocketSubscription: + enabled: false + port: 6002 eventsNotifier: enabled: false port: 5674 @@ -63,7 +66,7 @@ features: cronExpression: '*/5 * * * * *' activationEpoch: 1391 chainAndromeda: - enabled: false + enabled: true activationEpoch: 4 nodeEpochsLeft: enabled: false diff --git a/config/config.mainnet.yaml b/config/config.mainnet.yaml index 52b6cd891..9e9bc7250 100644 --- a/config/config.mainnet.yaml +++ b/config/config.mainnet.yaml @@ -20,6 +20,9 @@ flags: processNfts: true collectionPropertiesFromGateway: false features: + websocketSubscription: + enabled: false + port: 6002 eventsNotifier: enabled: false port: 5674 diff --git a/config/config.testnet.yaml b/config/config.testnet.yaml index 1f1887414..e0353f514 100644 --- a/config/config.testnet.yaml +++ b/config/config.testnet.yaml @@ -20,6 +20,9 @@ flags: processNfts: true collectionPropertiesFromGateway: false features: + websocketSubscription: + enabled: false + port: 6002 eventsNotifier: enabled: false port: 5674 @@ -62,7 +65,7 @@ features: cronExpression: '*/5 * * * * *' activationEpoch: 1043 chainAndromeda: - enabled: false + enabled: true activationEpoch: 4 nodeEpochsLeft: enabled: false diff --git a/package-lock.json b/package-lock.json index d8ed257b7..f964e0d74 100644 --- a/package-lock.json +++ b/package-lock.json @@ -43,6 +43,8 @@ "apollo-server-core": "^3.13.0", "apollo-server-express": "3.13.0", "bignumber.js": "^9.0.2", + "class-transformer": "^0.5.1", + "class-validator": "^0.14.2", "compression": "^1.8.0", "crypto-js": "^4.1.1", "dataloader": "^2.2.2", @@ -69,6 +71,8 @@ "rxjs": "^7.1.0", "sharp": "^0.34.2", "simple-git": "^3.16.0", + "socket.io": "^4.8.1", + "socket.io-client": "^4.8.1", "swagger-ui-express": "^4.3.0", "tiny-async-pool": "^1.2.0", "typeorm": "^0.3.25", @@ -6310,6 +6314,12 @@ "integrity": "sha512-jg+97EGIcY9AGHJJRaaPVgetKDsrTgbRjQ5Msgjh/DQKEFl0DtyRr/VCOyD1T2R1MNeWPK/u7JoGhlDZnKBAfA==", "license": "MIT" }, + "node_modules/@types/validator": { + "version": "13.15.2", + "resolved": "https://registry.npmjs.org/@types/validator/-/validator-13.15.2.tgz", + "integrity": "sha512-y7pa/oEJJ4iGYBxOpfAKn5b9+xuihvzDVnC/OSvlVnGxVg0pOqmjiMafiJ1KVNQEaPZf9HsEp5icEwGg8uIe5Q==", + "license": "MIT" + }, "node_modules/@types/webidl-conversions": { "version": "7.0.3", "resolved": "https://registry.npmjs.org/@types/webidl-conversions/-/webidl-conversions-7.0.3.tgz", @@ -8237,6 +8247,23 @@ "integrity": "sha512-9z8TZaGM1pfswYeXrUpzPrkx8UnWYdhJclsiYMm6x/w5+nN+8Tf/LnAgfLGQCm59qAOxU8WwHEq2vNwF6i4j+Q==", "license": "MIT" }, + "node_modules/class-transformer": { + "version": "0.5.1", + "resolved": "https://registry.npmjs.org/class-transformer/-/class-transformer-0.5.1.tgz", + "integrity": "sha512-SQa1Ws6hUbfC98vKGxZH3KFY0Y1lm5Zm0SY8XX9zbK7FJCyVEac3ATW0RIpwzW+oOfmHE5PMPufDG9hCfoEOMw==", + "license": "MIT" + }, + "node_modules/class-validator": { + "version": "0.14.2", + "resolved": "https://registry.npmjs.org/class-validator/-/class-validator-0.14.2.tgz", + "integrity": "sha512-3kMVRF2io8N8pY1IFIXlho9r8IPUUIfHe2hYVtiebvAzU2XeQFXTv+XI4WX+TnXmtwXMDcjngcpkiPM0O9PvLw==", + "license": "MIT", + "dependencies": { + "@types/validator": "^13.11.8", + "libphonenumber-js": "^1.11.1", + "validator": "^13.9.0" + } + }, "node_modules/cli-color": { "version": "2.0.4", "resolved": "https://registry.npmjs.org/cli-color/-/cli-color-2.0.4.tgz", @@ -9287,6 +9314,57 @@ "node": ">=10.2.0" } }, + "node_modules/engine.io-client": { + "version": "6.6.3", + "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.6.3.tgz", + "integrity": "sha512-T0iLjnyNWahNyv/lcjS2y4oE358tVS/SYQNxYXGAJ9/GLgH4VCvOQ/mhTjqU88mLZCQgiG8RIegFHYCdVC+j5w==", + "license": "MIT", + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.17.1", + "xmlhttprequest-ssl": "~2.1.1" + } + }, + "node_modules/engine.io-client/node_modules/debug": { + "version": "4.3.7", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz", + "integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==", + "license": "MIT", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/engine.io-client/node_modules/ws": { + "version": "8.17.1", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz", + "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/engine.io-parser": { "version": "5.2.3", "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.3.tgz", @@ -12367,6 +12445,12 @@ "node": ">= 0.8.0" } }, + "node_modules/libphonenumber-js": { + "version": "1.12.13", + "resolved": "https://registry.npmjs.org/libphonenumber-js/-/libphonenumber-js-1.12.13.tgz", + "integrity": "sha512-QZXnR/OGiDcBjF4hGk0wwVrPcZvbSSyzlvkjXv5LFfktj7O2VZDrt4Xs8SgR/vOFco+qk1i8J43ikMXZoTrtPw==", + "license": "MIT" + }, "node_modules/limiter": { "version": "1.1.5", "resolved": "https://registry.npmjs.org/limiter/-/limiter-1.1.5.tgz", @@ -14946,6 +15030,38 @@ } } }, + "node_modules/socket.io-client": { + "version": "4.8.1", + "resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.8.1.tgz", + "integrity": "sha512-hJVXfu3E28NmzGk8o1sHhN3om52tRvwYeidbj7xKy2eIIse5IoKX3USlS6Tqt3BHAtflLIkCQBkzVrEEfWUyYQ==", + "license": "MIT", + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.2", + "engine.io-client": "~6.6.1", + "socket.io-parser": "~4.2.4" + }, + "engines": { + "node": ">=10.0.0" + } + }, + "node_modules/socket.io-client/node_modules/debug": { + "version": "4.3.7", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.7.tgz", + "integrity": "sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==", + "license": "MIT", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, "node_modules/socket.io-parser": { "version": "4.2.4", "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", @@ -16372,6 +16488,15 @@ "node": ">=10.12.0" } }, + "node_modules/validator": { + "version": "13.15.15", + "resolved": "https://registry.npmjs.org/validator/-/validator-13.15.15.tgz", + "integrity": "sha512-BgWVbCI72aIQy937xbawcs+hrVaN/CZ2UwutgaJ36hGqRrLNM+f5LUT/YPRbo8IV/ASeFzXszezV+y2+rq3l8A==", + "license": "MIT", + "engines": { + "node": ">= 0.10" + } + }, "node_modules/value-or-promise": { "version": "1.0.12", "resolved": "https://registry.npmjs.org/value-or-promise/-/value-or-promise-1.0.12.tgz", @@ -16755,6 +16880,14 @@ } } }, + "node_modules/xmlhttprequest-ssl": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.1.2.tgz", + "integrity": "sha512-TEU+nJVUUnA4CYJFLvK5X9AOeH4KvDvhIfm0vV1GaQRtchnG0hgK5p8hw/xjv8cunWYCsiPCSDzObPyhEwq3KQ==", + "engines": { + "node": ">=0.4.0" + } + }, "node_modules/xss": { "version": "1.0.15", "resolved": "https://registry.npmjs.org/xss/-/xss-1.0.15.tgz", diff --git a/package.json b/package.json index d9bc0daf4..dbbad71f9 100644 --- a/package.json +++ b/package.json @@ -113,11 +113,13 @@ "@sendgrid/mail": "^8.1.5", "agentkeepalive": "^4.2.1", "amqp-connection-manager": "^4.1.3", - "anchorme": "^3.0.8", "amqplib": "^0.10.0", + "anchorme": "^3.0.8", "apollo-server-core": "^3.13.0", "apollo-server-express": "3.13.0", "bignumber.js": "^9.0.2", + "class-transformer": "^0.5.1", + "class-validator": "^0.14.2", "compression": "^1.8.0", "crypto-js": "^4.1.1", "dataloader": "^2.2.2", @@ -144,6 +146,8 @@ "rxjs": "^7.1.0", "sharp": "^0.34.2", "simple-git": "^3.16.0", + "socket.io": "^4.8.1", + "socket.io-client": "^4.8.1", "swagger-ui-express": "^4.3.0", "tiny-async-pool": "^1.2.0", "typeorm": "^0.3.25", @@ -216,4 +220,4 @@ "node_modules" ] } -} \ No newline at end of file +} diff --git a/src/common/api-config/api.config.service.ts b/src/common/api-config/api.config.service.ts index 77c400332..e157c943a 100644 --- a/src/common/api-config/api.config.service.ts +++ b/src/common/api-config/api.config.service.ts @@ -954,4 +954,22 @@ export class ApiConfigService { getCompressionChunkSize(): number { return this.configService.get('compression.chunkSize') ?? 16384; } + + getIsWebsocketSubscriptionActive(): boolean { + const isWebsocketSubscriptionActive = this.configService.get('features.websocketSubscription.enabled'); + if (isWebsocketSubscriptionActive === undefined) { + throw new Error('No features.websocketSubscription.enabled flag present'); + } + + return isWebsocketSubscriptionActive; + } + + getWebsocketSubscriptionPort(): number { + const port = this.configService.get('features.websocketSubscription.port'); + if (port === undefined) { + throw new Error('No features.websocketSubscription.port present'); + } + + return port; + } } diff --git a/src/common/metrics/api.metrics.service.ts b/src/common/metrics/api.metrics.service.ts index eea34676a..ff2d24260 100644 --- a/src/common/metrics/api.metrics.service.ts +++ b/src/common/metrics/api.metrics.service.ts @@ -22,6 +22,7 @@ export class ApiMetricsService { private static transactionsCompletedCounter: Counter; private static transactionsPendingResultsCounter: Counter; private static batchUpdatesCounter: Counter; + private static subscriptionsConnectionsGauge: Gauge; constructor( private readonly apiConfigService: ApiConfigService, @@ -32,6 +33,13 @@ export class ApiMetricsService { private readonly metricsService: MetricsService, ) { + if (!ApiMetricsService.subscriptionsConnectionsGauge) { + ApiMetricsService.subscriptionsConnectionsGauge = new Gauge({ + name: 'websocket_subscriptions_connections', + help: 'Number of websocket connections for subscriptions', + }); + } + if (!ApiMetricsService.vmQueriesHistogram) { ApiMetricsService.vmQueriesHistogram = new Histogram({ name: 'vm_query', @@ -182,6 +190,14 @@ export class ApiMetricsService { ApiMetricsService.lastProcessedTransactionCompletedProcessorNonce.set({ shardId }, nonce); } + @OnEvent(MetricsEvents.SetWebsocketMetrics) + setWebsocketSubscriptionsMetrics(payload: { connectedClients: number }) { + const { connectedClients } = payload; + + ApiMetricsService.subscriptionsConnectionsGauge.set(connectedClients); + } + + @OnEvent(MetricsEvents.SetTransactionsCompleted) recordTransactionsCompleted(payload: { transactions: any[] }) { ApiMetricsService.transactionsCompletedCounter.inc(payload.transactions.length); diff --git a/src/crons/websocket/blocks.gateway.ts b/src/crons/websocket/blocks.gateway.ts new file mode 100644 index 000000000..40cfd5038 --- /dev/null +++ b/src/crons/websocket/blocks.gateway.ts @@ -0,0 +1,70 @@ +import { WebSocketGateway, WebSocketServer, SubscribeMessage, MessageBody, ConnectedSocket } from '@nestjs/websockets'; +import { Server, Socket } from 'socket.io'; +import { BlockService } from '../../endpoints/blocks/block.service'; +import { BlockFilter } from '../../endpoints/blocks/entities/block.filter'; +import { QueryPagination } from 'src/common/entities/query.pagination'; +import { BlockSubscribePayload } from '../../endpoints/blocks/entities/block.subscribe'; +import { UseFilters } from '@nestjs/common'; +import { WebsocketExceptionsFilter } from 'src/utils/ws-exceptions.filter'; +import { WsValidationPipe } from 'src/utils/ws-validation.pipe'; +import { OriginLogger } from '@multiversx/sdk-nestjs-common'; + +@UseFilters(WebsocketExceptionsFilter) +@WebSocketGateway({ cors: { origin: '*' }, path: '/ws/subscription' }) +export class BlocksGateway { + private readonly logger = new OriginLogger(BlocksGateway.name); + + @WebSocketServer() + server!: Server; + + constructor(private readonly blockService: BlockService) { } + + @SubscribeMessage('subscribeBlocks') + async handleSubscription( + @ConnectedSocket() client: Socket, + @MessageBody(new WsValidationPipe()) payload: BlockSubscribePayload + ) { + const filterIdentifier = JSON.stringify(payload); + await client.join(`blocks-${filterIdentifier}`); + + return { status: 'success' }; + } + + async pushBlocksForRoom(roomName: string): Promise { + if (!roomName.startsWith("blocks-")) return; + + try { + const filterIdentifier = roomName.replace("blocks-", ""); + const filter: BlockSubscribePayload = JSON.parse(filterIdentifier); + + const blockFilter = new BlockFilter({ + shard: filter.shard, + order: filter.order, + }); + + const [blocks, blocksCount] = await Promise.all([ + this.blockService.getBlocks( + blockFilter, + new QueryPagination({ from: filter.from, size: filter.size }), + filter.withProposerIdentity, + ), + this.blockService.getBlocksCount(blockFilter), + ]); + + this.server.to(roomName).emit("blocksUpdate", { blocks, blocksCount }); + } catch (error) { + this.logger.error(error); + } + } + + async pushBlocks(): Promise { + const promises: Promise[] = []; + + for (const [roomName] of this.server.sockets.adapter.rooms) { + promises.push(this.pushBlocksForRoom(roomName)); + } + + await Promise.all(promises); + } +} + diff --git a/src/crons/websocket/connection.handler.ts b/src/crons/websocket/connection.handler.ts new file mode 100644 index 000000000..b8f50ed53 --- /dev/null +++ b/src/crons/websocket/connection.handler.ts @@ -0,0 +1,20 @@ +import { UseFilters } from "@nestjs/common"; +import { OnGatewayConnection, OnGatewayDisconnect, OnGatewayInit, WebSocketGateway, WebSocketServer } from "@nestjs/websockets"; +import { Socket, Server } from "socket.io"; +import { WebsocketExceptionsFilter } from "src/utils/ws-exceptions.filter"; + +@UseFilters(WebsocketExceptionsFilter) +@WebSocketGateway({ cors: { origin: '*' }, path: '/ws/subscription' }) +export class ConnectionHandler implements OnGatewayDisconnect, OnGatewayConnection, OnGatewayInit { + + @WebSocketServer() + server!: Server; + + afterInit(__server: Server) { } + + handleDisconnect(_client: Socket) { } + + handleConnection(client: Socket, ..._args: any[]) { + client.setMaxListeners(12); + } +} diff --git a/src/crons/websocket/events.gateway.ts b/src/crons/websocket/events.gateway.ts new file mode 100644 index 000000000..c95e982e0 --- /dev/null +++ b/src/crons/websocket/events.gateway.ts @@ -0,0 +1,70 @@ +import { WebSocketGateway, WebSocketServer, SubscribeMessage, MessageBody, ConnectedSocket } from '@nestjs/websockets'; +import { Server, Socket } from 'socket.io'; +import { UseFilters } from '@nestjs/common'; +import { WebsocketExceptionsFilter } from 'src/utils/ws-exceptions.filter'; +import { WsValidationPipe } from 'src/utils/ws-validation.pipe'; +import { OriginLogger } from '@multiversx/sdk-nestjs-common'; +import { EventsService } from '../../endpoints/events/events.service'; +import { EventsFilter } from '../../endpoints/events/entities/events.filter'; +import { EventsSubscribePayload } from '../../endpoints/events/entities/events.subscribe'; +import { QueryPagination } from 'src/common/entities/query.pagination'; + +@UseFilters(WebsocketExceptionsFilter) +@WebSocketGateway({ cors: { origin: '*' }, path: '/ws/subscription' }) +export class EventsGateway { + private readonly logger = new OriginLogger(EventsGateway.name); + + @WebSocketServer() + server!: Server; + + constructor(private readonly eventsService: EventsService) { } + + @SubscribeMessage('subscribeEvents') + async handleSubscription( + @ConnectedSocket() client: Socket, + @MessageBody(new WsValidationPipe()) payload: EventsSubscribePayload, + ) { + const filterIdentifier = JSON.stringify(payload); + await client.join(`events-${filterIdentifier}`); + + return { status: 'success' }; + } + + async pushEventsForRoom(roomName: string): Promise { + if (!roomName.startsWith("events-")) return; + + try { + const filterIdentifier = roomName.replace("events-", ""); + const filter: EventsSubscribePayload = JSON.parse(filterIdentifier); + + const eventsFilter = new EventsFilter({ + shard: filter.shard, + }); + + const [events, eventsCount] = await Promise.all([ + this.eventsService.getEvents( + new QueryPagination({ + from: filter.from || 0, + size: filter.size || 25, + }), + eventsFilter, + ), + this.eventsService.getEventsCount(eventsFilter), + ]); + + this.server.to(roomName).emit("eventsUpdate", { events, eventsCount }); + } catch (error) { + this.logger.error(error); + } + } + + async pushEvents(): Promise { + const promises: Promise[] = []; + + for (const [roomName] of this.server.sockets.adapter.rooms) { + promises.push(this.pushEventsForRoom(roomName)); + } + + await Promise.all(promises); + } +} diff --git a/src/crons/websocket/network.gateway.ts b/src/crons/websocket/network.gateway.ts new file mode 100644 index 000000000..5618de162 --- /dev/null +++ b/src/crons/websocket/network.gateway.ts @@ -0,0 +1,33 @@ +import { WebSocketGateway, WebSocketServer, SubscribeMessage } from '@nestjs/websockets'; +import { Server, Socket } from 'socket.io'; +import { NetworkService } from '../../endpoints/network/network.service'; +import { UseFilters } from '@nestjs/common'; +import { WebsocketExceptionsFilter } from 'src/utils/ws-exceptions.filter'; +import { OriginLogger } from '@multiversx/sdk-nestjs-common'; + +@UseFilters(WebsocketExceptionsFilter) +@WebSocketGateway({ cors: { origin: '*' }, path: '/ws/subscription' }) +export class NetworkGateway { + private readonly logger = new OriginLogger(NetworkGateway.name); + + @WebSocketServer() + server!: Server; + + constructor(private readonly networkService: NetworkService) { } + + @SubscribeMessage('subscribeStats') + async handleSubscription(client: Socket) { + await client.join('statsRoom'); + } + + async pushStats() { + if (this.server.sockets.adapter.rooms.has('statsRoom')) { + try { + const stats = await this.networkService.getStats(); + this.server.to('statsRoom').emit('statsUpdate', stats); + } catch (error) { + this.logger.error(error); + } + } + } +} diff --git a/src/crons/websocket/pool.gateway.ts b/src/crons/websocket/pool.gateway.ts new file mode 100644 index 000000000..426e14e10 --- /dev/null +++ b/src/crons/websocket/pool.gateway.ts @@ -0,0 +1,72 @@ +import { WebSocketGateway, WebSocketServer, SubscribeMessage, MessageBody, ConnectedSocket } from '@nestjs/websockets'; +import { Server, Socket } from 'socket.io'; +import { UseFilters } from '@nestjs/common'; +import { WebsocketExceptionsFilter } from 'src/utils/ws-exceptions.filter'; +import { WsValidationPipe } from 'src/utils/ws-validation.pipe'; +import { OriginLogger } from '@multiversx/sdk-nestjs-common'; + +import { PoolService } from '../../endpoints/pool/pool.service'; +import { PoolFilter } from '../../endpoints/pool/entities/pool.filter'; +import { QueryPagination } from 'src/common/entities/query.pagination'; +import { PoolSubscribePayload } from '../../endpoints/pool/entities/pool.subscribe'; + +@UseFilters(WebsocketExceptionsFilter) +@WebSocketGateway({ cors: { origin: '*' }, path: '/ws/subscription' }) +export class PoolGateway { + private readonly logger = new OriginLogger(PoolGateway.name); + + @WebSocketServer() + server!: Server; + + constructor(private readonly poolService: PoolService) { } + + @SubscribeMessage('subscribePool') + async handleSubscription( + @ConnectedSocket() client: Socket, + @MessageBody(new WsValidationPipe()) payload: PoolSubscribePayload, + ) { + const filterIdentifier = JSON.stringify(payload); + await client.join(`pool-${filterIdentifier}`); + + return { status: 'success' }; + } + + async pushPoolForRoom(roomName: string): Promise { + if (!roomName.startsWith("pool-")) return; + + try { + const filterIdentifier = roomName.replace("pool-", ""); + const filter: PoolSubscribePayload = JSON.parse(filterIdentifier); + + const poolFilter = new PoolFilter({ + type: filter.type, + }); + + const [pool, poolCount] = await Promise.all([ + this.poolService.getPool( + new QueryPagination({ + from: filter.from, + size: filter.size, + }), + poolFilter, + ), + this.poolService.getPoolCount(poolFilter), + ]); + + this.server.to(roomName).emit("poolUpdate", { pool, poolCount }); + } catch (error) { + this.logger.error(error); + } + } + + async pushPool(): Promise { + const promises: Promise[] = []; + + for (const [roomName] of this.server.sockets.adapter.rooms) { + promises.push(this.pushPoolForRoom(roomName)); + } + + await Promise.all(promises); + } + +} diff --git a/src/crons/websocket/transaction.gateway.ts b/src/crons/websocket/transaction.gateway.ts new file mode 100644 index 000000000..85c3a76d1 --- /dev/null +++ b/src/crons/websocket/transaction.gateway.ts @@ -0,0 +1,123 @@ +import { WebSocketGateway, WebSocketServer, SubscribeMessage, ConnectedSocket, MessageBody } from '@nestjs/websockets'; +import { Server, Socket } from 'socket.io'; +import { TransactionService } from '../../endpoints/transactions/transaction.service'; +import { TransactionFilter } from '../../endpoints/transactions/entities/transaction.filter'; +import { QueryPagination } from 'src/common/entities/query.pagination'; +import { TransactionQueryOptions } from '../../endpoints/transactions/entities/transactions.query.options'; +import { WsValidationPipe } from 'src/utils/ws-validation.pipe'; +import { TransactionSubscribePayload } from '../../endpoints/transactions/entities/dtos/transaction.subscribe'; +import { WebsocketExceptionsFilter } from 'src/utils/ws-exceptions.filter'; +import { UseFilters } from '@nestjs/common'; +import { OriginLogger } from '@multiversx/sdk-nestjs-common'; + +@UseFilters(WebsocketExceptionsFilter) +@WebSocketGateway({ cors: { origin: '*' }, path: '/ws/subscription' }) +export class TransactionsGateway { + private readonly logger = new OriginLogger(TransactionsGateway.name); + + @WebSocketServer() + server!: Server; + + constructor(private readonly transactionService: TransactionService) { } + + @SubscribeMessage('subscribeTransactions') + async handleSubscription( + @ConnectedSocket() client: Socket, + @MessageBody(new WsValidationPipe()) payload: TransactionSubscribePayload) { + // If one of these methods throw an exception then the subscription will not be successful + TransactionQueryOptions.applyDefaultOptions(payload.size || 25, { + withScResults: payload.withScResults, + withOperations: payload.withOperations, + withLogs: payload.withLogs, + withScamInfo: payload.withScamInfo, + withUsername: payload.withUsername, + withBlockInfo: payload.withBlockInfo, + withActionTransferValue: payload.withActionTransferValue, + }); + + const transactionFilter = new TransactionFilter({ + order: payload.order, + isRelayed: payload.isRelayed, + isScCall: payload.isScCall, + withRelayedScresults: payload.withRelayedScresults, + }); + + TransactionFilter.validate(transactionFilter, payload.size || 25); + + const filterIdentifier = JSON.stringify(payload); + await client.join(`tx-${filterIdentifier}`); + + return { status: 'success' }; + } + + async pushTransactionsForRoom(roomName: string): Promise { + if (!roomName.startsWith("tx-")) return; + + try { + const filterIdentifier = roomName.replace("tx-", ""); + const filter = JSON.parse(filterIdentifier); + + const options = TransactionQueryOptions.applyDefaultOptions( + filter.size || 25, + { + withScResults: filter.withScResults, + withOperations: filter.withOperations, + withLogs: filter.withLogs, + withScamInfo: filter.withScamInfo, + withUsername: filter.withUsername, + withBlockInfo: filter.withBlockInfo, + withActionTransferValue: filter.withActionTransferValue, + }, + ); + + const transactionFilter = new TransactionFilter({ + sender: filter.sender, + receivers: filter.receiver, + token: filter.token, + functions: filter.functions, + senderShard: filter.senderShard, + receiverShard: filter.receiverShard, + miniBlockHash: filter.miniBlockHash, + hashes: filter.hashes, + status: filter.status, + before: filter.before, + after: filter.after, + condition: filter.condition, + order: filter.order, + relayer: filter.relayer, + isRelayed: filter.isRelayed, + isScCall: filter.isScCall, + round: filter.round, + withRelayedScresults: filter.withRelayedScresults, + }); + + TransactionFilter.validate(transactionFilter, filter.size || 25); + + const [transactions, transactionsCount] = await Promise.all([ + this.transactionService.getTransactions( + transactionFilter, + new QueryPagination({ from: filter.from || 0, size: filter.size || 25 }), + options, + undefined, + filter.fields || [], + ), + this.transactionService.getTransactionCount(transactionFilter), + ]); + + this.server.to(roomName).emit("transactionUpdate", { transactions, transactionsCount }); + } catch (error) { + this.logger.error(error); + } + } + + async pushTransactions(): Promise { + const promises: Promise[] = []; + + for (const [roomName] of this.server.sockets.adapter.rooms) { + promises.push(this.pushTransactionsForRoom(roomName)); + } + + await Promise.all(promises); + } + +} diff --git a/src/crons/websocket/websocket.cron.service.ts b/src/crons/websocket/websocket.cron.service.ts new file mode 100644 index 000000000..9aae69097 --- /dev/null +++ b/src/crons/websocket/websocket.cron.service.ts @@ -0,0 +1,72 @@ +import { Injectable } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; +import { TransactionsGateway } from './transaction.gateway'; +import { BlocksGateway } from 'src/crons/websocket/blocks.gateway'; +import { NetworkGateway } from 'src/crons/websocket/network.gateway'; +import { Lock } from "@multiversx/sdk-nestjs-common"; +import { PoolGateway } from 'src/crons/websocket/pool.gateway'; +import { EventsGateway } from 'src/crons/websocket/events.gateway'; +import { WebSocketGateway, WebSocketServer } from '@nestjs/websockets'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { MetricsEvents } from 'src/utils/metrics-events.constants'; +import { Server } from 'socket.io'; +@Injectable() +@WebSocketGateway({ cors: { origin: '*' }, path: '/ws/subscription' }) +export class WebsocketCronService { + @WebSocketServer() + server!: Server; + + constructor( + private readonly transactionsGateway: TransactionsGateway, + private readonly blocksGateway: BlocksGateway, + private readonly networkGateway: NetworkGateway, + private readonly poolGateway: PoolGateway, + private readonly eventsGateway: EventsGateway, + private readonly eventEmitter: EventEmitter2, + ) { } + + @Cron('*/1 * * * * *') + handleWebsocketMetrics() { + const connectedClients = this.server.sockets.sockets.size ?? 0; + // TODO: add more metrics in the future + // const subscriptions: Record = {}; + + // this.server.sockets.adapter.rooms.forEach((socketsSet, roomName) => { + // subscriptions[roomName] = socketsSet.size; + // }); + + this.eventEmitter.emit(MetricsEvents.SetWebsocketMetrics, { + connectedClients, + }); + } + + @Cron('*/6 * * * * *') + @Lock({ name: 'Push transactions to subscribers', verbose: true }) + async handleTransactionsUpdate() { + await this.transactionsGateway.pushTransactions(); + } + + @Cron('*/6 * * * * *') + @Lock({ name: 'Push blocks to subscribers', verbose: true }) + async handleBlocksUpdate() { + await this.blocksGateway.pushBlocks(); + } + + @Cron('*/6 * * * * *') + @Lock({ name: 'Push stats to subscribers', verbose: true }) + async handleStatsUpdate() { + await this.networkGateway.pushStats(); + } + + @Cron('*/6 * * * * *') + @Lock({ name: 'Push pool transactions to subscribers', verbose: true }) + async handlePoolTransactions() { + await this.poolGateway.pushPool(); + } + + @Cron('*/6 * * * * *') + @Lock({ name: 'Push events to subscribers', verbose: true }) + async handleEventsUpdate() { + await this.eventsGateway.pushEvents(); + } +} diff --git a/src/crons/websocket/websocket.subscription.module.ts b/src/crons/websocket/websocket.subscription.module.ts new file mode 100644 index 000000000..89276d50d --- /dev/null +++ b/src/crons/websocket/websocket.subscription.module.ts @@ -0,0 +1,35 @@ +import { Module } from '@nestjs/common'; +import { ScheduleModule } from '@nestjs/schedule'; +import { TransactionModule } from 'src/endpoints/transactions/transaction.module'; +import { WebsocketCronService } from './websocket.cron.service'; +import { BlockModule } from 'src/endpoints/blocks/block.module'; +import { NetworkModule } from 'src/endpoints/network/network.module'; +import { PoolModule } from 'src/endpoints/pool/pool.module'; +import { EventsModule } from 'src/endpoints/events/events.module'; +import { BlocksGateway } from './blocks.gateway'; +import { NetworkGateway } from './network.gateway'; +import { TransactionsGateway } from './transaction.gateway'; +import { PoolGateway } from './pool.gateway'; +import { EventsGateway } from './events.gateway'; +import { ConnectionHandler } from './connection.handler'; + +@Module({ + imports: [ + ScheduleModule.forRoot(), + TransactionModule, + BlockModule, + NetworkModule, + PoolModule, + EventsModule, + ], + providers: [ + WebsocketCronService, + ConnectionHandler, + BlocksGateway, + NetworkGateway, + TransactionsGateway, + PoolGateway, + EventsGateway, + ], +}) +export class WebsocketSubscriptionModule { } diff --git a/src/endpoints/blocks/entities/block.subscribe.ts b/src/endpoints/blocks/entities/block.subscribe.ts new file mode 100644 index 000000000..e60e9c8dc --- /dev/null +++ b/src/endpoints/blocks/entities/block.subscribe.ts @@ -0,0 +1,28 @@ +import { IsOptional, IsNumber, IsBoolean, Min, Max, IsEnum, IsIn } from 'class-validator'; +import { SortOrder } from 'src/common/entities/sort.order'; + +export class BlockSubscribePayload { + @IsOptional() + @IsNumber() + @Min(0) + shard?: number; + + @IsOptional() + @IsEnum(SortOrder) + order?: SortOrder; + + @IsOptional() + @IsNumber() + @IsIn([0], { message: 'from can only be 0' }) + from?: number = 0; + + @IsOptional() + @IsNumber() + @Min(1, { message: 'minimum size is 1' }) + @Max(50, { message: 'maximum size is 50' }) + size?: number = 25; + + @IsOptional() + @IsBoolean() + withProposerIdentity?: boolean; +} diff --git a/src/endpoints/events/entities/events.subscribe.ts b/src/endpoints/events/entities/events.subscribe.ts new file mode 100644 index 000000000..6adffbd67 --- /dev/null +++ b/src/endpoints/events/entities/events.subscribe.ts @@ -0,0 +1,19 @@ +import { IsOptional, IsNumber, Min, Max, IsIn } from 'class-validator'; + +export class EventsSubscribePayload { + @IsOptional() + @IsNumber() + @Min(0) + shard?: number; + + @IsOptional() + @IsNumber() + @IsIn([0], { message: 'from can only be 0' }) + from?: number = 0; + + @IsOptional() + @IsNumber() + @Min(1, { message: 'minimum size is 1' }) + @Max(50, { message: 'maximum size is 50' }) + size?: number = 25; +} diff --git a/src/endpoints/events/events.module.ts b/src/endpoints/events/events.module.ts index 9cdf3f729..4e7ae6221 100644 --- a/src/endpoints/events/events.module.ts +++ b/src/endpoints/events/events.module.ts @@ -1,6 +1,5 @@ import { Module } from '@nestjs/common'; import { EventsService } from './events.service'; - @Module({ providers: [EventsService], exports: [EventsService], diff --git a/src/endpoints/pool/entities/pool.subscribe.ts b/src/endpoints/pool/entities/pool.subscribe.ts new file mode 100644 index 000000000..e9ef35074 --- /dev/null +++ b/src/endpoints/pool/entities/pool.subscribe.ts @@ -0,0 +1,19 @@ +import { IsOptional, IsNumber, Min, Max, IsEnum, IsIn } from 'class-validator'; +import { TransactionType } from 'src/endpoints/transactions/entities/transaction.type'; + +export class PoolSubscribePayload { + @IsOptional() + @IsEnum(TransactionType) + type?: TransactionType; + + @IsOptional() + @IsNumber() + @IsIn([0], { message: 'from can only be 0' }) + from?: number = 0; + + @IsOptional() + @IsNumber() + @Min(1, { message: 'minimum size is 1' }) + @Max(50, { message: 'maximum size is 50' }) + size?: number = 25; +} diff --git a/src/endpoints/transactions/entities/dtos/transaction.subscribe.ts b/src/endpoints/transactions/entities/dtos/transaction.subscribe.ts new file mode 100644 index 000000000..3ea3c602c --- /dev/null +++ b/src/endpoints/transactions/entities/dtos/transaction.subscribe.ts @@ -0,0 +1,70 @@ +import { IsOptional, IsString, IsArray, IsBoolean, IsNumber, IsEnum, Min, Max, IsIn } from 'class-validator'; +import { TransactionStatus } from '../transaction.status'; +import { SortOrder } from 'src/common/entities/sort.order'; + +export class TransactionSubscribePayload { + @IsOptional() + @IsEnum(TransactionStatus) + status?: TransactionStatus; + + @IsOptional() + @IsEnum(SortOrder) + order?: SortOrder; + + @IsOptional() + @IsBoolean() + isRelayed?: boolean; + + @IsOptional() + @IsBoolean() + isScCall?: boolean; + + @IsOptional() + @IsBoolean() + withScResults?: boolean; + + @IsOptional() + @IsBoolean() + withRelayedScresults?: boolean; + + @IsOptional() + @IsBoolean() + withOperations?: boolean; + + @IsOptional() + @IsBoolean() + withLogs?: boolean; + + @IsOptional() + @IsBoolean() + withScamInfo?: boolean; + + @IsOptional() + @IsBoolean() + withUsername?: boolean; + + @IsOptional() + @IsBoolean() + withBlockInfo?: boolean; + + @IsOptional() + @IsBoolean() + withActionTransferValue?: boolean; + + @IsOptional() + @IsNumber() + @IsIn([0], { message: 'from can only be 0' }) + from?: number = 0; + + @IsOptional() + @IsNumber() + @Min(1, { message: 'minimum size is 1' }) + @Max(50, { message: 'maximum size is 50' }) + size?: number = 25; + + + @IsOptional() + @IsArray() + @IsString({ each: true }) + fields?: string[]; +} diff --git a/src/main.ts b/src/main.ts index 3ff1d98de..aca22383e 100644 --- a/src/main.ts +++ b/src/main.ts @@ -36,6 +36,8 @@ import { NotWritableError } from './common/indexer/entities/not.writable.error'; import * as bodyParser from 'body-parser'; import * as requestIp from 'request-ip'; import compression from 'compression'; +import { IoAdapter } from '@nestjs/platform-socket.io'; +import { WebsocketSubscriptionModule } from './crons/websocket/websocket.subscription.module'; async function bootstrap() { const logger = new Logger('Bootstrapper'); @@ -87,6 +89,13 @@ async function bootstrap() { await processorApp.listen(5001); } + + if (apiConfigService.getIsWebsocketSubscriptionActive()) { + const websocketSubscriptionApp = await NestFactory.create(WebsocketSubscriptionModule); + websocketSubscriptionApp.useWebSocketAdapter(new IoAdapter(websocketSubscriptionApp)); + await websocketSubscriptionApp.listen(apiConfigService.getWebsocketSubscriptionPort()); + } + if (apiConfigService.getIsCacheWarmerCronActive()) { const cacheWarmerApp = await NestFactory.create(CacheWarmerModule); await configureCacheWarmerApp(cacheWarmerApp, apiConfigService); @@ -168,6 +177,7 @@ async function bootstrap() { logger.log(`Exchange feature active: ${apiConfigService.isExchangeEnabled()}`); logger.log(`Marketplace feature active: ${apiConfigService.isMarketplaceFeatureEnabled()}`); logger.log(`Auth active: ${apiConfigService.getIsAuthActive()}`); + logger.log(`WebSocket subscription active: ${apiConfigService.getIsWebsocketSubscriptionActive()}`); logger.log(`Use tracing: ${apiConfigService.getUseTracingFlag()}`); logger.log(`Process NFTs flag: ${apiConfigService.getIsProcessNftsFlagActive()}`); diff --git a/src/utils/cache.info.ts b/src/utils/cache.info.ts index 6ca7e69ac..00059ae49 100644 --- a/src/utils/cache.info.ts +++ b/src/utils/cache.info.ts @@ -469,7 +469,7 @@ export class CacheInfo { static BlocksCount(filter: BlockFilter): CacheInfo { return { key: `blocks:count:${JSON.stringify(filter)}`, - ttl: Constants.oneMinute(), + ttl: Constants.oneSecond() * 6, }; } diff --git a/src/utils/metrics-events.constants.ts b/src/utils/metrics-events.constants.ts index a2e480638..2a5cb12ba 100644 --- a/src/utils/metrics-events.constants.ts +++ b/src/utils/metrics-events.constants.ts @@ -10,4 +10,5 @@ export enum MetricsEvents { SetTransactionsCompleted = "setTransactionsCompleted", SetTransactionsPendingResults = "setTransactionsPendingResults", SetBatchUpdated = "setBatchUpdated", + SetWebsocketMetrics = "setWebsocketMetrics", } diff --git a/src/utils/ws-exceptions.filter.ts b/src/utils/ws-exceptions.filter.ts new file mode 100644 index 000000000..bb864231b --- /dev/null +++ b/src/utils/ws-exceptions.filter.ts @@ -0,0 +1,20 @@ +import { ArgumentsHost, Catch } from "@nestjs/common"; +import { BaseWsExceptionFilter, WsException } from "@nestjs/websockets"; +import { Socket } from "socket.io"; + +@Catch(WsException) +export class WebsocketExceptionsFilter extends BaseWsExceptionFilter { + catch(exception: WsException, host: ArgumentsHost) { + const client = host.switchToWs().getClient() as Socket; + + const pattern = host.switchToWs().getPattern(); + const data = host.switchToWs().getData(); + const error = exception.getError(); + + client.emit('error', { + pattern, + data, + error, + }); + } +} diff --git a/src/utils/ws-validation.pipe.ts b/src/utils/ws-validation.pipe.ts new file mode 100644 index 000000000..ddeb25994 --- /dev/null +++ b/src/utils/ws-validation.pipe.ts @@ -0,0 +1,15 @@ +import { Injectable, ValidationPipe, ValidationPipeOptions } from '@nestjs/common'; +import { WsException } from '@nestjs/websockets'; + +@Injectable() +export class WsValidationPipe extends ValidationPipe { + constructor(options?: ValidationPipeOptions) { + super({ + transform: true, + whitelist: true, + forbidNonWhitelisted: true, + exceptionFactory: (errors) => new WsException(errors), + ...options, + }); + } +}