2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.utils;
10 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.dispatch.Mapper;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.AskTimeoutException;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Preconditions;
25 import com.google.common.base.Strings;
26 import java.util.Optional;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Function;
29 import org.opendaylight.controller.cluster.access.concepts.MemberName;
30 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
31 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
32 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
33 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
35 import org.opendaylight.controller.cluster.datastore.config.Configuration;
36 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
42 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
43 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
47 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
48 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
49 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
50 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
51 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
52 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
53 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
54 import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import scala.concurrent.Await;
59 import scala.concurrent.ExecutionContext;
60 import scala.concurrent.Future;
61 import scala.concurrent.duration.FiniteDuration;
/**
 * The ActorUtils class contains utility methods which can be used by non-actors (like DistributedDataStore) to work
 * with actors a little more easily. An ActorUtils instance can be freely passed around to local object instances but
 * should not be passed to actors, especially remote actors.
 */
68 public class ActorUtils {
69 private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);
70 private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
71 private static final String METRIC_RATE = "rate";
72 private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
73 new Mapper<Throwable, Throwable>() {
75 public Throwable apply(final Throwable failure) {
76 Throwable actualFailure = failure;
77 if (failure instanceof AskTimeoutException) {
78 // A timeout exception most likely means the shard isn't initialized.
79 actualFailure = new NotInitializedException(
80 "Timed out trying to find the primary shard. Most likely cause is the "
81 + "shard is not initialized yet.");
87 public static final String BOUNDED_MAILBOX = "bounded-mailbox";
88 public static final String COMMIT = "commit";
90 private final ActorSystem actorSystem;
91 private final ActorRef shardManager;
92 private final ClusterWrapper clusterWrapper;
93 private final Configuration configuration;
94 private DatastoreContext datastoreContext;
95 private FiniteDuration operationDuration;
96 private Timeout operationTimeout;
97 private final String selfAddressHostPort;
98 private TransactionRateLimiter txRateLimiter;
99 private Timeout transactionCommitOperationTimeout;
100 private Timeout shardInitializationTimeout;
101 private final Dispatchers dispatchers;
103 private volatile SchemaContext schemaContext;
105 // Used as a write memory barrier.
106 @SuppressWarnings("unused")
107 private volatile boolean updated;
109 private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
110 .getMetricsRegistry();
112 private final PrimaryShardInfoFutureCache primaryShardInfoCache;
113 private final ShardStrategyFactory shardStrategyFactory;
/**
 * Creates an instance backed by a default {@link DatastoreContext} and a fresh
 * {@link PrimaryShardInfoFutureCache}.
 *
 * @param actorSystem the actor system used to resolve actor selections and dispatchers
 * @param shardManager the shard manager actor
 * @param clusterWrapper the cluster wrapper supplying this member's address and name
 * @param configuration the shard configuration
 */
public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
        final ClusterWrapper clusterWrapper, final Configuration configuration) {
    this(actorSystem, shardManager, clusterWrapper, configuration,
            DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
}
/**
 * Creates an instance with an explicit datastore context and primary shard info cache.
 *
 * @param actorSystem the actor system used to resolve actor selections and dispatchers
 * @param shardManager the shard manager actor
 * @param clusterWrapper the cluster wrapper supplying this member's address and name
 * @param configuration the shard configuration
 * @param datastoreContext the initial datastore context
 * @param primaryShardInfoCache the cache consulted by {@link #findPrimaryShardAsync(String)}
 */
public ActorUtils(final ActorSystem actorSystem, final ActorRef shardManager,
        final ClusterWrapper clusterWrapper, final Configuration configuration,
        final DatastoreContext datastoreContext, final PrimaryShardInfoFutureCache primaryShardInfoCache) {
    this.actorSystem = actorSystem;
    this.shardManager = shardManager;
    this.clusterWrapper = clusterWrapper;
    this.configuration = configuration;
    this.datastoreContext = datastoreContext;
    this.dispatchers = new Dispatchers(actorSystem.dispatchers());
    this.primaryShardInfoCache = primaryShardInfoCache;

    final LogicalDatastoreType convertedType =
            LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name());
    this.shardStrategyFactory = new ShardStrategyFactory(configuration, convertedType);

    setCachedProperties();

    // Remember our own "host:port" so isPathLocal() can compare remote-format paths against it.
    Address selfAddress = clusterWrapper.getSelfAddress();
    if (selfAddress != null && !selfAddress.host().isEmpty()) {
        selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
    } else {
        selfAddressHostPort = null;
    }
}
147 private void setCachedProperties() {
148 txRateLimiter = new TransactionRateLimiter(this);
150 operationDuration = FiniteDuration.create(datastoreContext.getOperationTimeoutInMillis(),
151 TimeUnit.MILLISECONDS);
152 operationTimeout = new Timeout(operationDuration);
154 transactionCommitOperationTimeout = new Timeout(FiniteDuration.create(
155 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
157 shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
160 public DatastoreContext getDatastoreContext() {
161 return datastoreContext;
164 public ActorSystem getActorSystem() {
168 public ActorRef getShardManager() {
172 public ActorSelection actorSelection(final String actorPath) {
173 return actorSystem.actorSelection(actorPath);
176 public ActorSelection actorSelection(final ActorPath actorPath) {
177 return actorSystem.actorSelection(actorPath);
180 public void setSchemaContext(final SchemaContext schemaContext) {
181 this.schemaContext = schemaContext;
183 if (shardManager != null) {
184 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
188 public void setDatastoreContext(final DatastoreContextFactory contextFactory) {
189 this.datastoreContext = contextFactory.getBaseDatastoreContext();
190 setCachedProperties();
192 // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
193 // will be published immediately even though they may not be immediately visible to other
194 // threads due to unsynchronized reads. That's OK though - we're going for eventual
195 // consistency here as immediately visible updates to these members aren't critical. These
196 // members could've been made volatile but wanted to avoid volatile reads as these are
197 // accessed often and updates will be infrequent.
201 if (shardManager != null) {
202 shardManager.tell(contextFactory, ActorRef.noSender());
206 public SchemaContext getSchemaContext() {
207 return schemaContext;
210 public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
211 Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
215 Future<Object> future = executeOperationAsync(shardManager,
216 new FindPrimary(shardName, true), shardInitializationTimeout);
218 return future.transform(new Mapper<Object, PrimaryShardInfo>() {
220 public PrimaryShardInfo checkedApply(final Object response) throws UnknownMessageException {
221 if (response instanceof RemotePrimaryShardFound) {
222 LOG.debug("findPrimaryShardAsync received: {}", response);
223 RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
224 return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
225 } else if (response instanceof LocalPrimaryShardFound) {
226 LOG.debug("findPrimaryShardAsync received: {}", response);
227 LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
228 return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
229 found.getLocalShardDataTree());
230 } else if (response instanceof NotInitializedException) {
231 throw (NotInitializedException)response;
232 } else if (response instanceof PrimaryNotFoundException) {
233 throw (PrimaryNotFoundException)response;
234 } else if (response instanceof NoShardLeaderException) {
235 throw (NoShardLeaderException)response;
238 throw new UnknownMessageException(String.format(
239 "FindPrimary returned unkown response: %s", response));
241 }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
244 private PrimaryShardInfo onPrimaryShardFound(final String shardName, final String primaryActorPath,
245 final short primaryVersion, final ReadOnlyDataTree localShardDataTree) {
246 ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
247 PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
248 new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
249 primaryShardInfoCache.putSuccessful(shardName, info);
254 * Finds a local shard given its shard name and return it's ActorRef.
256 * @param shardName the name of the local shard that needs to be found
257 * @return a reference to a local shard actor which represents the shard
258 * specified by the shardName
260 public Optional<ActorRef> findLocalShard(final String shardName) {
261 Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
263 if (result instanceof LocalShardFound) {
264 LocalShardFound found = (LocalShardFound) result;
265 LOG.debug("Local shard found {}", found.getPath());
266 return Optional.of(found.getPath());
269 return Optional.empty();
273 * Finds a local shard async given its shard name and return a Future from which to obtain the
276 * @param shardName the name of the local shard that needs to be found
278 public Future<ActorRef> findLocalShardAsync(final String shardName) {
279 Future<Object> future = executeOperationAsync(shardManager,
280 new FindLocalShard(shardName, true), shardInitializationTimeout);
282 return future.map(new Mapper<Object, ActorRef>() {
284 public ActorRef checkedApply(final Object response) throws Throwable {
285 if (response instanceof LocalShardFound) {
286 LocalShardFound found = (LocalShardFound)response;
287 LOG.debug("Local shard found {}", found.getPath());
288 return found.getPath();
289 } else if (response instanceof NotInitializedException) {
290 throw (NotInitializedException)response;
291 } else if (response instanceof LocalShardNotFound) {
292 throw new LocalShardNotFoundException(
293 String.format("Local shard for %s does not exist.", shardName));
296 throw new UnknownMessageException(String.format(
297 "FindLocalShard returned unkown response: %s", response));
299 }, getClientDispatcher());
303 * Executes an operation on a local actor and wait for it's response.
305 * @param actor the actor
306 * @param message the message to send
307 * @return The response of the operation
309 @SuppressWarnings("checkstyle:IllegalCatch")
310 public Object executeOperation(final ActorRef actor, final Object message) {
311 Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
314 return Await.result(future, operationDuration);
315 } catch (Exception e) {
316 throw new TimeoutException("Sending message " + message.getClass().toString()
317 + " to actor " + actor.toString() + " failed. Try again later.", e);
322 * Execute an operation on a remote actor and wait for it's response.
324 * @param actor the actor
325 * @param message the message
326 * @return the response message
328 @SuppressWarnings("checkstyle:IllegalCatch")
329 public Object executeOperation(final ActorSelection actor, final Object message) {
330 Future<Object> future = executeOperationAsync(actor, message);
333 return Await.result(future, operationDuration);
334 } catch (Exception e) {
335 throw new TimeoutException("Sending message " + message.getClass().toString()
336 + " to actor " + actor.toString() + " failed. Try again later.", e);
340 public Future<Object> executeOperationAsync(final ActorRef actor, final Object message, final Timeout timeout) {
341 Preconditions.checkArgument(actor != null, "actor must not be null");
342 Preconditions.checkArgument(message != null, "message must not be null");
344 LOG.debug("Sending message {} to {}", message.getClass(), actor);
345 return doAsk(actor, message, timeout);
349 * Execute an operation on a remote actor asynchronously.
351 * @param actor the ActorSelection
352 * @param message the message to send
353 * @param timeout the operation timeout
354 * @return a Future containing the eventual result
356 public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message,
357 final Timeout timeout) {
358 Preconditions.checkArgument(actor != null, "actor must not be null");
359 Preconditions.checkArgument(message != null, "message must not be null");
361 LOG.debug("Sending message {} to {}", message.getClass(), actor);
363 return doAsk(actor, message, timeout);
367 * Execute an operation on a remote actor asynchronously.
369 * @param actor the ActorSelection
370 * @param message the message to send
371 * @return a Future containing the eventual result
373 public Future<Object> executeOperationAsync(final ActorSelection actor, final Object message) {
374 return executeOperationAsync(actor, message, operationTimeout);
378 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
379 * reply (essentially set and forget).
381 * @param actor the ActorSelection
382 * @param message the message to send
384 public void sendOperationAsync(final ActorSelection actor, final Object message) {
385 Preconditions.checkArgument(actor != null, "actor must not be null");
386 Preconditions.checkArgument(message != null, "message must not be null");
388 LOG.debug("Sending message {} to {}", message.getClass(), actor);
390 actor.tell(message, ActorRef.noSender());
393 @SuppressWarnings("checkstyle:IllegalCatch")
394 public void shutdown() {
395 FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
397 Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
398 } catch (Exception e) {
399 LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
403 public ClusterWrapper getClusterWrapper() {
404 return clusterWrapper;
407 public MemberName getCurrentMemberName() {
408 return clusterWrapper.getCurrentMemberName();
412 * Send the message to each and every shard.
414 public void broadcast(final Function<Short, Object> messageSupplier, final Class<?> messageClass) {
415 for (final String shardName : configuration.getAllShardNames()) {
417 Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
418 primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
420 public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
421 if (failure != null) {
422 LOG.warn("broadcast failed to send message {} to shard {}", messageClass.getSimpleName(),
425 Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
426 primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
429 }, getClientDispatcher());
433 public FiniteDuration getOperationDuration() {
434 return operationDuration;
437 public Timeout getOperationTimeout() {
438 return operationTimeout;
441 public boolean isPathLocal(final String path) {
442 if (Strings.isNullOrEmpty(path)) {
446 int pathAtIndex = path.indexOf('@');
447 if (pathAtIndex == -1) {
448 //if the path is of local format, then its local and is co-located
451 } else if (selfAddressHostPort != null) {
452 // self-address and tx actor path, both are of remote path format
453 int slashIndex = path.indexOf('/', pathAtIndex);
455 if (slashIndex == -1) {
459 String hostPort = path.substring(pathAtIndex + 1, slashIndex);
460 return hostPort.equals(selfAddressHostPort);
463 // self address is local format and tx actor path is remote format
469 * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
470 * us to create a timer for pretty much anything.
472 * @param operationName the name of the operation
473 * @return the Timer instance
475 public Timer getOperationTimer(final String operationName) {
476 return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
479 public Timer getOperationTimer(final String dataStoreType, final String operationName) {
480 final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
481 operationName, METRIC_RATE);
482 return metricRegistry.timer(rate);
486 * Get the name of the data store to which this ActorContext belongs.
488 * @return the data store name
490 public String getDataStoreName() {
491 return datastoreContext.getDataStoreName();
495 * Get the current transaction creation rate limit.
497 * @return the rate limit
499 public double getTxCreationLimit() {
500 return txRateLimiter.getTxCreationLimit();
504 * Try to acquire a transaction creation permit. Will block if no permits are available.
506 public void acquireTxCreationPermit() {
507 txRateLimiter.acquire();
511 * Returns the operation timeout to be used when committing transactions.
513 * @return the operation timeout
515 public Timeout getTransactionCommitOperationTimeout() {
516 return transactionCommitOperationTimeout;
520 * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
521 * code on the datastore.
523 * @return the dispatcher
525 public ExecutionContext getClientDispatcher() {
526 return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
529 public String getNotificationDispatcherPath() {
530 return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
533 public Configuration getConfiguration() {
534 return configuration;
537 public ShardStrategyFactory getShardStrategyFactory() {
538 return shardStrategyFactory;
541 protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
542 return ask(actorRef, message, timeout);
545 protected Future<Object> doAsk(final ActorSelection actorRef, final Object message, final Timeout timeout) {
546 return ask(actorRef, message, timeout);
549 public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
550 return primaryShardInfoCache;