d3eb6f392613beca219280d131f5b12bce68dfc7
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12
13 import akka.actor.ActorPath;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.ActorSystem;
17 import akka.actor.Address;
18 import akka.dispatch.Mapper;
19 import akka.dispatch.OnComplete;
20 import akka.pattern.AskTimeoutException;
21 import akka.pattern.Patterns;
22 import akka.util.Timeout;
23 import com.codahale.metrics.MetricRegistry;
24 import com.codahale.metrics.Timer;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Preconditions;
27 import com.google.common.base.Strings;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.Function;
30 import org.opendaylight.controller.cluster.access.concepts.MemberName;
31 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
32 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
33 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
35 import org.opendaylight.controller.cluster.datastore.config.Configuration;
36 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
42 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
43 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
47 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
48 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
49 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
50 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
51 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
52 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
53 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import scala.concurrent.Await;
58 import scala.concurrent.ExecutionContext;
59 import scala.concurrent.Future;
60 import scala.concurrent.duration.Duration;
61 import scala.concurrent.duration.FiniteDuration;
62
63 /**
64  * The ActorContext class contains utility methods which could be used by
65  * non-actors (like DistributedDataStore) to work with actors a little more
66  * easily. An ActorContext can be freely passed around to local object instances
67  * but should not be passed to actors especially remote actors
68  */
69 public class ActorContext {
70     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
71     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
72     private static final String METRIC_RATE = "rate";
73     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
74                                                               new Mapper<Throwable, Throwable>() {
75         @Override
76         public Throwable apply(Throwable failure) {
77             Throwable actualFailure = failure;
78             if (failure instanceof AskTimeoutException) {
79                 // A timeout exception most likely means the shard isn't initialized.
80                 actualFailure = new NotInitializedException(
81                         "Timed out trying to find the primary shard. Most likely cause is the "
82                         + "shard is not initialized yet.");
83             }
84
85             return actualFailure;
86         }
87     };
88     public static final String BOUNDED_MAILBOX = "bounded-mailbox";
89     public static final String COMMIT = "commit";
90
91     private final ActorSystem actorSystem;
92     private final ActorRef shardManager;
93     private final ClusterWrapper clusterWrapper;
94     private final Configuration configuration;
95     private DatastoreContext datastoreContext;
96     private FiniteDuration operationDuration;
97     private Timeout operationTimeout;
98     private final String selfAddressHostPort;
99     private TransactionRateLimiter txRateLimiter;
100     private Timeout transactionCommitOperationTimeout;
101     private Timeout shardInitializationTimeout;
102     private final Dispatchers dispatchers;
103
104     private volatile SchemaContext schemaContext;
105
106     // Used as a write memory barrier.
107     @SuppressWarnings("unused")
108     private volatile boolean updated;
109
110     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
111             .getMetricsRegistry();
112
113     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
114     private final ShardStrategyFactory shardStrategyFactory;
115
116     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
117             ClusterWrapper clusterWrapper, Configuration configuration) {
118         this(actorSystem, shardManager, clusterWrapper, configuration,
119                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
120     }
121
122     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
123             ClusterWrapper clusterWrapper, Configuration configuration,
124             DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
125         this.actorSystem = actorSystem;
126         this.shardManager = shardManager;
127         this.clusterWrapper = clusterWrapper;
128         this.configuration = configuration;
129         this.datastoreContext = datastoreContext;
130         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
131         this.primaryShardInfoCache = primaryShardInfoCache;
132         this.shardStrategyFactory = new ShardStrategyFactory(configuration);
133
134         setCachedProperties();
135
136         Address selfAddress = clusterWrapper.getSelfAddress();
137         if (selfAddress != null && !selfAddress.host().isEmpty()) {
138             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
139         } else {
140             selfAddressHostPort = null;
141         }
142
143     }
144
145     private void setCachedProperties() {
146         txRateLimiter = new TransactionRateLimiter(this);
147
148         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInMillis(), TimeUnit.MILLISECONDS);
149         operationTimeout = new Timeout(operationDuration);
150
151         transactionCommitOperationTimeout =  new Timeout(Duration.create(
152                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
153
154         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
155     }
156
157     public DatastoreContext getDatastoreContext() {
158         return datastoreContext;
159     }
160
161     public ActorSystem getActorSystem() {
162         return actorSystem;
163     }
164
165     public ActorRef getShardManager() {
166         return shardManager;
167     }
168
169     public ActorSelection actorSelection(String actorPath) {
170         return actorSystem.actorSelection(actorPath);
171     }
172
173     public ActorSelection actorSelection(ActorPath actorPath) {
174         return actorSystem.actorSelection(actorPath);
175     }
176
177     public void setSchemaContext(SchemaContext schemaContext) {
178         this.schemaContext = schemaContext;
179
180         if (shardManager != null) {
181             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
182         }
183     }
184
185     public void setDatastoreContext(DatastoreContextFactory contextFactory) {
186         this.datastoreContext = contextFactory.getBaseDatastoreContext();
187         setCachedProperties();
188
189         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
190         // will be published immediately even though they may not be immediately visible to other
191         // threads due to unsynchronized reads. That's OK though - we're going for eventual
192         // consistency here as immediately visible updates to these members aren't critical. These
193         // members could've been made volatile but wanted to avoid volatile reads as these are
194         // accessed often and updates will be infrequent.
195
196         updated = true;
197
198         if (shardManager != null) {
199             shardManager.tell(contextFactory, ActorRef.noSender());
200         }
201     }
202
203     public SchemaContext getSchemaContext() {
204         return schemaContext;
205     }
206
207     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
208         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
209         if (ret != null) {
210             return ret;
211         }
212         Future<Object> future = executeOperationAsync(shardManager,
213                 new FindPrimary(shardName, true), shardInitializationTimeout);
214
215         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
216             @Override
217             public PrimaryShardInfo checkedApply(Object response) throws UnknownMessageException {
218                 if (response instanceof RemotePrimaryShardFound) {
219                     LOG.debug("findPrimaryShardAsync received: {}", response);
220                     RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
221                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
222                 } else if (response instanceof LocalPrimaryShardFound) {
223                     LOG.debug("findPrimaryShardAsync received: {}", response);
224                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
225                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
226                             found.getLocalShardDataTree());
227                 } else if (response instanceof NotInitializedException) {
228                     throw (NotInitializedException)response;
229                 } else if (response instanceof PrimaryNotFoundException) {
230                     throw (PrimaryNotFoundException)response;
231                 } else if (response instanceof NoShardLeaderException) {
232                     throw (NoShardLeaderException)response;
233                 }
234
235                 throw new UnknownMessageException(String.format(
236                         "FindPrimary returned unkown response: %s", response));
237             }
238         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
239     }
240
241     private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
242             short primaryVersion, DataTree localShardDataTree) {
243         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
244         PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
245             new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
246         primaryShardInfoCache.putSuccessful(shardName, info);
247         return info;
248     }
249
250     /**
251      * Finds a local shard given its shard name and return it's ActorRef.
252      *
253      * @param shardName the name of the local shard that needs to be found
254      * @return a reference to a local shard actor which represents the shard
255      *         specified by the shardName
256      */
257     public Optional<ActorRef> findLocalShard(String shardName) {
258         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
259
260         if (result instanceof LocalShardFound) {
261             LocalShardFound found = (LocalShardFound) result;
262             LOG.debug("Local shard found {}", found.getPath());
263             return Optional.of(found.getPath());
264         }
265
266         return Optional.absent();
267     }
268
269     /**
270      * Finds a local shard async given its shard name and return a Future from which to obtain the
271      * ActorRef.
272      *
273      * @param shardName the name of the local shard that needs to be found
274      */
275     public Future<ActorRef> findLocalShardAsync(final String shardName) {
276         Future<Object> future = executeOperationAsync(shardManager,
277                 new FindLocalShard(shardName, true), shardInitializationTimeout);
278
279         return future.map(new Mapper<Object, ActorRef>() {
280             @Override
281             public ActorRef checkedApply(Object response) throws Throwable {
282                 if (response instanceof LocalShardFound) {
283                     LocalShardFound found = (LocalShardFound)response;
284                     LOG.debug("Local shard found {}", found.getPath());
285                     return found.getPath();
286                 } else if (response instanceof NotInitializedException) {
287                     throw (NotInitializedException)response;
288                 } else if (response instanceof LocalShardNotFound) {
289                     throw new LocalShardNotFoundException(
290                             String.format("Local shard for %s does not exist.", shardName));
291                 }
292
293                 throw new UnknownMessageException(String.format(
294                         "FindLocalShard returned unkown response: %s", response));
295             }
296         }, getClientDispatcher());
297     }
298
299     /**
300      * Executes an operation on a local actor and wait for it's response.
301      *
302      * @param actor the actor
303      * @param message the message to send
304      * @return The response of the operation
305      */
306     @SuppressWarnings("checkstyle:IllegalCatch")
307     public Object executeOperation(ActorRef actor, Object message) {
308         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
309
310         try {
311             return Await.result(future, operationDuration);
312         } catch (Exception e) {
313             throw new TimeoutException("Sending message " + message.getClass().toString()
314                     + " to actor " + actor.toString() + " failed. Try again later.", e);
315         }
316     }
317
318     /**
319      * Execute an operation on a remote actor and wait for it's response.
320      *
321      * @param actor the actor
322      * @param message the message
323      * @return the response message
324      */
325     @SuppressWarnings("checkstyle:IllegalCatch")
326     public Object executeOperation(ActorSelection actor, Object message) {
327         Future<Object> future = executeOperationAsync(actor, message);
328
329         try {
330             return Await.result(future, operationDuration);
331         } catch (Exception e) {
332             throw new TimeoutException("Sending message " + message.getClass().toString()
333                     + " to actor " + actor.toString() + " failed. Try again later.", e);
334         }
335     }
336
337     public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
338         Preconditions.checkArgument(actor != null, "actor must not be null");
339         Preconditions.checkArgument(message != null, "message must not be null");
340
341         LOG.debug("Sending message {} to {}", message.getClass(), actor);
342         return doAsk(actor, message, timeout);
343     }
344
345     /**
346      * Execute an operation on a remote actor asynchronously.
347      *
348      * @param actor the ActorSelection
349      * @param message the message to send
350      * @param timeout the operation timeout
351      * @return a Future containing the eventual result
352      */
353     public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
354             Timeout timeout) {
355         Preconditions.checkArgument(actor != null, "actor must not be null");
356         Preconditions.checkArgument(message != null, "message must not be null");
357
358         LOG.debug("Sending message {} to {}", message.getClass(), actor);
359
360         return doAsk(actor, message, timeout);
361     }
362
363     /**
364      * Execute an operation on a remote actor asynchronously.
365      *
366      * @param actor the ActorSelection
367      * @param message the message to send
368      * @return a Future containing the eventual result
369      */
370     public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
371         return executeOperationAsync(actor, message, operationTimeout);
372     }
373
374     /**
375      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
376      * reply (essentially set and forget).
377      *
378      * @param actor the ActorSelection
379      * @param message the message to send
380      */
381     public void sendOperationAsync(ActorSelection actor, Object message) {
382         Preconditions.checkArgument(actor != null, "actor must not be null");
383         Preconditions.checkArgument(message != null, "message must not be null");
384
385         LOG.debug("Sending message {} to {}", message.getClass(), actor);
386
387         actor.tell(message, ActorRef.noSender());
388     }
389
390     @SuppressWarnings("checkstyle:IllegalCatch")
391     public void shutdown() {
392         FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
393         try {
394             Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
395         } catch (Exception e) {
396             LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
397         }
398     }
399
400     public ClusterWrapper getClusterWrapper() {
401         return clusterWrapper;
402     }
403
404     public MemberName getCurrentMemberName() {
405         return clusterWrapper.getCurrentMemberName();
406     }
407
408     /**
409      * Send the message to each and every shard.
410      */
411     public void broadcast(final Function<Short, Object> messageSupplier, Class<?> messageClass) {
412         for (final String shardName : configuration.getAllShardNames()) {
413
414             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
415             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
416                 @Override
417                 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
418                     if (failure != null) {
419                         LOG.warn("broadcast failed to send message {} to shard {}:  {}",
420                             messageClass.getSimpleName(), shardName, failure);
421                     } else {
422                         Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
423                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
424                     }
425                 }
426             }, getClientDispatcher());
427         }
428     }
429
430     public FiniteDuration getOperationDuration() {
431         return operationDuration;
432     }
433
434     public Timeout getOperationTimeout() {
435         return operationTimeout;
436     }
437
438     public boolean isPathLocal(String path) {
439         if (Strings.isNullOrEmpty(path)) {
440             return false;
441         }
442
443         int pathAtIndex = path.indexOf('@');
444         if (pathAtIndex == -1) {
445             //if the path is of local format, then its local and is co-located
446             return true;
447
448         } else if (selfAddressHostPort != null) {
449             // self-address and tx actor path, both are of remote path format
450             int slashIndex = path.indexOf('/', pathAtIndex);
451
452             if (slashIndex == -1) {
453                 return false;
454             }
455
456             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
457             return hostPort.equals(selfAddressHostPort);
458
459         } else {
460             // self address is local format and tx actor path is remote format
461             return false;
462         }
463     }
464
465     /**
466      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
467      * us to create a timer for pretty much anything.
468      *
469      * @param operationName the name of the operation
470      * @return the Timer instance
471      */
472     public Timer getOperationTimer(String operationName) {
473         return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
474     }
475
476     public Timer getOperationTimer(String dataStoreType, String operationName) {
477         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
478                 operationName, METRIC_RATE);
479         return metricRegistry.timer(rate);
480     }
481
482     /**
483      * Get the name of the data store to which this ActorContext belongs.
484      *
485      * @return the data store name
486      */
487     public String getDataStoreName() {
488         return datastoreContext.getDataStoreName();
489     }
490
491     /**
492      * Get the current transaction creation rate limit.
493      *
494      * @return the rate limit
495      */
496     public double getTxCreationLimit() {
497         return txRateLimiter.getTxCreationLimit();
498     }
499
500     /**
501      * Try to acquire a transaction creation permit. Will block if no permits are available.
502      */
503     public void acquireTxCreationPermit() {
504         txRateLimiter.acquire();
505     }
506
507     /**
508      * Returns the operation timeout to be used when committing transactions.
509      *
510      * @return the operation timeout
511      */
512     public Timeout getTransactionCommitOperationTimeout() {
513         return transactionCommitOperationTimeout;
514     }
515
516     /**
517      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
518      * code on the datastore.
519      *
520      * @return the dispatcher
521      */
522     public ExecutionContext getClientDispatcher() {
523         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
524     }
525
526     public String getNotificationDispatcherPath() {
527         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
528     }
529
530     public Configuration getConfiguration() {
531         return configuration;
532     }
533
534     public ShardStrategyFactory getShardStrategyFactory() {
535         return shardStrategyFactory;
536     }
537
538     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
539         return ask(actorRef, message, timeout);
540     }
541
542     protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout) {
543         return ask(actorRef, message, timeout);
544     }
545
546     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
547         return primaryShardInfoCache;
548     }
549 }