Merge "Fixed possible NPE in flow reconciliation"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17
18 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
19 import org.opendaylight.controller.cluster.datastore.Configuration;
20 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
22 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
23 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
24 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
25 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import scala.concurrent.Await;
30 import scala.concurrent.Future;
31 import scala.concurrent.duration.Duration;
32 import scala.concurrent.duration.FiniteDuration;
33
34 import java.util.concurrent.TimeUnit;
35
36 import static akka.pattern.Patterns.ask;
37
38 /**
39  * The ActorContext class contains utility methods which could be used by
40  * non-actors (like DistributedDataStore) to work with actors a little more
41  * easily. An ActorContext can be freely passed around to local object instances
42  * but should not be passed to actors especially remote actors
43  */
44 public class ActorContext {
45     private static final Logger
46         LOG = LoggerFactory.getLogger(ActorContext.class);
47
48     public static final FiniteDuration ASK_DURATION =
49         Duration.create(5, TimeUnit.SECONDS);
50     public static final Duration AWAIT_DURATION =
51         Duration.create(5, TimeUnit.SECONDS);
52
53     private final ActorSystem actorSystem;
54     private final ActorRef shardManager;
55     private final ClusterWrapper clusterWrapper;
56     private final Configuration configuration;
57
58     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
59         ClusterWrapper clusterWrapper,
60         Configuration configuration) {
61         this.actorSystem = actorSystem;
62         this.shardManager = shardManager;
63         this.clusterWrapper = clusterWrapper;
64         this.configuration = configuration;
65     }
66
67     public ActorSystem getActorSystem() {
68         return actorSystem;
69     }
70
71     public ActorRef getShardManager() {
72         return shardManager;
73     }
74
75     public ActorSelection actorSelection(String actorPath) {
76         return actorSystem.actorSelection(actorPath);
77     }
78
79     public ActorSelection actorSelection(ActorPath actorPath) {
80         return actorSystem.actorSelection(actorPath);
81     }
82
83
84     /**
85      * Finds the primary for a given shard
86      *
87      * @param shardName
88      * @return
89      */
90     public ActorSelection findPrimary(String shardName) {
91         String path = findPrimaryPath(shardName);
92         return actorSystem.actorSelection(path);
93     }
94
95     /**
96      * Finds a local shard given it's shard name and return it's ActorRef
97      *
98      * @param shardName the name of the local shard that needs to be found
99      * @return a reference to a local shard actor which represents the shard
100      *         specified by the shardName
101      */
102     public ActorRef findLocalShard(String shardName) {
103         Object result = executeLocalOperation(shardManager,
104             new FindLocalShard(shardName), ASK_DURATION);
105
106         if (result instanceof LocalShardFound) {
107             LocalShardFound found = (LocalShardFound) result;
108
109             LOG.debug("Local shard found {}", found.getPath());
110
111             return found.getPath();
112         }
113
114         return null;
115     }
116
117
118     public String findPrimaryPath(String shardName) {
119         Object result = executeLocalOperation(shardManager,
120             new FindPrimary(shardName).toSerializable(), ASK_DURATION);
121
122         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
123             PrimaryFound found = PrimaryFound.fromSerializable(result);
124
125             LOG.debug("Primary found {}", found.getPrimaryPath());
126
127             return found.getPrimaryPath();
128         }
129         throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
130     }
131
132
133     /**
134      * Executes an operation on a local actor and wait for it's response
135      *
136      * @param actor
137      * @param message
138      * @param duration
139      * @return The response of the operation
140      */
141     public Object executeLocalOperation(ActorRef actor, Object message,
142         FiniteDuration duration) {
143         Future<Object> future =
144             ask(actor, message, new Timeout(duration));
145
146         try {
147             return Await.result(future, AWAIT_DURATION);
148         } catch (Exception e) {
149             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
150         }
151     }
152
153     /**
154      * Execute an operation on a remote actor and wait for it's response
155      *
156      * @param actor
157      * @param message
158      * @param duration
159      * @return
160      */
161     public Object executeRemoteOperation(ActorSelection actor, Object message,
162         FiniteDuration duration) {
163
164         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
165
166         Future<Object> future =
167             ask(actor, message, new Timeout(duration));
168
169         try {
170             return Await.result(future, AWAIT_DURATION);
171         } catch (Exception e) {
172             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
173         }
174     }
175
176     /**
177      * Execute an operation on a remote actor asynchronously.
178      *
179      * @param actor the ActorSelection
180      * @param message the message to send
181      * @param duration the maximum amount of time to send he message
182      * @return a Future containing the eventual result
183      */
184     public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message,
185             FiniteDuration duration) {
186
187         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
188
189         return ask(actor, message, new Timeout(duration));
190     }
191
192     /**
193      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
194      * reply (essentially set and forget).
195      *
196      * @param actor the ActorSelection
197      * @param message the message to send
198      */
199     public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
200         actor.tell(message, ActorRef.noSender());
201     }
202
203     /**
204      * Execute an operation on the primary for a given shard
205      * <p>
206      * This method first finds the primary for a given shard ,then sends
207      * the message to the remote shard and waits for a response
208      * </p>
209      *
210      * @param shardName
211      * @param message
212      * @param duration
213      * @return
214      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException         if the message to the remote shard times out
215      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
216      */
217     public Object executeShardOperation(String shardName, Object message,
218         FiniteDuration duration) {
219         ActorSelection primary = findPrimary(shardName);
220
221         return executeRemoteOperation(primary, message, duration);
222     }
223
224     /**
225      * Execute an operation on the the local shard only
226      * <p>
227      *     This method first finds the address of the local shard if any. It then
228      *     executes the operation on it.
229      * </p>
230      *
231      * @param shardName the name of the shard on which the operation needs to be executed
232      * @param message the message that needs to be sent to the shard
233      * @param duration the time duration in which this operation should complete
234      * @return the message that was returned by the local actor on which the
235      *         the operation was executed. If a local shard was not found then
236      *         null is returned
237      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
238      *         if the operation does not complete in a specified time duration
239      */
240     public Object executeLocalShardOperation(String shardName, Object message,
241         FiniteDuration duration) {
242         ActorRef local = findLocalShard(shardName);
243
244         if(local != null) {
245             return executeLocalOperation(local, message, duration);
246         }
247
248         return null;
249     }
250
251
252     public void shutdown() {
253         shardManager.tell(PoisonPill.getInstance(), null);
254         actorSystem.shutdown();
255     }
256
257     /**
258      * @deprecated Need to stop using this method. There are ways to send a
259      * remote ActorRef as a string which should be used instead of this hack
260      *
261      * @param primaryPath
262      * @param localPathOfRemoteActor
263      * @return
264      */
265     @Deprecated
266     public String resolvePath(final String primaryPath,
267         final String localPathOfRemoteActor) {
268         StringBuilder builder = new StringBuilder();
269         String[] primaryPathElements = primaryPath.split("/");
270         builder.append(primaryPathElements[0]).append("//")
271             .append(primaryPathElements[1]).append(primaryPathElements[2]);
272         String[] remotePathElements = localPathOfRemoteActor.split("/");
273         for (int i = 3; i < remotePathElements.length; i++) {
274             builder.append("/").append(remotePathElements[i]);
275         }
276
277         return builder.toString();
278
279     }
280
281     public ActorPath actorFor(String path){
282         return actorSystem.actorFor(path).path();
283     }
284
285     public String getCurrentMemberName(){
286         return clusterWrapper.getCurrentMemberName();
287     }
288
289 }