Slice front-end request messages
[controller.git] / opendaylight / md-sal / cds-access-client / src / main / java / org / opendaylight / controller / cluster / access / client / ClientActorContext.java
index 26e68356d3f4404e7050bcbcc905542dc6eccb18..7e392af37570b4c735339276fb8586ff78d878c9 100644 (file)
@@ -8,23 +8,20 @@
 package org.opendaylight.controller.cluster.access.client;
 
 import akka.actor.ActorRef;
 package org.opendaylight.controller.cluster.access.client;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import akka.actor.Cancellable;
 import akka.actor.Scheduler;
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
 import akka.actor.Cancellable;
 import akka.actor.Scheduler;
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Ticker;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
-import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers;
+import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
+import org.opendaylight.controller.cluster.messaging.MessageSlicer;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.concepts.Identifiable;
-import org.opendaylight.yangtools.concepts.WritableIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -41,20 +38,27 @@ import scala.concurrent.duration.FiniteDuration;
 @Beta
 @ThreadSafe
 public class ClientActorContext extends AbstractClientActorContext implements Identifiable<ClientIdentifier> {
 @Beta
 @ThreadSafe
 public class ClientActorContext extends AbstractClientActorContext implements Identifiable<ClientIdentifier> {
-    private static final Logger LOG = LoggerFactory.getLogger(ClientActorContext.class);
-
-    private final Map<Long, SequencedQueue> queues = new ConcurrentHashMap<>();
-    private final ClientIdentifier identifier;
     private final ExecutionContext executionContext;
     private final ExecutionContext executionContext;
+    private final ClientIdentifier identifier;
     private final Scheduler scheduler;
     private final Scheduler scheduler;
+    private final Dispatchers dispatchers;
+    private final ClientActorConfig config;
+    private final MessageSlicer messageSlicer;
 
     // Hidden to avoid subclassing
 
     // Hidden to avoid subclassing
-    ClientActorContext(final ActorRef self, final Scheduler scheduler, final ExecutionContext executionContext,
-            final String persistenceId, final ClientIdentifier identifier) {
+    ClientActorContext(final ActorRef self, final String persistenceId, final ActorSystem system,
+            final ClientIdentifier identifier, final ClientActorConfig config) {
         super(self, persistenceId);
         this.identifier = Preconditions.checkNotNull(identifier);
         super(self, persistenceId);
         this.identifier = Preconditions.checkNotNull(identifier);
-        this.scheduler = Preconditions.checkNotNull(scheduler);
-        this.executionContext = Preconditions.checkNotNull(executionContext);
+        this.scheduler = Preconditions.checkNotNull(system).scheduler();
+        this.executionContext = system.dispatcher();
+        this.dispatchers = new Dispatchers(system.dispatchers());
+        this.config = Preconditions.checkNotNull(config);
+
+        messageSlicer = MessageSlicer.builder().messageSliceSize(config.getMaximumMessageSliceSize())
+            .logContext(persistenceId).expireStateAfterInactivity(config.getRequestTimeout(), TimeUnit.NANOSECONDS)
+                .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(),
+                    config.getTempFileDirectory())).build();
     }
 
     @Override
     }
 
     @Override
@@ -63,6 +67,21 @@ public class ClientActorContext extends AbstractClientActorContext implements Id
         return identifier;
     }
 
         return identifier;
     }
 
+    @Nonnull
+    public ClientActorConfig config() {
+        return config;
+    }
+
+    @Nonnull
+    public Dispatchers dispatchers() {
+        return dispatchers;
+    }
+
+    @Nonnull
+    public MessageSlicer messageSlicer() {
+        return messageSlicer;
+    }
+
     /**
      * Return the time ticker for this {@link ClientActorContext}. This should be used for in all time-tracking
      * done within a client actor. Subclasses of {@link ClientActorBehavior} are encouraged to use
     /**
      * Return the time ticker for this {@link ClientActorContext}. This should be used for in all time-tracking
      * done within a client actor. Subclasses of {@link ClientActorBehavior} are encouraged to use
@@ -79,45 +98,15 @@ public class ClientActorContext extends AbstractClientActorContext implements Id
      * Execute a command in the context of the client actor.
      *
      * @param command Block of code which needs to be execute
      * Execute a command in the context of the client actor.
      *
      * @param command Block of code which needs to be execute
+     * @param <T> BackendInfo type
      */
      */
-    public void executeInActor(@Nonnull final InternalCommand command) {
+    public <T extends BackendInfo> void executeInActor(@Nonnull final InternalCommand<T> command) {
         self().tell(Preconditions.checkNotNull(command), ActorRef.noSender());
     }
 
         self().tell(Preconditions.checkNotNull(command), ActorRef.noSender());
     }
 
-    public Cancellable executeInActor(@Nonnull final InternalCommand command, final FiniteDuration delay) {
+    public <T extends BackendInfo> Cancellable executeInActor(@Nonnull final InternalCommand<T> command,
+            final FiniteDuration delay) {
         return scheduler.scheduleOnce(Preconditions.checkNotNull(delay), self(), Preconditions.checkNotNull(command),
             executionContext, ActorRef.noSender());
     }
         return scheduler.scheduleOnce(Preconditions.checkNotNull(delay), self(), Preconditions.checkNotNull(command),
             executionContext, ActorRef.noSender());
     }
-
-    SequencedQueue queueFor(final Long cookie) {
-        return queues.computeIfAbsent(cookie, t -> new SequencedQueue(t, ticker()));
-    }
-
-    void removeQueue(final SequencedQueue queue) {
-        queues.remove(queue.getCookie(), queue);
-    }
-
-    ClientActorBehavior completeRequest(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
-        final WritableIdentifier id = response.getMessage().getTarget();
-
-        // FIXME: this will need to be updated for other Request/Response types to extract cookie
-        Preconditions.checkArgument(id instanceof TransactionIdentifier);
-        final TransactionIdentifier txId = (TransactionIdentifier) id;
-
-        final SequencedQueue queue = queues.get(txId.getHistoryId().getCookie());
-        if (queue == null) {
-            LOG.info("{}: Ignoring unknown response {}", persistenceId(), response);
-            return current;
-        } else {
-            return queue.complete(current, response);
-        }
-    }
-
-    void poison(final RequestException cause) {
-        for (SequencedQueue q : queues.values()) {
-            q.poison(cause);
-        }
-
-        queues.clear();
-    }
 }
 }