X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FDistributedDataStoreClientBehavior.java;fp=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FDistributedDataStoreClientBehavior.java;h=5aca99ae591003d920f4f48d369700bae936ac53;hb=98d1c5606bad9633ce5549bcd691a98c75abdf6a;hp=364e462e57c7b923379c2ec5a548c75f88622472;hpb=e6b9c7e282a526aeb1469b6f7aa9b61f69d01743;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java index 364e462e57..5aca99ae59 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java @@ -9,9 +9,16 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import akka.actor.ActorRef; import akka.actor.Status; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; +import com.google.common.base.Throwables; +import com.google.common.base.Verify; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior; import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; @@ -47,12 +54,19 @@ import org.slf4j.LoggerFactory; final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class); + private final Map transactions = new ConcurrentHashMap<>(); + private final Map histories = new ConcurrentHashMap<>(); + private final AtomicLong nextHistoryId = new AtomicLong(1); + private final AtomicLong nextTransactionId = new AtomicLong(); private final ModuleShardBackendResolver resolver; - private long nextHistoryId; + private final SingleClientHistory singleHistory; + + private volatile Throwable aborted; DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) { super(context); resolver = new ModuleShardBackendResolver(actorContext); + singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0)); } // @@ -63,26 +77,37 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple @Override protected void haltClient(final Throwable cause) { - // FIXME: Add state flushing here once we have state + // If we have encountered a previous problem there is not cleanup necessary, as we have already cleaned up + // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor) + // thread. + if (aborted != null) { + abortOperations(cause); + } } - private ClientActorBehavior createLocalHistory(final ClientActorBehavior currentBehavior, - final CompletableFuture future) { - final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++); - LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future); + private void abortOperations(final Throwable cause) { + // This acts as a barrier, application threads check this after they have added an entry in the maps, + // and if they observe aborted being non-null, they will perform their cleanup and not return the handle. + aborted = cause; - // FIXME: initiate backend instantiation - future.completeExceptionally(new UnsupportedOperationException("Not implemented yet")); - return currentBehavior; + for (ClientLocalHistory h : histories.values()) { + h.localAbort(cause); + } + histories.clear(); + + for (ClientTransaction t : transactions.values()) { + t.localAbort(cause); + } + transactions.clear(); } - private ClientActorBehavior shutdown(final ClientActorBehavior currentBehavior) { - // FIXME: Add shutdown procedures here + private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) { + abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down")); return null; } @Override - protected ClientActorBehavior onCommand(final Object command) { + protected DistributedDataStoreClientBehavior onCommand(final Object command) { if (command instanceof GetClientRequest) { ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender()); } else { @@ -98,11 +123,41 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple // // + private static V returnIfOperational(final Map map, final K key, final V value, + final Throwable aborted) { + Verify.verify(map.put(key, value) == null); + + if (aborted != null) { + try { + value.localAbort(aborted); + } catch (Exception e) { + LOG.debug("Close of {} failed", value, e); + } + map.remove(key, value); + throw Throwables.propagate(aborted); + } + + return value; + } + @Override - public CompletionStage createLocalHistory() { - final CompletableFuture future = new CompletableFuture<>(); - context().executeInActor(currentBehavior -> createLocalHistory(currentBehavior, future)); - return future; + public ClientLocalHistory createLocalHistory() { + final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), + nextHistoryId.getAndIncrement()); + final ClientLocalHistory history = new ClientLocalHistory(this, historyId); + LOG.debug("{}: creating a new local history {}", persistenceId(), history); + + return returnIfOperational(histories, historyId, history, aborted); + } + + @Override + public ClientTransaction createTransaction() { + final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(), + nextTransactionId.getAndIncrement()); + final ClientTransaction tx = new ClientTransaction(this, singleHistory, txId); + LOG.debug("{}: creating a new transaction {}", persistenceId(), tx); + + return returnIfOperational(transactions, txId, tx, aborted); } @Override @@ -114,4 +169,16 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple protected ModuleShardBackendResolver resolver() { return resolver; } + + void transactionComplete(final ClientTransaction transaction) { + transactions.remove(transaction.getIdentifier()); + } + + void sendRequest(final long sequence, final TransactionRequest request, final Consumer> completer) { + sendRequest(sequence, request, response -> { + completer.accept(response); + return this; + }); + } + }