ba3f9c5c7e074d9f3897808cc83abe22fc6ad86a
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorUtils.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.utils;
9
10 import static akka.pattern.Patterns.ask;
11
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.dispatch.Mapper;
17 import akka.dispatch.OnComplete;
18 import akka.pattern.AskTimeoutException;
19 import akka.pattern.Patterns;
20 import akka.util.Timeout;
21 import com.codahale.metrics.MetricRegistry;
22 import com.codahale.metrics.Timer;
23 import com.google.common.base.Preconditions;
24 import com.google.common.base.Strings;
25 import java.lang.invoke.VarHandle;
26 import java.util.Optional;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.LongAdder;
29 import java.util.function.Function;
30 import org.opendaylight.controller.cluster.access.concepts.MemberName;
31 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
32 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
33 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
35 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
36 import org.opendaylight.controller.cluster.datastore.config.Configuration;
37 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
42 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
43 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
44 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
48 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
49 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
50 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
51 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
52 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
53 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
54 import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
55 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import scala.concurrent.Await;
59 import scala.concurrent.ExecutionContext;
60 import scala.concurrent.Future;
61 import scala.concurrent.duration.FiniteDuration;
62
63 /**
64  * The ActorUtils class contains utility methods which could be used by non-actors (like DistributedDataStore) to work
65  * with actors a little more easily. An ActorContext can be freely passed around to local object instances but should
66  * not be passed to actors especially remote actors.
67  */
68 public class ActorUtils {
69     private static final class AskTimeoutCounter extends OnComplete<Object> implements ExecutionContext {
70         private LongAdder ateExceptions = new LongAdder();
71
72         @Override
73         public void onComplete(final Throwable failure, final Object success) throws Throwable {
74             if (failure instanceof AskTimeoutException) {
75                 ateExceptions.increment();
76             }
77         }
78
79         void reset() {
80             ateExceptions = new LongAdder();
81         }
82
83         long sum() {
84             return ateExceptions.sum();
85         }
86
87         @Override
88         public void execute(final Runnable runnable) {
89             // Yes, we are this ugly, but then we are just doing a check + an increment
90             runnable.run();
91         }
92
93         @Override
94         public void reportFailure(final Throwable cause) {
95             LOG.warn("Unexpected failure updating counters", cause);
96         }
97     }
98
99     private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);
100     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
101     private static final String METRIC_RATE = "rate";
102     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper<>() {
103         @Override
104         public Throwable apply(final Throwable failure) {
105             if (failure instanceof AskTimeoutException) {
106                 // A timeout exception most likely means the shard isn't initialized.
107                 return new NotInitializedException(
108                         "Timed out trying to find the primary shard. Most likely cause is the "
109                         + "shard is not initialized yet.");
110             }
111             return failure;
112         }
113     };
114     public static final String BOUNDED_MAILBOX = "bounded-mailbox";
115     public static final String COMMIT = "commit";
116
117     private final AskTimeoutCounter askTimeoutCounter = new AskTimeoutCounter();
118     private final ActorSystem actorSystem;
119     private final ActorRef shardManager;
120     private final ClusterWrapper clusterWrapper;
121     private final Configuration configuration;
122     private final String selfAddressHostPort;
123     private final Dispatchers dispatchers;
124
125     private DatastoreContext datastoreContext;
126     private FiniteDuration operationDuration;
127     private Timeout operationTimeout;
128     private TransactionRateLimiter txRateLimiter;
129     private Timeout transactionCommitOperationTimeout;
130     private Timeout shardInitializationTimeout;
131
132     private volatile EffectiveModelContext schemaContext;
133
134     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
135             .getMetricsRegistry();
136
137     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
138     private final ShardStrategyFactory shardStrategyFactory;
139
140     public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
141             final ClusterWrapper clusterWrapper, final Configuration configuration) {
142         this(actorSystem, shardManager, clusterWrapper, configuration,
143                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
144     }
145
146     public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
147             final ClusterWrapper clusterWrapper, final Configuration configuration,
148             final DatastoreContext datastoreContext, final PrimaryShardInfoFutureCache primaryShardInfoCache) {
149         this.actorSystem = actorSystem;
150         this.shardManager = shardManager;
151         this.clusterWrapper = clusterWrapper;
152         this.configuration = configuration;
153         this.datastoreContext = datastoreContext;
154         dispatchers = new Dispatchers(actorSystem.dispatchers());
155         this.primaryShardInfoCache = primaryShardInfoCache;
156         shardStrategyFactory = new ShardStrategyFactory(configuration);
157
158         setCachedProperties();
159
160         final var selfAddress = clusterWrapper.getSelfAddress();
161         if (selfAddress != null && !selfAddress.host().isEmpty()) {
162             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
163         } else {
164             selfAddressHostPort = null;
165         }
166     }
167
168     private void setCachedProperties() {
169         txRateLimiter = new TransactionRateLimiter(this);
170
171         operationDuration = FiniteDuration.create(datastoreContext.getOperationTimeoutInMillis(),
172             TimeUnit.MILLISECONDS);
173         operationTimeout = new Timeout(operationDuration);
174
175         transactionCommitOperationTimeout = new Timeout(FiniteDuration.create(
176                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
177
178         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
179     }
180
181     public DatastoreContext getDatastoreContext() {
182         return datastoreContext;
183     }
184
185     public ActorSystem getActorSystem() {
186         return actorSystem;
187     }
188
189     public ActorRef getShardManager() {
190         return shardManager;
191     }
192
193     public ActorSelection actorSelection(final String actorPath) {
194         return actorSystem.actorSelection(actorPath);
195     }
196
197     public ActorSelection actorSelection(final ActorPath actorPath) {
198         return actorSystem.actorSelection(actorPath);
199     }
200
201     public void setSchemaContext(final EffectiveModelContext schemaContext) {
202         this.schemaContext = schemaContext;
203
204         if (shardManager != null) {
205             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
206         }
207     }
208
209     public void setDatastoreContext(final DatastoreContextFactory contextFactory) {
210         datastoreContext = contextFactory.getBaseDatastoreContext();
211         setCachedProperties();
212
213         // Trigger a write memory barrier so that the writes above will be published immediately even though they may
214         // not be immediately visible to other threads due to unsynchronized reads. That is OK though - we are going for
215         // eventual consistency here as immediately visible updates to these members are not critical. These members
216         // could have been made volatile but wanted to avoid volatile reads as these are accessed often and updates will
217         // be infrequent.
218         VarHandle.fullFence();
219
220         if (shardManager != null) {
221             shardManager.tell(contextFactory, ActorRef.noSender());
222         }
223     }
224
225     public EffectiveModelContext getSchemaContext() {
226         return schemaContext;
227     }
228
229     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
230         final var ret = primaryShardInfoCache.getIfPresent(shardName);
231         if (ret != null) {
232             return ret;
233         }
234
235         return executeOperationAsync(shardManager, new FindPrimary(shardName, true), shardInitializationTimeout)
236             .transform(new Mapper<>() {
237                 @Override
238                 public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
239                     if (response instanceof RemotePrimaryShardFound found) {
240                         LOG.debug("findPrimaryShardAsync received: {}", found);
241                         return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
242                     } else if (response instanceof LocalPrimaryShardFound found) {
243                         LOG.debug("findPrimaryShardAsync received: {}", found);
244                         return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
245                             found.getLocalShardDataTree());
246                     } else if (response instanceof NotInitializedException notInitialized) {
247                         throw notInitialized;
248                     } else if (response instanceof PrimaryNotFoundException primaryNotFound) {
249                         throw primaryNotFound;
250                     } else if (response instanceof NoShardLeaderException noShardLeader) {
251                         throw noShardLeader;
252                     }
253
254                     throw new UnknownMessageException(String.format(
255                         "FindPrimary returned unkown response: %s", response));
256                 }
257             }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
258     }
259
260     private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
261             final short primaryVersion, final ReadOnlyDataTree localShardDataTree) {
262         final var actorSelection = actorSystem.actorSelection(primaryActorPath);
263         final var info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
264             new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
265         primaryShardInfoCache.putSuccessful(shardName, info);
266         return info;
267     }
268
269     /**
270      * Finds a local shard given its shard name and return it's ActorRef.
271      *
272      * @param shardName the name of the local shard that needs to be found
273      * @return a reference to a local shard actor which represents the shard
274      *         specified by the shardName
275      */
276     public Optional<ActorRef> findLocalShard(final String shardName) {
277         final var result = executeOperation(shardManager, new FindLocalShard(shardName, false));
278         if (result instanceof LocalShardFound found) {
279             LOG.debug("Local shard found {}", found.getPath());
280             return Optional.of(found.getPath());
281         }
282
283         return Optional.empty();
284     }
285
286     /**
287      * Finds a local shard async given its shard name and return a Future from which to obtain the
288      * ActorRef.
289      *
290      * @param shardName the name of the local shard that needs to be found
291      */
292     public Future<ActorRef> findLocalShardAsync(final String shardName) {
293         return executeOperationAsync(shardManager, new FindLocalShard(shardName, true), shardInitializationTimeout)
294             .map(new Mapper<>() {
295                 @Override
296                 public ActorRef checkedApply(final Object response) throws Throwable {
297                     if (response instanceof LocalShardFound found) {
298                         LOG.debug("Local shard found {}", found.getPath());
299                         return found.getPath();
300                     } else if (response instanceof NotInitializedException) {
301                         throw (NotInitializedException)response;
302                     } else if (response instanceof LocalShardNotFound) {
303                         throw new LocalShardNotFoundException(
304                             String.format("Local shard for %s does not exist.", shardName));
305                     }
306
307                     throw new UnknownMessageException("FindLocalShard returned unkown response: " + response);
308                 }
309             }, getClientDispatcher());
310     }
311
312     /**
313      * Executes an operation on a local actor and wait for it's response.
314      *
315      * @param actor the actor
316      * @param message the message to send
317      * @return The response of the operation
318      */
319     @SuppressWarnings("checkstyle:IllegalCatch")
320     public Object executeOperation(final ActorRef actor, final Object message) {
321         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
322
323         try {
324             return Await.result(future, operationDuration);
325         } catch (Exception e) {
326             throw new TimeoutException("Sending message " + message.getClass().toString()
327                     + " to actor " + actor.toString() + " failed. Try again later.", e);
328         }
329     }
330
331     /**
332      * Execute an operation on a remote actor and wait for it's response.
333      *
334      * @param actor the actor
335      * @param message the message
336      * @return the response message
337      */
338     @SuppressWarnings("checkstyle:IllegalCatch")
339     public Object executeOperation(final ActorSelection actor, final Object message) {
340         Future<Object> future = executeOperationAsync(actor, message);
341
342         try {
343             return Await.result(future, operationDuration);
344         } catch (Exception e) {
345             throw new TimeoutException("Sending message " + message.getClass().toString()
346                     + " to actor " + actor.toString() + " failed. Try again later.", e);
347         }
348     }
349
350     public Future<Object> executeOperationAsync(final ActorRef actor, final Object message, final Timeout timeout) {
351         Preconditions.checkArgument(actor != null, "actor must not be null");
352         Preconditions.checkArgument(message != null, "message must not be null");
353
354         LOG.debug("Sending message {} to {}", message.getClass(), actor);
355         return doAsk(actor, message, timeout);
356     }
357
358     /**
359      * Execute an operation on a remote actor asynchronously.
360      *
361      * @param actor the ActorSelection
362      * @param message the message to send
363      * @param timeout the operation timeout
364      * @return a Future containing the eventual result
365      */
366     public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message,
367             final Timeout timeout) {
368         Preconditions.checkArgument(actor != null, "actor must not be null");
369         Preconditions.checkArgument(message != null, "message must not be null");
370
371         LOG.debug("Sending message {} to {}", message.getClass(), actor);
372
373         return doAsk(actor, message, timeout);
374     }
375
376     /**
377      * Execute an operation on a remote actor asynchronously.
378      *
379      * @param actor the ActorSelection
380      * @param message the message to send
381      * @return a Future containing the eventual result
382      */
383     public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message) {
384         return executeOperationAsync(actor, message, operationTimeout);
385     }
386
387     /**
388      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
389      * reply (essentially set and forget).
390      *
391      * @param actor the ActorSelection
392      * @param message the message to send
393      */
394     public void sendOperationAsync(final ActorSelection actor, final Object message) {
395         Preconditions.checkArgument(actor != null, "actor must not be null");
396         Preconditions.checkArgument(message != null, "message must not be null");
397
398         LOG.debug("Sending message {} to {}", message.getClass(), actor);
399
400         actor.tell(message, ActorRef.noSender());
401     }
402
403     @SuppressWarnings("checkstyle:IllegalCatch")
404     public void shutdown() {
405         final var duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
406         try {
407             Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
408         } catch (Exception e) {
409             LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
410         }
411     }
412
413     public ClusterWrapper getClusterWrapper() {
414         return clusterWrapper;
415     }
416
417     public MemberName getCurrentMemberName() {
418         return clusterWrapper.getCurrentMemberName();
419     }
420
421     /**
422      * Send the message to each and every shard.
423      */
424     public void broadcast(final Function<Short, Object> messageSupplier, final Class<?> messageClass) {
425         for (final String shardName : configuration.getAllShardNames()) {
426
427             final var primaryFuture = findPrimaryShardAsync(shardName);
428             primaryFuture.onComplete(new OnComplete<>() {
429                 @Override
430                 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
431                     if (failure != null) {
432                         LOG.warn("broadcast failed to send message {} to shard {}", messageClass.getSimpleName(),
433                             shardName, failure);
434                     } else {
435                         final var message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
436                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
437                     }
438                 }
439             }, getClientDispatcher());
440         }
441     }
442
443     public FiniteDuration getOperationDuration() {
444         return operationDuration;
445     }
446
447     public Timeout getOperationTimeout() {
448         return operationTimeout;
449     }
450
451     public boolean isPathLocal(final String path) {
452         if (Strings.isNullOrEmpty(path)) {
453             return false;
454         }
455
456         int pathAtIndex = path.indexOf('@');
457         if (pathAtIndex == -1) {
458             //if the path is of local format, then its local and is co-located
459             return true;
460
461         } else if (selfAddressHostPort != null) {
462             // self-address and tx actor path, both are of remote path format
463             int slashIndex = path.indexOf('/', pathAtIndex);
464
465             if (slashIndex == -1) {
466                 return false;
467             }
468
469             final var hostPort = path.substring(pathAtIndex + 1, slashIndex);
470             return hostPort.equals(selfAddressHostPort);
471
472         } else {
473             // self address is local format and tx actor path is remote format
474             return false;
475         }
476     }
477
478     /**
479      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
480      * us to create a timer for pretty much anything.
481      *
482      * @param operationName the name of the operation
483      * @return the Timer instance
484      */
485     public Timer getOperationTimer(final String operationName) {
486         return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
487     }
488
489     public Timer getOperationTimer(final String dataStoreType, final String operationName) {
490         final var rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType, operationName,
491             METRIC_RATE);
492         return metricRegistry.timer(rate);
493     }
494
495     /**
496      * Get the name of the data store to which this ActorContext belongs.
497      *
498      * @return the data store name
499      */
500     public String getDataStoreName() {
501         return datastoreContext.getDataStoreName();
502     }
503
504     /**
505      * Get the current transaction creation rate limit.
506      *
507      * @return the rate limit
508      */
509     public double getTxCreationLimit() {
510         return txRateLimiter.getTxCreationLimit();
511     }
512
513     public long getAskTimeoutExceptionCount() {
514         return askTimeoutCounter.sum();
515     }
516
517     public void resetAskTimeoutExceptionCount() {
518         askTimeoutCounter.reset();
519     }
520
521     /**
522      * Try to acquire a transaction creation permit. Will block if no permits are available.
523      */
524     public void acquireTxCreationPermit() {
525         txRateLimiter.acquire();
526     }
527
528     /**
529      * Returns the operation timeout to be used when committing transactions.
530      *
531      * @return the operation timeout
532      */
533     public Timeout getTransactionCommitOperationTimeout() {
534         return transactionCommitOperationTimeout;
535     }
536
537     /**
538      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
539      * code on the datastore.
540      *
541      * @return the dispatcher
542      */
543     public ExecutionContext getClientDispatcher() {
544         return dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
545     }
546
547     public String getNotificationDispatcherPath() {
548         return dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
549     }
550
551     public Configuration getConfiguration() {
552         return configuration;
553     }
554
555     public ShardStrategyFactory getShardStrategyFactory() {
556         return shardStrategyFactory;
557     }
558
559     protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
560         return ask(actorRef, message, timeout);
561     }
562
563     protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
564         final var ret = ask(actorRef, message, timeout);
565         ret.onComplete(askTimeoutCounter, askTimeoutCounter);
566         return ret;
567     }
568
569     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
570         return primaryShardInfoCache;
571     }
572 }