Merge "Fixes Bug 2935"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Futures;
19 import akka.dispatch.Mapper;
20 import akka.dispatch.OnComplete;
21 import akka.pattern.AskTimeoutException;
22 import akka.util.Timeout;
23 import com.codahale.metrics.MetricRegistry;
24 import com.codahale.metrics.Timer;
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.common.base.Optional;
27 import com.google.common.base.Preconditions;
28 import com.google.common.base.Strings;
29 import com.google.common.cache.Cache;
30 import com.google.common.cache.CacheBuilder;
31 import com.google.common.util.concurrent.RateLimiter;
32 import java.util.concurrent.TimeUnit;
33 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
34 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
35 import org.opendaylight.controller.cluster.datastore.Configuration;
36 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
37 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
42 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
43 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
44 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
47 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
48 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
49 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
50 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 import scala.concurrent.Await;
54 import scala.concurrent.ExecutionContext;
55 import scala.concurrent.Future;
56 import scala.concurrent.duration.Duration;
57 import scala.concurrent.duration.FiniteDuration;
58
59 /**
60  * The ActorContext class contains utility methods which could be used by
61  * non-actors (like DistributedDataStore) to work with actors a little more
62  * easily. An ActorContext can be freely passed around to local object instances
63  * but should not be passed to actors especially remote actors
64  */
65 public class ActorContext {
66     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
67     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
68     private static final String METRIC_RATE = "rate";
69     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
70                                                               new Mapper<Throwable, Throwable>() {
71         @Override
72         public Throwable apply(Throwable failure) {
73             Throwable actualFailure = failure;
74             if(failure instanceof AskTimeoutException) {
75                 // A timeout exception most likely means the shard isn't initialized.
76                 actualFailure = new NotInitializedException(
77                         "Timed out trying to find the primary shard. Most likely cause is the " +
78                         "shard is not initialized yet.");
79             }
80
81             return actualFailure;
82         }
83     };
84     public static final String MAILBOX = "bounded-mailbox";
85
86     private final ActorSystem actorSystem;
87     private final ActorRef shardManager;
88     private final ClusterWrapper clusterWrapper;
89     private final Configuration configuration;
90     private DatastoreContext datastoreContext;
91     private FiniteDuration operationDuration;
92     private Timeout operationTimeout;
93     private final String selfAddressHostPort;
94     private RateLimiter txRateLimiter;
95     private final int transactionOutstandingOperationLimit;
96     private Timeout transactionCommitOperationTimeout;
97     private Timeout shardInitializationTimeout;
98     private final Dispatchers dispatchers;
99     private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
100
101     private volatile SchemaContext schemaContext;
102     private volatile boolean updated;
103     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
104
105     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
106             ClusterWrapper clusterWrapper, Configuration configuration) {
107         this(actorSystem, shardManager, clusterWrapper, configuration,
108                 DatastoreContext.newBuilder().build());
109     }
110
111     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
112             ClusterWrapper clusterWrapper, Configuration configuration,
113             DatastoreContext datastoreContext) {
114         this.actorSystem = actorSystem;
115         this.shardManager = shardManager;
116         this.clusterWrapper = clusterWrapper;
117         this.configuration = configuration;
118         this.datastoreContext = datastoreContext;
119         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
120
121         setCachedProperties();
122
123         Address selfAddress = clusterWrapper.getSelfAddress();
124         if (selfAddress != null && !selfAddress.host().isEmpty()) {
125             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
126         } else {
127             selfAddressHostPort = null;
128         }
129
130         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
131     }
132
133     private void setCachedProperties() {
134         txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
135
136         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
137         operationTimeout = new Timeout(operationDuration);
138
139         transactionCommitOperationTimeout =  new Timeout(Duration.create(
140                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
141
142         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
143
144         primaryShardActorSelectionCache = CacheBuilder.newBuilder()
145                 .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
146                 .build();
147     }
148
149     public DatastoreContext getDatastoreContext() {
150         return datastoreContext;
151     }
152
153     public ActorSystem getActorSystem() {
154         return actorSystem;
155     }
156
157     public ActorRef getShardManager() {
158         return shardManager;
159     }
160
161     public ActorSelection actorSelection(String actorPath) {
162         return actorSystem.actorSelection(actorPath);
163     }
164
165     public ActorSelection actorSelection(ActorPath actorPath) {
166         return actorSystem.actorSelection(actorPath);
167     }
168
169     public void setSchemaContext(SchemaContext schemaContext) {
170         this.schemaContext = schemaContext;
171
172         if(shardManager != null) {
173             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
174         }
175     }
176
177     public void setDatastoreContext(DatastoreContext context) {
178         this.datastoreContext = context;
179         setCachedProperties();
180
181         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
182         // will be published immediately even though they may not be immediately visible to other
183         // threads due to unsynchronized reads. That's OK though - we're going for eventual
184         // consistency here as immediately visible updates to these members aren't critical. These
185         // members could've been made volatile but wanted to avoid volatile reads as these are
186         // accessed often and updates will be infrequent.
187
188         updated = true;
189
190         if(shardManager != null) {
191             shardManager.tell(context, ActorRef.noSender());
192         }
193     }
194
195     public SchemaContext getSchemaContext() {
196         return schemaContext;
197     }
198
199     public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
200         Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
201         if(ret != null){
202             return ret;
203         }
204         Future<Object> future = executeOperationAsync(shardManager,
205                 new FindPrimary(shardName, true), shardInitializationTimeout);
206
207         return future.transform(new Mapper<Object, ActorSelection>() {
208             @Override
209             public ActorSelection checkedApply(Object response) throws Exception {
210                 if(response instanceof PrimaryFound) {
211                     PrimaryFound found = (PrimaryFound)response;
212
213                     LOG.debug("Primary found {}", found.getPrimaryPath());
214                     ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
215                     primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
216                     return actorSelection;
217                 } else if(response instanceof NotInitializedException) {
218                     throw (NotInitializedException)response;
219                 } else if(response instanceof PrimaryNotFoundException) {
220                     throw (PrimaryNotFoundException)response;
221                 } else if(response instanceof NoShardLeaderException) {
222                     throw (NoShardLeaderException)response;
223                 }
224
225                 throw new UnknownMessageException(String.format(
226                         "FindPrimary returned unkown response: %s", response));
227             }
228         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
229     }
230
231     /**
232      * Finds a local shard given its shard name and return it's ActorRef
233      *
234      * @param shardName the name of the local shard that needs to be found
235      * @return a reference to a local shard actor which represents the shard
236      *         specified by the shardName
237      */
238     public Optional<ActorRef> findLocalShard(String shardName) {
239         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
240
241         if (result instanceof LocalShardFound) {
242             LocalShardFound found = (LocalShardFound) result;
243             LOG.debug("Local shard found {}", found.getPath());
244             return Optional.of(found.getPath());
245         }
246
247         return Optional.absent();
248     }
249
250     /**
251      * Finds a local shard async given its shard name and return a Future from which to obtain the
252      * ActorRef.
253      *
254      * @param shardName the name of the local shard that needs to be found
255      */
256     public Future<ActorRef> findLocalShardAsync( final String shardName) {
257         Future<Object> future = executeOperationAsync(shardManager,
258                 new FindLocalShard(shardName, true), shardInitializationTimeout);
259
260         return future.map(new Mapper<Object, ActorRef>() {
261             @Override
262             public ActorRef checkedApply(Object response) throws Throwable {
263                 if(response instanceof LocalShardFound) {
264                     LocalShardFound found = (LocalShardFound)response;
265                     LOG.debug("Local shard found {}", found.getPath());
266                     return found.getPath();
267                 } else if(response instanceof NotInitializedException) {
268                     throw (NotInitializedException)response;
269                 } else if(response instanceof LocalShardNotFound) {
270                     throw new LocalShardNotFoundException(
271                             String.format("Local shard for %s does not exist.", shardName));
272                 }
273
274                 throw new UnknownMessageException(String.format(
275                         "FindLocalShard returned unkown response: %s", response));
276             }
277         }, getClientDispatcher());
278     }
279
280     /**
281      * Executes an operation on a local actor and wait for it's response
282      *
283      * @param actor
284      * @param message
285      * @return The response of the operation
286      */
287     public Object executeOperation(ActorRef actor, Object message) {
288         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
289
290         try {
291             return Await.result(future, operationDuration);
292         } catch (Exception e) {
293             throw new TimeoutException("Sending message " + message.getClass().toString() +
294                     " to actor " + actor.toString() + " failed. Try again later.", e);
295         }
296     }
297
298     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
299         Preconditions.checkArgument(actor != null, "actor must not be null");
300         Preconditions.checkArgument(message != null, "message must not be null");
301
302         LOG.debug("Sending message {} to {}", message.getClass(), actor);
303         return doAsk(actor, message, timeout);
304     }
305
306     /**
307      * Execute an operation on a remote actor and wait for it's response
308      *
309      * @param actor
310      * @param message
311      * @return
312      */
313     public Object executeOperation(ActorSelection actor, Object message) {
314         Future<Object> future = executeOperationAsync(actor, message);
315
316         try {
317             return Await.result(future, operationDuration);
318         } catch (Exception e) {
319             throw new TimeoutException("Sending message " + message.getClass().toString() +
320                     " to actor " + actor.toString() + " failed. Try again later.", e);
321         }
322     }
323
324     /**
325      * Execute an operation on a remote actor asynchronously.
326      *
327      * @param actor the ActorSelection
328      * @param message the message to send
329      * @param timeout the operation timeout
330      * @return a Future containing the eventual result
331      */
332     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
333             Timeout timeout) {
334         Preconditions.checkArgument(actor != null, "actor must not be null");
335         Preconditions.checkArgument(message != null, "message must not be null");
336
337         LOG.debug("Sending message {} to {}", message.getClass(), actor);
338
339         return doAsk(actor, message, timeout);
340     }
341
342     /**
343      * Execute an operation on a remote actor asynchronously.
344      *
345      * @param actor the ActorSelection
346      * @param message the message to send
347      * @return a Future containing the eventual result
348      */
349     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
350         return executeOperationAsync(actor, message, operationTimeout);
351     }
352
353     /**
354      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
355      * reply (essentially set and forget).
356      *
357      * @param actor the ActorSelection
358      * @param message the message to send
359      */
360     public void sendOperationAsync(ActorSelection actor, Object message) {
361         Preconditions.checkArgument(actor != null, "actor must not be null");
362         Preconditions.checkArgument(message != null, "message must not be null");
363
364         LOG.debug("Sending message {} to {}", message.getClass(), actor);
365
366         actor.tell(message, ActorRef.noSender());
367     }
368
369     public void shutdown() {
370         shardManager.tell(PoisonPill.getInstance(), null);
371         actorSystem.shutdown();
372     }
373
374     public ClusterWrapper getClusterWrapper() {
375         return clusterWrapper;
376     }
377
378     public String getCurrentMemberName(){
379         return clusterWrapper.getCurrentMemberName();
380     }
381
382     /**
383      * Send the message to each and every shard
384      *
385      * @param message
386      */
387     public void broadcast(final Object message){
388         for(final String shardName : configuration.getAllShardNames()){
389
390             Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
391             primaryFuture.onComplete(new OnComplete<ActorSelection>() {
392                 @Override
393                 public void onComplete(Throwable failure, ActorSelection primaryShard) {
394                     if(failure != null) {
395                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
396                                 message.getClass().getSimpleName(), shardName, failure);
397                     } else {
398                         primaryShard.tell(message, ActorRef.noSender());
399                     }
400                 }
401             }, getClientDispatcher());
402         }
403     }
404
405     public FiniteDuration getOperationDuration() {
406         return operationDuration;
407     }
408
409     public boolean isPathLocal(String path) {
410         if (Strings.isNullOrEmpty(path)) {
411             return false;
412         }
413
414         int pathAtIndex = path.indexOf('@');
415         if (pathAtIndex == -1) {
416             //if the path is of local format, then its local and is co-located
417             return true;
418
419         } else if (selfAddressHostPort != null) {
420             // self-address and tx actor path, both are of remote path format
421             int slashIndex = path.indexOf('/', pathAtIndex);
422
423             if (slashIndex == -1) {
424                 return false;
425             }
426
427             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
428             return hostPort.equals(selfAddressHostPort);
429
430         } else {
431             // self address is local format and tx actor path is remote format
432             return false;
433         }
434     }
435
436     /**
437      * @deprecated This method is present only to support backward compatibility with Helium and should not be
438      * used any further
439      *
440      *
441      * @param primaryPath
442      * @param localPathOfRemoteActor
443      * @return
444     */
445     @Deprecated
446     public String resolvePath(final String primaryPath,
447                                             final String localPathOfRemoteActor) {
448         StringBuilder builder = new StringBuilder();
449         String[] primaryPathElements = primaryPath.split("/");
450         builder.append(primaryPathElements[0]).append("//")
451             .append(primaryPathElements[1]).append(primaryPathElements[2]);
452         String[] remotePathElements = localPathOfRemoteActor.split("/");
453         for (int i = 3; i < remotePathElements.length; i++) {
454                 builder.append("/").append(remotePathElements[i]);
455             }
456
457         return builder.toString();
458     }
459
460     /**
461      * Get the maximum number of operations that are to be permitted within a transaction before the transaction
462      * should begin throttling the operations
463      *
464      * Parking reading this configuration here because we need to get to the actor system settings
465      *
466      * @return
467      */
468     public int getTransactionOutstandingOperationLimit(){
469         return transactionOutstandingOperationLimit;
470     }
471
472     /**
473      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
474      * us to create a timer for pretty much anything.
475      *
476      * @param operationName
477      * @return
478      */
479     public Timer getOperationTimer(String operationName){
480         return getOperationTimer(datastoreContext.getDataStoreType(), operationName);
481     }
482
483     public Timer getOperationTimer(String dataStoreType, String operationName){
484         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
485                 operationName, METRIC_RATE);
486         return metricRegistry.timer(rate);
487     }
488
489     /**
490      * Get the type of the data store to which this ActorContext belongs
491      *
492      * @return
493      */
494     public String getDataStoreType() {
495         return datastoreContext.getDataStoreType();
496     }
497
498     /**
499      * Set the number of transaction creation permits that are to be allowed
500      *
501      * @param permitsPerSecond
502      */
503     public void setTxCreationLimit(double permitsPerSecond){
504         txRateLimiter.setRate(permitsPerSecond);
505     }
506
507     /**
508      * Get the current transaction creation rate limit
509      * @return
510      */
511     public double getTxCreationLimit(){
512         return txRateLimiter.getRate();
513     }
514
515     /**
516      * Try to acquire a transaction creation permit. Will block if no permits are available.
517      */
518     public void acquireTxCreationPermit(){
519         txRateLimiter.acquire();
520     }
521
522     /**
523      * Return the operation timeout to be used when committing transactions
524      * @return
525      */
526     public Timeout getTransactionCommitOperationTimeout(){
527         return transactionCommitOperationTimeout;
528     }
529
530     /**
531      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
532      * code on the datastore
533      * @return
534      */
535     public ExecutionContext getClientDispatcher() {
536         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
537     }
538
539     public String getNotificationDispatcherPath(){
540         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
541     }
542
543     public Configuration getConfiguration() {
544         return configuration;
545     }
546
547     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
548         return ask(actorRef, message, timeout);
549     }
550
551     protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
552         return ask(actorRef, message, timeout);
553     }
554
555     @VisibleForTesting
556     Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
557         return primaryShardActorSelectionCache;
558     }
559 }