f217d05bb21a12e6f92add47da5536c6f6fe12d9
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.PoisonPill;
17 import akka.dispatch.Mapper;
18 import akka.pattern.AskTimeoutException;
19 import akka.util.Timeout;
20 import com.google.common.base.Optional;
21 import com.google.common.base.Preconditions;
22 import com.google.common.base.Strings;
23 import java.util.concurrent.TimeUnit;
24 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
25 import org.opendaylight.controller.cluster.datastore.Configuration;
26 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
27 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
28 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
29 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
30 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
31 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
32 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
33 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
34 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
35 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
36 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
37 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
38 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
39 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43 import scala.concurrent.Await;
44 import scala.concurrent.Future;
45 import scala.concurrent.duration.Duration;
46 import scala.concurrent.duration.FiniteDuration;
47
48 import static akka.pattern.Patterns.ask;
49
50 /**
51  * The ActorContext class contains utility methods which could be used by
52  * non-actors (like DistributedDataStore) to work with actors a little more
53  * easily. An ActorContext can be freely passed around to local object instances
54  * but should not be passed to actors especially remote actors
55  */
56 public class ActorContext {
57     private static final Logger
58         LOG = LoggerFactory.getLogger(ActorContext.class);
59
60     public static final String MAILBOX = "bounded-mailbox";
61
62     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
63                                                               new Mapper<Throwable, Throwable>() {
64         @Override
65         public Throwable apply(Throwable failure) {
66             Throwable actualFailure = failure;
67             if(failure instanceof AskTimeoutException) {
68                 // A timeout exception most likely means the shard isn't initialized.
69                 actualFailure = new NotInitializedException(
70                         "Timed out trying to find the primary shard. Most likely cause is the " +
71                         "shard is not initialized yet.");
72             }
73
74             return actualFailure;
75         }
76     };
77
78     private final ActorSystem actorSystem;
79     private final ActorRef shardManager;
80     private final ClusterWrapper clusterWrapper;
81     private final Configuration configuration;
82     private final DatastoreContext datastoreContext;
83     private volatile SchemaContext schemaContext;
84     private final FiniteDuration operationDuration;
85     private final Timeout operationTimeout;
86     private final String selfAddressHostPort;
87
88     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
89             ClusterWrapper clusterWrapper, Configuration configuration) {
90         this(actorSystem, shardManager, clusterWrapper, configuration,
91                 DatastoreContext.newBuilder().build());
92     }
93
94     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
95             ClusterWrapper clusterWrapper, Configuration configuration,
96             DatastoreContext datastoreContext) {
97         this.actorSystem = actorSystem;
98         this.shardManager = shardManager;
99         this.clusterWrapper = clusterWrapper;
100         this.configuration = configuration;
101         this.datastoreContext = datastoreContext;
102
103         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
104                 TimeUnit.SECONDS);
105         operationTimeout = new Timeout(operationDuration);
106
107         Address selfAddress = clusterWrapper.getSelfAddress();
108         if (selfAddress != null && !selfAddress.host().isEmpty()) {
109             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
110         } else {
111             selfAddressHostPort = null;
112         }
113     }
114
115     public DatastoreContext getDatastoreContext() {
116         return datastoreContext;
117     }
118
119     public ActorSystem getActorSystem() {
120         return actorSystem;
121     }
122
123     public ActorRef getShardManager() {
124         return shardManager;
125     }
126
127     public ActorSelection actorSelection(String actorPath) {
128         return actorSystem.actorSelection(actorPath);
129     }
130
131     public ActorSelection actorSelection(ActorPath actorPath) {
132         return actorSystem.actorSelection(actorPath);
133     }
134
135     public void setSchemaContext(SchemaContext schemaContext) {
136         this.schemaContext = schemaContext;
137
138         if(shardManager != null) {
139             shardManager.tell(new UpdateSchemaContext(schemaContext), null);
140         }
141     }
142
143     public SchemaContext getSchemaContext() {
144         return schemaContext;
145     }
146
147     /**
148      * Finds the primary shard for the given shard name
149      *
150      * @param shardName
151      * @return
152      */
153     public Optional<ActorSelection> findPrimaryShard(String shardName) {
154         String path = findPrimaryPathOrNull(shardName);
155         if (path == null){
156             return Optional.absent();
157         }
158         return Optional.of(actorSystem.actorSelection(path));
159     }
160
161     public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
162         Future<Object> future = executeOperationAsync(shardManager,
163                 new FindPrimary(shardName, true).toSerializable(),
164                 datastoreContext.getShardInitializationTimeout());
165
166         return future.transform(new Mapper<Object, ActorSelection>() {
167             @Override
168             public ActorSelection checkedApply(Object response) throws Exception {
169                 if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
170                     PrimaryFound found = PrimaryFound.fromSerializable(response);
171
172                     LOG.debug("Primary found {}", found.getPrimaryPath());
173                     return actorSystem.actorSelection(found.getPrimaryPath());
174                 } else if(response instanceof ActorNotInitialized) {
175                     throw new NotInitializedException(
176                             String.format("Found primary shard %s but it's not initialized yet. " +
177                                           "Please try again later", shardName));
178                 } else if(response instanceof PrimaryNotFound) {
179                     throw new PrimaryNotFoundException(
180                             String.format("No primary shard found for %S.", shardName));
181                 }
182
183                 throw new UnknownMessageException(String.format(
184                         "FindPrimary returned unkown response: %s", response));
185             }
186         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
187     }
188
189     /**
190      * Finds a local shard given its shard name and return it's ActorRef
191      *
192      * @param shardName the name of the local shard that needs to be found
193      * @return a reference to a local shard actor which represents the shard
194      *         specified by the shardName
195      */
196     public Optional<ActorRef> findLocalShard(String shardName) {
197         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
198
199         if (result instanceof LocalShardFound) {
200             LocalShardFound found = (LocalShardFound) result;
201             LOG.debug("Local shard found {}", found.getPath());
202             return Optional.of(found.getPath());
203         }
204
205         return Optional.absent();
206     }
207
208     /**
209      * Finds a local shard async given its shard name and return a Future from which to obtain the
210      * ActorRef.
211      *
212      * @param shardName the name of the local shard that needs to be found
213      */
214     public Future<ActorRef> findLocalShardAsync( final String shardName) {
215         Future<Object> future = executeOperationAsync(shardManager,
216                 new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
217
218         return future.map(new Mapper<Object, ActorRef>() {
219             @Override
220             public ActorRef checkedApply(Object response) throws Throwable {
221                 if(response instanceof LocalShardFound) {
222                     LocalShardFound found = (LocalShardFound)response;
223                     LOG.debug("Local shard found {}", found.getPath());
224                     return found.getPath();
225                 } else if(response instanceof ActorNotInitialized) {
226                     throw new NotInitializedException(
227                             String.format("Found local shard for %s but it's not initialized yet.",
228                                     shardName));
229                 } else if(response instanceof LocalShardNotFound) {
230                     throw new LocalShardNotFoundException(
231                             String.format("Local shard for %s does not exist.", shardName));
232                 }
233
234                 throw new UnknownMessageException(String.format(
235                         "FindLocalShard returned unkown response: %s", response));
236             }
237         }, getActorSystem().dispatcher());
238     }
239
240     private String findPrimaryPathOrNull(String shardName) {
241         Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
242
243         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
244             PrimaryFound found = PrimaryFound.fromSerializable(result);
245
246             LOG.debug("Primary found {}", found.getPrimaryPath());
247             return found.getPrimaryPath();
248
249         } else if (result.getClass().equals(ActorNotInitialized.class)){
250             throw new NotInitializedException(
251                 String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
252             );
253
254         } else {
255             return null;
256         }
257     }
258
259
260     /**
261      * Executes an operation on a local actor and wait for it's response
262      *
263      * @param actor
264      * @param message
265      * @return The response of the operation
266      */
267     public Object executeOperation(ActorRef actor, Object message) {
268         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
269
270         try {
271             return Await.result(future, operationDuration);
272         } catch (Exception e) {
273             throw new TimeoutException("Sending message " + message.getClass().toString() +
274                     " to actor " + actor.toString() + " failed. Try again later.", e);
275         }
276     }
277
278     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
279         Preconditions.checkArgument(actor != null, "actor must not be null");
280         Preconditions.checkArgument(message != null, "message must not be null");
281
282         LOG.debug("Sending message {} to {}", message.getClass(), actor);
283         return ask(actor, message, timeout);
284     }
285
286     /**
287      * Execute an operation on a remote actor and wait for it's response
288      *
289      * @param actor
290      * @param message
291      * @return
292      */
293     public Object executeOperation(ActorSelection actor, Object message) {
294         Future<Object> future = executeOperationAsync(actor, message);
295
296         try {
297             return Await.result(future, operationDuration);
298         } catch (Exception e) {
299             throw new TimeoutException("Sending message " + message.getClass().toString() +
300                     " to actor " + actor.toString() + " failed. Try again later.", e);
301         }
302     }
303
304     /**
305      * Execute an operation on a remote actor asynchronously.
306      *
307      * @param actor the ActorSelection
308      * @param message the message to send
309      * @param timeout the operation timeout
310      * @return a Future containing the eventual result
311      */
312     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
313             Timeout timeout) {
314         Preconditions.checkArgument(actor != null, "actor must not be null");
315         Preconditions.checkArgument(message != null, "message must not be null");
316
317         LOG.debug("Sending message {} to {}", message.getClass(), actor);
318
319         return ask(actor, message, timeout);
320     }
321
322     /**
323      * Execute an operation on a remote actor asynchronously.
324      *
325      * @param actor the ActorSelection
326      * @param message the message to send
327      * @return a Future containing the eventual result
328      */
329     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
330         return executeOperationAsync(actor, message, operationTimeout);
331     }
332
333     /**
334      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
335      * reply (essentially set and forget).
336      *
337      * @param actor the ActorSelection
338      * @param message the message to send
339      */
340     public void sendOperationAsync(ActorSelection actor, Object message) {
341         Preconditions.checkArgument(actor != null, "actor must not be null");
342         Preconditions.checkArgument(message != null, "message must not be null");
343
344         LOG.debug("Sending message {} to {}", message.getClass(), actor);
345
346         actor.tell(message, ActorRef.noSender());
347     }
348
349     public void shutdown() {
350         shardManager.tell(PoisonPill.getInstance(), null);
351         actorSystem.shutdown();
352     }
353
354     public ClusterWrapper getClusterWrapper() {
355         return clusterWrapper;
356     }
357
358     public String getCurrentMemberName(){
359         return clusterWrapper.getCurrentMemberName();
360     }
361
362     /**
363      * Send the message to each and every shard
364      *
365      * @param message
366      */
367     public void broadcast(Object message){
368         for(String shardName : configuration.getAllShardNames()){
369
370             Optional<ActorSelection> primary = findPrimaryShard(shardName);
371             if (primary.isPresent()) {
372                 primary.get().tell(message, ActorRef.noSender());
373             } else {
374                 LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
375                         message.getClass().getSimpleName(), shardName);
376             }
377         }
378     }
379
380     public FiniteDuration getOperationDuration() {
381         return operationDuration;
382     }
383
384     public boolean isPathLocal(String path) {
385         if (Strings.isNullOrEmpty(path)) {
386             return false;
387         }
388
389         int pathAtIndex = path.indexOf('@');
390         if (pathAtIndex == -1) {
391             //if the path is of local format, then its local and is co-located
392             return true;
393
394         } else if (selfAddressHostPort != null) {
395             // self-address and tx actor path, both are of remote path format
396             int slashIndex = path.indexOf('/', pathAtIndex);
397
398             if (slashIndex == -1) {
399                 return false;
400             }
401
402             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
403             return hostPort.equals(selfAddressHostPort);
404
405         } else {
406             // self address is local format and tx actor path is remote format
407             return false;
408         }
409     }
410
411     /**
412      * @deprecated This method is present only to support backward compatibility with Helium and should not be
413      * used any further
414      *
415      *
416      * @param primaryPath
417      * @param localPathOfRemoteActor
418      * @return
419     */
420     @Deprecated
421     public String resolvePath(final String primaryPath,
422                                             final String localPathOfRemoteActor) {
423         StringBuilder builder = new StringBuilder();
424         String[] primaryPathElements = primaryPath.split("/");
425         builder.append(primaryPathElements[0]).append("//")
426             .append(primaryPathElements[1]).append(primaryPathElements[2]);
427         String[] remotePathElements = localPathOfRemoteActor.split("/");
428         for (int i = 3; i < remotePathElements.length; i++) {
429                 builder.append("/").append(remotePathElements[i]);
430             }
431
432         return builder.toString();
433     }
434 }