Merge "BUG 1735 Registering a data change listener should be asynchronous"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.pattern.Patterns;
17 import akka.util.Timeout;
18 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
19 import org.opendaylight.controller.cluster.datastore.Configuration;
20 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
22 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
23 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
24 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
25 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
26 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
27 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.Await;
31 import scala.concurrent.Future;
32 import scala.concurrent.duration.Duration;
33 import scala.concurrent.duration.FiniteDuration;
34
35 import java.util.concurrent.TimeUnit;
36
37 import static akka.pattern.Patterns.ask;
38
39 /**
40  * The ActorContext class contains utility methods which could be used by
41  * non-actors (like DistributedDataStore) to work with actors a little more
42  * easily. An ActorContext can be freely passed around to local object instances
43  * but should not be passed to actors especially remote actors
44  */
45 public class ActorContext {
46     private static final Logger
47         LOG = LoggerFactory.getLogger(ActorContext.class);
48
49     private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
50
51     public static final String MAILBOX = "bounded-mailbox";
52
53     private final ActorSystem actorSystem;
54     private final ActorRef shardManager;
55     private final ClusterWrapper clusterWrapper;
56     private final Configuration configuration;
57     private volatile SchemaContext schemaContext;
58     private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
59     private Timeout operationTimeout = new Timeout(operationDuration);
60
61     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
62         ClusterWrapper clusterWrapper,
63         Configuration configuration) {
64         this.actorSystem = actorSystem;
65         this.shardManager = shardManager;
66         this.clusterWrapper = clusterWrapper;
67         this.configuration = configuration;
68     }
69
70     public ActorSystem getActorSystem() {
71         return actorSystem;
72     }
73
74     public ActorRef getShardManager() {
75         return shardManager;
76     }
77
78     public ActorSelection actorSelection(String actorPath) {
79         return actorSystem.actorSelection(actorPath);
80     }
81
82     public ActorSelection actorSelection(ActorPath actorPath) {
83         return actorSystem.actorSelection(actorPath);
84     }
85
86     public void setSchemaContext(SchemaContext schemaContext) {
87         this.schemaContext = schemaContext;
88
89         if(shardManager != null) {
90             shardManager.tell(new UpdateSchemaContext(schemaContext), null);
91         }
92     }
93
94     public void setOperationTimeout(int timeoutInSeconds) {
95         operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
96         operationTimeout = new Timeout(operationDuration);
97     }
98
99     public SchemaContext getSchemaContext() {
100         return schemaContext;
101     }
102
103     /**
104      * Finds the primary for a given shard
105      *
106      * @param shardName
107      * @return
108      */
109     public ActorSelection findPrimary(String shardName) {
110         String path = findPrimaryPath(shardName);
111         return actorSystem.actorSelection(path);
112     }
113
114     /**
115      * Finds a local shard given it's shard name and return it's ActorRef
116      *
117      * @param shardName the name of the local shard that needs to be found
118      * @return a reference to a local shard actor which represents the shard
119      *         specified by the shardName
120      */
121     public ActorRef findLocalShard(String shardName) {
122         Object result = executeLocalOperation(shardManager,
123             new FindLocalShard(shardName));
124
125         if (result instanceof LocalShardFound) {
126             LocalShardFound found = (LocalShardFound) result;
127
128             LOG.debug("Local shard found {}", found.getPath());
129
130             return found.getPath();
131         }
132
133         return null;
134     }
135
136
137     public String findPrimaryPath(String shardName) {
138         Object result = executeLocalOperation(shardManager,
139             new FindPrimary(shardName).toSerializable());
140
141         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
142             PrimaryFound found = PrimaryFound.fromSerializable(result);
143
144             LOG.debug("Primary found {}", found.getPrimaryPath());
145
146             return found.getPrimaryPath();
147         }
148         throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
149     }
150
151
152     /**
153      * Executes an operation on a local actor and wait for it's response
154      *
155      * @param actor
156      * @param message
157      * @return The response of the operation
158      */
159     public Object executeLocalOperation(ActorRef actor, Object message) {
160         Future<Object> future = ask(actor, message, operationTimeout);
161
162         try {
163             return Await.result(future, operationDuration);
164         } catch (Exception e) {
165             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
166         }
167     }
168
169     /**
170      * Execute an operation on a remote actor and wait for it's response
171      *
172      * @param actor
173      * @param message
174      * @return
175      */
176     public Object executeRemoteOperation(ActorSelection actor, Object message) {
177
178         LOG.debug("Sending remote message {} to {}", message.getClass().toString(),
179             actor.toString());
180
181         Future<Object> future = ask(actor, message, operationTimeout);
182
183         try {
184             return Await.result(future, operationDuration);
185         } catch (Exception e) {
186             throw new TimeoutException("Sending message " + message.getClass().toString() +
187                     " to actor " + actor.toString() + " failed" , e);
188         }
189     }
190
191     /**
192      * Execute an operation on a remote actor asynchronously.
193      *
194      * @param actor the ActorSelection
195      * @param message the message to send
196      * @return a Future containing the eventual result
197      */
198     public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
199
200         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
201
202         return ask(actor, message, operationTimeout);
203     }
204
205     /**
206      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
207      * reply (essentially set and forget).
208      *
209      * @param actor the ActorSelection
210      * @param message the message to send
211      */
212     public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
213         actor.tell(message, ActorRef.noSender());
214     }
215
216     public void sendShardOperationAsync(String shardName, Object message) {
217         ActorSelection primary = findPrimary(shardName);
218
219         primary.tell(message, ActorRef.noSender());
220     }
221
222
223     /**
224      * Execute an operation on the primary for a given shard
225      * <p>
226      * This method first finds the primary for a given shard ,then sends
227      * the message to the remote shard and waits for a response
228      * </p>
229      *
230      * @param shardName
231      * @param message
232      * @return
233      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException         if the message to the remote shard times out
234      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
235      */
236     public Object executeShardOperation(String shardName, Object message) {
237         ActorSelection primary = findPrimary(shardName);
238
239         return executeRemoteOperation(primary, message);
240     }
241
242     /**
243      * Execute an operation on the the local shard only
244      * <p>
245      *     This method first finds the address of the local shard if any. It then
246      *     executes the operation on it.
247      * </p>
248      *
249      * @param shardName the name of the shard on which the operation needs to be executed
250      * @param message the message that needs to be sent to the shard
251      * @return the message that was returned by the local actor on which the
252      *         the operation was executed. If a local shard was not found then
253      *         null is returned
254      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
255      *         if the operation does not complete in a specified time duration
256      */
257     public Object executeLocalShardOperation(String shardName, Object message) {
258         ActorRef local = findLocalShard(shardName);
259
260         if(local != null) {
261             return executeLocalOperation(local, message);
262         }
263
264         return null;
265     }
266
267
268     /**
269      * Execute an operation on the the local shard only asynchronously
270      *
271      * <p>
272      *     This method first finds the address of the local shard if any. It then
273      *     executes the operation on it.
274      * </p>
275      *
276      * @param shardName the name of the shard on which the operation needs to be executed
277      * @param message the message that needs to be sent to the shard
278      * @param timeout the amount of time that this method should wait for a response before timing out
279      * @return null if the shard could not be located else a future on which the caller can wait
280      *
281      */
282     public Future executeLocalShardOperationAsync(String shardName, Object message, Timeout timeout) {
283         ActorRef local = findLocalShard(shardName);
284         if(local == null){
285             return null;
286         }
287         return Patterns.ask(local, message, timeout);
288     }
289
290
291
292     public void shutdown() {
293         shardManager.tell(PoisonPill.getInstance(), null);
294         actorSystem.shutdown();
295     }
296
297     /**
298      * @deprecated Need to stop using this method. There are ways to send a
299      * remote ActorRef as a string which should be used instead of this hack
300      *
301      * @param primaryPath
302      * @param localPathOfRemoteActor
303      * @return
304      */
305     @Deprecated
306     public String resolvePath(final String primaryPath,
307         final String localPathOfRemoteActor) {
308         StringBuilder builder = new StringBuilder();
309         String[] primaryPathElements = primaryPath.split("/");
310         builder.append(primaryPathElements[0]).append("//")
311             .append(primaryPathElements[1]).append(primaryPathElements[2]);
312         String[] remotePathElements = localPathOfRemoteActor.split("/");
313         for (int i = 3; i < remotePathElements.length; i++) {
314             builder.append("/").append(remotePathElements[i]);
315         }
316
317         return builder.toString();
318
319     }
320
321     public ActorPath actorFor(String path){
322         return actorSystem.actorFor(path).path();
323     }
324
325     public String getCurrentMemberName(){
326         return clusterWrapper.getCurrentMemberName();
327     }
328
329     /**
330      * Send the message to each and every shard
331      *
332      * @param message
333      */
334     public void broadcast(Object message){
335         for(String shardName : configuration.getAllShardNames()){
336             try {
337                 sendShardOperationAsync(shardName, message);
338             } catch(Exception e){
339                 LOG.warn("broadcast failed to send message " +  message.getClass().getSimpleName() + " to shard " + shardName, e);
340             }
341         }
342     }
343
344     public FiniteDuration getOperationDuration() {
345         return operationDuration;
346     }
347 }