package org.opendaylight.controller.cluster.datastore.actors.client;
import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Scheduler;
import com.google.common.annotations.Beta;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.Response;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.concepts.WritableIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.FiniteDuration;
/**
* An actor context associated with this {@link AbstractClientActor}.
* @author Robert Varga
*/
@Beta
+@ThreadSafe
public class ClientActorContext extends AbstractClientActorContext implements Identifiable<ClientIdentifier> {
+ private static final Logger LOG = LoggerFactory.getLogger(ClientActorContext.class);
+
+ private final Map<Long, SequencedQueue> queues = new ConcurrentHashMap<>();
private final ClientIdentifier identifier;
+ private final ExecutionContext executionContext;
+ private final Scheduler scheduler;
// Hidden to avoid subclassing
- ClientActorContext(final ActorRef self, final String persistenceId, final ClientIdentifier identifier) {
+ ClientActorContext(final ActorRef self, final Scheduler scheduler, final ExecutionContext executionContext,
+ final String persistenceId, final ClientIdentifier identifier) {
super(self, persistenceId);
this.identifier = Preconditions.checkNotNull(identifier);
+ this.scheduler = Preconditions.checkNotNull(scheduler);
+ this.executionContext = Preconditions.checkNotNull(executionContext);
}
@Override
public @Nonnull Ticker ticker() {
return Ticker.systemTicker();
}
+
+ /**
+ * Execute a command in the context of the client actor.
+ *
+ * @param command Block of code which needs to be execute
+ */
+ public void executeInActor(final @Nonnull InternalCommand command) {
+ self().tell(Preconditions.checkNotNull(command), ActorRef.noSender());
+ }
+
+ public Cancellable executeInActor(final @Nonnull InternalCommand command, final FiniteDuration delay) {
+ return scheduler.scheduleOnce(Preconditions.checkNotNull(delay), self(), Preconditions.checkNotNull(command),
+ executionContext, ActorRef.noSender());
+ }
+
+ SequencedQueue queueFor(final Long cookie) {
+ return queues.computeIfAbsent(cookie, t -> new SequencedQueue(t, ticker()));
+ }
+
+ void removeQueue(final SequencedQueue queue) {
+ queues.remove(queue.getCookie(), queue);
+ }
+
+ ClientActorBehavior completeRequest(final ClientActorBehavior current, final Response<?, ?> response) {
+ final WritableIdentifier id = response.getTarget();
+
+ // FIXME: this will need to be updated for other Request/Response types to extract cookie
+ Preconditions.checkArgument(id instanceof TransactionIdentifier);
+ final TransactionIdentifier txId = (TransactionIdentifier) id;
+
+ final SequencedQueue queue = queues.get(txId.getHistoryId().getCookie());
+ if (queue == null) {
+ LOG.info("{}: Ignoring unknown response {}", persistenceId(), response);
+ return current;
+ } else {
+ return queue.complete(current, response);
+ }
+ }
+
+ void poison(final RequestException cause) {
+ for (SequencedQueue q : queues.values()) {
+ q.poison(cause);
+ }
+
+ queues.clear();
+ }
+
}