Move MessageTrackerTest
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / actors / client / ClientActorContext.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.actors.client;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Scheduler;
13 import com.google.common.annotations.Beta;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Ticker;
16 import java.util.Map;
17 import java.util.concurrent.ConcurrentHashMap;
18 import javax.annotation.Nonnull;
19 import javax.annotation.concurrent.ThreadSafe;
20 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
21 import org.opendaylight.controller.cluster.access.concepts.RequestException;
22 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
23 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
24 import org.opendaylight.yangtools.concepts.Identifiable;
25 import org.opendaylight.yangtools.concepts.WritableIdentifier;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.ExecutionContext;
29 import scala.concurrent.duration.FiniteDuration;
30
31 /**
32  * An actor context associated with this {@link AbstractClientActor}.
33  *
34  * Time-keeping in a client actor is based on monotonic time. The precision of this time can be expected to be the
35  * same as {@link System#nanoTime()}, but it is not tied to that particular clock. Actor clock is exposed as
36  * a {@link Ticker}, which can be obtained via {@link #ticker()}.
37  *
38  * @author Robert Varga
39  */
40 @Beta
41 @ThreadSafe
42 public class ClientActorContext extends AbstractClientActorContext implements Identifiable<ClientIdentifier> {
43     private static final Logger LOG = LoggerFactory.getLogger(ClientActorContext.class);
44
45     private final Map<Long, SequencedQueue> queues = new ConcurrentHashMap<>();
46     private final ClientIdentifier identifier;
47     private final ExecutionContext executionContext;
48     private final Scheduler scheduler;
49
50     // Hidden to avoid subclassing
51     ClientActorContext(final ActorRef self, final Scheduler scheduler, final ExecutionContext executionContext,
52             final String persistenceId, final ClientIdentifier identifier) {
53         super(self, persistenceId);
54         this.identifier = Preconditions.checkNotNull(identifier);
55         this.scheduler = Preconditions.checkNotNull(scheduler);
56         this.executionContext = Preconditions.checkNotNull(executionContext);
57     }
58
59     @Override
60     public @Nonnull ClientIdentifier getIdentifier() {
61         return identifier;
62     }
63
64     /**
65      * Return the time ticker for this {@link ClientActorContext}. This should be used for in all time-tracking
66      * done within a client actor. Subclasses of {@link ClientActorBehavior} are encouraged to use
67      * {@link com.google.common.base.Stopwatch}.
68      *
69      * @return Client actor time source
70      */
71     public @Nonnull Ticker ticker() {
72         return Ticker.systemTicker();
73     }
74
75     /**
76      * Execute a command in the context of the client actor.
77      *
78      * @param command Block of code which needs to be execute
79      */
80     public void executeInActor(final @Nonnull InternalCommand command) {
81         self().tell(Preconditions.checkNotNull(command), ActorRef.noSender());
82     }
83
84     public Cancellable executeInActor(final @Nonnull InternalCommand command, final FiniteDuration delay) {
85         return scheduler.scheduleOnce(Preconditions.checkNotNull(delay), self(), Preconditions.checkNotNull(command),
86             executionContext, ActorRef.noSender());
87     }
88
89     SequencedQueue queueFor(final Long cookie) {
90         return queues.computeIfAbsent(cookie, t -> new SequencedQueue(t, ticker()));
91     }
92
93     void removeQueue(final SequencedQueue queue) {
94         queues.remove(queue.getCookie(), queue);
95     }
96
97     ClientActorBehavior completeRequest(final ClientActorBehavior current, final ResponseEnvelope<?> response) {
98         final WritableIdentifier id = response.getMessage().getTarget();
99
100         // FIXME: this will need to be updated for other Request/Response types to extract cookie
101         Preconditions.checkArgument(id instanceof TransactionIdentifier);
102         final TransactionIdentifier txId = (TransactionIdentifier) id;
103
104         final SequencedQueue queue = queues.get(txId.getHistoryId().getCookie());
105         if (queue == null) {
106             LOG.info("{}: Ignoring unknown response {}", persistenceId(), response);
107             return current;
108         } else {
109             return queue.complete(current, response);
110         }
111     }
112
113     void poison(final RequestException cause) {
114         for (SequencedQueue q : queues.values()) {
115             q.poison(cause);
116         }
117
118         queues.clear();
119     }
120 }