Add UnsignedLongRangeSet.toString()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorUtils.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.utils;
9
10 import static akka.pattern.Patterns.ask;
11
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.dispatch.Mapper;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.AskTimeoutException;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Preconditions;
25 import com.google.common.base.Strings;
26 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
27 import java.util.Optional;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.Function;
30 import org.opendaylight.controller.cluster.access.concepts.MemberName;
31 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
32 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
33 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
35 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
36 import org.opendaylight.controller.cluster.datastore.config.Configuration;
37 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
42 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
43 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
44 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
48 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
49 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
50 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
51 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
52 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
53 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
54 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
55 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
56 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59 import scala.concurrent.Await;
60 import scala.concurrent.ExecutionContext;
61 import scala.concurrent.Future;
62 import scala.concurrent.duration.FiniteDuration;
63
64 /**
65  * The ActorUtils class contains utility methods which could be used by non-actors (like DistributedDataStore) to work
66  * with actors a little more easily. An ActorContext can be freely passed around to local object instances but should
67  * not be passed to actors especially remote actors.
68  */
69 public class ActorUtils {
70     private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);
71     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
72     private static final String METRIC_RATE = "rate";
73     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
74                                                               new Mapper<>() {
75         @Override
76         public Throwable apply(final Throwable failure) {
77             Throwable actualFailure = failure;
78             if (failure instanceof AskTimeoutException) {
79                 // A timeout exception most likely means the shard isn't initialized.
80                 actualFailure = new NotInitializedException(
81                         "Timed out trying to find the primary shard. Most likely cause is the "
82                         + "shard is not initialized yet.");
83             }
84
85             return actualFailure;
86         }
87     };
88     public static final String BOUNDED_MAILBOX = "bounded-mailbox";
89     public static final String COMMIT = "commit";
90
91     private final ActorSystem actorSystem;
92     private final ActorRef shardManager;
93     private final ClusterWrapper clusterWrapper;
94     private final Configuration configuration;
95     private DatastoreContext datastoreContext;
96     private FiniteDuration operationDuration;
97     private Timeout operationTimeout;
98     private final String selfAddressHostPort;
99     private TransactionRateLimiter txRateLimiter;
100     private Timeout transactionCommitOperationTimeout;
101     private Timeout shardInitializationTimeout;
102     private final Dispatchers dispatchers;
103
104     private volatile EffectiveModelContext schemaContext;
105
106     // Used as a write memory barrier.
107     @SuppressWarnings("unused")
108     private volatile boolean updated;
109
110     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
111             .getMetricsRegistry();
112
113     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
114     private final ShardStrategyFactory shardStrategyFactory;
115
116     public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
117             final ClusterWrapper clusterWrapper, final Configuration configuration) {
118         this(actorSystem, shardManager, clusterWrapper, configuration,
119                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
120     }
121
122     public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
123             final ClusterWrapper clusterWrapper, final Configuration configuration,
124             final DatastoreContext datastoreContext, final PrimaryShardInfoFutureCache primaryShardInfoCache) {
125         this.actorSystem = actorSystem;
126         this.shardManager = shardManager;
127         this.clusterWrapper = clusterWrapper;
128         this.configuration = configuration;
129         this.datastoreContext = datastoreContext;
130         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
131         this.primaryShardInfoCache = primaryShardInfoCache;
132
133         final LogicalDatastoreType convertedType =
134                 LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name());
135         this.shardStrategyFactory = new ShardStrategyFactory(configuration, convertedType);
136
137         setCachedProperties();
138
139         Address selfAddress = clusterWrapper.getSelfAddress();
140         if (selfAddress != null && !selfAddress.host().isEmpty()) {
141             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
142         } else {
143             selfAddressHostPort = null;
144         }
145
146     }
147
148     private void setCachedProperties() {
149         txRateLimiter = new TransactionRateLimiter(this);
150
151         operationDuration = FiniteDuration.create(datastoreContext.getOperationTimeoutInMillis(),
152             TimeUnit.MILLISECONDS);
153         operationTimeout = new Timeout(operationDuration);
154
155         transactionCommitOperationTimeout =  new Timeout(FiniteDuration.create(
156                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
157
158         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
159     }
160
161     public DatastoreContext getDatastoreContext() {
162         return datastoreContext;
163     }
164
165     public ActorSystem getActorSystem() {
166         return actorSystem;
167     }
168
169     public ActorRef getShardManager() {
170         return shardManager;
171     }
172
173     public ActorSelection actorSelection(final String actorPath) {
174         return actorSystem.actorSelection(actorPath);
175     }
176
177     public ActorSelection actorSelection(final ActorPath actorPath) {
178         return actorSystem.actorSelection(actorPath);
179     }
180
181     public void setSchemaContext(final EffectiveModelContext schemaContext) {
182         this.schemaContext = schemaContext;
183
184         if (shardManager != null) {
185             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
186         }
187     }
188
189     public void setDatastoreContext(final DatastoreContextFactory contextFactory) {
190         this.datastoreContext = contextFactory.getBaseDatastoreContext();
191         setCachedProperties();
192
193         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
194         // will be published immediately even though they may not be immediately visible to other
195         // threads due to unsynchronized reads. That's OK though - we're going for eventual
196         // consistency here as immediately visible updates to these members aren't critical. These
197         // members could've been made volatile but wanted to avoid volatile reads as these are
198         // accessed often and updates will be infrequent.
199
200         updated = true;
201
202         if (shardManager != null) {
203             shardManager.tell(contextFactory, ActorRef.noSender());
204         }
205     }
206
207     public EffectiveModelContext getSchemaContext() {
208         return schemaContext;
209     }
210
211     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
212         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
213         if (ret != null) {
214             return ret;
215         }
216         Future<Object> future = executeOperationAsync(shardManager,
217                 new FindPrimary(shardName, true), shardInitializationTimeout);
218
219         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
220             @Override
221             public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
222                 if (response instanceof RemotePrimaryShardFound) {
223                     LOG.debug("findPrimaryShardAsync received: {}", response);
224                     RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
225                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
226                 } else if (response instanceof LocalPrimaryShardFound) {
227                     LOG.debug("findPrimaryShardAsync received: {}", response);
228                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
229                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
230                             found.getLocalShardDataTree());
231                 } else if (response instanceof NotInitializedException) {
232                     throw (NotInitializedException)response;
233                 } else if (response instanceof PrimaryNotFoundException) {
234                     throw (PrimaryNotFoundException)response;
235                 } else if (response instanceof NoShardLeaderException) {
236                     throw (NoShardLeaderException)response;
237                 }
238
239                 throw new UnknownMessageException(String.format(
240                         "FindPrimary returned unkown response: %s", response));
241             }
242         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
243     }
244
245     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
246             justification = "https://github.com/spotbugs/spotbugs/issues/811")
247     private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
248             final short primaryVersion, final ReadOnlyDataTree localShardDataTree) {
249         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
250         PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
251             new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
252         primaryShardInfoCache.putSuccessful(shardName, info);
253         return info;
254     }
255
256     /**
257      * Finds a local shard given its shard name and return it's ActorRef.
258      *
259      * @param shardName the name of the local shard that needs to be found
260      * @return a reference to a local shard actor which represents the shard
261      *         specified by the shardName
262      */
263     public Optional<ActorRef> findLocalShard(final String shardName) {
264         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
265
266         if (result instanceof LocalShardFound) {
267             LocalShardFound found = (LocalShardFound) result;
268             LOG.debug("Local shard found {}", found.getPath());
269             return Optional.of(found.getPath());
270         }
271
272         return Optional.empty();
273     }
274
275     /**
276      * Finds a local shard async given its shard name and return a Future from which to obtain the
277      * ActorRef.
278      *
279      * @param shardName the name of the local shard that needs to be found
280      */
281     public Future<ActorRef> findLocalShardAsync(final String shardName) {
282         Future<Object> future = executeOperationAsync(shardManager,
283                 new FindLocalShard(shardName, true), shardInitializationTimeout);
284
285         return future.map(new Mapper<Object, ActorRef>() {
286             @Override
287             public ActorRef checkedApply(final Object response) throws Throwable {
288                 if (response instanceof LocalShardFound) {
289                     LocalShardFound found = (LocalShardFound)response;
290                     LOG.debug("Local shard found {}", found.getPath());
291                     return found.getPath();
292                 } else if (response instanceof NotInitializedException) {
293                     throw (NotInitializedException)response;
294                 } else if (response instanceof LocalShardNotFound) {
295                     throw new LocalShardNotFoundException(
296                             String.format("Local shard for %s does not exist.", shardName));
297                 }
298
299                 throw new UnknownMessageException(String.format(
300                         "FindLocalShard returned unkown response: %s", response));
301             }
302         }, getClientDispatcher());
303     }
304
305     /**
306      * Executes an operation on a local actor and wait for it's response.
307      *
308      * @param actor the actor
309      * @param message the message to send
310      * @return The response of the operation
311      */
312     @SuppressWarnings("checkstyle:IllegalCatch")
313     public Object executeOperation(final ActorRef actor, final Object message) {
314         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
315
316         try {
317             return Await.result(future, operationDuration);
318         } catch (Exception e) {
319             throw new TimeoutException("Sending message " + message.getClass().toString()
320                     + " to actor " + actor.toString() + " failed. Try again later.", e);
321         }
322     }
323
324     /**
325      * Execute an operation on a remote actor and wait for it's response.
326      *
327      * @param actor the actor
328      * @param message the message
329      * @return the response message
330      */
331     @SuppressWarnings("checkstyle:IllegalCatch")
332     public Object executeOperation(final ActorSelection actor, final Object message) {
333         Future<Object> future = executeOperationAsync(actor, message);
334
335         try {
336             return Await.result(future, operationDuration);
337         } catch (Exception e) {
338             throw new TimeoutException("Sending message " + message.getClass().toString()
339                     + " to actor " + actor.toString() + " failed. Try again later.", e);
340         }
341     }
342
343     public Future<Object> executeOperationAsync(final ActorRef actor, final Object message, final Timeout timeout) {
344         Preconditions.checkArgument(actor != null, "actor must not be null");
345         Preconditions.checkArgument(message != null, "message must not be null");
346
347         LOG.debug("Sending message {} to {}", message.getClass(), actor);
348         return doAsk(actor, message, timeout);
349     }
350
351     /**
352      * Execute an operation on a remote actor asynchronously.
353      *
354      * @param actor the ActorSelection
355      * @param message the message to send
356      * @param timeout the operation timeout
357      * @return a Future containing the eventual result
358      */
359     public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message,
360             final Timeout timeout) {
361         Preconditions.checkArgument(actor != null, "actor must not be null");
362         Preconditions.checkArgument(message != null, "message must not be null");
363
364         LOG.debug("Sending message {} to {}", message.getClass(), actor);
365
366         return doAsk(actor, message, timeout);
367     }
368
369     /**
370      * Execute an operation on a remote actor asynchronously.
371      *
372      * @param actor the ActorSelection
373      * @param message the message to send
374      * @return a Future containing the eventual result
375      */
376     public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message) {
377         return executeOperationAsync(actor, message, operationTimeout);
378     }
379
380     /**
381      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
382      * reply (essentially set and forget).
383      *
384      * @param actor the ActorSelection
385      * @param message the message to send
386      */
387     public void sendOperationAsync(final ActorSelection actor, final Object message) {
388         Preconditions.checkArgument(actor != null, "actor must not be null");
389         Preconditions.checkArgument(message != null, "message must not be null");
390
391         LOG.debug("Sending message {} to {}", message.getClass(), actor);
392
393         actor.tell(message, ActorRef.noSender());
394     }
395
396     @SuppressWarnings("checkstyle:IllegalCatch")
397     public void shutdown() {
398         FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
399         try {
400             Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
401         } catch (Exception e) {
402             LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
403         }
404     }
405
406     public ClusterWrapper getClusterWrapper() {
407         return clusterWrapper;
408     }
409
410     public MemberName getCurrentMemberName() {
411         return clusterWrapper.getCurrentMemberName();
412     }
413
414     /**
415      * Send the message to each and every shard.
416      */
417     public void broadcast(final Function<Short, Object> messageSupplier, final Class<?> messageClass) {
418         for (final String shardName : configuration.getAllShardNames()) {
419
420             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
421             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
422                 @Override
423                 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
424                     if (failure != null) {
425                         LOG.warn("broadcast failed to send message {} to shard {}", messageClass.getSimpleName(),
426                             shardName, failure);
427                     } else {
428                         Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
429                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
430                     }
431                 }
432             }, getClientDispatcher());
433         }
434     }
435
436     public FiniteDuration getOperationDuration() {
437         return operationDuration;
438     }
439
440     public Timeout getOperationTimeout() {
441         return operationTimeout;
442     }
443
444     public boolean isPathLocal(final String path) {
445         if (Strings.isNullOrEmpty(path)) {
446             return false;
447         }
448
449         int pathAtIndex = path.indexOf('@');
450         if (pathAtIndex == -1) {
451             //if the path is of local format, then its local and is co-located
452             return true;
453
454         } else if (selfAddressHostPort != null) {
455             // self-address and tx actor path, both are of remote path format
456             int slashIndex = path.indexOf('/', pathAtIndex);
457
458             if (slashIndex == -1) {
459                 return false;
460             }
461
462             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
463             return hostPort.equals(selfAddressHostPort);
464
465         } else {
466             // self address is local format and tx actor path is remote format
467             return false;
468         }
469     }
470
471     /**
472      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
473      * us to create a timer for pretty much anything.
474      *
475      * @param operationName the name of the operation
476      * @return the Timer instance
477      */
478     public Timer getOperationTimer(final String operationName) {
479         return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
480     }
481
482     public Timer getOperationTimer(final String dataStoreType, final String operationName) {
483         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
484                 operationName, METRIC_RATE);
485         return metricRegistry.timer(rate);
486     }
487
488     /**
489      * Get the name of the data store to which this ActorContext belongs.
490      *
491      * @return the data store name
492      */
493     public String getDataStoreName() {
494         return datastoreContext.getDataStoreName();
495     }
496
497     /**
498      * Get the current transaction creation rate limit.
499      *
500      * @return the rate limit
501      */
502     public double getTxCreationLimit() {
503         return txRateLimiter.getTxCreationLimit();
504     }
505
506     /**
507      * Try to acquire a transaction creation permit. Will block if no permits are available.
508      */
509     public void acquireTxCreationPermit() {
510         txRateLimiter.acquire();
511     }
512
513     /**
514      * Returns the operation timeout to be used when committing transactions.
515      *
516      * @return the operation timeout
517      */
518     public Timeout getTransactionCommitOperationTimeout() {
519         return transactionCommitOperationTimeout;
520     }
521
522     /**
523      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
524      * code on the datastore.
525      *
526      * @return the dispatcher
527      */
528     public ExecutionContext getClientDispatcher() {
529         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
530     }
531
532     public String getNotificationDispatcherPath() {
533         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
534     }
535
536     public Configuration getConfiguration() {
537         return configuration;
538     }
539
540     public ShardStrategyFactory getShardStrategyFactory() {
541         return shardStrategyFactory;
542     }
543
544     protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
545         return ask(actorRef, message, timeout);
546     }
547
548     protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
549         return ask(actorRef, message, timeout);
550     }
551
552     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
553         return primaryShardInfoCache;
554     }
555 }