3d45f647a759ef19839f7d398d5bf2e2d635db9e
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorUtils.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.utils;
9
10 import static akka.pattern.Patterns.ask;
11
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.dispatch.Mapper;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.AskTimeoutException;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Function;
29 import org.opendaylight.controller.cluster.access.concepts.MemberName;
30 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
31 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
32 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
33 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
35 import org.opendaylight.controller.cluster.datastore.config.Configuration;
36 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
42 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
43 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
47 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
48 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
49 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
50 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
51 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
52 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
53 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
54 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import scala.concurrent.Await;
59 import scala.concurrent.ExecutionContext;
60 import scala.concurrent.Future;
61 import scala.concurrent.duration.FiniteDuration;
62
63 /**
64  * The ActorUtils class contains utility methods which could be used by non-actors (like DistributedDataStore) to work
65  * with actors a little more easily. An ActorContext can be freely passed around to local object instances but should
66  * not be passed to actors especially remote actors.
67  */
68 public class ActorUtils {
69     private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);
70     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
71     private static final String METRIC_RATE = "rate";
72     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
73                                                               new Mapper<Throwable, Throwable>() {
74         @Override
75         public Throwable apply(final Throwable failure) {
76             Throwable actualFailure = failure;
77             if (failure instanceof AskTimeoutException) {
78                 // A timeout exception most likely means the shard isn't initialized.
79                 actualFailure = new NotInitializedException(
80                         "Timed out trying to find the primary shard. Most likely cause is the "
81                         + "shard is not initialized yet.");
82             }
83
84             return actualFailure;
85         }
86     };
87     public static final String BOUNDED_MAILBOX = "bounded-mailbox";
88     public static final String COMMIT = "commit";
89
90     private final ActorSystem actorSystem;
91     private final ActorRef shardManager;
92     private final ClusterWrapper clusterWrapper;
93     private final Configuration configuration;
94     private DatastoreContext datastoreContext;
95     private FiniteDuration operationDuration;
96     private Timeout operationTimeout;
97     private final String selfAddressHostPort;
98     private TransactionRateLimiter txRateLimiter;
99     private Timeout transactionCommitOperationTimeout;
100     private Timeout shardInitializationTimeout;
101     private final Dispatchers dispatchers;
102
103     private volatile SchemaContext schemaContext;
104
105     // Used as a write memory barrier.
106     @SuppressWarnings("unused")
107     private volatile boolean updated;
108
109     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
110             .getMetricsRegistry();
111
112     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
113     private final ShardStrategyFactory shardStrategyFactory;
114
115     public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
116             final ClusterWrapper clusterWrapper, final Configuration configuration) {
117         this(actorSystem, shardManager, clusterWrapper, configuration,
118                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
119     }
120
121     public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
122             final ClusterWrapper clusterWrapper, final Configuration configuration,
123             final DatastoreContext datastoreContext, final PrimaryShardInfoFutureCache primaryShardInfoCache) {
124         this.actorSystem = actorSystem;
125         this.shardManager = shardManager;
126         this.clusterWrapper = clusterWrapper;
127         this.configuration = configuration;
128         this.datastoreContext = datastoreContext;
129         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
130         this.primaryShardInfoCache = primaryShardInfoCache;
131
132         final LogicalDatastoreType convertedType =
133                 LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name());
134         this.shardStrategyFactory = new ShardStrategyFactory(configuration, convertedType);
135
136         setCachedProperties();
137
138         Address selfAddress = clusterWrapper.getSelfAddress();
139         if (selfAddress != null && !selfAddress.host().isEmpty()) {
140             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
141         } else {
142             selfAddressHostPort = null;
143         }
144
145     }
146
147     private void setCachedProperties() {
148         txRateLimiter = new TransactionRateLimiter(this);
149
150         operationDuration = FiniteDuration.create(datastoreContext.getOperationTimeoutInMillis(),
151             TimeUnit.MILLISECONDS);
152         operationTimeout = new Timeout(operationDuration);
153
154         transactionCommitOperationTimeout =  new Timeout(FiniteDuration.create(
155                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
156
157         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
158     }
159
160     public DatastoreContext getDatastoreContext() {
161         return datastoreContext;
162     }
163
164     public ActorSystem getActorSystem() {
165         return actorSystem;
166     }
167
168     public ActorRef getShardManager() {
169         return shardManager;
170     }
171
172     public ActorSelection actorSelection(final String actorPath) {
173         return actorSystem.actorSelection(actorPath);
174     }
175
176     public ActorSelection actorSelection(final ActorPath actorPath) {
177         return actorSystem.actorSelection(actorPath);
178     }
179
180     public void setSchemaContext(final SchemaContext schemaContext) {
181         this.schemaContext = schemaContext;
182
183         if (shardManager != null) {
184             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
185         }
186     }
187
188     public void setDatastoreContext(final DatastoreContextFactory contextFactory) {
189         this.datastoreContext = contextFactory.getBaseDatastoreContext();
190         setCachedProperties();
191
192         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
193         // will be published immediately even though they may not be immediately visible to other
194         // threads due to unsynchronized reads. That's OK though - we're going for eventual
195         // consistency here as immediately visible updates to these members aren't critical. These
196         // members could've been made volatile but wanted to avoid volatile reads as these are
197         // accessed often and updates will be infrequent.
198
199         updated = true;
200
201         if (shardManager != null) {
202             shardManager.tell(contextFactory, ActorRef.noSender());
203         }
204     }
205
206     public SchemaContext getSchemaContext() {
207         return schemaContext;
208     }
209
210     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
211         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
212         if (ret != null) {
213             return ret;
214         }
215         Future<Object> future = executeOperationAsync(shardManager,
216                 new FindPrimary(shardName, true), shardInitializationTimeout);
217
218         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
219             @Override
220             public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
221                 if (response instanceof RemotePrimaryShardFound) {
222                     LOG.debug("findPrimaryShardAsync received: {}", response);
223                     RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
224                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
225                 } else if (response instanceof LocalPrimaryShardFound) {
226                     LOG.debug("findPrimaryShardAsync received: {}", response);
227                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
228                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
229                             found.getLocalShardDataTree());
230                 } else if (response instanceof NotInitializedException) {
231                     throw (NotInitializedException)response;
232                 } else if (response instanceof PrimaryNotFoundException) {
233                     throw (PrimaryNotFoundException)response;
234                 } else if (response instanceof NoShardLeaderException) {
235                     throw (NoShardLeaderException)response;
236                 }
237
238                 throw new UnknownMessageException(String.format(
239                         "FindPrimary returned unkown response: %s", response));
240             }
241         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
242     }
243
244     private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
245             final short primaryVersion, final DataTree localShardDataTree) {
246         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
247         PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
248             new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
249         primaryShardInfoCache.putSuccessful(shardName, info);
250         return info;
251     }
252
253     /**
254      * Finds a local shard given its shard name and return it's ActorRef.
255      *
256      * @param shardName the name of the local shard that needs to be found
257      * @return a reference to a local shard actor which represents the shard
258      *         specified by the shardName
259      */
260     public Optional<ActorRef> findLocalShard(final String shardName) {
261         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
262
263         if (result instanceof LocalShardFound) {
264             LocalShardFound found = (LocalShardFound) result;
265             LOG.debug("Local shard found {}", found.getPath());
266             return Optional.of(found.getPath());
267         }
268
269         return Optional.absent();
270     }
271
272     /**
273      * Finds a local shard async given its shard name and return a Future from which to obtain the
274      * ActorRef.
275      *
276      * @param shardName the name of the local shard that needs to be found
277      */
278     public Future<ActorRef> findLocalShardAsync(final String shardName) {
279         Future<Object> future = executeOperationAsync(shardManager,
280                 new FindLocalShard(shardName, true), shardInitializationTimeout);
281
282         return future.map(new Mapper<Object, ActorRef>() {
283             @Override
284             public ActorRef checkedApply(final Object response) throws Throwable {
285                 if (response instanceof LocalShardFound) {
286                     LocalShardFound found = (LocalShardFound)response;
287                     LOG.debug("Local shard found {}", found.getPath());
288                     return found.getPath();
289                 } else if (response instanceof NotInitializedException) {
290                     throw (NotInitializedException)response;
291                 } else if (response instanceof LocalShardNotFound) {
292                     throw new LocalShardNotFoundException(
293                             String.format("Local shard for %s does not exist.", shardName));
294                 }
295
296                 throw new UnknownMessageException(String.format(
297                         "FindLocalShard returned unkown response: %s", response));
298             }
299         }, getClientDispatcher());
300     }
301
302     /**
303      * Executes an operation on a local actor and wait for it's response.
304      *
305      * @param actor the actor
306      * @param message the message to send
307      * @return The response of the operation
308      */
309     @SuppressWarnings("checkstyle:IllegalCatch")
310     public Object executeOperation(final ActorRef actor, final Object message) {
311         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
312
313         try {
314             return Await.result(future, operationDuration);
315         } catch (Exception e) {
316             throw new TimeoutException("Sending message " + message.getClass().toString()
317                     + " to actor " + actor.toString() + " failed. Try again later.", e);
318         }
319     }
320
321     /**
322      * Execute an operation on a remote actor and wait for it's response.
323      *
324      * @param actor the actor
325      * @param message the message
326      * @return the response message
327      */
328     @SuppressWarnings("checkstyle:IllegalCatch")
329     public Object executeOperation(final ActorSelection actor, final Object message) {
330         Future<Object> future = executeOperationAsync(actor, message);
331
332         try {
333             return Await.result(future, operationDuration);
334         } catch (Exception e) {
335             throw new TimeoutException("Sending message " + message.getClass().toString()
336                     + " to actor " + actor.toString() + " failed. Try again later.", e);
337         }
338     }
339
340     public Future<Object> executeOperationAsync(final ActorRef actor, final Object message, final Timeout timeout) {
341         Preconditions.checkArgument(actor != null, "actor must not be null");
342         Preconditions.checkArgument(message != null, "message must not be null");
343
344         LOG.debug("Sending message {} to {}", message.getClass(), actor);
345         return doAsk(actor, message, timeout);
346     }
347
348     /**
349      * Execute an operation on a remote actor asynchronously.
350      *
351      * @param actor the ActorSelection
352      * @param message the message to send
353      * @param timeout the operation timeout
354      * @return a Future containing the eventual result
355      */
356     public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message,
357             final Timeout timeout) {
358         Preconditions.checkArgument(actor != null, "actor must not be null");
359         Preconditions.checkArgument(message != null, "message must not be null");
360
361         LOG.debug("Sending message {} to {}", message.getClass(), actor);
362
363         return doAsk(actor, message, timeout);
364     }
365
366     /**
367      * Execute an operation on a remote actor asynchronously.
368      *
369      * @param actor the ActorSelection
370      * @param message the message to send
371      * @return a Future containing the eventual result
372      */
373     public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message) {
374         return executeOperationAsync(actor, message, operationTimeout);
375     }
376
377     /**
378      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
379      * reply (essentially set and forget).
380      *
381      * @param actor the ActorSelection
382      * @param message the message to send
383      */
384     public void sendOperationAsync(final ActorSelection actor, final Object message) {
385         Preconditions.checkArgument(actor != null, "actor must not be null");
386         Preconditions.checkArgument(message != null, "message must not be null");
387
388         LOG.debug("Sending message {} to {}", message.getClass(), actor);
389
390         actor.tell(message, ActorRef.noSender());
391     }
392
393     @SuppressWarnings("checkstyle:IllegalCatch")
394     public void shutdown() {
395         FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
396         try {
397             Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
398         } catch (Exception e) {
399             LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
400         }
401     }
402
403     public ClusterWrapper getClusterWrapper() {
404         return clusterWrapper;
405     }
406
407     public MemberName getCurrentMemberName() {
408         return clusterWrapper.getCurrentMemberName();
409     }
410
411     /**
412      * Send the message to each and every shard.
413      */
414     public void broadcast(final Function<Short, Object> messageSupplier, final Class<?> messageClass) {
415         for (final String shardName : configuration.getAllShardNames()) {
416
417             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
418             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
419                 @Override
420                 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
421                     if (failure != null) {
422                         LOG.warn("broadcast failed to send message {} to shard {}", messageClass.getSimpleName(),
423                             shardName, failure);
424                     } else {
425                         Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
426                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
427                     }
428                 }
429             }, getClientDispatcher());
430         }
431     }
432
433     public FiniteDuration getOperationDuration() {
434         return operationDuration;
435     }
436
437     public Timeout getOperationTimeout() {
438         return operationTimeout;
439     }
440
441     public boolean isPathLocal(final String path) {
442         if (Strings.isNullOrEmpty(path)) {
443             return false;
444         }
445
446         int pathAtIndex = path.indexOf('@');
447         if (pathAtIndex == -1) {
448             //if the path is of local format, then its local and is co-located
449             return true;
450
451         } else if (selfAddressHostPort != null) {
452             // self-address and tx actor path, both are of remote path format
453             int slashIndex = path.indexOf('/', pathAtIndex);
454
455             if (slashIndex == -1) {
456                 return false;
457             }
458
459             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
460             return hostPort.equals(selfAddressHostPort);
461
462         } else {
463             // self address is local format and tx actor path is remote format
464             return false;
465         }
466     }
467
468     /**
469      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
470      * us to create a timer for pretty much anything.
471      *
472      * @param operationName the name of the operation
473      * @return the Timer instance
474      */
475     public Timer getOperationTimer(final String operationName) {
476         return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
477     }
478
479     public Timer getOperationTimer(final String dataStoreType, final String operationName) {
480         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
481                 operationName, METRIC_RATE);
482         return metricRegistry.timer(rate);
483     }
484
485     /**
486      * Get the name of the data store to which this ActorContext belongs.
487      *
488      * @return the data store name
489      */
490     public String getDataStoreName() {
491         return datastoreContext.getDataStoreName();
492     }
493
494     /**
495      * Get the current transaction creation rate limit.
496      *
497      * @return the rate limit
498      */
499     public double getTxCreationLimit() {
500         return txRateLimiter.getTxCreationLimit();
501     }
502
503     /**
504      * Try to acquire a transaction creation permit. Will block if no permits are available.
505      */
506     public void acquireTxCreationPermit() {
507         txRateLimiter.acquire();
508     }
509
510     /**
511      * Returns the operation timeout to be used when committing transactions.
512      *
513      * @return the operation timeout
514      */
515     public Timeout getTransactionCommitOperationTimeout() {
516         return transactionCommitOperationTimeout;
517     }
518
519     /**
520      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
521      * code on the datastore.
522      *
523      * @return the dispatcher
524      */
525     public ExecutionContext getClientDispatcher() {
526         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
527     }
528
529     public String getNotificationDispatcherPath() {
530         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
531     }
532
533     public Configuration getConfiguration() {
534         return configuration;
535     }
536
537     public ShardStrategyFactory getShardStrategyFactory() {
538         return shardStrategyFactory;
539     }
540
541     protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
542         return ask(actorRef, message, timeout);
543     }
544
545     protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
546         return ask(actorRef, message, timeout);
547     }
548
549     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
550         return primaryShardInfoCache;
551     }
552 }