Merge "Improve ConfigurationImpl performance"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Mapper;
19 import akka.pattern.AskTimeoutException;
20 import akka.util.Timeout;
21 import com.google.common.base.Optional;
22 import com.google.common.base.Preconditions;
23 import com.google.common.base.Strings;
24 import java.util.concurrent.TimeUnit;
25 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
26 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
27 import org.opendaylight.controller.cluster.datastore.Configuration;
28 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
29 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
30 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
31 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
32 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
33 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
34 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
35 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
36 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
37 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
38 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
39 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
40 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
41 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
42 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import scala.concurrent.Await;
46 import scala.concurrent.Future;
47 import scala.concurrent.duration.Duration;
48 import scala.concurrent.duration.FiniteDuration;
49
50 /**
51  * The ActorContext class contains utility methods which could be used by
52  * non-actors (like DistributedDataStore) to work with actors a little more
53  * easily. An ActorContext can be freely passed around to local object instances
54  * but should not be passed to actors especially remote actors
55  */
56 public class ActorContext {
57     private static final Logger
58         LOG = LoggerFactory.getLogger(ActorContext.class);
59
60     public static final String MAILBOX = "bounded-mailbox";
61
62     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
63                                                               new Mapper<Throwable, Throwable>() {
64         @Override
65         public Throwable apply(Throwable failure) {
66             Throwable actualFailure = failure;
67             if(failure instanceof AskTimeoutException) {
68                 // A timeout exception most likely means the shard isn't initialized.
69                 actualFailure = new NotInitializedException(
70                         "Timed out trying to find the primary shard. Most likely cause is the " +
71                         "shard is not initialized yet.");
72             }
73
74             return actualFailure;
75         }
76     };
77
78     private final ActorSystem actorSystem;
79     private final ActorRef shardManager;
80     private final ClusterWrapper clusterWrapper;
81     private final Configuration configuration;
82     private final DatastoreContext datastoreContext;
83     private volatile SchemaContext schemaContext;
84     private final FiniteDuration operationDuration;
85     private final Timeout operationTimeout;
86     private final String selfAddressHostPort;
87     private final int transactionOutstandingOperationLimit;
88
89     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
90             ClusterWrapper clusterWrapper, Configuration configuration) {
91         this(actorSystem, shardManager, clusterWrapper, configuration,
92                 DatastoreContext.newBuilder().build());
93     }
94
95     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
96             ClusterWrapper clusterWrapper, Configuration configuration,
97             DatastoreContext datastoreContext) {
98         this.actorSystem = actorSystem;
99         this.shardManager = shardManager;
100         this.clusterWrapper = clusterWrapper;
101         this.configuration = configuration;
102         this.datastoreContext = datastoreContext;
103
104         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
105                 TimeUnit.SECONDS);
106         operationTimeout = new Timeout(operationDuration);
107
108         Address selfAddress = clusterWrapper.getSelfAddress();
109         if (selfAddress != null && !selfAddress.host().isEmpty()) {
110             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
111         } else {
112             selfAddressHostPort = null;
113         }
114
115         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
116     }
117
118     public DatastoreContext getDatastoreContext() {
119         return datastoreContext;
120     }
121
122     public ActorSystem getActorSystem() {
123         return actorSystem;
124     }
125
126     public ActorRef getShardManager() {
127         return shardManager;
128     }
129
130     public ActorSelection actorSelection(String actorPath) {
131         return actorSystem.actorSelection(actorPath);
132     }
133
134     public ActorSelection actorSelection(ActorPath actorPath) {
135         return actorSystem.actorSelection(actorPath);
136     }
137
138     public void setSchemaContext(SchemaContext schemaContext) {
139         this.schemaContext = schemaContext;
140
141         if(shardManager != null) {
142             shardManager.tell(new UpdateSchemaContext(schemaContext), null);
143         }
144     }
145
146     public SchemaContext getSchemaContext() {
147         return schemaContext;
148     }
149
150     /**
151      * Finds the primary shard for the given shard name
152      *
153      * @param shardName
154      * @return
155      */
156     public Optional<ActorSelection> findPrimaryShard(String shardName) {
157         String path = findPrimaryPathOrNull(shardName);
158         if (path == null){
159             return Optional.absent();
160         }
161         return Optional.of(actorSystem.actorSelection(path));
162     }
163
164     public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
165         Future<Object> future = executeOperationAsync(shardManager,
166                 new FindPrimary(shardName, true).toSerializable(),
167                 datastoreContext.getShardInitializationTimeout());
168
169         return future.transform(new Mapper<Object, ActorSelection>() {
170             @Override
171             public ActorSelection checkedApply(Object response) throws Exception {
172                 if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
173                     PrimaryFound found = PrimaryFound.fromSerializable(response);
174
175                     LOG.debug("Primary found {}", found.getPrimaryPath());
176                     return actorSystem.actorSelection(found.getPrimaryPath());
177                 } else if(response instanceof ActorNotInitialized) {
178                     throw new NotInitializedException(
179                             String.format("Found primary shard %s but it's not initialized yet. " +
180                                           "Please try again later", shardName));
181                 } else if(response instanceof PrimaryNotFound) {
182                     throw new PrimaryNotFoundException(
183                             String.format("No primary shard found for %S.", shardName));
184                 }
185
186                 throw new UnknownMessageException(String.format(
187                         "FindPrimary returned unkown response: %s", response));
188             }
189         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
190     }
191
192     /**
193      * Finds a local shard given its shard name and return it's ActorRef
194      *
195      * @param shardName the name of the local shard that needs to be found
196      * @return a reference to a local shard actor which represents the shard
197      *         specified by the shardName
198      */
199     public Optional<ActorRef> findLocalShard(String shardName) {
200         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
201
202         if (result instanceof LocalShardFound) {
203             LocalShardFound found = (LocalShardFound) result;
204             LOG.debug("Local shard found {}", found.getPath());
205             return Optional.of(found.getPath());
206         }
207
208         return Optional.absent();
209     }
210
211     /**
212      * Finds a local shard async given its shard name and return a Future from which to obtain the
213      * ActorRef.
214      *
215      * @param shardName the name of the local shard that needs to be found
216      */
217     public Future<ActorRef> findLocalShardAsync( final String shardName) {
218         Future<Object> future = executeOperationAsync(shardManager,
219                 new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
220
221         return future.map(new Mapper<Object, ActorRef>() {
222             @Override
223             public ActorRef checkedApply(Object response) throws Throwable {
224                 if(response instanceof LocalShardFound) {
225                     LocalShardFound found = (LocalShardFound)response;
226                     LOG.debug("Local shard found {}", found.getPath());
227                     return found.getPath();
228                 } else if(response instanceof ActorNotInitialized) {
229                     throw new NotInitializedException(
230                             String.format("Found local shard for %s but it's not initialized yet.",
231                                     shardName));
232                 } else if(response instanceof LocalShardNotFound) {
233                     throw new LocalShardNotFoundException(
234                             String.format("Local shard for %s does not exist.", shardName));
235                 }
236
237                 throw new UnknownMessageException(String.format(
238                         "FindLocalShard returned unkown response: %s", response));
239             }
240         }, getActorSystem().dispatcher());
241     }
242
243     private String findPrimaryPathOrNull(String shardName) {
244         Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
245
246         if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
247             PrimaryFound found = PrimaryFound.fromSerializable(result);
248
249             LOG.debug("Primary found {}", found.getPrimaryPath());
250             return found.getPrimaryPath();
251
252         } else if (result.getClass().equals(ActorNotInitialized.class)){
253             throw new NotInitializedException(
254                 String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
255             );
256
257         } else {
258             return null;
259         }
260     }
261
262
263     /**
264      * Executes an operation on a local actor and wait for it's response
265      *
266      * @param actor
267      * @param message
268      * @return The response of the operation
269      */
270     public Object executeOperation(ActorRef actor, Object message) {
271         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
272
273         try {
274             return Await.result(future, operationDuration);
275         } catch (Exception e) {
276             throw new TimeoutException("Sending message " + message.getClass().toString() +
277                     " to actor " + actor.toString() + " failed. Try again later.", e);
278         }
279     }
280
281     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
282         Preconditions.checkArgument(actor != null, "actor must not be null");
283         Preconditions.checkArgument(message != null, "message must not be null");
284
285         LOG.debug("Sending message {} to {}", message.getClass(), actor);
286         return ask(actor, message, timeout);
287     }
288
289     /**
290      * Execute an operation on a remote actor and wait for it's response
291      *
292      * @param actor
293      * @param message
294      * @return
295      */
296     public Object executeOperation(ActorSelection actor, Object message) {
297         Future<Object> future = executeOperationAsync(actor, message);
298
299         try {
300             return Await.result(future, operationDuration);
301         } catch (Exception e) {
302             throw new TimeoutException("Sending message " + message.getClass().toString() +
303                     " to actor " + actor.toString() + " failed. Try again later.", e);
304         }
305     }
306
307     /**
308      * Execute an operation on a remote actor asynchronously.
309      *
310      * @param actor the ActorSelection
311      * @param message the message to send
312      * @param timeout the operation timeout
313      * @return a Future containing the eventual result
314      */
315     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
316             Timeout timeout) {
317         Preconditions.checkArgument(actor != null, "actor must not be null");
318         Preconditions.checkArgument(message != null, "message must not be null");
319
320         LOG.debug("Sending message {} to {}", message.getClass(), actor);
321
322         return ask(actor, message, timeout);
323     }
324
325     /**
326      * Execute an operation on a remote actor asynchronously.
327      *
328      * @param actor the ActorSelection
329      * @param message the message to send
330      * @return a Future containing the eventual result
331      */
332     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
333         return executeOperationAsync(actor, message, operationTimeout);
334     }
335
336     /**
337      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
338      * reply (essentially set and forget).
339      *
340      * @param actor the ActorSelection
341      * @param message the message to send
342      */
343     public void sendOperationAsync(ActorSelection actor, Object message) {
344         Preconditions.checkArgument(actor != null, "actor must not be null");
345         Preconditions.checkArgument(message != null, "message must not be null");
346
347         LOG.debug("Sending message {} to {}", message.getClass(), actor);
348
349         actor.tell(message, ActorRef.noSender());
350     }
351
352     public void shutdown() {
353         shardManager.tell(PoisonPill.getInstance(), null);
354         actorSystem.shutdown();
355     }
356
357     public ClusterWrapper getClusterWrapper() {
358         return clusterWrapper;
359     }
360
361     public String getCurrentMemberName(){
362         return clusterWrapper.getCurrentMemberName();
363     }
364
365     /**
366      * Send the message to each and every shard
367      *
368      * @param message
369      */
370     public void broadcast(Object message){
371         for(String shardName : configuration.getAllShardNames()){
372
373             Optional<ActorSelection> primary = findPrimaryShard(shardName);
374             if (primary.isPresent()) {
375                 primary.get().tell(message, ActorRef.noSender());
376             } else {
377                 LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
378                         message.getClass().getSimpleName(), shardName);
379             }
380         }
381     }
382
383     public FiniteDuration getOperationDuration() {
384         return operationDuration;
385     }
386
387     public boolean isPathLocal(String path) {
388         if (Strings.isNullOrEmpty(path)) {
389             return false;
390         }
391
392         int pathAtIndex = path.indexOf('@');
393         if (pathAtIndex == -1) {
394             //if the path is of local format, then its local and is co-located
395             return true;
396
397         } else if (selfAddressHostPort != null) {
398             // self-address and tx actor path, both are of remote path format
399             int slashIndex = path.indexOf('/', pathAtIndex);
400
401             if (slashIndex == -1) {
402                 return false;
403             }
404
405             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
406             return hostPort.equals(selfAddressHostPort);
407
408         } else {
409             // self address is local format and tx actor path is remote format
410             return false;
411         }
412     }
413
414     /**
415      * @deprecated This method is present only to support backward compatibility with Helium and should not be
416      * used any further
417      *
418      *
419      * @param primaryPath
420      * @param localPathOfRemoteActor
421      * @return
422     */
423     @Deprecated
424     public String resolvePath(final String primaryPath,
425                                             final String localPathOfRemoteActor) {
426         StringBuilder builder = new StringBuilder();
427         String[] primaryPathElements = primaryPath.split("/");
428         builder.append(primaryPathElements[0]).append("//")
429             .append(primaryPathElements[1]).append(primaryPathElements[2]);
430         String[] remotePathElements = localPathOfRemoteActor.split("/");
431         for (int i = 3; i < remotePathElements.length; i++) {
432                 builder.append("/").append(remotePathElements[i]);
433             }
434
435         return builder.toString();
436     }
437
438     /**
439      * Get the maximum number of operations that are to be permitted within a transaction before the transaction
440      * should begin throttling the operations
441      *
442      * Parking reading this configuration here because we need to get to the actor system settings
443      *
444      * @return
445      */
446     public int getTransactionOutstandingOperationLimit(){
447         return transactionOutstandingOperationLimit;
448     }
449 }