Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,39 @@
</prerequisites>

<dependencies>
<!-- OkHttp for HTTP/1.1, HTTP/2, and WebSocket -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.12.12</version>
</dependency>
<!-- Jetty HTTP/3 client for WebTransport support -->
<dependency>
<groupId>org.eclipse.jetty.http3</groupId>
<artifactId>jetty-http3-client</artifactId>
<version>12.1.1</version>
</dependency>

<!-- Jetty ALPN client for HTTP/3 protocol negotiation -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-alpn-client</artifactId>
<version>12.1.1</version>
</dependency>

<!-- Jetty QUIC Quiche implementation for HTTP/3 -->
<dependency>
<groupId>org.eclipse.jetty.quic</groupId>
<artifactId>jetty-quic-quiche-client</artifactId>
<version>12.1.1</version>
</dependency>

<!-- Conscrypt ALPN implementation for HTTP/3 -->
<dependency>
<groupId>org.conscrypt</groupId>
<artifactId>conscrypt-openjdk-uber</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
Expand Down Expand Up @@ -88,8 +116,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>17</source>
<target>17</target>
<compilerArgs>
<arg>-Xlint:unchecked</arg>
</compilerArgs>
Expand Down
101 changes: 78 additions & 23 deletions src/main/java/io/socket/engineio/client/Socket.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.socket.engineio.client.transports.Polling;
import io.socket.engineio.client.transports.PollingXHR;
import io.socket.engineio.client.transports.WebSocket;
import io.socket.engineio.client.transports.WebTransport;
import io.socket.engineio.parser.Packet;
import io.socket.engineio.parser.Parser;
import io.socket.parseqs.ParseQS;
Expand Down Expand Up @@ -199,6 +200,7 @@ public Socket(Options opts) {
this.timestampRequests = opts.timestampRequests;
this.transports = new ArrayList<String>(Arrays.asList(opts.transports != null ?
opts.transports : new String[]{Polling.NAME, WebSocket.NAME}));

this.transportOptions = opts.transportOptions != null ?
opts.transportOptions : new HashMap<String, Transport.Options>();
this.policyPort = opts.policyPort != 0 ? opts.policyPort : 843;
Expand Down Expand Up @@ -240,30 +242,67 @@ public Socket open() {
EventThread.exec(new Runnable() {
@Override
public void run() {
String transportName;
if (Socket.this.rememberUpgrade && Socket.priorWebsocketSuccess && Socket.this.transports.contains(WebSocket.NAME)) {
transportName = WebSocket.NAME;
} else if (0 == Socket.this.transports.size()) {
// Emit error on next tick so it can be listened to
final Socket self = Socket.this;
EventThread.nextTick(new Runnable() {
@Override
public void run() {
self.emit(Socket.EVENT_ERROR, new EngineIOException("No transports available"));
}
});
return;
} else {
transportName = Socket.this.transports.get(0);
}
Socket.this.readyState = ReadyState.OPENING;
Transport transport = Socket.this.createTransport(transportName);
Socket.this.setTransport(transport);
transport.open();
Socket.this.tryNextTransport(0);
}
});
return this;
}

private void tryNextTransport(int transportIndex) {
if (transportIndex >= this.transports.size()) {
// No more transports to try - set state to CLOSED and emit error
this.readyState = ReadyState.CLOSED;
final Socket self = this;
EventThread.nextTick(new Runnable() {
@Override
public void run() {
self.emit(Socket.EVENT_ERROR, new EngineIOException("No transports available"));
}
});
return;
}

String transportName;
if (Socket.this.rememberUpgrade && Socket.priorWebsocketSuccess && Socket.this.transports.contains(WebSocket.NAME)) {
transportName = WebSocket.NAME;
} else {
transportName = Socket.this.transports.get(transportIndex);
}


Socket.this.readyState = ReadyState.OPENING;
Transport transport = Socket.this.createTransport(transportName);


// Add error listener to try next transport on failure
transport.on(Transport.EVENT_ERROR, new Listener() {
@Override
public void call(Object... args) {
// Clean up current transport
transport.close();
// Don't reset socket state to CLOSED - let next transport attempt to open
// Socket.this.readyState = ReadyState.CLOSED;
// Immediately try next transport for fast fallback (like JavaScript client)
EventThread.exec(new Runnable() {
@Override
public void run() {
// Try next transport immediately
Socket.this.tryNextTransport(transportIndex + 1);
}
});
}
});

// Add open listener to see if transport opens successfully
transport.on(Transport.EVENT_OPEN, new Listener() {
@Override
public void call(Object... args) {
}
});

Socket.this.setTransport(transport);
transport.open();
}

private Transport createTransport(String name) {
if (logger.isLoggable(Level.FINE)) {
Expand Down Expand Up @@ -300,8 +339,10 @@ private Transport createTransport(String name) {
transport = new WebSocket(opts);
} else if (Polling.NAME.equals(name)) {
transport = new PollingXHR(opts);
} else if (WebTransport.NAME.equals(name)) {
transport = new WebTransport(opts);
} else {
throw new RuntimeException();
throw new RuntimeException("Unknown transport: " + name);
}

this.emit(EVENT_TRANSPORT, transport);
Expand Down Expand Up @@ -337,12 +378,26 @@ public void call(Object... args) {
}).on(Transport.EVENT_ERROR, new Listener() {
@Override
public void call(Object... args) {
self.onError(args.length > 0 ? (Exception) args[0] : null);
// Only handle error if this transport is still the current transport
if (self.transport == transport) {
self.onError(args.length > 0 ? (Exception) args[0] : null);
} else {
if (logger.isLoggable(Level.FINE)) {
logger.fine("Ignoring error event from old transport " + transport.name);
}
}
}
}).on(Transport.EVENT_CLOSE, new Listener() {
@Override
public void call(Object... args) {
self.onClose("transport close");
// Only close the socket if this transport is still the current transport
if (self.transport == transport) {
self.onClose("transport close");
} else {
if (logger.isLoggable(Level.FINE)) {
logger.fine("Ignoring close event from old transport " + transport.name);
}
}
}
});
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/socket/engineio/client/Transport.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

import io.socket.emitter.Emitter;
import io.socket.engineio.parser.Packet;
Expand All @@ -13,6 +14,8 @@

public abstract class Transport extends Emitter {

private static final Logger logger = Logger.getLogger(Transport.class.getName());

protected enum ReadyState {
OPENING, OPEN, CLOSED, PAUSED;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ public void create() {
requestCall.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
logger.severe("PollingXHR: HTTP request failed: " + e.getMessage());
self.onError(e);
}

Expand All @@ -213,6 +214,7 @@ public void onResponse(Call call, Response response) throws IOException {
if (response.isSuccessful()) {
self.onLoad();
} else {
logger.severe("PollingXHR: HTTP response failed: " + response.code() + " " + response.message());
self.onError(new IOException(Integer.toString(response.code())));
}
} finally {
Expand Down Expand Up @@ -247,8 +249,10 @@ private void onLoad() {
ResponseBody body = response.body();

try {
this.onData(body.string());
String responseData = body.string();
this.onData(responseData);
} catch (IOException e) {
logger.severe("PollingXHR: Error reading response body: " + e.getMessage());
this.onError(e);
}
}
Expand Down
Loading