c989b275df3105480b035b7972e83c0822b7182d
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17
18 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
19 import org.opendaylight.controller.cluster.datastore.Configuration;
20 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
22 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
23 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
24 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
25 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
26 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
27 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import scala.concurrent.Await;
32 import scala.concurrent.Future;
33 import scala.concurrent.duration.Duration;
34 import scala.concurrent.duration.FiniteDuration;
35
36 import java.util.concurrent.TimeUnit;
37
38 import static akka.pattern.Patterns.ask;
39
40 /**
41  * The ActorContext class contains utility methods which could be used by
42  * non-actors (like DistributedDataStore) to work with actors a little more
43  * easily. An ActorContext can be freely passed around to local object instances
44  * but should not be passed to actors especially remote actors
45  */
46 public class ActorContext {
47     private static final Logger
48         LOG = LoggerFactory.getLogger(ActorContext.class);
49
50     private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
51
52     public static final String MAILBOX = "bounded-mailbox";
53
54     private final ActorSystem actorSystem;
55     private final ActorRef shardManager;
56     private final ClusterWrapper clusterWrapper;
57     private final Configuration configuration;
58     private volatile SchemaContext schemaContext;
59     private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
60     private Timeout operationTimeout = new Timeout(operationDuration);
61
62     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
63         ClusterWrapper clusterWrapper,
64         Configuration configuration) {
65         this.actorSystem = actorSystem;
66         this.shardManager = shardManager;
67         this.clusterWrapper = clusterWrapper;
68         this.configuration = configuration;
69     }
70
71     public ActorSystem getActorSystem() {
72         return actorSystem;
73     }
74
75     public ActorRef getShardManager() {
76         return shardManager;
77     }
78
79     public ActorSelection actorSelection(String actorPath) {
80         return actorSystem.actorSelection(actorPath);
81     }
82
83     public ActorSelection actorSelection(ActorPath actorPath) {
84         return actorSystem.actorSelection(actorPath);
85     }
86
87     public void setSchemaContext(SchemaContext schemaContext) {
88         this.schemaContext = schemaContext;
89
90         if(shardManager != null) {
91             shardManager.tell(new UpdateSchemaContext(schemaContext), null);
92         }
93     }
94
95     public void setOperationTimeout(int timeoutInSeconds) {
96         operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
97         operationTimeout = new Timeout(operationDuration);
98     }
99
100     public SchemaContext getSchemaContext() {
101         return schemaContext;
102     }
103
104     /**
105      * Finds the primary for a given shard
106      *
107      * @param shardName
108      * @return
109      */
110     public ActorSelection findPrimary(String shardName) {
111         String path = findPrimaryPath(shardName);
112         return actorSystem.actorSelection(path);
113     }
114
115     /**
116      * Finds a local shard given it's shard name and return it's ActorRef
117      *
118      * @param shardName the name of the local shard that needs to be found
119      * @return a reference to a local shard actor which represents the shard
120      *         specified by the shardName
121      */
122     public ActorRef findLocalShard(String shardName) {
123         Object result = executeLocalOperation(shardManager,
124             new FindLocalShard(shardName));
125
126         if (result instanceof LocalShardFound) {
127             LocalShardFound found = (LocalShardFound) result;
128
129             LOG.debug("Local shard found {}", found.getPath());
130
131             return found.getPath();
132         }
133
134         return null;
135     }
136
137
138     public String findPrimaryPath(String shardName) {
139         Object result = executeLocalOperation(shardManager,
140             new FindPrimary(shardName).toSerializable());
141
142         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
143             PrimaryFound found = PrimaryFound.fromSerializable(result);
144
145             LOG.debug("Primary found {}", found.getPrimaryPath());
146
147             return found.getPrimaryPath();
148         }
149         throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
150     }
151
152
153     /**
154      * Executes an operation on a local actor and wait for it's response
155      *
156      * @param actor
157      * @param message
158      * @return The response of the operation
159      */
160     public Object executeLocalOperation(ActorRef actor, Object message) {
161         Future<Object> future = ask(actor, message, operationTimeout);
162
163         try {
164             return Await.result(future, operationDuration);
165         } catch (Exception e) {
166             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
167         }
168     }
169
170     /**
171      * Execute an operation on a remote actor and wait for it's response
172      *
173      * @param actor
174      * @param message
175      * @return
176      */
177     public Object executeRemoteOperation(ActorSelection actor, Object message) {
178
179         LOG.debug("Sending remote message {} to {}", message.getClass().toString(),
180             actor.toString());
181
182         Future<Object> future = ask(actor, message, operationTimeout);
183
184         try {
185             return Await.result(future, operationDuration);
186         } catch (Exception e) {
187             throw new TimeoutException("Sending message " + message.getClass().toString() +
188                     " to actor " + actor.toString() + " failed" , e);
189         }
190     }
191
192     /**
193      * Execute an operation on a remote actor asynchronously.
194      *
195      * @param actor the ActorSelection
196      * @param message the message to send
197      * @return a Future containing the eventual result
198      */
199     public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
200
201         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
202
203         return ask(actor, message, operationTimeout);
204     }
205
206     /**
207      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
208      * reply (essentially set and forget).
209      *
210      * @param actor the ActorSelection
211      * @param message the message to send
212      */
213     public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
214         actor.tell(message, ActorRef.noSender());
215     }
216
217     public void sendShardOperationAsync(String shardName, Object message) {
218         ActorSelection primary = findPrimary(shardName);
219
220         primary.tell(message, ActorRef.noSender());
221     }
222
223
224     /**
225      * Execute an operation on the primary for a given shard
226      * <p>
227      * This method first finds the primary for a given shard ,then sends
228      * the message to the remote shard and waits for a response
229      * </p>
230      *
231      * @param shardName
232      * @param message
233      * @return
234      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException         if the message to the remote shard times out
235      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
236      */
237     public Object executeShardOperation(String shardName, Object message) {
238         ActorSelection primary = findPrimary(shardName);
239
240         return executeRemoteOperation(primary, message);
241     }
242
243     /**
244      * Execute an operation on the the local shard only
245      * <p>
246      *     This method first finds the address of the local shard if any. It then
247      *     executes the operation on it.
248      * </p>
249      *
250      * @param shardName the name of the shard on which the operation needs to be executed
251      * @param message the message that needs to be sent to the shard
252      * @return the message that was returned by the local actor on which the
253      *         the operation was executed. If a local shard was not found then
254      *         null is returned
255      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
256      *         if the operation does not complete in a specified time duration
257      */
258     public Object executeLocalShardOperation(String shardName, Object message) {
259         ActorRef local = findLocalShard(shardName);
260
261         if(local != null) {
262             return executeLocalOperation(local, message);
263         }
264
265         return null;
266     }
267
268
269     public void shutdown() {
270         shardManager.tell(PoisonPill.getInstance(), null);
271         actorSystem.shutdown();
272     }
273
274     /**
275      * @deprecated Need to stop using this method. There are ways to send a
276      * remote ActorRef as a string which should be used instead of this hack
277      *
278      * @param primaryPath
279      * @param localPathOfRemoteActor
280      * @return
281      */
282     @Deprecated
283     public String resolvePath(final String primaryPath,
284         final String localPathOfRemoteActor) {
285         StringBuilder builder = new StringBuilder();
286         String[] primaryPathElements = primaryPath.split("/");
287         builder.append(primaryPathElements[0]).append("//")
288             .append(primaryPathElements[1]).append(primaryPathElements[2]);
289         String[] remotePathElements = localPathOfRemoteActor.split("/");
290         for (int i = 3; i < remotePathElements.length; i++) {
291             builder.append("/").append(remotePathElements[i]);
292         }
293
294         return builder.toString();
295
296     }
297
298     public ActorPath actorFor(String path){
299         return actorSystem.actorFor(path).path();
300     }
301
302     public String getCurrentMemberName(){
303         return clusterWrapper.getCurrentMemberName();
304     }
305
306     /**
307      * Send the message to each and every shard
308      *
309      * @param message
310      */
311     public void broadcast(Object message){
312         for(String shardName : configuration.getAllShardNames()){
313             try {
314                 sendShardOperationAsync(shardName, message);
315             } catch(Exception e){
316                 LOG.warn("broadcast failed to send message " +  message.getClass().getSimpleName() + " to shard " + shardName, e);
317             }
318         }
319     }
320
321     public FiniteDuration getOperationDuration() {
322         return operationDuration;
323     }
324 }