From: Robert Varga Date: Sat, 28 May 2016 23:27:24 +0000 (+0200) Subject: BUG-5280: implement message queueing X-Git-Tag: release/boron~80 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=50a2f5eb1c94650bc1be1e49d3a5382a1a74a9b3;hp=c6e3a444f7e8702aade9839ca950bc9790be8831 BUG-5280: implement message queueing This patch implements the basic queueing and timeout retry mechanism in ClientActorBehavior. This implementation is not very efficient, as each send goes through the actor's mailbox, but it gets the job done and is correct. It will be optimized in a follow-up patch, which will refactor internal workings so that SequencedQueue is fully thread-safe and correct with regard to request enqueue, timeouts and retries. Change-Id: I207a30877328dbdc08d42f76a0db55b5ae162de5 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java index ff5f8820dd..364e462e57 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java @@ -66,16 +66,17 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple // FIXME: Add state flushing here once we have state } - private ClientActorBehavior createLocalHistory(final CompletableFuture future) { + private ClientActorBehavior createLocalHistory(final ClientActorBehavior currentBehavior, + final CompletableFuture future) { final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++); LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future); // FIXME: initiate backend instantiation future.completeExceptionally(new UnsupportedOperationException("Not implemented yet")); - return this; + return currentBehavior; } - private ClientActorBehavior shutdown() { + private ClientActorBehavior shutdown(final ClientActorBehavior currentBehavior) { // FIXME: Add shutdown procedures here return null; } @@ -100,7 +101,7 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple @Override public CompletionStage createLocalHistory() { final CompletableFuture future = new CompletableFuture<>(); - context().executeInActor(() -> createLocalHistory(future)); + context().executeInActor(currentBehavior -> createLocalHistory(currentBehavior, future)); return future; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java index 237b570320..c3df604ef2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java @@ -8,15 +8,20 @@ package org.opendaylight.controller.cluster.datastore.actors.client; import com.google.common.annotations.Beta; +import java.util.Optional; +import java.util.concurrent.CompletionStage; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; +import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException; import org.opendaylight.yangtools.concepts.Identifiable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; /** * A behavior, which handles messages sent to a {@link AbstractClientActor}. @@ -34,27 +39,121 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior failure = (RequestFailure) command; - final RequestException cause = failure.getCause(); - if (cause instanceof RetiredGenerationException) { - LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause); - haltClient(cause); - return null; - } + return ((InternalCommand) command).execute(this); + } + if (command instanceof RequestSuccess) { + return onRequestSuccess((RequestSuccess) command); + } + if (command instanceof RequestFailure) { + return onRequestFailure((RequestFailure) command); } - // TODO: any client-common logic (such as validation and common dispatch) needs to go here return onCommand(command); } - @Override - public final @Nonnull ClientIdentifier getIdentifier() { - return context().getIdentifier(); + private ClientActorBehavior onRequestSuccess(final RequestSuccess success) { + return context().completeRequest(this, success); + } + + private ClientActorBehavior onRequestFailure(final RequestFailure failure) { + final RequestException cause = failure.getCause(); + if (cause instanceof RetiredGenerationException) { + LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause); + haltClient(cause); + context().poison(cause); + return null; + } + + if (failure.isHardFailure()) { + return context().completeRequest(this, failure); + } + + // TODO: add instanceof checks on cause to detect more problems + + LOG.warn("{}: Unhandled retriable failure {}, promoting to hard failure", persistenceId(), failure); + return context().completeRequest(this, failure); + } + + // This method is executing in the actor context, hence we can safely interact with the queue + private ClientActorBehavior doSendRequest(final TransactionRequest request, final RequestCallback callback) { + // Get or allocate queue for the request + final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie()); + + // Note this is a tri-state return and can be null + final Optional result = queue.enqueueRequest(request, callback); + if (result == null) { + // Happy path: we are done here + return this; + } + + if (result.isPresent()) { + // Less happy path: we need to schedule a timer + scheduleQueueTimeout(queue, result.get()); + return this; + } + + startResolve(queue, request.getTarget().getHistoryId().getCookie()); + return this; + } + + // This method is executing in the actor context, hence we can safely interact with the queue + private void startResolve(final SequencedQueue queue, final long cookie) { + // Queue does not have backend information. Initiate resolution, which may actually be piggy-backing on to a + // previous request to resolve. + final CompletionStage f = resolver().getBackendInfo(cookie); + + // This is the tricky part: depending on timing, the queue may have a stale request for resolution, which has + // been invalidated or it may already have a reference to this resolution request. Let us give it a chance to + // update and it will indicate if this resolution request is an update. If it is, we'll piggy-back on it and + // run backend information update in the actor thread. If it is not, we do not need to do anything, as we will + // bulk-process all requests. + if (queue.expectProof(f)) { + f.thenAccept(backend -> context().executeInActor(cb -> cb.finishResolve(queue, f, backend))); + } + } + + // This method is executing in the actor context, hence we can safely interact with the queue + private ClientActorBehavior finishResolve(final SequencedQueue queue, + final CompletionStage futureBackend, final BackendInfo backend) { + + final Optional maybeTimeout = queue.setBackendInfo(futureBackend, backend); + if (maybeTimeout.isPresent()) { + scheduleQueueTimeout(queue, maybeTimeout.get()); + } + return this; + } + + // This method is executing in the actor context, hence we can safely interact with the queue + private void scheduleQueueTimeout(final SequencedQueue queue, final FiniteDuration timeout) { + LOG.debug("{}: scheduling timeout in {}", persistenceId(), timeout); + context().executeInActor(cb -> cb.queueTimeout(queue), timeout); + } + + // This method is executing in the actor context, hence we can safely interact with the queue + private ClientActorBehavior queueTimeout(final SequencedQueue queue) { + final boolean needBackend; + + try { + needBackend = queue.runTimeout(); + } catch (NoProgressException e) { + // Uh-oh, no progress. The queue has already killed itself, now we need to remove it + context().removeQueue(queue); + return this; + } + + if (needBackend) { + startResolve(queue, queue.getCookie()); + } + + return this; } /** @@ -81,4 +180,15 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior resolver(); + + /** + * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke + * from any thread. + * + * @param request Request to send + * @param callback Callback to invoke + */ + public final void sendRequest(final TransactionRequest request, final RequestCallback callback) { + context().executeInActor(cb -> cb.doSendRequest(request, callback)); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java index 3aa1a5200b..9f4fd137a4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java @@ -8,12 +8,25 @@ package org.opendaylight.controller.cluster.datastore.actors.client; import akka.actor.ActorRef; +import akka.actor.Cancellable; +import akka.actor.Scheduler; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; import com.google.common.base.Ticker; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nonnull; +import javax.annotation.concurrent.ThreadSafe; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.concepts.WritableIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.ExecutionContext; +import scala.concurrent.duration.FiniteDuration; /** * An actor context associated with this {@link AbstractClientActor}. @@ -25,13 +38,22 @@ import org.opendaylight.yangtools.concepts.Identifiable; * @author Robert Varga */ @Beta +@ThreadSafe public class ClientActorContext extends AbstractClientActorContext implements Identifiable { + private static final Logger LOG = LoggerFactory.getLogger(ClientActorContext.class); + + private final Map queues = new ConcurrentHashMap<>(); private final ClientIdentifier identifier; + private final ExecutionContext executionContext; + private final Scheduler scheduler; // Hidden to avoid subclassing - ClientActorContext(final ActorRef self, final String persistenceId, final ClientIdentifier identifier) { + ClientActorContext(final ActorRef self, final Scheduler scheduler, final ExecutionContext executionContext, + final String persistenceId, final ClientIdentifier identifier) { super(self, persistenceId); this.identifier = Preconditions.checkNotNull(identifier); + this.scheduler = Preconditions.checkNotNull(scheduler); + this.executionContext = Preconditions.checkNotNull(executionContext); } @Override @@ -58,4 +80,42 @@ public class ClientActorContext extends AbstractClientActorContext implements Id public void executeInActor(final @Nonnull InternalCommand command) { self().tell(Preconditions.checkNotNull(command), ActorRef.noSender()); } + + public Cancellable executeInActor(final @Nonnull InternalCommand command, final FiniteDuration delay) { + return scheduler.scheduleOnce(Preconditions.checkNotNull(delay), self(), Preconditions.checkNotNull(command), + executionContext, ActorRef.noSender()); + } + + SequencedQueue queueFor(final Long cookie) { + return queues.computeIfAbsent(cookie, t -> new SequencedQueue(t, ticker())); + } + + void removeQueue(final SequencedQueue queue) { + queues.remove(queue.getCookie(), queue); + } + + ClientActorBehavior completeRequest(final ClientActorBehavior current, final Response response) { + final WritableIdentifier id = response.getTarget(); + + // FIXME: this will need to be updated for other Request/Response types to extract cookie + Preconditions.checkArgument(id instanceof TransactionIdentifier); + final TransactionIdentifier txId = (TransactionIdentifier) id; + + final SequencedQueue queue = queues.get(txId.getHistoryId().getCookie()); + if (queue == null) { + LOG.info("{}: Ignoring unknown response {}", persistenceId(), response); + return current; + } else { + return queue.complete(current, response); + } + } + + void poison(final RequestException cause) { + for (SequencedQueue q : queues.values()) { + q.poison(cause); + } + + queues.clear(); + } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InitialClientActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InitialClientActorContext.java index 636dd1e34f..5dce1cd3f1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InitialClientActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InitialClientActorContext.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore.actors.client; +import akka.actor.ActorSystem; import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; @@ -26,7 +27,11 @@ final class InitialClientActorContext extends AbstractClientActorContext { actor.saveSnapshot(snapshot); } - ClientActorBehavior createBehavior(final ClientActorContext context) { + ClientActorBehavior createBehavior(final ClientIdentifier clientId) { + final ActorSystem system = actor.getContext().system(); + final ClientActorContext context = new ClientActorContext(self(), system.scheduler(), system.dispatcher(), + persistenceId(), clientId); + return actor.initialBehavior(context); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InternalCommand.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InternalCommand.java index c72b854dd8..4e4757cdec 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InternalCommand.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/InternalCommand.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore.actors.client; +import javax.annotation.Nonnull; import javax.annotation.Nullable; /** @@ -20,7 +21,8 @@ public interface InternalCommand { /** * Run command actions. * + * @param currentBehavior Current Behavior * @return Next behavior to use in the client actor */ - @Nullable ClientActorBehavior execute(); + @Nullable ClientActorBehavior execute(@Nonnull ClientActorBehavior currentBehavior); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/NoProgressException.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/NoProgressException.java new file mode 100644 index 0000000000..7a80f7aac7 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/NoProgressException.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.actors.client; + +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.access.concepts.RequestException; + +/** + * Internal {@link RequestException} used as poison cause when the client fails to make progress for a long time. + * See {@link SequencedQueue} for details. + * + * @author Robert Varga + */ +final class NoProgressException extends RequestException { + private static final long serialVersionUID = 1L; + + protected NoProgressException(final long nanos) { + super(String.format("No progress in %s seconds", TimeUnit.NANOSECONDS.toSeconds(nanos))); + } + + @Override + public boolean isRetriable() { + return false; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RequestCallback.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RequestCallback.java new file mode 100644 index 0000000000..074eab8f1f --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/RequestCallback.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.actors.client; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.concepts.Response; + +@FunctionalInterface +public interface RequestCallback { + /** + * Invoked when a particular request completes. + * + * @param response Response to the request + * @return Next client actor behavior + */ + @Nullable ClientActorBehavior complete(@Nonnull Response response); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SavingClientActorBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SavingClientActorBehavior.java index ac5f12fb9d..f8977344c3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SavingClientActorBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SavingClientActorBehavior.java @@ -33,7 +33,7 @@ final class SavingClientActorBehavior extends RecoveredClientActorBehavior queue = new LinkedList<>(); + private final Ticker ticker; + private final Long cookie; + + // Updated/consulted from actor context only + /** + * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when + * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested + * result. + */ + private CompletionStage backendProof; + private BackendInfo backend; + + /** + * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue. + */ + private Object expectingTimer; + + private long lastProgress; + + // Updated from application thread + private volatile boolean notClosed = true; + + SequencedQueue(final Long cookie, final Ticker ticker) { + this.cookie = Preconditions.checkNotNull(cookie); + this.ticker = Preconditions.checkNotNull(ticker); + lastProgress = ticker.read(); + } + + Long getCookie() { + return cookie; + } + + private void checkNotClosed() { + Preconditions.checkState(notClosed, "Queue %s is closed", this); + } + + /** + * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller + * the following scenarios: + * 1) The request has been enqueued and transmitted. No further actions are necessary + * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer + * 3) The request has been enqueued,but the caller needs to request resolution of backend information and that + * process needs to complete before transmission occurs + * + * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode + * the scenarios above according to the following rules: + * - if is null, the first case applies + * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend + * resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)} + * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer + * + * @param request Request to be sent + * @param callback Callback to be invoked + * @return Optional duration with semantics described above. + */ + @Nullable Optional enqueueRequest(final Request request, final RequestCallback callback) { + final long now = ticker.read(); + final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now); + + // We could have check first, but argument checking needs to happen first + checkNotClosed(); + queue.add(e); + LOG.debug("Enqueued request {} to queue {}", request, this); + + if (backend == null) { + return Optional.empty(); + } + + e.retransmit(backend, now); + if (expectingTimer == null) { + expectingTimer = now + REQUEST_TIMEOUT_NANOS; + return Optional.of(INITIAL_REQUEST_TIMEOUT); + } else { + return null; + } + } + + ClientActorBehavior complete(final ClientActorBehavior current, final Response response) { + // Responses to different targets may arrive out of order, hence we use an iterator + final Iterator it = queue.iterator(); + while (it.hasNext()) { + final SequencedQueueEntry e = it.next(); + if (e.acceptsResponse(response)) { + lastProgress = ticker.read(); + it.remove(); + LOG.debug("Completing request {} with {}", e, response); + return e.complete(response); + } + } + + LOG.debug("No request matching {} found", response); + return current; + } + + Optional setBackendInfo(final CompletionStage proof, final BackendInfo backend) { + if (!proof.equals(backendProof)) { + LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof); + return Optional.empty(); + } + + this.backend = Preconditions.checkNotNull(backend); + backendProof = null; + LOG.debug("Resolved backend {}", backend); + + if (queue.isEmpty()) { + // No pending requests, hence no need for a timer + return Optional.empty(); + } + + LOG.debug("Resending requests to backend {}", backend); + final long now = ticker.read(); + for (SequencedQueueEntry e : queue) { + e.retransmit(backend, now); + } + + if (expectingTimer != null) { + // We already have a timer going, no need to schedule a new one + return Optional.empty(); + } + + // Above loop may have cost us some time. Recalculate timeout. + final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS; + expectingTimer = nextTicks; + return Optional.of(FiniteDuration.apply(nextTicks - now, TimeUnit.NANOSECONDS)); + } + + boolean expectProof(final CompletionStage proof) { + if (!proof.equals(backendProof)) { + LOG.debug("Setting resolution handle to {}", proof); + backendProof = proof; + return true; + } else { + LOG.trace("Already resolving handle {}", proof); + return false; + } + } + + boolean hasCompleted() { + return !notClosed && queue.isEmpty(); + } + + /** + * Check queue timeouts and return true if a timeout has occured. + * + * @return True if a timeout occured + * @throws NoProgressException if the queue failed to make progress for an extended + * time. + */ + boolean runTimeout() throws NoProgressException { + expectingTimer = null; + final long now = ticker.read(); + + if (!queue.isEmpty()) { + final long ticksSinceProgress = now - lastProgress; + if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) { + LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this, + TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress)); + + final NoProgressException ex = new NoProgressException(ticksSinceProgress); + poison(ex); + throw ex; + } + } + + // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue + final SequencedQueueEntry head = queue.peek(); + if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) { + backend = null; + LOG.debug("Queue {} invalidated backend info", this); + return true; + } else { + return false; + } + } + + void poison(final RequestException cause) { + close(); + + SequencedQueueEntry e = queue.poll(); + while (e != null) { + e.poison(cause); + e = queue.poll(); + } + } + + // FIXME: add a caller from ClientSingleTransaction + void close() { + notClosed = false; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntry.java new file mode 100644 index 0000000000..54940cd8ff --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntry.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.actors.client; + +import akka.actor.ActorRef; +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import java.util.Optional; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Single entry in {@link SequencedQueue}. Tracks the request, the associated callback and accounting information. + * + * @author Robert Varga + * + * @param Target identifier type + */ +final class SequencedQueueEntry { + private static final class LastTry { + final Request request; + final long timeTicks; + + LastTry(final Request request, final long when) { + this.request = Preconditions.checkNotNull(request); + this.timeTicks = when; + } + } + + private static final Logger LOG = LoggerFactory.getLogger(SequencedQueueEntry.class); + + private final Request request; + private final RequestCallback callback; + private final long enqueuedTicks; + + private Optional lastTry = Optional.empty(); + + SequencedQueueEntry(final Request request, final RequestCallback callback, final long now) { + this.request = Preconditions.checkNotNull(request); + this.callback = Preconditions.checkNotNull(callback); + this.enqueuedTicks = now; + } + + long getSequence() { + return request.getSequence(); + } + + boolean acceptsResponse(final Response response) { + return getSequence() == response.getSequence() && request.getTarget().equals(response.getTarget()); + } + + long getCurrentTry() { + final Request req = lastTry.isPresent() ? lastTry.get().request : request; + return req.getRetry(); + } + + ClientActorBehavior complete(final Response response) { + LOG.debug("Completing request {} with {}", request, response); + return callback.complete(response); + } + + void poison(final RequestException cause) { + LOG.trace("Poisoning request {}", request, cause); + callback.complete(request.toRequestFailure(cause)); + } + + boolean isTimedOut(final long now, final long timeoutNanos) { + final Request req; + final long elapsed; + + if (lastTry.isPresent()) { + final LastTry t = lastTry.get(); + elapsed = now - t.timeTicks; + req = t.request; + } else { + elapsed = now - enqueuedTicks; + req = request; + } + + if (elapsed >= timeoutNanos) { + LOG.debug("Request {} timed out after {}ns", req, elapsed); + return true; + } else { + return false; + } + } + + void retransmit(final BackendInfo backend, final long now) { + final Request nextTry = lastTry.isPresent() ? lastTry.get().request.incrementRetry() : request; + final Request toSend = nextTry.toVersion(backend.getVersion()); + final ActorRef actor = backend.getActor(); + + LOG.trace("Retransmitting request {} as {} to {}", request, toSend, actor); + actor.tell(toSend, ActorRef.noSender()); + lastTry = Optional.of(new LastTry(toSend, now)); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(SequencedQueueEntry.class).add("request", request).toString(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContextTest.java index f575815cb1..8078f39679 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContextTest.java @@ -9,6 +9,8 @@ package org.opendaylight.controller.cluster.datastore.actors.client; import static org.junit.Assert.assertSame; import akka.actor.ActorRef; +import akka.actor.Scheduler; +import akka.dispatch.Dispatcher; import com.google.common.base.Ticker; import org.junit.Before; import org.junit.Test; @@ -30,6 +32,12 @@ public class ClientActorContextTest { @Mock private ActorRef mockSelf; + @Mock + private Scheduler mockScheduler; + + @Mock + private Dispatcher mockDispatcher; + @Before public void setup() { MockitoAnnotations.initMocks(this); @@ -37,7 +45,7 @@ public class ClientActorContextTest { @Test public void testMockingControl() { - ClientActorContext ctx = new ClientActorContext(mockSelf, PERSISTENCE_ID, CLIENT_ID); + ClientActorContext ctx = new ClientActorContext(mockSelf, mockScheduler, mockDispatcher, PERSISTENCE_ID, CLIENT_ID); assertSame(CLIENT_ID, ctx.getIdentifier()); assertSame(PERSISTENCE_ID, ctx.persistenceId()); assertSame(mockSelf, ctx.self()); @@ -45,7 +53,7 @@ public class ClientActorContextTest { @Test public void testTicker() { - ClientActorContext ctx = new ClientActorContext(mockSelf, PERSISTENCE_ID, CLIENT_ID); + ClientActorContext ctx = new ClientActorContext(mockSelf, mockScheduler, mockDispatcher, PERSISTENCE_ID, CLIENT_ID); assertSame(Ticker.systemTicker(), ctx.ticker()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntryTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntryTest.java new file mode 100644 index 0000000000..68a665e445 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueEntryTest.java @@ -0,0 +1,215 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.actors.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.verify; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.TestProbe; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy; +import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.RequestFailure; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.common.actor.TestTicker; +import org.opendaylight.yangtools.concepts.WritableIdentifier; +import scala.concurrent.duration.Duration; + +/** + * Test suite covering logic contained in {@link SequencedQueueEntry}. + * + * @author Robert Varga + */ +public class SequencedQueueEntryTest { + private static class MockFailure extends RequestFailure { + private static final long serialVersionUID = 1L; + + MockFailure(final WritableIdentifier target, final long sequence, final long retry, final RequestException cause) { + super(target, sequence, retry, cause); + } + + @Override + protected AbstractRequestFailureProxy externalizableProxy(final ABIVersion version) { + return null; + } + + @Override + protected MockFailure cloneAsVersion(final ABIVersion version) { + return this; + } + } + + private static class MockRequest extends Request { + private static final long serialVersionUID = 1L; + + MockRequest(final WritableIdentifier target, final long sequence, final ActorRef replyTo) { + super(target, sequence, 0, replyTo); + } + + + MockRequest(final MockRequest request, final long retry) { + super(request, retry); + } + + @Override + public RequestFailure toRequestFailure(final RequestException cause) { + return new MockFailure(getTarget(), getSequence(), getRetry(), cause); + } + + @Override + protected AbstractRequestProxy externalizableProxy(final ABIVersion version) { + return null; + } + + @Override + protected MockRequest cloneAsVersion(final ABIVersion version) { + return this; + } + + @Override + protected MockRequest cloneAsRetry(final long retry) { + return new MockRequest(this, retry); + } + }; + + @Mock + private ActorRef mockReplyTo; + @Mock + private WritableIdentifier mockIdentifier; + @Mock + private RequestException mockCause; + @Mock + private RequestCallback mockCallback; + @Mock + private ClientActorBehavior mockBehavior; + + private TestTicker ticker; + private BackendInfo mockBackendInfo; + private Request mockRequest; + private Response mockResponse; + + private static ActorSystem actorSystem; + private TestProbe mockActor; + + private SequencedQueueEntry entry; + + @BeforeClass + public static void setupClass() { + actorSystem = ActorSystem.apply(); + } + + @AfterClass + public static void teardownClass() { + actorSystem.terminate(); + } + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + + doReturn(mockBehavior).when(mockCallback).complete(any(MockFailure.class)); + + ticker = new TestTicker(); + ticker.increment(ThreadLocalRandom.current().nextLong()); + + mockActor = TestProbe.apply(actorSystem); + mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current()); + mockRequest = new MockRequest(mockIdentifier, ThreadLocalRandom.current().nextLong(), mockReplyTo); + mockResponse = mockRequest.toRequestFailure(mockCause); + + entry = new SequencedQueueEntry(mockRequest, mockCallback, ticker.read()); + } + + @After + public void teardown() { + actorSystem.stop(mockActor.ref()); + } + + @Test + public void testGetSequence() { + assertEquals(mockRequest.getSequence(), entry.getSequence()); + } + + @Test + public void testGetCurrentTry() { + assertEquals(0, entry.getCurrentTry()); + entry.retransmit(mockBackendInfo, ticker.read()); + assertEquals(0, entry.getCurrentTry()); + entry.retransmit(mockBackendInfo, ticker.read()); + assertEquals(1, entry.getCurrentTry()); + entry.retransmit(mockBackendInfo, ticker.read()); + assertEquals(2, entry.getCurrentTry()); + } + + @Test + public void testComplete() { + entry.complete(mockResponse); + verify(mockCallback).complete(mockResponse); + } + + @Test + public void testPoison() { + entry.poison(mockCause); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(MockFailure.class); + verify(mockCallback).complete(captor.capture()); + assertSame(mockCause, captor.getValue().getCause()); + } + + @Test + public void testIsTimedOut() { + assertTrue(entry.isTimedOut(ticker.read(), 0)); + assertFalse(entry.isTimedOut(ticker.read(), 1)); + + entry.retransmit(mockBackendInfo, ticker.read()); + assertTrue(entry.isTimedOut(ticker.read(), 0)); + ticker.increment(10); + assertTrue(entry.isTimedOut(ticker.read(), 10)); + assertFalse(entry.isTimedOut(ticker.read(), 20)); + + entry.retransmit(mockBackendInfo, ticker.read()); + assertTrue(entry.isTimedOut(ticker.read(), 0)); + ticker.increment(10); + assertTrue(entry.isTimedOut(ticker.read(), 10)); + assertFalse(entry.isTimedOut(ticker.read(), 11)); + } + + @Test + public void testRetransmit() { + assertFalse(mockActor.msgAvailable()); + entry.retransmit(mockBackendInfo, ticker.read()); + + assertTrue(mockActor.msgAvailable()); + assertRequestEquals(mockRequest, mockActor.receiveOne(Duration.apply(5, TimeUnit.SECONDS))); + } + + private static void assertRequestEquals(final Request expected, final Object o) { + final Request actual = (Request) o; + assertEquals(expected.getRetry(), actual.getRetry()); + assertEquals(expected.getSequence(), actual.getSequence()); + assertEquals(expected.getTarget(), actual.getTarget()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueTest.java new file mode 100644 index 0000000000..f1caeb57fd --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/actors/client/SequencedQueueTest.java @@ -0,0 +1,448 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.actors.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.TestProbe; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy; +import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.RequestFailure; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.common.actor.TestTicker; +import org.opendaylight.yangtools.concepts.WritableIdentifier; +import scala.concurrent.duration.FiniteDuration; + +/** + * Test suite covering logic contained in {@link SequencedQueue}. It assumes {@link SequencedQueueEntryTest} passes. + * + * @author Robert Varga + */ +public class SequencedQueueTest { + private static class MockFailure extends RequestFailure { + private static final long serialVersionUID = 1L; + + MockFailure(final WritableIdentifier target, final long sequence, final long retry, final RequestException cause) { + super(target, sequence, retry, cause); + } + + @Override + protected AbstractRequestFailureProxy externalizableProxy(final ABIVersion version) { + return null; + } + + @Override + protected MockFailure cloneAsVersion(final ABIVersion version) { + return this; + } + } + + private static class MockRequest extends Request { + private static final long serialVersionUID = 1L; + + MockRequest(final WritableIdentifier target, final long sequence, final ActorRef replyTo) { + super(target, sequence, 0, replyTo); + } + + + MockRequest(final MockRequest request, final long retry) { + super(request, retry); + } + + @Override + public RequestFailure toRequestFailure(final RequestException cause) { + return new MockFailure(getTarget(), getSequence(), getRetry(), cause); + } + + @Override + protected AbstractRequestProxy externalizableProxy(final ABIVersion version) { + return null; + } + + @Override + protected MockRequest cloneAsVersion(final ABIVersion version) { + return this; + } + + @Override + protected MockRequest cloneAsRetry(final long retry) { + return new MockRequest(this, retry); + } + }; + + @Mock + private ActorRef mockReplyTo; + @Mock + private WritableIdentifier mockIdentifier; + @Mock + private RequestException mockCause; + @Mock + private RequestCallback mockCallback; + @Mock + private ClientActorBehavior mockBehavior; + + private TestTicker ticker; + private BackendInfo mockBackendInfo; + private MockRequest mockRequest; + private MockRequest mockRequest2; + private Response mockResponse; + private Long mockCookie; + + private static ActorSystem actorSystem; + private TestProbe mockActor; + + private SequencedQueue queue; + + @BeforeClass + public static void setupClass() { + actorSystem = ActorSystem.apply(); + } + + @AfterClass + public static void teardownClass() { + actorSystem.terminate(); + } + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + + doReturn(mockBehavior).when(mockCallback).complete(any(MockFailure.class)); + + ticker = new TestTicker(); + ticker.increment(ThreadLocalRandom.current().nextLong()); + + mockActor = TestProbe.apply(actorSystem); + mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current()); + mockRequest = new MockRequest(mockIdentifier, ThreadLocalRandom.current().nextLong(), mockReplyTo); + mockRequest2 = new MockRequest(mockIdentifier, mockRequest.getSequence() + 1, mockReplyTo); + mockResponse = mockRequest.toRequestFailure(mockCause); + mockCookie = ThreadLocalRandom.current().nextLong(); + + queue = new SequencedQueue(mockCookie, ticker); + } + + @After + public void teardown() { + actorSystem.stop(mockActor.ref()); + } + + @Test + public void testGetCookie() { + assertSame(mockCookie, queue.getCookie()); + } + + @Test + public void testEmptyClose() { + assertFalse(queue.hasCompleted()); + queue.close(); + assertTrue(queue.hasCompleted()); + } + + @Test(expected=IllegalStateException.class) + public void testClosedEnqueueRequest() { + queue.close(); + + // Kaboom + queue.enqueueRequest(mockRequest, mockCallback); + } + + @Test + public void testCloseIdempotent() { + queue.close(); + queue.close(); + } + + @Test + public void testPoison() { + queue.enqueueRequest(mockRequest, mockCallback); + queue.poison(mockCause); + + final ArgumentCaptor captor = ArgumentCaptor.forClass(MockFailure.class); + verify(mockCallback).complete(captor.capture()); + assertSame(mockCause, captor.getValue().getCause()); + } + + @Test(expected=IllegalStateException.class) + public void testPoisonPerformsClose() { + // Implies close() + queue.poison(mockCause); + + // Kaboom + queue.enqueueRequest(mockRequest, mockCallback); + } + + @Test + public void testPoisonIdempotent() { + queue.poison(mockCause); + queue.poison(mockCause); + } + + @Test + public void testEnqueueRequestNeedsBackend() { + final Optional ret = queue.enqueueRequest(mockRequest, mockCallback); + + assertNotNull(ret); + assertFalse(ret.isPresent()); + } + + @Test + public void testExpectProof() { + final CompletableFuture proof = new CompletableFuture<>(); + assertTrue(queue.expectProof(proof)); + assertFalse(queue.expectProof(proof)); + } + + @Test(expected=NullPointerException.class) + public void testSetBackendNull() { + final CompletableFuture proof = new CompletableFuture<>(); + assertTrue(queue.expectProof(proof)); + queue.setBackendInfo(proof, null); + } + + @Test + public void testSetBackendWithNoResolution() { + queue.enqueueRequest(mockRequest, mockCallback); + + final CompletableFuture proof = new CompletableFuture<>(); + final Optional ret = queue.setBackendInfo(proof, mockBackendInfo); + assertNotNull(ret); + assertFalse(ret.isPresent()); + } + + @Test + public void testSetBackendWithWrongProof() { + queue.enqueueRequest(mockRequest, mockCallback); + + final CompletableFuture proof = new CompletableFuture<>(); + assertTrue(queue.expectProof(proof)); + + final Optional ret = queue.setBackendInfo(new CompletableFuture<>(), mockBackendInfo); + assertNotNull(ret); + assertFalse(ret.isPresent()); + } + + @Test + public void testSetBackendWithNoRequests() { + // this utility method covers the entire test + setupBackend(); + } + + @Test + public void testSetbackedWithRequestsNoTimer() { + queue.enqueueRequest(mockRequest, mockCallback); + + final CompletableFuture proof = new CompletableFuture<>(); + assertTrue(queue.expectProof(proof)); + assertFalse(mockActor.msgAvailable()); + + final Optional ret = queue.setBackendInfo(proof, mockBackendInfo); + assertNotNull(ret); + assertTrue(ret.isPresent()); + + assertTransmit(mockRequest); + } + + @Test + public void testEnqueueRequestNeedsTimer() { + setupBackend(); + + final Optional ret = queue.enqueueRequest(mockRequest, mockCallback); + assertNotNull(ret); + assertTrue(ret.isPresent()); + assertTransmit(mockRequest); + } + + @Test + public void testEnqueueRequestWithoutTimer() { + setupBackend(); + + // First request + Optional ret = queue.enqueueRequest(mockRequest, mockCallback); + assertNotNull(ret); + assertTrue(ret.isPresent()); + assertTransmit(mockRequest); + + // Second request, no timer fired + ret = queue.enqueueRequest(mockRequest2, mockCallback); + assertNull(ret); + assertTransmit(mockRequest2); + } + + @Test + public void testRunTimeoutEmpty() throws NoProgressException { + final boolean ret = queue.runTimeout(); + assertFalse(ret); + } + + @Test + public void testRunTimeoutWithoutShift() throws NoProgressException { + queue.enqueueRequest(mockRequest, mockCallback); + final boolean ret = queue.runTimeout(); + assertFalse(ret); + } + + @Test + public void testRunTimeoutWithTimeoutLess() throws NoProgressException { + queue.enqueueRequest(mockRequest, mockCallback); + + ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS - 1); + + final boolean ret = queue.runTimeout(); + assertFalse(ret); + } + + @Test + public void testRunTimeoutWithTimeoutExact() throws NoProgressException { + queue.enqueueRequest(mockRequest, mockCallback); + + ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS); + + final boolean ret = queue.runTimeout(); + assertTrue(ret); + } + + @Test + public void testRunTimeoutWithTimeoutMore() throws NoProgressException { + queue.enqueueRequest(mockRequest, mockCallback); + + ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS + 1); + + final boolean ret = queue.runTimeout(); + assertTrue(ret); + } + + @Test(expected=NoProgressException.class) + public void testRunTimeoutWithoutProgressExact() throws NoProgressException { + queue.enqueueRequest(mockRequest, mockCallback); + + ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS); + + // Kaboom + queue.runTimeout(); + } + + @Test(expected=NoProgressException.class) + public void testRunTimeoutWithoutProgressMore() throws NoProgressException { + queue.enqueueRequest(mockRequest, mockCallback); + + ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1); + + // Kaboom + queue.runTimeout(); + } + + @Test + public void testRunTimeoutEmptyWithoutProgressExact() throws NoProgressException { + ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS); + + // No problem + final boolean ret = queue.runTimeout(); + assertFalse(ret); + } + + @Test + public void testRunTimeoutEmptyWithoutProgressMore() throws NoProgressException { + ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1); + + // No problem + final boolean ret = queue.runTimeout(); + assertFalse(ret); + } + + @Test + public void testCompleteEmpty() { + final ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse); + assertSame(mockBehavior, ret); + verifyNoMoreInteractions(mockCallback); + } + + @Test + public void testCompleteSingle() { + queue.enqueueRequest(mockRequest, mockCallback); + + ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse); + verify(mockCallback).complete(mockResponse); + assertSame(mockBehavior, ret); + + ret = queue.complete(mockBehavior, mockResponse); + assertSame(mockBehavior, ret); + verifyNoMoreInteractions(mockCallback); + } + + @Test + public void testCompleteNull() { + queue.enqueueRequest(mockRequest, mockCallback); + + doReturn(null).when(mockCallback).complete(mockResponse); + + ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse); + verify(mockCallback).complete(mockResponse); + assertNull(ret); + } + + @Test + public void testProgressRecord() throws NoProgressException { + setupBackend(); + + queue.enqueueRequest(mockRequest, mockCallback); + + ticker.increment(10); + queue.enqueueRequest(mockRequest2, mockCallback); + queue.complete(mockBehavior, mockResponse); + + ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS - 11); + assertTrue(queue.runTimeout()); + } + + private void setupBackend() { + final CompletableFuture proof = new CompletableFuture<>(); + assertTrue(queue.expectProof(proof)); + final Optional ret = queue.setBackendInfo(proof, mockBackendInfo); + assertNotNull(ret); + assertFalse(ret.isPresent()); + assertFalse(mockActor.msgAvailable()); + } + + private void assertTransmit(final Request expected) { + assertTrue(mockActor.msgAvailable()); + assertRequestEquals(expected, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS))); + } + + private static void assertRequestEquals(final Request expected, final Object o) { + final Request actual = (Request) o; + assertEquals(expected.getRetry(), actual.getRetry()); + assertEquals(expected.getSequence(), actual.getSequence()); + assertEquals(expected.getTarget(), actual.getTarget()); + } +}