Bug 3020: Use leader version in LeaderStateChanged
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Mapper;
19 import akka.dispatch.OnComplete;
20 import akka.pattern.AskTimeoutException;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import java.util.concurrent.TimeUnit;
28 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
29 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
30 import org.opendaylight.controller.cluster.datastore.Configuration;
31 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
32 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
33 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
34 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
35 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
39 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
40 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
41 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
45 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
47 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
48 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
49 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 import scala.concurrent.Await;
53 import scala.concurrent.ExecutionContext;
54 import scala.concurrent.Future;
55 import scala.concurrent.duration.Duration;
56 import scala.concurrent.duration.FiniteDuration;
57
58 /**
59  * The ActorContext class contains utility methods which could be used by
60  * non-actors (like DistributedDataStore) to work with actors a little more
61  * easily. An ActorContext can be freely passed around to local object instances
62  * but should not be passed to actors especially remote actors
63  */
64 public class ActorContext {
65     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
66     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
67     private static final String METRIC_RATE = "rate";
68     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
69                                                               new Mapper<Throwable, Throwable>() {
70         @Override
71         public Throwable apply(Throwable failure) {
72             Throwable actualFailure = failure;
73             if(failure instanceof AskTimeoutException) {
74                 // A timeout exception most likely means the shard isn't initialized.
75                 actualFailure = new NotInitializedException(
76                         "Timed out trying to find the primary shard. Most likely cause is the " +
77                         "shard is not initialized yet.");
78             }
79
80             return actualFailure;
81         }
82     };
83     public static final String MAILBOX = "bounded-mailbox";
84     public static final String COMMIT = "commit";
85
86     private final ActorSystem actorSystem;
87     private final ActorRef shardManager;
88     private final ClusterWrapper clusterWrapper;
89     private final Configuration configuration;
90     private DatastoreContext datastoreContext;
91     private FiniteDuration operationDuration;
92     private Timeout operationTimeout;
93     private final String selfAddressHostPort;
94     private TransactionRateLimiter txRateLimiter;
95     private final int transactionOutstandingOperationLimit;
96     private Timeout transactionCommitOperationTimeout;
97     private Timeout shardInitializationTimeout;
98     private final Dispatchers dispatchers;
99
100     private volatile SchemaContext schemaContext;
101     private volatile boolean updated;
102     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
103
104     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
105
106     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
107             ClusterWrapper clusterWrapper, Configuration configuration) {
108         this(actorSystem, shardManager, clusterWrapper, configuration,
109                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
110     }
111
112     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
113             ClusterWrapper clusterWrapper, Configuration configuration,
114             DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
115         this.actorSystem = actorSystem;
116         this.shardManager = shardManager;
117         this.clusterWrapper = clusterWrapper;
118         this.configuration = configuration;
119         this.datastoreContext = datastoreContext;
120         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
121         this.primaryShardInfoCache = primaryShardInfoCache;
122
123         setCachedProperties();
124
125         Address selfAddress = clusterWrapper.getSelfAddress();
126         if (selfAddress != null && !selfAddress.host().isEmpty()) {
127             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
128         } else {
129             selfAddressHostPort = null;
130         }
131
132         transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
133     }
134
135     private void setCachedProperties() {
136         txRateLimiter = new TransactionRateLimiter(this);
137
138         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
139         operationTimeout = new Timeout(operationDuration);
140
141         transactionCommitOperationTimeout =  new Timeout(Duration.create(
142                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
143
144         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
145     }
146
147     public DatastoreContext getDatastoreContext() {
148         return datastoreContext;
149     }
150
151     public ActorSystem getActorSystem() {
152         return actorSystem;
153     }
154
155     public ActorRef getShardManager() {
156         return shardManager;
157     }
158
159     public ActorSelection actorSelection(String actorPath) {
160         return actorSystem.actorSelection(actorPath);
161     }
162
163     public ActorSelection actorSelection(ActorPath actorPath) {
164         return actorSystem.actorSelection(actorPath);
165     }
166
167     public void setSchemaContext(SchemaContext schemaContext) {
168         this.schemaContext = schemaContext;
169
170         if(shardManager != null) {
171             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
172         }
173     }
174
175     public void setDatastoreContext(DatastoreContext context) {
176         this.datastoreContext = context;
177         setCachedProperties();
178
179         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
180         // will be published immediately even though they may not be immediately visible to other
181         // threads due to unsynchronized reads. That's OK though - we're going for eventual
182         // consistency here as immediately visible updates to these members aren't critical. These
183         // members could've been made volatile but wanted to avoid volatile reads as these are
184         // accessed often and updates will be infrequent.
185
186         updated = true;
187
188         if(shardManager != null) {
189             shardManager.tell(context, ActorRef.noSender());
190         }
191     }
192
193     public SchemaContext getSchemaContext() {
194         return schemaContext;
195     }
196
197     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
198         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
199         if(ret != null){
200             return ret;
201         }
202         Future<Object> future = executeOperationAsync(shardManager,
203                 new FindPrimary(shardName, true), shardInitializationTimeout);
204
205         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
206             @Override
207             public PrimaryShardInfo checkedApply(Object response) throws Exception {
208                 if(response instanceof RemotePrimaryShardFound) {
209                     LOG.debug("findPrimaryShardAsync received: {}", response);
210                     RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
211                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
212                 } else if(response instanceof LocalPrimaryShardFound) {
213                     LOG.debug("findPrimaryShardAsync received: {}", response);
214                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
215                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
216                             found.getLocalShardDataTree());
217                 } else if(response instanceof NotInitializedException) {
218                     throw (NotInitializedException)response;
219                 } else if(response instanceof PrimaryNotFoundException) {
220                     throw (PrimaryNotFoundException)response;
221                 } else if(response instanceof NoShardLeaderException) {
222                     throw (NoShardLeaderException)response;
223                 }
224
225                 throw new UnknownMessageException(String.format(
226                         "FindPrimary returned unkown response: %s", response));
227             }
228         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
229     }
230
231     private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
232             short primaryVersion, DataTree localShardDataTree) {
233         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
234         PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, primaryVersion,
235                 Optional.fromNullable(localShardDataTree));
236         primaryShardInfoCache.putSuccessful(shardName, info);
237         return info;
238     }
239
240     /**
241      * Finds a local shard given its shard name and return it's ActorRef
242      *
243      * @param shardName the name of the local shard that needs to be found
244      * @return a reference to a local shard actor which represents the shard
245      *         specified by the shardName
246      */
247     public Optional<ActorRef> findLocalShard(String shardName) {
248         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
249
250         if (result instanceof LocalShardFound) {
251             LocalShardFound found = (LocalShardFound) result;
252             LOG.debug("Local shard found {}", found.getPath());
253             return Optional.of(found.getPath());
254         }
255
256         return Optional.absent();
257     }
258
259     /**
260      * Finds a local shard async given its shard name and return a Future from which to obtain the
261      * ActorRef.
262      *
263      * @param shardName the name of the local shard that needs to be found
264      */
265     public Future<ActorRef> findLocalShardAsync( final String shardName) {
266         Future<Object> future = executeOperationAsync(shardManager,
267                 new FindLocalShard(shardName, true), shardInitializationTimeout);
268
269         return future.map(new Mapper<Object, ActorRef>() {
270             @Override
271             public ActorRef checkedApply(Object response) throws Throwable {
272                 if(response instanceof LocalShardFound) {
273                     LocalShardFound found = (LocalShardFound)response;
274                     LOG.debug("Local shard found {}", found.getPath());
275                     return found.getPath();
276                 } else if(response instanceof NotInitializedException) {
277                     throw (NotInitializedException)response;
278                 } else if(response instanceof LocalShardNotFound) {
279                     throw new LocalShardNotFoundException(
280                             String.format("Local shard for %s does not exist.", shardName));
281                 }
282
283                 throw new UnknownMessageException(String.format(
284                         "FindLocalShard returned unkown response: %s", response));
285             }
286         }, getClientDispatcher());
287     }
288
289     /**
290      * Executes an operation on a local actor and wait for it's response
291      *
292      * @param actor
293      * @param message
294      * @return The response of the operation
295      */
296     public Object executeOperation(ActorRef actor, Object message) {
297         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
298
299         try {
300             return Await.result(future, operationDuration);
301         } catch (Exception e) {
302             throw new TimeoutException("Sending message " + message.getClass().toString() +
303                     " to actor " + actor.toString() + " failed. Try again later.", e);
304         }
305     }
306
307     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
308         Preconditions.checkArgument(actor != null, "actor must not be null");
309         Preconditions.checkArgument(message != null, "message must not be null");
310
311         LOG.debug("Sending message {} to {}", message.getClass(), actor);
312         return doAsk(actor, message, timeout);
313     }
314
315     /**
316      * Execute an operation on a remote actor and wait for it's response
317      *
318      * @param actor
319      * @param message
320      * @return
321      */
322     public Object executeOperation(ActorSelection actor, Object message) {
323         Future<Object> future = executeOperationAsync(actor, message);
324
325         try {
326             return Await.result(future, operationDuration);
327         } catch (Exception e) {
328             throw new TimeoutException("Sending message " + message.getClass().toString() +
329                     " to actor " + actor.toString() + " failed. Try again later.", e);
330         }
331     }
332
333     /**
334      * Execute an operation on a remote actor asynchronously.
335      *
336      * @param actor the ActorSelection
337      * @param message the message to send
338      * @param timeout the operation timeout
339      * @return a Future containing the eventual result
340      */
341     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
342             Timeout timeout) {
343         Preconditions.checkArgument(actor != null, "actor must not be null");
344         Preconditions.checkArgument(message != null, "message must not be null");
345
346         LOG.debug("Sending message {} to {}", message.getClass(), actor);
347
348         return doAsk(actor, message, timeout);
349     }
350
351     /**
352      * Execute an operation on a remote actor asynchronously.
353      *
354      * @param actor the ActorSelection
355      * @param message the message to send
356      * @return a Future containing the eventual result
357      */
358     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
359         return executeOperationAsync(actor, message, operationTimeout);
360     }
361
362     /**
363      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
364      * reply (essentially set and forget).
365      *
366      * @param actor the ActorSelection
367      * @param message the message to send
368      */
369     public void sendOperationAsync(ActorSelection actor, Object message) {
370         Preconditions.checkArgument(actor != null, "actor must not be null");
371         Preconditions.checkArgument(message != null, "message must not be null");
372
373         LOG.debug("Sending message {} to {}", message.getClass(), actor);
374
375         actor.tell(message, ActorRef.noSender());
376     }
377
378     public void shutdown() {
379         shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
380     }
381
382     public ClusterWrapper getClusterWrapper() {
383         return clusterWrapper;
384     }
385
386     public String getCurrentMemberName(){
387         return clusterWrapper.getCurrentMemberName();
388     }
389
390     /**
391      * Send the message to each and every shard
392      *
393      * @param message
394      */
395     public void broadcast(final Object message){
396         for(final String shardName : configuration.getAllShardNames()){
397
398             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
399             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
400                 @Override
401                 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
402                     if(failure != null) {
403                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
404                                 message.getClass().getSimpleName(), shardName, failure);
405                     } else {
406                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
407                     }
408                 }
409             }, getClientDispatcher());
410         }
411     }
412
413     public FiniteDuration getOperationDuration() {
414         return operationDuration;
415     }
416
417     public boolean isPathLocal(String path) {
418         if (Strings.isNullOrEmpty(path)) {
419             return false;
420         }
421
422         int pathAtIndex = path.indexOf('@');
423         if (pathAtIndex == -1) {
424             //if the path is of local format, then its local and is co-located
425             return true;
426
427         } else if (selfAddressHostPort != null) {
428             // self-address and tx actor path, both are of remote path format
429             int slashIndex = path.indexOf('/', pathAtIndex);
430
431             if (slashIndex == -1) {
432                 return false;
433             }
434
435             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
436             return hostPort.equals(selfAddressHostPort);
437
438         } else {
439             // self address is local format and tx actor path is remote format
440             return false;
441         }
442     }
443
444     /**
445      * @deprecated This method is present only to support backward compatibility with Helium and should not be
446      * used any further
447      *
448      *
449      * @param primaryPath
450      * @param localPathOfRemoteActor
451      * @return
452     */
453     @Deprecated
454     public String resolvePath(final String primaryPath,
455                                             final String localPathOfRemoteActor) {
456         StringBuilder builder = new StringBuilder();
457         String[] primaryPathElements = primaryPath.split("/");
458         builder.append(primaryPathElements[0]).append("//")
459             .append(primaryPathElements[1]).append(primaryPathElements[2]);
460         String[] remotePathElements = localPathOfRemoteActor.split("/");
461         for (int i = 3; i < remotePathElements.length; i++) {
462                 builder.append("/").append(remotePathElements[i]);
463             }
464
465         return builder.toString();
466     }
467
468     /**
469      * Get the maximum number of operations that are to be permitted within a transaction before the transaction
470      * should begin throttling the operations
471      *
472      * Parking reading this configuration here because we need to get to the actor system settings
473      *
474      * @return
475      */
476     public int getTransactionOutstandingOperationLimit(){
477         return transactionOutstandingOperationLimit;
478     }
479
480     /**
481      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
482      * us to create a timer for pretty much anything.
483      *
484      * @param operationName
485      * @return
486      */
487     public Timer getOperationTimer(String operationName){
488         return getOperationTimer(datastoreContext.getDataStoreType(), operationName);
489     }
490
491     public Timer getOperationTimer(String dataStoreType, String operationName){
492         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
493                 operationName, METRIC_RATE);
494         return metricRegistry.timer(rate);
495     }
496
497     /**
498      * Get the type of the data store to which this ActorContext belongs
499      *
500      * @return
501      */
502     public String getDataStoreType() {
503         return datastoreContext.getDataStoreType();
504     }
505
506     /**
507      * Get the current transaction creation rate limit
508      * @return
509      */
510     public double getTxCreationLimit(){
511         return txRateLimiter.getTxCreationLimit();
512     }
513
514     /**
515      * Try to acquire a transaction creation permit. Will block if no permits are available.
516      */
517     public void acquireTxCreationPermit(){
518         txRateLimiter.acquire();
519     }
520
521     /**
522      * Return the operation timeout to be used when committing transactions
523      * @return
524      */
525     public Timeout getTransactionCommitOperationTimeout(){
526         return transactionCommitOperationTimeout;
527     }
528
529     /**
530      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
531      * code on the datastore
532      * @return
533      */
534     public ExecutionContext getClientDispatcher() {
535         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
536     }
537
538     public String getNotificationDispatcherPath(){
539         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
540     }
541
542     public Configuration getConfiguration() {
543         return configuration;
544     }
545
546     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
547         return ask(actorRef, message, timeout);
548     }
549
550     protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
551         return ask(actorRef, message, timeout);
552     }
553
554     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
555         return primaryShardInfoCache;
556     }
557 }