Make Shutdown message a proper singleton
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.dispatch.Mapper;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.AskTimeoutException;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Function;
29 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
30 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
31 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
32 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
33 import org.opendaylight.controller.cluster.datastore.config.Configuration;
34 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
35 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
40 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
41 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
45 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
46 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
47 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
48 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
49 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
50 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
51 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import scala.concurrent.Await;
56 import scala.concurrent.ExecutionContext;
57 import scala.concurrent.Future;
58 import scala.concurrent.duration.Duration;
59 import scala.concurrent.duration.FiniteDuration;
60
61 /**
62  * The ActorContext class contains utility methods which could be used by
63  * non-actors (like DistributedDataStore) to work with actors a little more
64  * easily. An ActorContext can be freely passed around to local object instances
65  * but should not be passed to actors especially remote actors
66  */
67 public class ActorContext {
68     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
69     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
70     private static final String METRIC_RATE = "rate";
71     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
72                                                               new Mapper<Throwable, Throwable>() {
73         @Override
74         public Throwable apply(Throwable failure) {
75             Throwable actualFailure = failure;
76             if(failure instanceof AskTimeoutException) {
77                 // A timeout exception most likely means the shard isn't initialized.
78                 actualFailure = new NotInitializedException(
79                         "Timed out trying to find the primary shard. Most likely cause is the " +
80                         "shard is not initialized yet.");
81             }
82
83             return actualFailure;
84         }
85     };
86     public static final String MAILBOX = "bounded-mailbox";
87     public static final String COMMIT = "commit";
88
89     private final ActorSystem actorSystem;
90     private final ActorRef shardManager;
91     private final ClusterWrapper clusterWrapper;
92     private final Configuration configuration;
93     private DatastoreContext datastoreContext;
94     private FiniteDuration operationDuration;
95     private Timeout operationTimeout;
96     private final String selfAddressHostPort;
97     private TransactionRateLimiter txRateLimiter;
98     private Timeout transactionCommitOperationTimeout;
99     private Timeout shardInitializationTimeout;
100     private final Dispatchers dispatchers;
101
102     private volatile SchemaContext schemaContext;
103     private volatile boolean updated;
104     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
105
106     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
107     private final ShardStrategyFactory shardStrategyFactory;
108
109     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
110             ClusterWrapper clusterWrapper, Configuration configuration) {
111         this(actorSystem, shardManager, clusterWrapper, configuration,
112                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
113     }
114
115     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
116             ClusterWrapper clusterWrapper, Configuration configuration,
117             DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
118         this.actorSystem = actorSystem;
119         this.shardManager = shardManager;
120         this.clusterWrapper = clusterWrapper;
121         this.configuration = configuration;
122         this.datastoreContext = datastoreContext;
123         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
124         this.primaryShardInfoCache = primaryShardInfoCache;
125         this.shardStrategyFactory = new ShardStrategyFactory(configuration);
126
127         setCachedProperties();
128
129         Address selfAddress = clusterWrapper.getSelfAddress();
130         if (selfAddress != null && !selfAddress.host().isEmpty()) {
131             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
132         } else {
133             selfAddressHostPort = null;
134         }
135
136     }
137
138     private void setCachedProperties() {
139         txRateLimiter = new TransactionRateLimiter(this);
140
141         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInMillis(), TimeUnit.MILLISECONDS);
142         operationTimeout = new Timeout(operationDuration);
143
144         transactionCommitOperationTimeout =  new Timeout(Duration.create(
145                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
146
147         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
148     }
149
150     public DatastoreContext getDatastoreContext() {
151         return datastoreContext;
152     }
153
154     public ActorSystem getActorSystem() {
155         return actorSystem;
156     }
157
158     public ActorRef getShardManager() {
159         return shardManager;
160     }
161
162     public ActorSelection actorSelection(String actorPath) {
163         return actorSystem.actorSelection(actorPath);
164     }
165
166     public ActorSelection actorSelection(ActorPath actorPath) {
167         return actorSystem.actorSelection(actorPath);
168     }
169
170     public void setSchemaContext(SchemaContext schemaContext) {
171         this.schemaContext = schemaContext;
172
173         if(shardManager != null) {
174             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
175         }
176     }
177
178     public void setDatastoreContext(DatastoreContextFactory contextFactory) {
179         this.datastoreContext = contextFactory.getBaseDatastoreContext();
180         setCachedProperties();
181
182         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
183         // will be published immediately even though they may not be immediately visible to other
184         // threads due to unsynchronized reads. That's OK though - we're going for eventual
185         // consistency here as immediately visible updates to these members aren't critical. These
186         // members could've been made volatile but wanted to avoid volatile reads as these are
187         // accessed often and updates will be infrequent.
188
189         updated = true;
190
191         if(shardManager != null) {
192             shardManager.tell(contextFactory, ActorRef.noSender());
193         }
194     }
195
196     public SchemaContext getSchemaContext() {
197         return schemaContext;
198     }
199
200     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
201         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
202         if(ret != null){
203             return ret;
204         }
205         Future<Object> future = executeOperationAsync(shardManager,
206                 new FindPrimary(shardName, true), shardInitializationTimeout);
207
208         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
209             @Override
210             public PrimaryShardInfo checkedApply(Object response) throws Exception {
211                 if(response instanceof RemotePrimaryShardFound) {
212                     LOG.debug("findPrimaryShardAsync received: {}", response);
213                     RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
214                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
215                 } else if(response instanceof LocalPrimaryShardFound) {
216                     LOG.debug("findPrimaryShardAsync received: {}", response);
217                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
218                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
219                             found.getLocalShardDataTree());
220                 } else if(response instanceof NotInitializedException) {
221                     throw (NotInitializedException)response;
222                 } else if(response instanceof PrimaryNotFoundException) {
223                     throw (PrimaryNotFoundException)response;
224                 } else if(response instanceof NoShardLeaderException) {
225                     throw (NoShardLeaderException)response;
226                 }
227
228                 throw new UnknownMessageException(String.format(
229                         "FindPrimary returned unkown response: %s", response));
230             }
231         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
232     }
233
234     private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
235             short primaryVersion, DataTree localShardDataTree) {
236         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
237         PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, primaryVersion,
238                 Optional.fromNullable(localShardDataTree));
239         primaryShardInfoCache.putSuccessful(shardName, info);
240         return info;
241     }
242
243     /**
244      * Finds a local shard given its shard name and return it's ActorRef
245      *
246      * @param shardName the name of the local shard that needs to be found
247      * @return a reference to a local shard actor which represents the shard
248      *         specified by the shardName
249      */
250     public Optional<ActorRef> findLocalShard(String shardName) {
251         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
252
253         if (result instanceof LocalShardFound) {
254             LocalShardFound found = (LocalShardFound) result;
255             LOG.debug("Local shard found {}", found.getPath());
256             return Optional.of(found.getPath());
257         }
258
259         return Optional.absent();
260     }
261
262     /**
263      * Finds a local shard async given its shard name and return a Future from which to obtain the
264      * ActorRef.
265      *
266      * @param shardName the name of the local shard that needs to be found
267      */
268     public Future<ActorRef> findLocalShardAsync( final String shardName) {
269         Future<Object> future = executeOperationAsync(shardManager,
270                 new FindLocalShard(shardName, true), shardInitializationTimeout);
271
272         return future.map(new Mapper<Object, ActorRef>() {
273             @Override
274             public ActorRef checkedApply(Object response) throws Throwable {
275                 if(response instanceof LocalShardFound) {
276                     LocalShardFound found = (LocalShardFound)response;
277                     LOG.debug("Local shard found {}", found.getPath());
278                     return found.getPath();
279                 } else if(response instanceof NotInitializedException) {
280                     throw (NotInitializedException)response;
281                 } else if(response instanceof LocalShardNotFound) {
282                     throw new LocalShardNotFoundException(
283                             String.format("Local shard for %s does not exist.", shardName));
284                 }
285
286                 throw new UnknownMessageException(String.format(
287                         "FindLocalShard returned unkown response: %s", response));
288             }
289         }, getClientDispatcher());
290     }
291
292     /**
293      * Executes an operation on a local actor and wait for it's response
294      *
295      * @param actor
296      * @param message
297      * @return The response of the operation
298      */
299     public Object executeOperation(ActorRef actor, Object message) {
300         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
301
302         try {
303             return Await.result(future, operationDuration);
304         } catch (Exception e) {
305             throw new TimeoutException("Sending message " + message.getClass().toString() +
306                     " to actor " + actor.toString() + " failed. Try again later.", e);
307         }
308     }
309
310     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
311         Preconditions.checkArgument(actor != null, "actor must not be null");
312         Preconditions.checkArgument(message != null, "message must not be null");
313
314         LOG.debug("Sending message {} to {}", message.getClass(), actor);
315         return doAsk(actor, message, timeout);
316     }
317
318     /**
319      * Execute an operation on a remote actor and wait for it's response
320      *
321      * @param actor
322      * @param message
323      * @return
324      */
325     public Object executeOperation(ActorSelection actor, Object message) {
326         Future<Object> future = executeOperationAsync(actor, message);
327
328         try {
329             return Await.result(future, operationDuration);
330         } catch (Exception e) {
331             throw new TimeoutException("Sending message " + message.getClass().toString() +
332                     " to actor " + actor.toString() + " failed. Try again later.", e);
333         }
334     }
335
336     /**
337      * Execute an operation on a remote actor asynchronously.
338      *
339      * @param actor the ActorSelection
340      * @param message the message to send
341      * @param timeout the operation timeout
342      * @return a Future containing the eventual result
343      */
344     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
345             Timeout timeout) {
346         Preconditions.checkArgument(actor != null, "actor must not be null");
347         Preconditions.checkArgument(message != null, "message must not be null");
348
349         LOG.debug("Sending message {} to {}", message.getClass(), actor);
350
351         return doAsk(actor, message, timeout);
352     }
353
354     /**
355      * Execute an operation on a remote actor asynchronously.
356      *
357      * @param actor the ActorSelection
358      * @param message the message to send
359      * @return a Future containing the eventual result
360      */
361     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
362         return executeOperationAsync(actor, message, operationTimeout);
363     }
364
365     /**
366      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
367      * reply (essentially set and forget).
368      *
369      * @param actor the ActorSelection
370      * @param message the message to send
371      */
372     public void sendOperationAsync(ActorSelection actor, Object message) {
373         Preconditions.checkArgument(actor != null, "actor must not be null");
374         Preconditions.checkArgument(message != null, "message must not be null");
375
376         LOG.debug("Sending message {} to {}", message.getClass(), actor);
377
378         actor.tell(message, ActorRef.noSender());
379     }
380
381     public void shutdown() {
382         FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
383         try {
384             Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
385         } catch(Exception e) {
386             LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
387         }
388     }
389
390     public ClusterWrapper getClusterWrapper() {
391         return clusterWrapper;
392     }
393
394     public String getCurrentMemberName(){
395         return clusterWrapper.getCurrentMemberName();
396     }
397
398     /**
399      * Send the message to each and every shard
400      *
401      * @param message
402      */
403     public void broadcast(final Function<Short, Object> messageSupplier){
404         for(final String shardName : configuration.getAllShardNames()){
405
406             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
407             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
408                 @Override
409                 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
410                     Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
411                     if(failure != null) {
412                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
413                                 message.getClass().getSimpleName(), shardName, failure);
414                     } else {
415                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
416                     }
417                 }
418             }, getClientDispatcher());
419         }
420     }
421
422     public FiniteDuration getOperationDuration() {
423         return operationDuration;
424     }
425
426     public Timeout getOperationTimeout() {
427         return operationTimeout;
428     }
429
430     public boolean isPathLocal(String path) {
431         if (Strings.isNullOrEmpty(path)) {
432             return false;
433         }
434
435         int pathAtIndex = path.indexOf('@');
436         if (pathAtIndex == -1) {
437             //if the path is of local format, then its local and is co-located
438             return true;
439
440         } else if (selfAddressHostPort != null) {
441             // self-address and tx actor path, both are of remote path format
442             int slashIndex = path.indexOf('/', pathAtIndex);
443
444             if (slashIndex == -1) {
445                 return false;
446             }
447
448             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
449             return hostPort.equals(selfAddressHostPort);
450
451         } else {
452             // self address is local format and tx actor path is remote format
453             return false;
454         }
455     }
456
457     /**
458      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
459      * us to create a timer for pretty much anything.
460      *
461      * @param operationName
462      * @return
463      */
464     public Timer getOperationTimer(String operationName){
465         return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
466     }
467
468     public Timer getOperationTimer(String dataStoreType, String operationName){
469         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
470                 operationName, METRIC_RATE);
471         return metricRegistry.timer(rate);
472     }
473
474     /**
475      * Get the name of the data store to which this ActorContext belongs
476      *
477      * @return
478      */
479     public String getDataStoreName() {
480         return datastoreContext.getDataStoreName();
481     }
482
483     /**
484      * Get the current transaction creation rate limit
485      * @return
486      */
487     public double getTxCreationLimit(){
488         return txRateLimiter.getTxCreationLimit();
489     }
490
491     /**
492      * Try to acquire a transaction creation permit. Will block if no permits are available.
493      */
494     public void acquireTxCreationPermit(){
495         txRateLimiter.acquire();
496     }
497
498     /**
499      * Return the operation timeout to be used when committing transactions
500      * @return
501      */
502     public Timeout getTransactionCommitOperationTimeout(){
503         return transactionCommitOperationTimeout;
504     }
505
506     /**
507      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
508      * code on the datastore
509      * @return
510      */
511     public ExecutionContext getClientDispatcher() {
512         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
513     }
514
515     public String getNotificationDispatcherPath(){
516         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
517     }
518
519     public Configuration getConfiguration() {
520         return configuration;
521     }
522
523     public ShardStrategyFactory getShardStrategyFactory() {
524         return shardStrategyFactory;
525     }
526
527     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
528         return ask(actorRef, message, timeout);
529     }
530
531     protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
532         return ask(actorRef, message, timeout);
533     }
534
535     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
536         return primaryShardInfoCache;
537     }
538 }