a9a646cc9f112e128e302bedc0457bcb814ce8bd
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorUtils.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.utils;
9
10 import static akka.pattern.Patterns.ask;
11
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.dispatch.Mapper;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.AskTimeoutException;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Preconditions;
25 import com.google.common.base.Strings;
26 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
27 import java.util.Optional;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.LongAdder;
30 import java.util.function.Function;
31 import org.opendaylight.controller.cluster.access.concepts.MemberName;
32 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
33 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
34 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
35 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
36 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
37 import org.opendaylight.controller.cluster.datastore.config.Configuration;
38 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
42 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
43 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
44 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
45 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
48 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
49 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
50 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
51 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
52 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
53 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
54 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
55 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
56 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59 import scala.concurrent.Await;
60 import scala.concurrent.ExecutionContext;
61 import scala.concurrent.Future;
62 import scala.concurrent.duration.FiniteDuration;
63
64 /**
65  * The ActorUtils class contains utility methods which could be used by non-actors (like DistributedDataStore) to work
66  * with actors a little more easily. An ActorContext can be freely passed around to local object instances but should
67  * not be passed to actors especially remote actors.
68  */
69 public class ActorUtils {
70     private static final class AskTimeoutCounter extends OnComplete<Object> implements ExecutionContext {
71         private LongAdder ateExceptions = new LongAdder();
72
73         @Override
74         public void onComplete(final Throwable failure, final Object success) throws Throwable {
75             if (failure instanceof AskTimeoutException) {
76                 ateExceptions.increment();
77             }
78         }
79
80         void reset() {
81             ateExceptions = new LongAdder();
82         }
83
84         long sum() {
85             return ateExceptions.sum();
86         }
87
88         @Override
89         public void execute(final Runnable runnable) {
90             // Yes, we are this ugly, but then we are just doing a check + an increment
91             runnable.run();
92         }
93
94         @Override
95         public void reportFailure(final Throwable cause) {
96             LOG.warn("Unexpected failure updating counters", cause);
97         }
98     }
99
100     private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);
101     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
102     private static final String METRIC_RATE = "rate";
103     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper<>() {
104         @Override
105         public Throwable apply(final Throwable failure) {
106             Throwable actualFailure = failure;
107             if (failure instanceof AskTimeoutException) {
108                 // A timeout exception most likely means the shard isn't initialized.
109                 actualFailure = new NotInitializedException(
110                         "Timed out trying to find the primary shard. Most likely cause is the "
111                         + "shard is not initialized yet.");
112             }
113
114             return actualFailure;
115         }
116     };
117     public static final String BOUNDED_MAILBOX = "bounded-mailbox";
118     public static final String COMMIT = "commit";
119
120     private final AskTimeoutCounter askTimeoutCounter = new AskTimeoutCounter();
121     private final ActorSystem actorSystem;
122     private final ActorRef shardManager;
123     private final ClusterWrapper clusterWrapper;
124     private final Configuration configuration;
125     private final String selfAddressHostPort;
126     private final Dispatchers dispatchers;
127
128     private DatastoreContext datastoreContext;
129     private FiniteDuration operationDuration;
130     private Timeout operationTimeout;
131     private TransactionRateLimiter txRateLimiter;
132     private Timeout transactionCommitOperationTimeout;
133     private Timeout shardInitializationTimeout;
134
135     private volatile EffectiveModelContext schemaContext;
136
137     // Used as a write memory barrier.
138     @SuppressWarnings("unused")
139     private volatile boolean updated;
140
141     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
142             .getMetricsRegistry();
143
144     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
145     private final ShardStrategyFactory shardStrategyFactory;
146
147     public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
148             final ClusterWrapper clusterWrapper, final Configuration configuration) {
149         this(actorSystem, shardManager, clusterWrapper, configuration,
150                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
151     }
152
153     public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
154             final ClusterWrapper clusterWrapper, final Configuration configuration,
155             final DatastoreContext datastoreContext, final PrimaryShardInfoFutureCache primaryShardInfoCache) {
156         this.actorSystem = actorSystem;
157         this.shardManager = shardManager;
158         this.clusterWrapper = clusterWrapper;
159         this.configuration = configuration;
160         this.datastoreContext = datastoreContext;
161         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
162         this.primaryShardInfoCache = primaryShardInfoCache;
163         this.shardStrategyFactory = new ShardStrategyFactory(configuration);
164
165         setCachedProperties();
166
167         Address selfAddress = clusterWrapper.getSelfAddress();
168         if (selfAddress != null && !selfAddress.host().isEmpty()) {
169             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
170         } else {
171             selfAddressHostPort = null;
172         }
173     }
174
175     private void setCachedProperties() {
176         txRateLimiter = new TransactionRateLimiter(this);
177
178         operationDuration = FiniteDuration.create(datastoreContext.getOperationTimeoutInMillis(),
179             TimeUnit.MILLISECONDS);
180         operationTimeout = new Timeout(operationDuration);
181
182         transactionCommitOperationTimeout =  new Timeout(FiniteDuration.create(
183                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
184
185         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
186     }
187
188     public DatastoreContext getDatastoreContext() {
189         return datastoreContext;
190     }
191
192     public ActorSystem getActorSystem() {
193         return actorSystem;
194     }
195
196     public ActorRef getShardManager() {
197         return shardManager;
198     }
199
200     public ActorSelection actorSelection(final String actorPath) {
201         return actorSystem.actorSelection(actorPath);
202     }
203
204     public ActorSelection actorSelection(final ActorPath actorPath) {
205         return actorSystem.actorSelection(actorPath);
206     }
207
208     public void setSchemaContext(final EffectiveModelContext schemaContext) {
209         this.schemaContext = schemaContext;
210
211         if (shardManager != null) {
212             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
213         }
214     }
215
216     public void setDatastoreContext(final DatastoreContextFactory contextFactory) {
217         this.datastoreContext = contextFactory.getBaseDatastoreContext();
218         setCachedProperties();
219
220         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
221         // will be published immediately even though they may not be immediately visible to other
222         // threads due to unsynchronized reads. That's OK though - we're going for eventual
223         // consistency here as immediately visible updates to these members aren't critical. These
224         // members could've been made volatile but wanted to avoid volatile reads as these are
225         // accessed often and updates will be infrequent.
226
227         updated = true;
228
229         if (shardManager != null) {
230             shardManager.tell(contextFactory, ActorRef.noSender());
231         }
232     }
233
234     public EffectiveModelContext getSchemaContext() {
235         return schemaContext;
236     }
237
238     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
239         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
240         if (ret != null) {
241             return ret;
242         }
243         Future<Object> future = executeOperationAsync(shardManager,
244                 new FindPrimary(shardName, true), shardInitializationTimeout);
245
246         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
247             @Override
248             public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
249                 if (response instanceof RemotePrimaryShardFound) {
250                     LOG.debug("findPrimaryShardAsync received: {}", response);
251                     RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
252                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
253                 } else if (response instanceof LocalPrimaryShardFound) {
254                     LOG.debug("findPrimaryShardAsync received: {}", response);
255                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
256                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
257                             found.getLocalShardDataTree());
258                 } else if (response instanceof NotInitializedException) {
259                     throw (NotInitializedException)response;
260                 } else if (response instanceof PrimaryNotFoundException) {
261                     throw (PrimaryNotFoundException)response;
262                 } else if (response instanceof NoShardLeaderException) {
263                     throw (NoShardLeaderException)response;
264                 }
265
266                 throw new UnknownMessageException(String.format(
267                         "FindPrimary returned unkown response: %s", response));
268             }
269         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
270     }
271
272     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
273             justification = "https://github.com/spotbugs/spotbugs/issues/811")
274     private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
275             final short primaryVersion, final ReadOnlyDataTree localShardDataTree) {
276         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
277         PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
278             new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
279         primaryShardInfoCache.putSuccessful(shardName, info);
280         return info;
281     }
282
283     /**
284      * Finds a local shard given its shard name and return it's ActorRef.
285      *
286      * @param shardName the name of the local shard that needs to be found
287      * @return a reference to a local shard actor which represents the shard
288      *         specified by the shardName
289      */
290     public Optional<ActorRef> findLocalShard(final String shardName) {
291         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
292
293         if (result instanceof LocalShardFound) {
294             LocalShardFound found = (LocalShardFound) result;
295             LOG.debug("Local shard found {}", found.getPath());
296             return Optional.of(found.getPath());
297         }
298
299         return Optional.empty();
300     }
301
302     /**
303      * Finds a local shard async given its shard name and return a Future from which to obtain the
304      * ActorRef.
305      *
306      * @param shardName the name of the local shard that needs to be found
307      */
308     public Future<ActorRef> findLocalShardAsync(final String shardName) {
309         Future<Object> future = executeOperationAsync(shardManager,
310                 new FindLocalShard(shardName, true), shardInitializationTimeout);
311
312         return future.map(new Mapper<Object, ActorRef>() {
313             @Override
314             public ActorRef checkedApply(final Object response) throws Throwable {
315                 if (response instanceof LocalShardFound) {
316                     LocalShardFound found = (LocalShardFound)response;
317                     LOG.debug("Local shard found {}", found.getPath());
318                     return found.getPath();
319                 } else if (response instanceof NotInitializedException) {
320                     throw (NotInitializedException)response;
321                 } else if (response instanceof LocalShardNotFound) {
322                     throw new LocalShardNotFoundException(
323                             String.format("Local shard for %s does not exist.", shardName));
324                 }
325
326                 throw new UnknownMessageException(String.format(
327                         "FindLocalShard returned unkown response: %s", response));
328             }
329         }, getClientDispatcher());
330     }
331
332     /**
333      * Executes an operation on a local actor and wait for it's response.
334      *
335      * @param actor the actor
336      * @param message the message to send
337      * @return The response of the operation
338      */
339     @SuppressWarnings("checkstyle:IllegalCatch")
340     public Object executeOperation(final ActorRef actor, final Object message) {
341         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
342
343         try {
344             return Await.result(future, operationDuration);
345         } catch (Exception e) {
346             throw new TimeoutException("Sending message " + message.getClass().toString()
347                     + " to actor " + actor.toString() + " failed. Try again later.", e);
348         }
349     }
350
351     /**
352      * Execute an operation on a remote actor and wait for it's response.
353      *
354      * @param actor the actor
355      * @param message the message
356      * @return the response message
357      */
358     @SuppressWarnings("checkstyle:IllegalCatch")
359     public Object executeOperation(final ActorSelection actor, final Object message) {
360         Future<Object> future = executeOperationAsync(actor, message);
361
362         try {
363             return Await.result(future, operationDuration);
364         } catch (Exception e) {
365             throw new TimeoutException("Sending message " + message.getClass().toString()
366                     + " to actor " + actor.toString() + " failed. Try again later.", e);
367         }
368     }
369
370     public Future<Object> executeOperationAsync(final ActorRef actor, final Object message, final Timeout timeout) {
371         Preconditions.checkArgument(actor != null, "actor must not be null");
372         Preconditions.checkArgument(message != null, "message must not be null");
373
374         LOG.debug("Sending message {} to {}", message.getClass(), actor);
375         return doAsk(actor, message, timeout);
376     }
377
378     /**
379      * Execute an operation on a remote actor asynchronously.
380      *
381      * @param actor the ActorSelection
382      * @param message the message to send
383      * @param timeout the operation timeout
384      * @return a Future containing the eventual result
385      */
386     public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message,
387             final Timeout timeout) {
388         Preconditions.checkArgument(actor != null, "actor must not be null");
389         Preconditions.checkArgument(message != null, "message must not be null");
390
391         LOG.debug("Sending message {} to {}", message.getClass(), actor);
392
393         return doAsk(actor, message, timeout);
394     }
395
396     /**
397      * Execute an operation on a remote actor asynchronously.
398      *
399      * @param actor the ActorSelection
400      * @param message the message to send
401      * @return a Future containing the eventual result
402      */
403     public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message) {
404         return executeOperationAsync(actor, message, operationTimeout);
405     }
406
407     /**
408      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
409      * reply (essentially set and forget).
410      *
411      * @param actor the ActorSelection
412      * @param message the message to send
413      */
414     public void sendOperationAsync(final ActorSelection actor, final Object message) {
415         Preconditions.checkArgument(actor != null, "actor must not be null");
416         Preconditions.checkArgument(message != null, "message must not be null");
417
418         LOG.debug("Sending message {} to {}", message.getClass(), actor);
419
420         actor.tell(message, ActorRef.noSender());
421     }
422
423     @SuppressWarnings("checkstyle:IllegalCatch")
424     public void shutdown() {
425         FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
426         try {
427             Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
428         } catch (Exception e) {
429             LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
430         }
431     }
432
433     public ClusterWrapper getClusterWrapper() {
434         return clusterWrapper;
435     }
436
437     public MemberName getCurrentMemberName() {
438         return clusterWrapper.getCurrentMemberName();
439     }
440
441     /**
442      * Send the message to each and every shard.
443      */
444     public void broadcast(final Function<Short, Object> messageSupplier, final Class<?> messageClass) {
445         for (final String shardName : configuration.getAllShardNames()) {
446
447             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
448             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
449                 @Override
450                 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
451                     if (failure != null) {
452                         LOG.warn("broadcast failed to send message {} to shard {}", messageClass.getSimpleName(),
453                             shardName, failure);
454                     } else {
455                         Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
456                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
457                     }
458                 }
459             }, getClientDispatcher());
460         }
461     }
462
463     public FiniteDuration getOperationDuration() {
464         return operationDuration;
465     }
466
467     public Timeout getOperationTimeout() {
468         return operationTimeout;
469     }
470
471     public boolean isPathLocal(final String path) {
472         if (Strings.isNullOrEmpty(path)) {
473             return false;
474         }
475
476         int pathAtIndex = path.indexOf('@');
477         if (pathAtIndex == -1) {
478             //if the path is of local format, then its local and is co-located
479             return true;
480
481         } else if (selfAddressHostPort != null) {
482             // self-address and tx actor path, both are of remote path format
483             int slashIndex = path.indexOf('/', pathAtIndex);
484
485             if (slashIndex == -1) {
486                 return false;
487             }
488
489             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
490             return hostPort.equals(selfAddressHostPort);
491
492         } else {
493             // self address is local format and tx actor path is remote format
494             return false;
495         }
496     }
497
498     /**
499      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
500      * us to create a timer for pretty much anything.
501      *
502      * @param operationName the name of the operation
503      * @return the Timer instance
504      */
505     public Timer getOperationTimer(final String operationName) {
506         return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
507     }
508
509     public Timer getOperationTimer(final String dataStoreType, final String operationName) {
510         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
511                 operationName, METRIC_RATE);
512         return metricRegistry.timer(rate);
513     }
514
515     /**
516      * Get the name of the data store to which this ActorContext belongs.
517      *
518      * @return the data store name
519      */
520     public String getDataStoreName() {
521         return datastoreContext.getDataStoreName();
522     }
523
524     /**
525      * Get the current transaction creation rate limit.
526      *
527      * @return the rate limit
528      */
529     public double getTxCreationLimit() {
530         return txRateLimiter.getTxCreationLimit();
531     }
532
533     public long getAskTimeoutExceptionCount() {
534         return askTimeoutCounter.sum();
535     }
536
537     public void resetAskTimeoutExceptionCount() {
538         askTimeoutCounter.reset();
539     }
540
541     /**
542      * Try to acquire a transaction creation permit. Will block if no permits are available.
543      */
544     public void acquireTxCreationPermit() {
545         txRateLimiter.acquire();
546     }
547
548     /**
549      * Returns the operation timeout to be used when committing transactions.
550      *
551      * @return the operation timeout
552      */
553     public Timeout getTransactionCommitOperationTimeout() {
554         return transactionCommitOperationTimeout;
555     }
556
557     /**
558      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
559      * code on the datastore.
560      *
561      * @return the dispatcher
562      */
563     public ExecutionContext getClientDispatcher() {
564         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
565     }
566
567     public String getNotificationDispatcherPath() {
568         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
569     }
570
571     public Configuration getConfiguration() {
572         return configuration;
573     }
574
575     public ShardStrategyFactory getShardStrategyFactory() {
576         return shardStrategyFactory;
577     }
578
579     protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
580         return ask(actorRef, message, timeout);
581     }
582
583     protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
584         final Future<Object> ret = ask(actorRef, message, timeout);
585         ret.onComplete(askTimeoutCounter, askTimeoutCounter);
586         return ret;
587     }
588
589     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
590         return primaryShardInfoCache;
591     }
592 }