X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FDistributedDataStoreClientBehavior.java;h=e3e781e4db8eb0d75488b29059ab555b37afa125;hp=917e759a98ea6dab80a186e7edb20ea315a6bb44;hb=2ebf9ef718ea7ddd790784a6d241e68ef8d1c564;hpb=3ebd44f9b7a4a217222036c2889d2a04b4f1eb30 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java index 917e759a98..e3e781e4db 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java @@ -9,11 +9,19 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import akka.actor.ActorRef; import akka.actor.Status; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; +import com.google.common.base.Throwables; +import com.google.common.base.Verify; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.opendaylight.controller.cluster.access.client.ClientActorBehavior; +import org.opendaylight.controller.cluster.access.client.ClientActorContext; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; -import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorBehavior; -import org.opendaylight.controller.cluster.datastore.actors.client.ClientActorContext; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,17 +53,20 @@ import org.slf4j.LoggerFactory; */ final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class); - private static final Object SHUTDOWN = new Object() { - @Override - public String toString() { - return "SHUTDOWN"; - } - }; - private long nextHistoryId; + private final Map transactions = new ConcurrentHashMap<>(); + private final Map histories = new ConcurrentHashMap<>(); + private final AtomicLong nextHistoryId = new AtomicLong(1); + private final AtomicLong nextTransactionId = new AtomicLong(); + private final ModuleShardBackendResolver resolver; + private final SingleClientHistory singleHistory; + + private volatile Throwable aborted; - DistributedDataStoreClientBehavior(final ClientActorContext context) { + DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) { super(context); + resolver = new ModuleShardBackendResolver(context.getIdentifier(), actorContext); + singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0)); } // @@ -66,27 +77,39 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple @Override protected void haltClient(final Throwable cause) { - // FIXME: Add state flushing here once we have state + // If we have encountered a previous problem there is not cleanup necessary, as we have already cleaned up + // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor) + // thread. + if (aborted != null) { + abortOperations(cause); + } } - private void createLocalHistory(final CreateLocalHistoryCommand command) { - final CompletableFuture future = command.future(); - final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), nextHistoryId++); - LOG.debug("{}: creating a new local history {} for {}", persistenceId(), historyId, future); + private void abortOperations(final Throwable cause) { + // This acts as a barrier, application threads check this after they have added an entry in the maps, + // and if they observe aborted being non-null, they will perform their cleanup and not return the handle. + aborted = cause; - // FIXME: initiate backend instantiation - future.completeExceptionally(new UnsupportedOperationException("Not implemented yet")); + for (ClientLocalHistory h : histories.values()) { + h.localAbort(cause); + } + histories.clear(); + + for (ClientTransaction t : transactions.values()) { + t.localAbort(cause); + } + transactions.clear(); + } + + private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) { + abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down")); + return null; } @Override - protected ClientActorBehavior onCommand(final Object command) { - if (command instanceof CreateLocalHistoryCommand) { - createLocalHistory((CreateLocalHistoryCommand) command); - } else if (command instanceof GetClientRequest) { + protected DistributedDataStoreClientBehavior onCommand(final Object command) { + if (command instanceof GetClientRequest) { ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender()); - } else if (SHUTDOWN.equals(command)) { - // FIXME: Add shutdown procedures here - return null; } else { LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command); } @@ -100,15 +123,62 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple // // + private static V returnIfOperational(final Map map, final K key, final V value, + final Throwable aborted) { + Verify.verify(map.put(key, value) == null); + + if (aborted != null) { + try { + value.localAbort(aborted); + } catch (Exception e) { + LOG.debug("Close of {} failed", value, e); + } + map.remove(key, value); + throw Throwables.propagate(aborted); + } + + return value; + } + @Override - public CompletionStage createLocalHistory() { - final CreateLocalHistoryCommand command = new CreateLocalHistoryCommand(); - self().tell(command, ActorRef.noSender()); - return command.future(); + public ClientLocalHistory createLocalHistory() { + final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), + nextHistoryId.getAndIncrement()); + final ClientLocalHistory history = new ClientLocalHistory(this, historyId); + LOG.debug("{}: creating a new local history {}", persistenceId(), history); + + return returnIfOperational(histories, historyId, history, aborted); + } + + @Override + public ClientTransaction createTransaction() { + final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(), + nextTransactionId.getAndIncrement()); + final ClientTransaction tx = new ClientTransaction(singleHistory, txId); + LOG.debug("{}: creating a new transaction {}", persistenceId(), tx); + + return returnIfOperational(transactions, txId, tx, aborted); } @Override public void close() { - self().tell(SHUTDOWN, ActorRef.noSender()); + context().executeInActor(this::shutdown); } + + @Override + protected ModuleShardBackendResolver resolver() { + return resolver; + } + + void transactionComplete(final ClientTransaction transaction) { + transactions.remove(transaction.getIdentifier()); + } + + void sendRequest(final TransactionRequest request, final Consumer> completer) { + sendRequest(request, response -> { + completer.accept(response); + return this; + }); + } + }