Bug 1598: Cleanup stale ShardReadTransactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17
18 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
19 import org.opendaylight.controller.cluster.datastore.Configuration;
20 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
22 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
23 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
24 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
25 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
26 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
27 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import scala.concurrent.Await;
32 import scala.concurrent.Future;
33 import scala.concurrent.duration.Duration;
34 import scala.concurrent.duration.FiniteDuration;
35
36 import java.util.concurrent.TimeUnit;
37
38 import static akka.pattern.Patterns.ask;
39
40 /**
41  * The ActorContext class contains utility methods which could be used by
42  * non-actors (like DistributedDataStore) to work with actors a little more
43  * easily. An ActorContext can be freely passed around to local object instances
44  * but should not be passed to actors especially remote actors
45  */
46 public class ActorContext {
47     private static final Logger
48         LOG = LoggerFactory.getLogger(ActorContext.class);
49
50     public static final FiniteDuration ASK_DURATION =
51         Duration.create(5, TimeUnit.SECONDS);
52     public static final Duration AWAIT_DURATION =
53         Duration.create(5, TimeUnit.SECONDS);
54
55     private final ActorSystem actorSystem;
56     private final ActorRef shardManager;
57     private final ClusterWrapper clusterWrapper;
58     private final Configuration configuration;
59     private volatile SchemaContext schemaContext;
60
61     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
62         ClusterWrapper clusterWrapper,
63         Configuration configuration) {
64         this.actorSystem = actorSystem;
65         this.shardManager = shardManager;
66         this.clusterWrapper = clusterWrapper;
67         this.configuration = configuration;
68     }
69
70     public ActorSystem getActorSystem() {
71         return actorSystem;
72     }
73
74     public ActorRef getShardManager() {
75         return shardManager;
76     }
77
78     public ActorSelection actorSelection(String actorPath) {
79         return actorSystem.actorSelection(actorPath);
80     }
81
82     public ActorSelection actorSelection(ActorPath actorPath) {
83         return actorSystem.actorSelection(actorPath);
84     }
85
86     public void setSchemaContext(SchemaContext schemaContext) {
87         this.schemaContext = schemaContext;
88
89         if(shardManager != null) {
90             shardManager.tell(new UpdateSchemaContext(schemaContext), null);
91         }
92     }
93
94     public SchemaContext getSchemaContext() {
95         return schemaContext;
96     }
97
98     /**
99      * Finds the primary for a given shard
100      *
101      * @param shardName
102      * @return
103      */
104     public ActorSelection findPrimary(String shardName) {
105         String path = findPrimaryPath(shardName);
106         return actorSystem.actorSelection(path);
107     }
108
109     /**
110      * Finds a local shard given it's shard name and return it's ActorRef
111      *
112      * @param shardName the name of the local shard that needs to be found
113      * @return a reference to a local shard actor which represents the shard
114      *         specified by the shardName
115      */
116     public ActorRef findLocalShard(String shardName) {
117         Object result = executeLocalOperation(shardManager,
118             new FindLocalShard(shardName), ASK_DURATION);
119
120         if (result instanceof LocalShardFound) {
121             LocalShardFound found = (LocalShardFound) result;
122
123             LOG.debug("Local shard found {}", found.getPath());
124
125             return found.getPath();
126         }
127
128         return null;
129     }
130
131
132     public String findPrimaryPath(String shardName) {
133         Object result = executeLocalOperation(shardManager,
134             new FindPrimary(shardName).toSerializable(), ASK_DURATION);
135
136         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
137             PrimaryFound found = PrimaryFound.fromSerializable(result);
138
139             LOG.debug("Primary found {}", found.getPrimaryPath());
140
141             return found.getPrimaryPath();
142         }
143         throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
144     }
145
146
147     /**
148      * Executes an operation on a local actor and wait for it's response
149      *
150      * @param actor
151      * @param message
152      * @param duration
153      * @return The response of the operation
154      */
155     public Object executeLocalOperation(ActorRef actor, Object message,
156         FiniteDuration duration) {
157         Future<Object> future =
158             ask(actor, message, new Timeout(duration));
159
160         try {
161             return Await.result(future, AWAIT_DURATION);
162         } catch (Exception e) {
163             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
164         }
165     }
166
167     /**
168      * Execute an operation on a remote actor and wait for it's response
169      *
170      * @param actor
171      * @param message
172      * @param duration
173      * @return
174      */
175     public Object executeRemoteOperation(ActorSelection actor, Object message,
176         FiniteDuration duration) {
177
178         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
179
180         Future<Object> future =
181             ask(actor, message, new Timeout(duration));
182
183         try {
184             return Await.result(future, AWAIT_DURATION);
185         } catch (Exception e) {
186             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
187         }
188     }
189
190     /**
191      * Execute an operation on a remote actor asynchronously.
192      *
193      * @param actor the ActorSelection
194      * @param message the message to send
195      * @param duration the maximum amount of time to send he message
196      * @return a Future containing the eventual result
197      */
198     public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message,
199             FiniteDuration duration) {
200
201         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
202
203         return ask(actor, message, new Timeout(duration));
204     }
205
206     /**
207      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
208      * reply (essentially set and forget).
209      *
210      * @param actor the ActorSelection
211      * @param message the message to send
212      */
213     public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
214         actor.tell(message, ActorRef.noSender());
215     }
216
217     /**
218      * Execute an operation on the primary for a given shard
219      * <p>
220      * This method first finds the primary for a given shard ,then sends
221      * the message to the remote shard and waits for a response
222      * </p>
223      *
224      * @param shardName
225      * @param message
226      * @param duration
227      * @return
228      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException         if the message to the remote shard times out
229      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
230      */
231     public Object executeShardOperation(String shardName, Object message,
232         FiniteDuration duration) {
233         ActorSelection primary = findPrimary(shardName);
234
235         return executeRemoteOperation(primary, message, duration);
236     }
237
238     /**
239      * Execute an operation on the the local shard only
240      * <p>
241      *     This method first finds the address of the local shard if any. It then
242      *     executes the operation on it.
243      * </p>
244      *
245      * @param shardName the name of the shard on which the operation needs to be executed
246      * @param message the message that needs to be sent to the shard
247      * @param duration the time duration in which this operation should complete
248      * @return the message that was returned by the local actor on which the
249      *         the operation was executed. If a local shard was not found then
250      *         null is returned
251      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
252      *         if the operation does not complete in a specified time duration
253      */
254     public Object executeLocalShardOperation(String shardName, Object message,
255         FiniteDuration duration) {
256         ActorRef local = findLocalShard(shardName);
257
258         if(local != null) {
259             return executeLocalOperation(local, message, duration);
260         }
261
262         return null;
263     }
264
265
266     public void shutdown() {
267         shardManager.tell(PoisonPill.getInstance(), null);
268         actorSystem.shutdown();
269     }
270
271     /**
272      * @deprecated Need to stop using this method. There are ways to send a
273      * remote ActorRef as a string which should be used instead of this hack
274      *
275      * @param primaryPath
276      * @param localPathOfRemoteActor
277      * @return
278      */
279     @Deprecated
280     public String resolvePath(final String primaryPath,
281         final String localPathOfRemoteActor) {
282         StringBuilder builder = new StringBuilder();
283         String[] primaryPathElements = primaryPath.split("/");
284         builder.append(primaryPathElements[0]).append("//")
285             .append(primaryPathElements[1]).append(primaryPathElements[2]);
286         String[] remotePathElements = localPathOfRemoteActor.split("/");
287         for (int i = 3; i < remotePathElements.length; i++) {
288             builder.append("/").append(remotePathElements[i]);
289         }
290
291         return builder.toString();
292
293     }
294
295     public ActorPath actorFor(String path){
296         return actorSystem.actorFor(path).path();
297     }
298
299     public String getCurrentMemberName(){
300         return clusterWrapper.getCurrentMemberName();
301     }
302
303 }