X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FDistributedDataStoreClientBehavior.java;h=792b5b31c1cadef2ce9c3c6575e9b9963cf36686;hb=db3d7caeeb310f76a9a159f9a8d7e9beff89f645;hp=b84008ca39e0ecb9bda50fe2fb895661431fea92;hpb=9b4f21460c6dcb10c381df631d064d05de16546c;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java index b84008ca39..792b5b31c1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java @@ -7,178 +7,37 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import akka.actor.ActorRef; -import akka.actor.Status; -import com.google.common.base.Throwables; -import com.google.common.base.Verify; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import org.opendaylight.controller.cluster.access.client.ClientActorBehavior; +import java.util.function.Function; import org.opendaylight.controller.cluster.access.client.ClientActorContext; -import org.opendaylight.controller.cluster.access.commands.TransactionRequest; -import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; -import org.opendaylight.controller.cluster.access.concepts.Response; -import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; /** - * {@link ClientActorBehavior} acting as an intermediary between the backend actors and the DistributedDataStore - * frontend. - * - * This class is not visible outside of this package because it breaks the actor containment. Services provided to - * Java world outside of actor containment are captured in {@link DistributedDataStoreClient}. - * - * IMPORTANT: this class breaks actor containment via methods implementing {@link DistributedDataStoreClient} contract. - * When touching internal state, be mindful of the execution context from which execution context, Actor - * or POJO, is the state being accessed or modified. - * - * THREAD SAFETY: this class must always be kept thread-safe, so that both the Actor System thread and the application - * threads can run concurrently. All state transitions must be made in a thread-safe manner. When in - * doubt, feel free to synchronize on this object. - * - * PERFORMANCE: this class lies in a performance-critical fast path. All code needs to be concise and efficient, but - * performance must not come at the price of correctness. Any optimizations need to be carefully analyzed - * for correctness and performance impact. - * - * TRADE-OFFS: part of the functionality runs in application threads without switching contexts, which makes it ideal - * for performing work and charging applications for it. That has two positive effects: - * - CPU usage is distributed across applications, minimizing work done in the actor thread - * - CPU usage provides back-pressure towards the application. + * {@link AbstractDataStoreClientBehavior} which performs module-based sharding. * * @author Robert Varga */ -final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient { - private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class); - - private final Map transactions = new ConcurrentHashMap<>(); - private final Map histories = new ConcurrentHashMap<>(); - private final AtomicLong nextHistoryId = new AtomicLong(1); - private final AtomicLong nextTransactionId = new AtomicLong(); - private final ModuleShardBackendResolver resolver; - private final SingleClientHistory singleHistory; - - private volatile Throwable aborted; - - DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) { - super(context); - resolver = new ModuleShardBackendResolver(actorContext); - singleHistory = new SingleClientHistory(this, new LocalHistoryIdentifier(getIdentifier(), 0)); - } +final class DistributedDataStoreClientBehavior extends AbstractDataStoreClientBehavior { + private final Function pathToShard; - // - // - // Methods below are invoked from the client actor thread - // - // - - @Override - protected void haltClient(final Throwable cause) { - // If we have encountered a previous problem there is not cleanup necessary, as we have already cleaned up - // Thread safely is not an issue, as both this method and any failures are executed from the same (client actor) - // thread. - if (aborted != null) { - abortOperations(cause); - } - } - - private void abortOperations(final Throwable cause) { - // This acts as a barrier, application threads check this after they have added an entry in the maps, - // and if they observe aborted being non-null, they will perform their cleanup and not return the handle. - aborted = cause; - - for (ClientLocalHistory h : histories.values()) { - h.localAbort(cause); - } - histories.clear(); - - for (ClientTransaction t : transactions.values()) { - t.localAbort(cause); - } - transactions.clear(); - } - - private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) { - abortOperations(new IllegalStateException("Client " + getIdentifier() + " has been shut down")); - return null; + private DistributedDataStoreClientBehavior(final ClientActorContext context, + final ModuleShardBackendResolver resolver) { + super(context, resolver); + pathToShard = resolver::resolveShardForPath; } - @Override - protected DistributedDataStoreClientBehavior onCommand(final Object command) { - if (command instanceof GetClientRequest) { - ((GetClientRequest) command).getReplyTo().tell(new Status.Success(this), ActorRef.noSender()); - } else { - LOG.warn("{}: ignoring unhandled command {}", persistenceId(), command); - } - - return this; - } - - // - // - // Methods below are invoked from application threads - // - // - - private static V returnIfOperational(final Map map, final K key, final V value, - final Throwable aborted) { - Verify.verify(map.put(key, value) == null); - - if (aborted != null) { - try { - value.localAbort(aborted); - } catch (Exception e) { - LOG.debug("Close of {} failed", value, e); - } - map.remove(key, value); - throw Throwables.propagate(aborted); - } - - return value; + DistributedDataStoreClientBehavior(final ClientActorContext context, final ActorContext actorContext) { + this(context, new ModuleShardBackendResolver(context.getIdentifier(), actorContext)); } @Override - public ClientLocalHistory createLocalHistory() { - final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(getIdentifier(), - nextHistoryId.getAndIncrement()); - final ClientLocalHistory history = new ClientLocalHistory(this, historyId); - LOG.debug("{}: creating a new local history {}", persistenceId(), history); - - return returnIfOperational(histories, historyId, history, aborted); - } - - @Override - public ClientTransaction createTransaction() { - final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(), - nextTransactionId.getAndIncrement()); - final ClientTransaction tx = new ClientTransaction(this, singleHistory, txId); - LOG.debug("{}: creating a new transaction {}", persistenceId(), tx); - - return returnIfOperational(transactions, txId, tx, aborted); + Long resolveShardForPath(final YangInstanceIdentifier path) { + return pathToShard.apply(path); } @Override public void close() { - context().executeInActor(this::shutdown); + super.close(); + resolver().close(); } - - @Override - protected ModuleShardBackendResolver resolver() { - return resolver; - } - - void transactionComplete(final ClientTransaction transaction) { - transactions.remove(transaction.getIdentifier()); - } - - void sendRequest(final TransactionRequest request, final Consumer> completer) { - sendRequest(request, response -> { - completer.accept(response); - return this; - }); - } - }