Merge "Do not use ActorSystem.actorFor as it is deprecated"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Futures;
19 import akka.dispatch.Mapper;
20 import akka.dispatch.OnComplete;
21 import akka.pattern.AskTimeoutException;
22 import akka.util.Timeout;
23 import com.codahale.metrics.JmxReporter;
24 import com.codahale.metrics.MetricRegistry;
25 import com.codahale.metrics.Timer;
26 import com.google.common.annotations.VisibleForTesting;
27 import com.google.common.base.Optional;
28 import com.google.common.base.Preconditions;
29 import com.google.common.base.Strings;
30 import com.google.common.cache.Cache;
31 import com.google.common.cache.CacheBuilder;
32 import com.google.common.util.concurrent.RateLimiter;
33 import java.util.concurrent.TimeUnit;
34 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
35 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
36 import org.opendaylight.controller.cluster.datastore.Configuration;
37 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
38 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
42 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
43 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
44 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
45 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
46 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
48 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
49 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
50 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
51 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import scala.concurrent.Await;
56 import scala.concurrent.ExecutionContext;
57 import scala.concurrent.Future;
58 import scala.concurrent.duration.Duration;
59 import scala.concurrent.duration.FiniteDuration;
60
61 /**
62  * The ActorContext class contains utility methods which could be used by
63  * non-actors (like DistributedDataStore) to work with actors a little more
64  * easily. An ActorContext can be freely passed around to local object instances
65  * but should not be passed to actors especially remote actors
66  */
67 public class ActorContext {
68     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
69     private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
70     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
71     private static final String METRIC_RATE = "rate";
72     private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
73     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
74                                                               new Mapper<Throwable, Throwable>() {
75         @Override
76         public Throwable apply(Throwable failure) {
77             Throwable actualFailure = failure;
78             if(failure instanceof AskTimeoutException) {
79                 // A timeout exception most likely means the shard isn't initialized.
80                 actualFailure = new NotInitializedException(
81                         "Timed out trying to find the primary shard. Most likely cause is the " +
82                         "shard is not initialized yet.");
83             }
84
85             return actualFailure;
86         }
87     };
88     public static final String MAILBOX = "bounded-mailbox";
89
90     private final ActorSystem actorSystem;
91     private final ActorRef shardManager;
92     private final ClusterWrapper clusterWrapper;
93     private final Configuration configuration;
94     private DatastoreContext datastoreContext;
95     private FiniteDuration operationDuration;
96     private Timeout operationTimeout;
97     private final String selfAddressHostPort;
98     private RateLimiter txRateLimiter;
99     private final MetricRegistry metricRegistry = new MetricRegistry();
100     private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
101     private final int transactionOutstandingOperationLimit;
102     private Timeout transactionCommitOperationTimeout;
103     private Timeout shardInitializationTimeout;
104     private final Dispatchers dispatchers;
105     private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
106
107     private volatile SchemaContext schemaContext;
108     private volatile boolean updated;
109
110     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
111             ClusterWrapper clusterWrapper, Configuration configuration) {
112         this(actorSystem, shardManager, clusterWrapper, configuration,
113                 DatastoreContext.newBuilder().build());
114     }
115
116     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
117             ClusterWrapper clusterWrapper, Configuration configuration,
118             DatastoreContext datastoreContext) {
119         this.actorSystem = actorSystem;
120         this.shardManager = shardManager;
121         this.clusterWrapper = clusterWrapper;
122         this.configuration = configuration;
123         this.datastoreContext = datastoreContext;
124         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
125
126         setCachedProperties();
127
128         Address selfAddress = clusterWrapper.getSelfAddress();
129         if (selfAddress != null && !selfAddress.host().isEmpty()) {
130             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
131         } else {
132             selfAddressHostPort = null;
133         }
134
135         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
136         jmxReporter.start();
137
138     }
139
140     private void setCachedProperties() {
141         txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
142
143         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
144         operationTimeout = new Timeout(operationDuration);
145
146         transactionCommitOperationTimeout =  new Timeout(Duration.create(
147                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
148
149         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
150
151         primaryShardActorSelectionCache = CacheBuilder.newBuilder()
152                 .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
153                 .build();
154     }
155
156     public DatastoreContext getDatastoreContext() {
157         return datastoreContext;
158     }
159
160     public ActorSystem getActorSystem() {
161         return actorSystem;
162     }
163
164     public ActorRef getShardManager() {
165         return shardManager;
166     }
167
168     public ActorSelection actorSelection(String actorPath) {
169         return actorSystem.actorSelection(actorPath);
170     }
171
172     public ActorSelection actorSelection(ActorPath actorPath) {
173         return actorSystem.actorSelection(actorPath);
174     }
175
176     public void setSchemaContext(SchemaContext schemaContext) {
177         this.schemaContext = schemaContext;
178
179         if(shardManager != null) {
180             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
181         }
182     }
183
184     public void setDatastoreContext(DatastoreContext context) {
185         this.datastoreContext = context;
186         setCachedProperties();
187
188         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
189         // will be published immediately even though they may not be immediately visible to other
190         // threads due to unsynchronized reads. That's OK though - we're going for eventual
191         // consistency here as immediately visible updates to these members aren't critical. These
192         // members could've been made volatile but wanted to avoid volatile reads as these are
193         // accessed often and updates will be infrequent.
194
195         updated = true;
196
197         if(shardManager != null) {
198             shardManager.tell(context, ActorRef.noSender());
199         }
200     }
201
202     public SchemaContext getSchemaContext() {
203         return schemaContext;
204     }
205
206     public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
207         Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
208         if(ret != null){
209             return ret;
210         }
211         Future<Object> future = executeOperationAsync(shardManager,
212                 new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout);
213
214         return future.transform(new Mapper<Object, ActorSelection>() {
215             @Override
216             public ActorSelection checkedApply(Object response) throws Exception {
217                 if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) {
218                     PrimaryFound found = PrimaryFound.fromSerializable(response);
219
220                     LOG.debug("Primary found {}", found.getPrimaryPath());
221                     ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
222                     primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
223                     return actorSelection;
224                 } else if(response instanceof ActorNotInitialized) {
225                     throw new NotInitializedException(
226                             String.format("Found primary shard %s but it's not initialized yet. " +
227                                           "Please try again later", shardName));
228                 } else if(response instanceof PrimaryNotFound) {
229                     throw new PrimaryNotFoundException(
230                             String.format("No primary shard found for %S.", shardName));
231                 } else if(response instanceof NoShardLeaderException) {
232                     throw (NoShardLeaderException)response;
233                 }
234
235                 throw new UnknownMessageException(String.format(
236                         "FindPrimary returned unkown response: %s", response));
237             }
238         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
239     }
240
241     /**
242      * Finds a local shard given its shard name and return it's ActorRef
243      *
244      * @param shardName the name of the local shard that needs to be found
245      * @return a reference to a local shard actor which represents the shard
246      *         specified by the shardName
247      */
248     public Optional<ActorRef> findLocalShard(String shardName) {
249         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
250
251         if (result instanceof LocalShardFound) {
252             LocalShardFound found = (LocalShardFound) result;
253             LOG.debug("Local shard found {}", found.getPath());
254             return Optional.of(found.getPath());
255         }
256
257         return Optional.absent();
258     }
259
260     /**
261      * Finds a local shard async given its shard name and return a Future from which to obtain the
262      * ActorRef.
263      *
264      * @param shardName the name of the local shard that needs to be found
265      */
266     public Future<ActorRef> findLocalShardAsync( final String shardName) {
267         Future<Object> future = executeOperationAsync(shardManager,
268                 new FindLocalShard(shardName, true), shardInitializationTimeout);
269
270         return future.map(new Mapper<Object, ActorRef>() {
271             @Override
272             public ActorRef checkedApply(Object response) throws Throwable {
273                 if(response instanceof LocalShardFound) {
274                     LocalShardFound found = (LocalShardFound)response;
275                     LOG.debug("Local shard found {}", found.getPath());
276                     return found.getPath();
277                 } else if(response instanceof ActorNotInitialized) {
278                     throw new NotInitializedException(
279                             String.format("Found local shard for %s but it's not initialized yet.",
280                                     shardName));
281                 } else if(response instanceof LocalShardNotFound) {
282                     throw new LocalShardNotFoundException(
283                             String.format("Local shard for %s does not exist.", shardName));
284                 }
285
286                 throw new UnknownMessageException(String.format(
287                         "FindLocalShard returned unkown response: %s", response));
288             }
289         }, getClientDispatcher());
290     }
291
292     /**
293      * Executes an operation on a local actor and wait for it's response
294      *
295      * @param actor
296      * @param message
297      * @return The response of the operation
298      */
299     public Object executeOperation(ActorRef actor, Object message) {
300         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
301
302         try {
303             return Await.result(future, operationDuration);
304         } catch (Exception e) {
305             throw new TimeoutException("Sending message " + message.getClass().toString() +
306                     " to actor " + actor.toString() + " failed. Try again later.", e);
307         }
308     }
309
310     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
311         Preconditions.checkArgument(actor != null, "actor must not be null");
312         Preconditions.checkArgument(message != null, "message must not be null");
313
314         LOG.debug("Sending message {} to {}", message.getClass(), actor);
315         return doAsk(actor, message, timeout);
316     }
317
318     /**
319      * Execute an operation on a remote actor and wait for it's response
320      *
321      * @param actor
322      * @param message
323      * @return
324      */
325     public Object executeOperation(ActorSelection actor, Object message) {
326         Future<Object> future = executeOperationAsync(actor, message);
327
328         try {
329             return Await.result(future, operationDuration);
330         } catch (Exception e) {
331             throw new TimeoutException("Sending message " + message.getClass().toString() +
332                     " to actor " + actor.toString() + " failed. Try again later.", e);
333         }
334     }
335
336     /**
337      * Execute an operation on a remote actor asynchronously.
338      *
339      * @param actor the ActorSelection
340      * @param message the message to send
341      * @param timeout the operation timeout
342      * @return a Future containing the eventual result
343      */
344     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
345             Timeout timeout) {
346         Preconditions.checkArgument(actor != null, "actor must not be null");
347         Preconditions.checkArgument(message != null, "message must not be null");
348
349         LOG.debug("Sending message {} to {}", message.getClass(), actor);
350
351         return doAsk(actor, message, timeout);
352     }
353
354     /**
355      * Execute an operation on a remote actor asynchronously.
356      *
357      * @param actor the ActorSelection
358      * @param message the message to send
359      * @return a Future containing the eventual result
360      */
361     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
362         return executeOperationAsync(actor, message, operationTimeout);
363     }
364
365     /**
366      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
367      * reply (essentially set and forget).
368      *
369      * @param actor the ActorSelection
370      * @param message the message to send
371      */
372     public void sendOperationAsync(ActorSelection actor, Object message) {
373         Preconditions.checkArgument(actor != null, "actor must not be null");
374         Preconditions.checkArgument(message != null, "message must not be null");
375
376         LOG.debug("Sending message {} to {}", message.getClass(), actor);
377
378         actor.tell(message, ActorRef.noSender());
379     }
380
381     public void shutdown() {
382         shardManager.tell(PoisonPill.getInstance(), null);
383         actorSystem.shutdown();
384     }
385
386     public ClusterWrapper getClusterWrapper() {
387         return clusterWrapper;
388     }
389
390     public String getCurrentMemberName(){
391         return clusterWrapper.getCurrentMemberName();
392     }
393
394     /**
395      * Send the message to each and every shard
396      *
397      * @param message
398      */
399     public void broadcast(final Object message){
400         for(final String shardName : configuration.getAllShardNames()){
401
402             Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
403             primaryFuture.onComplete(new OnComplete<ActorSelection>() {
404                 @Override
405                 public void onComplete(Throwable failure, ActorSelection primaryShard) {
406                     if(failure != null) {
407                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
408                                 message.getClass().getSimpleName(), shardName, failure);
409                     } else {
410                         primaryShard.tell(message, ActorRef.noSender());
411                     }
412                 }
413             }, getClientDispatcher());
414         }
415     }
416
417     public FiniteDuration getOperationDuration() {
418         return operationDuration;
419     }
420
421     public boolean isPathLocal(String path) {
422         if (Strings.isNullOrEmpty(path)) {
423             return false;
424         }
425
426         int pathAtIndex = path.indexOf('@');
427         if (pathAtIndex == -1) {
428             //if the path is of local format, then its local and is co-located
429             return true;
430
431         } else if (selfAddressHostPort != null) {
432             // self-address and tx actor path, both are of remote path format
433             int slashIndex = path.indexOf('/', pathAtIndex);
434
435             if (slashIndex == -1) {
436                 return false;
437             }
438
439             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
440             return hostPort.equals(selfAddressHostPort);
441
442         } else {
443             // self address is local format and tx actor path is remote format
444             return false;
445         }
446     }
447
448     /**
449      * @deprecated This method is present only to support backward compatibility with Helium and should not be
450      * used any further
451      *
452      *
453      * @param primaryPath
454      * @param localPathOfRemoteActor
455      * @return
456     */
457     @Deprecated
458     public String resolvePath(final String primaryPath,
459                                             final String localPathOfRemoteActor) {
460         StringBuilder builder = new StringBuilder();
461         String[] primaryPathElements = primaryPath.split("/");
462         builder.append(primaryPathElements[0]).append("//")
463             .append(primaryPathElements[1]).append(primaryPathElements[2]);
464         String[] remotePathElements = localPathOfRemoteActor.split("/");
465         for (int i = 3; i < remotePathElements.length; i++) {
466                 builder.append("/").append(remotePathElements[i]);
467             }
468
469         return builder.toString();
470     }
471
472     /**
473      * Get the maximum number of operations that are to be permitted within a transaction before the transaction
474      * should begin throttling the operations
475      *
476      * Parking reading this configuration here because we need to get to the actor system settings
477      *
478      * @return
479      */
480     public int getTransactionOutstandingOperationLimit(){
481         return transactionOutstandingOperationLimit;
482     }
483
484     /**
485      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
486      * us to create a timer for pretty much anything.
487      *
488      * @param operationName
489      * @return
490      */
491     public Timer getOperationTimer(String operationName){
492         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
493         return metricRegistry.timer(rate);
494     }
495
496     /**
497      * Get the type of the data store to which this ActorContext belongs
498      *
499      * @return
500      */
501     public String getDataStoreType() {
502         return datastoreContext.getDataStoreType();
503     }
504
505     /**
506      * Set the number of transaction creation permits that are to be allowed
507      *
508      * @param permitsPerSecond
509      */
510     public void setTxCreationLimit(double permitsPerSecond){
511         txRateLimiter.setRate(permitsPerSecond);
512     }
513
514     /**
515      * Get the current transaction creation rate limit
516      * @return
517      */
518     public double getTxCreationLimit(){
519         return txRateLimiter.getRate();
520     }
521
522     /**
523      * Try to acquire a transaction creation permit. Will block if no permits are available.
524      */
525     public void acquireTxCreationPermit(){
526         txRateLimiter.acquire();
527     }
528
529     /**
530      * Return the operation timeout to be used when committing transactions
531      * @return
532      */
533     public Timeout getTransactionCommitOperationTimeout(){
534         return transactionCommitOperationTimeout;
535     }
536
537     /**
538      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
539      * code on the datastore
540      * @return
541      */
542     public ExecutionContext getClientDispatcher() {
543         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
544     }
545
546     public String getNotificationDispatcherPath(){
547         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
548     }
549
550     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
551         return ask(actorRef, message, timeout);
552     }
553
554     protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
555         return ask(actorRef, message, timeout);
556     }
557
558     @VisibleForTesting
559     Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
560         return primaryShardActorSelectionCache;
561     }
562 }