BUG-5280: implement message queueing 61/39561/75
authorRobert Varga <rovarga@cisco.com>
Sat, 28 May 2016 23:27:24 +0000 (01:27 +0200)
committerRobert Varga <rovarga@cisco.com>
Wed, 6 Jul 2016 09:52:24 +0000 (11:52 +0200)
This patch implements the basic queueing and timeout retry mechanism
in ClientActorBehavior.

This implementation is not very efficient, as each send goes through
the actor's mailbox, but it gets the job done and is correct. It will
be optimized in a follow-up patch, which will refactor internal
workings so that SequencedQueue is fully thread-safe and correct with
regard to request enqueue, timeouts and retries.

Change-Id: I207a30877328dbdc08d42f76a0db55b5ae162de5
Signed-off-by: Robert Varga <rovarga@cisco.com>
13 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InitialClientActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InternalCommand.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/NoProgressException.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RequestCallback.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SavingClientActorBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueue.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntry.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContextTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntryTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueTest.java [new file with mode: 0644]

index ff5f8820dda9b90cb7aae48efc2514f697acb65c..364e462e57c7b923379c2ec5a548c75f88622472 100644 (file)
@@ -66,16 +66,17 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
         // FIXME: Add state flushing here once we have state
     }
 
-    private ClientActorBehavior createLocalHistory(final CompletableFuture<ClientLocalHistory> future) {
+    private ClientActorBehavior createLocalHistory(final ClientActorBehavior currentBehavior,
+            final CompletableFuture<ClientLocalHistory> future) {
         final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++);
         LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future);
 
         // FIXME: initiate backend instantiation
         future.completeExceptionally(new UnsupportedOperationException("Not implemented yet"));
-        return this;
+        return currentBehavior;
     }
 
-    private ClientActorBehavior shutdown() {
+    private ClientActorBehavior shutdown(final ClientActorBehavior currentBehavior) {
         // FIXME: Add shutdown procedures here
         return null;
     }
@@ -100,7 +101,7 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
     @Override
     public CompletionStage<ClientLocalHistory> createLocalHistory() {
         final CompletableFuture<ClientLocalHistory> future = new CompletableFuture<>();
-        context().executeInActor(() -> createLocalHistory(future));
+        context().executeInActor(currentBehavior -> createLocalHistory(currentBehavior, future));
         return future;
     }
 
index 237b570320543939df3692d720e55aefd59d61dc..c3df604ef20ce2a77fd2ddb114d6ab6b471a369c 100644 (file)
@@ -8,15 +8,20 @@
 package org.opendaylight.controller.cluster.datastore.actors.client;
 
 import com.google.common.annotations.Beta;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * A behavior, which handles messages sent to a {@link AbstractClientActor}.
@@ -34,27 +39,121 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<C
         super(context);
     }
 
+    @Override
+    public final @Nonnull ClientIdentifier getIdentifier() {
+        return context().getIdentifier();
+    }
+
     @Override
     final ClientActorBehavior onReceiveCommand(final Object command) {
         if (command instanceof InternalCommand) {
-            return ((InternalCommand) command).execute();
-        } else if (command instanceof RequestFailure) {
-            final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) command;
-            final RequestException cause = failure.getCause();
-            if (cause instanceof RetiredGenerationException) {
-                LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause);
-                haltClient(cause);
-                return null;
-            }
+            return ((InternalCommand) command).execute(this);
+        }
+        if (command instanceof RequestSuccess) {
+            return onRequestSuccess((RequestSuccess<?, ?>) command);
+        }
+        if (command instanceof RequestFailure) {
+            return onRequestFailure((RequestFailure<?, ?>) command);
         }
 
-        // TODO: any client-common logic (such as validation and common dispatch) needs to go here
         return onCommand(command);
     }
 
