diff --git a/all/pom.xml b/all/pom.xml
new file mode 100644
index 00000000..9e19ca1c
--- /dev/null
+++ b/all/pom.xml
@@ -0,0 +1,61 @@
+
+
* Clients communicate with the distributed state machine by submitting {@link Command commands} and {@link Query queries} to - * the cluster through the {@link #submit(Command)} and {@link #submit(Query)} methods respectively: + * the cluster through the {@link CopycatSession#submit(Command)} and {@link CopycatSession#submit(Query)} methods respectively: *
* {@code
* client.submit(new PutCommand("foo", "Hello world!")).thenAccept(result -> {
@@ -82,7 +85,7 @@
* {@link Query.ConsistencyLevel} documentation for more info.
*
* Throughout the lifetime of a client, the client may operate on the cluster via multiple sessions according to the configured
- * {@link RecoveryStrategy}. In the event that the client's session expires, the client may register a new session and continue
+ * recovery strategy. In the event that the client's session expires, the client may register a new session and continue
* to submit operations under the recovered session. The client will always attempt to ensure commands submitted are eventually
* committed to the cluster even across sessions. If a command is submitted under one session but is not completed before the
* session is lost and a new session is established, the client will resubmit pending commands from the prior session under
@@ -107,9 +110,9 @@
* by the cluster may be lost.
*
Session events
* Clients can receive arbitrary event notifications from the cluster by registering an event listener via
- * {@link #onEvent(String, Consumer)}. When a command is applied to a state machine, the state machine may publish any number
+ * {@link CopycatSession#onEvent(String, Consumer)}. When a command is applied to a state machine, the state machine may publish any number
* of events to any open session. Events will be sent to the client by the server to which the client is connected as dictated
- * by the configured {@link ServerSelectionStrategy}. In the event a client is disconnected from a server, events will be
+ * by the configured {@link CommunicationStrategy}. In the event a client is disconnected from a server, events will be
* retained in memory on all servers until the client reconnects to another server or its session expires. Once a client
* reconnects to a new server, the new server will resume sending session events to the client.
*
@@ -131,7 +134,7 @@
* be received by the client in sequential order. This means the {@link CompletableFuture}s returned when submitting
* operations through the client are guaranteed to be completed in the order in which they were created.
*
- * Sequential consistency is also guaranteed for {@link #onEvent(String, Consumer) events} received by a client, and events
+ * Sequential consistency is also guaranteed for {@link CopycatSession#onEvent(String, Consumer) events} received by a client, and events
* are sequenced with command and query responses. If a client submits a command that publishes an event and then immediately
* submits a concurrent query, the client will first receive the command response, then the event message, then the query
* response.
@@ -140,7 +143,7 @@
* thread, it is critical that clients not block the event thread. If clients need to perform blocking actions on response to
* an event or response, do so on another thread.
*
Serialization
- * All {@link Command commands}, {@link Query queries}, and session {@link #onEvent(String, Consumer) events} must be
+ * All {@link Command commands}, {@link Query queries}, and session {@link CopycatSession#onEvent(String, Consumer) events} must be
* serializable by the {@link Serializer} associated with the client. Serializable types can be registered at any time.
* To register a serializable type and serializer, use the {@link Serializer#register(Class) register} methods.
*
@@ -194,43 +197,16 @@ enum State {
/**
* Indicates that the client is connected and its session is open.
- *
- * The {@code CONNECTED} state indicates that the client is healthy and operating normally. {@link Command commands}
- * and {@link Query queries} submitted and completed while the client is in this state are guaranteed to adhere to
- * consistency guarantees.
- * {@link Query.ConsistencyLevel levels}.
*/
CONNECTED,
/**
* Indicates that the client is suspended and its session may or may not be expired.
- *
- * The {@code SUSPENDED} state is indicative of an inability to communicate with the cluster within the context of
- * the client's {@link Session}. Operations submitted to or completed by clients in this state should be considered
- * unsafe. An operation submitted to a {@link #CONNECTED} client that transitions to the {@code SUSPENDED} state
- * prior to the operation's completion may be committed multiple times in the event that the underlying session
- * is ultimately {@link Session.State#EXPIRED expired}, thus breaking linearizability. Additionally, state machines
- * may see the session expire while the client is in this state.
- *
- * If the client is configured with a {@link RecoveryStrategy} that recovers the client's session upon expiration,
- * the client will transition back to the {@link #CONNECTED} state once a new session is registered, otherwise the
- * client will transition either to the {@link #CONNECTED} or {@link #CLOSED} state based on whether its session
- * is expired as determined once it re-establishes communication with the cluster.
- *
- * If the client is configured with a {@link RecoveryStrategy} that does not recover the client's session
- * upon a session expiration, all guarantees will be maintained by the client even for operations submitted in this
- * state. If linearizability guarantees are essential, users should use the {@link RecoveryStrategies#CLOSE} strategy
- * and allow the client to fail when its session is lost.
*/
SUSPENDED,
/**
* Indicates that the client is closed.
- *
- * A client may transition to this state as a result of an expired session or an explicit {@link CopycatClient#close() close}
- * by the user. In the event that the client's {@link Session} is lost, if the configured {@link RecoveryStrategy}
- * forces the client to close upon failure, the client will immediately be closed. If the {@link RecoveryStrategy}
- * attempts to recover the client's session, the client still may close if it is unable to register a new session.
*/
CLOSED
@@ -297,6 +273,13 @@ static Builder builder(Collection
cluster) {
*/
Listener onStateChange(Consumer callback);
+ /**
+ * Returns the Copycat metadata.
+ *
+ * @return The Copycat metadata.
+ */
+ CopycatMetadata metadata();
+
/**
* Returns the client execution context.
*
@@ -311,17 +294,6 @@ static Builder builder(Collection
cluster) {
*/
ThreadContext context();
- /**
- * Returns the client transport.
- *
- * The transport is the mechanism through which the client communicates with the cluster. The transport cannot
- * be used to access client internals, but it serves only as a mechanism for providing users with the same
- * transport/protocol used by the client.
- *
- * @return The client transport.
- */
- Transport transport();
-
/**
* Returns the client serializer.
*
@@ -339,115 +311,11 @@ static Builder builder(Collection
cluster) {
Serializer serializer();
/**
- * Returns the client session.
- *
- * The returned {@link Session} instance will remain constant as long as the client maintains its session with the cluster.
- * Maintaining the client's session requires that the client be able to communicate with one server that can communicate
- * with the leader at any given time. During periods where the cluster is electing a new leader, the client's session will
- * not timeout but will resume once a new leader is elected.
- *
- * @return The client session or {@code null} if no session is register.
- */
- Session session();
-
- /**
- * Submits an operation to the Copycat cluster.
- *
- * This method is provided for convenience. The submitted {@link Operation} must be an instance
- * of {@link Command} or {@link Query}.
- *
- * @param operation The operation to submit.
- * @param The operation result type.
- * @return A completable future to be completed with the operation result.
- * @throws IllegalArgumentException If the {@link Operation} is not an instance of {@link Command} or {@link Query}.
- * @throws NullPointerException if {@code operation} is null
- */
- default CompletableFuture submit(Operation operation) {
- Assert.notNull(operation, "operation");
- if (operation instanceof Command) {
- return submit((Command) operation);
- } else if (operation instanceof Query) {
- return submit((Query) operation);
- } else {
- throw new IllegalArgumentException("unknown operation type");
- }
- }
-
- /**
- * Submits a command to the Copycat cluster.
- *
- * Commands are used to alter state machine state. All commands will be forwarded to the current cluster leader.
- * Once a leader receives the command, it will write the command to its internal {@code Log} and replicate it to a majority
- * of the cluster. Once the command has been replicated to a majority of the cluster, it will apply the command to its
- * {@code StateMachine} and respond with the result.
- *
- * Once the command has been applied to a server state machine, the returned {@link CompletableFuture}
- * will be completed with the state machine output.
- *
- * Note that all client submissions are guaranteed to be completed in the same order in which they were sent (program order)
- * and on the same thread. This does not, however, mean that they'll be applied to the server-side replicated state machine
- * in that order.
- *
- * @param command The command to submit.
- * @param The command result type.
- * @return A completable future to be completed with the command result. The future is guaranteed to be completed after all
- * {@link Command} or {@link Query} submission futures that preceded it. The future will always be completed on the
- * @throws NullPointerException if {@code command} is null
- */
- CompletableFuture submit(Command command);
-
- /**
- * Submits a query to the Copycat cluster.
- *
- * Queries are used to read state machine state. The behavior of query submissions is primarily dependent on the
- * query's {@link Query.ConsistencyLevel}. For {@link Query.ConsistencyLevel#LINEARIZABLE}
- * and {@link Query.ConsistencyLevel#LINEARIZABLE_LEASE} consistency levels, queries will be forwarded
- * to the cluster leader. For lower consistency levels, queries are allowed to read from followers. All queries are executed
- * by applying queries to an internal server state machine.
- *
- * Once the query has been applied to a server state machine, the returned {@link CompletableFuture}
- * will be completed with the state machine output.
+ * Returns a new session builder.
*
- * @param query The query to submit.
- * @param The query result type.
- * @return A completable future to be completed with the query result. The future is guaranteed to be completed after all
- * {@link Command} or {@link Query} submission futures that preceded it. The future will always be completed on the
- * @throws NullPointerException if {@code query} is null
+ * @return A new session builder.
*/
- CompletableFuture submit(Query query);
-
- /**
- * Registers a void event listener.
- *
- * The registered {@link Runnable} will be {@link Runnable#run() called} when an event is received
- * from the Raft cluster for the client. {@link CopycatClient} implementations must guarantee that consumers are
- * always called in the same thread for the session. Therefore, no two events will be received concurrently
- * by the session. Additionally, events are guaranteed to be received in the order in which they were sent by
- * the state machine.
- *
- * @param event The event to which to listen.
- * @param callback The session receive callback.
- * @return The listener context.
- * @throws NullPointerException if {@code event} or {@code callback} is null
- */
- Listener onEvent(String event, Runnable callback);
-
- /**
- * Registers an event listener.
- *
- * The registered {@link Consumer} will be {@link Consumer#accept(Object) called} when an event is received
- * from the Raft cluster for the session. {@link CopycatClient} implementations must guarantee that consumers are
- * always called in the same thread for the session. Therefore, no two events will be received concurrently
- * by the session. Additionally, events are guaranteed to be received in the order in which they were sent by
- * the state machine.
- *
- * @param event The event to which to listen.
- * @param callback The session receive callback.
- * @param The session event type.
- * @return The listener context.
- * @throws NullPointerException if {@code event} or {@code callback} is null
- */
- Listener onEvent(String event, Consumer callback);
+ CopycatSession.Builder sessionBuilder();
/**
* Connects the client to Copycat cluster via the default server address.
@@ -460,13 +328,13 @@ default CompletableFuture submit(Operation operation) {
* returned {@link CompletableFuture} will be completed.
*
* The client will connect to servers in the cluster according to the pattern specified by the configured
- * {@link ServerSelectionStrategy}.
+ * {@link CommunicationStrategy}.
*
* In the event that the client is unable to register a session through any of the servers listed in the provided
* {@link Address} list, the client will use the configured {@link ConnectionStrategy} to determine whether and when
* to retry the registration attempt.
*
- * @return A completable future to be completed once the client's {@link #session()} is registered.
+ * @return A completable future to be completed once the client is registered.
*/
default CompletableFuture connect() {
return connect((Collection) null);
@@ -480,14 +348,14 @@ default CompletableFuture connect() {
* returned {@link CompletableFuture} will be completed.
*
* The client will connect to servers in the cluster according to the pattern specified by the configured
- * {@link ServerSelectionStrategy}.
+ * {@link CommunicationStrategy}.
*
* In the event that the client is unable to register a session through any of the servers listed in the provided
* {@link Address} list, the client will use the configured {@link ConnectionStrategy} to determine whether and when
* to retry the registration attempt.
*
* @param members A set of server addresses to which to connect.
- * @return A completable future to be completed once the client's {@link #session()} is registered.
+ * @return A completable future to be completed once the client is registered.
*/
default CompletableFuture connect(Address... members) {
if (members == null || members.length == 0) {
@@ -505,29 +373,17 @@ default CompletableFuture connect(Address... members) {
* returned {@link CompletableFuture} will be completed.
*
* The client will connect to servers in the cluster according to the pattern specified by the configured
- * {@link ServerSelectionStrategy}.
+ * {@link CommunicationStrategy}.
*
* In the event that the client is unable to register a session through any of the servers listed in the provided
* {@link Address} list, the client will use the configured {@link ConnectionStrategy} to determine whether and when
* to retry the registration attempt.
*
* @param members A set of server addresses to which to connect.
- * @return A completable future to be completed once the client's {@link #session()} is registered.
+ * @return A completable future to be completed once the client is registered.
*/
CompletableFuture connect(Collection members);
- /**
- * Recovers the client session.
- *
- * When a client is recovered, the client will create and register a new {@link Session}. Once the session is
- * recovered, the client will transition to the {@link State#CONNECTED} state and resubmit pending operations
- * from the previous session. Pending operations are guaranteed to be submitted to the new session in the same
- * order in which they were submitted to the prior session and prior to submitting any new operations.
- *
- * @return A completable future to be completed once the client's session is recovered.
- */
- CompletableFuture recover();
-
/**
* Closes the client.
*
@@ -559,10 +415,10 @@ final class Builder implements io.atomix.catalyst.util.Builder {
private Transport transport;
private Serializer serializer;
private Duration sessionTimeout = Duration.ZERO;
- private Duration unstabilityTimeout = Duration.ZERO;
+ private Duration unsableTimeout = Duration.ZERO;
+ private int threadPoolSize = Runtime.getRuntime().availableProcessors();
private ConnectionStrategy connectionStrategy = ConnectionStrategies.ONCE;
- private ServerSelectionStrategy serverSelectionStrategy = ServerSelectionStrategies.ANY;
- private RecoveryStrategy recoveryStrategy = RecoveryStrategies.CLOSE;
+ private CommunicationStrategy communicationStrategy = CommunicationStrategies.ANY;
private Builder(Collection cluster) {
this.cluster = Assert.notNull(cluster, "cluster");
@@ -610,6 +466,18 @@ public Builder withSerializer(Serializer serializer) {
return this;
}
+ /**
+ * Sets the client thread pool size.
+ *
+ * @param threadPoolSize The client thread pool size.
+ * @return The client builder.
+ * @throws IllegalArgumentException if the thread pool size is not positive
+ */
+ public Builder withThreadPoolSize(int threadPoolSize) {
+ this.threadPoolSize = Assert.argNot(threadPoolSize, threadPoolSize <= 0, "threadPoolSize must be positive");
+ return this;
+ }
+
/**
* Sets the client session timeout.
*
@@ -632,9 +500,8 @@ public Builder withSessionTimeout(Duration sessionTimeout) {
* @throws NullPointerException if the unstability timeout is null
* @throws IllegalArgumentException if the unstability timeout is not positive
*/
- public Builder withUnstabilityTimeout(Duration unstabilityTimeout)
- {
- this.unstabilityTimeout = Assert.arg(
+ public Builder withUnstableTimeout(Duration unstabilityTimeout) {
+ this.unsableTimeout = Assert.arg(
Assert.notNull(unstabilityTimeout, "unstabilityTimeout"),
unstabilityTimeout.toMillis() > 0,
"unstability timeout must be positive"
@@ -657,22 +524,11 @@ public Builder withConnectionStrategy(ConnectionStrategy connectionStrategy) {
/**
* Sets the server selection strategy.
*
- * @param serverSelectionStrategy The server selection strategy.
+ * @param communicationStrategy The server selection strategy.
* @return The client builder.
*/
- public Builder withServerSelectionStrategy(ServerSelectionStrategy serverSelectionStrategy) {
- this.serverSelectionStrategy = Assert.notNull(serverSelectionStrategy, "serverSelectionStrategy");
- return this;
- }
-
- /**
- * Sets the client recovery strategy.
- *
- * @param recoveryStrategy The client recovery strategy.
- * @return The client builder.
- */
- public Builder withRecoveryStrategy(RecoveryStrategy recoveryStrategy) {
- this.recoveryStrategy = Assert.notNull(recoveryStrategy, "recoveryStrategy");
+ public Builder withServerSelectionStrategy(CommunicationStrategy communicationStrategy) {
+ this.communicationStrategy = Assert.notNull(communicationStrategy, "serverSelectionStrategy");
return this;
}
@@ -696,6 +552,8 @@ public CopycatClient build() {
serializer = new Serializer();
}
+ ScheduledExecutorService executor = Executors.newScheduledThreadPool(threadPoolSize, new CatalystThreadFactory("copycat-client-%d"));
+
// Add service loader types to the primary serializer.
serializer.resolve(new ClientRequestTypeResolver());
serializer.resolve(new ClientResponseTypeResolver());
@@ -704,14 +562,12 @@ public CopycatClient build() {
return new DefaultCopycatClient(
clientId,
cluster,
- transport,
- new SingleThreadContext("copycat-client-io-%d", serializer.clone()),
- new SingleThreadContext("copycat-client-event-%d", serializer.clone()),
- serverSelectionStrategy,
+ transport.client(),
+ executor,
+ serializer,
connectionStrategy,
- recoveryStrategy,
sessionTimeout,
- unstabilityTimeout
+ unsableTimeout
);
}
}
diff --git a/client/src/main/java/io/atomix/copycat/client/CopycatMetadata.java b/client/src/main/java/io/atomix/copycat/client/CopycatMetadata.java
new file mode 100644
index 00000000..70f96993
--- /dev/null
+++ b/client/src/main/java/io/atomix/copycat/client/CopycatMetadata.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.copycat.client;
+
+import io.atomix.catalyst.transport.Address;
+import io.atomix.copycat.metadata.CopycatClientMetadata;
+import io.atomix.copycat.metadata.CopycatSessionMetadata;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Copycat metadata.
+ */
+public interface CopycatMetadata {
+
+ /**
+ * Returns the current cluster leader.
+ *
+ * @return The current cluster leader.
+ */
+ Address leader();
+
+ /**
+ * Returns the set of known servers in the cluster.
+ *
+ * @return The set of known servers in the cluster.
+ */
+ Collection servers();
+
+ /**
+ * Returns a list of clients connected to the cluster.
+ *
+ * @return A completable future to be completed with a list of clients connected to the cluster.
+ */
+ CompletableFuture> getClients();
+
+ /**
+ * Returns a list of open sessions.
+ *
+ * @return A completable future to be completed with a list of open sessions.
+ */
+ CompletableFuture> getSessions();
+
+ /**
+ * Returns a list of open sessions of the given type.
+ *
+ * @return A completable future to be completed with a list of open sessions of the given type.
+ */
+ CompletableFuture> getSessions(String type);
+
+}
diff --git a/client/src/main/java/io/atomix/copycat/client/DefaultCopycatClient.java b/client/src/main/java/io/atomix/copycat/client/DefaultCopycatClient.java
deleted file mode 100644
index b938aa69..00000000
--- a/client/src/main/java/io/atomix/copycat/client/DefaultCopycatClient.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/*
- * Copyright 2015 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-package io.atomix.copycat.client;
-
-import io.atomix.catalyst.concurrent.BlockingFuture;
-import io.atomix.catalyst.concurrent.Futures;
-import io.atomix.catalyst.concurrent.Listener;
-import io.atomix.catalyst.concurrent.ThreadContext;
-import io.atomix.catalyst.serializer.Serializer;
-import io.atomix.catalyst.transport.Address;
-import io.atomix.catalyst.transport.Transport;
-import io.atomix.catalyst.util.Assert;
-import io.atomix.copycat.Command;
-import io.atomix.copycat.Query;
-import io.atomix.copycat.client.session.ClientSession;
-import io.atomix.copycat.client.util.AddressSelector;
-import io.atomix.copycat.session.ClosedSessionException;
-import io.atomix.copycat.session.Session;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.function.Consumer;
-
-/**
- * Default Copycat client implementation.
- *
- * @author cluster;
- private final Transport transport;
- private final ThreadContext ioContext;
- private final ThreadContext eventContext;
- private final AddressSelector selector;
- private final Duration sessionTimeout;
- private final Duration unstabilityTimeout;
- private final ConnectionStrategy connectionStrategy;
- private final RecoveryStrategy recoveryStrategy;
- private ClientSession session;
- private volatile State state = State.CLOSED;
- private volatile CompletableFuture openFuture;
- private volatile CompletableFuture recoverFuture;
- private volatile CompletableFuture closeFuture;
- private final Set changeListeners = new CopyOnWriteArraySet<>();
- private final Set> eventListeners = new CopyOnWriteArraySet<>();
- private Listener changeListener;
-
- DefaultCopycatClient(String clientId, Collection cluster, Transport transport, ThreadContext ioContext, ThreadContext eventContext, ServerSelectionStrategy selectionStrategy, ConnectionStrategy connectionStrategy, RecoveryStrategy recoveryStrategy, Duration sessionTimeout, Duration unstabilityTimeout) {
- this.clientId = Assert.notNull(clientId, "clientId");
- this.cluster = Assert.notNull(cluster, "cluster");
- this.transport = Assert.notNull(transport, "transport");
- this.ioContext = Assert.notNull(ioContext, "ioContext");
- this.eventContext = Assert.notNull(eventContext, "eventContext");
- this.selector = new AddressSelector(selectionStrategy);
- this.connectionStrategy = Assert.notNull(connectionStrategy, "connectionStrategy");
- this.recoveryStrategy = Assert.notNull(recoveryStrategy, "recoveryStrategy");
- this.sessionTimeout = Assert.notNull(sessionTimeout, "sessionTimeout");
- this.unstabilityTimeout = Assert.notNull(unstabilityTimeout, "unstabilityTimeout");;
- }
-
- @Override
- public State state() {
- return state;
- }
-
- /**
- * Updates the client state.
- */
- private void setState(State state) {
- if (this.state != state) {
- this.state = state;
- LOGGER.debug("State changed: {}", state);
- changeListeners.forEach(l -> l.accept(state));
- }
- }
-
- @Override
- public Listener onStateChange(Consumer callback) {
- return new StateChangeListener(callback);
- }
-
- @Override
- public Transport transport() {
- return transport;
- }
-
- @Override
- public Serializer serializer() {
- ThreadContext context = ThreadContext.currentContext();
- return context != null ? context.serializer() : this.eventContext.serializer();
- }
-
- @Override
- public Session session() {
- return session;
- }
-
- @Override
- public ThreadContext context() {
- return eventContext;
- }
-
- /**
- * Creates a new child session.
- */
- private ClientSession newSession() {
- ClientSession session = new ClientSession(clientId, transport.client(), selector, ioContext, connectionStrategy, sessionTimeout,
- unstabilityTimeout
- );
-
- // Update the session change listener.
- if (changeListener != null)
- changeListener.close();
- changeListener = session.onStateChange(this::onStateChange);
-
- // Register all event listeners.
- eventListeners.forEach(l -> l.register(session));
- return session;
- }
-
- /**
- * Handles a session state change.
- */
- private void onStateChange(Session.State state) {
- switch (state) {
- // When the session is opened, transition the state to CONNECTED.
- case OPEN:
- setState(State.CONNECTED);
- break;
- // When the session becomes unstable, transition the state to SUSPENDED.
- case UNSTABLE:
- setState(State.SUSPENDED);
- break;
- case STALE:
- setState(State.SUSPENDED);
- this.close();
- break;
- // When the session is expired, transition the state to SUSPENDED if necessary. The recovery strategy
- // must determine whether to attempt to recover the client.
- case EXPIRED:
- setState(State.SUSPENDED);
- recoveryStrategy.recover(this);
- break;
- case CLOSED:
- setState(State.CLOSED);
- break;
- default:
- break;
- }
- }
-
- @Override
- public synchronized CompletableFuture connect(Collection cluster) {
- if (state != State.CLOSED)
- return CompletableFuture.completedFuture(this);
-
- if (openFuture == null) {
- openFuture = new CompletableFuture<>();
-
- // If the provided cluster list is null or empty, use the default list.
- if (cluster == null || cluster.isEmpty()) {
- cluster = this.cluster;
- }
-
- // If the default list is null or empty, use the default host:port.
- if (cluster == null || cluster.isEmpty()) {
- cluster = Collections.singletonList(new Address(DEFAULT_HOST, DEFAULT_PORT));
- }
-
- // Reset the connection list to allow the selection strategy to prioritize connections.
- selector.reset(null, cluster);
-
- // Create and register a new session.
- session = newSession();
- session.register().whenCompleteAsync((result, error) -> {
- if (error == null) {
- openFuture.complete(this);
- } else {
- openFuture.completeExceptionally(error);
- }
- }, eventContext.executor());
- }
- return openFuture;
- }
-
- @Override
- public CompletableFuture submit(Command command) {
- ClientSession session = this.session;
- if (session == null)
- return Futures.exceptionalFuture(new ClosedSessionException("session closed"));
-
- BlockingFuture future = new BlockingFuture<>();
- session.submit(command).whenComplete((result, error) -> {
- if (eventContext.isBlocked()) {
- future.accept(result, error);
- } else {
- eventContext.executor().execute(() -> future.accept(result, error));
- }
- });
- return future;
- }
-
- @Override
- public CompletableFuture submit(Query query) {
- ClientSession session = this.session;
- if (session == null)
- return Futures.exceptionalFuture(new ClosedSessionException("session closed"));
-
- BlockingFuture future = new BlockingFuture<>();
- session.submit(query).whenComplete((result, error) -> {
- if (eventContext.isBlocked()) {
- future.accept(result, error);
- } else {
- eventContext.executor().execute(() -> future.accept(result, error));
- }
- });
- return future;
- }
-
- @Override
- public Listener onEvent(String event, Runnable callback) {
- return onEvent(event, v -> callback.run());
- }
-
- @Override
- public Listener onEvent(String event, Consumer callback) {
- EventListener listener = new EventListener<>(event, callback);
- listener.register(session);
- return listener;
- }
-
- @Override
- public synchronized CompletableFuture recover() {
- if (recoverFuture == null) {
- LOGGER.debug("Recovering session {}", this.session.id());
- recoverFuture = new CompletableFuture<>();
- session.close().whenCompleteAsync((closeResult, closeError) -> {
- session = newSession();
- session.register().whenCompleteAsync((registerResult, registerError) -> {
- CompletableFuture recoverFuture = this.recoverFuture;
- if (registerError == null) {
- recoverFuture.complete(this);
- } else {
- recoverFuture.completeExceptionally(registerError);
- }
- this.recoverFuture = null;
- }, eventContext.executor());
- }, eventContext.executor());
- }
- return recoverFuture;
- }
-
- @Override
- public synchronized CompletableFuture close() {
- if (state == State.CLOSED)
- return CompletableFuture.completedFuture(null);
-
- if (closeFuture == null) {
- // Close the child session and call close listeners once complete.
- closeFuture = new CompletableFuture<>();
- session.close().whenCompleteAsync((result, error) -> {
- setState(State.CLOSED);
- CompletableFuture.runAsync(() -> {
- ioContext.close();
- eventContext.close();
- transport.close();
- if (error == null) {
- closeFuture.complete(null);
- } else {
- closeFuture.completeExceptionally(error);
- }
- });
- }, eventContext.executor());
- }
- return closeFuture;
- }
-
- /**
- * Kills the client.
- *
- * @return A completable future to be completed once the client's session has been killed.
- */
- public synchronized CompletableFuture kill() {
- if (state == State.CLOSED)
- return CompletableFuture.completedFuture(null);
-
- if (closeFuture == null) {
- closeFuture = session.kill()
- .whenComplete((result, error) -> {
- setState(State.CLOSED);
- CompletableFuture.runAsync(() -> {
- ioContext.close();
- eventContext.close();
- transport.close();
- });
- });
- }
- return closeFuture;
- }
-
- @Override
- public int hashCode() {
- return 23 + 37 * (session != null ? session.hashCode() : 0);
- }
-
- @Override
- public boolean equals(Object object) {
- return object instanceof DefaultCopycatClient && ((DefaultCopycatClient) object).session() == session;
- }
-
- @Override
- public String toString() {
- return String.format("%s[session=%s]", getClass().getSimpleName(), session);
- }
-
- /**
- * State change listener.
- */
- private final class StateChangeListener implements Listener {
- private final Consumer callback;
-
- protected StateChangeListener(Consumer callback) {
- this.callback = callback;
- changeListeners.add(this);
- }
-
- @Override
- public void accept(State state) {
- eventContext.executor().execute(() -> callback.accept(state));
- }
-
- @Override
- public void close() {
- changeListeners.remove(this);
- }
- }
-
- /**
- * Event listener wrapper.
- */
- private final class EventListener implements Listener {
- private final String event;
- private final Consumer callback;
- private Listener parent;
-
- private EventListener(String event, Consumer callback) {
- this.event = event;
- this.callback = callback;
- eventListeners.add(this);
- }
-
- /**
- * Registers the session event listener.
- */
- public void register(ClientSession session) {
- parent = session.onEvent(event, this);
- }
-
- @Override
- public void accept(T message) {
- if (eventContext.isBlocked()) {
- callback.accept(message);
- } else {
- eventContext.executor().execute(() -> callback.accept(message));
- }
- }
-
- @Override
- public void close() {
- parent.close();
- eventListeners.remove(this);
- }
- }
-}
diff --git a/client/src/main/java/io/atomix/copycat/client/RecoveryStrategies.java b/client/src/main/java/io/atomix/copycat/client/RecoveryStrategies.java
deleted file mode 100644
index f30eb9d5..00000000
--- a/client/src/main/java/io/atomix/copycat/client/RecoveryStrategies.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2015 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-package io.atomix.copycat.client;
-
-/**
- * Strategies for recovering lost client sessions.
- *
- * Client recovery strategies are responsible for recovering a crashed client. When clients fail to contact
- * a server for more than their session timeout, the client's session must be closed as linearizability is
- * lost. The recovery strategy has the opportunity to recover the crashed client gracefully.
- *
- * @author
- * Client recovery strategies are responsible for recovering a crashed client. When a client is unable
- * to communicate with the cluster for some time period, the cluster may expire the client's session.
- * In the event that a client reconnects and discovers its session is expired, the client's configured
- * recovery strategy will be queried to determine how to handle the failure. Typically, recovery strategies
- * can either {@link CopycatClient#recover() recover} or {@link CopycatClient#close() close} the client.
- *
- * @author l.accept(state));
+ }
+ return this;
+ }
+
+ /**
+ * Registers a state change listener on the session manager.
+ *
+ * @param callback The state change listener callback.
+ * @return The state change listener.
+ */
+ public Listener onStateChange(Consumer callback) {
+ Listener listener = new Listener() {
+ @Override
+ public void accept(CopycatClient.State state) {
+ callback.accept(state);
+ }
+ @Override
+ public void close() {
+ changeListeners.remove(this);
+ }
+ };
+ changeListeners.add(listener);
+ return listener;
+ }
+
+}
diff --git a/client/src/main/java/io/atomix/copycat/client/impl/DefaultCopycatClient.java b/client/src/main/java/io/atomix/copycat/client/impl/DefaultCopycatClient.java
new file mode 100644
index 00000000..4765020b
--- /dev/null
+++ b/client/src/main/java/io/atomix/copycat/client/impl/DefaultCopycatClient.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.copycat.client.impl;
+
+import io.atomix.catalyst.concurrent.Listener;
+import io.atomix.catalyst.concurrent.ThreadContext;
+import io.atomix.catalyst.concurrent.ThreadPoolContext;
+import io.atomix.catalyst.serializer.Serializer;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.Client;
+import io.atomix.catalyst.util.Assert;
+import io.atomix.copycat.client.ConnectionStrategy;
+import io.atomix.copycat.client.CopycatClient;
+import io.atomix.copycat.client.CopycatMetadata;
+import io.atomix.copycat.client.session.CopycatSession;
+import io.atomix.copycat.client.session.impl.CopycatSessionManager;
+import io.atomix.copycat.client.util.AddressSelectorManager;
+import io.atomix.copycat.client.util.ClientConnectionManager;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Consumer;
+
+/**
+ * Default Copycat client implementation.
+ *
+ * @author cluster;
+ private final CopycatClientState state;
+ private final ThreadContext threadContext;
+ private final ClientConnectionManager connectionManager;
+ private final CopycatMetadata metadata;
+ private final AddressSelectorManager selectorManager = new AddressSelectorManager();
+ private final CopycatSessionManager sessionManager;
+ private volatile CompletableFuture openFuture;
+ private volatile CompletableFuture closeFuture;
+
+ public DefaultCopycatClient(String clientId, Collection cluster, Client client, ScheduledExecutorService threadPoolExecutor, Serializer serializer, ConnectionStrategy connectionStrategy, Duration sessionTimeout, Duration unstableTimeout) {
+ this.cluster = Assert.notNull(cluster, "cluster");
+ this.threadContext = new ThreadPoolContext(threadPoolExecutor, serializer.clone());
+ this.state = new CopycatClientState(clientId);
+ this.connectionManager = new ClientConnectionManager(client);
+ this.metadata = new DefaultCopycatMetadata(connectionManager, selectorManager);
+ this.sessionManager = new CopycatSessionManager(state, connectionManager, selectorManager, threadContext, threadPoolExecutor, connectionStrategy, sessionTimeout, unstableTimeout);
+ }
+
+ @Override
+ public State state() {
+ return state.getState();
+ }
+
+ @Override
+ public Listener onStateChange(Consumer callback) {
+ return state.onStateChange(callback);
+ }
+
+ @Override
+ public CopycatMetadata metadata() {
+ return metadata;
+ }
+
+ @Override
+ public Serializer serializer() {
+ return threadContext.serializer();
+ }
+
+ @Override
+ public ThreadContext context() {
+ return threadContext;
+ }
+
+ @Override
+ public synchronized CompletableFuture connect(Collection cluster) {
+ if (state.getState() != State.CLOSED)
+ return CompletableFuture.completedFuture(this);
+
+ if (openFuture == null) {
+ openFuture = new CompletableFuture<>();
+
+ // If the provided cluster list is null or empty, use the default list.
+ if (cluster == null || cluster.isEmpty()) {
+ cluster = this.cluster;
+ }
+
+ // If the default list is null or empty, use the default host:port.
+ if (cluster == null || cluster.isEmpty()) {
+ cluster = Collections.singletonList(new Address(DEFAULT_HOST, DEFAULT_PORT));
+ }
+
+ // Reset the connection list to allow the selection strategy to prioritize connections.
+ sessionManager.resetConnections(null, cluster);
+
+ // Register the session manager.
+ sessionManager.open().whenCompleteAsync((result, error) -> {
+ if (error == null) {
+ openFuture.complete(this);
+ } else {
+ openFuture.completeExceptionally(error);
+ }
+ }, threadContext);
+ }
+ return openFuture;
+ }
+
+ @Override
+ public CopycatSession.Builder sessionBuilder() {
+ return new SessionBuilder();
+ }
+
+ @Override
+ public synchronized CompletableFuture close() {
+ if (state.getState() == State.CLOSED)
+ return CompletableFuture.completedFuture(null);
+
+ if (closeFuture == null) {
+ closeFuture = sessionManager.close().whenComplete((r, e) -> connectionManager.close());
+ }
+ return closeFuture;
+ }
+
+ /**
+ * Kills the client.
+ *
+ * @return A completable future to be completed once the client's session has been killed.
+ */
+ public synchronized CompletableFuture kill() {
+ if (state.getState() == State.CLOSED)
+ return CompletableFuture.completedFuture(null);
+
+ if (closeFuture == null) {
+ closeFuture = sessionManager.kill();
+ }
+ return closeFuture;
+ }
+
+ @Override
+ public int hashCode() {
+ return 23 + 37 * state.getUuid().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ return object instanceof DefaultCopycatClient && ((DefaultCopycatClient) object).state.getUuid().equals(state.getUuid());
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s[id=%d, uuid=%s]", getClass().getSimpleName(), state.getId(), state.getUuid());
+ }
+
+ /**
+ * Default Copycat session builder.
+ */
+ private class SessionBuilder extends CopycatSession.Builder {
+ @Override
+ public CopycatSession build() {
+ return sessionManager.openSession(name, type, communicationStrategy).join();
+ }
+ }
+
+}
diff --git a/client/src/main/java/io/atomix/copycat/client/impl/DefaultCopycatMetadata.java b/client/src/main/java/io/atomix/copycat/client/impl/DefaultCopycatMetadata.java
new file mode 100644
index 00000000..90495c1c
--- /dev/null
+++ b/client/src/main/java/io/atomix/copycat/client/impl/DefaultCopycatMetadata.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.copycat.client.impl;
+
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.util.Assert;
+import io.atomix.copycat.client.CommunicationStrategies;
+import io.atomix.copycat.metadata.CopycatClientMetadata;
+import io.atomix.copycat.client.CopycatMetadata;
+import io.atomix.copycat.metadata.CopycatSessionMetadata;
+import io.atomix.copycat.client.session.impl.CopycatClientConnection;
+import io.atomix.copycat.client.util.AddressSelectorManager;
+import io.atomix.copycat.client.util.ClientConnectionManager;
+import io.atomix.copycat.protocol.MetadataRequest;
+import io.atomix.copycat.protocol.MetadataResponse;
+import io.atomix.copycat.protocol.Response;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Default Copycat metadata.
+ */
+public class DefaultCopycatMetadata implements CopycatMetadata {
+ private final AddressSelectorManager selectorManager;
+ private final CopycatClientConnection connection;
+
+ public DefaultCopycatMetadata(ClientConnectionManager connectionManager, AddressSelectorManager selectorManager) {
+ this.selectorManager = Assert.notNull(selectorManager, "selectorManager");
+ this.connection = new CopycatClientConnection(connectionManager, selectorManager.createSelector(CommunicationStrategies.LEADER));
+ }
+
+ @Override
+ public Address leader() {
+ return selectorManager.leader();
+ }
+
+ @Override
+ public Collection servers() {
+ return selectorManager.servers();
+ }
+
+ /**
+ * Requests metadata from the cluster.
+ *
+ * @return A completable future to be completed with cluster metadata.
+ */
+ private CompletableFuture getMetadata() {
+ CompletableFuture future = new CompletableFuture<>();
+ connection.sendAndReceive(MetadataRequest.NAME, MetadataRequest.builder().build()).whenComplete((response, error) -> {
+ if (error == null) {
+ if (response.status() == Response.Status.OK) {
+ future.complete(response);
+ } else {
+ future.completeExceptionally(response.error().createException());
+ }
+ } else {
+ future.completeExceptionally(error);
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture> getClients() {
+ return getMetadata().thenApply(MetadataResponse::clients);
+ }
+
+ @Override
+ public CompletableFuture> getSessions() {
+ return getMetadata().thenApply(MetadataResponse::sessions);
+ }
+
+ @Override
+ public CompletableFuture> getSessions(String type) {
+ return getMetadata().thenApply(response -> response.sessions().stream().filter(s -> s.type().equals(type)).collect(Collectors.toSet()));
+ }
+
+}
diff --git a/client/src/main/java/io/atomix/copycat/client/session/ClientSessionManager.java b/client/src/main/java/io/atomix/copycat/client/session/ClientSessionManager.java
deleted file mode 100644
index 21aede07..00000000
--- a/client/src/main/java/io/atomix/copycat/client/session/ClientSessionManager.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
- * Copyright 2015 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License
- */
-package io.atomix.copycat.client.session;
-
-import io.atomix.catalyst.concurrent.Scheduled;
-import io.atomix.catalyst.concurrent.ThreadContext;
-import io.atomix.catalyst.util.Assert;
-import io.atomix.copycat.client.ConnectionStrategy;
-import io.atomix.copycat.client.util.ClientConnection;
-import io.atomix.copycat.error.CopycatError;
-import io.atomix.copycat.protocol.*;
-import io.atomix.copycat.session.ClosedSessionException;
-import io.atomix.copycat.session.Session;
-
-import java.net.ConnectException;
-import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Client session manager.
- *
- * @author open() {
- CompletableFuture future = new CompletableFuture<>();
- context.executor().execute(() -> register(new RegisterAttempt(1, future)));
- return future;
- }
-
- /**
- * Expires the manager.
- *
- * @return A completable future to be completed once the session has been expired.
- */
- public CompletableFuture expire() {
- CompletableFuture future = new CompletableFuture<>();
- context.executor().execute(() -> {
- if (keepAlive != null)
- keepAlive.cancel();
- state.setState(Session.State.EXPIRED);
- future.complete(null);
- });
- return future;
- }
-
- /**
- * Registers a session.
- */
- private void register(RegisterAttempt attempt) {
- state.getLogger().debug("Registering session: attempt {}", attempt.attempt);
-
- RegisterRequest request = RegisterRequest.builder()
- .withClient(state.getClientId())
- .withTimeout(sessionTimeout.toMillis())
- .build();
-
- state.getLogger().trace("Sending {}", request);
- connection.reset().sendAndReceive(request).whenComplete((response, error) -> {
- if (error == null) {
- state.getLogger().trace("Received {}", response);
- if (response.status() == Response.Status.OK) {
- interval = Duration.ofMillis(response.timeout()).dividedBy(2);
- connection.reset(response.leader(), response.members());
- state.setSessionId(response.session())
- .setState(Session.State.OPEN);
- state.getLogger().info("Registered session {}", response.session());
- attempt.complete();
- keepAlive();
- } else {
- strategy.attemptFailed(attempt);
- }
- } else {
- strategy.attemptFailed(attempt);
- }
- });
- }
-
- /**
- * Sends a keep-alive request to the cluster.
- */
- private void keepAlive() {
- keepAlive(true);
- }
-
- /**
- * Sends a keep-alive request to the cluster.
- */
- private void keepAlive(boolean retryOnFailure) {
- long sessionId = state.getSessionId();
-
- // If the current sessions state is unstable, reset the connection before sending a keep-alive.
- if (state.getState() == Session.State.UNSTABLE)
- connection.reset();
-
- KeepAliveRequest request = KeepAliveRequest.builder()
- .withSession(sessionId)
- .withCommandSequence(state.getCommandResponse())
- .withEventIndex(state.getEventIndex())
- .build();
-
- state.getLogger().trace("{} - Sending {}", sessionId, request);
- connection.sendAndReceive(request).whenComplete((response, error) -> {
- if (state.getState() != Session.State.CLOSED) {
- if (error == null) {
- state.getLogger().trace("{} - Received {}", sessionId, response);
- // If the request was successful, update the address selector and schedule the next keep-alive.
- if (response.status() == Response.Status.OK) {
- connection.reset(response.leader(), response.members());
- state.setState(Session.State.OPEN);
- scheduleKeepAlive();
- }
- // If the session is unknown, immediate expire the session.
- else if (response.error() == CopycatError.Type.UNKNOWN_SESSION_ERROR) {
- state.setState(Session.State.EXPIRED);
- }
- // If a leader is still set in the address selector, unset the leader and attempt to send another keep-alive.
- // This will ensure that the address selector selects all servers without filtering on the leader.
- else if (retryOnFailure && connection.leader() != null) {
- connection.reset(null, connection.servers());
- keepAlive(false);
- }
- // If no leader was set, set the session state to unstable and schedule another keep-alive.
- else {
- state.setState(Session.State.UNSTABLE);
- scheduleKeepAlive();
- }
- }
- // If a leader is still set in the address selector, unset the leader and attempt to send another keep-alive.
- // This will ensure that the address selector selects all servers without filtering on the leader.
- else if (retryOnFailure && connection.leader() != null) {
- connection.reset(null, connection.servers());
- keepAlive(false);
- }
- // If no leader was set, set the session state to unstable and schedule another keep-alive.
- else {
- state.setState(Session.State.UNSTABLE);
- scheduleKeepAlive();
- }
- }
- });
- }
-
- /**
- * Schedules a keep-alive request.
- */
- private void scheduleKeepAlive() {
- if (keepAlive != null)
- keepAlive.cancel();
- keepAlive = context.schedule(interval, () -> {
- keepAlive = null;
- if (state.getState().active()) {
- keepAlive();
- }
- });
- }
-
- /**
- * Closes the session manager.
- *
- * @return A completable future to be completed once the session manager is closed.
- */
- public CompletableFuture close() {
- if (state.getState() == Session.State.EXPIRED)
- return CompletableFuture.completedFuture(null);
-
- CompletableFuture future = new CompletableFuture<>();
- context.executor().execute(() -> {
- if (keepAlive != null) {
- keepAlive.cancel();
- keepAlive = null;
- }
- unregister(future);
- });
- return future;
- }
-
- /**
- * Unregisters the session.
- */
- private void unregister(CompletableFuture future) {
- unregister(true, future);
- }
-
- /**
- * Unregisters the session.
- *
- * @param future A completable future to be completed once the session is unregistered.
- */
- private void unregister(boolean retryOnFailure, CompletableFuture future) {
- long sessionId = state.getSessionId();
-
- // If the session is already closed, skip the unregister attempt.
- if (state.getState() == Session.State.CLOSED) {
- future.complete(null);
- return;
- }
-
- state.getLogger().debug("Unregistering session: {}", sessionId);
-
- // If a keep-alive request is already pending, cancel it.
- if (keepAlive != null) {
- keepAlive.cancel();
- keepAlive = null;
- }
-
- // If the current sessions state is unstable, reset the connection before sending an unregister request.
- if (state.getState() == Session.State.UNSTABLE) {
- connection.reset();
- }
-
- UnregisterRequest request = UnregisterRequest.builder()
- .withSession(sessionId)
- .build();
-
- state.getLogger().trace("{} - Sending {}", sessionId, request);
- connection.sendAndReceive(request).whenComplete((response, error) -> {
- if (state.getState() != Session.State.CLOSED) {
- if (error == null) {
- state.getLogger().trace("{} - Received {}", sessionId, response);
- // If the request was successful, update the session state and complete the close future.
- if (response.status() == Response.Status.OK) {
- state.setState(Session.State.CLOSED);
- future.complete(null);
- }
- // If the session is unknown, immediate expire the session and complete the close future.
- else if (response.error() == CopycatError.Type.UNKNOWN_SESSION_ERROR) {
- state.setState(Session.State.EXPIRED);
- future.complete(null);
- }
- // If a leader is still set in the address selector, unset the leader and send another unregister attempt.
- // This will ensure that the address selector selects all servers without filtering on the leader.
- else if (retryOnFailure && connection.leader() != null) {
- connection.reset(null, connection.servers());
- unregister(false, future);
- }
- // If no leader was set, set the session state to unstable and fail the unregister attempt.
- else {
- state.setState(Session.State.UNSTABLE);
- future.completeExceptionally(new ClosedSessionException("failed to unregister session"));
- }
- }
- // If a leader is still set in the address selector, unset the leader and send another unregister attempt.
- // This will ensure that the address selector selects all servers without filtering on the leader.
- else if (retryOnFailure && connection.leader() != null) {
- connection.reset(null, connection.servers());
- unregister(false, future);
- }
- // If no leader was set, set the session state to unstable and schedule another unregister attempt.
- else {
- state.setState(Session.State.UNSTABLE);
- future.completeExceptionally(new ClosedSessionException("failed to unregister session"));
- }
- }
- });
- }
-
- /**
- * Kills the client session manager.
- *
- * @return A completable future to be completed once the session manager is killed.
- */
- public CompletableFuture kill() {
- return CompletableFuture.runAsync(() -> {
- if (keepAlive != null)
- keepAlive.cancel();
- state.setState(Session.State.CLOSED);
- }, context.executor());
- }
-
- @Override
- public String toString() {
- return String.format("%s[session=%d]", getClass().getSimpleName(), state.getSessionId());
- }
-
- /**
- * Client session connection attempt.
- */
- private final class RegisterAttempt implements ConnectionStrategy.Attempt {
- private final int attempt;
- private final CompletableFuture future;
-
- private RegisterAttempt(int attempt, CompletableFuture future) {
- this.attempt = attempt;
- this.future = future;
- }
-
- @Override
- public int attempt() {
- return attempt;
- }
-
- /**
- * Completes the attempt successfully.
- */
- public void complete() {
- complete(null);
- }
-
- /**
- * Completes the attempt successfully.
- *
- * @param result The attempt result.
- */
- public void complete(Void result) {
- future.complete(result);
- }
-
- @Override
- public void fail() {
- future.completeExceptionally(new ConnectException("failed to register session"));
- }
-
- @Override
- public void fail(Throwable error) {
- future.completeExceptionally(error);
- }
-
- @Override
- public void retry() {
- state.getLogger().debug("Retrying session register attempt");
- register(new RegisterAttempt(attempt + 1, future));
- }
-
- @Override
- public void retry(Duration after) {
- state.getLogger().debug("Retrying session register attempt");
- context.schedule(after, () -> register(new RegisterAttempt(attempt + 1, future)));
- }
- }
-
-}
diff --git a/client/src/main/java/io/atomix/copycat/client/session/CopycatSession.java b/client/src/main/java/io/atomix/copycat/client/session/CopycatSession.java
new file mode 100644
index 00000000..aedc2b7c
--- /dev/null
+++ b/client/src/main/java/io/atomix/copycat/client/session/CopycatSession.java
@@ -0,0 +1,245 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.copycat.client.session;
+
+import io.atomix.catalyst.concurrent.Listener;
+import io.atomix.catalyst.concurrent.ThreadContext;
+import io.atomix.catalyst.util.Assert;
+import io.atomix.copycat.Command;
+import io.atomix.copycat.Operation;
+import io.atomix.copycat.Query;
+import io.atomix.copycat.client.CommunicationStrategies;
+import io.atomix.copycat.client.CommunicationStrategy;
+import io.atomix.copycat.client.CopycatClient;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Copycat client proxy.
+ */
+public interface CopycatSession {
+
+ /**
+ * Indicates the session's state.
+ */
+ enum State {
+
+ /**
+ * Indicates that the session is open.
+ */
+ OPEN,
+
+ /**
+ * Indicates that the session is closed.
+ */
+ CLOSED
+
+ }
+
+ /**
+ * Returns the client proxy name.
+ *
+ * @return The client proxy name.
+ */
+ String name();
+
+ /**
+ * Returns the client proxy type.
+ *
+ * @return The client proxy type.
+ */
+ String type();
+
+ /**
+ * Returns the session state.
+ *
+ * @return The session state.
+ */
+ State state();
+
+ /**
+ * Registers a session state change listener.
+ *
+ * @param callback The callback to call when the session state changes.
+ * @return The session state change listener context.
+ */
+ Listener onStateChange(Consumer callback);
+
+ /**
+ * Returns the session thread context.
+ *
+ * @return The session thread context.
+ */
+ ThreadContext context();
+
+ /**
+ * Submits an operation to the Copycat cluster.
+ *
+ * This method is provided for convenience. The submitted {@link Operation} must be an instance
+ * of {@link Command} or {@link Query}.
+ *
+ * @param operation The operation to submit.
+ * @param The operation result type.
+ * @return A completable future to be completed with the operation result.
+ * @throws IllegalArgumentException If the {@link Operation} is not an instance of {@link Command} or {@link Query}.
+ * @throws NullPointerException if {@code operation} is null
+ */
+ default CompletableFuture submit(Operation operation) {
+ Assert.notNull(operation, "operation");
+ if (operation instanceof Command) {
+ return submit((Command) operation);
+ } else if (operation instanceof Query) {
+ return submit((Query) operation);
+ } else {
+ throw new IllegalArgumentException("unknown operation type");
+ }
+ }
+
+ /**
+ * Submits a command to the Copycat cluster.
+ *
+ * Commands are used to alter state machine state. All commands will be forwarded to the current cluster leader.
+ * Once a leader receives the command, it will write the command to its internal {@code Log} and replicate it to a majority
+ * of the cluster. Once the command has been replicated to a majority of the cluster, it will apply the command to its
+ * {@code StateMachine} and respond with the result.
+ *
+ * Once the command has been applied to a server state machine, the returned {@link CompletableFuture}
+ * will be completed with the state machine output.
+ *
+ * Note that all client submissions are guaranteed to be completed in the same order in which they were sent (program order)
+ * and on the same thread. This does not, however, mean that they'll be applied to the server-side replicated state machine
+ * in that order.
+ *
+ * @param command The command to submit.
+ * @param The command result type.
+ * @return A completable future to be completed with the command result. The future is guaranteed to be completed after all
+ * {@link Command} or {@link Query} submission futures that preceded it. The future will always be completed on the
+ * @throws NullPointerException if {@code command} is null
+ */
+ CompletableFuture submit(Command command);
+
+ /**
+ * Submits a query to the Copycat cluster.
+ *
+ * Queries are used to read state machine state. The behavior of query submissions is primarily dependent on the
+ * query's {@link Query.ConsistencyLevel}. For {@link Query.ConsistencyLevel#LINEARIZABLE}
+ * and {@link Query.ConsistencyLevel#LINEARIZABLE_LEASE} consistency levels, queries will be forwarded
+ * to the cluster leader. For lower consistency levels, queries are allowed to read from followers. All queries are executed
+ * by applying queries to an internal server state machine.
+ *
+ * Once the query has been applied to a server state machine, the returned {@link CompletableFuture}
+ * will be completed with the state machine output.
+ *
+ * @param query The query to submit.
+ * @param The query result type.
+ * @return A completable future to be completed with the query result. The future is guaranteed to be completed after all
+ * {@link Command} or {@link Query} submission futures that preceded it. The future will always be completed on the
+ * @throws NullPointerException if {@code query} is null
+ */
+ CompletableFuture submit(Query query);
+
+ /**
+ * Registers a void event listener.
+ *
+ * The registered {@link Runnable} will be {@link Runnable#run() called} when an event is received
+ * from the Raft cluster for the client. {@link CopycatClient} implementations must guarantee that consumers are
+ * always called in the same thread for the session. Therefore, no two events will be received concurrently
+ * by the session. Additionally, events are guaranteed to be received in the order in which they were sent by
+ * the state machine.
+ *
+ * @param event The event to which to listen.
+ * @param callback The session receive callback.
+ * @return The listener context.
+ * @throws NullPointerException if {@code event} or {@code callback} is null
+ */
+ Listener onEvent(String event, Runnable callback);
+
+ /**
+ * Registers an event listener.
+ *
+ * The registered {@link Consumer} will be {@link Consumer#accept(Object) called} when an event is received
+ * from the Raft cluster for the session. {@link CopycatClient} implementations must guarantee that consumers are
+ * always called in the same thread for the session. Therefore, no two events will be received concurrently
+ * by the session. Additionally, events are guaranteed to be received in the order in which they were sent by
+ * the state machine.
+ *
+ * @param event The event to which to listen.
+ * @param callback The session receive callback.
+ * @param The session event type.
+ * @return The listener context.
+ * @throws NullPointerException if {@code event} or {@code callback} is null
+ */
+ Listener onEvent(String event, Consumer callback);
+
+ /**
+ * Returns a boolean indicating whether the session is open.
+ *
+ * @return Indicates whether the session is open.
+ */
+ boolean isOpen();
+
+ /**
+ * Closes the session.
+ *
+ * @return A completable future to be completed once the session is closed.
+ */
+ CompletableFuture close();
+
+ /**
+ * Copycat session builder.
+ */
+ abstract class Builder implements io.atomix.catalyst.util.Builder {
+ protected String name;
+ protected String type;
+ protected CommunicationStrategy communicationStrategy = CommunicationStrategies.LEADER;
+
+ /**
+ * Sets the session name.
+ *
+ * @param name The session name.
+ * @return The session builder.
+ */
+ public Builder withName(String name) {
+ this.name = Assert.notNull(name, "name");
+ return this;
+ }
+
+ /**
+ * Sets the session type.
+ *
+ * @param type The session type.
+ * @return The session builder.
+ */
+ public Builder withType(String type) {
+ this.type = Assert.notNull(type, "type");
+ return this;
+ }
+
+ /**
+ * Sets the session's communication strategy.
+ *
+ * @param communicationStrategy The session's communication strategy.
+ * @return The session builder.
+ * @throws NullPointerException if the communication strategy is null
+ */
+ public Builder withCommunicationStrategy(CommunicationStrategy communicationStrategy) {
+ this.communicationStrategy = Assert.notNull(communicationStrategy, "communicationStrategy");
+ return this;
+ }
+ }
+
+}
diff --git a/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatClientConnection.java b/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatClientConnection.java
new file mode 100644
index 00000000..7dcdfcfe
--- /dev/null
+++ b/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatClientConnection.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.copycat.client.session.impl;
+
+import io.atomix.copycat.client.util.AddressSelector;
+import io.atomix.copycat.client.util.ClientConnectionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client connection.
+ */
+public class CopycatClientConnection extends CopycatConnection {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CopycatClientConnection.class);
+
+ public CopycatClientConnection(ClientConnectionManager connections, AddressSelector selector) {
+ super(connections, selector);
+ }
+
+ @Override
+ protected String name() {
+ return "client";
+ }
+
+ @Override
+ protected Logger logger() {
+ return LOGGER;
+ }
+
+}
diff --git a/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatConnection.java b/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatConnection.java
new file mode 100644
index 00000000..d02c7645
--- /dev/null
+++ b/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatConnection.java
@@ -0,0 +1,363 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.copycat.client.session.impl;
+
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.Connection;
+import io.atomix.catalyst.transport.TransportException;
+import io.atomix.catalyst.util.Assert;
+import io.atomix.copycat.client.util.AddressSelector;
+import io.atomix.copycat.client.util.ClientConnectionManager;
+import io.atomix.copycat.client.util.OrderedCompletableFuture;
+import io.atomix.copycat.error.CopycatError;
+import io.atomix.copycat.protocol.Request;
+import io.atomix.copycat.protocol.Response;
+import org.slf4j.Logger;
+
+import java.net.ConnectException;
+import java.nio.channels.ClosedChannelException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Client connection that recursively connects to servers in the cluster and attempts to submit requests.
+ *
+ * @author servers() {
+ return selector.servers();
+ }
+
+ /**
+ * Resets the client connection.
+ *
+ * @return The client connection.
+ */
+ public CopycatConnection reset() {
+ selector.reset();
+ return this;
+ }
+
+ /**
+ * Resets the client connection.
+ *
+ * @param leader The current cluster leader.
+ * @param servers The current servers.
+ * @return The client connection.
+ */
+ public CopycatConnection reset(Address leader, Collection servers) {
+ selector.reset(leader, servers);
+ return this;
+ }
+
+ /**
+ * Opens the connection.
+ *
+ * @return A completable future to be completed once the connection is opened.
+ */
+ public CompletableFuture open() {
+ open = true;
+ return connect().thenApply(c -> null);
+ }
+
+ /**
+ * Sends a request to the cluster.
+ *
+ * @param type The request type.
+ * @param request The request to send.
+ * @return A completable future to be completed with the response.
+ */
+ public CompletableFuture send(String type, Object request) {
+ CompletableFuture future = new CompletableFuture<>();
+ sendRequest((Request) request, (r, c) -> c.send(type, r), future);
+ return future;
+ }
+
+ /**
+ * Sends a request to the cluster and awaits a response.
+ *
+ * @param type The request type.
+ * @param request The request to send.
+ * @param The request type.
+ * @param The response type.
+ * @return A completable future to be completed with the response.
+ */
+ public CompletableFuture sendAndReceive(String type, T request) {
+ CompletableFuture future = new CompletableFuture<>();
+ sendRequest((Request) request, (r, c) -> c.sendAndReceive(type, r), future);
+ return future;
+ }
+
+ /**
+ * Sends the given request attempt to the cluster.
+ */
+ protected void sendRequest(T request, BiFunction> sender, CompletableFuture future) {
+ if (open) {
+ connect().whenComplete((c, e) -> sendRequest(request, sender, c, e, future));
+ }
+ }
+
+ /**
+ * Sends the given request attempt to the cluster via the given connection if connected.
+ */
+ protected void sendRequest(T request, BiFunction> sender, Connection connection, Throwable error, CompletableFuture future) {
+ if (error == null) {
+ if (connection != null) {
+ logger().trace("{} - Sending {}", name(), request);
+ sender.apply(request, connection).whenComplete((r, e) -> {
+ if (e != null || r != null) {
+ handleResponse(request, sender, connection, (Response) r, e, future);
+ } else {
+ future.complete(null);
+ }
+ });
+ } else {
+ future.completeExceptionally(new ConnectException("Failed to connect to the cluster"));
+ }
+ } else {
+ logger().trace("{} - Resending {}: {}", name(), request, error);
+ resendRequest(error, request, sender, connection, future);
+ }
+ }
+
+ /**
+ * Resends a request due to a request failure, resetting the connection if necessary.
+ */
+ @SuppressWarnings("unchecked")
+ protected void resendRequest(Throwable cause, T request, BiFunction sender, Connection connection, CompletableFuture future) {
+ // If the connection has not changed, reset it and connect to the next server.
+ if (this.connection == connection) {
+ logger().trace("{} - Resetting connection. Reason: {}", name(), cause);
+ this.connection = null;
+ }
+
+ // Create a new connection and resend the request. This will force retries to piggyback on any existing
+ // connect attempts.
+ connect().whenComplete((c, e) -> sendRequest(request, sender, c, e, future));
+ }
+
+ /**
+ * Handles a response from the cluster.
+ */
+ @SuppressWarnings("unchecked")
+ protected void handleResponse(T request, BiFunction sender, Connection connection, Response response, Throwable error, CompletableFuture future) {
+ if (error == null) {
+ if (response.status() == Response.Status.OK
+ || response.error() == CopycatError.Type.COMMAND_ERROR
+ || response.error() == CopycatError.Type.QUERY_ERROR
+ || response.error() == CopycatError.Type.APPLICATION_ERROR
+ || response.error() == CopycatError.Type.UNKNOWN_CLIENT_ERROR
+ || response.error() == CopycatError.Type.UNKNOWN_SESSION_ERROR
+ || response.error() == CopycatError.Type.UNKNOWN_STATE_MACHINE_ERROR
+ || response.error() == CopycatError.Type.INTERNAL_ERROR) {
+ logger().trace("{} - Received {}", name(), response);
+ future.complete(response);
+ } else {
+ resendRequest(response.error().createException(), request, sender, connection, future);
+ }
+ } else if (error instanceof ConnectException || error instanceof TimeoutException || error instanceof TransportException || error instanceof ClosedChannelException) {
+ resendRequest(error, request, sender, connection, future);
+ } else {
+ logger().debug("{} - {} failed! Reason: {}", name(), request, error);
+ future.completeExceptionally(error);
+ }
+ }
+
+ /**
+ * Connects to the cluster.
+ */
+ protected CompletableFuture connect() {
+ // If the address selector has been reset then reset the connection.
+ if (selector.state() == AddressSelector.State.RESET && connection != null) {
+ if (connectFuture != null) {
+ return connectFuture;
+ }
+
+ CompletableFuture future = new OrderedCompletableFuture<>();
+ future.whenComplete((r, e) -> this.connectFuture = null);
+ this.connectFuture = future;
+
+ this.connection = null;
+ connect(future);
+ return future;
+ }
+
+ // If a connection was already established then use that connection.
+ if (connection != null) {
+ return CompletableFuture.completedFuture(connection);
+ }
+
+ // If a connection is currently being established then piggyback on the connect future.
+ if (connectFuture != null) {
+ return connectFuture;
+ }
+
+ // Create a new connect future and connect to the first server in the cluster.
+ CompletableFuture future = new OrderedCompletableFuture<>();
+ future.whenComplete((r, e) -> this.connectFuture = null);
+ this.connectFuture = future;
+ reset().connect(future);
+ return future;
+ }
+
+ /**
+ * Attempts to connect to the cluster.
+ */
+ protected void connect(CompletableFuture future) {
+ if (!selector.hasNext()) {
+ logger().debug("{} - Failed to connect to the cluster", name());
+ future.complete(null);
+ } else {
+ Address address = selector.next();
+ logger().debug("{} - Connecting to {}", name(), address);
+ connections.getConnection(address).whenComplete((c, e) -> handleConnection(address, c, e, future));
+ }
+ }
+
+ /**
+ * Handles a connection to a server.
+ */
+ protected void handleConnection(Address address, Connection connection, Throwable error, CompletableFuture future) {
+ if (error == null) {
+ setupConnection(address, connection, future);
+ } else {
+ logger().debug("{} - Failed to connect! Reason: {}", name(), error);
+ connect(future);
+ }
+ }
+
+ /**
+ * Sets up the given connection.
+ */
+ @SuppressWarnings("unchecked")
+ protected void setupConnection(Address address, Connection connection, CompletableFuture future) {
+ logger().debug("{} - Setting up connection to {}", name(), address);
+
+ this.connection = connection;
+
+ connection.onClose(c -> {
+ if (c.equals(this.connection)) {
+ logger().debug("{} - Connection closed", name());
+ this.connection = null;
+ connect();
+ }
+ });
+ connection.onException(c -> {
+ if (c.equals(this.connection)) {
+ logger().debug("{} - Connection lost", name());
+ this.connection = null;
+ connect();
+ }
+ });
+
+ for (Map.Entry entry : handlers.entrySet()) {
+ connection.registerHandler(entry.getKey(), entry.getValue());
+ }
+ future.complete(connection);
+ }
+
+ /**
+ * Registers a handler for the given message type.
+ *
+ * @param type The message type for which to register the handler.
+ * @param handler The handler to register.
+ * @param The handler type.
+ * @return The client connection.
+ */
+ @SuppressWarnings("unchecked")
+ public CopycatConnection registerHandler(String type, Consumer handler) {
+ return registerHandler(type, r -> {
+ handler.accept((T) r);
+ return null;
+ });
+ }
+
+ /**
+ * Registers a handler for the given message type.
+ *
+ * @param type The message type for which to register the handler.
+ * @param handler The handler to register.
+ * @param The handler type.
+ * @param The response type.
+ * @return The client connection.
+ */
+ public CopycatConnection registerHandler(String type, Function> handler) {
+ Assert.notNull(type, "type");
+ Assert.notNull(handler, "handler");
+ handlers.put(type, handler);
+ if (connection != null)
+ connection.registerHandler(type, handler);
+ return this;
+ }
+
+ /**
+ * Closes the connection.
+ *
+ * @return A completable future to be completed once the connection is closed.
+ */
+ public CompletableFuture close() {
+ open = false;
+ return CompletableFuture.completedFuture(null);
+ }
+
+}
diff --git a/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatLeaderConnection.java b/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatLeaderConnection.java
new file mode 100644
index 00000000..e3dec3f7
--- /dev/null
+++ b/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatLeaderConnection.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.copycat.client.session.impl;
+
+import io.atomix.copycat.client.util.AddressSelector;
+import io.atomix.copycat.client.util.ClientConnectionManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Leader connection.
+ */
+public class CopycatLeaderConnection extends CopycatConnection {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CopycatLeaderConnection.class);
+ private final String sessionString;
+
+ public CopycatLeaderConnection(CopycatSessionState state, ClientConnectionManager connections, AddressSelector selector) {
+ super(connections, selector);
+ this.sessionString = String.valueOf(state.getSessionId());
+ }
+
+ @Override
+ protected String name() {
+ return sessionString;
+ }
+
+ @Override
+ protected Logger logger() {
+ return LOGGER;
+ }
+
+}
diff --git a/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatSessionConnection.java b/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatSessionConnection.java
new file mode 100644
index 00000000..ddfd8f16
--- /dev/null
+++ b/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatSessionConnection.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.copycat.client.session.impl;
+
+import io.atomix.catalyst.concurrent.ThreadContext;
+import io.atomix.catalyst.transport.Address;
+import io.atomix.catalyst.transport.Connection;
+import io.atomix.copycat.client.util.AddressSelector;
+import io.atomix.copycat.client.util.ClientConnectionManager;
+import io.atomix.copycat.protocol.ConnectRequest;
+import io.atomix.copycat.protocol.ConnectResponse;
+import io.atomix.copycat.protocol.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Session connection.
+ */
+public class CopycatSessionConnection extends CopycatConnection {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CopycatSessionConnection.class);
+
+ private static final long BASE_RECONNECT_INTERVAL = 10;
+ private static final long MAX_RECONNECT_INTERVAL = 1000;
+
+ private final CopycatSessionState state;
+ private final String sessionString;
+ private final ThreadContext context;
+ private long reconnectInterval;
+
+ public CopycatSessionConnection(CopycatSessionState state, ClientConnectionManager connections, AddressSelector selector, ThreadContext context) {
+ super(connections, selector);
+ this.state = state;
+ this.sessionString = String.valueOf(state.getSessionId());
+ this.context = context;
+ }
+
+ @Override
+ protected String name() {
+ return sessionString;
+ }
+
+ @Override
+ protected Logger logger() {
+ return LOGGER;
+ }
+
+ /**
+ * Reconnects to the cluster.
+ */
+ private void reconnect() {
+ if (open) {
+ reset().connect().whenComplete((connection, error) -> {
+ if (connection == null || error != null) {
+ reconnectInterval = Math.max(reconnectInterval * 2, MAX_RECONNECT_INTERVAL);
+ context.schedule(Duration.ofMillis(reconnectInterval), this::reconnect);
+ } else {
+ reconnectInterval = BASE_RECONNECT_INTERVAL;
+ }
+ });
+ }
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected void setupConnection(Address address, Connection connection, CompletableFuture future) {
+ logger().debug("{} - Setting up connection to {}", name(), address);
+
+ this.connection = connection;
+
+ connection.onClose(c -> {
+ if (c == this.connection) {
+ logger().debug("{} - Connection closed", name());
+ this.connection = null;
+ reconnect();
+ }
+ });
+ connection.onException(c -> {
+ if (c == this.connection) {
+ logger().debug("{} - Connection lost", name());
+ this.connection = null;
+ reconnect();
+ }
+ });
+
+ for (Map.Entry entry : handlers.entrySet()) {
+ connection.registerHandler(entry.getKey(), entry.getValue());
+ }
+
+ // When we first connect to a new server, first send a ConnectRequest to the server to establish
+ // the connection with the server-side state machine.
+ ConnectRequest request = ConnectRequest.builder()
+ .withSession(state.getSessionId())
+ .withConnection(state.nextConnection())
+ .build();
+
+ logger().trace("{} - Sending {}", name(), request);
+ connection.sendAndReceive(ConnectRequest.NAME, request).whenComplete((r, e) -> handleConnectResponse(r, e, future));
+ }
+
+ /**
+ * Handles a connect response.
+ */
+ private void handleConnectResponse(ConnectResponse response, Throwable error, CompletableFuture future) {
+ if (open) {
+ if (error == null) {
+ logger().trace("{} - Received {}", name(), response);
+ // If the connection was successfully created, immediately send a keep-alive request
+ // to the server to ensure we maintain our session and get an updated list of server addresses.
+ if (response.status() == Response.Status.OK) {
+ selector.reset(response.leader(), response.members());
+ future.complete(connection);
+ } else {
+ connect(future);
+ }
+ } else {
+ logger().debug("{} - Failed to connect! Reason: {}", name(), error);
+ connect(future);
+ }
+ }
+ }
+
+}
diff --git a/client/src/main/java/io/atomix/copycat/client/session/ClientSessionListener.java b/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatSessionListener.java
similarity index 77%
rename from client/src/main/java/io/atomix/copycat/client/session/ClientSessionListener.java
rename to client/src/main/java/io/atomix/copycat/client/session/impl/CopycatSessionListener.java
index 043bd6b3..9877fe2f 100644
--- a/client/src/main/java/io/atomix/copycat/client/session/ClientSessionListener.java
+++ b/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatSessionListener.java
@@ -1,27 +1,28 @@
/*
- * Copyright 2015 the original author or authors.
+ * Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
- * limitations under the License
+ * limitations under the License.
*/
-package io.atomix.copycat.client.session;
+package io.atomix.copycat.client.session.impl;
import io.atomix.catalyst.concurrent.Listener;
import io.atomix.catalyst.concurrent.ThreadContext;
-import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.util.Assert;
import io.atomix.copycat.protocol.PublishRequest;
import io.atomix.copycat.protocol.ResetRequest;
import io.atomix.copycat.session.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Set;
@@ -35,19 +36,23 @@
*
* @author servers) {
+ selectorManager.resetAll(leader, servers);
+ }
+
+ /**
+ * Sets the session manager state.
+ *
+ * @param state The session manager state.
+ */
+ private void setState(State state) {
+ if (this.state != state) {
+ this.state = state;
+ switch (state) {
+ case OPEN:
+ clientState.setState(CopycatClient.State.CONNECTED);
+ break;
+ case UNSTABLE:
+ clientState.setState(CopycatClient.State.SUSPENDED);
+ if (unstableTime == null) {
+ unstableTime = System.currentTimeMillis();
+ } else if (System.currentTimeMillis() - unstableTime > unstableTimeout.toMillis()) {
+ setState(State.EXPIRED);
+ }
+ break;
+ case EXPIRED:
+ clientState.setState(CopycatClient.State.CLOSED);
+ sessions.values().forEach(CopycatSessionState::close);
+ break;
+ case CLOSED:
+ clientState.setState(CopycatClient.State.CLOSED);
+ sessions.values().forEach(CopycatSessionState::close);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Opens the session manager.
+ *
+ * @return A completable future to be called once the session manager is opened.
+ */
+ public CompletableFuture open() {
+ CompletableFuture future = new CompletableFuture<>();
+ threadContext.execute(() -> connection.open().whenComplete((result, error) -> {
+ if (error == null) {
+ registerClient(new RegisterAttempt(1, future));
+ } else {
+ future.completeExceptionally(error);
+ }
+ }));
+ return future;
+ }
+
+ /**
+ * Opens a new session.
+ *
+ * @param name The session name.
+ * @param type The session type.
+ * @param communicationStrategy The strategy with which to communicate with servers.
+ * @return A completable future to be completed once the session has been opened.
+ */
+ public CompletableFuture openSession(String name, String type, CommunicationStrategy communicationStrategy) {
+ LOG.trace("Opening session; name: {}, type: {}", name, type);
+ OpenSessionRequest request = OpenSessionRequest.builder()
+ .withClient(clientState.getId())
+ .withType(type)
+ .withName(name)
+ .build();
+
+ LOG.trace("Sending {}", request);
+ CompletableFuture future = new CompletableFuture<>();
+ ThreadContext threadContext = new ThreadPoolContext(threadPoolExecutor, this.threadContext.serializer().clone());
+ Runnable callback = () -> connection.sendAndReceive(OpenSessionRequest.NAME, request).whenCompleteAsync((response, error) -> {
+ if (error == null) {
+ if (response.status() == Response.Status.OK) {
+ CopycatSessionState state = new CopycatSessionState(response.session(), name, type);
+ sessions.put(state.getSessionId(), state);
+ CopycatConnection leaderConnection = new CopycatLeaderConnection(state, connectionManager, selectorManager.createSelector(CommunicationStrategies.LEADER));
+ CopycatConnection sessionConnection = new CopycatSessionConnection(state, connectionManager, selectorManager.createSelector(communicationStrategy), threadContext);
+ leaderConnection.open().thenCompose(v -> sessionConnection.open()).whenComplete((connectResult, connectError) -> {
+ if (connectError == null) {
+ future.complete(new DefaultCopycatSession(state, leaderConnection, sessionConnection, threadContext, this));
+ } else {
+ future.completeExceptionally(connectError);
+ }
+ });
+ } else {
+ future.completeExceptionally(response.error().createException());
+ }
+ } else {
+ future.completeExceptionally(error);
+ }
+ }, threadContext);
+
+ if (threadContext.isCurrentContext()) {
+ callback.run();
+ } else {
+ threadContext.execute(callback);
+ }
+ return future;
+ }
+
+ /**
+ * Closes a session.
+ *
+ * @param sessionId The session identifier.
+ * @return A completable future to be completed once the session is closed.
+ */
+ public CompletableFuture closeSession(long sessionId) {
+ CopycatSessionState state = sessions.get(sessionId);
+ if (state == null) {
+ return Futures.exceptionalFuture(new UnknownSessionException("Unknown session: " + sessionId));
+ }
+
+ LOG.trace("Closing session {}", sessionId);
+ CloseSessionRequest request = CloseSessionRequest.builder()
+ .withSession(sessionId)
+ .build();
+
+ LOG.trace("Sending {}", request);
+ CompletableFuture future = new CompletableFuture<>();
+ Runnable callback = () -> connection.sendAndReceive(CloseSessionRequest.NAME, request).whenComplete((response, error) -> {
+ if (error == null) {
+ if (response.status() == Response.Status.OK) {
+ sessions.remove(sessionId);
+ future.complete(null);
+ } else {
+ future.completeExceptionally(response.error().createException());
+ }
+ } else {
+ future.completeExceptionally(error);
+ }
+ });
+
+ if (threadContext.isCurrentContext()) {
+ callback.run();
+ } else {
+ threadContext.execute(callback);
+ }
+ return future;
+ }
+
+ /**
+ * Expires the manager.
+ *
+ * @return A completable future to be completed once the session has been expired.
+ */
+ CompletableFuture expireSessions() {
+ CompletableFuture future = new CompletableFuture<>();
+ threadContext.execute(() -> {
+ if (keepAlive != null)
+ keepAlive.cancel();
+ setState(State.EXPIRED);
+ future.complete(null);
+ });
+ return future;
+ }
+
+ /**
+ * Registers a session.
+ */
+ private void registerClient(RegisterAttempt attempt) {
+ LOG.debug("Registering client: attempt {}", attempt.attempt);
+
+ RegisterRequest request = RegisterRequest.builder()
+ .withClient(clientState.getUuid())
+ .withTimeout(sessionTimeout.toMillis())
+ .build();
+
+ LOG.trace("{} - Sending {}", clientState.getUuid(), request);
+ selectorManager.resetAll();
+ connection.sendAndReceive(RegisterRequest.NAME, request).whenComplete((response, error) -> {
+ if (error == null) {
+ LOG.trace("{} - Received {}", clientState.getUuid(), response);
+ if (response.status() == Response.Status.OK) {
+ clientState.setId(response.clientId());
+ interval = Duration.ofMillis(response.timeout()).dividedBy(2);
+ selectorManager.resetAll(response.leader(), response.members());
+ setState(State.OPEN);
+ LOG.info("{} - Registered client {}", clientState.getUuid(), clientState.getId());
+ attempt.complete();
+ keepAliveSessions();
+ } else {
+ strategy.attemptFailed(attempt);
+ }
+ } else {
+ strategy.attemptFailed(attempt);
+ }
+ });
+ }
+
+ /**
+ * Resets indexes for the given session.
+ *
+ * @param sessionId The session for which to reset indexes.
+ * @return A completable future to be completed once the session's indexes have been reset.
+ */
+ CompletableFuture resetIndexes(long sessionId) {
+ CopycatSessionState sessionState = sessions.get(sessionId);
+ if (sessionState == null) {
+ return Futures.exceptionalFuture(new IllegalArgumentException("Unknown session: " + sessionId));
+ }
+
+ CompletableFuture future = new CompletableFuture<>();
+
+ KeepAliveRequest request = KeepAliveRequest.builder()
+ .withClient(clientState.getId())
+ .withSessionIds(new long[]{sessionId})
+ .withCommandSequences(new long[]{sessionState.getCommandResponse()})
+ .withEventIndexes(new long[]{sessionState.getEventIndex()})
+ .withConnections(new long[]{sessionState.getConnection()})
+ .build();
+
+ LOG.trace("{} - Sending {}", clientState.getUuid(), request);
+ connection.sendAndReceive(KeepAliveRequest.NAME, request).whenComplete((response, error) -> {
+ if (error == null) {
+ LOG.trace("{} - Received {}", clientState.getUuid(), response);
+ if (response.status() == Response.Status.OK) {
+ future.complete(null);
+ } else {
+ future.completeExceptionally(response.error().createException());
+ }
+ } else {
+ future.completeExceptionally(error);
+ }
+ });
+ return future;
+ }
+
+ /**
+ * Sends a keep-alive request to the cluster.
+ */
+ private void keepAliveSessions() {
+ keepAliveSessions(true);
+ }
+
+ /**
+ * Sends a keep-alive request to the cluster.
+ */
+ private void keepAliveSessions(boolean retryOnFailure) {
+ // If the current sessions state is unstable, reset the connection before sending a keep-alive.
+ if (state == State.UNSTABLE) {
+ selectorManager.resetAll();
+ }
+
+ Map sessions = new HashMap<>(this.sessions);
+ long[] sessionIds = new long[sessions.size()];
+ long[] commandResponses = new long[sessions.size()];
+ long[] eventIndexes = new long[sessions.size()];
+ long[] connections = new long[sessions.size()];
+
+ int i = 0;
+ for (CopycatSessionState sessionState : sessions.values()) {
+ sessionIds[i] = sessionState.getSessionId();
+ commandResponses[i] = sessionState.getCommandResponse();
+ eventIndexes[i] = sessionState.getEventIndex();
+ connections[i] = sessionState.getConnection();
+ i++;
+ }
+
+ KeepAliveRequest request = KeepAliveRequest.builder()
+ .withClient(clientState.getId())
+ .withSessionIds(sessionIds)
+ .withCommandSequences(commandResponses)
+ .withEventIndexes(eventIndexes)
+ .withConnections(connections)
+ .build();
+
+ LOG.trace("{} - Sending {}", clientState.getUuid(), request);
+ connection.sendAndReceive(KeepAliveRequest.NAME, request).whenComplete((response, error) -> {
+ if (state != State.CLOSED) {
+ if (error == null) {
+ LOG.trace("{} - Received {}", clientState.getUuid(), response);
+ // If the request was successful, update the address selector and schedule the next keep-alive.
+ if (response.status() == Response.Status.OK) {
+ selectorManager.resetAll(response.leader(), response.members());
+ setState(State.OPEN);
+ scheduleKeepAlive();
+ }
+ // If the session is unknown, immediate expire the session.
+ else if (response.error() == CopycatError.Type.UNKNOWN_SESSION_ERROR) {
+ setState(State.EXPIRED);
+ }
+ // If a leader is still set in the address selector, unset the leader and attempt to send another keep-alive.
+ // This will ensure that the address selector selects all servers without filtering on the leader.
+ else if (retryOnFailure && connection.leader() != null) {
+ selectorManager.resetAll(null, connection.servers());
+ keepAliveSessions(false);
+ }
+ // If no leader was set, set the session state to unstable and schedule another keep-alive.
+ else {
+ setState(State.UNSTABLE);
+ scheduleKeepAlive();
+ }
+ }
+ // If a leader is still set in the address selector, unset the leader and attempt to send another keep-alive.
+ // This will ensure that the address selector selects all servers without filtering on the leader.
+ else if (retryOnFailure && connection.leader() != null) {
+ selectorManager.resetAll(null, connection.servers());
+ keepAliveSessions(false);
+ }
+ // If no leader was set, set the session state to unstable and schedule another keep-alive.
+ else {
+ setState(State.UNSTABLE);
+ scheduleKeepAlive();
+ }
+ }
+ });
+ }
+
+ /**
+ * Schedules a keep-alive request.
+ */
+ private void scheduleKeepAlive() {
+ if (keepAlive != null)
+ keepAlive.cancel();
+ keepAlive = threadContext.schedule(interval, () -> {
+ keepAlive = null;
+ if (state.isActive()) {
+ keepAliveSessions();
+ }
+ });
+ }
+
+ /**
+ * Closes the session manager.
+ *
+ * @return A completable future to be completed once the session manager is closed.
+ */
+ public CompletableFuture close() {
+ if (state == State.EXPIRED)
+ return CompletableFuture.completedFuture(null);
+
+ CompletableFuture future = new CompletableFuture<>();
+ threadContext.execute(() -> {
+ if (keepAlive != null) {
+ keepAlive.cancel();
+ keepAlive = null;
+ }
+ unregister(future);
+ });
+ return future;
+ }
+
+ /**
+ * Unregisters the session.
+ */
+ private void unregister(CompletableFuture future) {
+ unregister(true, future);
+ }
+
+ /**
+ * Unregisters the session.
+ *
+ * @param future A completable future to be completed once the session is unregistered.
+ */
+ private void unregister(boolean retryOnFailure, CompletableFuture future) {
+ // If the session is already closed, skip the unregister attempt.
+ if (state == State.CLOSED) {
+ future.complete(null);
+ return;
+ }
+
+ LOG.debug("{} - Unregistering client: {}", clientState.getUuid(), clientState.getId());
+
+ // If a keep-alive request is already pending, cancel it.
+ if (keepAlive != null) {
+ keepAlive.cancel();
+ keepAlive = null;
+ }
+
+ // If the current sessions state is unstable, reset the connection before sending an unregister request.
+ if (state == State.UNSTABLE) {
+ selectorManager.resetAll();
+ }
+
+ UnregisterRequest request = UnregisterRequest.builder()
+ .withClient(clientState.getId())
+ .build();
+
+ LOG.trace("{} - Sending {}", clientState, request);
+ connection.sendAndReceive(UnregisterRequest.NAME, request).whenComplete((response, error) -> {
+ if (state != State.CLOSED) {
+ if (error == null) {
+ LOG.trace("{} - Received {}", clientState.getUuid(), response);
+ // If the request was successful, update the session state and complete the close future.
+ if (response.status() == Response.Status.OK) {
+ setState(State.CLOSED);
+ future.complete(null);
+ }
+ // If the session is unknown, immediate expire the session and complete the close future.
+ else if (response.error() == CopycatError.Type.UNKNOWN_SESSION_ERROR) {
+ setState(State.EXPIRED);
+ future.complete(null);
+ }
+ // If a leader is still set in the address selector, unset the leader and send another unregister attempt.
+ // This will ensure that the address selector selects all servers without filtering on the leader.
+ else if (retryOnFailure && connection.leader() != null) {
+ selectorManager.resetAll(null, connection.servers());
+ unregister(false, future);
+ }
+ // If no leader was set, set the session state to unstable and fail the unregister attempt.
+ else {
+ setState(State.UNSTABLE);
+ future.completeExceptionally(new ClosedSessionException("failed to unregister session"));
+ }
+ }
+ // If a leader is still set in the address selector, unset the leader and send another unregister attempt.
+ // This will ensure that the address selector selects all servers without filtering on the leader.
+ else if (retryOnFailure && connection.leader() != null) {
+ selectorManager.resetAll(null, connection.servers());
+ unregister(false, future);
+ }
+ // If no leader was set, set the session state to unstable and schedule another unregister attempt.
+ else {
+ setState(State.UNSTABLE);
+ future.completeExceptionally(new ClosedSessionException("failed to unregister session"));
+ }
+ }
+ });
+ }
+
+ /**
+ * Kills the client session manager.
+ *
+ * @return A completable future to be completed once the session manager is killed.
+ */
+ public CompletableFuture kill() {
+ return CompletableFuture.runAsync(() -> {
+ if (keepAlive != null)
+ keepAlive.cancel();
+ setState(State.CLOSED);
+ }, threadContext);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s[client=%s]", getClass().getSimpleName(), clientState.getUuid());
+ }
+
+ /**
+ * Client session connection attempt.
+ */
+ private final class RegisterAttempt implements ConnectionStrategy.Attempt {
+ private final int attempt;
+ private final CompletableFuture future;
+
+ private RegisterAttempt(int attempt, CompletableFuture future) {
+ this.attempt = attempt;
+ this.future = future;
+ }
+
+ @Override
+ public int attempt() {
+ return attempt;
+ }
+
+ /**
+ * Completes the attempt successfully.
+ */
+ public void complete() {
+ complete(null);
+ }
+
+ /**
+ * Completes the attempt successfully.
+ *
+ * @param result The attempt result.
+ */
+ public void complete(Void result) {
+ future.complete(result);
+ }
+
+ @Override
+ public void fail() {
+ future.completeExceptionally(new ConnectException("failed to register session"));
+ }
+
+ @Override
+ public void fail(Throwable error) {
+ future.completeExceptionally(error);
+ }
+
+ @Override
+ public void retry() {
+ LOG.debug("Retrying session register attempt");
+ registerClient(new RegisterAttempt(attempt + 1, future));
+ }
+
+ @Override
+ public void retry(Duration after) {
+ LOG.debug("Retrying session register attempt");
+ threadContext.schedule(after, () -> registerClient(new RegisterAttempt(attempt + 1, future)));
+ }
+ }
+
+ /**
+ * Session manager state.
+ */
+ private enum State {
+ OPEN(true),
+ UNSTABLE(true),
+ EXPIRED(false),
+ CLOSED(false);
+
+ private final boolean active;
+
+ State(boolean active) {
+ this.active = active;
+ }
+
+ /**
+ * Returns whether the state is active, requiring keep-alives.
+ *
+ * @return Whether the state is active.
+ */
+ public boolean isActive() {
+ return active;
+ }
+ }
+
+}
diff --git a/client/src/main/java/io/atomix/copycat/client/session/ClientSequencer.java b/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatSessionSequencer.java
similarity index 96%
rename from client/src/main/java/io/atomix/copycat/client/session/ClientSequencer.java
rename to client/src/main/java/io/atomix/copycat/client/session/impl/CopycatSessionSequencer.java
index 57c3e3b4..d80b512e 100644
--- a/client/src/main/java/io/atomix/copycat/client/session/ClientSequencer.java
+++ b/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatSessionSequencer.java
@@ -1,19 +1,19 @@
/*
- * Copyright 2016 the original author or authors.
+ * Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
- * limitations under the License
+ * limitations under the License.
*/
-package io.atomix.copycat.client.session;
+package io.atomix.copycat.client.session.impl;
import io.atomix.copycat.protocol.OperationResponse;
import io.atomix.copycat.protocol.PublishRequest;
@@ -54,17 +54,17 @@
*
* @author 0 && (System.currentTimeMillis() - unstableSince) > unstabilityTimeout) {
- return setStateAndCallListeners(Session.State.STALE);
- }
- } else if (this.state != Session.State.STALE) {
- unstableSince = System.currentTimeMillis();
- return setStateAndCallListeners(state);
- }
+ void close() {
+ if (open.compareAndSet(true, false)) {
+ changeListeners.forEach(l -> l.accept(CopycatSession.State.CLOSED));
}
-
- return this;
- }
-
- private ClientSessionState setStateAndCallListeners(Session.State state) {
- this.state = state;
- changeListeners.forEach(l -> l.accept(state));
- return this;
}
/**
@@ -139,10 +108,10 @@ private ClientSessionState setStateAndCallListeners(Session.State state) {
* @param callback The state change listener callback.
* @return The state change listener.
*/
- public Listener onStateChange(Consumer callback) {
- Listener listener = new Listener() {
+ public Listener onStateChange(Consumer callback) {
+ Listener listener = new Listener() {
@Override
- public void accept(Session.State state) {
+ public void accept(CopycatSession.State state) {
callback.accept(state);
}
@Override
@@ -160,7 +129,7 @@ public void close() {
* @param commandRequest The last command request sequence number.
* @return The client session state.
*/
- public ClientSessionState setCommandRequest(long commandRequest) {
+ public CopycatSessionState setCommandRequest(long commandRequest) {
this.commandRequest = commandRequest;
return this;
}
@@ -189,7 +158,7 @@ public long nextCommandRequest() {
* @param commandResponse The last command sequence number for which a response has been received.
* @return The client session state.
*/
- public ClientSessionState setCommandResponse(long commandResponse) {
+ public CopycatSessionState setCommandResponse(long commandResponse) {
this.commandResponse = commandResponse;
return this;
}
@@ -209,7 +178,7 @@ public long getCommandResponse() {
* @param responseIndex The highest index for which a command or query response has been received.
* @return The client session state.
*/
- public ClientSessionState setResponseIndex(long responseIndex) {
+ public CopycatSessionState setResponseIndex(long responseIndex) {
this.responseIndex = Math.max(this.responseIndex, responseIndex);
return this;
}
@@ -229,7 +198,7 @@ public long getResponseIndex() {
* @param eventIndex The highest index for which an event has been received in sequence.
* @return The client session state.
*/
- public ClientSessionState setEventIndex(long eventIndex) {
+ public CopycatSessionState setEventIndex(long eventIndex) {
this.eventIndex = eventIndex;
return this;
}
@@ -243,4 +212,33 @@ public long getEventIndex() {
return eventIndex;
}
+ /**
+ * Sets the session's current connection.
+ *
+ * @param connection The session's current connection.
+ * @return The client session state.
+ */
+ public CopycatSessionState setConnection(int connection) {
+ this.connection = connection;
+ return this;
+ }
+
+ /**
+ * Returns the session's current connection.
+ *
+ * @return The session's current connection.
+ */
+ public long getConnection() {
+ return connection;
+ }
+
+ /**
+ * Returns the session's next connection ID.
+ *
+ * @return The session's next connection ID.
+ */
+ public long nextConnection() {
+ return ++connection;
+ }
+
}
diff --git a/client/src/main/java/io/atomix/copycat/client/session/ClientSessionSubmitter.java b/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatSessionSubmitter.java
similarity index 84%
rename from client/src/main/java/io/atomix/copycat/client/session/ClientSessionSubmitter.java
rename to client/src/main/java/io/atomix/copycat/client/session/impl/CopycatSessionSubmitter.java
index 7bfaefdb..148437cc 100644
--- a/client/src/main/java/io/atomix/copycat/client/session/ClientSessionSubmitter.java
+++ b/client/src/main/java/io/atomix/copycat/client/session/impl/CopycatSessionSubmitter.java
@@ -1,22 +1,21 @@
/*
- * Copyright 2015 the original author or authors.
+ * Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
- * limitations under the License
+ * limitations under the License.
*/
-package io.atomix.copycat.client.session;
+package io.atomix.copycat.client.session.impl;
import io.atomix.catalyst.concurrent.ThreadContext;
-import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.transport.TransportException;
import io.atomix.catalyst.util.Assert;
import io.atomix.copycat.Command;
@@ -28,7 +27,8 @@
import io.atomix.copycat.error.UnknownSessionException;
import io.atomix.copycat.protocol.*;
import io.atomix.copycat.session.ClosedSessionException;
-import io.atomix.copycat.session.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.ConnectException;
import java.nio.channels.ClosedChannelException;
@@ -48,7 +48,8 @@
*
* @author CompletableFuture submit(Command command) {
CompletableFuture future = new CompletableFuture<>();
- context.executor().execute(() -> submitCommand(command, future));
+ context.execute(() -> submitCommand(command, future));
return future;
}
@@ -113,7 +118,7 @@ private void submitCommand(CommandRequest request, CompletableFuture futu
*/
public CompletableFuture submit(Query query) {
CompletableFuture future = new CompletableFuture<>();
- context.executor().execute(() -> submitQuery(query, future));
+ context.execute(() -> submitQuery(query, future));
return future;
}
@@ -143,12 +148,12 @@ private void submitQuery(QueryRequest request, CompletableFuture future)
* @param attempt The attempt to submit.
*/
private void submit(OperationAttempt attempt) {
- if (state.getState() == Session.State.CLOSED || state.getState() == Session.State.EXPIRED) {
+ if (!state.isOpen()) {
attempt.fail(new ClosedSessionException("session closed"));
} else {
- state.getLogger().trace("{} - Sending {}", state.getSessionId(), attempt.request);
+ LOG.trace("{} - Sending {}", state.getSessionId(), attempt.request);
attempts.put(attempt.sequence, attempt);
- connection.sendAndReceive(attempt.request).whenComplete(attempt);
+ attempt.send();
attempt.future.whenComplete((r, e) -> attempts.remove(attempt.sequence));
}
}
@@ -171,28 +176,13 @@ private void resubmit(long commandSequence, OperationAttempt, ?, ?> attempt) {
long responseSequence = state.getCommandResponse();
if (commandSequence < responseSequence && keepAliveIndex.get() != responseSequence) {
keepAliveIndex.set(responseSequence);
- KeepAliveRequest request = KeepAliveRequest.builder()
- .withSession(state.getSessionId())
- .withCommandSequence(state.getCommandResponse())
- .withEventIndex(state.getEventIndex())
- .build();
- state.getLogger().trace("{} - Sending {}", state.getSessionId(), request);
- connection.sendAndReceive(request).whenComplete((response, error) -> {
+ manager.resetIndexes(state.getSessionId()).whenCompleteAsync((result, error) -> {
if (error == null) {
- state.getLogger().trace("{} - Received {}", state.getSessionId(), response);
-
- // If the keep-alive is successful, recursively resubmit operations starting
- // at the submitted response sequence number rather than the command sequence.
- if (response.status() == Response.Status.OK) {
- resubmit(responseSequence, attempt);
- } else {
- attempt.retry(Duration.ofSeconds(FIBONACCI[Math.min(attempt.attempt-1, FIBONACCI.length-1)]));
- }
+ resubmit(responseSequence, attempt);
} else {
- keepAliveIndex.set(0);
attempt.retry(Duration.ofSeconds(FIBONACCI[Math.min(attempt.attempt-1, FIBONACCI.length-1)]));
}
- });
+ }, context);
} else {
for (Map.Entry entry : attempts.entrySet()) {
OperationAttempt operation = entry.getValue();
@@ -232,6 +222,11 @@ protected OperationAttempt(long sequence, int attempt, T request, CompletableFut
this.future = future;
}
+ /**
+ * Sends the attempt.
+ */
+ protected abstract void send();
+
/**
* Returns the next instance of the attempt.
*
@@ -259,10 +254,6 @@ protected OperationAttempt(long sequence, int attempt, T request, CompletableFut
* @param error The completion exception.
*/
protected void complete(Throwable error) {
- // If the exception is an UnknownSessionException, expire the session.
- if (error instanceof UnknownSessionException) {
- state.setState(Session.State.EXPIRED);
- }
sequence(null, () -> future.completeExceptionally(error));
}
@@ -296,7 +287,7 @@ public void fail(Throwable t) {
* Immediately retries the attempt.
*/
public void retry() {
- context.executor().execute(() -> submit(next()));
+ context.execute(() -> submit(next()));
}
/**
@@ -322,6 +313,11 @@ public CommandAttempt(long sequence, int attempt, CommandRequest request, Comple
super(sequence, attempt, request, future);
}
+ @Override
+ protected void send() {
+ leaderConnection.sendAndReceive(CommandRequest.NAME, request).whenComplete(this);
+ }
+
@Override
protected OperationAttempt next() {
return new CommandAttempt<>(sequence, this.attempt + 1, request, future);
@@ -335,7 +331,7 @@ protected Throwable defaultException() {
@Override
public void accept(CommandResponse response, Throwable error) {
if (error == null) {
- state.getLogger().trace("{} - Received {}", state.getSessionId(), response);
+ LOG.trace("{} - Received {}", state.getSessionId(), response);
if (response.status() == Response.Status.OK) {
complete(response);
}
@@ -346,7 +342,9 @@ else if (response.error() == CopycatError.Type.COMMAND_ERROR) {
}
// The following exceptions need to be handled at a higher level by the client or the user.
else if (response.error() == CopycatError.Type.APPLICATION_ERROR
+ || response.error() == CopycatError.Type.UNKNOWN_CLIENT_ERROR
|| response.error() == CopycatError.Type.UNKNOWN_SESSION_ERROR
+ || response.error() == CopycatError.Type.UNKNOWN_STATE_MACHINE_ERROR
|| response.error() == CopycatError.Type.INTERNAL_ERROR) {
complete(response.error().createException());
}
@@ -370,7 +368,7 @@ public void fail(Throwable cause) {
.withSequence(this.request.sequence())
.withCommand(new NoOpCommand())
.build();
- context.executor().execute(() -> submit(new CommandAttempt<>(sequence, this.attempt + 1, request, future)));
+ context.execute(() -> submit(new CommandAttempt<>(sequence, this.attempt + 1, request, future)));
}
}
@@ -397,6 +395,11 @@ public QueryAttempt(long sequence, int attempt, QueryRequest request, Completabl
super(sequence, attempt, request, future);
}
+ @Override
+ protected void send() {
+ sessionConnection.sendAndReceive(QueryRequest.NAME, request).whenComplete(this);
+ }
+
@Override
protected OperationAttempt next() {
return new QueryAttempt<>(sequence, this.attempt + 1, request, future);
@@ -410,7 +413,7 @@ protected Throwable defaultException() {
@Override
public void accept(QueryResponse response, Throwable error) {
if (error == null) {
- state.getLogger().trace("{} - Received {}", state.getSessionId(), response);
+ LOG.trace("{} - Received {}", state.getSessionId(), response);
if (response.status() == Response.Status.OK) {
complete(response);
} else {
diff --git a/client/src/main/java/io/atomix/copycat/client/session/ClientSession.java b/client/src/main/java/io/atomix/copycat/client/session/impl/DefaultCopycatSession.java
similarity index 57%
rename from client/src/main/java/io/atomix/copycat/client/session/ClientSession.java
rename to client/src/main/java/io/atomix/copycat/client/session/impl/DefaultCopycatSession.java
index ba5860cc..1e8759a2 100644
--- a/client/src/main/java/io/atomix/copycat/client/session/ClientSession.java
+++ b/client/src/main/java/io/atomix/copycat/client/session/impl/DefaultCopycatSession.java
@@ -1,35 +1,29 @@
/*
- * Copyright 2015 the original author or authors.
+ * Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
- * limitations under the License
+ * limitations under the License.
*/
-package io.atomix.copycat.client.session;
+package io.atomix.copycat.client.session.impl;
-import io.atomix.catalyst.concurrent.Futures;
import io.atomix.catalyst.concurrent.Listener;
import io.atomix.catalyst.concurrent.ThreadContext;
-import io.atomix.catalyst.transport.Client;
import io.atomix.catalyst.util.Assert;
import io.atomix.copycat.Command;
import io.atomix.copycat.Operation;
import io.atomix.copycat.Query;
-import io.atomix.copycat.client.ConnectionStrategy;
-import io.atomix.copycat.client.util.AddressSelector;
-import io.atomix.copycat.client.util.ClientConnection;
-import io.atomix.copycat.session.ClosedSessionException;
+import io.atomix.copycat.client.session.CopycatSession;
import io.atomix.copycat.session.Session;
-import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
@@ -39,7 +33,7 @@
* The client session is responsible for maintaining a client's connection to a Copycat cluster and coordinating
* the submission of {@link Command commands} and {@link Query queries} to various nodes in the cluster. Client
* sessions are single-use objects that represent the context within which a cluster can guarantee linearizable
- * semantics for state machine operations. When a session is {@link #register() opened}, the session will register
+ * semantics for state machine operations. When a session is opened, the session will register
* itself with the cluster by attempting to contact each of the known servers. Once the session has been successfully
* registered, kee-alive requests will be periodically sent to keep the session alive.
*