BUG-5280: add ExplicitAsk utility class
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / DistributedDataStoreClientBehavior.java
index 917e759a98ea6dab80a186e7edb20ea315a6bb44..e3e781e4db8eb0d75488b29059ab555b37afa125 100644 (file)
@@ -9,11 +9,19 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import akka.actor.ActorRef;
 import akka.actor.Status;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
+import com.google.common.base.Throwables;
+import com.google.common.base.Verify;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior;
-import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,17 +53,20 @@ import org.slf4j.LoggerFactory;
  */
 final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
-    private static final Object SHUTDOWN = new Object() {
-        @Override
-        public String toString() {
-            return "SHUTDOWN";
-        }
-    };
 
-    private long nextHistoryId;
+    private final Map<TransactionIdentifier, ClientTransaction> transactions = new ConcurrentHashMap<>();
+    private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
+    private final AtomicLong nextHistoryId = new AtomicLong(1);
+    private final AtomicLong nextTransactionId = new AtomicLong();
+    private final ModuleShardBackendResolver resolver;
+    private final SingleClientHistory singleHistory;
+
+    private volatile Throwable aborted;
 
-    DistributedDataStoreClientBehavior(final ClientActorContext context) {
+    DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
         super(context);
+        resolver = new ModuleShardBackendResolver(context.getIdentifier(), actorContext);
+        singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
     }
 
     //
@@ -66,27 +77,39 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
 
     @Override
     protected void haltClient(final Throwable cause) {
-        // FIXME: Add state flushing here once we have state
+        // If we have encountered a previous problem there is not cleanup necessary, as we have already cleaned up
+        // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor)
+        // thread.
+        if (aborted != null) {
+            abortOperations(cause);
+        }
     }
 
-    private void createLocalHistory(final CreateLocalHistoryCommand command) {
-        final CompletableFuture<ClientLocalHistory> future = command.future();
-        final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++);
-        LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future);
+    private void abortOperations(final Throwable cause) {
+        // This acts as a barrier, application threads check this after they have added an entry in the maps,
+        // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
+        aborted = cause;
 
-        // FIXME: initiate backend instantiation
-        future.completeExceptionally(new UnsupportedOperationException("Not implemented yet"));
+        for (ClientLocalHistory h : histories.values()) {
+            h.localAbort(cause);
+        }
+        histories.clear();
+
+        for (ClientTransaction t : transactions.values()) {
+            t.localAbort(cause);
+        }
+        transactions.clear();
+    }
+
+    private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) {
+        abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
+        return null;
     }
 
     @Override
-    protected ClientActorBehavior onCommand(final Object command) {
-        if (command instanceof CreateLocalHistoryCommand) {
-            createLocalHistory((CreateLocalHistoryCommand) command);
-        } else if (command instanceof GetClientRequest) {
+    protected DistributedDataStoreClientBehavior onCommand(final Object command) {
+        if (command instanceof GetClientRequest) {
             ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
-        } else if (SHUTDOWN.equals(command)) {
-            // FIXME: Add shutdown procedures here
-            return null;
         } else {
             LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
         }
@@ -100,15 +123,62 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
     //
     //
 
+    private static <K, V extends LocalAbortable> V returnIfOperational(final Map<K , V> map, final K key, final V value,
+            final Throwable aborted) {
+        Verify.verify(map.put(key, value) == null);
+
+        if (aborted != null) {
+            try {
+                value.localAbort(aborted);
+            } catch (Exception e) {
+                LOG.debug("Close of {} failed", value, e);
+            }
+            map.remove(key, value);
+            throw Throwables.propagate(aborted);
+        }
+
+        return value;
+    }
+
     @Override
-    public CompletionStage<ClientLocalHistory> createLocalHistory() {
-        final CreateLocalHistoryCommand command = new CreateLocalHistoryCommand();
-        self().tell(command, ActorRef.noSender());
-        return command.future();
+    public ClientLocalHistory createLocalHistory() {
+        final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
+            nextHistoryId.getAndIncrement());
+        final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
+        LOG.debug("{}: creating a new local history {}", persistenceId(), history);
+
+        return returnIfOperational(histories, historyId, history, aborted);
+    }
+
+    @Override
+    public ClientTransaction createTransaction() {
+        final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(),
+            nextTransactionId.getAndIncrement());
+        final ClientTransaction tx = new ClientTransaction(singleHistory, txId);
+        LOG.debug("{}: creating a new transaction {}", persistenceId(), tx);
+
+        return returnIfOperational(transactions, txId, tx, aborted);
     }
 
     @Override
     public void close() {
-        self().tell(SHUTDOWN, ActorRef.noSender());
+        context().executeInActor(this::shutdown);
     }
+
+    @Override
+    protected ModuleShardBackendResolver resolver() {
+        return resolver;
+    }
+
+    void transactionComplete(final ClientTransaction transaction) {
+        transactions.remove(transaction.getIdentifier());
+    }
+
+    void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
+        sendRequest(request, response -> {
+            completer.accept(response);
+            return this;
+        });
+    }
+
 }