2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.utils;
10 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.dispatch.Mapper;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.AskTimeoutException;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Preconditions;
25 import com.google.common.base.Strings;
26 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
27 import java.util.Optional;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.LongAdder;
30 import java.util.function.Function;
31 import org.opendaylight.controller.cluster.access.concepts.MemberName;
32 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
33 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
34 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
35 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
36 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
37 import org.opendaylight.controller.cluster.datastore.config.Configuration;
38 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
42 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
43 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
44 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
45 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
48 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
49 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
50 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
51 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
52 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
53 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
54 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
55 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
56 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
57 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60 import scala.concurrent.Await;
61 import scala.concurrent.ExecutionContext;
62 import scala.concurrent.Future;
63 import scala.concurrent.duration.FiniteDuration;
 * The ActorUtils class contains utility methods which can be used by non-actors (like DistributedDataStore) to work
 * with actors a little more easily. An ActorUtils instance can be freely passed around to local object instances, but
 * it should not be passed to actors, especially remote actors.
70 public class ActorUtils {
71 private static final class AskTimeoutCounter extends OnComplete<Object> implements ExecutionContext {
72 private LongAdder ateExceptions = new LongAdder();
75 public void onComplete(final Throwable failure, final Object success) throws Throwable {
76 if (failure instanceof AskTimeoutException) {
77 ateExceptions.increment();
82 ateExceptions = new LongAdder();
86 return ateExceptions.sum();
90 public void execute(final Runnable runnable) {
91 // Yes, we are this ugly, but then we are just doing a check + an increment
96 public void reportFailure(final Throwable cause) {
97 LOG.warn("Unexpected failure updating counters", cause);
101 private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);
102 private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
103 private static final String METRIC_RATE = "rate";
104 private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper<>() {
106 public Throwable apply(final Throwable failure) {
107 Throwable actualFailure = failure;
108 if (failure instanceof AskTimeoutException) {
109 // A timeout exception most likely means the shard isn't initialized.
110 actualFailure = new NotInitializedException(
111 "Timed out trying to find the primary shard. Most likely cause is the "
112 + "shard is not initialized yet.");
115 return actualFailure;
118 public static final String BOUNDED_MAILBOX = "bounded-mailbox";
119 public static final String COMMIT = "commit";
// Counts AskTimeoutExceptions of asks sent through doAsk(ActorSelection, ...). Exposed via
// getAskTimeoutExceptionCount() and resetAskTimeoutExceptionCount().
private final AskTimeoutCounter askTimeoutCounter = new AskTimeoutCounter();
private final ActorSystem actorSystem;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
// "host:port" of this member's self address, or null when that address is null or has no host. Used by isPathLocal().
private final String selfAddressHostPort;
private final Dispatchers dispatchers;

// Replaced by setDatastoreContext(). The fields that follow are derived from it by setCachedProperties().
// They are intentionally not volatile; updates become visible to other threads eventually, not immediately.
private DatastoreContext datastoreContext;
private FiniteDuration operationDuration;
private Timeout operationTimeout;
private TransactionRateLimiter txRateLimiter;
private Timeout transactionCommitOperationTimeout;
private Timeout shardInitializationTimeout;

private volatile EffectiveModelContext schemaContext;

// Used as a write memory barrier.
// setDatastoreContext() writes it after updating the non-volatile fields above.
@SuppressWarnings("unused")
private volatile boolean updated;

private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
        .getMetricsRegistry();

private final PrimaryShardInfoFutureCache primaryShardInfoCache;
private final ShardStrategyFactory shardStrategyFactory;
/**
 * Creates an instance with a default-built {@link DatastoreContext} and a new {@link PrimaryShardInfoFutureCache}.
 *
 * @param actorSystem the actor system used for selections and dispatchers
 * @param shardManager the shard manager actor, may be null
 * @param clusterWrapper the cluster wrapper supplying the self address and member name
 * @param configuration the shard configuration
 */
public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
        final ClusterWrapper clusterWrapper, final Configuration configuration) {
    this(actorSystem, shardManager, clusterWrapper, configuration, DatastoreContext.newBuilder().build(),
        new PrimaryShardInfoFutureCache());
}

/**
 * Creates a fully specified instance.
 *
 * @param actorSystem the actor system used for selections and dispatchers
 * @param shardManager the shard manager actor, may be null
 * @param clusterWrapper the cluster wrapper supplying the self address and member name
 * @param configuration the shard configuration
 * @param datastoreContext the initial datastore context
 * @param primaryShardInfoCache the cache consulted and filled by findPrimaryShardAsync()
 */
public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
        final ClusterWrapper clusterWrapper, final Configuration configuration,
        final DatastoreContext datastoreContext, final PrimaryShardInfoFutureCache primaryShardInfoCache) {
    this.actorSystem = actorSystem;
    this.shardManager = shardManager;
    this.clusterWrapper = clusterWrapper;
    this.configuration = configuration;
    this.datastoreContext = datastoreContext;
    this.dispatchers = new Dispatchers(actorSystem.dispatchers());
    this.primaryShardInfoCache = primaryShardInfoCache;

    // The strategy factory takes the mdsal LogicalDatastoreType, so translate the context's store type by name.
    this.shardStrategyFactory = new ShardStrategyFactory(configuration,
        LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name()));

    setCachedProperties();

    this.selfAddressHostPort = toHostPort(clusterWrapper.getSelfAddress());
}

/**
 * Renders an address as "host:port", or returns null when the address is null or carries no host.
 */
private static String toHostPort(final Address address) {
    if (address == null || address.host().isEmpty()) {
        return null;
    }
    return address.host().get() + ":" + address.port().get();
}
179 private void setCachedProperties() {
180 txRateLimiter = new TransactionRateLimiter(this);
182 operationDuration = FiniteDuration.create(datastoreContext.getOperationTimeoutInMillis(),
183 TimeUnit.MILLISECONDS);
184 operationTimeout = new Timeout(operationDuration);
186 transactionCommitOperationTimeout = new Timeout(FiniteDuration.create(
187 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
189 shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
192 public DatastoreContext getDatastoreContext() {
193 return datastoreContext;
196 public ActorSystem getActorSystem() {
200 public ActorRef getShardManager() {
204 public ActorSelection actorSelection(final String actorPath) {
205 return actorSystem.actorSelection(actorPath);
208 public ActorSelection actorSelection(final ActorPath actorPath) {
209 return actorSystem.actorSelection(actorPath);
212 public void setSchemaContext(final EffectiveModelContext schemaContext) {
213 this.schemaContext = schemaContext;
215 if (shardManager != null) {
216 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
220 public void setDatastoreContext(final DatastoreContextFactory contextFactory) {
221 this.datastoreContext = contextFactory.getBaseDatastoreContext();
222 setCachedProperties();
224 // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
225 // will be published immediately even though they may not be immediately visible to other
226 // threads due to unsynchronized reads. That's OK though - we're going for eventual
227 // consistency here as immediately visible updates to these members aren't critical. These
228 // members could've been made volatile but wanted to avoid volatile reads as these are
229 // accessed often and updates will be infrequent.
233 if (shardManager != null) {
234 shardManager.tell(contextFactory, ActorRef.noSender());
238 public EffectiveModelContext getSchemaContext() {
239 return schemaContext;
242 public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
243 Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
247 Future<Object> future = executeOperationAsync(shardManager,
248 new FindPrimary(shardName, true), shardInitializationTimeout);
250 return future.transform(new Mapper<Object, PrimaryShardInfo>() {
252 public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
253 if (response instanceof RemotePrimaryShardFound) {
254 LOG.debug("findPrimaryShardAsync received: {}", response);
255 RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
256 return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
257 } else if (response instanceof LocalPrimaryShardFound) {
258 LOG.debug("findPrimaryShardAsync received: {}", response);
259 LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
260 return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
261 found.getLocalShardDataTree());
262 } else if (response instanceof NotInitializedException) {
263 throw (NotInitializedException)response;
264 } else if (response instanceof PrimaryNotFoundException) {
265 throw (PrimaryNotFoundException)response;
266 } else if (response instanceof NoShardLeaderException) {
267 throw (NoShardLeaderException)response;
270 throw new UnknownMessageException(String.format(
271 "FindPrimary returned unkown response: %s", response));
273 }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
276 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
277 justification = "https://github.com/spotbugs/spotbugs/issues/811")
278 private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
279 final short primaryVersion, final ReadOnlyDataTree localShardDataTree) {
280 ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
281 PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
282 new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
283 primaryShardInfoCache.putSuccessful(shardName, info);
288 * Finds a local shard given its shard name and return it's ActorRef.
290 * @param shardName the name of the local shard that needs to be found
291 * @return a reference to a local shard actor which represents the shard
292 * specified by the shardName
294 public Optional<ActorRef> findLocalShard(final String shardName) {
295 Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
297 if (result instanceof LocalShardFound) {
298 LocalShardFound found = (LocalShardFound) result;
299 LOG.debug("Local shard found {}", found.getPath());
300 return Optional.of(found.getPath());
303 return Optional.empty();
307 * Finds a local shard async given its shard name and return a Future from which to obtain the
310 * @param shardName the name of the local shard that needs to be found
312 public Future<ActorRef> findLocalShardAsync(final String shardName) {
313 Future<Object> future = executeOperationAsync(shardManager,
314 new FindLocalShard(shardName, true), shardInitializationTimeout);
316 return future.map(new Mapper<Object, ActorRef>() {
318 public ActorRef checkedApply(final Object response) throws Throwable {
319 if (response instanceof LocalShardFound) {
320 LocalShardFound found = (LocalShardFound)response;
321 LOG.debug("Local shard found {}", found.getPath());
322 return found.getPath();
323 } else if (response instanceof NotInitializedException) {
324 throw (NotInitializedException)response;
325 } else if (response instanceof LocalShardNotFound) {
326 throw new LocalShardNotFoundException(
327 String.format("Local shard for %s does not exist.", shardName));
330 throw new UnknownMessageException(String.format(
331 "FindLocalShard returned unkown response: %s", response));
333 }, getClientDispatcher());
337 * Executes an operation on a local actor and wait for it's response.
339 * @param actor the actor
340 * @param message the message to send
341 * @return The response of the operation
343 @SuppressWarnings("checkstyle:IllegalCatch")
344 public Object executeOperation(final ActorRef actor, final Object message) {
345 Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
348 return Await.result(future, operationDuration);
349 } catch (Exception e) {
350 throw new TimeoutException("Sending message " + message.getClass().toString()
351 + " to actor " + actor.toString() + " failed. Try again later.", e);
356 * Execute an operation on a remote actor and wait for it's response.
358 * @param actor the actor
359 * @param message the message
360 * @return the response message
362 @SuppressWarnings("checkstyle:IllegalCatch")
363 public Object executeOperation(final ActorSelection actor, final Object message) {
364 Future<Object> future = executeOperationAsync(actor, message);
367 return Await.result(future, operationDuration);
368 } catch (Exception e) {
369 throw new TimeoutException("Sending message " + message.getClass().toString()
370 + " to actor " + actor.toString() + " failed. Try again later.", e);
374 public Future<Object> executeOperationAsync(final ActorRef actor, final Object message, final Timeout timeout) {
375 Preconditions.checkArgument(actor != null, "actor must not be null");
376 Preconditions.checkArgument(message != null, "message must not be null");
378 LOG.debug("Sending message {} to {}", message.getClass(), actor);
379 return doAsk(actor, message, timeout);
383 * Execute an operation on a remote actor asynchronously.
385 * @param actor the ActorSelection
386 * @param message the message to send
387 * @param timeout the operation timeout
388 * @return a Future containing the eventual result
390 public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message,
391 final Timeout timeout) {
392 Preconditions.checkArgument(actor != null, "actor must not be null");
393 Preconditions.checkArgument(message != null, "message must not be null");
395 LOG.debug("Sending message {} to {}", message.getClass(), actor);
397 return doAsk(actor, message, timeout);
401 * Execute an operation on a remote actor asynchronously.
403 * @param actor the ActorSelection
404 * @param message the message to send
405 * @return a Future containing the eventual result
407 public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message) {
408 return executeOperationAsync(actor, message, operationTimeout);
412 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
413 * reply (essentially set and forget).
415 * @param actor the ActorSelection
416 * @param message the message to send
418 public void sendOperationAsync(final ActorSelection actor, final Object message) {
419 Preconditions.checkArgument(actor != null, "actor must not be null");
420 Preconditions.checkArgument(message != null, "message must not be null");
422 LOG.debug("Sending message {} to {}", message.getClass(), actor);
424 actor.tell(message, ActorRef.noSender());
427 @SuppressWarnings("checkstyle:IllegalCatch")
428 public void shutdown() {
429 FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
431 Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
432 } catch (Exception e) {
433 LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
437 public ClusterWrapper getClusterWrapper() {
438 return clusterWrapper;
441 public MemberName getCurrentMemberName() {
442 return clusterWrapper.getCurrentMemberName();
446 * Send the message to each and every shard.
448 public void broadcast(final Function<Short, Object> messageSupplier, final Class<?> messageClass) {
449 for (final String shardName : configuration.getAllShardNames()) {
451 Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
452 primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
454 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
455 if (failure != null) {
456 LOG.warn("broadcast failed to send message {} to shard {}", messageClass.getSimpleName(),
459 Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
460 primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
463 }, getClientDispatcher());
467 public FiniteDuration getOperationDuration() {
468 return operationDuration;
471 public Timeout getOperationTimeout() {
472 return operationTimeout;
475 public boolean isPathLocal(final String path) {
476 if (Strings.isNullOrEmpty(path)) {
480 int pathAtIndex = path.indexOf('@');
481 if (pathAtIndex == -1) {
482 //if the path is of local format, then its local and is co-located
485 } else if (selfAddressHostPort != null) {
486 // self-address and tx actor path, both are of remote path format
487 int slashIndex = path.indexOf('/', pathAtIndex);
489 if (slashIndex == -1) {
493 String hostPort = path.substring(pathAtIndex + 1, slashIndex);
494 return hostPort.equals(selfAddressHostPort);
497 // self address is local format and tx actor path is remote format
503 * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
504 * us to create a timer for pretty much anything.
506 * @param operationName the name of the operation
507 * @return the Timer instance
509 public Timer getOperationTimer(final String operationName) {
510 return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
513 public Timer getOperationTimer(final String dataStoreType, final String operationName) {
514 final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
515 operationName, METRIC_RATE);
516 return metricRegistry.timer(rate);
520 * Get the name of the data store to which this ActorContext belongs.
522 * @return the data store name
524 public String getDataStoreName() {
525 return datastoreContext.getDataStoreName();
529 * Get the current transaction creation rate limit.
531 * @return the rate limit
533 public double getTxCreationLimit() {
534 return txRateLimiter.getTxCreationLimit();
537 public long getAskTimeoutExceptionCount() {
538 return askTimeoutCounter.sum();
541 public void resetAskTimeoutExceptionCount() {
542 askTimeoutCounter.reset();
546 * Try to acquire a transaction creation permit. Will block if no permits are available.
548 public void acquireTxCreationPermit() {
549 txRateLimiter.acquire();
553 * Returns the operation timeout to be used when committing transactions.
555 * @return the operation timeout
557 public Timeout getTransactionCommitOperationTimeout() {
558 return transactionCommitOperationTimeout;
562 * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
563 * code on the datastore.
565 * @return the dispatcher
567 public ExecutionContext getClientDispatcher() {
568 return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
571 public String getNotificationDispatcherPath() {
572 return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
575 public Configuration getConfiguration() {
576 return configuration;
579 public ShardStrategyFactory getShardStrategyFactory() {
580 return shardStrategyFactory;
583 protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
584 return ask(actorRef, message, timeout);
587 protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
588 final Future<Object> ret = ask(actorRef, message, timeout);
589 ret.onComplete(askTimeoutCounter, askTimeoutCounter);
593 public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
594 return primaryShardInfoCache;