Merge "Bug 1875 - Used variables for nexusproxy host, externalized versions"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.pattern.Patterns;
17 import akka.util.Timeout;
18 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
19 import org.opendaylight.controller.cluster.datastore.Configuration;
20 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
22 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
23 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
24 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
25 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
26 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
27 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.Await;
31 import scala.concurrent.Future;
32 import scala.concurrent.duration.Duration;
33 import scala.concurrent.duration.FiniteDuration;
34
35 import java.util.concurrent.TimeUnit;
36
37 import static akka.pattern.Patterns.ask;
38
39 /**
40  * The ActorContext class contains utility methods which could be used by
41  * non-actors (like DistributedDataStore) to work with actors a little more
42  * easily. An ActorContext can be freely passed around to local object instances
43  * but should not be passed to actors especially remote actors
44  */
45 public class ActorContext {
46     private static final Logger
47         LOG = LoggerFactory.getLogger(ActorContext.class);
48
49     private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
50
51     public static final String MAILBOX = "bounded-mailbox";
52
53     private final ActorSystem actorSystem;
54     private final ActorRef shardManager;
55     private final ClusterWrapper clusterWrapper;
56     private final Configuration configuration;
57     private volatile SchemaContext schemaContext;
58     private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
59     private Timeout operationTimeout = new Timeout(operationDuration);
60
61     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
62         ClusterWrapper clusterWrapper,
63         Configuration configuration) {
64         this.actorSystem = actorSystem;
65         this.shardManager = shardManager;
66         this.clusterWrapper = clusterWrapper;
67         this.configuration = configuration;
68     }
69
70     public ActorSystem getActorSystem() {
71         return actorSystem;
72     }
73
74     public ActorRef getShardManager() {
75         return shardManager;
76     }
77
78     public ActorSelection actorSelection(String actorPath) {
79         return actorSystem.actorSelection(actorPath);
80     }
81
82     public ActorSelection actorSelection(ActorPath actorPath) {
83         return actorSystem.actorSelection(actorPath);
84     }
85
86     public void setSchemaContext(SchemaContext schemaContext) {
87         this.schemaContext = schemaContext;
88
89         if(shardManager != null) {
90             shardManager.tell(new UpdateSchemaContext(schemaContext), null);
91         }
92     }
93
94     public void setOperationTimeout(int timeoutInSeconds) {
95         operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
96         operationTimeout = new Timeout(operationDuration);
97     }
98
99     public SchemaContext getSchemaContext() {
100         return schemaContext;
101     }
102
103     /**
104      * Finds the primary for a given shard
105      *
106      * @param shardName
107      * @return
108      */
109     public ActorSelection findPrimary(String shardName) {
110         String path = findPrimaryPath(shardName);
111         return actorSystem.actorSelection(path);
112     }
113
114     /**
115      * Finds a local shard given it's shard name and return it's ActorRef
116      *
117      * @param shardName the name of the local shard that needs to be found
118      * @return a reference to a local shard actor which represents the shard
119      *         specified by the shardName
120      */
121     public ActorRef findLocalShard(String shardName) {
122         Object result = executeLocalOperation(shardManager,
123             new FindLocalShard(shardName));
124
125         if (result instanceof LocalShardFound) {
126             LocalShardFound found = (LocalShardFound) result;
127
128             if(LOG.isDebugEnabled()) {
129                 LOG.debug("Local shard found {}", found.getPath());
130             }
131             return found.getPath();
132         }
133
134         return null;
135     }
136
137
138     public String findPrimaryPath(String shardName) {
139         Object result = executeLocalOperation(shardManager,
140             new FindPrimary(shardName).toSerializable());
141
142         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
143             PrimaryFound found = PrimaryFound.fromSerializable(result);
144
145             if(LOG.isDebugEnabled()) {
146                 LOG.debug("Primary found {}", found.getPrimaryPath());
147             }
148             return found.getPrimaryPath();
149         }
150         throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
151     }
152
153
154     /**
155      * Executes an operation on a local actor and wait for it's response
156      *
157      * @param actor
158      * @param message
159      * @return The response of the operation
160      */
161     public Object executeLocalOperation(ActorRef actor, Object message) {
162         Future<Object> future = ask(actor, message, operationTimeout);
163
164         try {
165             return Await.result(future, operationDuration);
166         } catch (Exception e) {
167             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
168         }
169     }
170
171     /**
172      * Execute an operation on a remote actor and wait for it's response
173      *
174      * @param actor
175      * @param message
176      * @return
177      */
178     public Object executeRemoteOperation(ActorSelection actor, Object message) {
179
180         if(LOG.isDebugEnabled()) {
181             LOG.debug("Sending remote message {} to {}", message.getClass().toString(),
182                 actor.toString());
183         }
184         Future<Object> future = ask(actor, message, operationTimeout);
185
186         try {
187             return Await.result(future, operationDuration);
188         } catch (Exception e) {
189             throw new TimeoutException("Sending message " + message.getClass().toString() +
190                     " to actor " + actor.toString() + " failed" , e);
191         }
192     }
193
194     /**
195      * Execute an operation on a remote actor asynchronously.
196      *
197      * @param actor the ActorSelection
198      * @param message the message to send
199      * @return a Future containing the eventual result
200      */
201     public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
202
203         if(LOG.isDebugEnabled()) {
204             LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
205         }
206         return ask(actor, message, operationTimeout);
207     }
208
209     /**
210      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
211      * reply (essentially set and forget).
212      *
213      * @param actor the ActorSelection
214      * @param message the message to send
215      */
216     public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
217         actor.tell(message, ActorRef.noSender());
218     }
219
220     public void sendShardOperationAsync(String shardName, Object message) {
221         ActorSelection primary = findPrimary(shardName);
222
223         primary.tell(message, ActorRef.noSender());
224     }
225
226
227     /**
228      * Execute an operation on the primary for a given shard
229      * <p>
230      * This method first finds the primary for a given shard ,then sends
231      * the message to the remote shard and waits for a response
232      * </p>
233      *
234      * @param shardName
235      * @param message
236      * @return
237      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException         if the message to the remote shard times out
238      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
239      */
240     public Object executeShardOperation(String shardName, Object message) {
241         ActorSelection primary = findPrimary(shardName);
242
243         return executeRemoteOperation(primary, message);
244     }
245
246     /**
247      * Execute an operation on the the local shard only
248      * <p>
249      *     This method first finds the address of the local shard if any. It then
250      *     executes the operation on it.
251      * </p>
252      *
253      * @param shardName the name of the shard on which the operation needs to be executed
254      * @param message the message that needs to be sent to the shard
255      * @return the message that was returned by the local actor on which the
256      *         the operation was executed. If a local shard was not found then
257      *         null is returned
258      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
259      *         if the operation does not complete in a specified time duration
260      */
261     public Object executeLocalShardOperation(String shardName, Object message) {
262         ActorRef local = findLocalShard(shardName);
263
264         if(local != null) {
265             return executeLocalOperation(local, message);
266         }
267
268         return null;
269     }
270
271
272     /**
273      * Execute an operation on the the local shard only asynchronously
274      *
275      * <p>
276      *     This method first finds the address of the local shard if any. It then
277      *     executes the operation on it.
278      * </p>
279      *
280      * @param shardName the name of the shard on which the operation needs to be executed
281      * @param message the message that needs to be sent to the shard
282      * @param timeout the amount of time that this method should wait for a response before timing out
283      * @return null if the shard could not be located else a future on which the caller can wait
284      *
285      */
286     public Future executeLocalShardOperationAsync(String shardName, Object message, Timeout timeout) {
287         ActorRef local = findLocalShard(shardName);
288         if(local == null){
289             return null;
290         }
291         return Patterns.ask(local, message, timeout);
292     }
293
294
295
296     public void shutdown() {
297         shardManager.tell(PoisonPill.getInstance(), null);
298         actorSystem.shutdown();
299     }
300
301     /**
302      * @deprecated Need to stop using this method. There are ways to send a
303      * remote ActorRef as a string which should be used instead of this hack
304      *
305      * @param primaryPath
306      * @param localPathOfRemoteActor
307      * @return
308      */
309     @Deprecated
310     public String resolvePath(final String primaryPath,
311         final String localPathOfRemoteActor) {
312         StringBuilder builder = new StringBuilder();
313         String[] primaryPathElements = primaryPath.split("/");
314         builder.append(primaryPathElements[0]).append("//")
315             .append(primaryPathElements[1]).append(primaryPathElements[2]);
316         String[] remotePathElements = localPathOfRemoteActor.split("/");
317         for (int i = 3; i < remotePathElements.length; i++) {
318             builder.append("/").append(remotePathElements[i]);
319         }
320
321         return builder.toString();
322
323     }
324
325     public ActorPath actorFor(String path){
326         return actorSystem.actorFor(path).path();
327     }
328
329     public String getCurrentMemberName(){
330         return clusterWrapper.getCurrentMemberName();
331     }
332
333     /**
334      * Send the message to each and every shard
335      *
336      * @param message
337      */
338     public void broadcast(Object message){
339         for(String shardName : configuration.getAllShardNames()){
340             try {
341                 sendShardOperationAsync(shardName, message);
342             } catch(Exception e){
343                 LOG.warn("broadcast failed to send message " +  message.getClass().getSimpleName() + " to shard " + shardName, e);
344             }
345         }
346     }
347
348     public FiniteDuration getOperationDuration() {
349         return operationDuration;
350     }
351 }