X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FDistributedDataStoreClientBehavior.java;h=eb1dd17bfd9438fc6adff5351db71527d90b3453;hb=refs%2Fchanges%2F99%2F47499%2F3;hp=917e759a98ea6dab80a186e7edb20ea315a6bb44;hpb=3ebd44f9b7a4a217222036c2889d2a04b4f1eb30;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java
index 917e759a98..eb1dd17bfd 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java
@@ -9,11 +9,19 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
import akka.actor.ActorRef;
import akka.actor.Status;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
+import com.google.common.base.Throwables;
+import com.google.common.base.Verify;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import org.opendaylight.controller.cluster.access.client.ClientActorBehavior;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior;
-import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -21,21 +29,26 @@ import org.slf4j.LoggerFactory;
* {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore
* frontend.
*
+ *
* This class is not visible outside of this package because it breaks the actor containment. Services provided to
* Java world outside of actor containment are captured in {@link DistributedDataStoreClient}.
*
+ *
* IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract.
* When touching internal state, be mindful of the execution context from which execution context, Actor
* or POJO, is the state being accessed or modified.
*
+ *
* THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application
* threads can run concurrently. All state transitions must be made in a thread-safe manner. When in
* doubt, feel free to synchronize on this object.
*
+ *
* PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but
* performance must not come at the price of correctness. Any optimizations need to be carefully analyzed
* for correctness and performance impact.
*
+ *
* TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal
* for performing work and charging applications for it. That has two positive effects:
* - CPU usage is distributed across applications, minimizing work done in the actor thread
@@ -45,17 +58,20 @@ import org.slf4j.LoggerFactory;
*/
final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
- private static final Object SHUTDOWN = new Object() {
- @Override
- public String toString() {
- return "SHUTDOWN";
- }
- };
- private long nextHistoryId;
+ private final Map<TransactionIdentifier, ClientTransaction> transactions = new ConcurrentHashMap<>();
+ private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
+ private final AtomicLong nextHistoryId = new AtomicLong(1);
+ private final AtomicLong nextTransactionId = new AtomicLong();
+ private final ModuleShardBackendResolver resolver;
+ private final SingleClientHistory singleHistory;
+
+ private volatile Throwable aborted;
- DistributedDataStoreClientBehavior(final ClientActorContext context) {
+ DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) {
super(context);
+ resolver = new ModuleShardBackendResolver(context.getIdentifier(), actorContext);
+ singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0));
}
//
@@ -66,27 +82,39 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
@Override
protected void haltClient(final Throwable cause) {
- // FIXME: Add state flushing here once we have state
+ // If we have encountered a previous problem there is no cleanup necessary, as we have already cleaned up.
+ // Thread safety is not an issue, as both this method and any failures are executed from the same (client actor)
+ // thread.
+ if (aborted == null) {
+ abortOperations(cause);
+ }
}
- private void createLocalHistory(final CreateLocalHistoryCommand command) {
- final CompletableFuture<ClientLocalHistory> future = command.future();
- final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++);
- LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future);
+ private void abortOperations(final Throwable cause) {
+ // This acts as a barrier, application threads check this after they have added an entry in the maps,
+ // and if they observe aborted being non-null, they will perform their cleanup and not return the handle.
+ aborted = cause;
- // FIXME: initiate backend instantiation
- future.completeExceptionally(new UnsupportedOperationException("Not implemented yet"));
+ for (ClientLocalHistory h : histories.values()) {
+ h.localAbort(cause);
+ }
+ histories.clear();
+
+ for (ClientTransaction t : transactions.values()) {
+ t.localAbort(cause);
+ }
+ transactions.clear();
+ }
+
+ private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) {
+ abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down"));
+ return null;
}
@Override
- protected ClientActorBehavior onCommand(final Object command) {
- if (command instanceof CreateLocalHistoryCommand) {
- createLocalHistory((CreateLocalHistoryCommand) command);
- } else if (command instanceof GetClientRequest) {
+ protected DistributedDataStoreClientBehavior onCommand(final Object command) {
+ if (command instanceof GetClientRequest) {
((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender());
- } else if (SHUTDOWN.equals(command)) {
- // FIXME: Add shutdown procedures here
- return null;
} else {
LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command);
}
@@ -100,15 +128,63 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
//
//
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private static <K, V extends LocalAbortable> V returnIfOperational(final Map<K, V> map, final K key, final V value,
+ final Throwable aborted) {
+ Verify.verify(map.put(key, value) == null);
+
+ if (aborted != null) {
+ try {
+ value.localAbort(aborted);
+ } catch (Exception e) {
+ LOG.debug("Close of {} failed", value, e);
+ }
+ map.remove(key, value);
+ throw Throwables.propagate(aborted);
+ }
+
+ return value;
+ }
+
@Override
- public CompletionStage<ClientLocalHistory> createLocalHistory() {
- final CreateLocalHistoryCommand command = new CreateLocalHistoryCommand();
- self().tell(command, ActorRef.noSender());
- return command.future();
+ public ClientLocalHistory createLocalHistory() {
+ final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(),
+ nextHistoryId.getAndIncrement());
+ final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
+ LOG.debug("{}: creating a new local history {}", persistenceId(), history);
+
+ return returnIfOperational(histories, historyId, history, aborted);
+ }
+
+ @Override
+ public ClientTransaction createTransaction() {
+ final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(),
+ nextTransactionId.getAndIncrement());
+ final ClientTransaction tx = new ClientTransaction(singleHistory, txId);
+ LOG.debug("{}: creating a new transaction {}", persistenceId(), tx);
+
+ return returnIfOperational(transactions, txId, tx, aborted);
}
@Override
public void close() {
- self().tell(SHUTDOWN, ActorRef.noSender());
+ context().executeInActor(this::shutdown);
}
+
+ @Override
+ protected ModuleShardBackendResolver resolver() {
+ return resolver;
+ }
+
+ void transactionComplete(final ClientTransaction transaction) {
+ transactions.remove(transaction.getIdentifier());
+ }
+
+ void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
+ sendRequest(request, response -> {
+ completer.accept(response);
+ return this;
+ });
+ }
+
}