1dc49dc88aca5468d075907033110af89368aed9
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.dispatch.Mapper;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.AskTimeoutException;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import java.util.concurrent.TimeUnit;
28 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
29 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
30 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
31 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
32 import org.opendaylight.controller.cluster.datastore.config.Configuration;
33 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
34 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
35 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
39 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
40 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
41 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
45 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
47 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
48 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
49 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
50 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
51 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54 import scala.concurrent.Await;
55 import scala.concurrent.ExecutionContext;
56 import scala.concurrent.Future;
57 import scala.concurrent.duration.Duration;
58 import scala.concurrent.duration.FiniteDuration;
59
60 /**
61  * The ActorContext class contains utility methods which could be used by
62  * non-actors (like DistributedDataStore) to work with actors a little more
63  * easily. An ActorContext can be freely passed around to local object instances
64  * but should not be passed to actors especially remote actors
65  */
66 public class ActorContext {
67     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
68     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
69     private static final String METRIC_RATE = "rate";
70     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
71                                                               new Mapper<Throwable, Throwable>() {
72         @Override
73         public Throwable apply(Throwable failure) {
74             Throwable actualFailure = failure;
75             if(failure instanceof AskTimeoutException) {
76                 // A timeout exception most likely means the shard isn't initialized.
77                 actualFailure = new NotInitializedException(
78                         "Timed out trying to find the primary shard. Most likely cause is the " +
79                         "shard is not initialized yet.");
80             }
81
82             return actualFailure;
83         }
84     };
85     public static final String MAILBOX = "bounded-mailbox";
86     public static final String COMMIT = "commit";
87
88     private final ActorSystem actorSystem;
89     private final ActorRef shardManager;
90     private final ClusterWrapper clusterWrapper;
91     private final Configuration configuration;
92     private DatastoreContext datastoreContext;
93     private FiniteDuration operationDuration;
94     private Timeout operationTimeout;
95     private final String selfAddressHostPort;
96     private TransactionRateLimiter txRateLimiter;
97     private Timeout transactionCommitOperationTimeout;
98     private Timeout shardInitializationTimeout;
99     private final Dispatchers dispatchers;
100
101     private volatile SchemaContext schemaContext;
102     private volatile boolean updated;
103     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
104
105     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
106     private final ShardStrategyFactory shardStrategyFactory;
107
108     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
109             ClusterWrapper clusterWrapper, Configuration configuration) {
110         this(actorSystem, shardManager, clusterWrapper, configuration,
111                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
112     }
113
114     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
115             ClusterWrapper clusterWrapper, Configuration configuration,
116             DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
117         this.actorSystem = actorSystem;
118         this.shardManager = shardManager;
119         this.clusterWrapper = clusterWrapper;
120         this.configuration = configuration;
121         this.datastoreContext = datastoreContext;
122         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
123         this.primaryShardInfoCache = primaryShardInfoCache;
124         this.shardStrategyFactory = new ShardStrategyFactory(configuration);
125
126         setCachedProperties();
127
128         Address selfAddress = clusterWrapper.getSelfAddress();
129         if (selfAddress != null && !selfAddress.host().isEmpty()) {
130             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
131         } else {
132             selfAddressHostPort = null;
133         }
134
135     }
136
137     private void setCachedProperties() {
138         txRateLimiter = new TransactionRateLimiter(this);
139
140         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInMillis(), TimeUnit.MILLISECONDS);
141         operationTimeout = new Timeout(operationDuration);
142
143         transactionCommitOperationTimeout =  new Timeout(Duration.create(
144                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
145
146         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
147     }
148
149     public DatastoreContext getDatastoreContext() {
150         return datastoreContext;
151     }
152
153     public ActorSystem getActorSystem() {
154         return actorSystem;
155     }
156
157     public ActorRef getShardManager() {
158         return shardManager;
159     }
160
161     public ActorSelection actorSelection(String actorPath) {
162         return actorSystem.actorSelection(actorPath);
163     }
164
165     public ActorSelection actorSelection(ActorPath actorPath) {
166         return actorSystem.actorSelection(actorPath);
167     }
168
169     public void setSchemaContext(SchemaContext schemaContext) {
170         this.schemaContext = schemaContext;
171
172         if(shardManager != null) {
173             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
174         }
175     }
176
177     public void setDatastoreContext(DatastoreContextFactory contextFactory) {
178         this.datastoreContext = contextFactory.getBaseDatastoreContext();
179         setCachedProperties();
180
181         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
182         // will be published immediately even though they may not be immediately visible to other
183         // threads due to unsynchronized reads. That's OK though - we're going for eventual
184         // consistency here as immediately visible updates to these members aren't critical. These
185         // members could've been made volatile but wanted to avoid volatile reads as these are
186         // accessed often and updates will be infrequent.
187
188         updated = true;
189
190         if(shardManager != null) {
191             shardManager.tell(contextFactory, ActorRef.noSender());
192         }
193     }
194
195     public SchemaContext getSchemaContext() {
196         return schemaContext;
197     }
198
199     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
200         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
201         if(ret != null){
202             return ret;
203         }
204         Future<Object> future = executeOperationAsync(shardManager,
205                 new FindPrimary(shardName, true), shardInitializationTimeout);
206
207         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
208             @Override
209             public PrimaryShardInfo checkedApply(Object response) throws Exception {
210                 if(response instanceof RemotePrimaryShardFound) {
211                     LOG.debug("findPrimaryShardAsync received: {}", response);
212                     RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
213                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
214                 } else if(response instanceof LocalPrimaryShardFound) {
215                     LOG.debug("findPrimaryShardAsync received: {}", response);
216                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
217                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
218                             found.getLocalShardDataTree());
219                 } else if(response instanceof NotInitializedException) {
220                     throw (NotInitializedException)response;
221                 } else if(response instanceof PrimaryNotFoundException) {
222                     throw (PrimaryNotFoundException)response;
223                 } else if(response instanceof NoShardLeaderException) {
224                     throw (NoShardLeaderException)response;
225                 }
226
227                 throw new UnknownMessageException(String.format(
228                         "FindPrimary returned unkown response: %s", response));
229             }
230         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
231     }
232
233     private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
234             short primaryVersion, DataTree localShardDataTree) {
235         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
236         PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, primaryVersion,
237                 Optional.fromNullable(localShardDataTree));
238         primaryShardInfoCache.putSuccessful(shardName, info);
239         return info;
240     }
241
242     /**
243      * Finds a local shard given its shard name and return it's ActorRef
244      *
245      * @param shardName the name of the local shard that needs to be found
246      * @return a reference to a local shard actor which represents the shard
247      *         specified by the shardName
248      */
249     public Optional<ActorRef> findLocalShard(String shardName) {
250         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
251
252         if (result instanceof LocalShardFound) {
253             LocalShardFound found = (LocalShardFound) result;
254             LOG.debug("Local shard found {}", found.getPath());
255             return Optional.of(found.getPath());
256         }
257
258         return Optional.absent();
259     }
260
261     /**
262      * Finds a local shard async given its shard name and return a Future from which to obtain the
263      * ActorRef.
264      *
265      * @param shardName the name of the local shard that needs to be found
266      */
267     public Future<ActorRef> findLocalShardAsync( final String shardName) {
268         Future<Object> future = executeOperationAsync(shardManager,
269                 new FindLocalShard(shardName, true), shardInitializationTimeout);
270
271         return future.map(new Mapper<Object, ActorRef>() {
272             @Override
273             public ActorRef checkedApply(Object response) throws Throwable {
274                 if(response instanceof LocalShardFound) {
275                     LocalShardFound found = (LocalShardFound)response;
276                     LOG.debug("Local shard found {}", found.getPath());
277                     return found.getPath();
278                 } else if(response instanceof NotInitializedException) {
279                     throw (NotInitializedException)response;
280                 } else if(response instanceof LocalShardNotFound) {
281                     throw new LocalShardNotFoundException(
282                             String.format("Local shard for %s does not exist.", shardName));
283                 }
284
285                 throw new UnknownMessageException(String.format(
286                         "FindLocalShard returned unkown response: %s", response));
287             }
288         }, getClientDispatcher());
289     }
290
291     /**
292      * Executes an operation on a local actor and wait for it's response
293      *
294      * @param actor
295      * @param message
296      * @return The response of the operation
297      */
298     public Object executeOperation(ActorRef actor, Object message) {
299         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
300
301         try {
302             return Await.result(future, operationDuration);
303         } catch (Exception e) {
304             throw new TimeoutException("Sending message " + message.getClass().toString() +
305                     " to actor " + actor.toString() + " failed. Try again later.", e);
306         }
307     }
308
309     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
310         Preconditions.checkArgument(actor != null, "actor must not be null");
311         Preconditions.checkArgument(message != null, "message must not be null");
312
313         LOG.debug("Sending message {} to {}", message.getClass(), actor);
314         return doAsk(actor, message, timeout);
315     }
316
317     /**
318      * Execute an operation on a remote actor and wait for it's response
319      *
320      * @param actor
321      * @param message
322      * @return
323      */
324     public Object executeOperation(ActorSelection actor, Object message) {
325         Future<Object> future = executeOperationAsync(actor, message);
326
327         try {
328             return Await.result(future, operationDuration);
329         } catch (Exception e) {
330             throw new TimeoutException("Sending message " + message.getClass().toString() +
331                     " to actor " + actor.toString() + " failed. Try again later.", e);
332         }
333     }
334
335     /**
336      * Execute an operation on a remote actor asynchronously.
337      *
338      * @param actor the ActorSelection
339      * @param message the message to send
340      * @param timeout the operation timeout
341      * @return a Future containing the eventual result
342      */
343     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
344             Timeout timeout) {
345         Preconditions.checkArgument(actor != null, "actor must not be null");
346         Preconditions.checkArgument(message != null, "message must not be null");
347
348         LOG.debug("Sending message {} to {}", message.getClass(), actor);
349
350         return doAsk(actor, message, timeout);
351     }
352
353     /**
354      * Execute an operation on a remote actor asynchronously.
355      *
356      * @param actor the ActorSelection
357      * @param message the message to send
358      * @return a Future containing the eventual result
359      */
360     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
361         return executeOperationAsync(actor, message, operationTimeout);
362     }
363
364     /**
365      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
366      * reply (essentially set and forget).
367      *
368      * @param actor the ActorSelection
369      * @param message the message to send
370      */
371     public void sendOperationAsync(ActorSelection actor, Object message) {
372         Preconditions.checkArgument(actor != null, "actor must not be null");
373         Preconditions.checkArgument(message != null, "message must not be null");
374
375         LOG.debug("Sending message {} to {}", message.getClass(), actor);
376
377         actor.tell(message, ActorRef.noSender());
378     }
379
380     public void shutdown() {
381         FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
382         try {
383             Await.ready(Patterns.gracefulStop(shardManager, duration, new Shutdown()), duration);
384         } catch(Exception e) {
385             LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
386         }
387     }
388
389     public ClusterWrapper getClusterWrapper() {
390         return clusterWrapper;
391     }
392
393     public String getCurrentMemberName(){
394         return clusterWrapper.getCurrentMemberName();
395     }
396
397     /**
398      * Send the message to each and every shard
399      *
400      * @param message
401      */
402     public void broadcast(final Object message){
403         for(final String shardName : configuration.getAllShardNames()){
404
405             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
406             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
407                 @Override
408                 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
409                     if(failure != null) {
410                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
411                                 message.getClass().getSimpleName(), shardName, failure);
412                     } else {
413                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
414                     }
415                 }
416             }, getClientDispatcher());
417         }
418     }
419
420     public FiniteDuration getOperationDuration() {
421         return operationDuration;
422     }
423
424     public Timeout getOperationTimeout() {
425         return operationTimeout;
426     }
427
428     public boolean isPathLocal(String path) {
429         if (Strings.isNullOrEmpty(path)) {
430             return false;
431         }
432
433         int pathAtIndex = path.indexOf('@');
434         if (pathAtIndex == -1) {
435             //if the path is of local format, then its local and is co-located
436             return true;
437
438         } else if (selfAddressHostPort != null) {
439             // self-address and tx actor path, both are of remote path format
440             int slashIndex = path.indexOf('/', pathAtIndex);
441
442             if (slashIndex == -1) {
443                 return false;
444             }
445
446             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
447             return hostPort.equals(selfAddressHostPort);
448
449         } else {
450             // self address is local format and tx actor path is remote format
451             return false;
452         }
453     }
454
455     /**
456      * @deprecated This method is present only to support backward compatibility with Helium and should not be
457      * used any further
458      *
459      *
460      * @param primaryPath
461      * @param localPathOfRemoteActor
462      * @return
463     */
464     @Deprecated
465     public String resolvePath(final String primaryPath,
466                                             final String localPathOfRemoteActor) {
467         StringBuilder builder = new StringBuilder();
468         String[] primaryPathElements = primaryPath.split("/");
469         builder.append(primaryPathElements[0]).append("//")
470             .append(primaryPathElements[1]).append(primaryPathElements[2]);
471         String[] remotePathElements = localPathOfRemoteActor.split("/");
472         for (int i = 3; i < remotePathElements.length; i++) {
473                 builder.append("/").append(remotePathElements[i]);
474             }
475
476         return builder.toString();
477     }
478
479     /**
480      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
481      * us to create a timer for pretty much anything.
482      *
483      * @param operationName
484      * @return
485      */
486     public Timer getOperationTimer(String operationName){
487         return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
488     }
489
490     public Timer getOperationTimer(String dataStoreType, String operationName){
491         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
492                 operationName, METRIC_RATE);
493         return metricRegistry.timer(rate);
494     }
495
496     /**
497      * Get the name of the data store to which this ActorContext belongs
498      *
499      * @return
500      */
501     public String getDataStoreName() {
502         return datastoreContext.getDataStoreName();
503     }
504
505     /**
506      * Get the type of the data store to which this ActorContext belongs
507      *
508      * @return
509      * @deprecated Use {@link #getDataStoreName()} instead.
510      */
511     @Deprecated
512     public String getDataStoreType() {
513         return datastoreContext.getDataStoreName();
514     }
515
516     /**
517      * Get the current transaction creation rate limit
518      * @return
519      */
520     public double getTxCreationLimit(){
521         return txRateLimiter.getTxCreationLimit();
522     }
523
524     /**
525      * Try to acquire a transaction creation permit. Will block if no permits are available.
526      */
527     public void acquireTxCreationPermit(){
528         txRateLimiter.acquire();
529     }
530
531     /**
532      * Return the operation timeout to be used when committing transactions
533      * @return
534      */
535     public Timeout getTransactionCommitOperationTimeout(){
536         return transactionCommitOperationTimeout;
537     }
538
539     /**
540      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
541      * code on the datastore
542      * @return
543      */
544     public ExecutionContext getClientDispatcher() {
545         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
546     }
547
548     public String getNotificationDispatcherPath(){
549         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
550     }
551
552     public Configuration getConfiguration() {
553         return configuration;
554     }
555
556     public ShardStrategyFactory getShardStrategyFactory() {
557         return shardStrategyFactory;
558     }
559
560     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
561         return ask(actorRef, message, timeout);
562     }
563
564     protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
565         return ask(actorRef, message, timeout);
566     }
567
568     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
569         return primaryShardInfoCache;
570     }
571 }