X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Factors%2Fclient%2FClientActorContext.java;h=9f4fd137a4ca5dc88834935714d1f0285b658235;hp=3aa1a5200b077f91fda048ba2704648de98c8f88;hb=50a2f5eb1c94650bc1be1e49d3a5382a1a74a9b3;hpb=c6e3a444f7e8702aade9839ca950bc9790be8831 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java index 3aa1a5200b..9f4fd137a4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/actors/client/ClientActorContext.java @@ -8,12 +8,25 @@ package org.opendaylight.controller.cluster.datastore.actors.client; import akka.actor.ActorRef; +import akka.actor.Cancellable; +import akka.actor.Scheduler; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; import com.google.common.base.Ticker; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nonnull; +import javax.annotation.concurrent.ThreadSafe; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.concepts.WritableIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.ExecutionContext; +import scala.concurrent.duration.FiniteDuration; /** * An actor context associated with this {@link AbstractClientActor}. @@ -25,13 +38,22 @@ import org.opendaylight.yangtools.concepts.Identifiable; * @author Robert Varga */ @Beta +@ThreadSafe public class ClientActorContext extends AbstractClientActorContext implements Identifiable { + private static final Logger LOG = LoggerFactory.getLogger(ClientActorContext.class); + + private final Map queues = new ConcurrentHashMap<>(); private final ClientIdentifier identifier; + private final ExecutionContext executionContext; + private final Scheduler scheduler; // Hidden to avoid subclassing - ClientActorContext(final ActorRef self, final String persistenceId, final ClientIdentifier identifier) { + ClientActorContext(final ActorRef self, final Scheduler scheduler, final ExecutionContext executionContext, + final String persistenceId, final ClientIdentifier identifier) { super(self, persistenceId); this.identifier = Preconditions.checkNotNull(identifier); + this.scheduler = Preconditions.checkNotNull(scheduler); + this.executionContext = Preconditions.checkNotNull(executionContext); } @Override @@ -58,4 +80,42 @@ public class ClientActorContext extends AbstractClientActorContext implements Id public void executeInActor(final @Nonnull InternalCommand command) { self().tell(Preconditions.checkNotNull(command), ActorRef.noSender()); } + + public Cancellable executeInActor(final @Nonnull InternalCommand command, final FiniteDuration delay) { + return scheduler.scheduleOnce(Preconditions.checkNotNull(delay), self(), Preconditions.checkNotNull(command), + executionContext, ActorRef.noSender()); + } + + SequencedQueue queueFor(final Long cookie) { + return queues.computeIfAbsent(cookie, t -> new SequencedQueue(t, ticker())); + } + + void removeQueue(final SequencedQueue queue) { + queues.remove(queue.getCookie(), queue); + } + + ClientActorBehavior completeRequest(final ClientActorBehavior current, final Response response) { + final WritableIdentifier id = response.getTarget(); + + // FIXME: this will need to be updated for other Request/Response types to extract cookie + Preconditions.checkArgument(id instanceof TransactionIdentifier); + final TransactionIdentifier txId = (TransactionIdentifier) id; + + final SequencedQueue queue = queues.get(txId.getHistoryId().getCookie()); + if (queue == null) { + LOG.info("{}: Ignoring unknown response {}", persistenceId(), response); + return current; + } else { + return queue.complete(current, response); + } + } + + void poison(final RequestException cause) { + for (SequencedQueue q : queues.values()) { + q.poison(cause); + } + + queues.clear(); + } + }