// FIXME: Add state flushing here once we have state
}
- private ClientActorBehavior createLocalHistory(final CompletableFuture<ClientLocalHistory> future) {
+ private ClientActorBehavior createLocalHistory(final ClientActorBehavior currentBehavior,
+ final CompletableFuture<ClientLocalHistory> future) {
final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++);
LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future);
// FIXME: initiate backend instantiation
future.completeExceptionally(new UnsupportedOperationException("Not implemented yet"));
- return this;
+ return currentBehavior;
}
- private ClientActorBehavior shutdown() {
+ private ClientActorBehavior shutdown(final ClientActorBehavior currentBehavior) {
// FIXME: Add shutdown procedures here
return null;
}
@Override
public CompletionStage<ClientLocalHistory> createLocalHistory() {
final CompletableFuture<ClientLocalHistory> future = new CompletableFuture<>();
- context().executeInActor(() -> createLocalHistory(future));
+ context().executeInActor(currentBehavior -> createLocalHistory(currentBehavior, future));
return future;
}
package org.opendaylight.controller.cluster.datastore.actors.client;
import com.google.common.annotations.Beta;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
/**
* A behavior, which handles messages sent to a {@link AbstractClientActor}.
super(context);
}
+ @Override
+ public final @Nonnull ClientIdentifier getIdentifier() {
+ return context().getIdentifier();
+ }
+
@Override
final ClientActorBehavior onReceiveCommand(final Object command) {
if (command instanceof InternalCommand) {
- return ((InternalCommand) command).execute();
- } else if (command instanceof RequestFailure) {
- final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) command;
- final RequestException cause = failure.getCause();
- if (cause instanceof RetiredGenerationException) {
- LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause);
- haltClient(cause);
- return null;
- }
+ return ((InternalCommand) command).execute(this);
+ }
+ if (command instanceof RequestSuccess) {
+ return onRequestSuccess((RequestSuccess<?, ?>) command);
+ }
+ if (command instanceof RequestFailure) {
+ return onRequestFailure((RequestFailure<?, ?>) command);
}
- // TODO: any client-common logic (such as validation and common dispatch) needs to go here
return onCommand(command);
}
- @Override
- public final @Nonnull ClientIdentifier getIdentifier() {
- return context().getIdentifier();
+ private ClientActorBehavior onRequestSuccess(final RequestSuccess<?, ?> success) {
+ return context().completeRequest(this, success);
+ }
+
+ private ClientActorBehavior onRequestFailure(final RequestFailure<?, ?> failure) {
+ final RequestException cause = failure.getCause();
+ if (cause instanceof RetiredGenerationException) {
+ LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause);
+ haltClient(cause);
+ context().poison(cause);
+ return null;
+ }
+
+ if (failure.isHardFailure()) {
+ return context().completeRequest(this, failure);
+ }
+
+ // TODO: add instanceof checks on cause to detect more problems
+
+ LOG.warn("{}: Unhandled retriable failure {}, promoting to hard failure", persistenceId(), failure);
+ return context().completeRequest(this, failure);
+ }
+
+ // This method is executing in the actor context, hence we can safely interact with the queue
+ private ClientActorBehavior doSendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
+ // Get or allocate queue for the request
+ final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie());
+
+ // Note this is a tri-state return and can be null
+ final Optional<FiniteDuration> result = queue.enqueueRequest(request, callback);
+ if (result == null) {
+ // Happy path: we are done here
+ return this;
+ }
+
+ if (result.isPresent()) {
+ // Less happy path: we need to schedule a timer
+ scheduleQueueTimeout(queue, result.get());
+ return this;
+ }
+
+ startResolve(queue, request.getTarget().getHistoryId().getCookie());
+ return this;
+ }
+
+ // This method is executing in the actor context, hence we can safely interact with the queue
+ private void startResolve(final SequencedQueue queue, final long cookie) {
+ // Queue does not have backend information. Initiate resolution, which may actually be piggy-backing on to a
+ // previous request to resolve.
+ final CompletionStage<? extends BackendInfo> f = resolver().getBackendInfo(cookie);
+
+ // This is the tricky part: depending on timing, the queue may have a stale request for resolution, which has
+ // been invalidated or it may already have a reference to this resolution request. Let us give it a chance to
+ // update and it will indicate if this resolution request is an update. If it is, we'll piggy-back on it and
+ // run backend information update in the actor thread. If it is not, we do not need to do anything, as we will
+ // bulk-process all requests.
+ if (queue.expectProof(f)) {
+ f.thenAccept(backend -> context().executeInActor(cb -> cb.finishResolve(queue, f, backend)));
+ }
+ }
+
+ // This method is executing in the actor context, hence we can safely interact with the queue
+ private ClientActorBehavior finishResolve(final SequencedQueue queue,
+ final CompletionStage<? extends BackendInfo> futureBackend, final BackendInfo backend) {
+
+ final Optional<FiniteDuration> maybeTimeout = queue.setBackendInfo(futureBackend, backend);
+ if (maybeTimeout.isPresent()) {
+ scheduleQueueTimeout(queue, maybeTimeout.get());
+ }
+ return this;
+ }
+
+ // This method is executing in the actor context, hence we can safely interact with the queue
+ private void scheduleQueueTimeout(final SequencedQueue queue, final FiniteDuration timeout) {
+ LOG.debug("{}: scheduling timeout in {}", persistenceId(), timeout);
+ context().executeInActor(cb -> cb.queueTimeout(queue), timeout);
+ }
+
+ // This method is executing in the actor context, hence we can safely interact with the queue
+ private ClientActorBehavior queueTimeout(final SequencedQueue queue) {
+ final boolean needBackend;
+
+ try {
+ needBackend = queue.runTimeout();
+ } catch (NoProgressException e) {
+ // Uh-oh, no progress. The queue has already killed itself, now we need to remove it
+ context().removeQueue(queue);
+ return this;
+ }
+
+ if (needBackend) {
+ startResolve(queue, queue.getCookie());
+ }
+
+ return this;
}
/**
* @return
*/
protected abstract @Nonnull BackendInfoResolver<?> resolver();
+
+ /**
+ * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
+ * from any thread.
+ *
+ * @param request Request to send
+ * @param callback Callback to invoke
+ */
+ public final void sendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
+ context().executeInActor(cb -> cb.doSendRequest(request, callback));
+ }
}
package org.opendaylight.controller.cluster.datastore.actors.client;
import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Scheduler;
import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.concepts.WritableIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.FiniteDuration;
/**
* An actor context associated with this {@link AbstractClientActor}.
* @author Robert Varga
*/
@Beta
+@ThreadSafe
public class ClientActorContext extends AbstractClientActorContext implements Identifiable<ClientIdentifier> {
+ private static final Logger LOG = LoggerFactory.getLogger(ClientActorContext.class);
+
+ private final Map<Long, SequencedQueue> queues = new ConcurrentHashMap<>();
private final ClientIdentifier identifier;
+ private final ExecutionContext executionContext;
+ private final Scheduler scheduler;
// Hidden to avoid subclassing
- ClientActorContext(final ActorRef self, final String persistenceId, final ClientIdentifier identifier) {
+ ClientActorContext(final ActorRef self, final Scheduler scheduler, final ExecutionContext executionContext,
+ final String persistenceId, final ClientIdentifier identifier) {
super(self, persistenceId);
this.identifier = Preconditions.checkNotNull(identifier);
+ this.scheduler = Preconditions.checkNotNull(scheduler);
+ this.executionContext = Preconditions.checkNotNull(executionContext);
}
@Override
public void executeInActor(final @Nonnull InternalCommand command) {
self().tell(Preconditions.checkNotNull(command), ActorRef.noSender());
}
+
+ public Cancellable executeInActor(final @Nonnull InternalCommand command, final FiniteDuration delay) {
+ return scheduler.scheduleOnce(Preconditions.checkNotNull(delay), self(), Preconditions.checkNotNull(command),
+ executionContext, ActorRef.noSender());
+ }
+
+ SequencedQueue queueFor(final Long cookie) {
+ return queues.computeIfAbsent(cookie, t -> new SequencedQueue(t, ticker()));
+ }
+
+ void removeQueue(final SequencedQueue queue) {
+ queues.remove(queue.getCookie(), queue);
+ }
+
+ ClientActorBehavior completeRequest(final ClientActorBehavior current, final Response<?, ?> response) {
+ final WritableIdentifier id = response.getTarget();
+
+ // FIXME: this will need to be updated for other Request/Response types to extract cookie
+ Preconditions.checkArgument(id instanceof TransactionIdentifier);
+ final TransactionIdentifier txId = (TransactionIdentifier) id;
+
+ final SequencedQueue queue = queues.get(txId.getHistoryId().getCookie());
+ if (queue == null) {
+ LOG.info("{}: Ignoring unknown response {}", persistenceId(), response);
+ return current;
+ } else {
+ return queue.complete(current, response);
+ }
+ }
+
+ void poison(final RequestException cause) {
+ for (SequencedQueue q : queues.values()) {
+ q.poison(cause);
+ }
+
+ queues.clear();
+ }
+
}
*/
package org.opendaylight.controller.cluster.datastore.actors.client;
+import akka.actor.ActorSystem;
import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
actor.saveSnapshot(snapshot);
}
- ClientActorBehavior createBehavior(final ClientActorContext context) {
+ ClientActorBehavior createBehavior(final ClientIdentifier clientId) {
+ final ActorSystem system = actor.getContext().system();
+ final ClientActorContext context = new ClientActorContext(self(), system.scheduler(), system.dispatcher(),
+ persistenceId(), clientId);
+
return actor.initialBehavior(context);
}
*/
package org.opendaylight.controller.cluster.datastore.actors.client;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
/**
/**
* Run command actions.
*
+ * @param currentBehavior Current Behavior
* @return Next behavior to use in the client actor
*/
- @Nullable ClientActorBehavior execute();
+ @Nullable ClientActorBehavior execute(@Nonnull ClientActorBehavior currentBehavior);
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+
+/**
+ * Internal {@link RequestException} used as poison cause when the client fails to make progress for a long time.
+ * See {@link SequencedQueue} for details.
+ *
+ * @author Robert Varga
+ */
+final class NoProgressException extends RequestException {
+ private static final long serialVersionUID = 1L;
+
+ protected NoProgressException(final long nanos) {
+ super(String.format("No progress in %s seconds", TimeUnit.NANOSECONDS.toSeconds(nanos)));
+ }
+
+ @Override
+ public boolean isRetriable() {
+ return false;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+
+@FunctionalInterface
+public interface RequestCallback {
+ /**
+ * Invoked when a particular request completes.
+ *
+ * @param response Response to the request
+ * @return Next client actor behavior
+ */
+ @Nullable ClientActorBehavior complete(@Nonnull Response<?, ?> response);
+}
return null;
} else if (command instanceof SaveSnapshotSuccess) {
context().unstash();
- return context().createBehavior(new ClientActorContext(self(), persistenceId(), myId));
+ return context().createBehavior(myId);
} else {
LOG.debug("{}: stashing command {}", persistenceId(), command);
context().stash();
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+/*
+ * TODO: make this class and its users thread-safe. This will require some atomic state-keeping so that timeouts,
+ * retries and enqueues work as expected.
+ */
+@NotThreadSafe
+final class SequencedQueue {
+ private static final Logger LOG = LoggerFactory.getLogger(SequencedQueue.class);
+
+ // Keep these constant in nanoseconds, as that prevents unnecessary conversions in the fast path
+ @VisibleForTesting
+ static final long NO_PROGRESS_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(15);
+ @VisibleForTesting
+ static final long REQUEST_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);
+ private static final FiniteDuration INITIAL_REQUEST_TIMEOUT = FiniteDuration.apply(REQUEST_TIMEOUT_NANOS,
+ TimeUnit.NANOSECONDS);
+
+ /**
+ * We need to keep the sequence of operations towards the backend, hence we use a queue. Since targets can
+ * progress at different speeds, these may be completed out of order.
+ *
+ * TODO: The combination of target and sequence uniquely identifies a particular request, we will need to
+ * figure out a more efficient lookup mechanism to deal with responses which do not match the queue
+ * order.
+ */
+ private final Deque<SequencedQueueEntry> queue = new LinkedList<>();
+ private final Ticker ticker;
+ private final Long cookie;
+
+ // Updated/consulted from actor context only
+ /**
+ * Last scheduled resolution request. We do not use this object aside from requiring it as a proof that when
+ * resolution occurs via {@link #setBackendInfo(CompletionStage, BackendInfo)}, we only update the last requested
+ * result.
+ */
+ private CompletionStage<? extends BackendInfo> backendProof;
+ private BackendInfo backend;
+
+ /**
+ * Last scheduled timer. We use this to prevent multiple timers from being scheduled for this queue.
+ */
+ private Object expectingTimer;
+
+ private long lastProgress;
+
+ // Updated from application thread
+ private volatile boolean notClosed = true;
+
+ SequencedQueue(final Long cookie, final Ticker ticker) {
+ this.cookie = Preconditions.checkNotNull(cookie);
+ this.ticker = Preconditions.checkNotNull(ticker);
+ lastProgress = ticker.read();
+ }
+
+ Long getCookie() {
+ return cookie;
+ }
+
+ private void checkNotClosed() {
+ Preconditions.checkState(notClosed, "Queue %s is closed", this);
+ }
+
+ /**
+ * Enqueue, and possibly transmit a request. Results of this method are tri-state, indicating to the caller
+ * the following scenarios:
+ * 1) The request has been enqueued and transmitted. No further actions are necessary
+ * 2) The request has been enqueued and transmitted, but the caller needs to schedule a new timer
+ * 3) The request has been enqueued,but the caller needs to request resolution of backend information and that
+ * process needs to complete before transmission occurs
+ *
+ * These options are covered via returning an {@link Optional}. The caller needs to examine it and decode
+ * the scenarios above according to the following rules:
+ * - if is null, the first case applies
+ * - if {@link Optional#isPresent()} returns false, the third case applies and the caller should initiate backend
+ * resolution and eventually call {@link #setBackendInfo(CompletionStage, BackendInfo)}
+ * - if {@link Optional#isPresent()} returns true, the second case applies and the caller MUST schedule a timer
+ *
+ * @param request Request to be sent
+ * @param callback Callback to be invoked
+ * @return Optional duration with semantics described above.
+ */
+ @Nullable Optional<FiniteDuration> enqueueRequest(final Request<?, ?> request, final RequestCallback callback) {
+ final long now = ticker.read();
+ final SequencedQueueEntry e = new SequencedQueueEntry(request, callback, now);
+
+ // We could have check first, but argument checking needs to happen first
+ checkNotClosed();
+ queue.add(e);
+ LOG.debug("Enqueued request {} to queue {}", request, this);
+
+ if (backend == null) {
+ return Optional.empty();
+ }
+
+ e.retransmit(backend, now);
+ if (expectingTimer == null) {
+ expectingTimer = now + REQUEST_TIMEOUT_NANOS;
+ return Optional.of(INITIAL_REQUEST_TIMEOUT);
+ } else {
+ return null;
+ }
+ }
+
+ ClientActorBehavior complete(final ClientActorBehavior current, final Response<?, ?> response) {
+ // Responses to different targets may arrive out of order, hence we use an iterator
+ final Iterator<SequencedQueueEntry> it = queue.iterator();
+ while (it.hasNext()) {
+ final SequencedQueueEntry e = it.next();
+ if (e.acceptsResponse(response)) {
+ lastProgress = ticker.read();
+ it.remove();
+ LOG.debug("Completing request {} with {}", e, response);
+ return e.complete(response);
+ }
+ }
+
+ LOG.debug("No request matching {} found", response);
+ return current;
+ }
+
+ Optional<FiniteDuration> setBackendInfo(final CompletionStage<? extends BackendInfo> proof, final BackendInfo backend) {
+ if (!proof.equals(backendProof)) {
+ LOG.debug("Ignoring resolution {} while waiting for {}", proof, this.backendProof);
+ return Optional.empty();
+ }
+
+ this.backend = Preconditions.checkNotNull(backend);
+ backendProof = null;
+ LOG.debug("Resolved backend {}", backend);
+
+ if (queue.isEmpty()) {
+ // No pending requests, hence no need for a timer
+ return Optional.empty();
+ }
+
+ LOG.debug("Resending requests to backend {}", backend);
+ final long now = ticker.read();
+ for (SequencedQueueEntry e : queue) {
+ e.retransmit(backend, now);
+ }
+
+ if (expectingTimer != null) {
+ // We already have a timer going, no need to schedule a new one
+ return Optional.empty();
+ }
+
+ // Above loop may have cost us some time. Recalculate timeout.
+ final long nextTicks = ticker.read() + REQUEST_TIMEOUT_NANOS;
+ expectingTimer = nextTicks;
+ return Optional.of(FiniteDuration.apply(nextTicks - now, TimeUnit.NANOSECONDS));
+ }
+
+ boolean expectProof(final CompletionStage<? extends BackendInfo> proof) {
+ if (!proof.equals(backendProof)) {
+ LOG.debug("Setting resolution handle to {}", proof);
+ backendProof = proof;
+ return true;
+ } else {
+ LOG.trace("Already resolving handle {}", proof);
+ return false;
+ }
+ }
+
+ boolean hasCompleted() {
+ return !notClosed && queue.isEmpty();
+ }
+
+ /**
+ * Check queue timeouts and return true if a timeout has occured.
+ *
+ * @return True if a timeout occured
+ * @throws NoProgressException if the queue failed to make progress for an extended
+ * time.
+ */
+ boolean runTimeout() throws NoProgressException {
+ expectingTimer = null;
+ final long now = ticker.read();
+
+ if (!queue.isEmpty()) {
+ final long ticksSinceProgress = now - lastProgress;
+ if (ticksSinceProgress >= NO_PROGRESS_TIMEOUT_NANOS) {
+ LOG.error("Queue {} has not seen progress in {} seconds, failing all requests", this,
+ TimeUnit.NANOSECONDS.toSeconds(ticksSinceProgress));
+
+ final NoProgressException ex = new NoProgressException(ticksSinceProgress);
+ poison(ex);
+ throw ex;
+ }
+ }
+
+ // We always schedule requests in sequence, hence any timeouts really just mean checking the head of the queue
+ final SequencedQueueEntry head = queue.peek();
+ if (head != null && head.isTimedOut(now, REQUEST_TIMEOUT_NANOS)) {
+ backend = null;
+ LOG.debug("Queue {} invalidated backend info", this);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void poison(final RequestException cause) {
+ close();
+
+ SequencedQueueEntry e = queue.poll();
+ while (e != null) {
+ e.poison(cause);
+ e = queue.poll();
+ }
+ }
+
+ // FIXME: add a caller from ClientSingleTransaction
+ void close() {
+ notClosed = false;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import akka.actor.ActorRef;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Single entry in {@link SequencedQueue}. Tracks the request, the associated callback and accounting information.
+ *
+ * @author Robert Varga
+ *
+ * @param <I> Target identifier type
+ */
+final class SequencedQueueEntry {
+ private static final class LastTry {
+ final Request<?, ?> request;
+ final long timeTicks;
+
+ LastTry(final Request<?, ?> request, final long when) {
+ this.request = Preconditions.checkNotNull(request);
+ this.timeTicks = when;
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(SequencedQueueEntry.class);
+
+ private final Request<?, ?> request;
+ private final RequestCallback callback;
+ private final long enqueuedTicks;
+
+ private Optional<LastTry> lastTry = Optional.empty();
+
+ SequencedQueueEntry(final Request<?, ?> request, final RequestCallback callback, final long now) {
+ this.request = Preconditions.checkNotNull(request);
+ this.callback = Preconditions.checkNotNull(callback);
+ this.enqueuedTicks = now;
+ }
+
+ long getSequence() {
+ return request.getSequence();
+ }
+
+ boolean acceptsResponse(final Response<?, ?> response) {
+ return getSequence() == response.getSequence() && request.getTarget().equals(response.getTarget());
+ }
+
+ long getCurrentTry() {
+ final Request<?, ?> req = lastTry.isPresent() ? lastTry.get().request : request;
+ return req.getRetry();
+ }
+
+ ClientActorBehavior complete(final Response<?, ?> response) {
+ LOG.debug("Completing request {} with {}", request, response);
+ return callback.complete(response);
+ }
+
+ void poison(final RequestException cause) {
+ LOG.trace("Poisoning request {}", request, cause);
+ callback.complete(request.toRequestFailure(cause));
+ }
+
+ boolean isTimedOut(final long now, final long timeoutNanos) {
+ final Request<?, ?> req;
+ final long elapsed;
+
+ if (lastTry.isPresent()) {
+ final LastTry t = lastTry.get();
+ elapsed = now - t.timeTicks;
+ req = t.request;
+ } else {
+ elapsed = now - enqueuedTicks;
+ req = request;
+ }
+
+ if (elapsed >= timeoutNanos) {
+ LOG.debug("Request {} timed out after {}ns", req, elapsed);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void retransmit(final BackendInfo backend, final long now) {
+ final Request<?, ?> nextTry = lastTry.isPresent() ? lastTry.get().request.incrementRetry() : request;
+ final Request<?, ?> toSend = nextTry.toVersion(backend.getVersion());
+ final ActorRef actor = backend.getActor();
+
+ LOG.trace("Retransmitting request {} as {} to {}", request, toSend, actor);
+ actor.tell(toSend, ActorRef.noSender());
+ lastTry = Optional.of(new LastTry(toSend, now));
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(SequencedQueueEntry.class).add("request", request).toString();
+ }
+}
import static org.junit.Assert.assertSame;
import akka.actor.ActorRef;
+import akka.actor.Scheduler;
+import akka.dispatch.Dispatcher;
import com.google.common.base.Ticker;
import org.junit.Before;
import org.junit.Test;
@Mock
private ActorRef mockSelf;
+ @Mock
+ private Scheduler mockScheduler;
+
+ @Mock
+ private Dispatcher mockDispatcher;
+
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
@Test
public void testMockingControl() {
- ClientActorContext ctx = new ClientActorContext(mockSelf, PERSISTENCE_ID, CLIENT_ID);
+ ClientActorContext ctx = new ClientActorContext(mockSelf, mockScheduler, mockDispatcher, PERSISTENCE_ID, CLIENT_ID);
assertSame(CLIENT_ID, ctx.getIdentifier());
assertSame(PERSISTENCE_ID, ctx.persistenceId());
assertSame(mockSelf, ctx.self());
@Test
public void testTicker() {
- ClientActorContext ctx = new ClientActorContext(mockSelf, PERSISTENCE_ID, CLIENT_ID);
+ ClientActorContext ctx = new ClientActorContext(mockSelf, mockScheduler, mockDispatcher, PERSISTENCE_ID, CLIENT_ID);
assertSame(Ticker.systemTicker(), ctx.ticker());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.TestProbe;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
+import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.common.actor.TestTicker;
+import org.opendaylight.yangtools.concepts.WritableIdentifier;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Test suite covering logic contained in {@link SequencedQueueEntry}.
+ *
+ * @author Robert Varga
+ */
+public class SequencedQueueEntryTest {
+ private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
+ private static final long serialVersionUID = 1L;
+
+ MockFailure(final WritableIdentifier target, final long sequence, final long retry, final RequestException cause) {
+ super(target, sequence, retry, cause);
+ }
+
+ @Override
+ protected AbstractRequestFailureProxy<WritableIdentifier, MockFailure> externalizableProxy(final ABIVersion version) {
+ return null;
+ }
+
+ @Override
+ protected MockFailure cloneAsVersion(final ABIVersion version) {
+ return this;
+ }
+ }
+
+ private static class MockRequest extends Request<WritableIdentifier, MockRequest> {
+ private static final long serialVersionUID = 1L;
+
+ MockRequest(final WritableIdentifier target, final long sequence, final ActorRef replyTo) {
+ super(target, sequence, 0, replyTo);
+ }
+
+
+ MockRequest(final MockRequest request, final long retry) {
+ super(request, retry);
+ }
+
+ @Override
+ public RequestFailure<WritableIdentifier, ?> toRequestFailure(final RequestException cause) {
+ return new MockFailure(getTarget(), getSequence(), getRetry(), cause);
+ }
+
+ @Override
+ protected AbstractRequestProxy<WritableIdentifier, MockRequest> externalizableProxy(final ABIVersion version) {
+ return null;
+ }
+
+ @Override
+ protected MockRequest cloneAsVersion(final ABIVersion version) {
+ return this;
+ }
+
+ @Override
+ protected MockRequest cloneAsRetry(final long retry) {
+ return new MockRequest(this, retry);
+ }
+ };
+
+ @Mock
+ private ActorRef mockReplyTo;
+ @Mock
+ private WritableIdentifier mockIdentifier;
+ @Mock
+ private RequestException mockCause;
+ @Mock
+ private RequestCallback mockCallback;
+ @Mock
+ private ClientActorBehavior mockBehavior;
+
+ private TestTicker ticker;
+ private BackendInfo mockBackendInfo;
+ private Request<WritableIdentifier, ?> mockRequest;
+ private Response<WritableIdentifier, ?> mockResponse;
+
+ private static ActorSystem actorSystem;
+ private TestProbe mockActor;
+
+ private SequencedQueueEntry entry;
+
+ @BeforeClass
+ public static void setupClass() {
+ actorSystem = ActorSystem.apply();
+ }
+
+ @AfterClass
+ public static void teardownClass() {
+ actorSystem.terminate();
+ }
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+
+ doReturn(mockBehavior).when(mockCallback).complete(any(MockFailure.class));
+
+ ticker = new TestTicker();
+ ticker.increment(ThreadLocalRandom.current().nextLong());
+
+ mockActor = TestProbe.apply(actorSystem);
+ mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current());
+ mockRequest = new MockRequest(mockIdentifier, ThreadLocalRandom.current().nextLong(), mockReplyTo);
+ mockResponse = mockRequest.toRequestFailure(mockCause);
+
+ entry = new SequencedQueueEntry(mockRequest, mockCallback, ticker.read());
+ }
+
+ @After
+ public void teardown() {
+ actorSystem.stop(mockActor.ref());
+ }
+
+ @Test
+ public void testGetSequence() {
+ assertEquals(mockRequest.getSequence(), entry.getSequence());
+ }
+
+ @Test
+ public void testGetCurrentTry() {
+ assertEquals(0, entry.getCurrentTry());
+ entry.retransmit(mockBackendInfo, ticker.read());
+ assertEquals(0, entry.getCurrentTry());
+ entry.retransmit(mockBackendInfo, ticker.read());
+ assertEquals(1, entry.getCurrentTry());
+ entry.retransmit(mockBackendInfo, ticker.read());
+ assertEquals(2, entry.getCurrentTry());
+ }
+
+ @Test
+ public void testComplete() {
+ entry.complete(mockResponse);
+ verify(mockCallback).complete(mockResponse);
+ }
+
+ @Test
+ public void testPoison() {
+ entry.poison(mockCause);
+
+ final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
+ verify(mockCallback).complete(captor.capture());
+ assertSame(mockCause, captor.getValue().getCause());
+ }
+
+ @Test
+ public void testIsTimedOut() {
+ assertTrue(entry.isTimedOut(ticker.read(), 0));
+ assertFalse(entry.isTimedOut(ticker.read(), 1));
+
+ entry.retransmit(mockBackendInfo, ticker.read());
+ assertTrue(entry.isTimedOut(ticker.read(), 0));
+ ticker.increment(10);
+ assertTrue(entry.isTimedOut(ticker.read(), 10));
+ assertFalse(entry.isTimedOut(ticker.read(), 20));
+
+ entry.retransmit(mockBackendInfo, ticker.read());
+ assertTrue(entry.isTimedOut(ticker.read(), 0));
+ ticker.increment(10);
+ assertTrue(entry.isTimedOut(ticker.read(), 10));
+ assertFalse(entry.isTimedOut(ticker.read(), 11));
+ }
+
+ @Test
+ public void testRetransmit() {
+ assertFalse(mockActor.msgAvailable());
+ entry.retransmit(mockBackendInfo, ticker.read());
+
+ assertTrue(mockActor.msgAvailable());
+ assertRequestEquals(mockRequest, mockActor.receiveOne(Duration.apply(5, TimeUnit.SECONDS)));
+ }
+
+ private static void assertRequestEquals(final Request<?, ?> expected, final Object o) {
+ final Request<?, ?> actual = (Request<?, ?>) o;
+ assertEquals(expected.getRetry(), actual.getRetry());
+ assertEquals(expected.getSequence(), actual.getSequence());
+ assertEquals(expected.getTarget(), actual.getTarget());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.actors.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.TestProbe;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.AbstractRequestFailureProxy;
+import org.opendaylight.controller.cluster.access.concepts.AbstractRequestProxy;
+import org.opendaylight.controller.cluster.access.concepts.Request;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.common.actor.TestTicker;
+import org.opendaylight.yangtools.concepts.WritableIdentifier;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Test suite covering logic contained in {@link SequencedQueue}. It assumes {@link SequencedQueueEntryTest} passes.
+ *
+ * @author Robert Varga
+ */
+public class SequencedQueueTest {
+ private static class MockFailure extends RequestFailure<WritableIdentifier, MockFailure> {
+ private static final long serialVersionUID = 1L;
+
+ MockFailure(final WritableIdentifier target, final long sequence, final long retry, final RequestException cause) {
+ super(target, sequence, retry, cause);
+ }
+
+ @Override
+ protected AbstractRequestFailureProxy<WritableIdentifier, MockFailure> externalizableProxy(final ABIVersion version) {
+ return null;
+ }
+
+ @Override
+ protected MockFailure cloneAsVersion(final ABIVersion version) {
+ return this;
+ }
+ }
+
+ private static class MockRequest extends Request<WritableIdentifier, MockRequest> {
+ private static final long serialVersionUID = 1L;
+
+ MockRequest(final WritableIdentifier target, final long sequence, final ActorRef replyTo) {
+ super(target, sequence, 0, replyTo);
+ }
+
+
+ MockRequest(final MockRequest request, final long retry) {
+ super(request, retry);
+ }
+
+ @Override
+ public RequestFailure<WritableIdentifier, ?> toRequestFailure(final RequestException cause) {
+ return new MockFailure(getTarget(), getSequence(), getRetry(), cause);
+ }
+
+ @Override
+ protected AbstractRequestProxy<WritableIdentifier, MockRequest> externalizableProxy(final ABIVersion version) {
+ return null;
+ }
+
+ @Override
+ protected MockRequest cloneAsVersion(final ABIVersion version) {
+ return this;
+ }
+
+ @Override
+ protected MockRequest cloneAsRetry(final long retry) {
+ return new MockRequest(this, retry);
+ }
+ };
+
+ @Mock
+ private ActorRef mockReplyTo;
+ @Mock
+ private WritableIdentifier mockIdentifier;
+ @Mock
+ private RequestException mockCause;
+ @Mock
+ private RequestCallback mockCallback;
+ @Mock
+ private ClientActorBehavior mockBehavior;
+
+ private TestTicker ticker;
+ private BackendInfo mockBackendInfo;
+ private MockRequest mockRequest;
+ private MockRequest mockRequest2;
+ private Response<WritableIdentifier, ?> mockResponse;
+ private Long mockCookie;
+
+ private static ActorSystem actorSystem;
+ private TestProbe mockActor;
+
+ private SequencedQueue queue;
+
+ @BeforeClass
+ public static void setupClass() {
+ actorSystem = ActorSystem.apply();
+ }
+
+ @AfterClass
+ public static void teardownClass() {
+ actorSystem.terminate();
+ }
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+
+ doReturn(mockBehavior).when(mockCallback).complete(any(MockFailure.class));
+
+ ticker = new TestTicker();
+ ticker.increment(ThreadLocalRandom.current().nextLong());
+
+ mockActor = TestProbe.apply(actorSystem);
+ mockBackendInfo = new BackendInfo(mockActor.ref(), ABIVersion.current());
+ mockRequest = new MockRequest(mockIdentifier, ThreadLocalRandom.current().nextLong(), mockReplyTo);
+ mockRequest2 = new MockRequest(mockIdentifier, mockRequest.getSequence() + 1, mockReplyTo);
+ mockResponse = mockRequest.toRequestFailure(mockCause);
+ mockCookie = ThreadLocalRandom.current().nextLong();
+
+ queue = new SequencedQueue(mockCookie, ticker);
+ }
+
+ @After
+ public void teardown() {
+ actorSystem.stop(mockActor.ref());
+ }
+
+ @Test
+ public void testGetCookie() {
+ assertSame(mockCookie, queue.getCookie());
+ }
+
+ @Test
+ public void testEmptyClose() {
+ assertFalse(queue.hasCompleted());
+ queue.close();
+ assertTrue(queue.hasCompleted());
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testClosedEnqueueRequest() {
+ queue.close();
+
+ // Kaboom
+ queue.enqueueRequest(mockRequest, mockCallback);
+ }
+
+ @Test
+ public void testCloseIdempotent() {
+ queue.close();
+ queue.close();
+ }
+
+ @Test
+ public void testPoison() {
+ queue.enqueueRequest(mockRequest, mockCallback);
+ queue.poison(mockCause);
+
+ final ArgumentCaptor<MockFailure> captor = ArgumentCaptor.forClass(MockFailure.class);
+ verify(mockCallback).complete(captor.capture());
+ assertSame(mockCause, captor.getValue().getCause());
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testPoisonPerformsClose() {
+ // Implies close()
+ queue.poison(mockCause);
+
+ // Kaboom
+ queue.enqueueRequest(mockRequest, mockCallback);
+ }
+
+ @Test
+ public void testPoisonIdempotent() {
+ queue.poison(mockCause);
+ queue.poison(mockCause);
+ }
+
+ @Test
+ public void testEnqueueRequestNeedsBackend() {
+ final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
+
+ assertNotNull(ret);
+ assertFalse(ret.isPresent());
+ }
+
+ @Test
+ public void testExpectProof() {
+ final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
+ assertTrue(queue.expectProof(proof));
+ assertFalse(queue.expectProof(proof));
+ }
+
+ @Test(expected=NullPointerException.class)
+ public void testSetBackendNull() {
+ final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
+ assertTrue(queue.expectProof(proof));
+ queue.setBackendInfo(proof, null);
+ }
+
+ @Test
+ public void testSetBackendWithNoResolution() {
+ queue.enqueueRequest(mockRequest, mockCallback);
+
+ final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
+ final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
+ assertNotNull(ret);
+ assertFalse(ret.isPresent());
+ }
+
+ @Test
+ public void testSetBackendWithWrongProof() {
+ queue.enqueueRequest(mockRequest, mockCallback);
+
+ final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
+ assertTrue(queue.expectProof(proof));
+
+ final Optional<FiniteDuration> ret = queue.setBackendInfo(new CompletableFuture<>(), mockBackendInfo);
+ assertNotNull(ret);
+ assertFalse(ret.isPresent());
+ }
+
+ @Test
+ public void testSetBackendWithNoRequests() {
+ // this utility method covers the entire test
+ setupBackend();
+ }
+
+ @Test
+ public void testSetbackedWithRequestsNoTimer() {
+ queue.enqueueRequest(mockRequest, mockCallback);
+
+ final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
+ assertTrue(queue.expectProof(proof));
+ assertFalse(mockActor.msgAvailable());
+
+ final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
+ assertNotNull(ret);
+ assertTrue(ret.isPresent());
+
+ assertTransmit(mockRequest);
+ }
+
+ @Test
+ public void testEnqueueRequestNeedsTimer() {
+ setupBackend();
+
+ final Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
+ assertNotNull(ret);
+ assertTrue(ret.isPresent());
+ assertTransmit(mockRequest);
+ }
+
+ @Test
+ public void testEnqueueRequestWithoutTimer() {
+ setupBackend();
+
+ // First request
+ Optional<FiniteDuration> ret = queue.enqueueRequest(mockRequest, mockCallback);
+ assertNotNull(ret);
+ assertTrue(ret.isPresent());
+ assertTransmit(mockRequest);
+
+ // Second request, no timer fired
+ ret = queue.enqueueRequest(mockRequest2, mockCallback);
+ assertNull(ret);
+ assertTransmit(mockRequest2);
+ }
+
+ @Test
+ public void testRunTimeoutEmpty() throws NoProgressException {
+ final boolean ret = queue.runTimeout();
+ assertFalse(ret);
+ }
+
+ @Test
+ public void testRunTimeoutWithoutShift() throws NoProgressException {
+ queue.enqueueRequest(mockRequest, mockCallback);
+ final boolean ret = queue.runTimeout();
+ assertFalse(ret);
+ }
+
+ @Test
+ public void testRunTimeoutWithTimeoutLess() throws NoProgressException {
+ queue.enqueueRequest(mockRequest, mockCallback);
+
+ ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS - 1);
+
+ final boolean ret = queue.runTimeout();
+ assertFalse(ret);
+ }
+
+ @Test
+ public void testRunTimeoutWithTimeoutExact() throws NoProgressException {
+ queue.enqueueRequest(mockRequest, mockCallback);
+
+ ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS);
+
+ final boolean ret = queue.runTimeout();
+ assertTrue(ret);
+ }
+
+ @Test
+ public void testRunTimeoutWithTimeoutMore() throws NoProgressException {
+ queue.enqueueRequest(mockRequest, mockCallback);
+
+ ticker.increment(SequencedQueue.REQUEST_TIMEOUT_NANOS + 1);
+
+ final boolean ret = queue.runTimeout();
+ assertTrue(ret);
+ }
+
+ @Test(expected=NoProgressException.class)
+ public void testRunTimeoutWithoutProgressExact() throws NoProgressException {
+ queue.enqueueRequest(mockRequest, mockCallback);
+
+ ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS);
+
+ // Kaboom
+ queue.runTimeout();
+ }
+
+ @Test(expected=NoProgressException.class)
+ public void testRunTimeoutWithoutProgressMore() throws NoProgressException {
+ queue.enqueueRequest(mockRequest, mockCallback);
+
+ ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1);
+
+ // Kaboom
+ queue.runTimeout();
+ }
+
+ @Test
+ public void testRunTimeoutEmptyWithoutProgressExact() throws NoProgressException {
+ ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS);
+
+ // No problem
+ final boolean ret = queue.runTimeout();
+ assertFalse(ret);
+ }
+
+ @Test
+ public void testRunTimeoutEmptyWithoutProgressMore() throws NoProgressException {
+ ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS + 1);
+
+ // No problem
+ final boolean ret = queue.runTimeout();
+ assertFalse(ret);
+ }
+
+ @Test
+ public void testCompleteEmpty() {
+ final ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse);
+ assertSame(mockBehavior, ret);
+ verifyNoMoreInteractions(mockCallback);
+ }
+
+ @Test
+ public void testCompleteSingle() {
+ queue.enqueueRequest(mockRequest, mockCallback);
+
+ ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse);
+ verify(mockCallback).complete(mockResponse);
+ assertSame(mockBehavior, ret);
+
+ ret = queue.complete(mockBehavior, mockResponse);
+ assertSame(mockBehavior, ret);
+ verifyNoMoreInteractions(mockCallback);
+ }
+
+ @Test
+ public void testCompleteNull() {
+ queue.enqueueRequest(mockRequest, mockCallback);
+
+ doReturn(null).when(mockCallback).complete(mockResponse);
+
+ ClientActorBehavior ret = queue.complete(mockBehavior, mockResponse);
+ verify(mockCallback).complete(mockResponse);
+ assertNull(ret);
+ }
+
+ @Test
+ public void testProgressRecord() throws NoProgressException {
+ setupBackend();
+
+ queue.enqueueRequest(mockRequest, mockCallback);
+
+ ticker.increment(10);
+ queue.enqueueRequest(mockRequest2, mockCallback);
+ queue.complete(mockBehavior, mockResponse);
+
+ ticker.increment(SequencedQueue.NO_PROGRESS_TIMEOUT_NANOS - 11);
+ assertTrue(queue.runTimeout());
+ }
+
+ private void setupBackend() {
+ final CompletableFuture<BackendInfo> proof = new CompletableFuture<>();
+ assertTrue(queue.expectProof(proof));
+ final Optional<FiniteDuration> ret = queue.setBackendInfo(proof, mockBackendInfo);
+ assertNotNull(ret);
+ assertFalse(ret.isPresent());
+ assertFalse(mockActor.msgAvailable());
+ }
+
+ private void assertTransmit(final Request<?, ?> expected) {
+ assertTrue(mockActor.msgAvailable());
+ assertRequestEquals(expected, mockActor.receiveOne(FiniteDuration.apply(5, TimeUnit.SECONDS)));
+ }
+
+ private static void assertRequestEquals(final Request<?, ?> expected, final Object o) {
+ final Request<?, ?> actual = (Request<?, ?>) o;
+ assertEquals(expected.getRetry(), actual.getRetry());
+ assertEquals(expected.getSequence(), actual.getSequence());
+ assertEquals(expected.getTarget(), actual.getTarget());
+ }
+}