Skip to content

Commit caf1679

Browse files
committed
Fix a bug that would overcount bytes read from the client due to the PushbackInputStream.
1 parent bfbff1d commit caf1679

File tree

7 files changed

+36
-27
lines changed

7 files changed

+36
-27
lines changed

src/main/java/io/fusionauth/http/io/PushbackInputStream.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import java.io.IOException;
1919
import java.io.InputStream;
2020

21+
import io.fusionauth.http.server.Instrumenter;
22+
2123
/**
2224
* An input stream that allows a portion of bytes read into a buffer to be pushed back to be read again.
2325
*
@@ -28,14 +30,18 @@ public class PushbackInputStream extends InputStream {
2830

2931
private final InputStream delegate;
3032

33+
private final Instrumenter instrumenter;
34+
3135
private byte[] buffer;
3236

3337
private int bufferEndPosition;
3438

3539
private int bufferPosition;
3640

37-
public PushbackInputStream(InputStream delegate) {
41+
42+
public PushbackInputStream(InputStream delegate, Instrumenter instrumenter) {
3843
this.delegate = delegate;
44+
this.instrumenter = instrumenter;
3945
}
4046

4147
public int getAvailableBufferedBytesRemaining() {
@@ -78,10 +84,17 @@ public int read(byte[] b, int off, int len) throws IOException {
7884
// complete processing of the bytes we just read from the buffer in order to send the HTTP response.
7985
// The end result is the client will block while waiting for us to send a response until we take an exception waiting
8086
// for the read timeout.
87+
// - Do not count this as a read from the client, that would double count.
88+
// We should only be counting bytes read from the delegate.
8189
return read;
8290
}
8391

84-
return delegate.read(b, off, len);
92+
var read = delegate.read(b, off, len);
93+
if (read > 0 && instrumenter != null) {
94+
instrumenter.readFromClient(read);
95+
}
96+
97+
return read;
8598
}
8699

87100
@Override

src/main/java/io/fusionauth/http/server/internal/HTTPWorker.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,11 @@
2121
import java.net.SocketException;
2222
import java.net.SocketTimeoutException;
2323

24-
import io.fusionauth.http.server.io.ConnectionClosedException;
2524
import io.fusionauth.http.HTTPValues;
2625
import io.fusionauth.http.HTTPValues.Connections;
2726
import io.fusionauth.http.HTTPValues.Headers;
2827
import io.fusionauth.http.HTTPValues.Protocols;
2928
import io.fusionauth.http.ParseException;
30-
import io.fusionauth.http.server.io.TooManyBytesToDrainException;
3129
import io.fusionauth.http.io.PushbackInputStream;
3230
import io.fusionauth.http.log.Logger;
3331
import io.fusionauth.http.server.HTTPHandler;
@@ -36,11 +34,13 @@
3634
import io.fusionauth.http.server.HTTPResponse;
3735
import io.fusionauth.http.server.HTTPServerConfiguration;
3836
import io.fusionauth.http.server.Instrumenter;
37+
import io.fusionauth.http.server.io.ConnectionClosedException;
3938
import io.fusionauth.http.server.io.HTTPInputStream;
4039
import io.fusionauth.http.server.io.HTTPOutputStream;
4140
import io.fusionauth.http.server.io.Throughput;
4241
import io.fusionauth.http.server.io.ThroughputInputStream;
4342
import io.fusionauth.http.server.io.ThroughputOutputStream;
43+
import io.fusionauth.http.server.io.TooManyBytesToDrainException;
4444
import io.fusionauth.http.util.HTTPTools;
4545

4646
/**
@@ -80,7 +80,7 @@ public HTTPWorker(Socket socket, HTTPServerConfiguration configuration, Instrume
8080
this.throughput = throughput;
8181
this.buffers = new HTTPBuffers(configuration);
8282
this.logger = configuration.getLoggerFactory().getLogger(HTTPWorker.class);
83-
this.inputStream = new PushbackInputStream(new ThroughputInputStream(socket.getInputStream(), throughput));
83+
this.inputStream = new PushbackInputStream(new ThroughputInputStream(socket.getInputStream(), throughput), instrumenter);
8484
this.state = State.Read;
8585
this.startInstant = System.currentTimeMillis();
8686
logger.trace("[{}] Starting HTTP worker.", Thread.currentThread().threadId());
@@ -123,7 +123,7 @@ public void run() {
123123
// Not this line of code will block
124124
// - When a client is using Keep-Alive - we will loop and block here while we wait for the client to send us bytes.
125125
byte[] requestBuffer = buffers.requestBuffer();
126-
HTTPTools.parseRequestPreamble(inputStream, request, requestBuffer, instrumenter, () -> state = State.Read);
126+
HTTPTools.parseRequestPreamble(inputStream, request, requestBuffer, () -> state = State.Read);
127127
if (logger.isTraceEnabled()) {
128128
int availableBufferedBytes = inputStream.getAvailableBufferedBytesRemaining();
129129
if (availableBufferedBytes != 0) {

src/main/java/io/fusionauth/http/server/io/HTTPInputStream.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,7 @@ public int read(byte[] b, int off, int len) throws IOException {
143143
}
144144

145145
// When we have a fixed length request, read beyond the remainingBytes if possible.
146-
// - Under heavy load we may be able to start reading the next request. Just push those bytes
147-
// back onto the InputStream and we will read them later.
146+
// - If we have read past the end of the current request, push those bytes back onto the InputStream.
148147
int read = delegate.read(b, off, len);
149148
if (fixedLength && read > 0) {
150149
int extraBytes = (int) (read - bytesRemaining);
@@ -154,10 +153,6 @@ public int read(byte[] b, int off, int len) throws IOException {
154153
}
155154

156155
if (read > 0) {
157-
if (instrumenter != null) {
158-
instrumenter.readFromClient(read);
159-
}
160-
161156
if (fixedLength) {
162157
bytesRemaining -= read;
163158
}

src/main/java/io/fusionauth/http/server/io/ThroughputInputStream.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2024, FusionAuth, All Rights Reserved
2+
* Copyright (c) 2024-2025, FusionAuth, All Rights Reserved
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -44,7 +44,7 @@ public void close() throws IOException {
4444
}
4545

4646
@Override
47-
public void mark(int readlimit) {
47+
public void mark(@SuppressWarnings("SpellCheckingInspection") int readlimit) {
4848
delegate.mark(readlimit);
4949
}
5050

src/main/java/io/fusionauth/http/util/HTTPTools.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import io.fusionauth.http.log.LoggerFactory;
4040
import io.fusionauth.http.server.HTTPRequest;
4141
import io.fusionauth.http.server.HTTPResponse;
42-
import io.fusionauth.http.server.Instrumenter;
4342
import io.fusionauth.http.server.io.ConnectionClosedException;
4443

4544
public final class HTTPTools {
@@ -93,8 +92,8 @@ public static boolean isHexadecimalCharacter(byte ch) {
9392
*/
9493
public static boolean isTokenCharacter(byte ch) {
9594
return ch == '!' || ch == '#' || ch == '$' || ch == '%' || ch == '&' || ch == '\'' || ch == '*' || ch == '+' || ch == '-' || ch == '.' ||
96-
ch == '^' || ch == '_' || ch == '`' || ch == '|' || ch == '~' || (ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z') ||
97-
(ch >= '0' && ch <= '9');
95+
ch == '^' || ch == '_' || ch == '`' || ch == '|' || ch == '~' || (ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z') ||
96+
(ch >= '0' && ch <= '9');
9897
}
9998

10099
/**
@@ -262,13 +261,10 @@ public static HeaderValue parseHeaderValue(String value) {
262261
* @param inputStream The input stream to read the preamble from.
263262
* @param request The HTTP request to populate.
264263
* @param requestBuffer A buffer used for reading to help reduce memory thrashing.
265-
* @param instrumenter The Instrumenter that is informed of bytes read.
266264
* @param readObserver An observer that is called once one byte has been read.
267265
* @throws IOException If the read fails.
268266
*/
269-
public static void parseRequestPreamble(PushbackInputStream inputStream, HTTPRequest request, byte[] requestBuffer,
270-
Instrumenter instrumenter,
271-
Runnable readObserver)
267+
public static void parseRequestPreamble(PushbackInputStream inputStream, HTTPRequest request, byte[] requestBuffer, Runnable readObserver)
272268
throws IOException {
273269
RequestPreambleState state = RequestPreambleState.RequestMethod;
274270
var valueBuffer = new ByteArrayOutputStream(512);
@@ -288,10 +284,6 @@ public static void parseRequestPreamble(PushbackInputStream inputStream, HTTPReq
288284

289285
logger.trace("Read [{}] from client for preamble.", read);
290286

291-
if (instrumenter != null) {
292-
instrumenter.readFromClient(read);
293-
}
294-
295287
// Tell the callback that we've read at least one byte
296288
readObserver.run();
297289

src/test/java/io/fusionauth/http/CoreTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,15 @@ public void large_body(String scheme) throws Exception {
435435

436436
assertEquals(response.statusCode(), 200);
437437
assertEquals(response.body(), bytes);
438+
439+
// This assertion is always true unless we change what we are writing above. But it is here for reference.
440+
assertEquals(bytes.length, 16_804);
441+
// This should prove the PushbackInputStream isn't double counting.
442+
// - We should expect the bytes read to be roughly equivalent to the payload length. We can add some bytes to account
443+
// for the HTTP preamble, and for HTTPs additional overhead is incurred due to encryption. But if we are counting incorrectly
444+
// due to bytes being pushed back and counted again, the numbers would be almost double.
445+
// - So we should expect the bytes read to be within the ball bark of the payload length.
446+
assertEquals(instrumenter.getBytesRead(), scheme.equals("http") ? 17_032: 16_933);
438447
}
439448
}
440449

src/test/java/io/fusionauth/http/io/ChunkedInputStreamTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ private Builder withBody(String body) {
182182
}
183183

184184
private PushbackInputStream withParts(String... parts) {
185-
return new PushbackInputStream(new PieceMealInputStream(parts));
185+
return new PushbackInputStream(new PieceMealInputStream(parts), null);
186186
}
187187

188188
@SuppressWarnings("UnusedReturnValue")
@@ -221,7 +221,7 @@ public Builder assertNextRead(ThrowingFunction<ChunkedInputStream, Integer> func
221221
}
222222

223223
public Builder assertResult(String expected) throws IOException {
224-
pushbackInputStream = new PushbackInputStream(new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)));
224+
pushbackInputStream = new PushbackInputStream(new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)), null);
225225
chunkedInputStream = new ChunkedInputStream(pushbackInputStream, 2048);
226226

227227
String actual = new String(chunkedInputStream.readAllBytes(), StandardCharsets.UTF_8);

0 commit comments

Comments
 (0)