Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions examples/request_finished.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
const { Server } = require('proxy-chain');
const http = require('http');
const request = require('request');

(async () => {
// Create a target server
const targetServer = http.createServer((req, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('Hello World!');
});
await new Promise((resolve) => targetServer.listen(0, resolve));
const targetPort = targetServer.address().port;

// Create a proxy server
const server = new Server({
port: 0,
verbose: true,
});

server.on('requestFinished', ({ id, connectionId, request }) => {
console.log(`Request finished: { id: ${id}, connectionId: ${connectionId}, method: ${request.method}, url: ${request.url} }`);
});

await server.listen();
const proxyPort = server.port;

console.log(`Proxy server listening on port ${proxyPort}`);
console.log(`Target server listening on port ${targetPort}`);

// Make a request through the proxy
await new Promise((resolve, reject) => {
request({
url: `http://127.0.0.1:${targetPort}`,
proxy: `http://127.0.0.1:${proxyPort}`,
}, (error, response, body) => {
if (error) return reject(error);
console.log(`Response body: ${body}`);
resolve();
});
});

// Close servers
await server.close(true);
await new Promise((resolve) => targetServer.close(resolve));
console.log('Servers closed.');
})();
15 changes: 14 additions & 1 deletion src/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@ export interface HandlerOpts {
ipFamily?: number;
dnsLookup?: typeof dns['lookup'];
customTag?: unknown;
requestId: string;
id: number;
}

interface ChainOpts {
request: { url?: string };
sourceSocket: Socket;
head?: Buffer;
handlerOpts: HandlerOpts;
server: EventEmitter & { log: (connectionId: unknown, str: string) => void };
server: EventEmitter & {
log: (connectionId: unknown, str: string) => void,
emit: (event: string, ...args: any[]) => boolean,
};
isPlain: boolean;
}