-    @Override
-    public final @Nonnull ClientIdentifier getIdentifier() {
-        return context().getIdentifier();
+    private ClientActorBehavior onRequestSuccess(final RequestSuccess<?, ?> success) {
+        return context().completeRequest(this, success);
+    }
+
+    private ClientActorBehavior onRequestFailure(final RequestFailure<?, ?> failure) {
+        final RequestException cause = failure.getCause();
+        if (cause instanceof RetiredGenerationException) {
+            LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause);
+            haltClient(cause);
+            context().poison(cause);
+            return null;
+        }
+
+        if (failure.isHardFailure()) {
+            return context().completeRequest(this, failure);
+        }
+
+        // TODO: add instanceof checks on cause to detect more problems
+
+        LOG.warn("{}: Unhandled retriable failure {}, promoting to hard failure", persistenceId(), failure);
+        return context().completeRequest(this, failure);
+    }
+
+    // This method is executing in the actor context, hence we can safely interact with the queue
+    private ClientActorBehavior doSendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
+        // Get or allocate queue for the request
+        final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie());
+
+        // Note this is a tri-state return and can be null
+        final Optional<FiniteDuration> result = queue.enqueueRequest(request, callback);
+        if (result == null) {
+            // Happy path: we are done here
+            return this;
+        }
+
+        if (result.isPresent()) {
+            // Less happy path: we need to schedule a timer
+            scheduleQueueTimeout(queue, result.get());
+            return this;
+        }
+
+        startResolve(queue, request.getTarget().getHistoryId().getCookie());
+        return this;
+    }
+
+    // This method is executing in the actor context, hence we can safely interact with the queue
+    private void startResolve(final SequencedQueue queue, final long cookie) {
+        // Queue does not have backend information. Initiate resolution, which may actually be piggy-backing on to a
+        // previous request to resolve.
+        final CompletionStage<? extends BackendInfo> f = resolver().getBackendInfo(cookie);
+
+        // This is the tricky part: depending on timing, the queue may have a stale request for resolution, which has
+        // been invalidated or it may already have a reference to this resolution request. Let us give it a chance to
+        // update and it will indicate if this resolution request is an update. If it is, we'll piggy-back on it and
+        // run backend information update in the actor thread. If it is not, we do not need to do anything, as we will
+        // bulk-process all requests.
+        if (queue.expectProof(f)) {
+            f.thenAccept(backend -> context().executeInActor(cb -> cb.finishResolve(queue, f, backend)));
+        }
+    }
+
+    // This method is executing in the actor context, hence we can safely interact with the queue
+    private ClientActorBehavior finishResolve(final SequencedQueue queue,
+            final CompletionStage<? extends BackendInfo> futureBackend, final BackendInfo backend) {
+
+        final Optional<FiniteDuration> maybeTimeout = queue.setBackendInfo(futureBackend, backend);
+        if (maybeTimeout.isPresent()) {
+            scheduleQueueTimeout(queue, maybeTimeout.get());
+        }
+        return this;
+    }
+
+    // This method is executing in the actor context, hence we can safely interact with the queue
+    private void scheduleQueueTimeout(final SequencedQueue queue, final FiniteDuration timeout) {
+        LOG.debug("{}: scheduling timeout in {}", persistenceId(), timeout);
+        context().executeInActor(cb -> cb.queueTimeout(queue), timeout);
+    }
+
+    // This method is executing in the actor context, hence we can safely interact with the queue
+    private ClientActorBehavior queueTimeout(final SequencedQueue queue) {
+        final boolean needBackend;
+
+        try {
+            needBackend = queue.runTimeout();
+        } catch (NoProgressException e) {
+            // Uh-oh, no progress. The queue has already killed itself, now we need to remove it
+            context().removeQueue(queue);
+            return this;
+        }
+
+        if (needBackend) {
+            startResolve(queue, queue.getCookie());
+        }
+
+        return this;
     }
 
     /**
@@ -81,4 +180,15 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<C
      * @return
      */
     protected abstract @Nonnull BackendInfoResolver<?> resolver();
+
+    /**
+     * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
+     * from any thread.
+     *
+     * @param request Request to send
+     * @param callback Callback to invoke
+     */
+    public final void sendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
+        context().executeInActor(cb -> cb.doSendRequest(request, callback));
+    }
 }
index 3aa1a5200b077f91fda048ba2704648de98c8f88..9f4fd137a4ca5dc88834935714d1f0285b658235 100644 (file)
@@ -8,12 +8,25 @@
 package org.opendaylight.controller.cluster.datastore.actors.client;
 
 import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Scheduler;
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.concepts.WritableIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * An actor context associated with this {@link AbstractClientActor}.
@@ -25,13 +38,22 @@ import org.opendaylight.yangtools.concepts.Identifiable;
  * @author Robert Varga
  */
 @Beta
