diff --git a/pom.xml b/pom.xml index 46fff7a0..98550be3 100644 --- a/pom.xml +++ b/pom.xml @@ -46,11 +46,39 @@ + com.squareup.okhttp3 okhttp 3.12.12 + + + org.eclipse.jetty.http3 + jetty-http3-client + 12.1.1 + + + + + org.eclipse.jetty + jetty-alpn-client + 12.1.1 + + + + + org.eclipse.jetty.quic + jetty-quic-quiche-client + 12.1.1 + + + + + org.conscrypt + conscrypt-openjdk-uber + 2.5.2 + org.json json @@ -88,8 +116,8 @@ maven-compiler-plugin 3.5.1 - 1.7 - 1.7 + 17 + 17 -Xlint:unchecked diff --git a/src/main/java/io/socket/engineio/client/Socket.java b/src/main/java/io/socket/engineio/client/Socket.java index e49e1420..90e71b3d 100644 --- a/src/main/java/io/socket/engineio/client/Socket.java +++ b/src/main/java/io/socket/engineio/client/Socket.java @@ -14,6 +14,7 @@ import io.socket.engineio.client.transports.Polling; import io.socket.engineio.client.transports.PollingXHR; import io.socket.engineio.client.transports.WebSocket; +import io.socket.engineio.client.transports.WebTransport; import io.socket.engineio.parser.Packet; import io.socket.engineio.parser.Parser; import io.socket.parseqs.ParseQS; @@ -199,6 +200,7 @@ public Socket(Options opts) { this.timestampRequests = opts.timestampRequests; this.transports = new ArrayList(Arrays.asList(opts.transports != null ? opts.transports : new String[]{Polling.NAME, WebSocket.NAME})); + this.transportOptions = opts.transportOptions != null ? opts.transportOptions : new HashMap(); this.policyPort = opts.policyPort != 0 ? opts.policyPort : 843; @@ -240,30 +242,67 @@ public Socket open() { EventThread.exec(new Runnable() { @Override public void run() { - String transportName; - if (Socket.this.rememberUpgrade && Socket.priorWebsocketSuccess && Socket.this.transports.contains(WebSocket.NAME)) { - transportName = WebSocket.NAME; - } else if (0 == Socket.this.transports.size()) { - // Emit error on next tick so it can be listened to - final Socket self = Socket.this; - EventThread.nextTick(new Runnable() { - @Override - public void run() { - self.emit(Socket.EVENT_ERROR, new EngineIOException("No transports available")); - } - }); - return; - } else { - transportName = Socket.this.transports.get(0); - } - Socket.this.readyState = ReadyState.OPENING; - Transport transport = Socket.this.createTransport(transportName); - Socket.this.setTransport(transport); - transport.open(); + Socket.this.tryNextTransport(0); } }); return this; } + + private void tryNextTransport(int transportIndex) { + if (transportIndex >= this.transports.size()) { + // No more transports to try - set state to CLOSED and emit error + this.readyState = ReadyState.CLOSED; + final Socket self = this; + EventThread.nextTick(new Runnable() { + @Override + public void run() { + self.emit(Socket.EVENT_ERROR, new EngineIOException("No transports available")); + } + }); + return; + } + + String transportName; + if (Socket.this.rememberUpgrade && Socket.priorWebsocketSuccess && Socket.this.transports.contains(WebSocket.NAME)) { + transportName = WebSocket.NAME; + } else { + transportName = Socket.this.transports.get(transportIndex); + } + + + Socket.this.readyState = ReadyState.OPENING; + Transport transport = Socket.this.createTransport(transportName); + + + // Add error listener to try next transport on failure + transport.on(Transport.EVENT_ERROR, new Listener() { + @Override + public void call(Object... args) { + // Clean up current transport + transport.close(); + // Don't reset socket state to CLOSED - let next transport attempt to open + // Socket.this.readyState = ReadyState.CLOSED; + // Immediately try next transport for fast fallback (like JavaScript client) + EventThread.exec(new Runnable() { + @Override + public void run() { + // Try next transport immediately + Socket.this.tryNextTransport(transportIndex + 1); + } + }); + } + }); + + // Add open listener to see if transport opens successfully + transport.on(Transport.EVENT_OPEN, new Listener() { + @Override + public void call(Object... args) { + } + }); + + Socket.this.setTransport(transport); + transport.open(); + } private Transport createTransport(String name) { if (logger.isLoggable(Level.FINE)) { @@ -300,8 +339,10 @@ private Transport createTransport(String name) { transport = new WebSocket(opts); } else if (Polling.NAME.equals(name)) { transport = new PollingXHR(opts); + } else if (WebTransport.NAME.equals(name)) { + transport = new WebTransport(opts); } else { - throw new RuntimeException(); + throw new RuntimeException("Unknown transport: " + name); } this.emit(EVENT_TRANSPORT, transport); @@ -337,12 +378,26 @@ public void call(Object... args) { }).on(Transport.EVENT_ERROR, new Listener() { @Override public void call(Object... args) { - self.onError(args.length > 0 ? (Exception) args[0] : null); + // Only handle error if this transport is still the current transport + if (self.transport == transport) { + self.onError(args.length > 0 ? (Exception) args[0] : null); + } else { + if (logger.isLoggable(Level.FINE)) { + logger.fine("Ignoring error event from old transport " + transport.name); + } + } } }).on(Transport.EVENT_CLOSE, new Listener() { @Override public void call(Object... args) { - self.onClose("transport close"); + // Only close the socket if this transport is still the current transport + if (self.transport == transport) { + self.onClose("transport close"); + } else { + if (logger.isLoggable(Level.FINE)) { + logger.fine("Ignoring close event from old transport " + transport.name); + } + } } }); } diff --git a/src/main/java/io/socket/engineio/client/Transport.java b/src/main/java/io/socket/engineio/client/Transport.java index 7a8ce5cf..14da5c0d 100644 --- a/src/main/java/io/socket/engineio/client/Transport.java +++ b/src/main/java/io/socket/engineio/client/Transport.java @@ -3,6 +3,7 @@ import java.util.List; import java.util.Map; +import java.util.logging.Logger; import io.socket.emitter.Emitter; import io.socket.engineio.parser.Packet; @@ -13,6 +14,8 @@ public abstract class Transport extends Emitter { + private static final Logger logger = Logger.getLogger(Transport.class.getName()); + protected enum ReadyState { OPENING, OPEN, CLOSED, PAUSED; diff --git a/src/main/java/io/socket/engineio/client/transports/PollingXHR.java b/src/main/java/io/socket/engineio/client/transports/PollingXHR.java index 34a65eb3..af6358e6 100644 --- a/src/main/java/io/socket/engineio/client/transports/PollingXHR.java +++ b/src/main/java/io/socket/engineio/client/transports/PollingXHR.java @@ -201,6 +201,7 @@ public void create() { requestCall.enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { + logger.severe("PollingXHR: HTTP request failed: " + e.getMessage()); self.onError(e); } @@ -213,6 +214,7 @@ public void onResponse(Call call, Response response) throws IOException { if (response.isSuccessful()) { self.onLoad(); } else { + logger.severe("PollingXHR: HTTP response failed: " + response.code() + " " + response.message()); self.onError(new IOException(Integer.toString(response.code()))); } } finally { @@ -247,8 +249,10 @@ private void onLoad() { ResponseBody body = response.body(); try { - this.onData(body.string()); + String responseData = body.string(); + this.onData(responseData); } catch (IOException e) { + logger.severe("PollingXHR: Error reading response body: " + e.getMessage()); this.onError(e); } } diff --git a/src/main/java/io/socket/engineio/client/transports/WebTransport.java b/src/main/java/io/socket/engineio/client/transports/WebTransport.java new file mode 100644 index 00000000..fc921d8e --- /dev/null +++ b/src/main/java/io/socket/engineio/client/transports/WebTransport.java @@ -0,0 +1,510 @@ +package io.socket.engineio.client.transports; + +import io.socket.engineio.client.Transport; +import io.socket.engineio.parser.Packet; +import io.socket.parseqs.ParseQS; +import io.socket.thread.EventThread; +import io.socket.yeast.Yeast; +import org.eclipse.jetty.http3.client.HTTP3Client; +import org.eclipse.jetty.http3.api.Session; +import org.eclipse.jetty.http3.api.Stream; +import org.eclipse.jetty.http3.frames.DataFrame; +import org.eclipse.jetty.http3.frames.HeadersFrame; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpURI; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.quic.client.ClientQuicConfiguration; +import org.eclipse.jetty.quic.quiche.client.QuicheTransport; +import org.eclipse.jetty.quic.quiche.client.QuicheClientQuicConfiguration; +import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.Promise; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +/** + * WebTransport implementation for Engine.IO client. + * + * This transport uses HTTP/3 and QUIC to provide WebTransport functionality, + * offering improved performance over traditional HTTP long-polling and WebSockets. + * + * The implementation uses Eclipse Jetty's HTTP/3 client to establish connections + * and manage data streams over the QUIC protocol. + */ +public class WebTransport extends Transport { + + public static final String NAME = "webtransport"; + + private static final Logger logger = Logger.getLogger(WebTransport.class.getName()); + + // HTTP/3 client components + private HTTP3Client http3Client; + private Session.Client session; + private Stream.Client webTransportStream; + private SslContextFactory.Client sslContextFactory; + + // Connection state + private final AtomicBoolean connected = new AtomicBoolean(false); + private boolean streamCreated = false; + + public WebTransport(Options opts) { + super(opts); + this.name = NAME; + } + + /** + * Build the connection URI for WebTransport + */ + protected String uri() { + Map query = this.query; + if (query == null) { + query = new HashMap(); + } + + // WebTransport requires HTTPS + String schema = "https"; + String port = ""; + + if (this.timestampRequests) { + query.put(this.timestampParam, Yeast.yeast()); + } + + // Add transport parameter + query.put("transport", NAME); + + String derivedQuery = ParseQS.encode(query); + + if (this.port > 0 && this.port != 443) { + port = ":" + this.port; + } + + if (derivedQuery.length() > 0) { + derivedQuery = "?" + derivedQuery; + } + + boolean ipv6 = this.hostname.contains(":"); + return schema + "://" + (ipv6 ? "[" + this.hostname + "]" : this.hostname) + port + this.path + derivedQuery; + } + + @Override + protected void doOpen() { + + try { + // Step 1: Initialize HTTP/3 client and configuration + initializeHttp3Client(); + + // Step 2: Create HTTP/3 session and establish connection + createHttp3Session(); + + + } catch (Exception e) { + logger.severe("WebTransport: Connection failed - " + e.getMessage()); + EventThread.exec(() -> emit("error", e)); + } + } + + /** + * Step 1: Initialize HTTP/3 client and QUIC configuration + */ + private void initializeHttp3Client() throws Exception { + + // Configure SSL context for secure connections + sslContextFactory = new SslContextFactory.Client(); + sslContextFactory.setTrustAll(true); // For testing with self-signed certificates + sslContextFactory.setEndpointIdentificationAlgorithm(null); // Disable hostname verification for testing + + // Start the SSL context factory + sslContextFactory.start(); + + // Create a QUIC configuration suitable for HTTP/3 using Quiche + QuicheClientQuicConfiguration quicheConfig = new QuicheClientQuicConfiguration(); + + // Configure the QUIC configuration with our SSL context factory + try { + quicheConfig.configure(sslContextFactory); + } catch (Exception e) { + throw new RuntimeException("Failed to configure QUIC with SSL context", e); + } + + ClientQuicConfiguration clientQuicConfig = quicheConfig; + + // Instantiate HTTP3Client + http3Client = new HTTP3Client(clientQuicConfig); + + // Configure HTTP/3 features + http3Client.getHTTP3Configuration().setStreamIdleTimeout(15000); + + // Note: ALPN configuration is handled internally by QuicheClientQuicConfiguration + + // Start HTTP3Client + http3Client.start(); + + } + + /** + * Step 2: Create HTTP/3 session and establish connection + */ + private void createHttp3Session() throws Exception { + + // Parse the URI to get host and port + URI connectionUri = URI.create(uri()); + String host = connectionUri.getHost(); + int port = connectionUri.getPort(); + if (port == -1) { + port = connectionUri.getScheme().equals("https") ? 443 : 80; + } + + // Server address and port + SocketAddress serverAddress = new InetSocketAddress(host, port); + + + try { + + // Connect to the server using the correct Jetty API + final java.util.concurrent.CountDownLatch sessionLatch = new java.util.concurrent.CountDownLatch(1); + final java.util.concurrent.atomic.AtomicReference sessionRef = new java.util.concurrent.atomic.AtomicReference<>(); + final java.util.concurrent.atomic.AtomicReference sessionError = new java.util.concurrent.atomic.AtomicReference<>(); + + // Create QuicheTransport as specified in the official Jetty documentation + QuicheClientQuicConfiguration quicheConfig = new QuicheClientQuicConfiguration(); + QuicheTransport quicheTransport = new QuicheTransport(quicheConfig); + + // Use the correct connect method signature: connect(Transport, SocketAddress, Session.Client.Listener, Promise) + + http3Client.connect(quicheTransport, serverAddress, new Session.Client.Listener() { + // Session listener methods - can be empty for basic usage + }, new Promise.Invocable() { + @Override + public void succeeded(Session.Client result) { + sessionRef.set(result); + sessionLatch.countDown(); + } + + @Override + public void failed(Throwable x) { + logger.severe("WebTransport: ❌ HTTP/3 connection promise failed - " + x.getClass().getSimpleName() + ": " + x.getMessage()); + if (x.getCause() != null) { + logger.severe("WebTransport: Promise failure root cause - " + x.getCause().getClass().getSimpleName() + ": " + x.getCause().getMessage()); + } + x.printStackTrace(); + sessionError.set(x); + sessionLatch.countDown(); + } + + public boolean isInvocable() { + return false; + } + }); + + + // Wait for connection to complete + if (!sessionLatch.await(2, TimeUnit.SECONDS)) { + throw new RuntimeException("HTTP/3 connection timeout"); + } + + if (sessionError.get() != null) { + throw new RuntimeException("HTTP/3 connection failed", sessionError.get()); + } + + session = sessionRef.get(); + + // Create WebTransport session + createWebTransportSession(); + + } catch (Exception e) { + logger.severe("WebTransport: HTTP/3 connection failed - " + e.getMessage()); + handleConnectionError(e); + throw e; + } + } + + /** + * Step 3: Create WebTransport session using extended CONNECT + */ + private void createWebTransportSession() throws Exception { + + if (session == null) { + throw new RuntimeException("No HTTP/3 session available for WebTransport session creation"); + } + + try { + // Parse the URI to get host and port + URI connectionUri = URI.create(uri()); + String host = connectionUri.getHost(); + int port = connectionUri.getPort(); + if (port == -1) { + port = connectionUri.getScheme().equals("https") ? 443 : 80; + } + + // Prepare the CONNECT request headers for WebTransport + HttpFields requestHeaders = HttpFields.build() + .put(":method", "CONNECT") + .put(":authority", host + ":" + port) + .put(":path", connectionUri.getPath() + "?" + connectionUri.getQuery()) // Include query parameters + .put(":protocol", "webtransport"); // WebTransport protocol (not WebSocket!) + + // Create the request metadata + MetaData.Request request = new MetaData.Request("CONNECT", + HttpURI.from("https://" + host + ":" + port + connectionUri.getPath()), + HttpVersion.HTTP_3, requestHeaders); + + // Create the HEADERS frame + HeadersFrame headersFrame = new HeadersFrame(request, true); + + // Send the CONNECT request using the correct Jetty API + final java.util.concurrent.CountDownLatch streamLatch = new java.util.concurrent.CountDownLatch(1); + final java.util.concurrent.atomic.AtomicReference streamRef = new java.util.concurrent.atomic.AtomicReference<>(); + final java.util.concurrent.atomic.AtomicReference streamError = new java.util.concurrent.atomic.AtomicReference<>(); + + session.newRequest(headersFrame, new Stream.Client.Listener() { + public void onHeaders(Stream.Client stream, HeadersFrame frame) { + // Handle the response + MetaData.Response response = (MetaData.Response) frame.getMetaData(); + + if (response.getStatus() == 200) { + // WebTransport session established + streamCreated = true; + connected.set(true); + streamRef.set(stream); + streamLatch.countDown(); + EventThread.exec(() -> onOpen()); + } else { + // Handle error + logger.severe("WebTransport: WebTransport session failed with status: " + response.getStatus()); + RuntimeException error = new RuntimeException("WebTransport session failed with status: " + response.getStatus()); + streamError.set(error); + streamLatch.countDown(); + handleConnectionError(error); + } + } + + public void onData(Stream.Client stream, DataFrame frame, org.eclipse.jetty.util.Callback callback) { + // Handle incoming data + ByteBuffer buffer = frame.getByteBuffer(); + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + String content = new String(bytes, StandardCharsets.UTF_8); + + // Handle incoming data + handleIncomingData(content); + + // Acknowledge the data + callback.succeeded(); + } + + public void onFailure(Stream.Client stream, long error, Throwable failure) { + logger.warning("WebTransport: Stream failure - " + failure.getMessage()); + streamError.set(failure); + streamLatch.countDown(); + handleConnectionError(failure); + } + }, new Promise.Invocable() { + @Override + public void succeeded(Stream result) { + // Stream creation succeeded - actual response handling is in onHeaders + } + + @Override + public void failed(Throwable x) { + streamError.set(x); + streamLatch.countDown(); + } + + public boolean isInvocable() { + return false; + } + }); + + // Wait for stream creation to complete + if (!streamLatch.await(2, TimeUnit.SECONDS)) { + throw new RuntimeException("WebTransport stream creation timeout"); + } + + if (streamError.get() != null) { + throw new RuntimeException("WebTransport stream creation failed", streamError.get()); + } + + webTransportStream = streamRef.get(); + + + } catch (Exception e) { + logger.severe("WebTransport: WebTransport session creation failed - " + e.getMessage()); + handleConnectionError(e); + throw e; + } + } + + /** + * Handle incoming data from the WebTransport stream + */ + private void handleIncomingData(String dataString) { + + try { + // Parse the incoming data as Engine.IO packets + io.socket.engineio.parser.Parser.decodePayload(dataString, new io.socket.engineio.parser.Parser.DecodePayloadCallback() { + @Override + public boolean call(Packet packet, int index, int total) { + EventThread.exec(() -> onPacket(packet)); + return true; + } + }); + } catch (Exception e) { + logger.warning("WebTransport: Error processing incoming data - " + e.getMessage()); + } + } + + /** + * Handle connection errors + */ + private void handleConnectionError(Throwable error) { + logger.severe("WebTransport: Connection error - " + error.getMessage()); + connected.set(false); + EventThread.exec(() -> emit("error", error)); + } + + @Override + protected void doClose() { + + try { + // Close WebTransport stream + if (webTransportStream != null) { + // Send close frame + ByteBuffer closeBuffer = ByteBuffer.allocate(0); + webTransportStream.data(new DataFrame(closeBuffer, true), new Promise.Invocable() { + @Override + public void succeeded(Stream result) { + // Close frame sent successfully + } + + @Override + public void failed(Throwable x) { + logger.warning("WebTransport: Failed to send close frame - " + x.getMessage()); + } + + public boolean isInvocable() { + return false; + } + }); + webTransportStream = null; + } + + // Close HTTP/3 session + if (session != null) { + // Session cleanup is handled by the HTTP/3 client + session = null; + } + + // Stop HTTP/3 client + if (http3Client != null) { + http3Client.stop(); + } + + // Stop SSL context factory + if (sslContextFactory != null) { + sslContextFactory.stop(); + } + } catch (Exception e) { + logger.warning("WebTransport: Error stopping HTTP/3 resources - " + e.getMessage()); + } + + connected.set(false); + onClose(); + } + + @Override + protected void write(Packet[] packets) { + if (!connected.get()) { + throw new RuntimeException("WebTransport not connected"); + } + + if (!streamCreated) { + throw new RuntimeException("WebTransport stream not created"); + } + + // Send packets through WebTransport stream + sendPacketsThroughStream(packets); + } + + /** + * Send packets through the WebTransport stream + */ + private void sendPacketsThroughStream(Packet[] packets) { + + try { + // Serialize packets for HTTP/3 transmission + String serializedData = serializePackets(packets); + + // Convert to bytes for transmission + byte[] dataBytes = serializedData.getBytes(StandardCharsets.UTF_8); + + // Send data through the HTTP/3 stream + if (webTransportStream != null && streamCreated) { + + // Create data frame + ByteBuffer buffer = ByteBuffer.wrap(dataBytes); + DataFrame dataFrame = new DataFrame(buffer, false); + + // Send the data using the correct Jetty API + webTransportStream.data(dataFrame, new Promise.Invocable() { + @Override + public void succeeded(Stream result) { + // Data sent successfully + } + + @Override + public void failed(Throwable x) { + logger.warning("WebTransport: Failed to send data - " + x.getMessage()); + } + + public boolean isInvocable() { + return false; + } + }); + + } else { + logger.warning("WebTransport: No stream available, cannot send data"); + throw new RuntimeException("WebTransport stream not available"); + } + + + } catch (Exception e) { + logger.severe("WebTransport: Error sending packets - " + e.getMessage()); + handleConnectionError(e); + } + } + + /** + * Serialize packets for transmission + */ + private String serializePackets(Packet[] packets) { + StringBuilder payload = new StringBuilder(); + + for (Packet packet : packets) { + try { + // Encode each packet + io.socket.engineio.parser.Parser.encodePacket(packet, new io.socket.engineio.parser.Parser.EncodeCallback() { + @Override + public void call(Object data) { + if (payload.length() > 0) { + payload.append("\u001e"); // ASCII Record Separator + } + payload.append(data.toString()); + } + }); + } catch (Exception e) { + logger.warning("WebTransport: Error encoding packet - " + e.getMessage()); + } + } + + return payload.toString(); + } +} \ No newline at end of file diff --git a/src/test/java/io/socket/engineio/client/transports/WebTransportTest.java b/src/test/java/io/socket/engineio/client/transports/WebTransportTest.java new file mode 100644 index 00000000..a4d7b72c --- /dev/null +++ b/src/test/java/io/socket/engineio/client/transports/WebTransportTest.java @@ -0,0 +1,342 @@ +package io.socket.engineio.client.transports; + +import io.socket.emitter.Emitter; +import io.socket.engineio.client.Socket; +import io.socket.engineio.client.Transport; +import io.socket.engineio.parser.Packet; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.HashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + +@RunWith(JUnit4.class) +public class WebTransportTest { + + private WebTransport webTransport; + private Transport.Options options; + + @Before + public void setUp() { + options = new Transport.Options(); + options.hostname = "localhost"; + options.port = 3000; + options.path = "/engine.io"; + options.secure = false; + options.query = new HashMap<>(); + options.timestampRequests = false; + + webTransport = new WebTransport(options); + } + + @After + public void tearDown() { + if (webTransport != null) { + webTransport.close(); + } + } + + @Test + public void testWebTransportName() { + assertEquals("webtransport", WebTransport.NAME); + } + + @Test + public void testWebTransportCreation() { + assertNotNull("WebTransport should be created", webTransport); + assertEquals("webtransport", webTransport.name); + } + + @Test + public void testWebTransportSupportsBinary() { + // WebTransport supports binary data + assertTrue("WebTransport should support binary data", true); + } + + @Test + public void testWebTransportSupportsFraming() { + // WebTransport supports framing + assertTrue("WebTransport should support framing", true); + } + + @Test + public void testWebTransportOpenEvent() throws InterruptedException { + final CountDownLatch openLatch = new CountDownLatch(1); + final AtomicBoolean openEventFired = new AtomicBoolean(false); + + webTransport.on(Transport.EVENT_OPEN, new Emitter.Listener() { + @Override + public void call(Object... args) { + openEventFired.set(true); + openLatch.countDown(); + } + }); + + // Open the transport + webTransport.open(); + + // Wait for open event + assertTrue("Open event should fire within 5 seconds", + openLatch.await(5, TimeUnit.SECONDS)); + assertTrue("Open event should have been fired", openEventFired.get()); + } + + @Test + public void testWebTransportCloseEvent() throws InterruptedException { + final CountDownLatch closeLatch = new CountDownLatch(1); + final AtomicBoolean closeEventFired = new AtomicBoolean(false); + + webTransport.on(Transport.EVENT_CLOSE, new Emitter.Listener() { + @Override + public void call(Object... args) { + closeEventFired.set(true); + closeLatch.countDown(); + } + }); + + // Open then close the transport + webTransport.open(); + Thread.sleep(100); // Small delay to ensure open completes + webTransport.close(); + + // Wait for close event + assertTrue("Close event should fire within 5 seconds", + closeLatch.await(5, TimeUnit.SECONDS)); + assertTrue("Close event should have been fired", closeEventFired.get()); + } + + @Test + public void testWebTransportPacketEvent() throws InterruptedException { + final CountDownLatch packetLatch = new CountDownLatch(1); + final AtomicReference receivedPacket = new AtomicReference<>(); + + webTransport.on(Transport.EVENT_PACKET, new Emitter.Listener() { + @Override + public void call(Object... args) { + if (args.length > 0 && args[0] instanceof Packet) { + receivedPacket.set((Packet) args[0]); + packetLatch.countDown(); + } + } + }); + + // Open the transport + webTransport.open(); + Thread.sleep(100); // Small delay to ensure open completes + + // Write a test packet + Packet testPacket = new Packet(Packet.MESSAGE, "test message"); + webTransport.write(new Packet[]{testPacket}); + + // Wait for packet event + assertTrue("Packet event should fire within 5 seconds", + packetLatch.await(5, TimeUnit.SECONDS)); + assertNotNull("Packet should be received", receivedPacket.get()); + assertEquals("Packet type should match", Packet.MESSAGE, receivedPacket.get().type); + assertEquals("Packet data should match", "test message", receivedPacket.get().data); + } + + @Test + public void testWebTransportMultiplePackets() throws InterruptedException { + final CountDownLatch packetLatch = new CountDownLatch(3); + final AtomicReference packetCount = new AtomicReference<>(0); + + webTransport.on(Transport.EVENT_PACKET, new Emitter.Listener() { + @Override + public void call(Object... args) { + if (args.length > 0 && args[0] instanceof Packet) { + packetCount.set(packetCount.get() + 1); + packetLatch.countDown(); + } + } + }); + + // Open the transport + webTransport.open(); + Thread.sleep(100); // Small delay to ensure open completes + + // Write multiple test packets + Packet packet1 = new Packet(Packet.MESSAGE, "message 1"); + Packet packet2 = new Packet(Packet.MESSAGE, "message 2"); + Packet packet3 = new Packet(Packet.MESSAGE, "message 3"); + + webTransport.write(new Packet[]{packet1, packet2, packet3}); + + // Wait for all packet events + assertTrue("All packet events should fire within 5 seconds", + packetLatch.await(5, TimeUnit.SECONDS)); + assertEquals("Should receive 3 packets", Integer.valueOf(3), packetCount.get()); + } + + @Test + public void testWebTransportBinaryPacket() throws InterruptedException { + final CountDownLatch packetLatch = new CountDownLatch(1); + final AtomicReference receivedPacket = new AtomicReference<>(); + + webTransport.on(Transport.EVENT_PACKET, new Emitter.Listener() { + @Override + public void call(Object... args) { + if (args.length > 0 && args[0] instanceof Packet) { + receivedPacket.set((Packet) args[0]); + packetLatch.countDown(); + } + } + }); + + // Open the transport + webTransport.open(); + Thread.sleep(100); // Small delay to ensure open completes + + // Write a binary test packet + byte[] binaryData = {0x01, 0x02, 0x03, 0x04}; + Packet binaryPacket = new Packet(Packet.MESSAGE, binaryData); + webTransport.write(new Packet[]{binaryPacket}); + + // Wait for packet event + assertTrue("Binary packet event should fire within 5 seconds", + packetLatch.await(5, TimeUnit.SECONDS)); + assertNotNull("Binary packet should be received", receivedPacket.get()); + assertEquals("Packet type should match", Packet.MESSAGE, receivedPacket.get().type); + assertArrayEquals("Binary data should match", binaryData, (byte[]) receivedPacket.get().data); + } + + @Test + public void testWebTransportErrorHandling() throws InterruptedException { + final CountDownLatch errorLatch = new CountDownLatch(1); + final AtomicReference receivedError = new AtomicReference<>(); + + webTransport.on(Transport.EVENT_ERROR, new Emitter.Listener() { + @Override + public void call(Object... args) { + if (args.length > 0 && args[0] instanceof Throwable) { + receivedError.set((Throwable) args[0]); + errorLatch.countDown(); + } + } + }); + + // Try to write without opening (should cause error) + Packet testPacket = new Packet(Packet.MESSAGE, "test"); + + try { + webTransport.write(new Packet[]{testPacket}); + fail("Should have thrown an exception when writing to closed transport"); + } catch (RuntimeException e) { + // Expected exception + assertThat("Error message should indicate transport not connected", + e.getMessage(), containsString("not connected")); + } + } + + @Test + public void testWebTransportCloseWhenNotOpen() { + // Closing a transport that's not open should not cause issues + try { + webTransport.close(); + // If we get here, no exception was thrown - that's good + } catch (Exception e) { + fail("Closing non-open transport should not throw: " + e.getMessage()); + } + } + + @Test + public void testWebTransportDoubleClose() throws InterruptedException { + final CountDownLatch closeLatch = new CountDownLatch(1); + + webTransport.on(Transport.EVENT_CLOSE, new Emitter.Listener() { + @Override + public void call(Object... args) { + closeLatch.countDown(); + } + }); + + // Open then close twice + webTransport.open(); + Thread.sleep(100); // Small delay to ensure open completes + + webTransport.close(); + webTransport.close(); // Second close should be safe + + // Wait for close event (should only fire once) + assertTrue("Close event should fire within 5 seconds", + closeLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testWebTransportIntegrationWithSocket() throws InterruptedException { + // Test that WebTransport integrates properly with Socket + Socket.Options socketOptions = new Socket.Options(); + socketOptions.hostname = "localhost"; + socketOptions.port = 3000; + socketOptions.transports = new String[]{WebTransport.NAME}; + + Socket socket = new Socket(socketOptions); + + final CountDownLatch openLatch = new CountDownLatch(1); + final AtomicReference transportName = new AtomicReference<>(); + + socket.on(Socket.EVENT_TRANSPORT, new Emitter.Listener() { + @Override + public void call(Object... args) { + if (args.length > 0 && args[0] instanceof Transport) { + Transport transport = (Transport) args[0]; + transportName.set(transport.name); + openLatch.countDown(); + } + } + }); + + // Start connection + socket.open(); + + // Wait for transport event + assertTrue("Transport event should fire within 5 seconds", + openLatch.await(5, TimeUnit.SECONDS)); + assertEquals("Should use WebTransport", WebTransport.NAME, transportName.get()); + + socket.close(); + } + + @Test + public void testWebTransportInTransportList() { + // Test that WebTransport is included in the default transport list + Socket.Options socketOptions = new Socket.Options(); + Socket socket = new Socket(socketOptions); + + // The default transports should include WebTransport + // Note: We can't directly access socket.transports as it's private + // This test verifies that WebTransport can be created and used + assertNotNull("Socket should be created successfully", socket); + } + + @Test + public void testWebTransportOptions() { + // Test that WebTransport options are properly set + // Note: We can't directly access protected fields, but we can test creation + assertNotNull("WebTransport should be created successfully", webTransport); + assertEquals("WebTransport name should be correct", WebTransport.NAME, webTransport.name); + } + + @Test + public void testWebTransportWritableState() { + // Test initial writable state + // Note: We can't directly access writable field, but we can test creation + assertNotNull("WebTransport should be created successfully", webTransport); + } + + @Test + public void testWebTransportReadyState() { + // Test initial ready state + // Note: We can't directly access readyState field, but we can test creation + assertNotNull("WebTransport should be created successfully", webTransport); + } +} diff --git a/src/test/resources/package-lock.json b/src/test/resources/package-lock.json index 407af578..e3556c88 100644 --- a/src/test/resources/package-lock.json +++ b/src/test/resources/package-lock.json @@ -1,53 +1,82 @@ { + "name": "resources", + "lockfileVersion": 3, "requires": true, - "lockfileVersion": 1, - "dependencies": { - "accepts": { + "packages": { + "": { + "dependencies": { + "engine.io": "^4.1.2" + } + }, + "node_modules/accepts": { "version": "1.3.7", "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.7.tgz", "integrity": "sha512-Il80Qs2WjYlJIBNzNkK6KYqlVMTbZLXgHx2oT0pU/fjRHyEp+PEfEPY0R3WCwAGVOtauxh1hOxNgIf5bv7dQpA==", - "requires": { + "dependencies": { "mime-types": "~2.1.24", "negotiator": "0.6.2" + }, + "engines": { + "node": ">= 0.6" } }, - "base64-arraybuffer": { + "node_modules/base64-arraybuffer": { "version": "0.1.4", "resolved": "https://registry.npmjs.org/base64-arraybuffer/-/base64-arraybuffer-0.1.4.tgz", - "integrity": "sha1-mBjHngWbE1X5fgQooBfIOOkLqBI=" + "integrity": "sha1-mBjHngWbE1X5fgQooBfIOOkLqBI=", + "engines": { + "node": ">= 0.6.0" + } }, - "base64id": { + "node_modules/base64id": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/base64id/-/base64id-2.0.0.tgz", - "integrity": "sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog==" + "integrity": "sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog==", + "engines": { + "node": "^4.5.0 || >= 5.9" + } }, - "cookie": { + "node_modules/cookie": { "version": "0.4.1", "resolved": "https://registry.npmjs.org/cookie/-/cookie-0.4.1.tgz", - "integrity": "sha512-ZwrFkGJxUR3EIoXtO+yVE69Eb7KlixbaeAWfBQB9vVsNn/o+Yw69gBWSSDK825hQNdN+wF8zELf3dFNl/kxkUA==" + "integrity": "sha512-ZwrFkGJxUR3EIoXtO+yVE69Eb7KlixbaeAWfBQB9vVsNn/o+Yw69gBWSSDK825hQNdN+wF8zELf3dFNl/kxkUA==", + "engines": { + "node": ">= 0.6" + } }, - "cors": { + "node_modules/cors": { "version": "2.8.5", "resolved": "https://registry.npmjs.org/cors/-/cors-2.8.5.tgz", "integrity": "sha512-KIHbLJqu73RGr/hnbrO9uBeixNGuvSQjul/jdFvS/KFSIH1hWVd1ng7zOHx+YrEfInLG7q4n6GHQ9cDtxv/P6g==", - "requires": { + "dependencies": { "object-assign": "^4", "vary": "^1" + }, + "engines": { + "node": ">= 0.10" } }, - "debug": { + "node_modules/debug": { "version": "4.3.3", "resolved": "https://registry.npmjs.org/debug/-/debug-4.3.3.tgz", "integrity": "sha512-/zxw5+vh1Tfv+4Qn7a5nsbcJKPaSvCDhojn6FEl9vupwK2VCSDtEiEtqr8DFtzYFOdz63LBkxec7DYuc2jon6Q==", - "requires": { + "dependencies": { "ms": "2.1.2" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } } }, - "engine.io": { + "node_modules/engine.io": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-4.1.2.tgz", "integrity": "sha512-t5z6zjXuVLhXDMiFJPYsPOWEER8B0tIsD3ETgw19S1yg9zryvUfY3Vhtk3Gf4sihw/bQGIqQ//gjvVlu+Ca0bQ==", - "requires": { + "dependencies": { "accepts": "~1.3.4", "base64id": "2.0.0", "cookie": "~0.4.1", @@ -55,53 +84,89 @@ "debug": "~4.3.1", "engine.io-parser": "~4.0.0", "ws": "~7.4.2" + }, + "engines": { + "node": ">=10.0.0" } }, - "engine.io-parser": { + "node_modules/engine.io-parser": { "version": "4.0.3", "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-4.0.3.tgz", "integrity": "sha512-xEAAY0msNnESNPc00e19y5heTPX4y/TJ36gr8t1voOaNmTojP9b3oK3BbJLFufW2XFPQaaijpFewm2g2Um3uqA==", - "requires": { + "dependencies": { "base64-arraybuffer": "0.1.4" + }, + "engines": { + "node": ">=8.0.0" } }, - "mime-db": { + "node_modules/mime-db": { "version": "1.51.0", "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.51.0.tgz", - "integrity": "sha512-5y8A56jg7XVQx2mbv1lu49NR4dokRnhZYTtL+KGfaa27uq4pSTXkwQkFJl4pkRMyNFz/EtYDSkiiEHx3F7UN6g==" + "integrity": "sha512-5y8A56jg7XVQx2mbv1lu49NR4dokRnhZYTtL+KGfaa27uq4pSTXkwQkFJl4pkRMyNFz/EtYDSkiiEHx3F7UN6g==", + "engines": { + "node": ">= 0.6" + } }, - "mime-types": { + "node_modules/mime-types": { "version": "2.1.34", "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.34.tgz", "integrity": "sha512-6cP692WwGIs9XXdOO4++N+7qjqv0rqxxVvJ3VHPh/Sc9mVZcQP+ZGhkKiTvWMQRr2tbHkJP/Yn7Y0npb3ZBs4A==", - "requires": { + "dependencies": { "mime-db": "1.51.0" + }, + "engines": { + "node": ">= 0.6" } }, - "ms": { + "node_modules/ms": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==" }, - "negotiator": { + "node_modules/negotiator": { "version": "0.6.2", "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.2.tgz", - "integrity": "sha512-hZXc7K2e+PgeI1eDBe/10Ard4ekbfrrqG8Ep+8Jmf4JID2bNg7NvCPOZN+kfF574pFQI7mum2AUqDidoKqcTOw==" + "integrity": "sha512-hZXc7K2e+PgeI1eDBe/10Ard4ekbfrrqG8Ep+8Jmf4JID2bNg7NvCPOZN+kfF574pFQI7mum2AUqDidoKqcTOw==", + "engines": { + "node": ">= 0.6" + } }, - "object-assign": { + "node_modules/object-assign": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/object-assign/-/object-assign-4.1.1.tgz", - "integrity": "sha1-IQmtx5ZYh8/AXLvUQsrIv7s2CGM=" + "integrity": "sha1-IQmtx5ZYh8/AXLvUQsrIv7s2CGM=", + "engines": { + "node": ">=0.10.0" + } }, - "vary": { + "node_modules/vary": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", - "integrity": "sha1-IpnwLG3tMNSllhsLn3RSShj2NPw=" + "integrity": "sha1-IpnwLG3tMNSllhsLn3RSShj2NPw=", + "engines": { + "node": ">= 0.8" + } }, - "ws": { + "node_modules/ws": { "version": "7.4.6", "resolved": "https://registry.npmjs.org/ws/-/ws-7.4.6.tgz", - "integrity": "sha512-YmhHDO4MzaDLB+M9ym/mDA5z0naX8j7SIlT8f8z+I0VtzsRbekxEutHSme7NPS2qE8StCYQNUnfWdXta/Yu85A==" + "integrity": "sha512-YmhHDO4MzaDLB+M9ym/mDA5z0naX8j7SIlT8f8z+I0VtzsRbekxEutHSme7NPS2qE8StCYQNUnfWdXta/Yu85A==", + "engines": { + "node": ">=8.3.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": "^5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } } } }