BUG-5280: implement message queueing
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / actors / client / ClientActorContext.java
index 3aa1a5200b077f91fda048ba2704648de98c8f88..9f4fd137a4ca5dc88834935714d1f0285b658235 100644 (file)
@@ -8,12 +8,25 @@
 package org.opendaylight.controller.cluster.datastore.actors.client;
 
 import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Scheduler;
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.concepts.WritableIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * An actor context associated with this {@link AbstractClientActor}.
@@ -25,13 +38,22 @@ import org.opendaylight.yangtools.concepts.Identifiable;
  * @author Robert Varga
  */
 @Beta
+@ThreadSafe
 public class ClientActorContext extends AbstractClientActorContext implements Identifiable<ClientIdentifier> {
+    private static final Logger LOG = LoggerFactory.getLogger(ClientActorContext.class);
+
+    private final Map<Long, SequencedQueue> queues = new ConcurrentHashMap<>();
     private final ClientIdentifier identifier;
+    private final ExecutionContext executionContext;
+    private final Scheduler scheduler;
 
     // Hidden to avoid subclassing
-    ClientActorContext(final ActorRef self, final String persistenceId, final ClientIdentifier identifier) {
+    ClientActorContext(final ActorRef self, final Scheduler scheduler, final ExecutionContext executionContext,
+            final String persistenceId, final ClientIdentifier identifier) {
         super(self, persistenceId);
         this.identifier = Preconditions.checkNotNull(identifier);
+        this.scheduler = Preconditions.checkNotNull(scheduler);
+        this.executionContext = Preconditions.checkNotNull(executionContext);
     }
 
     @Override
@@ -58,4 +80,42 @@ public class ClientActorContext extends AbstractClientActorContext implements Id
     public void executeInActor(final @Nonnull InternalCommand command) {
         self().tell(Preconditions.checkNotNull(command), ActorRef.noSender());
     }
+
+    public Cancellable executeInActor(final @Nonnull InternalCommand command, final FiniteDuration delay) {
+        return scheduler.scheduleOnce(Preconditions.checkNotNull(delay), self(), Preconditions.checkNotNull(command),
+            executionContext, ActorRef.noSender());
+    }
+
+    SequencedQueue queueFor(final Long cookie) {
+        return queues.computeIfAbsent(cookie, t -> new SequencedQueue(t, ticker()));
+    }
+
+    void removeQueue(final SequencedQueue queue) {
+        queues.remove(queue.getCookie(), queue);
+    }
+
+    ClientActorBehavior completeRequest(final ClientActorBehavior current, final Response<?, ?> response) {
+        final WritableIdentifier id = response.getTarget();
+
+        // FIXME: this will need to be updated for other Request/Response types to extract cookie
+        Preconditions.checkArgument(id instanceof TransactionIdentifier);
+        final TransactionIdentifier txId = (TransactionIdentifier) id;
+
+        final SequencedQueue queue = queues.get(txId.getHistoryId().getCookie());
+        if (queue == null) {
+            LOG.info("{}: Ignoring unknown response {}", persistenceId(), response);
+            return current;
+        } else {
+            return queue.complete(current, response);
+        }
+    }
+
+    void poison(final RequestException cause) {
+        for (SequencedQueue q : queues.values()) {
+            q.poison(cause);
+        }
+
+        queues.clear();
+    }
+
 }