X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fcds-access-client%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Faccess%2Fclient%2FClientActorContext.java;h=7e392af37570b4c735339276fb8586ff78d878c9;hp=26e68356d3f4404e7050bcbcc905542dc6eccb18;hb=d7c9a8ccfcb57f005490a226803d094289997ef9;hpb=314d5b41ea6b464db939da95a33c872f594ccada diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java index 26e68356d3..7e392af375 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorContext.java @@ -8,23 +8,20 @@ package org.opendaylight.controller.cluster.access.client; import akka.actor.ActorRef; +import akka.actor.ActorSystem; import akka.actor.Cancellable; import akka.actor.Scheduler; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; import com.google.common.base.Ticker; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.concurrent.ThreadSafe; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; -import org.opendaylight.controller.cluster.access.concepts.RequestException; -import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope; -import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.common.actor.Dispatchers; +import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory; +import org.opendaylight.controller.cluster.messaging.MessageSlicer; import org.opendaylight.yangtools.concepts.Identifiable; -import org.opendaylight.yangtools.concepts.WritableIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext; import scala.concurrent.duration.FiniteDuration; @@ -41,20 +38,27 @@ import scala.concurrent.duration.FiniteDuration; @Beta @ThreadSafe public class ClientActorContext extends AbstractClientActorContext implements Identifiable { - private static final Logger LOG = LoggerFactory.getLogger(ClientActorContext.class); - - private final Map queues = new ConcurrentHashMap<>(); - private final ClientIdentifier identifier; private final ExecutionContext executionContext; + private final ClientIdentifier identifier; private final Scheduler scheduler; + private final Dispatchers dispatchers; + private final ClientActorConfig config; + private final MessageSlicer messageSlicer; // Hidden to avoid subclassing - ClientActorContext(final ActorRef self, final Scheduler scheduler, final ExecutionContext executionContext, - final String persistenceId, final ClientIdentifier identifier) { + ClientActorContext(final ActorRef self, final String persistenceId, final ActorSystem system, + final ClientIdentifier identifier, final ClientActorConfig config) { super(self, persistenceId); this.identifier = Preconditions.checkNotNull(identifier); - this.scheduler = Preconditions.checkNotNull(scheduler); - this.executionContext = Preconditions.checkNotNull(executionContext); + this.scheduler = Preconditions.checkNotNull(system).scheduler(); + this.executionContext = system.dispatcher(); + this.dispatchers = new Dispatchers(system.dispatchers()); + this.config = Preconditions.checkNotNull(config); + + messageSlicer = MessageSlicer.builder().messageSliceSize(config.getMaximumMessageSliceSize()) + .logContext(persistenceId).expireStateAfterInactivity(config.getRequestTimeout(), TimeUnit.NANOSECONDS) + .fileBackedStreamFactory(new FileBackedOutputStreamFactory(config.getFileBackedStreamingThreshold(), + config.getTempFileDirectory())).build(); } @Override @@ -63,6 +67,21 @@ public class ClientActorContext extends AbstractClientActorContext implements Id return identifier; } + @Nonnull + public ClientActorConfig config() { + return config; + } + + @Nonnull + public Dispatchers dispatchers() { + return dispatchers; + } + + @Nonnull + public MessageSlicer messageSlicer() { + return messageSlicer; + } + /** * Return the time ticker for this {@link ClientActorContext}. This should be used for in all time-tracking * done within a client actor. Subclasses of {@link ClientActorBehavior} are encouraged to use @@ -79,45 +98,15 @@ public class ClientActorContext extends AbstractClientActorContext implements Id * Execute a command in the context of the client actor. * * @param command Block of code which needs to be execute + * @param BackendInfo type */ - public void executeInActor(@Nonnull final InternalCommand command) { + public void executeInActor(@Nonnull final InternalCommand command) { self().tell(Preconditions.checkNotNull(command), ActorRef.noSender()); } - public Cancellable executeInActor(@Nonnull final InternalCommand command, final FiniteDuration delay) { + public Cancellable executeInActor(@Nonnull final InternalCommand command, + final FiniteDuration delay) { return scheduler.scheduleOnce(Preconditions.checkNotNull(delay), self(), Preconditions.checkNotNull(command), executionContext, ActorRef.noSender()); } - - SequencedQueue queueFor(final Long cookie) { - return queues.computeIfAbsent(cookie, t -> new SequencedQueue(t, ticker())); - } - - void removeQueue(final SequencedQueue queue) { - queues.remove(queue.getCookie(), queue); - } - - ClientActorBehavior completeRequest(final ClientActorBehavior current, final ResponseEnvelope response) { - final WritableIdentifier id = response.getMessage().getTarget(); - - // FIXME: this will need to be updated for other Request/Response types to extract cookie - Preconditions.checkArgument(id instanceof TransactionIdentifier); - final TransactionIdentifier txId = (TransactionIdentifier) id; - - final SequencedQueue queue = queues.get(txId.getHistoryId().getCookie()); - if (queue == null) { - LOG.info("{}: Ignoring unknown response {}", persistenceId(), response); - return current; - } else { - return queue.complete(current, response); - } - } - - void poison(final RequestException cause) { - for (SequencedQueue q : queues.values()) { - q.poison(cause); - } - - queues.clear(); - } }