/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore.utils;
10 import akka.actor.ActorPath;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.ActorSystem;
14 import akka.dispatch.Mapper;
15 import akka.dispatch.OnComplete;
16 import akka.pattern.AskTimeoutException;
17 import akka.pattern.Patterns;
18 import akka.util.Timeout;
19 import com.codahale.metrics.MetricRegistry;
20 import com.codahale.metrics.Timer;
21 import com.google.common.base.Preconditions;
22 import com.google.common.base.Strings;
23 import java.lang.invoke.VarHandle;
24 import java.util.Optional;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.LongAdder;
27 import java.util.function.Function;
28 import org.opendaylight.controller.cluster.access.concepts.MemberName;
29 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
30 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
32 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
33 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
34 import org.opendaylight.controller.cluster.datastore.config.Configuration;
35 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
41 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
42 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
46 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
47 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
48 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
49 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
50 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
51 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
52 import org.opendaylight.yangtools.yang.data.tree.api.ReadOnlyDataTree;
53 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import scala.concurrent.Await;
57 import scala.concurrent.ExecutionContext;
58 import scala.concurrent.Future;
59 import scala.concurrent.duration.FiniteDuration;
62 * The ActorUtils class contains utility methods which could be used by non-actors (like DistributedDataStore) to work
63 * with actors a little more easily. An ActorContext can be freely passed around to local object instances but should
64 * not be passed to actors especially remote actors.
66 public class ActorUtils {
67 private static final class AskTimeoutCounter extends OnComplete<Object> implements ExecutionContext {
68 private LongAdder ateExceptions = new LongAdder();
71 public void onComplete(final Throwable failure, final Object success) throws Throwable {
72 if (failure instanceof AskTimeoutException) {
73 ateExceptions.increment();
78 ateExceptions = new LongAdder();
82 return ateExceptions.sum();
86 public void execute(final Runnable runnable) {
87 // Yes, we are this ugly, but then we are just doing a check + an increment
92 public void reportFailure(final Throwable cause) {
93 LOG.warn("Unexpected failure updating counters", cause);
97 private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);
98 private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
99 private static final String METRIC_RATE = "rate";
100 private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER = new Mapper<>() {
102 public Throwable apply(final Throwable failure) {
103 if (failure instanceof AskTimeoutException) {
104 // A timeout exception most likely means the shard isn't initialized.
105 return new NotInitializedException(
106 "Timed out trying to find the primary shard. Most likely cause is the "
107 + "shard is not initialized yet.");
112 public static final String BOUNDED_MAILBOX = "bounded-mailbox";
113 public static final String COMMIT = "commit";
115 private final AskTimeoutCounter askTimeoutCounter = new AskTimeoutCounter();
116 private final ActorSystem actorSystem;
117 private final ActorRef shardManager;
118 private final ClusterWrapper clusterWrapper;
119 private final Configuration configuration;
120 private final String selfAddressHostPort;
121 private final Dispatchers dispatchers;
123 private DatastoreContext datastoreContext;
124 private FiniteDuration operationDuration;
125 private Timeout operationTimeout;
126 private TransactionRateLimiter txRateLimiter;
127 private Timeout transactionCommitOperationTimeout;
128 private Timeout shardInitializationTimeout;
130 private volatile EffectiveModelContext schemaContext;
132 private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
133 .getMetricsRegistry();
135 private final PrimaryShardInfoFutureCache primaryShardInfoCache;
136 private final ShardStrategyFactory shardStrategyFactory;
138 public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
139 final ClusterWrapper clusterWrapper, final Configuration configuration) {
140 this(actorSystem, shardManager, clusterWrapper, configuration,
141 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
144 public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
145 final ClusterWrapper clusterWrapper, final Configuration configuration,
146 final DatastoreContext datastoreContext, final PrimaryShardInfoFutureCache primaryShardInfoCache) {
147 this.actorSystem = actorSystem;
148 this.shardManager = shardManager;
149 this.clusterWrapper = clusterWrapper;
150 this.configuration = configuration;
151 this.datastoreContext = datastoreContext;
152 dispatchers = new Dispatchers(actorSystem.dispatchers());
153 this.primaryShardInfoCache = primaryShardInfoCache;
154 shardStrategyFactory = new ShardStrategyFactory(configuration);
156 setCachedProperties();
158 final var selfAddress = clusterWrapper.getSelfAddress();
159 if (selfAddress != null && !selfAddress.host().isEmpty()) {
160 selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
162 selfAddressHostPort = null;
166 private void setCachedProperties() {
167 txRateLimiter = new TransactionRateLimiter(this);
169 operationDuration = FiniteDuration.create(datastoreContext.getOperationTimeoutInMillis(),
170 TimeUnit.MILLISECONDS);
171 operationTimeout = new Timeout(operationDuration);
173 transactionCommitOperationTimeout = new Timeout(FiniteDuration.create(
174 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
176 shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
179 public DatastoreContext getDatastoreContext() {
180 return datastoreContext;
183 public ActorSystem getActorSystem() {
187 public ActorRef getShardManager() {
191 public ActorSelection actorSelection(final String actorPath) {
192 return actorSystem.actorSelection(actorPath);
195 public ActorSelection actorSelection(final ActorPath actorPath) {
196 return actorSystem.actorSelection(actorPath);
199 public void setSchemaContext(final EffectiveModelContext schemaContext) {
200 this.schemaContext = schemaContext;
202 if (shardManager != null) {
203 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
207 public void setDatastoreContext(final DatastoreContextFactory contextFactory) {
208 datastoreContext = contextFactory.getBaseDatastoreContext();
209 setCachedProperties();
211 // Trigger a write memory barrier so that the writes above will be published immediately even though they may
212 // not be immediately visible to other threads due to unsynchronized reads. That is OK though - we are going for
213 // eventual consistency here as immediately visible updates to these members are not critical. These members
214 // could have been made volatile but wanted to avoid volatile reads as these are accessed often and updates will
216 VarHandle.fullFence();
218 if (shardManager != null) {
219 shardManager.tell(contextFactory, ActorRef.noSender());
223 public EffectiveModelContext getSchemaContext() {
224 return schemaContext;
227 public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
228 final var ret = primaryShardInfoCache.getIfPresent(shardName);
233 return executeOperationAsync(shardManager, new FindPrimary(shardName, true), shardInitializationTimeout)
234 .transform(new Mapper<>() {
236 public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
237 if (response instanceof RemotePrimaryShardFound found) {
238 LOG.debug("findPrimaryShardAsync received: {}", found);
239 return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
240 } else if (response instanceof LocalPrimaryShardFound found) {
241 LOG.debug("findPrimaryShardAsync received: {}", found);
242 return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
243 found.getLocalShardDataTree());
244 } else if (response instanceof NotInitializedException notInitialized) {
245 throw notInitialized;
246 } else if (response instanceof PrimaryNotFoundException primaryNotFound) {
247 throw primaryNotFound;
248 } else if (response instanceof NoShardLeaderException noShardLeader) {
252 throw new UnknownMessageException(String.format(
253 "FindPrimary returned unkown response: %s", response));
255 }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
258 private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
259 final short primaryVersion, final ReadOnlyDataTree localShardDataTree) {
260 final var actorSelection = actorSystem.actorSelection(primaryActorPath);
261 final var info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
262 new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
263 primaryShardInfoCache.putSuccessful(shardName, info);
268 * Finds a local shard given its shard name and return it's ActorRef.
270 * @param shardName the name of the local shard that needs to be found
271 * @return a reference to a local shard actor which represents the shard
272 * specified by the shardName
274 public Optional<ActorRef> findLocalShard(final String shardName) {
275 final var result = executeOperation(shardManager, new FindLocalShard(shardName, false));
276 if (result instanceof LocalShardFound found) {
277 LOG.debug("Local shard found {}", found.getPath());
278 return Optional.of(found.getPath());
281 return Optional.empty();
285 * Finds a local shard async given its shard name and return a Future from which to obtain the
288 * @param shardName the name of the local shard that needs to be found
290 public Future<ActorRef> findLocalShardAsync(final String shardName) {
291 return executeOperationAsync(shardManager, new FindLocalShard(shardName, true), shardInitializationTimeout)
292 .map(new Mapper<>() {
294 public ActorRef checkedApply(final Object response) throws Throwable {
295 if (response instanceof LocalShardFound found) {
296 LOG.debug("Local shard found {}", found.getPath());
297 return found.getPath();
298 } else if (response instanceof NotInitializedException) {
299 throw (NotInitializedException)response;
300 } else if (response instanceof LocalShardNotFound) {
301 throw new LocalShardNotFoundException(
302 String.format("Local shard for %s does not exist.", shardName));
305 throw new UnknownMessageException("FindLocalShard returned unkown response: " + response);
307 }, getClientDispatcher());
311 * Executes an operation on a local actor and wait for it's response.
313 * @param actor the actor
314 * @param message the message to send
315 * @return The response of the operation
317 @SuppressWarnings("checkstyle:IllegalCatch")
318 public Object executeOperation(final ActorRef actor, final Object message) {
319 Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
322 return Await.result(future, operationDuration);
323 } catch (Exception e) {
324 throw new TimeoutException("Sending message " + message.getClass().toString()
325 + " to actor " + actor.toString() + " failed. Try again later.", e);
330 * Execute an operation on a remote actor and wait for it's response.
332 * @param actor the actor
333 * @param message the message
334 * @return the response message
336 @SuppressWarnings("checkstyle:IllegalCatch")
337 public Object executeOperation(final ActorSelection actor, final Object message) {
338 Future<Object> future = executeOperationAsync(actor, message);
341 return Await.result(future, operationDuration);
342 } catch (Exception e) {
343 throw new TimeoutException("Sending message " + message.getClass().toString()
344 + " to actor " + actor.toString() + " failed. Try again later.", e);
348 public Future<Object> executeOperationAsync(final ActorRef actor, final Object message, final Timeout timeout) {
349 Preconditions.checkArgument(actor != null, "actor must not be null");
350 Preconditions.checkArgument(message != null, "message must not be null");
352 LOG.debug("Sending message {} to {}", message.getClass(), actor);
353 return doAsk(actor, message, timeout);
357 * Execute an operation on a remote actor asynchronously.
359 * @param actor the ActorSelection
360 * @param message the message to send
361 * @param timeout the operation timeout
362 * @return a Future containing the eventual result
364 public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message,
365 final Timeout timeout) {
366 Preconditions.checkArgument(actor != null, "actor must not be null");
367 Preconditions.checkArgument(message != null, "message must not be null");
369 LOG.debug("Sending message {} to {}", message.getClass(), actor);
371 return doAsk(actor, message, timeout);
375 * Execute an operation on a remote actor asynchronously.
377 * @param actor the ActorSelection
378 * @param message the message to send
379 * @return a Future containing the eventual result
381 public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message) {
382 return executeOperationAsync(actor, message, operationTimeout);
386 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
387 * reply (essentially set and forget).
389 * @param actor the ActorSelection
390 * @param message the message to send
392 public void sendOperationAsync(final ActorSelection actor, final Object message) {
393 Preconditions.checkArgument(actor != null, "actor must not be null");
394 Preconditions.checkArgument(message != null, "message must not be null");
396 LOG.debug("Sending message {} to {}", message.getClass(), actor);
398 actor.tell(message, ActorRef.noSender());
401 @SuppressWarnings("checkstyle:IllegalCatch")
402 public void shutdown() {
403 final var duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
405 Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
406 } catch (Exception e) {
407 LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
411 public ClusterWrapper getClusterWrapper() {
412 return clusterWrapper;
415 public MemberName getCurrentMemberName() {
416 return clusterWrapper.getCurrentMemberName();
420 * Send the message to each and every shard.
422 public void broadcast(final Function<Short, Object> messageSupplier, final Class<?> messageClass) {
423 for (final String shardName : configuration.getAllShardNames()) {
425 final var primaryFuture = findPrimaryShardAsync(shardName);
426 primaryFuture.onComplete(new OnComplete<>() {
428 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
429 if (failure != null) {
430 LOG.warn("broadcast failed to send message {} to shard {}", messageClass.getSimpleName(),
433 final var message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
434 primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
437 }, getClientDispatcher());
441 public FiniteDuration getOperationDuration() {
442 return operationDuration;
445 public Timeout getOperationTimeout() {
446 return operationTimeout;
449 public boolean isPathLocal(final String path) {
450 if (Strings.isNullOrEmpty(path)) {
454 int pathAtIndex = path.indexOf('@');
455 if (pathAtIndex == -1) {
456 //if the path is of local format, then its local and is co-located
459 } else if (selfAddressHostPort != null) {
460 // self-address and tx actor path, both are of remote path format
461 int slashIndex = path.indexOf('/', pathAtIndex);
463 if (slashIndex == -1) {
467 final var hostPort = path.substring(pathAtIndex + 1, slashIndex);
468 return hostPort.equals(selfAddressHostPort);
471 // self address is local format and tx actor path is remote format
477 * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
478 * us to create a timer for pretty much anything.
480 * @param operationName the name of the operation
481 * @return the Timer instance
483 public Timer getOperationTimer(final String operationName) {
484 return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
487 public Timer getOperationTimer(final String dataStoreType, final String operationName) {
488 final var rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType, operationName,
490 return metricRegistry.timer(rate);
494 * Get the name of the data store to which this ActorContext belongs.
496 * @return the data store name
498 public String getDataStoreName() {
499 return datastoreContext.getDataStoreName();
503 * Get the current transaction creation rate limit.
505 * @return the rate limit
507 public double getTxCreationLimit() {
508 return txRateLimiter.getTxCreationLimit();
511 public long getAskTimeoutExceptionCount() {
512 return askTimeoutCounter.sum();
515 public void resetAskTimeoutExceptionCount() {
516 askTimeoutCounter.reset();
520 * Try to acquire a transaction creation permit. Will block if no permits are available.
522 public void acquireTxCreationPermit() {
523 txRateLimiter.acquire();
527 * Returns the operation timeout to be used when committing transactions.
529 * @return the operation timeout
531 public Timeout getTransactionCommitOperationTimeout() {
532 return transactionCommitOperationTimeout;
536 * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
537 * code on the datastore.
539 * @return the dispatcher
541 public ExecutionContext getClientDispatcher() {
542 return dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
545 public String getNotificationDispatcherPath() {
546 return dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
549 public Configuration getConfiguration() {
550 return configuration;
553 public ShardStrategyFactory getShardStrategyFactory() {
554 return shardStrategyFactory;
557 protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
558 return Patterns.ask(actorRef, message, timeout);
561 protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
562 final var ret = Patterns.ask(actorRef, message, timeout);
563 ret.onComplete(askTimeoutCounter, askTimeoutCounter);
567 public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
568 return primaryShardInfoCache;