bba4d6140e1fb697769a7d32e3a5ff0ea07ba153
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / utils / ActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static akka.pattern.Patterns.ask;
12
13 import akka.actor.ActorPath;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.ActorSystem;
17 import akka.actor.Address;
18 import akka.dispatch.Mapper;
19 import akka.dispatch.OnComplete;
20 import akka.pattern.AskTimeoutException;
21 import akka.pattern.Patterns;
22 import akka.util.Timeout;
23 import com.codahale.metrics.MetricRegistry;
24 import com.codahale.metrics.Timer;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Preconditions;
27 import com.google.common.base.Strings;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.Function;
30 import org.opendaylight.controller.cluster.access.concepts.MemberName;
31 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
32 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
33 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
35 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
36 import org.opendaylight.controller.cluster.datastore.config.Configuration;
37 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
42 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
43 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
44 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
48 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
49 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
50 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
51 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
52 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
53 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
54 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
55 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
56 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59 import scala.concurrent.Await;
60 import scala.concurrent.ExecutionContext;
61 import scala.concurrent.Future;
62 import scala.concurrent.duration.Duration;
63 import scala.concurrent.duration.FiniteDuration;
64
65 /**
66  * The ActorContext class contains utility methods which could be used by
67  * non-actors (like DistributedDataStore) to work with actors a little more
68  * easily. An ActorContext can be freely passed around to local object instances
 * but should not be passed to actors, especially remote actors.
70  */
71 public class ActorContext {
72     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
73     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
74     private static final String METRIC_RATE = "rate";
75     private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
76                                                               new Mapper<Throwable, Throwable>() {
77         @Override
78         public Throwable apply(final Throwable failure) {
79             Throwable actualFailure = failure;
80             if (failure instanceof AskTimeoutException) {
81                 // A timeout exception most likely means the shard isn't initialized.
82                 actualFailure = new NotInitializedException(
83                         "Timed out trying to find the primary shard. Most likely cause is the "
84                         + "shard is not initialized yet.");
85             }
86
87             return actualFailure;
88         }
89     };
90     public static final String BOUNDED_MAILBOX = "bounded-mailbox";
91     public static final String COMMIT = "commit";
92
93     private final ActorSystem actorSystem;
94     private final ActorRef shardManager;
95     private final ClusterWrapper clusterWrapper;
96     private final Configuration configuration;
97     private DatastoreContext datastoreContext;
98     private FiniteDuration operationDuration;
99     private Timeout operationTimeout;
100     private final String selfAddressHostPort;
101     private TransactionRateLimiter txRateLimiter;
102     private Timeout transactionCommitOperationTimeout;
103     private Timeout shardInitializationTimeout;
104     private final Dispatchers dispatchers;
105
106     private volatile SchemaContext schemaContext;
107
108     // Used as a write memory barrier.
109     @SuppressWarnings("unused")
110     private volatile boolean updated;
111
112     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
113             .getMetricsRegistry();
114
115     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
116     private final ShardStrategyFactory shardStrategyFactory;
117
118     public ActorContext(final ActorSystem actorSystem, final ActorRef shardManager,
119             final ClusterWrapper clusterWrapper, final Configuration configuration) {
120         this(actorSystem, shardManager, clusterWrapper, configuration,
121                 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
122     }
123
124     public ActorContext(final ActorSystem actorSystem, final ActorRef shardManager,
125             final ClusterWrapper clusterWrapper, final Configuration configuration,
126             final DatastoreContext datastoreContext, final PrimaryShardInfoFutureCache primaryShardInfoCache) {
127         this.actorSystem = actorSystem;
128         this.shardManager = shardManager;
129         this.clusterWrapper = clusterWrapper;
130         this.configuration = configuration;
131         this.datastoreContext = datastoreContext;
132         this.dispatchers = new Dispatchers(actorSystem.dispatchers());
133         this.primaryShardInfoCache = primaryShardInfoCache;
134
135         final LogicalDatastoreType convertedType =
136                 LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name());
137         this.shardStrategyFactory = new ShardStrategyFactory(configuration, convertedType);
138
139         setCachedProperties();
140
141         Address selfAddress = clusterWrapper.getSelfAddress();
142         if (selfAddress != null && !selfAddress.host().isEmpty()) {
143             selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
144         } else {
145             selfAddressHostPort = null;
146         }
147
148     }
149
150     private void setCachedProperties() {
151         txRateLimiter = new TransactionRateLimiter(this);
152
153         operationDuration = Duration.create(datastoreContext.getOperationTimeoutInMillis(), TimeUnit.MILLISECONDS);
154         operationTimeout = new Timeout(operationDuration);
155
156         transactionCommitOperationTimeout =  new Timeout(Duration.create(
157                 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
158
159         shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
160     }
161
162     public DatastoreContext getDatastoreContext() {
163         return datastoreContext;
164     }
165
166     public ActorSystem getActorSystem() {
167         return actorSystem;
168     }
169
170     public ActorRef getShardManager() {
171         return shardManager;
172     }
173
174     public ActorSelection actorSelection(final String actorPath) {
175         return actorSystem.actorSelection(actorPath);
176     }
177
178     public ActorSelection actorSelection(final ActorPath actorPath) {
179         return actorSystem.actorSelection(actorPath);
180     }
181
182     public void setSchemaContext(final SchemaContext schemaContext) {
183         this.schemaContext = schemaContext;
184
185         if (shardManager != null) {
186             shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
187         }
188     }
189
190     public void setDatastoreContext(final DatastoreContextFactory contextFactory) {
191         this.datastoreContext = contextFactory.getBaseDatastoreContext();
192         setCachedProperties();
193
194         // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
195         // will be published immediately even though they may not be immediately visible to other
196         // threads due to unsynchronized reads. That's OK though - we're going for eventual
197         // consistency here as immediately visible updates to these members aren't critical. These
198         // members could've been made volatile but wanted to avoid volatile reads as these are
199         // accessed often and updates will be infrequent.
200
201         updated = true;
202
203         if (shardManager != null) {
204             shardManager.tell(contextFactory, ActorRef.noSender());
205         }
206     }
207
208     public SchemaContext getSchemaContext() {
209         return schemaContext;
210     }
211
212     public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
213         Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
214         if (ret != null) {
215             return ret;
216         }
217         Future<Object> future = executeOperationAsync(shardManager,
218                 new FindPrimary(shardName, true), shardInitializationTimeout);
219
220         return future.transform(new Mapper<Object, PrimaryShardInfo>() {
221             @Override
222             public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
223                 if (response instanceof RemotePrimaryShardFound) {
224                     LOG.debug("findPrimaryShardAsync received: {}", response);
225                     RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
226                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
227                 } else if (response instanceof LocalPrimaryShardFound) {
228                     LOG.debug("findPrimaryShardAsync received: {}", response);
229                     LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
230                     return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
231                             found.getLocalShardDataTree());
232                 } else if (response instanceof NotInitializedException) {
233                     throw (NotInitializedException)response;
234                 } else if (response instanceof PrimaryNotFoundException) {
235                     throw (PrimaryNotFoundException)response;
236                 } else if (response instanceof NoShardLeaderException) {
237                     throw (NoShardLeaderException)response;
238                 }
239
240                 throw new UnknownMessageException(String.format(
241                         "FindPrimary returned unkown response: %s", response));
242             }
243         }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
244     }
245
246     private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
247             final short primaryVersion, final DataTree localShardDataTree) {
248         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
249         PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
250             new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
251         primaryShardInfoCache.putSuccessful(shardName, info);
252         return info;
253     }
254
255     /**
256      * Finds a local shard given its shard name and return it's ActorRef.
257      *
258      * @param shardName the name of the local shard that needs to be found
259      * @return a reference to a local shard actor which represents the shard
260      *         specified by the shardName
261      */
262     public Optional<ActorRef> findLocalShard(final String shardName) {
263         Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
264
265         if (result instanceof LocalShardFound) {
266             LocalShardFound found = (LocalShardFound) result;
267             LOG.debug("Local shard found {}", found.getPath());
268             return Optional.of(found.getPath());
269         }
270
271         return Optional.absent();
272     }
273
274     /**
275      * Finds a local shard async given its shard name and return a Future from which to obtain the
276      * ActorRef.
277      *
278      * @param shardName the name of the local shard that needs to be found
279      */
280     public Future<ActorRef> findLocalShardAsync(final String shardName) {
281         Future<Object> future = executeOperationAsync(shardManager,
282                 new FindLocalShard(shardName, true), shardInitializationTimeout);
283
284         return future.map(new Mapper<Object, ActorRef>() {
285             @Override
286             public ActorRef checkedApply(final Object response) throws Throwable {
287                 if (response instanceof LocalShardFound) {
288                     LocalShardFound found = (LocalShardFound)response;
289                     LOG.debug("Local shard found {}", found.getPath());
290                     return found.getPath();
291                 } else if (response instanceof NotInitializedException) {
292                     throw (NotInitializedException)response;
293                 } else if (response instanceof LocalShardNotFound) {
294                     throw new LocalShardNotFoundException(
295                             String.format("Local shard for %s does not exist.", shardName));
296                 }
297
298                 throw new UnknownMessageException(String.format(
299                         "FindLocalShard returned unkown response: %s", response));
300             }
301         }, getClientDispatcher());
302     }
303
304     /**
305      * Executes an operation on a local actor and wait for it's response.
306      *
307      * @param actor the actor
308      * @param message the message to send
309      * @return The response of the operation
310      */
311     @SuppressWarnings("checkstyle:IllegalCatch")
312     public Object executeOperation(final ActorRef actor, final Object message) {
313         Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
314
315         try {
316             return Await.result(future, operationDuration);
317         } catch (Exception e) {
318             throw new TimeoutException("Sending message " + message.getClass().toString()
319                     + " to actor " + actor.toString() + " failed. Try again later.", e);
320         }
321     }
322
323     /**
324      * Execute an operation on a remote actor and wait for it's response.
325      *
326      * @param actor the actor
327      * @param message the message
328      * @return the response message
329      */
330     @SuppressWarnings("checkstyle:IllegalCatch")
331     public Object executeOperation(final ActorSelection actor, final Object message) {
332         Future<Object> future = executeOperationAsync(actor, message);
333
334         try {
335             return Await.result(future, operationDuration);
336         } catch (Exception e) {
337             throw new TimeoutException("Sending message " + message.getClass().toString()
338                     + " to actor " + actor.toString() + " failed. Try again later.", e);
339         }
340     }
341
342     public Future<Object> executeOperationAsync(final ActorRef actor, final Object message, final Timeout timeout) {
343         Preconditions.checkArgument(actor != null, "actor must not be null");
344         Preconditions.checkArgument(message != null, "message must not be null");
345
346         LOG.debug("Sending message {} to {}", message.getClass(), actor);
347         return doAsk(actor, message, timeout);
348     }
349
350     /**
351      * Execute an operation on a remote actor asynchronously.
352      *
353      * @param actor the ActorSelection
354      * @param message the message to send
355      * @param timeout the operation timeout
356      * @return a Future containing the eventual result
357      */
358     public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message,
359             final Timeout timeout) {
360         Preconditions.checkArgument(actor != null, "actor must not be null");
361         Preconditions.checkArgument(message != null, "message must not be null");
362
363         LOG.debug("Sending message {} to {}", message.getClass(), actor);
364
365         return doAsk(actor, message, timeout);
366     }
367
368     /**
369      * Execute an operation on a remote actor asynchronously.
370      *
371      * @param actor the ActorSelection
372      * @param message the message to send
373      * @return a Future containing the eventual result
374      */
375     public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message) {
376         return executeOperationAsync(actor, message, operationTimeout);
377     }
378
379     /**
380      * Sends an operation to be executed by a remote actor asynchronously without waiting for a
381      * reply (essentially set and forget).
382      *
383      * @param actor the ActorSelection
384      * @param message the message to send
385      */
386     public void sendOperationAsync(final ActorSelection actor, final Object message) {
387         Preconditions.checkArgument(actor != null, "actor must not be null");
388         Preconditions.checkArgument(message != null, "message must not be null");
389
390         LOG.debug("Sending message {} to {}", message.getClass(), actor);
391
392         actor.tell(message, ActorRef.noSender());
393     }
394
395     @SuppressWarnings("checkstyle:IllegalCatch")
396     public void shutdown() {
397         FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
398         try {
399             Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
400         } catch (Exception e) {
401             LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
402         }
403     }
404
405     public ClusterWrapper getClusterWrapper() {
406         return clusterWrapper;
407     }
408
409     public MemberName getCurrentMemberName() {
410         return clusterWrapper.getCurrentMemberName();
411     }
412
413     /**
414      * Send the message to each and every shard.
415      */
416     public void broadcast(final Function<Short, Object> messageSupplier, final Class<?> messageClass) {
417         for (final String shardName : configuration.getAllShardNames()) {
418
419             Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
420             primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
421                 @Override
422                 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
423                     if (failure != null) {
424                         LOG.warn("broadcast failed to send message {} to shard {}", messageClass.getSimpleName(),
425                             shardName, failure);
426                     } else {
427                         Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
428                         primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
429                     }
430                 }
431             }, getClientDispatcher());
432         }
433     }
434
435     public FiniteDuration getOperationDuration() {
436         return operationDuration;
437     }
438
439     public Timeout getOperationTimeout() {
440         return operationTimeout;
441     }
442
443     public boolean isPathLocal(final String path) {
444         if (Strings.isNullOrEmpty(path)) {
445             return false;
446         }
447
448         int pathAtIndex = path.indexOf('@');
449         if (pathAtIndex == -1) {
450             //if the path is of local format, then its local and is co-located
451             return true;
452
453         } else if (selfAddressHostPort != null) {
454             // self-address and tx actor path, both are of remote path format
455             int slashIndex = path.indexOf('/', pathAtIndex);
456
457             if (slashIndex == -1) {
458                 return false;
459             }
460
461             String hostPort = path.substring(pathAtIndex + 1, slashIndex);
462             return hostPort.equals(selfAddressHostPort);
463
464         } else {
465             // self address is local format and tx actor path is remote format
466             return false;
467         }
468     }
469
470     /**
471      * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
472      * us to create a timer for pretty much anything.
473      *
474      * @param operationName the name of the operation
475      * @return the Timer instance
476      */
477     public Timer getOperationTimer(final String operationName) {
478         return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
479     }
480
481     public Timer getOperationTimer(final String dataStoreType, final String operationName) {
482         final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
483                 operationName, METRIC_RATE);
484         return metricRegistry.timer(rate);
485     }
486
487     /**
488      * Get the name of the data store to which this ActorContext belongs.
489      *
490      * @return the data store name
491      */
492     public String getDataStoreName() {
493         return datastoreContext.getDataStoreName();
494     }
495
496     /**
497      * Get the current transaction creation rate limit.
498      *
499      * @return the rate limit
500      */
501     public double getTxCreationLimit() {
502         return txRateLimiter.getTxCreationLimit();
503     }
504
505     /**
506      * Try to acquire a transaction creation permit. Will block if no permits are available.
507      */
508     public void acquireTxCreationPermit() {
509         txRateLimiter.acquire();
510     }
511
512     /**
513      * Returns the operation timeout to be used when committing transactions.
514      *
515      * @return the operation timeout
516      */
517     public Timeout getTransactionCommitOperationTimeout() {
518         return transactionCommitOperationTimeout;
519     }
520
521     /**
522      * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
523      * code on the datastore.
524      *
525      * @return the dispatcher
526      */
527     public ExecutionContext getClientDispatcher() {
528         return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
529     }
530
531     public String getNotificationDispatcherPath() {
532         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
533     }
534
535     public Configuration getConfiguration() {
536         return configuration;
537     }
538
539     public ShardStrategyFactory getShardStrategyFactory() {
540         return shardStrategyFactory;
541     }
542
543     protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
544         return ask(actorRef, message, timeout);
545     }
546
547     protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
548         return ask(actorRef, message, timeout);
549     }
550
551     public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
552         return primaryShardInfoCache;
553     }
554 }