BUG 932 - Swagger HTTP POST contains incorrect object
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
18 import org.opendaylight.controller.cluster.datastore.Configuration;
19 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
20 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
21 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
22 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
23 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
24 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
25 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.Await;
29 import scala.concurrent.Future;
30 import scala.concurrent.duration.Duration;
31 import scala.concurrent.duration.FiniteDuration;
32
33 import java.util.concurrent.TimeUnit;
34
35 import static akka.pattern.Patterns.ask;
36
37 /**
38  * The ActorContext class contains utility methods which could be used by
39  * non-actors (like DistributedDataStore) to work with actors a little more
40  * easily. An ActorContext can be freely passed around to local object instances
41  * but should not be passed to actors especially remote actors
42  */
43 public class ActorContext {
44     private static final Logger
45         LOG = LoggerFactory.getLogger(ActorContext.class);
46
47     public static final FiniteDuration ASK_DURATION =
48         Duration.create(5, TimeUnit.SECONDS);
49     public static final Duration AWAIT_DURATION =
50         Duration.create(5, TimeUnit.SECONDS);
51
52     private final ActorSystem actorSystem;
53     private final ActorRef shardManager;
54     private final ClusterWrapper clusterWrapper;
55     private final Configuration configuration;
56
57     private SchemaContext schemaContext = null;
58
59     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
60         ClusterWrapper clusterWrapper,
61         Configuration configuration) {
62         this.actorSystem = actorSystem;
63         this.shardManager = shardManager;
64         this.clusterWrapper = clusterWrapper;
65         this.configuration = configuration;
66     }
67
68     public ActorSystem getActorSystem() {
69         return actorSystem;
70     }
71
72     public ActorRef getShardManager() {
73         return shardManager;
74     }
75
76     public ActorSelection actorSelection(String actorPath) {
77         return actorSystem.actorSelection(actorPath);
78     }
79
80     public ActorSelection actorSelection(ActorPath actorPath) {
81         return actorSystem.actorSelection(actorPath);
82     }
83
84
85     /**
86      * Finds the primary for a given shard
87      *
88      * @param shardName
89      * @return
90      */
91     public ActorSelection findPrimary(String shardName) {
92         String path = findPrimaryPath(shardName);
93         return actorSystem.actorSelection(path);
94     }
95
96     /**
97      * Finds a local shard given it's shard name and return it's ActorRef
98      *
99      * @param shardName the name of the local shard that needs to be found
100      * @return a reference to a local shard actor which represents the shard
101      *         specified by the shardName
102      */
103     public ActorRef findLocalShard(String shardName) {
104         Object result = executeLocalOperation(shardManager,
105             new FindLocalShard(shardName), ASK_DURATION);
106
107         if (result instanceof LocalShardFound) {
108             LocalShardFound found = (LocalShardFound) result;
109
110             LOG.debug("Local shard found {}", found.getPath());
111
112             return found.getPath();
113         }
114
115         return null;
116     }
117
118
119     public String findPrimaryPath(String shardName) {
120         Object result = executeLocalOperation(shardManager,
121             new FindPrimary(shardName).toSerializable(), ASK_DURATION);
122
123         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
124             PrimaryFound found = PrimaryFound.fromSerializable(result);
125
126             LOG.debug("Primary found {}", found.getPrimaryPath());
127
128             return found.getPrimaryPath();
129         }
130         throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
131     }
132
133
134     /**
135      * Executes an operation on a local actor and wait for it's response
136      *
137      * @param actor
138      * @param message
139      * @param duration
140      * @return The response of the operation
141      */
142     public Object executeLocalOperation(ActorRef actor, Object message,
143         FiniteDuration duration) {
144         Future<Object> future =
145             ask(actor, message, new Timeout(duration));
146
147         try {
148             return Await.result(future, AWAIT_DURATION);
149         } catch (Exception e) {
150             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
151         }
152     }
153
154     /**
155      * Execute an operation on a remote actor and wait for it's response
156      *
157      * @param actor
158      * @param message
159      * @param duration
160      * @return
161      */
162     public Object executeRemoteOperation(ActorSelection actor, Object message,
163         FiniteDuration duration) {
164
165         LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
166
167         Future<Object> future =
168             ask(actor, message, new Timeout(duration));
169
170         try {
171             return Await.result(future, AWAIT_DURATION);
172         } catch (Exception e) {
173             throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
174         }
175     }
176
177     /**
178      * Execute an operation on the primary for a given shard
179      * <p>
180      * This method first finds the primary for a given shard ,then sends
181      * the message to the remote shard and waits for a response
182      * </p>
183      *
184      * @param shardName
185      * @param message
186      * @param duration
187      * @return
188      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException         if the message to the remote shard times out
189      * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
190      */
191     public Object executeShardOperation(String shardName, Object message,
192         FiniteDuration duration) {
193         ActorSelection primary = findPrimary(shardName);
194
195         return executeRemoteOperation(primary, message, duration);
196     }
197
198     /**
199      * Execute an operation on the the local shard only
200      * <p>
201      *     This method first finds the address of the local shard if any. It then
202      *     executes the operation on it.
203      * </p>
204      *
205      * @param shardName the name of the shard on which the operation needs to be executed
206      * @param message the message that needs to be sent to the shard
207      * @param duration the time duration in which this operation should complete
208      * @return the message that was returned by the local actor on which the
209      *         the operation was executed. If a local shard was not found then
210      *         null is returned
211      * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
212      *         if the operation does not complete in a specified time duration
213      */
214     public Object executeLocalShardOperation(String shardName, Object message,
215         FiniteDuration duration) {
216         ActorRef local = findLocalShard(shardName);
217
218         if(local != null) {
219             return executeLocalOperation(local, message, duration);
220         }
221
222         return null;
223     }
224
225
226     public void shutdown() {
227         shardManager.tell(PoisonPill.getInstance(), null);
228         actorSystem.shutdown();
229     }
230
231     /**
232      * @deprecated Need to stop using this method. There are ways to send a
233      * remote ActorRef as a string which should be used instead of this hack
234      *
235      * @param primaryPath
236      * @param localPathOfRemoteActor
237      * @return
238      */
239     @Deprecated
240     public String resolvePath(final String primaryPath,
241         final String localPathOfRemoteActor) {
242         StringBuilder builder = new StringBuilder();
243         String[] primaryPathElements = primaryPath.split("/");
244         builder.append(primaryPathElements[0]).append("//")
245             .append(primaryPathElements[1]).append(primaryPathElements[2]);
246         String[] remotePathElements = localPathOfRemoteActor.split("/");
247         for (int i = 3; i < remotePathElements.length; i++) {
248             builder.append("/").append(remotePathElements[i]);
249         }
250
251         return builder.toString();
252
253     }
254
255     public ActorPath actorFor(String path){
256         return actorSystem.actorFor(path).path();
257     }
258
259     public String getCurrentMemberName(){
260         return clusterWrapper.getCurrentMemberName();
261     }
262
263 }