Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorUtils.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.utils;
9
10 import akka.actor.ActorPath;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.ActorSystem;
14 import akka.dispatch.Mapper;
15 import akka.dispatch.OnComplete;
16 import akka.pattern.AskTimeoutException;
17 import akka.pattern.Patterns;
18 import akka.util.Timeout;
19 import com.codahale.metrics.MetricRegistry;
20 import com.codahale.metrics.Timer;
21 import com.google.common.base.Preconditions;
22 import com.google.common.base.Strings;
23 import java.lang.invoke.VarHandle;
24 import java.util.Optional;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.LongAdder;
27 import java.util.function.Function;
28 import org.opendaylight.controller.cluster.access.concepts.MemberName;
29 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
30 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
32 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
33 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
34 import org.opendaylight.controller.cluster.datastore.config.Configuration;
35 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
41 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
42 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
46 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
47 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
48 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
49 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
50 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
51 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
52 import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
53 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import scala.concurrent.Await;
57 import scala.concurrent.ExecutionContext;
58 import scala.concurrent.Future;
59 import scala.concurrent.duration.FiniteDuration;
60
61 /**
 * The ActorUtils class contains utility methods which can be used by non-actors (like DistributedDataStore) to work
 * with actors a little more easily. An ActorUtils instance can be freely passed around to local object instances but
 * should not be passed to actors, especially remote actors.
65  */
66 public class ActorUtils {
67     private static final class AskTimeoutCounter extends OnComplete<Object> implements ExecutionContext {
68         private LongAdder ateExceptions = new LongAdder();
69
70         @Override
71         public void onComplete(final Throwable failure, final Object success) throws Throwable {
72             if (failure instanceof AskTimeoutException) {
73                 ateExceptions.increment();
74             }
75         }
76
77         void reset() {
78             ateExceptions = new LongAdder();
79         }
80
81         long sum() {
82             return ateExceptions.sum();
83         }
84
85         @Override
86         public void execute(final Runnable runnable) {
87             // Yes, we are this ugly, but then we are just doing a check + an increment
88             runnable.run();
89         }
90
91         @Override
92         public void reportFailure(final Throwable cause) {
93             LOG.warn("Unexpected failure updating counters", cause);
94         }
95     }
96
97     private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);
98     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
99     private static final String METRIC_RATE = "rate";
100     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper<>() {
101         @Override
102         public Throwable apply(final Throwable failure) {
103             if (failure instanceof AskTimeoutException) {
104                 // A timeout exception most likely means the shard isn't initialized.
105                 return new NotInitializedException(
106                         "Timed out trying to find the primary shard. Most likely cause is the "
107                         + "shard is not initialized yet.");
108             }
109             return failure;
110         }
111     };
112     public static final String BOUNDED_MAILBOX = "bounded-mailbox";
113     public static final String COMMIT = "commit";
114
115     private final AskTimeoutCounter askTimeoutCounter = new AskTimeoutCounter();
116     private final ActorSystem actorSystem;
117     private final ActorRef shardManager;
118     private final ClusterWrapper clusterWrapper;
119     private final Configuration configuration;
120     private final String selfAddressHostPort;
121     private final Dispatchers dispatchers;
122
123     private DatastoreContext datastoreContext;
124     private FiniteDuration operationDuration;
125     private Timeout operationTimeout;
126     private TransactionRateLimiter txRateLimiter;
127     private Timeout transactionCommitOperationTimeout;
128     private Timeout shardInitializationTimeout;
129
130     private volatile EffectiveModelContext schemaContext;
131
132     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
133             .getMetricsRegistry();
134
135     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
136     private final ShardStrategyFactory shardStrategyFactory;
137
138     public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
139             final ClusterWrapper clusterWrapper, final Configuration configuration) {
140         this(actorSystem, shardManager, clusterWrapper, configuration,
141                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
142     }
143
144     public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
145             final ClusterWrapper clusterWrapper, final Configuration configuration,
146             final DatastoreContext datastoreContext, final PrimaryShardInfoFutureCache primaryShardInfoCache) {
147         this.actorSystem = actorSystem;
148         this.shardManager = shardManager;
149         this.clusterWrapper = clusterWrapper;
150         this.configuration = configuration;
151         this.datastoreContext = datastoreContext;
152         dispatchers = new Dispatchers(actorSystem.dispatchers());
153         this.primaryShardInfoCache = primaryShardInfoCache;
154         shardStrategyFactory = new ShardStrategyFactory(configuration);
155
156         setCachedProperties();
157
158         final var selfAddress = clusterWrapper.getSelfAddress();
159         if (selfAddress != null && !selfAddress.host().isEmpty()) {
160             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
161         } else {
162             selfAddressHostPort = null;
163         }
164     }
165
166     private void setCachedProperties() {
167         txRateLimiter = new TransactionRateLimiter(this);
168
169         operationDuration = FiniteDuration.create(datastoreContext.getOperationTimeoutInMillis(),
170             TimeUnit.MILLISECONDS);
171         operationTimeout = new Timeout(operationDuration);
172
173         transactionCommitOperationTimeout = new Timeout(FiniteDuration.create(
174                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
175
176         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
177     }
178
179     public DatastoreContext getDatastoreContext() {
180         return datastoreContext;
181     }
182
183     public ActorSystem getActorSystem() {
184         return actorSystem;
185     }
186
187     public ActorRef getShardManager() {
188         return shardManager;
189     }
190
191     public ActorSelection actorSelection(final String actorPath) {
192         return actorSystem.actorSelection(actorPath);
193     }
194
195     public ActorSelection actorSelection(final ActorPath actorPath) {
196         return actorSystem.actorSelection(actorPath);
197     }
198
199     public void setSchemaContext(final EffectiveModelContext schemaContext) {
200         this.schemaContext = schemaContext;
201
202         if (shardManager != null) {
203             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
204         }
205     }
206
207     public void setDatastoreContext(final DatastoreContextFactory contextFactory) {
208         datastoreContext = contextFactory.getBaseDatastoreContext();
209         setCachedProperties();
210
211         // Trigger a write memory barrier so that the writes above will be published immediately even though they may
212         // not be immediately visible to other threads due to unsynchronized reads. That is OK though - we are going for
213         // eventual consistency here as immediately visible updates to these members are not critical. These members
214         // could have been made volatile but wanted to avoid volatile reads as these are accessed often and updates will
215         // be infrequent.
216         VarHandle.fullFence();
217
218         if (shardManager != null) {
219             shardManager.tell(contextFactory, ActorRef.noSender());
220         }
221     }
222
223     public EffectiveModelContext getSchemaContext() {
224         return schemaContext;
225     }
226
227     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
228         final var ret = primaryShardInfoCache.getIfPresent(shardName);
229         if (ret != null) {
230             return ret;
231         }
232
233         return executeOperationAsync(shardManager, new FindPrimary(shardName, true), shardInitializationTimeout)
234             .transform(new Mapper<>() {
235                 @Override
236                 public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
237                     if (response instanceof RemotePrimaryShardFound found) {
238                         LOG.debug("findPrimaryShardAsync received: {}", found);
239                         return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
240                     } else if (response instanceof LocalPrimaryShardFound found) {
241                         LOG.debug("findPrimaryShardAsync received: {}", found);
242                         return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
243                             found.getLocalShardDataTree());
244                     } else if (response instanceof NotInitializedException notInitialized) {
245                         throw notInitialized;
246                     } else if (response instanceof PrimaryNotFoundException primaryNotFound) {
247                         throw primaryNotFound;
248                     } else if (response instanceof NoShardLeaderException noShardLeader) {
249                         throw noShardLeader;
250                     }
251
252                     throw new UnknownMessageException(String.format(
253                         "FindPrimary returned unkown response: %s", response));
254                 }
255             }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
256     }
257
258     private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
259             final short primaryVersion, final ReadOnlyDataTree localShardDataTree) {
260         final var actorSelection = actorSystem.actorSelection(primaryActorPath);
261         final var info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
262             new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
263         primaryShardInfoCache.putSuccessful(shardName, info);
264         return info;
265     }
266
267     /**
268      * Finds a local shard given its shard name and return it's ActorRef.
269      *
270      * @param shardName the name of the local shard that needs to be found
271      * @return a reference to a local shard actor which represents the shard
272      *         specified by the shardName
273      */
274     public Optional<ActorRef> findLocalShard(final String shardName) {
275         final var result = executeOperation(shardManager, new FindLocalShard(shardName, false));
276         if (result instanceof LocalShardFound found) {
277             LOG.debug("Local shard found {}", found.getPath());
278             return Optional.of(found.getPath());
279         }
280
281         return Optional.empty();
282     }
283
284     /**
285      * Finds a local shard async given its shard name and return a Future from which to obtain the
286      * ActorRef.
287      *
288      * @param shardName the name of the local shard that needs to be found
289      */
290     public Future<ActorRef> findLocalShardAsync(final String shardName) {
291         return executeOperationAsync(shardManager, new FindLocalShard(shardName, true), shardInitializationTimeout)
292             .map(new Mapper<>() {
293                 @Override
294                 public ActorRef checkedApply(final Object response) throws Throwable {
295                     if (response instanceof LocalShardFound found) {
296                         LOG.debug("Local shard found {}", found.getPath());
297                         return found.getPath();
298                     } else if (response instanceof NotInitializedException) {
299                         throw (NotInitializedException)response;
300                     } else if (response instanceof LocalShardNotFound) {
301                         throw new LocalShardNotFoundException(
302                             String.format("Local shard for %s does not exist.", shardName));
303                     }
304
305                     throw new UnknownMessageException("FindLocalShard returned unkown response: " + response);
306                 }
307             }, getClientDispatcher());
308     }
309
310     /**
311      * Executes an operation on a local actor and wait for it's response.
312      *
313      * @param actor the actor
314      * @param message the message to send
315      * @return The response of the operation
316      */
317     @SuppressWarnings("checkstyle:IllegalCatch")
318     public Object executeOperation(final ActorRef actor, final Object message) {
319         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
320
321         try {
322             return Await.result(future, operationDuration);
323         } catch (Exception e) {
324             throw new TimeoutException("Sending message " + message.getClass().toString()
325                     + " to actor " + actor.toString() + " failed. Try again later.", e);
326         }
327     }
328
329     /**
330      * Execute an operation on a remote actor and wait for it's response.
331      *
332      * @param actor the actor
333      * @param message the message
334      * @return the response message
335      */
336     @SuppressWarnings("checkstyle:IllegalCatch")
337     public Object executeOperation(final ActorSelection actor, final Object message) {
338         Future<Object> future = executeOperationAsync(actor, message);
339
340         try {
341             return Await.result(future, operationDuration);
342         } catch (Exception e) {
343             throw new TimeoutException("Sending message " + message.getClass().toString()
344                     + " to actor " + actor.toString() + " failed. Try again later.", e);
345         }
346     }
347
348     public Future<Object> executeOperationAsync(final ActorRef actor, final Object message, final Timeout timeout) {
349         Preconditions.checkArgument(actor != null, "actor must not be null");
350         Preconditions.checkArgument(message != null, "message must not be null");
351
352         LOG.debug("Sending message {} to {}", message.getClass(), actor);
353         return doAsk(actor, message, timeout);
354     }
355
356     /**
357      * Execute an operation on a remote actor asynchronously.
358      *
359      * @param actor the ActorSelection
360      * @param message the message to send
361      * @param timeout the operation timeout
362      * @return a Future containing the eventual result
363      */
364     public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message,
365             final Timeout timeout) {
366         Preconditions.checkArgument(actor != null, "actor must not be null");
367         Preconditions.checkArgument(message != null, "message must not be null");
368
369         LOG.debug("Sending message {} to {}", message.getClass(), actor);
370
371         return doAsk(actor, message, timeout);
372     }
373
374     /**
375      * Execute an operation on a remote actor asynchronously.
376      *
377      * @param actor the ActorSelection
378      * @param message the message to send
379      * @return a Future containing the eventual result
380      */
381     public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message) {
382         return executeOperationAsync(actor, message, operationTimeout);
383     }
384
385     /**
386      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
387      * reply (essentially set and forget).
388      *
389      * @param actor the ActorSelection
390      * @param message the message to send
391      */
392     public void sendOperationAsync(final ActorSelection actor, final Object message) {
393         Preconditions.checkArgument(actor != null, "actor must not be null");
394         Preconditions.checkArgument(message != null, "message must not be null");
395
396         LOG.debug("Sending message {} to {}", message.getClass(), actor);
397
398         actor.tell(message, ActorRef.noSender());
399     }
400
401     @SuppressWarnings("checkstyle:IllegalCatch")
402     public void shutdown() {
403         final var duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
404         try {
405             Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
406         } catch (Exception e) {
407             LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
408         }
409     }
410
411     public ClusterWrapper getClusterWrapper() {
412         return clusterWrapper;
413     }
414
415     public MemberName getCurrentMemberName() {
416         return clusterWrapper.getCurrentMemberName();
417     }
418
419     /**
420      * Send the message to each and every shard.
421      */
422     public void broadcast(final Function<Short, Object> messageSupplier, final Class<?> messageClass) {
423         for (final String shardName : configuration.getAllShardNames()) {
424
425             final var primaryFuture = findPrimaryShardAsync(shardName);
426             primaryFuture.onComplete(new OnComplete<>() {
427                 @Override
428                 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
429                     if (failure != null) {
430                         LOG.warn("broadcast failed to send message {} to shard {}", messageClass.getSimpleName(),
431                             shardName, failure);
432                     } else {
433                         final var message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
434                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
435                     }
436                 }
437             }, getClientDispatcher());
438         }
439     }
440
441     public FiniteDuration getOperationDuration() {
442         return operationDuration;
443     }
444
445     public Timeout getOperationTimeout() {
446         return operationTimeout;
447     }
448
449     public boolean isPathLocal(final String path) {
450         if (Strings.isNullOrEmpty(path)) {
451             return false;
452         }
453
454         int pathAtIndex = path.indexOf('@');
455         if (pathAtIndex == -1) {
456             //if the path is of local format, then its local and is co-located
457             return true;
458
459         } else if (selfAddressHostPort != null) {
460             // self-address and tx actor path, both are of remote path format
461             int slashIndex = path.indexOf('/', pathAtIndex);
462
463             if (slashIndex == -1) {
464                 return false;
465             }
466
467             final var hostPort = path.substring(pathAtIndex + 1, slashIndex);
468             return hostPort.equals(selfAddressHostPort);
469
470         } else {
471             // self address is local format and tx actor path is remote format
472             return false;
473         }
474     }
475
476     /**
477      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
478      * us to create a timer for pretty much anything.
479      *
480      * @param operationName the name of the operation
481      * @return the Timer instance
482      */
483     public Timer getOperationTimer(final String operationName) {
484         return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
485     }
486
487     public Timer getOperationTimer(final String dataStoreType, final String operationName) {
488         final var rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType, operationName,
489             METRIC_RATE);
490         return metricRegistry.timer(rate);
491     }
492
493     /**
494      * Get the name of the data store to which this ActorContext belongs.
495      *
496      * @return the data store name
497      */
498     public String getDataStoreName() {
499         return datastoreContext.getDataStoreName();
500     }
501
502     /**
503      * Get the current transaction creation rate limit.
504      *
505      * @return the rate limit
506      */
507     public double getTxCreationLimit() {
508         return txRateLimiter.getTxCreationLimit();
509     }
510
511     public long getAskTimeoutExceptionCount() {
512         return askTimeoutCounter.sum();
513     }
514
515     public void resetAskTimeoutExceptionCount() {
516         askTimeoutCounter.reset();
517     }
518
519     /**
520      * Try to acquire a transaction creation permit. Will block if no permits are available.
521      */
522     public void acquireTxCreationPermit() {
523         txRateLimiter.acquire();
524     }
525
526     /**
527      * Returns the operation timeout to be used when committing transactions.
528      *
529      * @return the operation timeout
530      */
531     public Timeout getTransactionCommitOperationTimeout() {
532         return transactionCommitOperationTimeout;
533     }
534
535     /**
536      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
537      * code on the datastore.
538      *
539      * @return the dispatcher
540      */
541     public ExecutionContext getClientDispatcher() {
542         return dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
543     }
544
545     public String getNotificationDispatcherPath() {
546         return dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
547     }
548
549     public Configuration getConfiguration() {
550         return configuration;
551     }
552
553     public ShardStrategyFactory getShardStrategyFactory() {
554         return shardStrategyFactory;
555     }
556
557     protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
558         return Patterns.ask(actorRef, message, timeout);
559     }
560
561     protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
562         final var ret = Patterns.ask(actorRef, message, timeout);
563         ret.onComplete(askTimeoutCounter, askTimeoutCounter);
564         return ret;
565     }
566
567     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
568         return primaryShardInfoCache;
569     }
570 }