From 942d22b0599ddb4cfa70ab813b781d609edb1702 Mon Sep 17 00:00:00 2001 From: Oshan Shrestha Date: Sun, 20 Jul 2025 12:09:39 +0545 Subject: [PATCH 1/2] feat: added event for per-request tracking --- src/chain.ts | 15 +++++- src/direct.ts | 19 +++++++- src/forward.ts | 21 +++++++++ src/server.ts | 100 ++++++++++++++++++++++++++++++++-------- src/tcp_tunnel_tools.ts | 4 ++ 5 files changed, 136 insertions(+), 23 deletions(-) diff --git a/src/chain.ts b/src/chain.ts index 06cd1603..d6bd3830 100644 --- a/src/chain.ts +++ b/src/chain.ts @@ -27,6 +27,8 @@ export interface HandlerOpts { ipFamily?: number; dnsLookup?: typeof dns['lookup']; customTag?: unknown; + requestId: string; + id: number; } interface ChainOpts { @@ -34,7 +36,10 @@ interface ChainOpts { sourceSocket: Socket; head?: Buffer; handlerOpts: HandlerOpts; - server: EventEmitter & { log: (connectionId: unknown, str: string) => void }; + server: EventEmitter & { + log: (connectionId: unknown, str: string) => void, + emit: (event: string, ...args: any[]) => boolean, + }; isPlain: boolean; } @@ -166,6 +171,14 @@ export const chain = ( // We need to enable flowing, otherwise the socket would remain open indefinitely. // Nothing would consume the data, we just want to close the socket. targetSocket.on('close', () => { + const { requestId, id: connectionId } = handlerOpts; + + server.emit('requestFinished', { + id: requestId, + request, + connectionId, + customTag, + }); sourceSocket.resume(); if (sourceSocket.writable) { diff --git a/src/direct.ts b/src/direct.ts index f4c7d68d..c17ffedb 100644 --- a/src/direct.ts +++ b/src/direct.ts @@ -11,13 +11,19 @@ export interface HandlerOpts { localAddress?: string; ipFamily?: number; dnsLookup?: typeof dns['lookup']; + customTag?: unknown; + requestId: string; + id: number; } interface DirectOpts { - request: { url?: string }; + request: { url?: string, [key: string]: any }; sourceSocket: Socket; head: Buffer; - server: EventEmitter & { log: (connectionId: unknown, str: string) => void }; + server: EventEmitter & { + log: (connectionId: unknown, str:string) => void, + emit: (event: string, ...args: any[]) => boolean, + }; handlerOpts: HandlerOpts; } @@ -79,6 +85,15 @@ export const direct = ( // We need to enable flowing, otherwise the socket would remain open indefinitely. // Nothing would consume the data, we just want to close the socket. targetSocket.on('close', () => { + const { requestId, customTag, id: connectionId } = handlerOpts; + + server.emit('requestFinished', { + id: requestId, + request, + connectionId, + customTag, + }); + sourceSocket.resume(); if (sourceSocket.writable) { diff --git a/src/forward.ts b/src/forward.ts index b7c656f6..1c3a2dd3 100644 --- a/src/forward.ts +++ b/src/forward.ts @@ -1,4 +1,5 @@ import type dns from 'node:dns'; +import type { EventEmitter } from 'node:events'; import http from 'node:http'; import https from 'node:https'; import stream from 'node:stream'; @@ -29,6 +30,10 @@ export interface HandlerOpts { localAddress?: string; ipFamily?: number; dnsLookup?: typeof dns['lookup']; + requestId: string; + customTag?: unknown; + id: number; + server: EventEmitter; } /** @@ -122,6 +127,22 @@ export const forward = async ( : http.request(origin!, options as unknown as http.RequestOptions, requestCallback); + response.once('close', () => { + const { + requestId, + customTag, + id: connectionId, + server, + } = handlerOpts; + + server.emit('requestFinished', { + id: requestId, + request, + connectionId, + customTag, + }); + }); + client.once('socket', (socket: SocketWithPreviousStats) => { // Socket can be re-used by multiple requests. // That's why we need to track the previous stats. diff --git a/src/server.ts b/src/server.ts index 41fba80d..70f71b41 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,5 +1,6 @@ /* eslint-disable no-use-before-define */ import { Buffer } from 'node:buffer'; +import { randomUUID } from 'node:crypto'; import type dns from 'node:dns'; import { EventEmitter } from 'node:events'; import http from 'node:http'; @@ -47,9 +48,22 @@ export type ConnectionStats = { trgRxBytes: number | null; }; +export type RequestStats = { + /** Total bytes received from the client. */ + srcRxBytes: number, + /** Total bytes sent to the client. */ + srcTxBytes: number, + /** Total bytes received from the target. */ + trgRxBytes: number | null, + /** Total bytes sent to the target. */ + trgTxBytes: number | null, +}; + type HandlerOpts = { server: Server; id: number; + requestId: string; + startTime: number; srcRequest: http.IncomingMessage; srcResponse: http.ServerResponse | null; srcHead: Buffer | null; @@ -75,6 +89,18 @@ export type PrepareRequestFunctionOpts = { isHttp: boolean; }; +export type RequestBypassedData = { + id: string; + request: http.IncomingMessage; + connectionId: number; + customTag?: unknown; +}; + +export type RequestFinishedData = RequestBypassedData & { + stats: RequestStats; + response?: http.IncomingMessage; +}; + export type PrepareRequestFunctionResult = { customResponseFunction?: CustomResponseOpts['customResponseFunction']; customConnectServer?: http.Server | null; @@ -93,8 +119,11 @@ export type PrepareRequestFunction = (opts: PrepareRequestFunctionOpts) => Promi /** * Represents the proxy server. - * It emits the 'requestFailed' event on unexpected request errors, with the following parameter `{ error, request }`. - * It emits the 'connectionClosed' event when connection to proxy server is closed, with parameter `{ connectionId, stats }`. + * + * It emits the `requestFailed` event on unexpected request errors, with the following parameter `{ error, request }`. + * It emits the `connectionClosed` event when connection to proxy server is closed, with parameter `{ connectionId, stats }`. + * It emits the `requestBypassed` event when a request is bypassed, with parameter `RequestBypassedData`. + * It emits the `requestFinished` event when a request is finished, with parameter `RequestFinishedData`. */ export class Server extends EventEmitter { port: number; @@ -271,6 +300,7 @@ export class Server extends EventEmitter { async onRequest(request: http.IncomingMessage, response: http.ServerResponse): Promise { try { const handlerOpts = await this.prepareRequestHandling(request); + handlerOpts.srcResponse = response; const { proxyChainId } = request.socket as Socket; @@ -278,6 +308,12 @@ export class Server extends EventEmitter { if (handlerOpts.customResponseFunction) { this.log(proxyChainId, 'Using handleCustomResponse()'); await handleCustomResponse(request, response, handlerOpts as CustomResponseOpts); + this.emit('requestBypassed', { + id: handlerOpts.requestId, + request, + connectionId: handlerOpts.id, + customTag: handlerOpts.customTag, + }); return; } @@ -310,6 +346,12 @@ export class Server extends EventEmitter { if (handlerOpts.customConnectServer) { socket.unshift(head); // See chain.ts for why we do this await customConnect(socket, handlerOpts.customConnectServer); + this.emit('requestBypassed', { + id: handlerOpts.requestId, + request, + connectionId: handlerOpts.id, + customTag: handlerOpts.customTag, + }); return; } @@ -336,9 +378,15 @@ export class Server extends EventEmitter { * @see {prepareRequestHandling} */ getHandlerOpts(request: http.IncomingMessage): HandlerOpts { + const requestId = randomUUID(); + // Casing does not matter, but we do it to avoid breaking changes. + request.headers['request-id'] = requestId; + const handlerOpts: HandlerOpts = { server: this, id: (request.socket as Socket).proxyChainId!, + requestId, + startTime: Date.now(), srcRequest: request, srcHead: null, trgParsed: null, @@ -504,20 +552,31 @@ export class Server extends EventEmitter { * @param error */ failRequest(request: http.IncomingMessage, error: NodeJS.ErrnoException): void { - const { proxyChainId } = request.socket as Socket; + this.emit('requestFailed', { + request, + error, + }); - if (error.name === 'RequestError') { - const typedError = error as RequestError; + const { srcResponse } = (request as any).handlerOpts as HandlerOpts; - this.log(proxyChainId, `Request failed (status ${typedError.statusCode}): ${error.message}`); - this.sendSocketResponse(request.socket, typedError.statusCode, typedError.headers, error.message); - } else { - this.log(proxyChainId, `Request failed with error: ${error.stack || error}`); - this.sendSocketResponse(request.socket, 500, {}, 'Internal error in proxy server'); - this.emit('requestFailed', { error, request }); + if (!request.socket) { + return; } - this.log(proxyChainId, 'Closing because request failed with error'); + if (request.socket.destroyed) { + return; + } + + // If the request was not handled yet, we need to close the socket. + // The client will get an empty response. + if (srcResponse && !srcResponse.headersSent) { + // We need to wait for the client to send the full request, otherwise it may get ECONNRESET. + // This is particularly important for HTTP CONNECT, because the client sends the first data packet + // along with the request headers. + request.on('end', () => request.socket.end()); + // If the client never sends the full request, the socket will timeout and close. + request.resume(); + } } /** @@ -607,22 +666,23 @@ export class Server extends EventEmitter { } /** - * Gets data transfer statistics of a specific proxy connection. + * Returns the statistics of a specific connection. + * @param connectionId The ID of the connection. + * @returns The statistics object, or undefined if the connection does not exist. */ getConnectionStats(connectionId: number): ConnectionStats | undefined { const socket = this.connections.get(connectionId); - if (!socket) return undefined; - const targetStats = getTargetStats(socket); + if (!socket) return; - const result = { + const { bytesWritten, bytesRead } = getTargetStats(socket); + + return { srcTxBytes: socket.bytesWritten, srcRxBytes: socket.bytesRead, - trgTxBytes: targetStats.bytesWritten, - trgRxBytes: targetStats.bytesRead, + trgTxBytes: bytesWritten, + trgRxBytes: bytesRead, }; - - return result; } /** diff --git a/src/tcp_tunnel_tools.ts b/src/tcp_tunnel_tools.ts index f3c2e001..f842c19b 100644 --- a/src/tcp_tunnel_tools.ts +++ b/src/tcp_tunnel_tools.ts @@ -1,3 +1,4 @@ +import { randomUUID } from 'node:crypto'; import net from 'node:net'; import { URL } from 'node:url'; @@ -71,6 +72,9 @@ export async function createTunnel( handlerOpts: { upstreamProxyUrlParsed: parsedProxyUrl, ignoreUpstreamProxyCertificate: options?.ignoreProxyCertificate ?? false, + requestId: randomUUID(), + customTag: undefined, + id: -1, }, server: server as net.Server & { log: typeof log }, isPlain: true, From 52ae82c5cdf32fc579df4debed9ee13ad7a44fc7 Mon Sep 17 00:00:00 2001 From: Oshan Shrestha Date: Sun, 20 Jul 2025 13:44:43 +0545 Subject: [PATCH 2/2] test & example added --- examples/request_finished.js | 46 ++++++++++++++++++++++++++++++++++++ test/server.js | 19 +++++++++++++++ 2 files changed, 65 insertions(+) create mode 100644 examples/request_finished.js diff --git a/examples/request_finished.js b/examples/request_finished.js new file mode 100644 index 00000000..5b4a891d --- /dev/null +++ b/examples/request_finished.js @@ -0,0 +1,46 @@ +const { Server } = require('proxy-chain'); +const http = require('http'); +const request = require('request'); + +(async () => { + // Create a target server + const targetServer = http.createServer((req, res) => { + res.writeHead(200, { 'Content-Type': 'text/plain' }); + res.end('Hello World!'); + }); + await new Promise((resolve) => targetServer.listen(0, resolve)); + const targetPort = targetServer.address().port; + + // Create a proxy server + const server = new Server({ + port: 0, + verbose: true, + }); + + server.on('requestFinished', ({ id, connectionId, request }) => { + console.log(`Request finished: { id: ${id}, connectionId: ${connectionId}, method: ${request.method}, url: ${request.url} }`); + }); + + await server.listen(); + const proxyPort = server.port; + + console.log(`Proxy server listening on port ${proxyPort}`); + console.log(`Target server listening on port ${targetPort}`); + + // Make a request through the proxy + await new Promise((resolve, reject) => { + request({ + url: `http://127.0.0.1:${targetPort}`, + proxy: `http://127.0.0.1:${proxyPort}`, + }, (error, response, body) => { + if (error) return reject(error); + console.log(`Response body: ${body}`); + resolve(); + }); + }); + + // Close servers + await server.close(true); + await new Promise((resolve) => targetServer.close(resolve)); + console.log('Servers closed.'); +})(); \ No newline at end of file diff --git a/test/server.js b/test/server.js index 70c0ff99..805fd479 100644 --- a/test/server.js +++ b/test/server.js @@ -154,6 +154,7 @@ const createTestSuite = ({ const mainProxyServerConnectionIds = []; const mainProxyServerConnectionsClosed = []; const mainProxyServerConnectionId2Stats = {}; + const mainProxyServerRequestsFinished = []; let upstreamProxyHostname = '127.0.0.1'; @@ -419,6 +420,10 @@ const createTestSuite = ({ mainProxyServerConnectionId2Stats[connectionId] = stats; }); + mainProxyServer.on('requestFinished', ({ id, connectionId }) => { + mainProxyServerRequestsFinished.push({ id, connectionId }); + }); + return mainProxyServer.listen(); } }) @@ -832,6 +837,19 @@ const createTestSuite = ({ }); } + if (useMainProxy) { + _it('should emit requestFinished event', () => { + const opts = getRequestOpts('/hello-world'); + opts.method = 'GET'; + return requestPromised(opts) + .then((response) => { + expect(response.body).to.eql('Hello world!'); + expect(response.statusCode).to.eql(200); + expect(mainProxyServerRequestsFinished.length).to.be.above(0); + }); + }); + } + if (!useSsl && mainProxyAuth && mainProxyAuth.username && mainProxyAuth.password) { it('handles GET request using puppeteer with invalid credentials', async () => { const phantomUrl = `${useSsl ? 'https' : 'http'}://${LOCALHOST_TEST}:${targetServerPort}/hello-world`; @@ -1178,6 +1196,7 @@ const createTestSuite = ({ expect(mainProxyServer.getConnectionIds()).to.be.deep.eql([]); } expect(mainProxyServerConnectionIds).to.be.deep.eql([]); + mainProxyServerRequestsFinished.splice(0, mainProxyServerRequestsFinished.length); const closedSomeConnectionsTwice = mainProxyServerConnectionsClosed .reduce((duplicateConnections, id, index) => {