Remove l2switch sample
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.dispatch.Mapper;
17 import akka.pattern.AskTimeoutException;
18 import akka.util.Timeout;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
22 import org.opendaylight.controller.cluster.datastore.Configuration;
23 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
24 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
25 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
26 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
27 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
28 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
29 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
30 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
31 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
32 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
33 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
34 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
35 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
36 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
37 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import scala.concurrent.Await;
41 import scala.concurrent.Future;
42 import scala.concurrent.duration.Duration;
43 import scala.concurrent.duration.FiniteDuration;
44 import java.util.concurrent.TimeUnit;
45 import static akka.pattern.Patterns.ask;
46
47 /**
48  * The ActorContext class contains utility methods which could be used by
49  * non-actors (like DistributedDataStore) to work with actors a little more
50  * easily. An ActorContext can be freely passed around to local object instances
51  * but should not be passed to actors especially remote actors
52  */
53 public class ActorContext {
54     private static final Logger
55         LOG = LoggerFactory.getLogger(ActorContext.class);
56
57     public static final String MAILBOX = "bounded-mailbox";
58
59     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
60                                                               new Mapper<Throwable, Throwable>() {
61         @Override
62         public Throwable apply(Throwable failure) {
63             Throwable actualFailure = failure;
64             if(failure instanceof AskTimeoutException) {
65                 // A timeout exception most likely means the shard isn't initialized.
66                 actualFailure = new NotInitializedException(
67                         "Timed out trying to find the primary shard. Most likely cause is the " +
68                         "shard is not initialized yet.");
69             }
70
71             return actualFailure;
72         }
73     };
74
75     private final ActorSystem actorSystem;
76     private final ActorRef shardManager;
77     private final ClusterWrapper clusterWrapper;
78     private final Configuration configuration;
79     private final DatastoreContext datastoreContext;
80     private volatile SchemaContext schemaContext;
81     private final FiniteDuration operationDuration;
82     private final Timeout operationTimeout;
83
84     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
85             ClusterWrapper clusterWrapper, Configuration configuration) {
86         this(actorSystem, shardManager, clusterWrapper, configuration,
87                 DatastoreContext.newBuilder().build());
88     }
89
90     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
91             ClusterWrapper clusterWrapper, Configuration configuration,
92             DatastoreContext datastoreContext) {
93         this.actorSystem = actorSystem;
94         this.shardManager = shardManager;
95         this.clusterWrapper = clusterWrapper;
96         this.configuration = configuration;
97         this.datastoreContext = datastoreContext;
98
99         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
100                 TimeUnit.SECONDS);
101         operationTimeout = new Timeout(operationDuration);
102     }
103
104     public DatastoreContext getDatastoreContext() {
105         return datastoreContext;
106     }
107
108     public ActorSystem getActorSystem() {
109         return actorSystem;
110     }
111
112     public ActorRef getShardManager() {
113         return shardManager;
114     }
115
116     public ActorSelection actorSelection(String actorPath) {
117         return actorSystem.actorSelection(actorPath);
118     }
119
120     public ActorSelection actorSelection(ActorPath actorPath) {
121         return actorSystem.actorSelection(actorPath);
122     }
123
124     public void setSchemaContext(SchemaContext schemaContext) {
125         this.schemaContext = schemaContext;
126
127         if(shardManager != null) {
128             shardManager.tell(new UpdateSchemaContext(schemaContext), null);
129         }
130     }
131
132     public SchemaContext getSchemaContext() {
133         return schemaContext;
134     }
135
136     /**
137      * Finds the primary shard for the given shard name
138      *
139      * @param shardName
140      * @return
141      */
142     public Optional<ActorSelection> findPrimaryShard(String shardName) {
143         String path = findPrimaryPathOrNull(shardName);
144         if (path == null){
145             return Optional.absent();
146         }
147         return Optional.of(actorSystem.actorSelection(path));
148     }
149
150     public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
151         Future<Object> future = executeOperationAsync(shardManager,
152                 new FindPrimary(shardName, true).toSerializable(),
153                 datastoreContext.getShardInitializationTimeout());
154
155         return future.transform(new Mapper<Object, ActorSelection>() {
156             @Override
157             public ActorSelection checkedApply(Object response) throws Exception {
158                 if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
159                     PrimaryFound found = PrimaryFound.fromSerializable(response);
160
161                     LOG.debug("Primary found {}", found.getPrimaryPath());
162                     return actorSystem.actorSelection(found.getPrimaryPath());
163                 } else if(response instanceof ActorNotInitialized) {
164                     throw new NotInitializedException(
165                             String.format("Found primary shard %s but it's not initialized yet. " +
166                                           "Please try again later", shardName));
167                 } else if(response instanceof PrimaryNotFound) {
168                     throw new PrimaryNotFoundException(
169                             String.format("No primary shard found for %S.", shardName));
170                 }
171
172                 throw new UnknownMessageException(String.format(
173                         "FindPrimary returned unkown response: %s", response));
174             }
175         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
176     }
177
178     /**
179      * Finds a local shard given its shard name and return it's ActorRef
180      *
181      * @param shardName the name of the local shard that needs to be found
182      * @return a reference to a local shard actor which represents the shard
183      *         specified by the shardName
184      */
185     public Optional<ActorRef> findLocalShard(String shardName) {
186         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
187
188         if (result instanceof LocalShardFound) {
189             LocalShardFound found = (LocalShardFound) result;
190             LOG.debug("Local shard found {}", found.getPath());
191             return Optional.of(found.getPath());
192         }
193
194         return Optional.absent();
195     }
196
197     /**
198      * Finds a local shard async given its shard name and return a Future from which to obtain the
199      * ActorRef.
200      *
201      * @param shardName the name of the local shard that needs to be found
202      */
203     public Future<ActorRef> findLocalShardAsync( final String shardName) {
204         Future<Object> future = executeOperationAsync(shardManager,
205                 new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
206
207         return future.map(new Mapper<Object, ActorRef>() {
208             @Override
209             public ActorRef checkedApply(Object response) throws Throwable {
210                 if(response instanceof LocalShardFound) {
211                     LocalShardFound found = (LocalShardFound)response;
212                     LOG.debug("Local shard found {}", found.getPath());
213                     return found.getPath();
214                 } else if(response instanceof ActorNotInitialized) {
215                     throw new NotInitializedException(
216                             String.format("Found local shard for %s but it's not initialized yet.",
217                                     shardName));
218                 } else if(response instanceof LocalShardNotFound) {
219                     throw new LocalShardNotFoundException(
220                             String.format("Local shard for %s does not exist.", shardName));
221                 }
222
223                 throw new UnknownMessageException(String.format(
224                         "FindLocalShard returned unkown response: %s", response));
225             }
226         }, getActorSystem().dispatcher());
227     }
228
229     private String findPrimaryPathOrNull(String shardName) {
230         Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
231
232         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
233             PrimaryFound found = PrimaryFound.fromSerializable(result);
234
235             LOG.debug("Primary found {}", found.getPrimaryPath());
236             return found.getPrimaryPath();
237
238         } else if (result.getClass().equals(ActorNotInitialized.class)){
239             throw new NotInitializedException(
240                 String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
241             );
242
243         } else {
244             return null;
245         }
246     }
247
248
249     /**
250      * Executes an operation on a local actor and wait for it's response
251      *
252      * @param actor
253      * @param message
254      * @return The response of the operation
255      */
256     public Object executeOperation(ActorRef actor, Object message) {
257         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
258
259         try {
260             return Await.result(future, operationDuration);
261         } catch (Exception e) {
262             throw new TimeoutException("Sending message " + message.getClass().toString() +
263                     " to actor " + actor.toString() + " failed. Try again later.", e);
264         }
265     }
266
267     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
268         Preconditions.checkArgument(actor != null, "actor must not be null");
269         Preconditions.checkArgument(message != null, "message must not be null");
270
271         LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
272         return ask(actor, message, timeout);
273     }
274
275     /**
276      * Execute an operation on a remote actor and wait for it's response
277      *
278      * @param actor
279      * @param message
280      * @return
281      */
282     public Object executeOperation(ActorSelection actor, Object message) {
283         Future<Object> future = executeOperationAsync(actor, message);
284
285         try {
286             return Await.result(future, operationDuration);
287         } catch (Exception e) {
288             throw new TimeoutException("Sending message " + message.getClass().toString() +
289                     " to actor " + actor.toString() + " failed. Try again later.", e);
290         }
291     }
292
293     /**
294      * Execute an operation on a remote actor asynchronously.
295      *
296      * @param actor the ActorSelection
297      * @param message the message to send
298      * @param timeout the operation timeout
299      * @return a Future containing the eventual result
300      */
301     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
302             Timeout timeout) {
303         Preconditions.checkArgument(actor != null, "actor must not be null");
304         Preconditions.checkArgument(message != null, "message must not be null");
305
306         LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
307
308         return ask(actor, message, timeout);
309     }
310
311     /**
312      * Execute an operation on a remote actor asynchronously.
313      *
314      * @param actor the ActorSelection
315      * @param message the message to send
316      * @return a Future containing the eventual result
317      */
318     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
319         return executeOperationAsync(actor, message, operationTimeout);
320     }
321
322     /**
323      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
324      * reply (essentially set and forget).
325      *
326      * @param actor the ActorSelection
327      * @param message the message to send
328      */
329     public void sendOperationAsync(ActorSelection actor, Object message) {
330         Preconditions.checkArgument(actor != null, "actor must not be null");
331         Preconditions.checkArgument(message != null, "message must not be null");
332
333         LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
334
335         actor.tell(message, ActorRef.noSender());
336     }
337
338     public void shutdown() {
339         shardManager.tell(PoisonPill.getInstance(), null);
340         actorSystem.shutdown();
341     }
342
343     public ClusterWrapper getClusterWrapper() {
344         return clusterWrapper;
345     }
346
347     public String getCurrentMemberName(){
348         return clusterWrapper.getCurrentMemberName();
349     }
350
351     /**
352      * Send the message to each and every shard
353      *
354      * @param message
355      */
356     public void broadcast(Object message){
357         for(String shardName : configuration.getAllShardNames()){
358
359             Optional<ActorSelection> primary = findPrimaryShard(shardName);
360             if (primary.isPresent()) {
361                 primary.get().tell(message, ActorRef.noSender());
362             } else {
363                 LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
364                         message.getClass().getSimpleName(), shardName);
365             }
366         }
367     }
368
369     public FiniteDuration getOperationDuration() {
370         return operationDuration;
371     }
372
373     public boolean isLocalPath(String path) {
374         String selfAddress = clusterWrapper.getSelfAddress();
375         if (path == null || selfAddress == null) {
376             return false;
377         }
378
379         int atIndex1 = path.indexOf("@");
380         int atIndex2 = selfAddress.indexOf("@");
381
382         if (atIndex1 == -1 || atIndex2 == -1) {
383             return false;
384         }
385
386         int slashIndex1 = path.indexOf("/", atIndex1);
387         int slashIndex2 = selfAddress.indexOf("/", atIndex2);
388
389         if (slashIndex1 == -1 || slashIndex2 == -1) {
390             return false;
391         }
392
393         String hostPort1 = path.substring(atIndex1, slashIndex1);
394         String hostPort2 = selfAddress.substring(atIndex2, slashIndex2);
395
396         return hostPort1.equals(hostPort2);
397     }
398 }