fcfa9296aeb554e0da8057c8168c8d98ac34c06b
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorUtils.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.utils;
9
10 import static akka.pattern.Patterns.ask;
11
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.dispatch.Mapper;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.AskTimeoutException;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Preconditions;
25 import com.google.common.base.Strings;
26 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
27 import java.util.Optional;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.LongAdder;
30 import java.util.function.Function;
31 import org.opendaylight.controller.cluster.access.concepts.MemberName;
32 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
33 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
34 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
35 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
36 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
37 import org.opendaylight.controller.cluster.datastore.config.Configuration;
38 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
42 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
43 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
44 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
45 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
48 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
49 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
50 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
51 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
52 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
53 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
54 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
55 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
56 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
57 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60 import scala.concurrent.Await;
61 import scala.concurrent.ExecutionContext;
62 import scala.concurrent.Future;
63 import scala.concurrent.duration.FiniteDuration;
64
65 /**
66  * The ActorUtils class contains utility methods which could be used by non-actors (like DistributedDataStore) to work
67  * with actors a little more easily. An ActorContext can be freely passed around to local object instances but should
68  * not be passed to actors especially remote actors.
69  */
70 public class ActorUtils {
71     private static final class AskTimeoutCounter extends OnComplete<Object> implements ExecutionContext {
72         private LongAdder ateExceptions = new LongAdder();
73
74         @Override
75         public void onComplete(final Throwable failure, final Object success) throws Throwable {
76             if (failure instanceof AskTimeoutException) {
77                 ateExceptions.increment();
78             }
79         }
80
81         void reset() {
82             ateExceptions = new LongAdder();
83         }
84
85         long sum() {
86             return ateExceptions.sum();
87         }
88
89         @Override
90         public void execute(final Runnable runnable) {
91             // Yes, we are this ugly, but then we are just doing a check + an increment
92             runnable.run();
93         }
94
95         @Override
96         public void reportFailure(final Throwable cause) {
97             LOG.warn("Unexpected failure updating counters", cause);
98         }
99     }
100
101     private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);
102     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
103     private static final String METRIC_RATE = "rate";
104     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper<>() {
105         @Override
106         public Throwable apply(final Throwable failure) {
107             Throwable actualFailure = failure;
108             if (failure instanceof AskTimeoutException) {
109                 // A timeout exception most likely means the shard isn't initialized.
110                 actualFailure = new NotInitializedException(
111                         "Timed out trying to find the primary shard. Most likely cause is the "
112                         + "shard is not initialized yet.");
113             }
114
115             return actualFailure;
116         }
117     };
118     public static final String BOUNDED_MAILBOX = "bounded-mailbox";
119     public static final String COMMIT = "commit";
120
121     private final AskTimeoutCounter askTimeoutCounter = new AskTimeoutCounter();
122     private final ActorSystem actorSystem;
123     private final ActorRef shardManager;
124     private final ClusterWrapper clusterWrapper;
125     private final Configuration configuration;
126     private final String selfAddressHostPort;
127     private final Dispatchers dispatchers;
128
129     private DatastoreContext datastoreContext;
130     private FiniteDuration operationDuration;
131     private Timeout operationTimeout;
132     private TransactionRateLimiter txRateLimiter;
133     private Timeout transactionCommitOperationTimeout;
134     private Timeout shardInitializationTimeout;
135
136     private volatile EffectiveModelContext schemaContext;
137
138     // Used as a write memory barrier.
139     @SuppressWarnings("unused")
140     private volatile boolean updated;
141
142     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
143             .getMetricsRegistry();
144
145     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
146     private final ShardStrategyFactory shardStrategyFactory;
147
148     public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
149             final ClusterWrapper clusterWrapper, final Configuration configuration) {
150         this(actorSystem, shardManager, clusterWrapper, configuration,
151                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
152     }
153
154     public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
155             final ClusterWrapper clusterWrapper, final Configuration configuration,
156             final DatastoreContext datastoreContext, final PrimaryShardInfoFutureCache primaryShardInfoCache) {
157         this.actorSystem = actorSystem;
158         this.shardManager = shardManager;
159         this.clusterWrapper = clusterWrapper;
160         this.configuration = configuration;
161         this.datastoreContext = datastoreContext;
162         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
163         this.primaryShardInfoCache = primaryShardInfoCache;
164
165         final LogicalDatastoreType convertedType =
166                 LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name());
167         this.shardStrategyFactory = new ShardStrategyFactory(configuration, convertedType);
168
169         setCachedProperties();
170
171         Address selfAddress = clusterWrapper.getSelfAddress();
172         if (selfAddress != null && !selfAddress.host().isEmpty()) {
173             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
174         } else {
175             selfAddressHostPort = null;
176         }
177     }
178
179     private void setCachedProperties() {
180         txRateLimiter = new TransactionRateLimiter(this);
181
182         operationDuration = FiniteDuration.create(datastoreContext.getOperationTimeoutInMillis(),
183             TimeUnit.MILLISECONDS);
184         operationTimeout = new Timeout(operationDuration);
185
186         transactionCommitOperationTimeout =  new Timeout(FiniteDuration.create(
187                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
188
189         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
190     }
191
192     public DatastoreContext getDatastoreContext() {
193         return datastoreContext;
194     }
195
196     public ActorSystem getActorSystem() {
197         return actorSystem;
198     }
199
200     public ActorRef getShardManager() {
201         return shardManager;
202     }
203
204     public ActorSelection actorSelection(final String actorPath) {
205         return actorSystem.actorSelection(actorPath);
206     }
207
208     public ActorSelection actorSelection(final ActorPath actorPath) {
209         return actorSystem.actorSelection(actorPath);
210     }
211
212     public void setSchemaContext(final EffectiveModelContext schemaContext) {
213         this.schemaContext = schemaContext;
214
215         if (shardManager != null) {
216             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
217         }
218     }
219
220     public void setDatastoreContext(final DatastoreContextFactory contextFactory) {
221         this.datastoreContext = contextFactory.getBaseDatastoreContext();
222         setCachedProperties();
223
224         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
225         // will be published immediately even though they may not be immediately visible to other
226         // threads due to unsynchronized reads. That's OK though - we're going for eventual
227         // consistency here as immediately visible updates to these members aren't critical. These
228         // members could've been made volatile but wanted to avoid volatile reads as these are
229         // accessed often and updates will be infrequent.
230
231         updated = true;
232
233         if (shardManager != null) {
234             shardManager.tell(contextFactory, ActorRef.noSender());
235         }
236     }
237
238     public EffectiveModelContext getSchemaContext() {
239         return schemaContext;
240     }
241
242     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
243         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
244         if (ret != null) {
245             return ret;
246         }
247         Future<Object> future = executeOperationAsync(shardManager,
248                 new FindPrimary(shardName, true), shardInitializationTimeout);
249
250         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
251             @Override
252             public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
253                 if (response instanceof RemotePrimaryShardFound) {
254                     LOG.debug("findPrimaryShardAsync received: {}", response);
255                     RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
256                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
257                 } else if (response instanceof LocalPrimaryShardFound) {
258                     LOG.debug("findPrimaryShardAsync received: {}", response);
259                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
260                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
261                             found.getLocalShardDataTree());
262                 } else if (response instanceof NotInitializedException) {
263                     throw (NotInitializedException)response;
264                 } else if (response instanceof PrimaryNotFoundException) {
265                     throw (PrimaryNotFoundException)response;
266                 } else if (response instanceof NoShardLeaderException) {
267                     throw (NoShardLeaderException)response;
268                 }
269
270                 throw new UnknownMessageException(String.format(
271                         "FindPrimary returned unkown response: %s", response));
272             }
273         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
274     }
275
276     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
277             justification = "https://github.com/spotbugs/spotbugs/issues/811")
278     private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
279             final short primaryVersion, final ReadOnlyDataTree localShardDataTree) {
280         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
281         PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
282             new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
283         primaryShardInfoCache.putSuccessful(shardName, info);
284         return info;
285     }
286
287     /**
288      * Finds a local shard given its shard name and return it's ActorRef.
289      *
290      * @param shardName the name of the local shard that needs to be found
291      * @return a reference to a local shard actor which represents the shard
292      *         specified by the shardName
293      */
294     public Optional<ActorRef> findLocalShard(final String shardName) {
295         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
296
297         if (result instanceof LocalShardFound) {
298             LocalShardFound found = (LocalShardFound) result;
299             LOG.debug("Local shard found {}", found.getPath());
300             return Optional.of(found.getPath());
301         }
302
303         return Optional.empty();
304     }
305
306     /**
307      * Finds a local shard async given its shard name and return a Future from which to obtain the
308      * ActorRef.
309      *
310      * @param shardName the name of the local shard that needs to be found
311      */
312     public Future<ActorRef> findLocalShardAsync(final String shardName) {
313         Future<Object> future = executeOperationAsync(shardManager,
314                 new FindLocalShard(shardName, true), shardInitializationTimeout);
315
316         return future.map(new Mapper<Object, ActorRef>() {
317             @Override
318             public ActorRef checkedApply(final Object response) throws Throwable {
319                 if (response instanceof LocalShardFound) {
320                     LocalShardFound found = (LocalShardFound)response;
321                     LOG.debug("Local shard found {}", found.getPath());
322                     return found.getPath();
323                 } else if (response instanceof NotInitializedException) {
324                     throw (NotInitializedException)response;
325                 } else if (response instanceof LocalShardNotFound) {
326                     throw new LocalShardNotFoundException(
327                             String.format("Local shard for %s does not exist.", shardName));
328                 }
329
330                 throw new UnknownMessageException(String.format(
331                         "FindLocalShard returned unkown response: %s", response));
332             }
333         }, getClientDispatcher());
334     }
335
336     /**
337      * Executes an operation on a local actor and wait for it's response.
338      *
339      * @param actor the actor
340      * @param message the message to send
341      * @return The response of the operation
342      */
343     @SuppressWarnings("checkstyle:IllegalCatch")
344     public Object executeOperation(final ActorRef actor, final Object message) {
345         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
346
347         try {
348             return Await.result(future, operationDuration);
349         } catch (Exception e) {
350             throw new TimeoutException("Sending message " + message.getClass().toString()
351                     + " to actor " + actor.toString() + " failed. Try again later.", e);
352         }
353     }
354
355     /**
356      * Execute an operation on a remote actor and wait for it's response.
357      *
358      * @param actor the actor
359      * @param message the message
360      * @return the response message
361      */
362     @SuppressWarnings("checkstyle:IllegalCatch")
363     public Object executeOperation(final ActorSelection actor, final Object message) {
364         Future<Object> future = executeOperationAsync(actor, message);
365
366         try {
367             return Await.result(future, operationDuration);
368         } catch (Exception e) {
369             throw new TimeoutException("Sending message " + message.getClass().toString()
370                     + " to actor " + actor.toString() + " failed. Try again later.", e);
371         }
372     }
373
374     public Future<Object> executeOperationAsync(final ActorRef actor, final Object message, final Timeout timeout) {
375         Preconditions.checkArgument(actor != null, "actor must not be null");
376         Preconditions.checkArgument(message != null, "message must not be null");
377
378         LOG.debug("Sending message {} to {}", message.getClass(), actor);
379         return doAsk(actor, message, timeout);
380     }
381
382     /**
383      * Execute an operation on a remote actor asynchronously.
384      *
385      * @param actor the ActorSelection
386      * @param message the message to send
387      * @param timeout the operation timeout
388      * @return a Future containing the eventual result
389      */
390     public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message,
391             final Timeout timeout) {
392         Preconditions.checkArgument(actor != null, "actor must not be null");
393         Preconditions.checkArgument(message != null, "message must not be null");
394
395         LOG.debug("Sending message {} to {}", message.getClass(), actor);
396
397         return doAsk(actor, message, timeout);
398     }
399
400     /**
401      * Execute an operation on a remote actor asynchronously.
402      *
403      * @param actor the ActorSelection
404      * @param message the message to send
405      * @return a Future containing the eventual result
406      */
407     public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message) {
408         return executeOperationAsync(actor, message, operationTimeout);
409     }
410
411     /**
412      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
413      * reply (essentially set and forget).
414      *
415      * @param actor the ActorSelection
416      * @param message the message to send
417      */
418     public void sendOperationAsync(final ActorSelection actor, final Object message) {
419         Preconditions.checkArgument(actor != null, "actor must not be null");
420         Preconditions.checkArgument(message != null, "message must not be null");
421
422         LOG.debug("Sending message {} to {}", message.getClass(), actor);
423
424         actor.tell(message, ActorRef.noSender());
425     }
426
427     @SuppressWarnings("checkstyle:IllegalCatch")
428     public void shutdown() {
429         FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
430         try {
431             Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
432         } catch (Exception e) {
433             LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
434         }
435     }
436
437     public ClusterWrapper getClusterWrapper() {
438         return clusterWrapper;
439     }
440
441     public MemberName getCurrentMemberName() {
442         return clusterWrapper.getCurrentMemberName();
443     }
444
445     /**
446      * Send the message to each and every shard.
447      */
448     public void broadcast(final Function<Short, Object> messageSupplier, final Class<?> messageClass) {
449         for (final String shardName : configuration.getAllShardNames()) {
450
451             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
452             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
453                 @Override
454                 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
455                     if (failure != null) {
456                         LOG.warn("broadcast failed to send message {} to shard {}", messageClass.getSimpleName(),
457                             shardName, failure);
458                     } else {
459                         Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
460                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
461                     }
462                 }
463             }, getClientDispatcher());
464         }
465     }
466
467     public FiniteDuration getOperationDuration() {
468         return operationDuration;
469     }
470
471     public Timeout getOperationTimeout() {
472         return operationTimeout;
473     }
474
475     public boolean isPathLocal(final String path) {
476         if (Strings.isNullOrEmpty(path)) {
477             return false;
478         }
479
480         int pathAtIndex = path.indexOf('@');
481         if (pathAtIndex == -1) {
482             //if the path is of local format, then its local and is co-located
483             return true;
484
485         } else if (selfAddressHostPort != null) {
486             // self-address and tx actor path, both are of remote path format
487             int slashIndex = path.indexOf('/', pathAtIndex);
488
489             if (slashIndex == -1) {
490                 return false;
491             }
492
493             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
494             return hostPort.equals(selfAddressHostPort);
495
496         } else {
497             // self address is local format and tx actor path is remote format
498             return false;
499         }
500     }
501
502     /**
503      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
504      * us to create a timer for pretty much anything.
505      *
506      * @param operationName the name of the operation
507      * @return the Timer instance
508      */
509     public Timer getOperationTimer(final String operationName) {
510         return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
511     }
512
513     public Timer getOperationTimer(final String dataStoreType, final String operationName) {
514         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
515                 operationName, METRIC_RATE);
516         return metricRegistry.timer(rate);
517     }
518
519     /**
520      * Get the name of the data store to which this ActorContext belongs.
521      *
522      * @return the data store name
523      */
524     public String getDataStoreName() {
525         return datastoreContext.getDataStoreName();
526     }
527
528     /**
529      * Get the current transaction creation rate limit.
530      *
531      * @return the rate limit
532      */
533     public double getTxCreationLimit() {
534         return txRateLimiter.getTxCreationLimit();
535     }
536
537     public long getAskTimeoutExceptionCount() {
538         return askTimeoutCounter.sum();
539     }
540
541     public void resetAskTimeoutExceptionCount() {
542         askTimeoutCounter.reset();
543     }
544
545     /**
546      * Try to acquire a transaction creation permit. Will block if no permits are available.
547      */
548     public void acquireTxCreationPermit() {
549         txRateLimiter.acquire();
550     }
551
552     /**
553      * Returns the operation timeout to be used when committing transactions.
554      *
555      * @return the operation timeout
556      */
557     public Timeout getTransactionCommitOperationTimeout() {
558         return transactionCommitOperationTimeout;
559     }
560
561     /**
562      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
563      * code on the datastore.
564      *
565      * @return the dispatcher
566      */
567     public ExecutionContext getClientDispatcher() {
568         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
569     }
570
571     public String getNotificationDispatcherPath() {
572         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
573     }
574
575     public Configuration getConfiguration() {
576         return configuration;
577     }
578
579     public ShardStrategyFactory getShardStrategyFactory() {
580         return shardStrategyFactory;
581     }
582
583     protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
584         return ask(actorRef, message, timeout);
585     }
586
587     protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
588         final Future<Object> ret = ask(actorRef, message, timeout);
589         ret.onComplete(askTimeoutCounter, askTimeoutCounter);
590         return ret;
591     }
592
593     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
594         return primaryShardInfoCache;
595     }
596 }