BUG 3125 : Set Rate Limit just before acquiring a permit to avoid contention
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Futures;
19 import akka.dispatch.Mapper;
20 import akka.dispatch.OnComplete;
21 import akka.pattern.AskTimeoutException;
22 import akka.util.Timeout;
23 import com.codahale.metrics.MetricRegistry;
24 import com.codahale.metrics.Timer;
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.common.base.Optional;
27 import com.google.common.base.Preconditions;
28 import com.google.common.base.Strings;
29 import com.google.common.cache.Cache;
30 import com.google.common.cache.CacheBuilder;
31 import com.google.common.cache.RemovalListener;
32 import com.google.common.cache.RemovalNotification;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.concurrent.TimeUnit;
36 import javax.annotation.concurrent.GuardedBy;
37 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
38 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
39 import org.opendaylight.controller.cluster.datastore.Configuration;
40 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
41 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
42 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
43 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
44 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
45 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
46 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
47 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
48 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
49 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
50 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
51 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
52 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
53 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
55 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
56 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
57 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60 import scala.concurrent.Await;
61 import scala.concurrent.ExecutionContext;
62 import scala.concurrent.Future;
63 import scala.concurrent.duration.Duration;
64 import scala.concurrent.duration.FiniteDuration;
65
66 /**
67  * The ActorContext class contains utility methods which could be used by
68  * non-actors (like DistributedDataStore) to work with actors a little more
69  * easily. An ActorContext can be freely passed around to local object instances
70  * but should not be passed to actors especially remote actors
71  */
72 public class ActorContext implements RemovalListener<String, Future<PrimaryShardInfo>> {
73     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
74     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
75     private static final String METRIC_RATE = "rate";
76     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
77                                                               new Mapper<Throwable, Throwable>() {
78         @Override
79         public Throwable apply(Throwable failure) {
80             Throwable actualFailure = failure;
81             if(failure instanceof AskTimeoutException) {
82                 // A timeout exception most likely means the shard isn't initialized.
83                 actualFailure = new NotInitializedException(
84                         "Timed out trying to find the primary shard. Most likely cause is the " +
85                         "shard is not initialized yet.");
86             }
87
88             return actualFailure;
89         }
90     };
91     public static final String MAILBOX = "bounded-mailbox";
92     public static final String COMMIT = "commit";
93
94     private final ActorSystem actorSystem;
95     private final ActorRef shardManager;
96     private final ClusterWrapper clusterWrapper;
97     private final Configuration configuration;
98     private DatastoreContext datastoreContext;
99     private FiniteDuration operationDuration;
100     private Timeout operationTimeout;
101     private final String selfAddressHostPort;
102     private TransactionRateLimiter txRateLimiter;
103     private final int transactionOutstandingOperationLimit;
104     private Timeout transactionCommitOperationTimeout;
105     private Timeout shardInitializationTimeout;
106     private final Dispatchers dispatchers;
107     private Cache<String, Future<PrimaryShardInfo>> primaryShardInfoCache;
108
109     private volatile SchemaContext schemaContext;
110     private volatile boolean updated;
111     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
112     @GuardedBy("shardInfoListeners")
113     private final Collection<ShardInfoListenerRegistration<?>> shardInfoListeners = new ArrayList<>();
114
115     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
116             ClusterWrapper clusterWrapper, Configuration configuration) {
117         this(actorSystem, shardManager, clusterWrapper, configuration,
118                 DatastoreContext.newBuilder().build());
119     }
120
121     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
122             ClusterWrapper clusterWrapper, Configuration configuration,
123             DatastoreContext datastoreContext) {
124         this.actorSystem = actorSystem;
125         this.shardManager = shardManager;
126         this.clusterWrapper = clusterWrapper;
127         this.configuration = configuration;
128         this.datastoreContext = datastoreContext;
129         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
130
131         setCachedProperties();
132
133         Address selfAddress = clusterWrapper.getSelfAddress();
134         if (selfAddress != null && !selfAddress.host().isEmpty()) {
135             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
136         } else {
137             selfAddressHostPort = null;
138         }
139
140         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
141     }
142
143     private void setCachedProperties() {
144         txRateLimiter = new TransactionRateLimiter(this);
145
146         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
147         operationTimeout = new Timeout(operationDuration);
148
149         transactionCommitOperationTimeout =  new Timeout(Duration.create(
150                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
151
152         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
153
154         primaryShardInfoCache = CacheBuilder.newBuilder()
155                 .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
156                 .removalListener(this)
157                 .build();
158     }
159
160     public DatastoreContext getDatastoreContext() {
161         return datastoreContext;
162     }
163
164     public ActorSystem getActorSystem() {
165         return actorSystem;
166     }
167
168     public ActorRef getShardManager() {
169         return shardManager;
170     }
171
172     public ActorSelection actorSelection(String actorPath) {
173         return actorSystem.actorSelection(actorPath);
174     }
175
176     public ActorSelection actorSelection(ActorPath actorPath) {
177         return actorSystem.actorSelection(actorPath);
178     }
179
180     public void setSchemaContext(SchemaContext schemaContext) {
181         this.schemaContext = schemaContext;
182
183         if(shardManager != null) {
184             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
185         }
186     }
187
188     public void setDatastoreContext(DatastoreContext context) {
189         this.datastoreContext = context;
190         setCachedProperties();
191
192         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
193         // will be published immediately even though they may not be immediately visible to other
194         // threads due to unsynchronized reads. That's OK though - we're going for eventual
195         // consistency here as immediately visible updates to these members aren't critical. These
196         // members could've been made volatile but wanted to avoid volatile reads as these are
197         // accessed often and updates will be infrequent.
198
199         updated = true;
200
201         if(shardManager != null) {
202             shardManager.tell(context, ActorRef.noSender());
203         }
204     }
205
206     public SchemaContext getSchemaContext() {
207         return schemaContext;
208     }
209
210     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
211         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
212         if(ret != null){
213             return ret;
214         }
215         Future<Object> future = executeOperationAsync(shardManager,
216                 new FindPrimary(shardName, true), shardInitializationTimeout);
217
218         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
219             @Override
220             public PrimaryShardInfo checkedApply(Object response) throws Exception {
221                 if(response instanceof RemotePrimaryShardFound) {
222                     LOG.debug("findPrimaryShardAsync received: {}", response);
223                     return onPrimaryShardFound(shardName, ((RemotePrimaryShardFound)response).getPrimaryPath(), null);
224                 } else if(response instanceof LocalPrimaryShardFound) {
225                     LOG.debug("findPrimaryShardAsync received: {}", response);
226                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
227                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getLocalShardDataTree());
228                 } else if(response instanceof NotInitializedException) {
229                     throw (NotInitializedException)response;
230                 } else if(response instanceof PrimaryNotFoundException) {
231                     throw (PrimaryNotFoundException)response;
232                 } else if(response instanceof NoShardLeaderException) {
233                     throw (NoShardLeaderException)response;
234                 }
235
236                 throw new UnknownMessageException(String.format(
237                         "FindPrimary returned unkown response: %s", response));
238             }
239         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
240     }
241
242     private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
243             DataTree localShardDataTree) {
244         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
245         PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree));
246         primaryShardInfoCache.put(shardName, Futures.successful(info));
247
248         synchronized (shardInfoListeners) {
249             for (ShardInfoListenerRegistration<?> reg : shardInfoListeners) {
250                 reg.getInstance().onShardInfoUpdated(shardName, info);
251             }
252         }
253         return info;
254     }
255
256     /**
257      * Finds a local shard given its shard name and return it's ActorRef
258      *
259      * @param shardName the name of the local shard that needs to be found
260      * @return a reference to a local shard actor which represents the shard
261      *         specified by the shardName
262      */
263     public Optional<ActorRef> findLocalShard(String shardName) {
264         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
265
266         if (result instanceof LocalShardFound) {
267             LocalShardFound found = (LocalShardFound) result;
268             LOG.debug("Local shard found {}", found.getPath());
269             return Optional.of(found.getPath());
270         }
271
272         return Optional.absent();
273     }
274
275     /**
276      * Finds a local shard async given its shard name and return a Future from which to obtain the
277      * ActorRef.
278      *
279      * @param shardName the name of the local shard that needs to be found
280      */
281     public Future<ActorRef> findLocalShardAsync( final String shardName) {
282         Future<Object> future = executeOperationAsync(shardManager,
283                 new FindLocalShard(shardName, true), shardInitializationTimeout);
284
285         return future.map(new Mapper<Object, ActorRef>() {
286             @Override
287             public ActorRef checkedApply(Object response) throws Throwable {
288                 if(response instanceof LocalShardFound) {
289                     LocalShardFound found = (LocalShardFound)response;
290                     LOG.debug("Local shard found {}", found.getPath());
291                     return found.getPath();
292                 } else if(response instanceof NotInitializedException) {
293                     throw (NotInitializedException)response;
294                 } else if(response instanceof LocalShardNotFound) {
295                     throw new LocalShardNotFoundException(
296                             String.format("Local shard for %s does not exist.", shardName));
297                 }
298
299                 throw new UnknownMessageException(String.format(
300                         "FindLocalShard returned unkown response: %s", response));
301             }
302         }, getClientDispatcher());
303     }
304
305     /**
306      * Executes an operation on a local actor and wait for it's response
307      *
308      * @param actor
309      * @param message
310      * @return The response of the operation
311      */
312     public Object executeOperation(ActorRef actor, Object message) {
313         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
314
315         try {
316             return Await.result(future, operationDuration);
317         } catch (Exception e) {
318             throw new TimeoutException("Sending message " + message.getClass().toString() +
319                     " to actor " + actor.toString() + " failed. Try again later.", e);
320         }
321     }
322
323     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
324         Preconditions.checkArgument(actor != null, "actor must not be null");
325         Preconditions.checkArgument(message != null, "message must not be null");
326
327         LOG.debug("Sending message {} to {}", message.getClass(), actor);
328         return doAsk(actor, message, timeout);
329     }
330
331     /**
332      * Execute an operation on a remote actor and wait for it's response
333      *
334      * @param actor
335      * @param message
336      * @return
337      */
338     public Object executeOperation(ActorSelection actor, Object message) {
339         Future<Object> future = executeOperationAsync(actor, message);
340
341         try {
342             return Await.result(future, operationDuration);
343         } catch (Exception e) {
344             throw new TimeoutException("Sending message " + message.getClass().toString() +
345                     " to actor " + actor.toString() + " failed. Try again later.", e);
346         }
347     }
348
349     /**
350      * Execute an operation on a remote actor asynchronously.
351      *
352      * @param actor the ActorSelection
353      * @param message the message to send
354      * @param timeout the operation timeout
355      * @return a Future containing the eventual result
356      */
357     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
358             Timeout timeout) {
359         Preconditions.checkArgument(actor != null, "actor must not be null");
360         Preconditions.checkArgument(message != null, "message must not be null");
361
362         LOG.debug("Sending message {} to {}", message.getClass(), actor);
363
364         return doAsk(actor, message, timeout);
365     }
366
367     /**
368      * Execute an operation on a remote actor asynchronously.
369      *
370      * @param actor the ActorSelection
371      * @param message the message to send
372      * @return a Future containing the eventual result
373      */
374     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
375         return executeOperationAsync(actor, message, operationTimeout);
376     }
377
378     /**
379      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
380      * reply (essentially set and forget).
381      *
382      * @param actor the ActorSelection
383      * @param message the message to send
384      */
385     public void sendOperationAsync(ActorSelection actor, Object message) {
386         Preconditions.checkArgument(actor != null, "actor must not be null");
387         Preconditions.checkArgument(message != null, "message must not be null");
388
389         LOG.debug("Sending message {} to {}", message.getClass(), actor);
390
391         actor.tell(message, ActorRef.noSender());
392     }
393
394     public void shutdown() {
395         shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
396     }
397
398     public ClusterWrapper getClusterWrapper() {
399         return clusterWrapper;
400     }
401
402     public String getCurrentMemberName(){
403         return clusterWrapper.getCurrentMemberName();
404     }
405
406     /**
407      * Send the message to each and every shard
408      *
409      * @param message
410      */
411     public void broadcast(final Object message){
412         for(final String shardName : configuration.getAllShardNames()){
413
414             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
415             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
416                 @Override
417                 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
418                     if(failure != null) {
419                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
420                                 message.getClass().getSimpleName(), shardName, failure);
421                     } else {
422                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
423                     }
424                 }
425             }, getClientDispatcher());
426         }
427     }
428
429     public FiniteDuration getOperationDuration() {
430         return operationDuration;
431     }
432
433     public boolean isPathLocal(String path) {
434         if (Strings.isNullOrEmpty(path)) {
435             return false;
436         }
437
438         int pathAtIndex = path.indexOf('@');
439         if (pathAtIndex == -1) {
440             //if the path is of local format, then its local and is co-located
441             return true;
442
443         } else if (selfAddressHostPort != null) {
444             // self-address and tx actor path, both are of remote path format
445             int slashIndex = path.indexOf('/', pathAtIndex);
446
447             if (slashIndex == -1) {
448                 return false;
449             }
450
451             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
452             return hostPort.equals(selfAddressHostPort);
453
454         } else {
455             // self address is local format and tx actor path is remote format
456             return false;
457         }
458     }
459
460     /**
461      * @deprecated This method is present only to support backward compatibility with Helium and should not be
462      * used any further
463      *
464      *
465      * @param primaryPath
466      * @param localPathOfRemoteActor
467      * @return
468     */
469     @Deprecated
470     public String resolvePath(final String primaryPath,
471                                             final String localPathOfRemoteActor) {
472         StringBuilder builder = new StringBuilder();
473         String[] primaryPathElements = primaryPath.split("/");
474         builder.append(primaryPathElements[0]).append("//")
475             .append(primaryPathElements[1]).append(primaryPathElements[2]);
476         String[] remotePathElements = localPathOfRemoteActor.split("/");
477         for (int i = 3; i < remotePathElements.length; i++) {
478                 builder.append("/").append(remotePathElements[i]);
479             }
480
481         return builder.toString();
482     }
483
484     /**
485      * Get the maximum number of operations that are to be permitted within a transaction before the transaction
486      * should begin throttling the operations
487      *
488      * Parking reading this configuration here because we need to get to the actor system settings
489      *
490      * @return
491      */
492     public int getTransactionOutstandingOperationLimit(){
493         return transactionOutstandingOperationLimit;
494     }
495
496     /**
497      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
498      * us to create a timer for pretty much anything.
499      *
500      * @param operationName
501      * @return
502      */
503     public Timer getOperationTimer(String operationName){
504         return getOperationTimer(datastoreContext.getDataStoreType(), operationName);
505     }
506
507     public Timer getOperationTimer(String dataStoreType, String operationName){
508         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
509                 operationName, METRIC_RATE);
510         return metricRegistry.timer(rate);
511     }
512
513     /**
514      * Get the type of the data store to which this ActorContext belongs
515      *
516      * @return
517      */
518     public String getDataStoreType() {
519         return datastoreContext.getDataStoreType();
520     }
521
522     /**
523      * Get the current transaction creation rate limit
524      * @return
525      */
526     public double getTxCreationLimit(){
527         return txRateLimiter.getTxCreationLimit();
528     }
529
530     /**
531      * Try to acquire a transaction creation permit. Will block if no permits are available.
532      */
533     public void acquireTxCreationPermit(){
534         txRateLimiter.acquire();
535     }
536
537     /**
538      * Return the operation timeout to be used when committing transactions
539      * @return
540      */
541     public Timeout getTransactionCommitOperationTimeout(){
542         return transactionCommitOperationTimeout;
543     }
544
545     /**
546      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
547      * code on the datastore
548      * @return
549      */
550     public ExecutionContext getClientDispatcher() {
551         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
552     }
553
554     public String getNotificationDispatcherPath(){
555         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
556     }
557
558     public Configuration getConfiguration() {
559         return configuration;
560     }
561
562     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
563         return ask(actorRef, message, timeout);
564     }
565
566     protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
567         return ask(actorRef, message, timeout);
568     }
569
570     @VisibleForTesting
571     Cache<String, Future<PrimaryShardInfo>> getPrimaryShardInfoCache() {
572         return primaryShardInfoCache;
573     }
574
575     public <T extends ShardInfoListener> ShardInfoListenerRegistration<T> registerShardInfoListener(final T listener) {
576         final ShardInfoListenerRegistration<T> reg = new ShardInfoListenerRegistration<T>(listener, this);
577
578         synchronized (shardInfoListeners) {
579             shardInfoListeners.add(reg);
580         }
581         return reg;
582     }
583
584     protected void removeShardInfoListener(final ShardInfoListenerRegistration<?> registration) {
585         synchronized (shardInfoListeners) {
586             shardInfoListeners.remove(registration);
587         }
588     }
589
590     @Override
591     public void onRemoval(final RemovalNotification<String, Future<PrimaryShardInfo>> notification) {
592         synchronized (shardInfoListeners) {
593             for (ShardInfoListenerRegistration<?> reg : shardInfoListeners) {
594                 reg.getInstance().onShardInfoUpdated(notification.getKey(), null);
595             }
596         }
597     }
598 }