Merge "Bug 1569 - [DEV] Too small variable for OF-Port-Number"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17
18 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
19 import org.opendaylight.controller.cluster.datastore.Configuration;
20 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
22 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
23 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
24 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
25 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
26 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
27 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import scala.concurrent.Await;
32 import scala.concurrent.Future;
33 import scala.concurrent.duration.Duration;
34 import scala.concurrent.duration.FiniteDuration;
35
36 import java.util.concurrent.TimeUnit;
37
38 import static akka.pattern.Patterns.ask;
39
40 /**
41  * The ActorContext class contains utility methods which could be used by
42  * non-actors (like DistributedDataStore) to work with actors a little more
43  * easily. An ActorContext can be freely passed around to local object instances
44  * but should not be passed to actors especially remote actors
45  */
46 public class ActorContext {
47     private static final Logger
48         LOG = LoggerFactory.getLogger(ActorContext.class);
49
50     public static final FiniteDuration ASK_DURATION =
51         Duration.create(5, TimeUnit.SECONDS);
52     public static final Duration AWAIT_DURATION =
53         Duration.create(5, TimeUnit.SECONDS);
54
55     public static final String MAILBOX = "bounded-mailbox";
56
57     private final ActorSystem actorSystem;
58     private final ActorRef shardManager;
59     private final ClusterWrapper clusterWrapper;
60     private final Configuration configuration;
61     private volatile SchemaContext schemaContext;
62
63     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
64         ClusterWrapper clusterWrapper,
65         Configuration configuration) {
66         this.actorSystem = actorSystem;
67         this.shardManager = shardManager;
68         this.clusterWrapper = clusterWrapper;
69         this.configuration = configuration;
70     }
71
72     public ActorSystem getActorSystem() {
73         return actorSystem;
74     }
75
76     public ActorRef getShardManager() {
77         return shardManager;
78     }
79
80     public ActorSelection actorSelection(String actorPath) {
81         return actorSystem.actorSelection(actorPath);
82     }
83
84     public ActorSelection actorSelection(ActorPath actorPath) {
85         return actorSystem.actorSelection(actorPath);
86     }
87
88     public void setSchemaContext(SchemaContext schemaContext) {
89         this.schemaContext = schemaContext;
90
91         if(shardManager != null) {
92             shardManager.tell(new UpdateSchemaContext(schemaContext), null);
93         }
94     }
95
96     public SchemaContext getSchemaContext() {
97         return schemaContext;
98     }
99
100     /**
101      * Finds the primary for a given shard
102      *
103      * @param shardName
104      * @return
105      */
106     public ActorSelection findPrimary(String shardName) {
107         String path = findPrimaryPath(shardName);
108         return actorSystem.actorSelection(path);
109     }
110
111     /**
112      * Finds a local shard given it's shard name and return it's ActorRef
113      *
114      * @param shardName the name of the local shard that needs to be found
115      * @return a reference to a local shard actor which represents the shard
116      *         specified by the shardName
117      */
118     public ActorRef findLocalShard(String shardName) {
119         Object result = executeLocalOperation(shardManager,
120             new FindLocalShard(shardName), ASK_DURATION);
121
122         if (result instanceof LocalShardFound) {
123             LocalShardFound found = (LocalShardFound) result;
124
125             LOG.debug("Local shard found {}", found.getPath());
126
127             return found.getPath();
128         }
129
130         return null;
131     }
132
133
134     public String findPrimaryPath(String shardName) {
135         Object result = executeLocalOperation(shardManager,
136             new FindPrimary(shardName).toSerializable(), ASK_DURATION);
137
138         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
139             PrimaryFound found = PrimaryFound.fromSerializable(result);
140
141             LOG.debug("Primary found {}", found.getPrimaryPath());
142
143             return found.getPrimaryPath();
144         }
145         throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
146     }
147
148
149     /**
150      * Executes an operation on a local actor and wait for it's response
151      *
152      * @param actor
153      * @param message
154      * @param duration
155      * @return The response of the operation
156      */
157     public Object executeLocalOperation(ActorRef actor, Object message,
158         FiniteDuration duration) {
159         Future<Object> future =
160             ask(actor, message, new Timeout(duration));
161
162         try {
163             return Await.result(future, AWAIT_DURATION);
164         } catch (Exception e) {
165             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
166         }
167     }
168
169     /**
170      * Execute an operation on a remote actor and wait for it's response
171      *
172      * @param actor
173      * @param message
174      * @param duration
175      * @return
176      */
177     public Object executeRemoteOperation(ActorSelection actor, Object message,
178         FiniteDuration duration) {
179
180         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
181
182         Future<Object> future =
183             ask(actor, message, new Timeout(duration));
184
185         try {
186             return Await.result(future, AWAIT_DURATION);
187         } catch (Exception e) {
188             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
189         }
190     }
191
192     /**
193      * Execute an operation on a remote actor asynchronously.
194      *
195      * @param actor the ActorSelection
196      * @param message the message to send
197      * @param duration the maximum amount of time to send he message
198      * @return a Future containing the eventual result
199      */
200     public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message,
201             FiniteDuration duration) {
202
203         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
204
205         return ask(actor, message, new Timeout(duration));
206     }
207
208     /**
209      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
210      * reply (essentially set and forget).
211      *
212      * @param actor the ActorSelection
213      * @param message the message to send
214      */
215     public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
216         actor.tell(message, ActorRef.noSender());
217     }
218
219     /**
220      * Execute an operation on the primary for a given shard
221      * <p>
222      * This method first finds the primary for a given shard ,then sends
223      * the message to the remote shard and waits for a response
224      * </p>
225      *
226      * @param shardName
227      * @param message
228      * @param duration
229      * @return
230      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException         if the message to the remote shard times out
231      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
232      */
233     public Object executeShardOperation(String shardName, Object message,
234         FiniteDuration duration) {
235         ActorSelection primary = findPrimary(shardName);
236
237         return executeRemoteOperation(primary, message, duration);
238     }
239
240     /**
241      * Execute an operation on the the local shard only
242      * <p>
243      *     This method first finds the address of the local shard if any. It then
244      *     executes the operation on it.
245      * </p>
246      *
247      * @param shardName the name of the shard on which the operation needs to be executed
248      * @param message the message that needs to be sent to the shard
249      * @param duration the time duration in which this operation should complete
250      * @return the message that was returned by the local actor on which the
251      *         the operation was executed. If a local shard was not found then
252      *         null is returned
253      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
254      *         if the operation does not complete in a specified time duration
255      */
256     public Object executeLocalShardOperation(String shardName, Object message,
257         FiniteDuration duration) {
258         ActorRef local = findLocalShard(shardName);
259
260         if(local != null) {
261             return executeLocalOperation(local, message, duration);
262         }
263
264         return null;
265     }
266
267
268     public void shutdown() {
269         shardManager.tell(PoisonPill.getInstance(), null);
270         actorSystem.shutdown();
271     }
272
273     /**
274      * @deprecated Need to stop using this method. There are ways to send a
275      * remote ActorRef as a string which should be used instead of this hack
276      *
277      * @param primaryPath
278      * @param localPathOfRemoteActor
279      * @return
280      */
281     @Deprecated
282     public String resolvePath(final String primaryPath,
283         final String localPathOfRemoteActor) {
284         StringBuilder builder = new StringBuilder();
285         String[] primaryPathElements = primaryPath.split("/");
286         builder.append(primaryPathElements[0]).append("//")
287             .append(primaryPathElements[1]).append(primaryPathElements[2]);
288         String[] remotePathElements = localPathOfRemoteActor.split("/");
289         for (int i = 3; i < remotePathElements.length; i++) {
290             builder.append("/").append(remotePathElements[i]);
291         }
292
293         return builder.toString();
294
295     }
296
297     public ActorPath actorFor(String path){
298         return actorSystem.actorFor(path).path();
299     }
300
301     public String getCurrentMemberName(){
302         return clusterWrapper.getCurrentMemberName();
303     }
304
305 }