NormalizedNodeAggregator should also report empty
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorUtils.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.utils;
9
10 import static akka.pattern.Patterns.ask;
11
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.dispatch.Mapper;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.AskTimeoutException;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Preconditions;
25 import com.google.common.base.Strings;
26 import java.util.Optional;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.LongAdder;
29 import java.util.function.Function;
30 import org.opendaylight.controller.cluster.access.concepts.MemberName;
31 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
32 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
33 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
35 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
36 import org.opendaylight.controller.cluster.datastore.config.Configuration;
37 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
42 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
43 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
44 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
48 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
49 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
50 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
51 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
52 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
53 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
54 import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
55 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import scala.concurrent.Await;
59 import scala.concurrent.ExecutionContext;
60 import scala.concurrent.Future;
61 import scala.concurrent.duration.FiniteDuration;
62
63 /**
64  * The ActorUtils class contains utility methods which could be used by non-actors (like DistributedDataStore) to work
65  * with actors a little more easily. An ActorContext can be freely passed around to local object instances but should
66  * not be passed to actors especially remote actors.
67  */
68 public class ActorUtils {
69     private static final class AskTimeoutCounter extends OnComplete<Object> implements ExecutionContext {
70         private LongAdder ateExceptions = new LongAdder();
71
72         @Override
73         public void onComplete(final Throwable failure, final Object success) throws Throwable {
74             if (failure instanceof AskTimeoutException) {
75                 ateExceptions.increment();
76             }
77         }
78
79         void reset() {
80             ateExceptions = new LongAdder();
81         }
82
83         long sum() {
84             return ateExceptions.sum();
85         }
86
87         @Override
88         public void execute(final Runnable runnable) {
89             // Yes, we are this ugly, but then we are just doing a check + an increment
90             runnable.run();
91         }
92
93         @Override
94         public void reportFailure(final Throwable cause) {
95             LOG.warn("Unexpected failure updating counters", cause);
96         }
97     }
98
99     private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);
100     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
101     private static final String METRIC_RATE = "rate";
102     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper<>() {
103         @Override
104         public Throwable apply(final Throwable failure) {
105             Throwable actualFailure = failure;
106             if (failure instanceof AskTimeoutException) {
107                 // A timeout exception most likely means the shard isn't initialized.
108                 actualFailure = new NotInitializedException(
109                         "Timed out trying to find the primary shard. Most likely cause is the "
110                         + "shard is not initialized yet.");
111             }
112
113             return actualFailure;
114         }
115     };
116     public static final String BOUNDED_MAILBOX = "bounded-mailbox";
117     public static final String COMMIT = "commit";
118
119     private final AskTimeoutCounter askTimeoutCounter = new AskTimeoutCounter();
120     private final ActorSystem actorSystem;
121     private final ActorRef shardManager;
122     private final ClusterWrapper clusterWrapper;
123     private final Configuration configuration;
124     private final String selfAddressHostPort;
125     private final Dispatchers dispatchers;
126
127     private DatastoreContext datastoreContext;
128     private FiniteDuration operationDuration;
129     private Timeout operationTimeout;
130     private TransactionRateLimiter txRateLimiter;
131     private Timeout transactionCommitOperationTimeout;
132     private Timeout shardInitializationTimeout;
133
134     private volatile EffectiveModelContext schemaContext;
135
136     // Used as a write memory barrier.
137     @SuppressWarnings("unused")
138     private volatile boolean updated;
139
140     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
141             .getMetricsRegistry();
142
143     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
144     private final ShardStrategyFactory shardStrategyFactory;
145
146     public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
147             final ClusterWrapper clusterWrapper, final Configuration configuration) {
148         this(actorSystem, shardManager, clusterWrapper, configuration,
149                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
150     }
151
152     public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
153             final ClusterWrapper clusterWrapper, final Configuration configuration,
154             final DatastoreContext datastoreContext, final PrimaryShardInfoFutureCache primaryShardInfoCache) {
155         this.actorSystem = actorSystem;
156         this.shardManager = shardManager;
157         this.clusterWrapper = clusterWrapper;
158         this.configuration = configuration;
159         this.datastoreContext = datastoreContext;
160         dispatchers = new Dispatchers(actorSystem.dispatchers());
161         this.primaryShardInfoCache = primaryShardInfoCache;
162         shardStrategyFactory = new ShardStrategyFactory(configuration);
163
164         setCachedProperties();
165
166         Address selfAddress = clusterWrapper.getSelfAddress();
167         if (selfAddress != null && !selfAddress.host().isEmpty()) {
168             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
169         } else {
170             selfAddressHostPort = null;
171         }
172     }
173
174     private void setCachedProperties() {
175         txRateLimiter = new TransactionRateLimiter(this);
176
177         operationDuration = FiniteDuration.create(datastoreContext.getOperationTimeoutInMillis(),
178             TimeUnit.MILLISECONDS);
179         operationTimeout = new Timeout(operationDuration);
180
181         transactionCommitOperationTimeout =  new Timeout(FiniteDuration.create(
182                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
183
184         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
185     }
186
187     public DatastoreContext getDatastoreContext() {
188         return datastoreContext;
189     }
190
191     public ActorSystem getActorSystem() {
192         return actorSystem;
193     }
194
195     public ActorRef getShardManager() {
196         return shardManager;
197     }
198
199     public ActorSelection actorSelection(final String actorPath) {
200         return actorSystem.actorSelection(actorPath);
201     }
202
203     public ActorSelection actorSelection(final ActorPath actorPath) {
204         return actorSystem.actorSelection(actorPath);
205     }
206
207     public void setSchemaContext(final EffectiveModelContext schemaContext) {
208         this.schemaContext = schemaContext;
209
210         if (shardManager != null) {
211             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
212         }
213     }
214
215     public void setDatastoreContext(final DatastoreContextFactory contextFactory) {
216         datastoreContext = contextFactory.getBaseDatastoreContext();
217         setCachedProperties();
218
219         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
220         // will be published immediately even though they may not be immediately visible to other
221         // threads due to unsynchronized reads. That's OK though - we're going for eventual
222         // consistency here as immediately visible updates to these members aren't critical. These
223         // members could've been made volatile but wanted to avoid volatile reads as these are
224         // accessed often and updates will be infrequent.
225
226         updated = true;
227
228         if (shardManager != null) {
229             shardManager.tell(contextFactory, ActorRef.noSender());
230         }
231     }
232
233     public EffectiveModelContext getSchemaContext() {
234         return schemaContext;
235     }
236
237     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
238         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
239         if (ret != null) {
240             return ret;
241         }
242         Future<Object> future = executeOperationAsync(shardManager,
243                 new FindPrimary(shardName, true), shardInitializationTimeout);
244
245         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
246             @Override
247             public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
248                 if (response instanceof RemotePrimaryShardFound) {
249                     LOG.debug("findPrimaryShardAsync received: {}", response);
250                     RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
251                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
252                 } else if (response instanceof LocalPrimaryShardFound) {
253                     LOG.debug("findPrimaryShardAsync received: {}", response);
254                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
255                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
256                             found.getLocalShardDataTree());
257                 } else if (response instanceof NotInitializedException) {
258                     throw (NotInitializedException)response;
259                 } else if (response instanceof PrimaryNotFoundException) {
260                     throw (PrimaryNotFoundException)response;
261                 } else if (response instanceof NoShardLeaderException) {
262                     throw (NoShardLeaderException)response;
263                 }
264
265                 throw new UnknownMessageException(String.format(
266                         "FindPrimary returned unkown response: %s", response));
267             }
268         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
269     }
270
271     private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
272             final short primaryVersion, final ReadOnlyDataTree localShardDataTree) {
273         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
274         PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
275             new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
276         primaryShardInfoCache.putSuccessful(shardName, info);
277         return info;
278     }
279
280     /**
281      * Finds a local shard given its shard name and return it's ActorRef.
282      *
283      * @param shardName the name of the local shard that needs to be found
284      * @return a reference to a local shard actor which represents the shard
285      *         specified by the shardName
286      */
287     public Optional<ActorRef> findLocalShard(final String shardName) {
288         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
289
290         if (result instanceof LocalShardFound) {
291             LocalShardFound found = (LocalShardFound) result;
292             LOG.debug("Local shard found {}", found.getPath());
293             return Optional.of(found.getPath());
294         }
295
296         return Optional.empty();
297     }
298
299     /**
300      * Finds a local shard async given its shard name and return a Future from which to obtain the
301      * ActorRef.
302      *
303      * @param shardName the name of the local shard that needs to be found
304      */
305     public Future<ActorRef> findLocalShardAsync(final String shardName) {
306         Future<Object> future = executeOperationAsync(shardManager,
307                 new FindLocalShard(shardName, true), shardInitializationTimeout);
308
309         return future.map(new Mapper<Object, ActorRef>() {
310             @Override
311             public ActorRef checkedApply(final Object response) throws Throwable {
312                 if (response instanceof LocalShardFound) {
313                     LocalShardFound found = (LocalShardFound)response;
314                     LOG.debug("Local shard found {}", found.getPath());
315                     return found.getPath();
316                 } else if (response instanceof NotInitializedException) {
317                     throw (NotInitializedException)response;
318                 } else if (response instanceof LocalShardNotFound) {
319                     throw new LocalShardNotFoundException(
320                             String.format("Local shard for %s does not exist.", shardName));
321                 }
322
323                 throw new UnknownMessageException(String.format(
324                         "FindLocalShard returned unkown response: %s", response));
325             }
326         }, getClientDispatcher());
327     }
328
329     /**
330      * Executes an operation on a local actor and wait for it's response.
331      *
332      * @param actor the actor
333      * @param message the message to send
334      * @return The response of the operation
335      */
336     @SuppressWarnings("checkstyle:IllegalCatch")
337     public Object executeOperation(final ActorRef actor, final Object message) {
338         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
339
340         try {
341             return Await.result(future, operationDuration);
342         } catch (Exception e) {
343             throw new TimeoutException("Sending message " + message.getClass().toString()
344                     + " to actor " + actor.toString() + " failed. Try again later.", e);
345         }
346     }
347
348     /**
349      * Execute an operation on a remote actor and wait for it's response.
350      *
351      * @param actor the actor
352      * @param message the message
353      * @return the response message
354      */
355     @SuppressWarnings("checkstyle:IllegalCatch")
356     public Object executeOperation(final ActorSelection actor, final Object message) {
357         Future<Object> future = executeOperationAsync(actor, message);
358
359         try {
360             return Await.result(future, operationDuration);
361         } catch (Exception e) {
362             throw new TimeoutException("Sending message " + message.getClass().toString()
363                     + " to actor " + actor.toString() + " failed. Try again later.", e);
364         }
365     }
366
367     public Future<Object> executeOperationAsync(final ActorRef actor, final Object message, final Timeout timeout) {
368         Preconditions.checkArgument(actor != null, "actor must not be null");
369         Preconditions.checkArgument(message != null, "message must not be null");
370
371         LOG.debug("Sending message {} to {}", message.getClass(), actor);
372         return doAsk(actor, message, timeout);
373     }
374
375     /**
376      * Execute an operation on a remote actor asynchronously.
377      *
378      * @param actor the ActorSelection
379      * @param message the message to send
380      * @param timeout the operation timeout
381      * @return a Future containing the eventual result
382      */
383     public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message,
384             final Timeout timeout) {
385         Preconditions.checkArgument(actor != null, "actor must not be null");
386         Preconditions.checkArgument(message != null, "message must not be null");
387
388         LOG.debug("Sending message {} to {}", message.getClass(), actor);
389
390         return doAsk(actor, message, timeout);
391     }
392
393     /**
394      * Execute an operation on a remote actor asynchronously.
395      *
396      * @param actor the ActorSelection
397      * @param message the message to send
398      * @return a Future containing the eventual result
399      */
400     public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message) {
401         return executeOperationAsync(actor, message, operationTimeout);
402     }
403
404     /**
405      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
406      * reply (essentially set and forget).
407      *
408      * @param actor the ActorSelection
409      * @param message the message to send
410      */
411     public void sendOperationAsync(final ActorSelection actor, final Object message) {
412         Preconditions.checkArgument(actor != null, "actor must not be null");
413         Preconditions.checkArgument(message != null, "message must not be null");
414
415         LOG.debug("Sending message {} to {}", message.getClass(), actor);
416
417         actor.tell(message, ActorRef.noSender());
418     }
419
420     @SuppressWarnings("checkstyle:IllegalCatch")
421     public void shutdown() {
422         FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
423         try {
424             Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
425         } catch (Exception e) {
426             LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
427         }
428     }
429
430     public ClusterWrapper getClusterWrapper() {
431         return clusterWrapper;
432     }
433
434     public MemberName getCurrentMemberName() {
435         return clusterWrapper.getCurrentMemberName();
436     }
437
438     /**
439      * Send the message to each and every shard.
440      */
441     public void broadcast(final Function<Short, Object> messageSupplier, final Class<?> messageClass) {
442         for (final String shardName : configuration.getAllShardNames()) {
443
444             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
445             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
446                 @Override
447                 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
448                     if (failure != null) {
449                         LOG.warn("broadcast failed to send message {} to shard {}", messageClass.getSimpleName(),
450                             shardName, failure);
451                     } else {
452                         Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
453                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
454                     }
455                 }
456             }, getClientDispatcher());
457         }
458     }
459
460     public FiniteDuration getOperationDuration() {
461         return operationDuration;
462     }
463
464     public Timeout getOperationTimeout() {
465         return operationTimeout;
466     }
467
468     public boolean isPathLocal(final String path) {
469         if (Strings.isNullOrEmpty(path)) {
470             return false;
471         }
472
473         int pathAtIndex = path.indexOf('@');
474         if (pathAtIndex == -1) {
475             //if the path is of local format, then its local and is co-located
476             return true;
477
478         } else if (selfAddressHostPort != null) {
479             // self-address and tx actor path, both are of remote path format
480             int slashIndex = path.indexOf('/', pathAtIndex);
481
482             if (slashIndex == -1) {
483                 return false;
484             }
485
486             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
487             return hostPort.equals(selfAddressHostPort);
488
489         } else {
490             // self address is local format and tx actor path is remote format
491             return false;
492         }
493     }
494
495     /**
496      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
497      * us to create a timer for pretty much anything.
498      *
499      * @param operationName the name of the operation
500      * @return the Timer instance
501      */
502     public Timer getOperationTimer(final String operationName) {
503         return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
504     }
505
506     public Timer getOperationTimer(final String dataStoreType, final String operationName) {
507         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
508                 operationName, METRIC_RATE);
509         return metricRegistry.timer(rate);
510     }
511
512     /**
513      * Get the name of the data store to which this ActorContext belongs.
514      *
515      * @return the data store name
516      */
517     public String getDataStoreName() {
518         return datastoreContext.getDataStoreName();
519     }
520
521     /**
522      * Get the current transaction creation rate limit.
523      *
524      * @return the rate limit
525      */
526     public double getTxCreationLimit() {
527         return txRateLimiter.getTxCreationLimit();
528     }
529
530     public long getAskTimeoutExceptionCount() {
531         return askTimeoutCounter.sum();
532     }
533
534     public void resetAskTimeoutExceptionCount() {
535         askTimeoutCounter.reset();
536     }
537
538     /**
539      * Try to acquire a transaction creation permit. Will block if no permits are available.
540      */
541     public void acquireTxCreationPermit() {
542         txRateLimiter.acquire();
543     }
544
545     /**
546      * Returns the operation timeout to be used when committing transactions.
547      *
548      * @return the operation timeout
549      */
550     public Timeout getTransactionCommitOperationTimeout() {
551         return transactionCommitOperationTimeout;
552     }
553
554     /**
555      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
556      * code on the datastore.
557      *
558      * @return the dispatcher
559      */
560     public ExecutionContext getClientDispatcher() {
561         return dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
562     }
563
564     public String getNotificationDispatcherPath() {
565         return dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
566     }
567
568     public Configuration getConfiguration() {
569         return configuration;
570     }
571
572     public ShardStrategyFactory getShardStrategyFactory() {
573         return shardStrategyFactory;
574     }
575
576     protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
577         return ask(actorRef, message, timeout);
578     }
579
580     protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
581         final Future<Object> ret = ask(actorRef, message, timeout);
582         ret.onComplete(askTimeoutCounter, askTimeoutCounter);
583         return ret;
584     }
585
586     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
587         return primaryShardInfoCache;
588     }
589 }