+@ThreadSafe
 public class ClientActorContext extends AbstractClientActorContext implements Identifiable<ClientIdentifier> {
+    private static final Logger LOG = LoggerFactory.getLogger(ClientActorContext.class);
+
+    private final Map<Long, SequencedQueue> queues = new ConcurrentHashMap<>();
     private final ClientIdentifier identifier;
+    private final ExecutionContext executionContext;
+    private final Scheduler scheduler;
 
     // Hidden to avoid subclassing
-    ClientActorContext(final ActorRef self, final String persistenceId, final ClientIdentifier identifier) {
+    ClientActorContext(final ActorRef self, final Scheduler scheduler, final ExecutionContext executionContext,
+            final String persistenceId, final ClientIdentifier identifier) {
         super(self, persistenceId);
         this.identifier = Preconditions.checkNotNull(identifier);
+        this.scheduler = Preconditions.checkNotNull(scheduler);
+        this.executionContext = Preconditions.checkNotNull(executionContext);
     }
 
     @Override
@@ -58,4 +80,42 @@ public class ClientActorContext extends AbstractClientActorContext implements Id
     public void executeInActor(final @Nonnull InternalCommand command) {
         self().tell(Preconditions.checkNotNull(command), ActorRef.noSender());
     }
+
+    public Cancellable executeInActor(final @Nonnull InternalCommand command, final FiniteDuration delay) {
+        return scheduler.scheduleOnce(Preconditions.checkNotNull(delay), self(), Preconditions.checkNotNull(command),
+            executionContext, ActorRef.noSender());
+    }
+
+    SequencedQueue queueFor(final Long cookie) {
+        return queues.computeIfAbsent(cookie, t -> new SequencedQueue(t, ticker()));
+    }
+
+    void removeQueue(final SequencedQueue queue) {
+        queues.remove(queue.getCookie(), queue);
+    }
+
+    ClientActorBehavior completeRequest(final ClientActorBehavior current, final Response<?, ?> response) {
+        final WritableIdentifier id = response.getTarget();
+
+        // FIXME: this will need to be updated for other Request/Response types to extract cookie
+        Preconditions.checkArgument(id instanceof TransactionIdentifier);
+        final TransactionIdentifier txId = (TransactionIdentifier) id;
+
+        final SequencedQueue queue = queues.get(txId.getHistoryId().getCookie());
+        if (queue == null) {
+            LOG.info("{}: Ignoring unknown response {}", persistenceId(), response);
+            return current;
+        } else {
+            return queue.complete(current, response);
+        }
+    }
+
+    void poison(final RequestException cause) {
+        for (SequencedQueue q : queues.values()) {
+            q.poison(cause);
+        }
+
+        queues.clear();
+    }
+
 }
index 636dd1e34f576d70842f288efe755ed12ac2b86f..5dce1cd3f13214df8efcfc1ffd6da65e0ada92c2 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore.actors.client;
 
+import akka.actor.ActorSystem;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 
@@ -26,7 +27,11 @@ final class InitialClientActorContext extends AbstractClientActorContext {
         actor.saveSnapshot(snapshot);
     }
 
-    ClientActorBehavior createBehavior(final ClientActorContext context) {
+    ClientActorBehavior createBehavior(final ClientIdentifier clientId) {
+        final ActorSystem system = actor.getContext().system();
+        final ClientActorContext context = new ClientActorContext(self(), system.scheduler(), system.dispatcher(),
+            persistenceId(), clientId);
+
         return actor.initialBehavior(context);
     }
 
index c72b854dd887405a8d5e8db3cda177466f97d2bb..4e4757cdecd9197c53f0abb57b51d5041acd383d 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore.actors.client;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 /**
@@ -20,7 +21,8 @@ public interface InternalCommand {
     /**
      * Run command actions.
      *
+     * @param currentBehavior Current Behavior
      * @return Next behavior to use in the client actor
      */
-    @Nullable ClientActorBehavior execute();
+    @Nullable ClientActorBehavior execute(@Nonnull ClientActorBehavior currentBehavior);
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/NoProgressException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/NoProgressException.java
new file mode 100644 (file)
index 0000000..7a80f7a
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * Internal {@link RequestException} used as poison cause when the client fails to make progress for a long time.
+ * See {@link SequencedQueue} for details.
+ *
+ * @author Robert Varga
+ */
+final class NoProgressException extends RequestException {
+    private static final long serialVersionUID = 1L;
+
+    protected NoProgressException(final long nanos) {
+        super(String.format("No progress in %s seconds", TimeUnit.NANOSECONDS.toSeconds(nanos)));
+    }
+
+    @Override
+    public boolean isRetriable() {
+        return false;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RequestCallback.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RequestCallback.java
new file mode 100644 (file)
index 0000000..074eab8
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+
+@FunctionalInterface
+public interface RequestCallback {
+    /**
+     * Invoked when a particular request completes.
+     *
+     * @param response Response to the request
+     * @return Next client actor behavior
+     */
+    @Nullable ClientActorBehavior complete(@Nonnull Response<?, ?> response);
+}
index ac5f12fb9d5d8cc00c8a91433a298ad98ee5724c..f8977344c3ab9190397964d03f25fe4fdbaaad71 100644 (file)
@@ -33,7 +33,7 @@ final class SavingClientActorBehavior extends RecoveredClientActorBehavior<Initi
             return null;
         } else if (command instanceof SaveSnapshotSuccess) {
             context().unstash();
-            return context().createBehavior(new ClientActorContext(self(), persistenceId(), myId));
+            return context().createBehavior(myId);
         } else {
             LOG.debug("{}: stashing command {}", persistenceId(), command);
             context().stash();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueue.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueue.java
new file mode 100644 (file)
index 0000000..8cae0e1
--- /dev/null
@@ -0,0 +1,242 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+/*
+ * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts,
+ *       retries and enqueues work as expected.
+ */
+@NotThreadSafe
+final class SequencedQueue {
+    private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class);
+
+    // Keep these constant in nanoseconds, as that prevents unnecessary conversions in the fast path
+    @VisibleForTesting
+    static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
+    @VisibleForTesting
+    static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
+    private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
+        TimeUnit.NANOSECONDS);
+
+    /**
+     * We need to keep the sequence of operations towards the backend, hence we use a queue. Since targets can
+     * progress at different speeds, these may be completed out of order.
+     *
+     * TODO: The combination of target and sequence uniquely identifies a particular request, we will need to
+     *       figure out a more efficient lookup mechanism to deal with responses which do not match the queue
+     *       order.
+     */
+    private final Deque<SequencedQueueEntry> queue = new LinkedList<>();
+    private final Ticker ticker;
+    private final Long cookie;
+
+    // Updated/consulted from actor context only
+    /**
+     * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
+     * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
+     * result.
+     */
+    private CompletionStage<? extends BackendInfo> backendProof;
+    private BackendInfo backend;
+
+    /**
+     * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
+     */
+    private Object expectingTimer;
+
+    private long lastProgress;
+
+    // Updated from application thread
+    private volatile boolean notClosed = true;
+
+    SequencedQueue(final Long cookie, final Ticker ticker) {
+        this.cookie = Preconditions.checkNotNull(cookie);
+        this.ticker = Preconditions.checkNotNull(ticker);
+        lastProgress = ticker.read();
+    }
+
+    Long getCookie() {
+        return cookie;
+    }
+
+    private void checkNotClosed() {
+        Preconditions.checkState(notClosed, "Queue %s is closed", this);
+    }
+
+    /**
+     * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
+     * the following scenarios:
+     * 1) The request has been enqueued and transmitted. No further actions are necessary
+     * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
+     * 3) The request has been enqueued,but the caller needs to request resolution of backend information and that
+     *    process needs to complete before transmission occurs
+     *
+     * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
+     * the scenarios above according to the following rules:
+     * - if is null, the first case applies
+     * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend
+     *      resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)}
+     * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer
+     *
+     * @param request Request to be sent
+     * @param callback Callback to be invoked
+     * @return Optional duration with semantics described above.
+     */
+    @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
+        final long now = ticker.read();
+        final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
+
+        // We could have check first, but argument checking needs to happen first
+        checkNotClosed();
+        queue.add(e);
+        LOG.debug("Enqueued request {} to queue {}", request, this);
+
+        if (backend == null) {
+            return Optional.empty();
+        }
+
+        e.retransmit(backend, now);
+        if (expectingTimer == null) {
+            expectingTimer = now + REQUEST_TIMEOUT_NANOS;
+            return Optional.of(INITIAL_REQUEST_TIMEOUT);
+        } else {
+            return null;
+        }
+    }
+
+    ClientActorBehavior complete(final ClientActorBehavior current, final Response<?, ?> response) {
+        // Responses to different targets may arrive out of order, hence we use an iterator
+        final Iterator<SequencedQueueEntry> it = queue.iterator();
+        while (it.hasNext()) {
+            final SequencedQueueEntry e = it.next();
+            if (e.acceptsResponse(response)) {
+                lastProgress = ticker.read();
+                it.remove();
+                LOG.debug("Completing request {} with {}", e, response);
+                return e.complete(response);
+            }
+        }
+
+        LOG.debug("No request matching {} found", response);
+        return current;
+    }
+
+    Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof, final BackendInfo backend) {
+        if (!proof.equals(backendProof)) {
+            LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
+            return Optional.empty();
+        }
+
+        this.backend = Preconditions.checkNotNull(backend);
+        backendProof = null;
+        LOG.debug("Resolved backend {}",  backend);
+
+        if (queue.isEmpty()) {
+            // No pending requests, hence no need for a timer
+            return Optional.empty();
+        }
+
+        LOG.debug("Resending requests to backend {}", backend);
+        final long now = ticker.read();
+        for (SequencedQueueEntry e : queue) {
+            e.retransmit(backend, now);
+        }
+
+        if (expectingTimer != null) {
+            // We already have a timer going, no need to schedule a new one
+            return Optional.empty();
+        }
+
+        // Above loop may have cost us some time. Recalculate timeout.
+        final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
+        expectingTimer = nextTicks;
+        return Optional.of(FiniteDuration.apply(nextTicks - now, TimeUnit.NANOSECONDS));
+    }
+
+    boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
+        if (!proof.equals(backendProof)) {
+            LOG.debug("Setting resolution handle to {}", proof);
+            backendProof = proof;
+            return true;
+        } else {
+            LOG.trace("Already resolving handle {}", proof);
+            return false;
+        }
+    }
+
+    boolean hasCompleted() {
+        return !notClosed && queue.isEmpty();
+    }
+
+    /**
+     * Check queue timeouts and return true if a timeout has occured.
+     *
+     * @return True if a timeout occured
+     * @throws NoProgressException if the queue failed to make progress for an extended
+     *                             time.
+     */
+    boolean runTimeout() throws NoProgressException {
+        expectingTimer = null;
+        final long now = ticker.read();
+
+        if (!queue.isEmpty()) {
+            final long ticksSinceProgress = now - lastProgress;
+            if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
+                LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
+                    TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
+
+                final NoProgressException ex = new NoProgressException(ticksSinceProgress);
+                poison(ex);
+                throw ex;
+            }
+        }
+
+        // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
+        final SequencedQueueEntry head = queue.peek();
+        if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
+            backend = null;
+            LOG.debug("Queue {} invalidated backend info", this);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    void poison(final RequestException cause) {
+        close();
+
+        SequencedQueueEntry e = queue.poll();
+        while (e != null) {
+            e.poison(cause);
+            e = queue.poll();
+        }
+    }
+
+    // FIXME: add a caller from ClientSingleTransaction
+    void close() {
+        notClosed = false;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntry.java
new file mode 100644 (file)
index 0000000..54940cd
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.actor.ActorRef;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Single entry in {@link SequencedQueue}. Tracks the request, the associated callback and accounting information.
+ *
+ * @author Robert Varga
+ *
+ * @param <I> Target identifier type
+ */
+final class SequencedQueueEntry {
+    private static final class LastTry {
+        final Request<?, ?> request;
+        final long timeTicks;
+
+        LastTry(final Request<?, ?> request, final long when) {
+            this.request = Preconditions.checkNotNull(request);
+            this.timeTicks = when;
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(SequencedQueueEntry.class);
+
+    private final Request<?, ?> request;
+    private final RequestCallback callback;
+    private final long enqueuedTicks;
+
+    private Optional<LastTry> lastTry = Optional.empty();
+
+    SequencedQueueEntry(final Request<?, ?> request, final RequestCallback callback, final long now) {
+        this.request = Preconditions.checkNotNull(request);
+        this.callback = Preconditions.checkNotNull(callback);
+        this.enqueuedTicks = now;
+    }
+
+    long getSequence() {
+        return request.getSequence();
+    }
+
+    boolean acceptsResponse(final Response<?, ?> response) {
+        return getSequence() == response.getSequence() && request.getTarget().equals(response.getTarget());
+    }
+
+    long getCurrentTry() {
+        final Request<?, ?> req = lastTry.isPresent() ? lastTry.get().request : request;
+        return req.getRetry();
+     }
+
+    ClientActorBehavior complete(final Response<?, ?> response) {
+        LOG.debug("Completing request {} with {}", request, response);
+        return callback.complete(response);
+    }
+
+    void poison(final RequestException cause) {
+        LOG.trace("Poisoning request {}", request, cause);
+        callback.complete(request.toRequestFailure(cause));
+    }
+
+    boolean isTimedOut(final long now, final long timeoutNanos) {
+        final Request<?, ?> req;
+        final long elapsed;
+
+        if (lastTry.isPresent()) {
+            final LastTry t = lastTry.get();
+            elapsed = now - t.timeTicks;
+            req = t.request;
+        } else {
+            elapsed = now - enqueuedTicks;
+            req = request;
+        }
+
+        if (elapsed >= timeoutNanos) {
+            LOG.debug("Request {} timed out after {}ns", req, elapsed);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    void retransmit(final BackendInfo backend, final long now) {
+        final Request<?, ?> nextTry = lastTry.isPresent() ? lastTry.get().request.incrementRetry() : request;
+        final Request<?, ?> toSend = nextTry.toVersion(backend.getVersion());
+        final ActorRef actor = backend.getActor();
+
+        LOG.trace("Retransmitting request {} as {} to {}", request, toSend, actor);
+        actor.tell(toSend, ActorRef.noSender());
+        lastTry = Optional.of(new LastTry(toSend, now));
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(SequencedQueueEntry.class).add("request", request).toString();
+    }
+}
index f575815cb1722ae1a05fc7ba74b6562ba6b23f85..8078f39679f1393aeff25edd397667a9535e203b 100644 (file)
@@ -9,6 +9,8 @@ package org.opendaylight.controller.cluster.datastore.actors.client;
 
 import static org.junit.Assert.assertSame;
 import akka.actor.ActorRef;
+import akka.actor.Scheduler;
+import akka.dispatch.Dispatcher;
 import com.google.common.base.Ticker;
 import org.junit.Before;
 import org.junit.Test;
@@ -30,6 +32,12 @@ public class ClientActorContextTest {
     @Mock
     private ActorRef mockSelf;
 
+    @Mock
+    private Scheduler mockScheduler;
+
+    @Mock
+    private Dispatcher mockDispatcher;
+
     @Before
     public void setup() {
         MockitoAnnotations.initMocks(this);
@@ -37,7 +45,7 @@ public class ClientActorContextTest {
 
     @Test
     public void testMockingControl() {
-        ClientActorContext ctx = new ClientActorContext(mockSelf, PERSISTENCE_ID, CLIENT_ID);
+        ClientActorContext ctx = new ClientActorContext(mockSelf, mockScheduler, mockDispatcher, PERSISTENCE_ID, CLIENT_ID);
         assertSame(CLIENT_ID, ctx.getIdentifier());
         assertSame(PERSISTENCE_ID, ctx.persistenceId());
         assertSame(mockSelf, ctx.self());
@@ -45,7 +53,7 @@ public class ClientActorContextTest {
 
     @Test
     public void testTicker() {
-        ClientActorContext ctx = new ClientActorContext(mockSelf, PERSISTENCE_ID, CLIENT_ID);
+        ClientActorContext ctx = new ClientActorContext(mockSelf, mockScheduler, mockDispatcher, PERSISTENCE_ID, CLIENT_ID);
         assertSame(Ticker.systemTicker(), ctx.ticker());
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntryTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntryTest.java
new file mode 100644 (file)
index 0000000..68a665e
--- /dev/null
@@ -0,0 +1,215 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.TestProbe;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
+import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.common.actor.TestTicker;
+import org.opendaylight.yangtools.concepts.WritableIdentifier;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Test suite covering logic contained in {@link SequencedQueueEntry}.
+ *
+ * @author Robert Varga
+ */
+public class SequencedQueueEntryTest {
+    private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
+        private static final long serialVersionUID = 1L;
+
+        MockFailure(final WritableIdentifier target, final long sequence, final long retry, final RequestException cause) {
+            super(target, sequence, retry, cause);
+        }
+
+        @Override
+        protected AbstractRequestFailureProxy<WritableIdentifier, MockFailure> externalizableProxy(final ABIVersion version) {
+            return null;
+        }
+
+        @Override
+        protected MockFailure cloneAsVersion(final ABIVersion version) {
+            return this;
+        }
+    }
+
+    private static class MockRequest extends Request<WritableIdentifier, MockRequest> {
+        private static final long serialVersionUID = 1L;
+
+        MockRequest(final WritableIdentifier target, final long sequence, final ActorRef replyTo) {
+            super(target, sequence, 0, replyTo);
+        }
+
+
+        MockRequest(final MockRequest request, final long retry) {
+            super(request, retry);
+        }
+
+        @Override
+        public RequestFailure<WritableIdentifier, ?> toRequestFailure(final RequestException cause) {
+            return new MockFailure(getTarget(), getSequence(), getRetry(), cause);
+        }
+
+        @Override
+        protected AbstractRequestProxy<WritableIdentifier, MockRequest> externalizableProxy(final ABIVersion version) {
+            return null;
+        }
+
+        @Override
+        protected MockRequest cloneAsVersion(final ABIVersion version) {
+            return this;
+        }
+
+        @Override
+        protected MockRequest cloneAsRetry(final long retry) {
+            return new MockRequest(this, retry);
+        }
+    };
+
+    @Mock
+    private ActorRef mockReplyTo;
+    @Mock
+    private WritableIdentifier mockIdentifier;
+    @Mock
+    private RequestException mockCause;
+    @Mock
+    private RequestCallback mockCallback;
+    @Mock
+    private ClientActorBehavior mockBehavior;
+
+    private TestTicker ticker;
+    private BackendInfo mockBackendInfo;
+    private Request<WritableIdentifier, ?> mockRequest;
+    private Response<WritableIdentifier, ?> mockResponse;
+
+    private static ActorSystem actorSystem;
+    private TestProbe mockActor;
+
+    private SequencedQueueEntry entry;
+
+    @BeforeClass
+    public static void setupClass() {
+        actorSystem = ActorSystem.apply();
+    }
+
+    @AfterClass
+    public static void teardownClass() {
+        actorSystem.terminate();
+    }
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+
+        doReturn(mockBehavior).when(mockCallback).complete(any(MockFailure.class));
+
+        ticker = new TestTicker();
+        ticker.increment(ThreadLocalRandom.current().nextLong());
+
+        mockActor = TestProbe.apply(actorSystem);
+        mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current());
+        mockRequest = new MockRequest(mockIdentifier, ThreadLocalRandom.current().nextLong(), mockReplyTo);
+        mockResponse = mockRequest.toRequestFailure(mockCause);
+
+        entry = new SequencedQueueEntry(mockRequest, mockCallback, ticker.read());
+    }
+
+    @After
+    public void teardown() {
+        actorSystem.stop(mockActor.ref());
+    }
+
+    @Test
+    public void testGetSequence() {
+        assertEquals(mockRequest.getSequence(), entry.getSequence());
+    }
+
+    @Test
+    public void testGetCurrentTry() {
+        assertEquals(0, entry.getCurrentTry());
+        entry.retransmit(mockBackendInfo, ticker.read());
+        assertEquals(0, entry.getCurrentTry());
+        entry.retransmit(mockBackendInfo, ticker.read());
+        assertEquals(1, entry.getCurrentTry());
+        entry.retransmit(mockBackendInfo, ticker.read());
+        assertEquals(2, entry.getCurrentTry());
+    }
+
+    @Test
+    public void testComplete() {
+        entry.complete(mockResponse);
+        verify(mockCallback).complete(mockResponse);
+    }
+
+    @Test
+    public void testPoison() {
+        entry.poison(mockCause);
+
+        final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
+        verify(mockCallback).complete(captor.capture());
+        assertSame(mockCause, captor.getValue().getCause());
+    }
+
+    @Test
+    public void testIsTimedOut() {
+        assertTrue(entry.isTimedOut(ticker.read(), 0));
+        assertFalse(entry.isTimedOut(ticker.read(), 1));
+
+        entry.retransmit(mockBackendInfo, ticker.read());
+        assertTrue(entry.isTimedOut(ticker.read(), 0));
+        ticker.increment(10);
+        assertTrue(entry.isTimedOut(ticker.read(), 10));
+        assertFalse(entry.isTimedOut(ticker.read(), 20));
+
+        entry.retransmit(mockBackendInfo, ticker.read());
+        assertTrue(entry.isTimedOut(ticker.read(), 0));
+        ticker.increment(10);
+        assertTrue(entry.isTimedOut(ticker.read(), 10));
+        assertFalse(entry.isTimedOut(ticker.read(), 11));
+    }
+
+    @Test
+    public void testRetransmit() {
+        assertFalse(mockActor.msgAvailable());
+        entry.retransmit(mockBackendInfo, ticker.read());
+
+        assertTrue(mockActor.msgAvailable());
+        assertRequestEquals(mockRequest, mockActor.receiveOne(Duration.apply(5, TimeUnit.SECONDS)));
+    }
+
+     private static void assertRequestEquals(final Request<?, ?> expected, final Object o) {
+         final Request<?, ?> actual = (Request<?, ?>) o;
+         assertEquals(expected.getRetry(), actual.getRetry());
+         assertEquals(expected.getSequence(), actual.getSequence());
+         assertEquals(expected.getTarget(), actual.getTarget());
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueTest.java
new file mode 100644 (file)
index 0000000..f1caeb5
--- /dev/null
@@ -0,0 +1,448 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.TestProbe;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
+import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.common.actor.TestTicker;
+import org.opendaylight.yangtools.concepts.WritableIdentifier;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Test suite covering logic contained in {@link SequencedQueue}. It assumes {@link SequencedQueueEntryTest} passes.
+ *
+ * @author Robert Varga
+ */
+public class SequencedQueueTest {
+    private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
+        private static final long serialVersionUID = 1L;
+
+        MockFailure(final WritableIdentifier target, final long sequence, final long retry, final RequestException cause) {
+            super(target, sequence, retry, cause);
+        }
+
+        @Override
+        protected AbstractRequestFailureProxy<WritableIdentifier, MockFailure> externalizableProxy(final ABIVersion version) {
+            return null;
+        }
+
+        @Override
+        protected MockFailure cloneAsVersion(final ABIVersion version) {
+            return this;
+        }
+    }
+
+    private static class MockRequest extends Request<WritableIdentifier, MockRequest> {
+        private static final long serialVersionUID = 1L;
+
+        MockRequest(final WritableIdentifier target, final long sequence, final ActorRef replyTo) {
+            super(target, sequence, 0, replyTo);
+        }
+
+
+        MockRequest(final MockRequest request, final long retry) {
+            super(request, retry);
+        }
+
+        @Override
+        public RequestFailure<WritableIdentifier, ?> toRequestFailure(final RequestException cause) {
+            return new MockFailure(getTarget(), getSequence(), getRetry(), cause);
+        }
+
+        @Override
+        protected AbstractRequestProxy<WritableIdentifier, MockRequest> externalizableProxy(final ABIVersion version) {
+            return null;
+        }
+
+        @Override
+        protected MockRequest cloneAsVersion(final ABIVersion version) {
+            return this;
+        }
+
+        @Override
+        protected MockRequest cloneAsRetry(final long retry) {
+            return new MockRequest(this, retry);
+        }
+    };
+
+    @Mock
+    private ActorRef mockReplyTo;
+    @Mock
+    private WritableIdentifier mockIdentifier;
+    @Mock
+    private RequestException mockCause;
+    @Mock
+    private RequestCallback mockCallback;
+    @Mock
+    private ClientActorBehavior mockBehavior;
+
+    private TestTicker ticker;
+    private BackendInfo mockBackendInfo;
+    private MockRequest mockRequest;
+    private MockRequest mockRequest2;
+    private Response<WritableIdentifier, ?> mockResponse;
+    private Long mockCookie;
+
+    private static ActorSystem actorSystem;
+    private TestProbe mockActor;
+
+    private SequencedQueue queue;
+
+    @BeforeClass
+    public static void setupClass() {
+        actorSystem = ActorSystem.apply();
+    }
+
+    @AfterClass
+    public static void teardownClass() {
+        actorSystem.terminate();
+    }
+
+    @Before
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+
+        doReturn(mockBehavior).when(mockCallback).complete(any(MockFailure.class));
+
+        ticker = new TestTicker();
+        ticker.increment(ThreadLocalRandom.current().nextLong());
+
+        mockActor = TestProbe.apply(actorSystem);
+        mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current());
+        mockRequest = new MockRequest(mockIdentifier, ThreadLocalRandom.current().nextLong(), mockReplyTo);
+        mockRequest2 = new MockRequest(mockIdentifier, mockRequest.getSequence() + 1, mockReplyTo);
+        mockResponse = mockRequest.toRequestFailure(mockCause);
+        mockCookie = ThreadLocalRandom.current().nextLong();
+
+        queue = new SequencedQueue(mockCookie, ticker);
+    }
+
+    @After
+    public void teardown() {
+        actorSystem.stop(mockActor.ref());
+    }
+
+    @Test
+    public void testGetCookie() {
+        assertSame(mockCookie, queue.getCookie());
+    }
+
+    @Test
+    public void testEmptyClose() {
+        assertFalse(queue.hasCompleted());
+        queue.close();
+        assertTrue(queue.hasCompleted());
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testClosedEnqueueRequest() {
+        queue.close();
+
+        // Kaboom
+        queue.enqueueRequest(mockRequest, mockCallback);
+    }
+
+    @Test
+    public void testCloseIdempotent() {
+        queue.close();
+        queue.close();
+    }
+
+    @Test
+    public void testPoison() {
+        queue.enqueueRequest(mockRequest, mockCallback);
+        queue.poison(mockCause);
+
+        final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
+        verify(mockCallback).complete(captor.capture());
+        assertSame(mockCause, captor.getValue().getCause());
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testPoisonPerformsClose() {
+        // Implies close()
+        queue.poison(mockCause);
+
+        // Kaboom
+        queue.enqueueRequest(mockRequest, mockCallback);
+    }
+
+    @Test
+    public void testPoisonIdempotent() {
+        queue.poison(mockCause);
+        queue.poison(mockCause);
+    }
+
+    @Test
+    public void testEnqueueRequestNeedsBackend() {
+        final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
+
+        assertNotNull(ret);
+        assertFalse(ret.isPresent());
+    }
+
+    @Test
+    public void testExpectProof() {
+        final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
+        assertTrue(queue.expectProof(proof));
+        assertFalse(queue.expectProof(proof));
+    }
+
+    @Test(expected=NullPointerException.class)
+    public void testSetBackendNull() {
+        final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
+        assertTrue(queue.expectProof(proof));
+        queue.setBackendInfo(proof, null);
+    }
+
+    @Test
+    public void testSetBackendWithNoResolution() {
+        queue.enqueueRequest(mockRequest, mockCallback);
+
+        final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
+        final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
+        assertNotNull(ret);
+        assertFalse(ret.isPresent());
+    }
+
+    @Test
+    public void testSetBackendWithWrongProof() {
+        queue.enqueueRequest(mockRequest, mockCallback);
+
+        final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
+        assertTrue(queue.expectProof(proof));
+
+        final Optional<FiniteDuration> ret = queue.setBackendInfo(new CompletableFuture<>(), mockBackendInfo);
+        assertNotNull(ret);
+        assertFalse(ret.isPresent());
+    }
+
+    @Test
+    public void testSetBackendWithNoRequests() {
+        // this utility method covers the entire test
+        setupBackend();
+    }
+
+    @Test
+    public void testSetbackedWithRequestsNoTimer() {
+        queue.enqueueRequest(mockRequest, mockCallback);
+
+        final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
+        assertTrue(queue.expectProof(proof));
+        assertFalse(mockActor.msgAvailable());
+
+        final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
+        assertNotNull(ret);
+        assertTrue(ret.isPresent());
+
+        assertTransmit(mockRequest);
+    }
+
+    @Test
+    public void testEnqueueRequestNeedsTimer() {
+        setupBackend();
+
+        final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
+        assertNotNull(ret);
+        assertTrue(ret.isPresent());
+        assertTransmit(mockRequest);
+    }
+
+    @Test
+    public void testEnqueueRequestWithoutTimer() {
+        setupBackend();
+
+        // First request
+        Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
+        assertNotNull(ret);
+        assertTrue(ret.isPresent());
+        assertTransmit(mockRequest);
+
+        // Second request, no timer fired
+        ret = queue.enqueueRequest(mockRequest2, mockCallback);
+        assertNull(ret);
+        assertTransmit(mockRequest2);
+    }
+
+    @Test
+    public void testRunTimeoutEmpty() throws NoProgressException {
+        final boolean ret = queue.runTimeout();
+        assertFalse(ret);
+    }
+
+    @Test
+    public void testRunTimeoutWithoutShift() throws NoProgressException {
+        queue.enqueueRequest(mockRequest, mockCallback);
+        final boolean ret = queue.runTimeout();
+        assertFalse(ret);
+    }
+
+    @Test
+    public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
+        queue.enqueueRequest(mockRequest, mockCallback);
+
+        ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS - 1);
+
+        final boolean ret = queue.runTimeout();
+        assertFalse(ret);
+    }
+
+    @Test
+    public void testRunTimeoutWithTimeoutExact() throws NoProgressException {
+        queue.enqueueRequest(mockRequest, mockCallback);
+
+        ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS);
+
+        final boolean ret = queue.runTimeout();
+        assertTrue(ret);
+    }
+
+    @Test
+    public void testRunTimeoutWithTimeoutMore() throws NoProgressException {
+        queue.enqueueRequest(mockRequest, mockCallback);
+
+        ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS + 1);
+
+        final boolean ret = queue.runTimeout();
+        assertTrue(ret);
+    }
+
+    @Test(expected=NoProgressException.class)
+    public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
+        queue.enqueueRequest(mockRequest, mockCallback);
+
+        ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS);
+
+        // Kaboom
+        queue.runTimeout();
+    }
+
+    @Test(expected=NoProgressException.class)
+    public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
+        queue.enqueueRequest(mockRequest, mockCallback);
+
+        ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1);
+
+        // Kaboom
+        queue.runTimeout();
+    }
+
+    @Test
+    public void testRunTimeoutEmptyWithoutProgressExact() throws NoProgressException {
+        ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS);
+
+        // No problem
+        final boolean ret = queue.runTimeout();
+        assertFalse(ret);
+    }
+
+    @Test
+    public void testRunTimeoutEmptyWithoutProgressMore() throws NoProgressException {
+        ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1);
+
+        // No problem
+        final boolean ret = queue.runTimeout();
+        assertFalse(ret);
+    }
+
+    @Test
+    public void testCompleteEmpty() {
+        final ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse);
+        assertSame(mockBehavior, ret);
+        verifyNoMoreInteractions(mockCallback);
+    }
+
+    @Test
+    public void testCompleteSingle() {
+        queue.enqueueRequest(mockRequest, mockCallback);
+
+        ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse);
+        verify(mockCallback).complete(mockResponse);
+        assertSame(mockBehavior, ret);
+
+        ret = queue.complete(mockBehavior, mockResponse);
+        assertSame(mockBehavior, ret);
+        verifyNoMoreInteractions(mockCallback);
+    }
+
+    @Test
+    public void testCompleteNull() {
+        queue.enqueueRequest(mockRequest, mockCallback);
+
+        doReturn(null).when(mockCallback).complete(mockResponse);
+
+        ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse);
+        verify(mockCallback).complete(mockResponse);
+        assertNull(ret);
+    }
+
+    @Test
+    public void testProgressRecord() throws NoProgressException {
+        setupBackend();
+
+        queue.enqueueRequest(mockRequest, mockCallback);
+
+        ticker.increment(10);
+        queue.enqueueRequest(mockRequest2, mockCallback);
+        queue.complete(mockBehavior, mockResponse);
+
+        ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS - 11);
+        assertTrue(queue.runTimeout());
+    }
+
+    private void setupBackend() {
+        final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
+        assertTrue(queue.expectProof(proof));
+        final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
+        assertNotNull(ret);
+        assertFalse(ret.isPresent());
+        assertFalse(mockActor.msgAvailable());
+    }
+
+    private void assertTransmit(final Request<?, ?> expected) {
+        assertTrue(mockActor.msgAvailable());
+        assertRequestEquals(expected, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS)));
+    }
+
+    private static void assertRequestEquals(final Request<?, ?> expected, final Object o) {
+        final Request<?, ?> actual = (Request<?, ?>) o;
+        assertEquals(expected.getRetry(), actual.getRetry());
+        assertEquals(expected.getSequence(), actual.getSequence());
+        assertEquals(expected.getTarget(), actual.getTarget());
+    }
+}