Skip to content

ZOOKEEPER-4923: Add timeout to control brand-new session establishment #2253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ static class AuthData {

private final int sessionTimeout;

private final long newSessionTimeout;

private final ZKWatchManager watchManager;

private long sessionId;
Expand Down Expand Up @@ -398,6 +400,36 @@ public ClientCnxn(
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly
) throws IOException {
this(hostProvider, sessionTimeout, Long.MAX_VALUE, clientConfig, defaultWatcher, clientCnxnSocket, sessionId, sessionPasswd, canBeReadOnly);
}

/**
* Creates a connection object. The actual network connect doesn't get
* established until needed. The start() instance method must be called
* after construction.
*
* @param hostProvider the list of ZooKeeper servers to connect to
* @param sessionTimeout the timeout for connections.
* @param newSessionTimeout the timeout before giving up brand-new session establishment.
* @param clientConfig the client configuration.
* @param defaultWatcher default watcher for this connection
* @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
* @param sessionId session id if re-establishing session
* @param sessionPasswd session passwd if re-establishing session
* @param canBeReadOnly whether the connection is allowed to go to read-only mode in case of partitioning
* @throws IOException in cases of broken network
*/
public ClientCnxn(
HostProvider hostProvider,
int sessionTimeout,
long newSessionTimeout,
ZKClientConfig clientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly
) throws IOException {
this.hostProvider = hostProvider;
this.sessionTimeout = sessionTimeout;
Expand All @@ -413,6 +445,7 @@ public ClientCnxn(
this.connectTimeout = sessionTimeout / hostProvider.size();
this.readTimeout = sessionTimeout * 2 / 3;
this.expirationTimeout = sessionTimeout * 4 / 3;
this.newSessionTimeout = newSessionTimeout == 0 ? expirationTimeout : newSessionTimeout;

this.sendThread = new SendThread(clientCnxnSocket);
this.eventThread = new EventThread();
Expand Down Expand Up @@ -1192,7 +1225,12 @@ public void run() {
to = connectTimeout - clientCnxnSocket.getIdleSend();
}

int expiration = sessionId == 0 ? Integer.MAX_VALUE : expirationTimeout - clientCnxnSocket.getIdleRecv();
long expiration;
if (sessionId == 0) {
expiration = newSessionTimeout - clientCnxnSocket.getIdleRecv();
} else {
expiration = expirationTimeout - clientCnxnSocket.getIdleRecv();
}
if (expiration <= 0) {
String warnInfo = String.format(
"Client session timed out, have not heard from server in %dms for session id 0x%s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ public ZooKeeper(
ClientCnxn createConnection(
HostProvider hostProvider,
int sessionTimeout,
long newSessionTimeout,
ZKClientConfig clientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
Expand All @@ -681,6 +682,7 @@ ClientCnxn createConnection(
return new ClientCnxn(
hostProvider,
sessionTimeout,
newSessionTimeout,
clientConfig,
defaultWatcher,
clientCnxnSocket,
Expand Down Expand Up @@ -1110,6 +1112,7 @@ public ZooKeeper(ZooKeeperOptions options) throws IOException {
cnxn = createConnection(
hostProvider,
sessionTimeout,
options.getNewSessionTimeout(),
this.clientConfig,
watcher,
getClientCnxnSocket(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.Watcher;
Expand All @@ -37,6 +39,7 @@
public class ZooKeeperBuilder {
private final String connectString;
private final int sessionTimeout;
private long newSessionTimeout = Long.MAX_VALUE;
private Function<Collection<InetSocketAddress>, HostProvider> hostProvider;
private Watcher defaultWatcher;
private boolean canBeReadOnly = false;
Expand Down Expand Up @@ -126,6 +129,21 @@ public ZooKeeperBuilder withSession(long sessionId, byte[] sessionPasswd) {
return this;
}

/**
* Specifies timeout to establish a brand-new session.
*
* @param timeout timeout to get {@link org.apache.zookeeper.Watcher.Event.KeeperState#Expired} in
* establishing a brand-new session. {@code null}, which is the default, means endless
* retry until connected, {@code Duration.ZERO} means a sensible value deduced from
* specified session timeout.
* @return this
* @since 3.10.0
*/
public ZooKeeperBuilder withNewSessionTimeout(@Nullable Duration timeout) {
this.newSessionTimeout = timeout == null ? Long.MAX_VALUE : timeout.toMillis();
return this;
}

/**
* Specifies the client config used to construct ZooKeeper instances.
*
Expand All @@ -150,6 +168,7 @@ public ZooKeeperOptions toOptions() {
return new ZooKeeperOptions(
connectString,
sessionTimeout,
newSessionTimeout,
defaultWatcher,
hostProvider,
canBeReadOnly,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
public class ZooKeeperOptions {
private final String connectString;
private final int sessionTimeout;
private final long newSessionTimeout;
private final Watcher defaultWatcher;
private final Function<Collection<InetSocketAddress>, HostProvider> hostProvider;
private final boolean canBeReadOnly;
Expand All @@ -41,6 +42,7 @@ public class ZooKeeperOptions {

ZooKeeperOptions(String connectString,
int sessionTimeout,
long newSessionTimeout,
Watcher defaultWatcher,
Function<Collection<InetSocketAddress>, HostProvider> hostProvider,
boolean canBeReadOnly,
Expand All @@ -49,6 +51,7 @@ public class ZooKeeperOptions {
ZKClientConfig clientConfig) {
this.connectString = connectString;
this.sessionTimeout = sessionTimeout;
this.newSessionTimeout = newSessionTimeout;
this.hostProvider = hostProvider;
this.defaultWatcher = defaultWatcher;
this.canBeReadOnly = canBeReadOnly;
Expand All @@ -65,6 +68,10 @@ public int getSessionTimeout() {
return sessionTimeout;
}

public long getNewSessionTimeout() {
return newSessionTimeout;
}

public Watcher getDefaultWatcher() {
return defaultWatcher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ class CustomClientCnxn extends ClientCnxn {
public CustomClientCnxn(
HostProvider hostProvider,
int sessionTimeout,
long newSessionTimeout,
ZKClientConfig zkClientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
Expand All @@ -292,6 +293,7 @@ public CustomClientCnxn(
super(
hostProvider,
sessionTimeout,
newSessionTimeout,
zkClientConfig,
defaultWatcher,
clientCnxnSocket,
Expand Down Expand Up @@ -357,6 +359,7 @@ public boolean isAlive() {
ClientCnxn createConnection(
HostProvider hostProvider,
int sessionTimeout,
long newSessionTimeout,
ZKClientConfig clientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
Expand All @@ -369,6 +372,7 @@ ClientCnxn createConnection(
ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn(
hostProvider,
sessionTimeout,
newSessionTimeout,
clientConfig,
defaultWatcher,
clientCnxnSocket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ class CustomClientCnxn extends ClientCnxn {
CustomClientCnxn(
HostProvider hostProvider,
int sessionTimeout,
long newSessionTimeout,
ZKClientConfig clientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
Expand All @@ -235,6 +236,7 @@ class CustomClientCnxn extends ClientCnxn {
super(
hostProvider,
sessionTimeout,
newSessionTimeout,
clientConfig,
defaultWatcher,
clientCnxnSocket,
Expand Down Expand Up @@ -286,6 +288,7 @@ public CustomZooKeeper(String connectString, int sessionTimeout, Watcher watcher
ClientCnxn createConnection(
HostProvider hostProvider,
int sessionTimeout,
long newSessionTimeout,
ZKClientConfig clientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
Expand All @@ -296,6 +299,7 @@ ClientCnxn createConnection(
return new CustomClientCnxn(
hostProvider,
sessionTimeout,
newSessionTimeout,
clientConfig,
defaultWatcher,
clientCnxnSocket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.zookeeper.test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -29,6 +30,7 @@
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -42,6 +44,7 @@
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ZooKeeperBuilder;
import org.apache.zookeeper.common.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -201,6 +204,60 @@ public void testSessionExpirationWhenNoServerUp() throws Exception {
assertThrows(TimeoutException.class, () -> watcher.expired.get(3 * sessionTimeout, TimeUnit.MILLISECONDS));
assertThrows(KeeperException.ConnectionLossException.class, () -> zk.exists("/", null));
}

// when: try to establish a brand-new session using builder
watcher.reset();
try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, sessionTimeout)
.withDefaultWatcher(watcher)
.build()) {
// then: never Expired
assertThrows(TimeoutException.class, () -> watcher.expired.get(3 * sessionTimeout, TimeUnit.MILLISECONDS));
assertThrows(KeeperException.ConnectionLossException.class, () -> zk.exists("/", null));
}

// when: try to establish a brand-new session using builder with null newSessionTimeout
watcher.reset();
try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, sessionTimeout)
.withDefaultWatcher(watcher)
.withNewSessionTimeout(null)
.build()) {
// then: never Expired
assertThrows(TimeoutException.class, () -> watcher.expired.get(3 * sessionTimeout, TimeUnit.MILLISECONDS));
assertThrows(KeeperException.ConnectionLossException.class, () -> zk.exists("/", null));
}

// when: try to establish a brand-new session using builder with Duration.ZERO newSessionTimeout
watcher.reset();
long start = Time.currentElapsedTime();
try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, sessionTimeout)
.withDefaultWatcher(watcher)
.withNewSessionTimeout(Duration.ZERO)
.build()) {
// then: get Expired after some delay
watcher.expired.join();
long elapsed = Time.currentElapsedTime() - start;
assertThat(elapsed, greaterThan((long) sessionTimeout));
assertThat(elapsed, lessThan(sessionTimeout * 10L));
// then: future request will get SessionExpiredException
assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null));
}

// when: try to establish a brand-new session using builder with custom newSessionTimeout
watcher.reset();
start = Time.currentElapsedTime();
Duration newSessionTimeout = Duration.ofMillis(300);
try (ZooKeeper zk = new ZooKeeperBuilder(hostPort, 30000)
.withDefaultWatcher(watcher)
.withNewSessionTimeout(newSessionTimeout)
.build()) {
// then: get Expired after newSessionTimeout
watcher.expired.join();
long elapsed = Time.currentElapsedTime() - start;
assertThat(elapsed, greaterThanOrEqualTo(newSessionTimeout.toMillis()));
assertThat(elapsed, lessThan(newSessionTimeout.toMillis() * 10));
// then: future request will get SessionExpiredException
assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null));
}
}

@Test
Expand Down
Loading