2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.utils;
10 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.dispatch.Mapper;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.AskTimeoutException;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Preconditions;
25 import com.google.common.base.Strings;
26 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
27 import java.util.Optional;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.Function;
30 import org.opendaylight.controller.cluster.access.concepts.MemberName;
31 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
32 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
33 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
35 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
36 import org.opendaylight.controller.cluster.datastore.config.Configuration;
37 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
42 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
43 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
44 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
48 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
49 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
50 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
51 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
52 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
53 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
54 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
55 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
56 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59 import scala.concurrent.Await;
60 import scala.concurrent.ExecutionContext;
61 import scala.concurrent.Future;
62 import scala.concurrent.duration.FiniteDuration;
/**
 * The ActorUtils class contains utility methods which can be used by non-actors (like DistributedDataStore) to work
 * with actors a little more easily. An ActorUtils instance can be freely passed around to local object instances but
 * should not be passed to actors, especially remote actors.
 */
69 public class ActorUtils {
// Class-wide SLF4J logger.
private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);
// First component of the metric name built by getOperationTimer(String, String).
private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
// Last component of the metric name built by getOperationTimer(String, String).
private static final String METRIC_RATE = "rate";
73 private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
76 public Throwable apply(final Throwable failure) {
77 Throwable actualFailure = failure;
78 if (failure instanceof AskTimeoutException) {
79 // A timeout exception most likely means the shard isn't initialized.
80 actualFailure = new NotInitializedException(
81 "Timed out trying to find the primary shard. Most likely cause is the "
82 + "shard is not initialized yet.");
// Public string constants. Nothing in the visible part of this class reads them.
// NOTE(review): presumably a mailbox name and an operation/metric name used by callers - confirm.
public static final String BOUNDED_MAILBOX = "bounded-mailbox";
public static final String COMMIT = "commit";

// Collaborators handed to the constructor.
private final ActorSystem actorSystem;
// May be null: setSchemaContext() and setDatastoreContext() check for null before sending to it.
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
// Replaced by setDatastoreContext(); the non-final fields below are re-derived from it in setCachedProperties().
private DatastoreContext datastoreContext;
private FiniteDuration operationDuration;
private Timeout operationTimeout;
// "host:port" of the cluster self address, or null when no address/host is available (set in the constructor).
private final String selfAddressHostPort;
private TransactionRateLimiter txRateLimiter;
private Timeout transactionCommitOperationTimeout;
private Timeout shardInitializationTimeout;
private final Dispatchers dispatchers;

// Last schema context passed to setSchemaContext().
private volatile EffectiveModelContext schemaContext;

// Used as a write memory barrier: see the explanation in setDatastoreContext().
@SuppressWarnings("unused")
private volatile boolean updated;

// Registry in which getOperationTimer() creates or looks up timers.
private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
        .getMetricsRegistry();

// Cache consulted and populated by findPrimaryShardAsync().
private final PrimaryShardInfoFutureCache primaryShardInfoCache;
private final ShardStrategyFactory shardStrategyFactory;
/**
 * Creates an instance that uses a default-built {@link DatastoreContext} and a new
 * {@link PrimaryShardInfoFutureCache}.
 *
 * @param actorSystem the actor system used for actor selections and dispatchers
 * @param shardManager the shard manager actor that lookups are sent to
 * @param clusterWrapper provider of the local member name and self address
 * @param configuration the shard configuration
 */
public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager, final ClusterWrapper clusterWrapper,
        final Configuration configuration) {
    this(actorSystem, shardManager, clusterWrapper, configuration, DatastoreContext.newBuilder().build(),
        new PrimaryShardInfoFutureCache());
}
/**
 * Creates an instance with an explicit datastore context and primary shard info cache.
 *
 * @param actorSystem the actor system used for actor selections and dispatchers
 * @param shardManager the shard manager actor that lookups are sent to
 * @param clusterWrapper provider of the local member name and self address
 * @param configuration the shard configuration
 * @param datastoreContext the initial datastore context, from which the cached timeouts are derived
 * @param primaryShardInfoCache the cache used by {@link #findPrimaryShardAsync(String)}
 */
public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager, final ClusterWrapper clusterWrapper,
        final Configuration configuration, final DatastoreContext datastoreContext,
        final PrimaryShardInfoFutureCache primaryShardInfoCache) {
    this.actorSystem = actorSystem;
    this.shardManager = shardManager;
    this.clusterWrapper = clusterWrapper;
    this.configuration = configuration;
    this.datastoreContext = datastoreContext;
    this.dispatchers = new Dispatchers(actorSystem.dispatchers());
    this.primaryShardInfoCache = primaryShardInfoCache;
    this.shardStrategyFactory = new ShardStrategyFactory(configuration,
        LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name()));

    // Must run after datastoreContext has been assigned, since it reads from it.
    setCachedProperties();

    // Remember "host:port" of our own address for isPathLocal(); null when there is no address or no host.
    final Address self = clusterWrapper.getSelfAddress();
    this.selfAddressHostPort = self == null || self.host().isEmpty() ? null
        : self.host().get() + ":" + self.port().get();
}
148 private void setCachedProperties() {
149 txRateLimiter = new TransactionRateLimiter(this);
151 operationDuration = FiniteDuration.create(datastoreContext.getOperationTimeoutInMillis(),
152 TimeUnit.MILLISECONDS);
153 operationTimeout = new Timeout(operationDuration);
155 transactionCommitOperationTimeout = new Timeout(FiniteDuration.create(
156 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
158 shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
161 public DatastoreContext getDatastoreContext() {
162 return datastoreContext;
165 public ActorSystem getActorSystem() {
169 public ActorRef getShardManager() {
173 public ActorSelection actorSelection(final String actorPath) {
174 return actorSystem.actorSelection(actorPath);
177 public ActorSelection actorSelection(final ActorPath actorPath) {
178 return actorSystem.actorSelection(actorPath);
181 public void setSchemaContext(final EffectiveModelContext schemaContext) {
182 this.schemaContext = schemaContext;
184 if (shardManager != null) {
185 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
189 public void setDatastoreContext(final DatastoreContextFactory contextFactory) {
190 this.datastoreContext = contextFactory.getBaseDatastoreContext();
191 setCachedProperties();
193 // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
194 // will be published immediately even though they may not be immediately visible to other
195 // threads due to unsynchronized reads. That's OK though - we're going for eventual
196 // consistency here as immediately visible updates to these members aren't critical. These
197 // members could've been made volatile but wanted to avoid volatile reads as these are
198 // accessed often and updates will be infrequent.
202 if (shardManager != null) {
203 shardManager.tell(contextFactory, ActorRef.noSender());
207 public EffectiveModelContext getSchemaContext() {
208 return schemaContext;
211 public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
212 Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
216 Future<Object> future = executeOperationAsync(shardManager,
217 new FindPrimary(shardName, true), shardInitializationTimeout);
219 return future.transform(new Mapper<Object, PrimaryShardInfo>() {
221 public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
222 if (response instanceof RemotePrimaryShardFound) {
223 LOG.debug("findPrimaryShardAsync received: {}", response);
224 RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
225 return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
226 } else if (response instanceof LocalPrimaryShardFound) {
227 LOG.debug("findPrimaryShardAsync received: {}", response);
228 LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
229 return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
230 found.getLocalShardDataTree());
231 } else if (response instanceof NotInitializedException) {
232 throw (NotInitializedException)response;
233 } else if (response instanceof PrimaryNotFoundException) {
234 throw (PrimaryNotFoundException)response;
235 } else if (response instanceof NoShardLeaderException) {
236 throw (NoShardLeaderException)response;
239 throw new UnknownMessageException(String.format(
240 "FindPrimary returned unkown response: %s", response));
242 }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
245 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
246 justification = "https://github.com/spotbugs/spotbugs/issues/811")
247 private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
248 final short primaryVersion, final ReadOnlyDataTree localShardDataTree) {
249 ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
250 PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
251 new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
252 primaryShardInfoCache.putSuccessful(shardName, info);
257 * Finds a local shard given its shard name and return it's ActorRef.
259 * @param shardName the name of the local shard that needs to be found
260 * @return a reference to a local shard actor which represents the shard
261 * specified by the shardName
263 public Optional<ActorRef> findLocalShard(final String shardName) {
264 Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
266 if (result instanceof LocalShardFound) {
267 LocalShardFound found = (LocalShardFound) result;
268 LOG.debug("Local shard found {}", found.getPath());
269 return Optional.of(found.getPath());
272 return Optional.empty();
276 * Finds a local shard async given its shard name and return a Future from which to obtain the
279 * @param shardName the name of the local shard that needs to be found
281 public Future<ActorRef> findLocalShardAsync(final String shardName) {
282 Future<Object> future = executeOperationAsync(shardManager,
283 new FindLocalShard(shardName, true), shardInitializationTimeout);
285 return future.map(new Mapper<Object, ActorRef>() {
287 public ActorRef checkedApply(final Object response) throws Throwable {
288 if (response instanceof LocalShardFound) {
289 LocalShardFound found = (LocalShardFound)response;
290 LOG.debug("Local shard found {}", found.getPath());
291 return found.getPath();
292 } else if (response instanceof NotInitializedException) {
293 throw (NotInitializedException)response;
294 } else if (response instanceof LocalShardNotFound) {
295 throw new LocalShardNotFoundException(
296 String.format("Local shard for %s does not exist.", shardName));
299 throw new UnknownMessageException(String.format(
300 "FindLocalShard returned unkown response: %s", response));
302 }, getClientDispatcher());
306 * Executes an operation on a local actor and wait for it's response.
308 * @param actor the actor
309 * @param message the message to send
310 * @return The response of the operation
312 @SuppressWarnings("checkstyle:IllegalCatch")
313 public Object executeOperation(final ActorRef actor, final Object message) {
314 Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
317 return Await.result(future, operationDuration);
318 } catch (Exception e) {
319 throw new TimeoutException("Sending message " + message.getClass().toString()
320 + " to actor " + actor.toString() + " failed. Try again later.", e);
325 * Execute an operation on a remote actor and wait for it's response.
327 * @param actor the actor
328 * @param message the message
329 * @return the response message
331 @SuppressWarnings("checkstyle:IllegalCatch")
332 public Object executeOperation(final ActorSelection actor, final Object message) {
333 Future<Object> future = executeOperationAsync(actor, message);
336 return Await.result(future, operationDuration);
337 } catch (Exception e) {
338 throw new TimeoutException("Sending message " + message.getClass().toString()
339 + " to actor " + actor.toString() + " failed. Try again later.", e);
343 public Future<Object> executeOperationAsync(final ActorRef actor, final Object message, final Timeout timeout) {
344 Preconditions.checkArgument(actor != null, "actor must not be null");
345 Preconditions.checkArgument(message != null, "message must not be null");
347 LOG.debug("Sending message {} to {}", message.getClass(), actor);
348 return doAsk(actor, message, timeout);
352 * Execute an operation on a remote actor asynchronously.
354 * @param actor the ActorSelection
355 * @param message the message to send
356 * @param timeout the operation timeout
357 * @return a Future containing the eventual result
359 public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message,
360 final Timeout timeout) {
361 Preconditions.checkArgument(actor != null, "actor must not be null");
362 Preconditions.checkArgument(message != null, "message must not be null");
364 LOG.debug("Sending message {} to {}", message.getClass(), actor);
366 return doAsk(actor, message, timeout);
370 * Execute an operation on a remote actor asynchronously.
372 * @param actor the ActorSelection
373 * @param message the message to send
374 * @return a Future containing the eventual result
376 public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message) {
377 return executeOperationAsync(actor, message, operationTimeout);
381 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
382 * reply (essentially set and forget).
384 * @param actor the ActorSelection
385 * @param message the message to send
387 public void sendOperationAsync(final ActorSelection actor, final Object message) {
388 Preconditions.checkArgument(actor != null, "actor must not be null");
389 Preconditions.checkArgument(message != null, "message must not be null");
391 LOG.debug("Sending message {} to {}", message.getClass(), actor);
393 actor.tell(message, ActorRef.noSender());
396 @SuppressWarnings("checkstyle:IllegalCatch")
397 public void shutdown() {
398 FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
400 Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
401 } catch (Exception e) {
402 LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
406 public ClusterWrapper getClusterWrapper() {
407 return clusterWrapper;
410 public MemberName getCurrentMemberName() {
411 return clusterWrapper.getCurrentMemberName();
415 * Send the message to each and every shard.
417 public void broadcast(final Function<Short, Object> messageSupplier, final Class<?> messageClass) {
418 for (final String shardName : configuration.getAllShardNames()) {
420 Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
421 primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
423 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
424 if (failure != null) {
425 LOG.warn("broadcast failed to send message {} to shard {}", messageClass.getSimpleName(),
428 Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
429 primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
432 }, getClientDispatcher());
436 public FiniteDuration getOperationDuration() {
437 return operationDuration;
440 public Timeout getOperationTimeout() {
441 return operationTimeout;
444 public boolean isPathLocal(final String path) {
445 if (Strings.isNullOrEmpty(path)) {
449 int pathAtIndex = path.indexOf('@');
450 if (pathAtIndex == -1) {
451 //if the path is of local format, then its local and is co-located
454 } else if (selfAddressHostPort != null) {
455 // self-address and tx actor path, both are of remote path format
456 int slashIndex = path.indexOf('/', pathAtIndex);
458 if (slashIndex == -1) {
462 String hostPort = path.substring(pathAtIndex + 1, slashIndex);
463 return hostPort.equals(selfAddressHostPort);
466 // self address is local format and tx actor path is remote format
472 * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
473 * us to create a timer for pretty much anything.
475 * @param operationName the name of the operation
476 * @return the Timer instance
478 public Timer getOperationTimer(final String operationName) {
479 return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
482 public Timer getOperationTimer(final String dataStoreType, final String operationName) {
483 final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
484 operationName, METRIC_RATE);
485 return metricRegistry.timer(rate);
489 * Get the name of the data store to which this ActorContext belongs.
491 * @return the data store name
493 public String getDataStoreName() {
494 return datastoreContext.getDataStoreName();
498 * Get the current transaction creation rate limit.
500 * @return the rate limit
502 public double getTxCreationLimit() {
503 return txRateLimiter.getTxCreationLimit();
507 * Try to acquire a transaction creation permit. Will block if no permits are available.
509 public void acquireTxCreationPermit() {
510 txRateLimiter.acquire();
514 * Returns the operation timeout to be used when committing transactions.
516 * @return the operation timeout
518 public Timeout getTransactionCommitOperationTimeout() {
519 return transactionCommitOperationTimeout;
523 * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
524 * code on the datastore.
526 * @return the dispatcher
528 public ExecutionContext getClientDispatcher() {
529 return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
532 public String getNotificationDispatcherPath() {
533 return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
536 public Configuration getConfiguration() {
537 return configuration;
540 public ShardStrategyFactory getShardStrategyFactory() {
541 return shardStrategyFactory;
544 protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
545 return ask(actorRef, message, timeout);
548 protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
549 return ask(actorRef, message, timeout);
552 public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
553 return primaryShardInfoCache;