BUG-5280: implement message queueing
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / DistributedDataStoreClientBehavior.java
index 2b5e6753be25146015396036b656522df17f86ff..364e462e57c7b923379c2ec5a548c75f88622472 100644 (file)
@@ -14,6 +14,7 @@ import java.util.concurrent.CompletionStage;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior;
 import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,17 +46,13 @@ import org.slf4j.LoggerFactory;
  */
 final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
-    private static final Object SHUTDOWN = new Object() {
-        @Override
-        public String toString() {
-            return "SHUTDOWN";
-        }
-    };
 
+    private final ModuleShardBackendResolver resolver;
     private long nextHistoryId;
 
-    DistributedDataStoreClientBehavior(final ClientActorContext context) {
+    DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
         super(context);
+        resolver = new ModuleShardBackendResolver(actorContext);
     }
 
     //
@@ -64,24 +61,30 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
     //
     //
 
-    private void createLocalHistory(final CreateLocalHistoryCommand command) {
-        final CompletableFuture<ClientLocalHistory> future = command.future();
+    @Override
+    protected void haltClient(final Throwable cause) {
+        // FIXME: Add state flushing here once we have state
+    }
+
+    private ClientActorBehavior createLocalHistory(final ClientActorBehavior currentBehavior,
+            final CompletableFuture<ClientLocalHistory> future) {
         final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++);
         LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future);
 
         // FIXME: initiate backend instantiation
         future.completeExceptionally(new UnsupportedOperationException("Not implemented yet"));
+        return currentBehavior;
+    }
+
+    private ClientActorBehavior shutdown(final ClientActorBehavior currentBehavior) {
+        // FIXME: Add shutdown procedures here
+        return null;
     }
 
     @Override
     protected ClientActorBehavior onCommand(final Object command) {
-        if (command instanceof CreateLocalHistoryCommand) {
-            createLocalHistory((CreateLocalHistoryCommand) command);
-        } else if (command instanceof GetClientRequest) {
+        if (command instanceof GetClientRequest) {
             ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
-        } else if (SHUTDOWN.equals(command)) {
-            // Add shutdown procedures here
-            return null;
         } else {
             LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
         }
@@ -97,13 +100,18 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
 
     @Override
     public CompletionStage<ClientLocalHistory> createLocalHistory() {
-        final CreateLocalHistoryCommand command = new CreateLocalHistoryCommand();
-        self().tell(command, ActorRef.noSender());
-        return command.future();
+        final CompletableFuture<ClientLocalHistory> future = new CompletableFuture<>();
+        context().executeInActor(currentBehavior -> createLocalHistory(currentBehavior, future));
+        return future;
     }
 
     @Override
     public void close() {
-        self().tell(SHUTDOWN, ActorRef.noSender());
+        context().executeInActor(this::shutdown);
+    }
+
+    @Override
+    protected ModuleShardBackendResolver resolver() {
+        return resolver;
     }
 }