2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Scheduler;
13 import com.google.common.annotations.Beta;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Ticker;
17 import java.util.concurrent.ConcurrentHashMap;
18 import javax.annotation.Nonnull;
19 import javax.annotation.concurrent.ThreadSafe;
20 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
21 import org.opendaylight.controller.cluster.access.concepts.RequestException;
22 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
23 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
24 import org.opendaylight.yangtools.concepts.Identifiable;
25 import org.opendaylight.yangtools.concepts.WritableIdentifier;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.ExecutionContext;
29 import scala.concurrent.duration.FiniteDuration;
32 * An actor context associated with this {@link AbstractClientActor}.
35 * Time-keeping in a client actor is based on monotonic time. The precision of this time can be expected to be the
36 * same as {@link System#nanoTime()}, but it is not tied to that particular clock. Actor clock is exposed as
37 * a {@link Ticker}, which can be obtained via {@link #ticker()}.
39 * @author Robert Varga
43 public class ClientActorContext extends AbstractClientActorContext implements Identifiable<ClientIdentifier> {
// Class-wide SLF4J logger.
44 private static final Logger LOG = LoggerFactory.getLogger(ClientActorContext.class);
// Request queues keyed by cookie: queueFor() creates entries on demand, removeQueue() drops them
// and completeRequest() looks them up. Backed by a ConcurrentHashMap.
46 private final Map<Long, SequencedQueue> queues = new ConcurrentHashMap<>();
// Identifier supplied at construction; rejected by the constructor if null.
47 private final ClientIdentifier identifier;
// Execution context handed to the scheduler when a delayed command is scheduled.
48 private final ExecutionContext executionContext;
// Scheduler used by the delayed variant of executeInActor().
49 private final Scheduler scheduler;
/**
 * Create a context bound to a client actor.
 *
 * @param self reference to the owning actor, forwarded to the superclass
 * @param scheduler scheduler used for delayed command execution, must not be null
 * @param executionContext execution context passed to the scheduler, must not be null
 * @param persistenceId persistence identifier, forwarded to the superclass
 * @param identifier identifier of this client, must not be null
 * @throws NullPointerException if identifier, scheduler or executionContext is null
 */
// Hidden to avoid subclassing
ClientActorContext(
        final ActorRef self,
        final Scheduler scheduler,
        final ExecutionContext executionContext,
        final String persistenceId,
        final ClientIdentifier identifier) {
    super(self, persistenceId);

    // Null checks run in this order: identifier, scheduler, executionContext.
    this.identifier = Preconditions.checkNotNull(identifier);
    this.scheduler = Preconditions.checkNotNull(scheduler);
    this.executionContext = Preconditions.checkNotNull(executionContext);
}
// Accessor for the ClientIdentifier of this context.
// NOTE(review): the method body is not visible in this chunk; presumably it returns the
// `identifier` field assigned by the constructor — confirm against the full file.
62 public ClientIdentifier getIdentifier() {
67 * Return the time ticker for this {@link ClientActorContext}. This should be used for in all time-tracking
68 * done within a client actor. Subclasses of {@link ClientActorBehavior} are encouraged to use
69 * {@link com.google.common.base.Stopwatch}.
71 * @return Client actor time source
74 public Ticker ticker() {
75 return Ticker.systemTicker();
79 * Execute a command in the context of the client actor.
81 * @param command Block of code which needs to be execute
83 public void executeInActor(@Nonnull final InternalCommand command) {
84 self().tell(Preconditions.checkNotNull(command), ActorRef.noSender());
87 public Cancellable executeInActor(@Nonnull final InternalCommand command, final FiniteDuration delay) {
88 return scheduler.scheduleOnce(Preconditions.checkNotNull(delay), self(), Preconditions.checkNotNull(command),
89 executionContext, ActorRef.noSender());
92 SequencedQueue queueFor(final Long cookie) {
93 return queues.computeIfAbsent(cookie, t -> new SequencedQueue(t, ticker()));
96 void removeQueue(final SequencedQueue queue) {
97 queues.remove(queue.getCookie(), queue);
// Route a response envelope to the queue tracking the corresponding request and return the
// behavior which that queue's complete() call produces.
//   current  - behavior passed through to the queue
//   response - envelope whose message target identifies the owning transaction
100 ClientActorBehavior completeRequest(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
101 final WritableIdentifier id = response.getMessage().getTarget();
103 // FIXME: this will need to be updated for other Request/Response types to extract cookie
// Only TransactionIdentifier targets are handled; checkArgument rejects anything else with an
// IllegalArgumentException.
104 Preconditions.checkArgument(id instanceof TransactionIdentifier);
105 final TransactionIdentifier txId = (TransactionIdentifier) id;
// The queue is looked up by the cookie of the transaction's history identifier.
107 final SequencedQueue queue = queues.get(txId.getHistoryId().getCookie());
// NOTE(review): the condition guarding this log statement (and whatever follows it) is not visible
// in this chunk; presumably it applies only when no queue was found for the cookie — confirm.
109 LOG.info("{}: Ignoring unknown response {}", persistenceId(), response);
// Hand the response to the owning queue; its result is this method's return value.
112 return queue.complete(current, response);
116 void poison(final RequestException cause) {
117 for (SequencedQueue q : queues.values()) {