X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Factors%2Fclient%2FClientActorBehavior.java;h=c3df604ef20ce2a77fd2ddb114d6ab6b471a369c;hp=237b570320543939df3692d720e55aefd59d61dc;hb=50a2f5eb1c94650bc1be1e49d3a5382a1a74a9b3;hpb=c6e3a444f7e8702aade9839ca950bc9790be8831 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java index 237b570320..c3df604ef2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorBehavior.java @@ -8,15 +8,20 @@ package org.opendaylight.controller.cluster.datastore.actors.client; import com.google.common.annotations.Beta; +import java.util.Optional; +import java.util.concurrent.CompletionStage; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; +import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException; import org.opendaylight.yangtools.concepts.Identifiable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; /** * A behavior, which handles messages sent to a {@link AbstractClientActor}. @@ -34,27 +39,121 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior failure = (RequestFailure) command; - final RequestException cause = failure.getCause(); - if (cause instanceof RetiredGenerationException) { - LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause); - haltClient(cause); - return null; - } + return ((InternalCommand) command).execute(this); + } + if (command instanceof RequestSuccess) { + return onRequestSuccess((RequestSuccess) command); + } + if (command instanceof RequestFailure) { + return onRequestFailure((RequestFailure) command); } - // TODO: any client-common logic (such as validation and common dispatch) needs to go here return onCommand(command); } - @Override - public final @Nonnull ClientIdentifier getIdentifier() { - return context().getIdentifier(); + private ClientActorBehavior onRequestSuccess(final RequestSuccess success) { + return context().completeRequest(this, success); + } + + private ClientActorBehavior onRequestFailure(final RequestFailure failure) { + final RequestException cause = failure.getCause(); + if (cause instanceof RetiredGenerationException) { + LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause); + haltClient(cause); + context().poison(cause); + return null; + } + + if (failure.isHardFailure()) { + return context().completeRequest(this, failure); + } + + // TODO: add instanceof checks on cause to detect more problems + + LOG.warn("{}: Unhandled retriable failure {}, promoting to hard failure", persistenceId(), failure); + return context().completeRequest(this, failure); + } + + // This method is executing in the actor context, hence we can safely interact with the queue + private ClientActorBehavior doSendRequest(final TransactionRequest request, final RequestCallback callback) { + // Get or allocate queue for the request + final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie()); + + // Note this is a tri-state return and can be null + final Optional result = queue.enqueueRequest(request, callback); + if (result == null) { + // Happy path: we are done here + return this; + } + + if (result.isPresent()) { + // Less happy path: we need to schedule a timer + scheduleQueueTimeout(queue, result.get()); + return this; + } + + startResolve(queue, request.getTarget().getHistoryId().getCookie()); + return this; + } + + // This method is executing in the actor context, hence we can safely interact with the queue + private void startResolve(final SequencedQueue queue, final long cookie) { + // Queue does not have backend information. Initiate resolution, which may actually be piggy-backing on to a + // previous request to resolve. + final CompletionStage f = resolver().getBackendInfo(cookie); + + // This is the tricky part: depending on timing, the queue may have a stale request for resolution, which has + // been invalidated or it may already have a reference to this resolution request. Let us give it a chance to + // update and it will indicate if this resolution request is an update. If it is, we'll piggy-back on it and + // run backend information update in the actor thread. If it is not, we do not need to do anything, as we will + // bulk-process all requests. + if (queue.expectProof(f)) { + f.thenAccept(backend -> context().executeInActor(cb -> cb.finishResolve(queue, f, backend))); + } + } + + // This method is executing in the actor context, hence we can safely interact with the queue + private ClientActorBehavior finishResolve(final SequencedQueue queue, + final CompletionStage futureBackend, final BackendInfo backend) { + + final Optional maybeTimeout = queue.setBackendInfo(futureBackend, backend); + if (maybeTimeout.isPresent()) { + scheduleQueueTimeout(queue, maybeTimeout.get()); + } + return this; + } + + // This method is executing in the actor context, hence we can safely interact with the queue + private void scheduleQueueTimeout(final SequencedQueue queue, final FiniteDuration timeout) { + LOG.debug("{}: scheduling timeout in {}", persistenceId(), timeout); + context().executeInActor(cb -> cb.queueTimeout(queue), timeout); + } + + // This method is executing in the actor context, hence we can safely interact with the queue + private ClientActorBehavior queueTimeout(final SequencedQueue queue) { + final boolean needBackend; + + try { + needBackend = queue.runTimeout(); + } catch (NoProgressException e) { + // Uh-oh, no progress. The queue has already killed itself, now we need to remove it + context().removeQueue(queue); + return this; + } + + if (needBackend) { + startResolve(queue, queue.getCookie()); + } + + return this; } /** @@ -81,4 +180,15 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior resolver(); + + /** + * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke + * from any thread. + * + * @param request Request to send + * @param callback Callback to invoke + */ + public final void sendRequest(final TransactionRequest request, final RequestCallback callback) { + context().executeInActor(cb -> cb.doSendRequest(request, callback)); + } }