BUG-5280: implement message queueing
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / actors / client / ClientActorBehavior.java
index 237b570320543939df3692d720e55aefd59d61dc..c3df604ef20ce2a77fd2ddb114d6ab6b471a369c 100644 (file)
@@ -8,15 +8,20 @@
 package org.opendaylight.controller.cluster.datastore.actors.client;
 
 import com.google.common.annotations.Beta;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * A behavior, which handles messages sent to a {@link AbstractClientActor}.
@@ -34,27 +39,121 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<C
         super(context);
     }
 
+    @Override
+    public final @Nonnull ClientIdentifier getIdentifier() {
+        return context().getIdentifier();
+    }
+
     @Override
     final ClientActorBehavior onReceiveCommand(final Object command) {
         if (command instanceof InternalCommand) {
-            return ((InternalCommand) command).execute();
-        } else if (command instanceof RequestFailure) {
-            final RequestFailure<?, ?> failure = (RequestFailure<?, ?>) command;
-            final RequestException cause = failure.getCause();
-            if (cause instanceof RetiredGenerationException) {
-                LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause);
-                haltClient(cause);
-                return null;
-            }
+            return ((InternalCommand) command).execute(this);
+        }
+        if (command instanceof RequestSuccess) {
+            return onRequestSuccess((RequestSuccess<?, ?>) command);
+        }
+        if (command instanceof RequestFailure) {
+            return onRequestFailure((RequestFailure<?, ?>) command);
         }
 
-        // TODO: any client-common logic (such as validation and common dispatch) needs to go here
         return onCommand(command);
     }
 
-    @Override
-    public final @Nonnull ClientIdentifier getIdentifier() {
-        return context().getIdentifier();
+    private ClientActorBehavior onRequestSuccess(final RequestSuccess<?, ?> success) {
+        return context().completeRequest(this, success);
+    }
+
+    private ClientActorBehavior onRequestFailure(final RequestFailure<?, ?> failure) {
+        final RequestException cause = failure.getCause();
+        if (cause instanceof RetiredGenerationException) {
+            LOG.error("{}: current generation {} has been superseded", persistenceId(), getIdentifier(), cause);
+            haltClient(cause);
+            context().poison(cause);
+            return null;
+        }
+
+        if (failure.isHardFailure()) {
+            return context().completeRequest(this, failure);
+        }
+
+        // TODO: add instanceof checks on cause to detect more problems
+
+        LOG.warn("{}: Unhandled retriable failure {}, promoting to hard failure", persistenceId(), failure);
+        return context().completeRequest(this, failure);
+    }
+
+    // This method is executing in the actor context, hence we can safely interact with the queue
+    private ClientActorBehavior doSendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
+        // Get or allocate queue for the request
+        final SequencedQueue queue = context().queueFor(request.getTarget().getHistoryId().getCookie());
+
+        // Note this is a tri-state return and can be null
+        final Optional<FiniteDuration> result = queue.enqueueRequest(request, callback);
+        if (result == null) {
+            // Happy path: we are done here
+            return this;
+        }
+
+        if (result.isPresent()) {
+            // Less happy path: we need to schedule a timer
+            scheduleQueueTimeout(queue, result.get());
+            return this;
+        }
+
+        startResolve(queue, request.getTarget().getHistoryId().getCookie());
+        return this;
+    }
+
+    // This method is executing in the actor context, hence we can safely interact with the queue
+    private void startResolve(final SequencedQueue queue, final long cookie) {
+        // Queue does not have backend information. Initiate resolution, which may actually be piggy-backing on to a
+        // previous request to resolve.
+        final CompletionStage<? extends BackendInfo> f = resolver().getBackendInfo(cookie);
+
+        // This is the tricky part: depending on timing, the queue may have a stale request for resolution, which has
+        // been invalidated or it may already have a reference to this resolution request. Let us give it a chance to
+        // update and it will indicate if this resolution request is an update. If it is, we'll piggy-back on it and
+        // run backend information update in the actor thread. If it is not, we do not need to do anything, as we will
+        // bulk-process all requests.
+        if (queue.expectProof(f)) {
+            f.thenAccept(backend -> context().executeInActor(cb -> cb.finishResolve(queue, f, backend)));
+        }
+    }
+
+    // This method is executing in the actor context, hence we can safely interact with the queue
+    private ClientActorBehavior finishResolve(final SequencedQueue queue,
+            final CompletionStage<? extends BackendInfo> futureBackend, final BackendInfo backend) {
+
+        final Optional<FiniteDuration> maybeTimeout = queue.setBackendInfo(futureBackend, backend);
+        if (maybeTimeout.isPresent()) {
+            scheduleQueueTimeout(queue, maybeTimeout.get());
+        }
+        return this;
+    }
+
+    // This method is executing in the actor context, hence we can safely interact with the queue
+    private void scheduleQueueTimeout(final SequencedQueue queue, final FiniteDuration timeout) {
+        LOG.debug("{}: scheduling timeout in {}", persistenceId(), timeout);
+        context().executeInActor(cb -> cb.queueTimeout(queue), timeout);
+    }
+
+    // This method is executing in the actor context, hence we can safely interact with the queue
+    private ClientActorBehavior queueTimeout(final SequencedQueue queue) {
+        final boolean needBackend;
+
+        try {
+            needBackend = queue.runTimeout();
+        } catch (NoProgressException e) {
+            // Uh-oh, no progress. The queue has already killed itself, now we need to remove it
+            context().removeQueue(queue);
+            return this;
+        }
+
+        if (needBackend) {
+            startResolve(queue, queue.getCookie());
+        }
+
+        return this;
     }
 
     /**
@@ -81,4 +180,15 @@ public abstract class ClientActorBehavior extends RecoveredClientActorBehavior<C
      * @return
      */
     protected abstract @Nonnull BackendInfoResolver<?> resolver();
+
+    /**
+     * Send a request to the backend and invoke a specified callback when it finishes. This method is safe to invoke
+     * from any thread.
+     *
+     * @param request Request to send
+     * @param callback Callback to invoke
+     */
+    public final void sendRequest(final TransactionRequest<?> request, final RequestCallback callback) {
+        context().executeInActor(cb -> cb.doSendRequest(request, callback));
+    }
 }