Expand Down Expand Up @@ -166,6 +171,14 @@ export const chain = (
// We need to enable flowing, otherwise the socket would remain open indefinitely.
// Nothing would consume the data, we just want to close the socket.
targetSocket.on('close', () => {
const { requestId, id: connectionId } = handlerOpts;

server.emit('requestFinished', {
id: requestId,
request,
connectionId,
customTag,
});
sourceSocket.resume();

if (sourceSocket.writable) {
Expand Down
19 changes: 17 additions & 2 deletions src/direct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,19 @@ export interface HandlerOpts {
localAddress?: string;
ipFamily?: number;
dnsLookup?: typeof dns['lookup'];
customTag?: unknown;
requestId: string;
id: number;
}

interface DirectOpts {
request: { url?: string };
request: { url?: string, [key: string]: any };
sourceSocket: Socket;
head: Buffer;
server: EventEmitter & { log: (connectionId: unknown, str: string) => void };
server: EventEmitter & {
log: (connectionId: unknown, str:string) => void,
emit: (event: string, ...args: any[]) => boolean,
};
handlerOpts: HandlerOpts;
}

Expand Down Expand Up @@ -79,6 +85,15 @@ export const direct = (
// We need to enable flowing, otherwise the socket would remain open indefinitely.
// Nothing would consume the data, we just want to close the socket.
targetSocket.on('close', () => {
const { requestId, customTag, id: connectionId } = handlerOpts;

server.emit('requestFinished', {
id: requestId,
request,
connectionId,
customTag,
});

sourceSocket.resume();

if (sourceSocket.writable) {
Expand Down
21 changes: 21 additions & 0 deletions src/forward.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type dns from 'node:dns';
import type { EventEmitter } from 'node:events';
import http from 'node:http';
import https from 'node:https';
import stream from 'node:stream';
Expand Down Expand Up @@ -29,6 +30,10 @@ export interface HandlerOpts {
localAddress?: string;
ipFamily?: number;
dnsLookup?: typeof dns['lookup'];
requestId: string;
customTag?: unknown;
id: number;
server: EventEmitter;
}

/**
Expand Down Expand Up @@ -122,6 +127,22 @@ export const forward = async (

: http.request(origin!, options as unknown as http.RequestOptions, requestCallback);

response.once('close', () => {
const {
requestId,
customTag,
id: connectionId,
server,
} = handlerOpts;

server.emit('requestFinished', {
id: requestId,
request,
connectionId,
customTag,
});
});

client.once('socket', (socket: SocketWithPreviousStats) => {
// Socket can be re-used by multiple requests.
// That's why we need to track the previous stats.
Expand Down
100 changes: 80 additions & 20 deletions src/server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/* eslint-disable no-use-before-define */
import { Buffer } from 'node:buffer';
import { randomUUID } from 'node:crypto';
import type dns from 'node:dns';
import { EventEmitter } from 'node:events';
import http from 'node:http';
Expand Down Expand Up @@ -47,9 +48,22 @@ export type ConnectionStats = {
trgRxBytes: number | null;
};

export type RequestStats = {
/** Total bytes received from the client. */
srcRxBytes: number,
/** Total bytes sent to the client. */
srcTxBytes: number,
/** Total bytes received from the target. */
trgRxBytes: number | null,
/** Total bytes sent to the target. */
trgTxBytes: number | null,
};

type HandlerOpts = {
server: Server;
id: number;
requestId: string;
startTime: number;
srcRequest: http.IncomingMessage;
srcResponse: http.ServerResponse | null;
srcHead: Buffer | null;
Expand All @@ -75,6 +89,18 @@ export type PrepareRequestFunctionOpts = {
isHttp: boolean;
};

export type RequestBypassedData = {
id: string;
request: http.IncomingMessage;
connectionId: number;
customTag?: unknown;
};

export type RequestFinishedData = RequestBypassedData & {
stats: RequestStats;
response?: http.IncomingMessage;
};

export type PrepareRequestFunctionResult = {
customResponseFunction?: CustomResponseOpts['customResponseFunction'];
customConnectServer?: http.Server | null;
Expand All @@ -93,8 +119,11 @@ export type PrepareRequestFunction = (opts: PrepareRequestFunctionOpts) => Promi

/**
* Represents the proxy server.
* It emits the 'requestFailed' event on unexpected request errors, with the following parameter `{ error, request }`.
* It emits the 'connectionClosed' event when connection to proxy server is closed, with parameter `{ connectionId, stats }`.
*
* It emits the `requestFailed` event on unexpected request errors, with the following parameter `{ error, request }`.
* It emits the `connectionClosed` event when connection to proxy server is closed, with parameter `{ connectionId, stats }`.
* It emits the `requestBypassed` event when a request is bypassed, with parameter `RequestBypassedData`.
* It emits the `requestFinished` event when a request is finished, with parameter `RequestFinishedData`.
*/
export class Server extends EventEmitter {
port: number;
Expand Down Expand Up @@ -271,13 +300,20 @@ export class Server extends EventEmitter {
async onRequest(request: http.IncomingMessage, response: http.ServerResponse): Promise<void> {
try {
const handlerOpts = await this.prepareRequestHandling(request);

handlerOpts.srcResponse = response;

const { proxyChainId } = request.socket as Socket;

if (handlerOpts.customResponseFunction) {
this.log(proxyChainId, 'Using handleCustomResponse()');
await handleCustomResponse(request, response, handlerOpts as CustomResponseOpts);
this.emit('requestBypassed', {
id: handlerOpts.requestId,
request,
connectionId: handlerOpts.id,
customTag: handlerOpts.customTag,
});
return;
}

Expand Down Expand Up @@ -310,6 +346,12 @@ export class Server extends EventEmitter {
if (handlerOpts.customConnectServer) {
socket.unshift(head); // See chain.ts for why we do this
await customConnect(socket, handlerOpts.customConnectServer);
this.emit('requestBypassed', {
id: handlerOpts.requestId,
request,
connectionId: handlerOpts.id,
customTag: handlerOpts.customTag,
});
return;
}

Expand All @@ -336,9 +378,15 @@ export class Server extends EventEmitter {
* @see {prepareRequestHandling}
*/
getHandlerOpts(request: http.IncomingMessage): HandlerOpts {
const requestId = randomUUID();
// Casing does not matter, but we do it to avoid breaking changes.
request.headers['request-id'] = requestId;

const handlerOpts: HandlerOpts = {
server: this,
id: (request.socket as Socket).proxyChainId!,
requestId,
startTime: Date.now(),
srcRequest: request,
srcHead: null,
trgParsed: null,
Expand Down Expand Up @@ -504,20 +552,31 @@ export class Server extends EventEmitter {
* @param error
*/
failRequest(request: http.IncomingMessage, error: NodeJS.ErrnoException): void {
const { proxyChainId } = request.socket as Socket;
this.emit('requestFailed', {
request,
error,
});

if (error.name === 'RequestError') {
const typedError = error as RequestError;
const { srcResponse } = (request as any).handlerOpts as HandlerOpts;

this.log(proxyChainId, `Request failed (status ${typedError.statusCode}): ${error.message}`);
this.sendSocketResponse(request.socket, typedError.statusCode, typedError.headers, error.message);
} else {
this.log(proxyChainId, `Request failed with error: ${error.stack || error}`);
this.sendSocketResponse(request.socket, 500, {}, 'Internal error in proxy server');
this.emit('requestFailed', { error, request });
if (!request.socket) {
return;
}

this.log(proxyChainId, 'Closing because request failed with error');
if (request.socket.destroyed) {
return;
}

// If the request was not handled yet, we need to close the socket.
// The client will get an empty response.
if (srcResponse && !srcResponse.headersSent) {
// We need to wait for the client to send the full request, otherwise it may get ECONNRESET.
// This is particularly important for HTTP CONNECT, because the client sends the first data packet
// along with the request headers.
request.on('end', () => request.socket.end());
// If the client never sends the full request, the socket will timeout and close.
request.resume();
}
}

/**
Expand Down Expand Up @@ -607,22 +666,23 @@ export class Server extends EventEmitter {
}

/**
* Gets data transfer statistics of a specific proxy connection.
* Returns the statistics of a specific connection.
* @param connectionId The ID of the connection.
* @returns The statistics object, or undefined if the connection does not exist.
*/
getConnectionStats(connectionId: number): ConnectionStats | undefined {
const socket = this.connections.get(connectionId);
if (!socket) return undefined;

const targetStats = getTargetStats(socket);
if (!socket) return;

const result = {
const { bytesWritten, bytesRead } = getTargetStats(socket);

return {
srcTxBytes: socket.bytesWritten,
srcRxBytes: socket.bytesRead,
trgTxBytes: targetStats.bytesWritten,
trgRxBytes: targetStats.bytesRead,
trgTxBytes: bytesWritten,
trgRxBytes: bytesRead,
};

return result;
}

/**
Expand Down
4 changes: 4 additions & 0 deletions src/tcp_tunnel_tools.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { randomUUID } from 'node:crypto';
import net from 'node:net';
import { URL } from 'node:url';

Expand Down Expand Up @@ -71,6 +72,9 @@ export async function createTunnel(
handlerOpts: {
upstreamProxyUrlParsed: parsedProxyUrl,
ignoreUpstreamProxyCertificate: options?.ignoreProxyCertificate ?? false,
requestId: randomUUID(),
customTag: undefined,
id: -1,
},
server: server as net.Server & { log: typeof log },
isPlain: true,
Expand Down
Loading
Loading