2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import static akka.pattern.Patterns.ask;
13 import akka.actor.ActorPath;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.ActorSystem;
17 import akka.actor.Address;
18 import akka.dispatch.Mapper;
19 import akka.dispatch.OnComplete;
20 import akka.pattern.AskTimeoutException;
21 import akka.pattern.Patterns;
22 import akka.util.Timeout;
23 import com.codahale.metrics.MetricRegistry;
24 import com.codahale.metrics.Timer;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Preconditions;
27 import com.google.common.base.Strings;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.Function;
30 import org.opendaylight.controller.cluster.access.concepts.MemberName;
31 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
32 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
33 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
35 import org.opendaylight.controller.cluster.datastore.config.Configuration;
36 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
42 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
43 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
47 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
48 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
49 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
50 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
51 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
52 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
53 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
54 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58 import scala.concurrent.Await;
59 import scala.concurrent.ExecutionContext;
60 import scala.concurrent.Future;
61 import scala.concurrent.duration.Duration;
62 import scala.concurrent.duration.FiniteDuration;
/**
 * The ActorContext class contains utility methods which can be used by
 * non-actors (like DistributedDataStore) to work with actors a little more
 * easily. An ActorContext can be freely passed around to local object instances
 * but should not be passed to actors, especially remote actors.
 */
70 public class ActorContext {
// Class-wide SLF4J logger.
private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
// First component of every metric name built by getOperationTimer(String, String).
private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
// Last component of every metric name built by getOperationTimer(String, String).
private static final String METRIC_RATE = "rate";
74 private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
75 new Mapper<Throwable, Throwable>() {
77 public Throwable apply(Throwable failure) {
78 Throwable actualFailure = failure;
79 if (failure instanceof AskTimeoutException) {
80 // A timeout exception most likely means the shard isn't initialized.
81 actualFailure = new NotInitializedException(
82 "Timed out trying to find the primary shard. Most likely cause is the "
83 + "shard is not initialized yet.");
// Public string constants. No use of them is visible inside this class; presumably they are
// consumed by callers elsewhere - verify before changing.
public static final String BOUNDED_MAILBOX = "bounded-mailbox";
public static final String COMMIT = "commit";

private final ActorSystem actorSystem;
// May be null: setSchemaContext() and setDatastoreContext() null-check it before sending.
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
// The non-final members below are (re)assigned by setDatastoreContext() / setCachedProperties().
// They are deliberately not volatile; see the explanation inside setDatastoreContext().
private DatastoreContext datastoreContext;
private FiniteDuration operationDuration;
private Timeout operationTimeout;
// "host:port" of this node's cluster address, or null when the address is absent or has no host.
private final String selfAddressHostPort;
private TransactionRateLimiter txRateLimiter;
private Timeout transactionCommitOperationTimeout;
private Timeout shardInitializationTimeout;
private final Dispatchers dispatchers;

// Written by setSchemaContext(), read by getSchemaContext().
private volatile SchemaContext schemaContext;

// Used as a write memory barrier.
@SuppressWarnings("unused")
private volatile boolean updated;

// Registry obtained from the MetricsReporter for DatastoreContext.METRICS_DOMAIN; source of all timers.
private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
        .getMetricsRegistry();

private final PrimaryShardInfoFutureCache primaryShardInfoCache;
private final ShardStrategyFactory shardStrategyFactory;
/**
 * Creates an ActorContext that uses a default-built {@link DatastoreContext} and a newly created
 * {@link PrimaryShardInfoFutureCache}.
 *
 * @param actorSystem the actor system used to resolve actor selections and dispatchers
 * @param shardManager the shard manager actor
 * @param clusterWrapper provides this member's address and name
 * @param configuration the shard configuration
 */
public ActorContext(final ActorSystem actorSystem, final ActorRef shardManager,
        final ClusterWrapper clusterWrapper, final Configuration configuration) {
    this(actorSystem, shardManager, clusterWrapper, configuration, DatastoreContext.newBuilder().build(),
        new PrimaryShardInfoFutureCache());
}
/**
 * Creates an ActorContext with an explicit datastore context and primary-shard cache.
 *
 * @param actorSystem the actor system used to resolve actor selections and dispatchers
 * @param shardManager the shard manager actor
 * @param clusterWrapper provides this member's address and name
 * @param configuration the shard configuration
 * @param datastoreContext the initial datastore settings; the cached timeouts are derived from it
 * @param primaryShardInfoCache cache consulted and populated by findPrimaryShardAsync
 */
public ActorContext(final ActorSystem actorSystem, final ActorRef shardManager,
        final ClusterWrapper clusterWrapper, final Configuration configuration,
        final DatastoreContext datastoreContext, final PrimaryShardInfoFutureCache primaryShardInfoCache) {
    this.actorSystem = actorSystem;
    this.shardManager = shardManager;
    this.clusterWrapper = clusterWrapper;
    this.configuration = configuration;
    this.datastoreContext = datastoreContext;
    this.dispatchers = new Dispatchers(actorSystem.dispatchers());
    this.primaryShardInfoCache = primaryShardInfoCache;

    // Map the context's store type onto the mdsal enum constant of the same name.
    final String storeTypeName = datastoreContext.getLogicalStoreType().name();
    this.shardStrategyFactory = new ShardStrategyFactory(configuration,
        LogicalDatastoreType.valueOf(storeTypeName));

    setCachedProperties();

    // Remember "host:port" of our own address (when it has a host) for isPathLocal().
    final Address self = clusterWrapper.getSelfAddress();
    final boolean hasHost = self != null && !self.host().isEmpty();
    this.selfAddressHostPort = hasHost ? self.host().get() + ":" + self.port().get() : null;
}
149 private void setCachedProperties() {
150 txRateLimiter = new TransactionRateLimiter(this);
152 operationDuration = Duration.create(datastoreContext.getOperationTimeoutInMillis(), TimeUnit.MILLISECONDS);
153 operationTimeout = new Timeout(operationDuration);
155 transactionCommitOperationTimeout = new Timeout(Duration.create(
156 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
158 shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
161 public DatastoreContext getDatastoreContext() {
162 return datastoreContext;
165 public ActorSystem getActorSystem() {
169 public ActorRef getShardManager() {
173 public ActorSelection actorSelection(String actorPath) {
174 return actorSystem.actorSelection(actorPath);
177 public ActorSelection actorSelection(ActorPath actorPath) {
178 return actorSystem.actorSelection(actorPath);
181 public void setSchemaContext(SchemaContext schemaContext) {
182 this.schemaContext = schemaContext;
184 if (shardManager != null) {
185 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
189 public void setDatastoreContext(DatastoreContextFactory contextFactory) {
190 this.datastoreContext = contextFactory.getBaseDatastoreContext();
191 setCachedProperties();
193 // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
194 // will be published immediately even though they may not be immediately visible to other
195 // threads due to unsynchronized reads. That's OK though - we're going for eventual
196 // consistency here as immediately visible updates to these members aren't critical. These
197 // members could've been made volatile but wanted to avoid volatile reads as these are
198 // accessed often and updates will be infrequent.
202 if (shardManager != null) {
203 shardManager.tell(contextFactory, ActorRef.noSender());
207 public SchemaContext getSchemaContext() {
208 return schemaContext;
211 public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
212 Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
216 Future<Object> future = executeOperationAsync(shardManager,
217 new FindPrimary(shardName, true), shardInitializationTimeout);
219 return future.transform(new Mapper<Object, PrimaryShardInfo>() {
221 public PrimaryShardInfo checkedApply(Object response) throws UnknownMessageException {
222 if (response instanceof RemotePrimaryShardFound) {
223 LOG.debug("findPrimaryShardAsync received: {}", response);
224 RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
225 return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
226 } else if (response instanceof LocalPrimaryShardFound) {
227 LOG.debug("findPrimaryShardAsync received: {}", response);
228 LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
229 return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
230 found.getLocalShardDataTree());
231 } else if (response instanceof NotInitializedException) {
232 throw (NotInitializedException)response;
233 } else if (response instanceof PrimaryNotFoundException) {
234 throw (PrimaryNotFoundException)response;
235 } else if (response instanceof NoShardLeaderException) {
236 throw (NoShardLeaderException)response;
239 throw new UnknownMessageException(String.format(
240 "FindPrimary returned unkown response: %s", response));
242 }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
245 private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
246 short primaryVersion, DataTree localShardDataTree) {
247 ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
248 PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
249 new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
250 primaryShardInfoCache.putSuccessful(shardName, info);
255 * Finds a local shard given its shard name and return it's ActorRef.
257 * @param shardName the name of the local shard that needs to be found
258 * @return a reference to a local shard actor which represents the shard
259 * specified by the shardName
261 public Optional<ActorRef> findLocalShard(String shardName) {
262 Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
264 if (result instanceof LocalShardFound) {
265 LocalShardFound found = (LocalShardFound) result;
266 LOG.debug("Local shard found {}", found.getPath());
267 return Optional.of(found.getPath());
270 return Optional.absent();
274 * Finds a local shard async given its shard name and return a Future from which to obtain the
277 * @param shardName the name of the local shard that needs to be found
279 public Future<ActorRef> findLocalShardAsync(final String shardName) {
280 Future<Object> future = executeOperationAsync(shardManager,
281 new FindLocalShard(shardName, true), shardInitializationTimeout);
283 return future.map(new Mapper<Object, ActorRef>() {
285 public ActorRef checkedApply(Object response) throws Throwable {
286 if (response instanceof LocalShardFound) {
287 LocalShardFound found = (LocalShardFound)response;
288 LOG.debug("Local shard found {}", found.getPath());
289 return found.getPath();
290 } else if (response instanceof NotInitializedException) {
291 throw (NotInitializedException)response;
292 } else if (response instanceof LocalShardNotFound) {
293 throw new LocalShardNotFoundException(
294 String.format("Local shard for %s does not exist.", shardName));
297 throw new UnknownMessageException(String.format(
298 "FindLocalShard returned unkown response: %s", response));
300 }, getClientDispatcher());
304 * Executes an operation on a local actor and wait for it's response.
306 * @param actor the actor
307 * @param message the message to send
308 * @return The response of the operation
310 @SuppressWarnings("checkstyle:IllegalCatch")
311 public Object executeOperation(ActorRef actor, Object message) {
312 Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
315 return Await.result(future, operationDuration);
316 } catch (Exception e) {
317 throw new TimeoutException("Sending message " + message.getClass().toString()
318 + " to actor " + actor.toString() + " failed. Try again later.", e);
323 * Execute an operation on a remote actor and wait for it's response.
325 * @param actor the actor
326 * @param message the message
327 * @return the response message
329 @SuppressWarnings("checkstyle:IllegalCatch")
330 public Object executeOperation(ActorSelection actor, Object message) {
331 Future<Object> future = executeOperationAsync(actor, message);
334 return Await.result(future, operationDuration);
335 } catch (Exception e) {
336 throw new TimeoutException("Sending message " + message.getClass().toString()
337 + " to actor " + actor.toString() + " failed. Try again later.", e);
341 public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
342 Preconditions.checkArgument(actor != null, "actor must not be null");
343 Preconditions.checkArgument(message != null, "message must not be null");
345 LOG.debug("Sending message {} to {}", message.getClass(), actor);
346 return doAsk(actor, message, timeout);
350 * Execute an operation on a remote actor asynchronously.
352 * @param actor the ActorSelection
353 * @param message the message to send
354 * @param timeout the operation timeout
355 * @return a Future containing the eventual result
357 public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
359 Preconditions.checkArgument(actor != null, "actor must not be null");
360 Preconditions.checkArgument(message != null, "message must not be null");
362 LOG.debug("Sending message {} to {}", message.getClass(), actor);
364 return doAsk(actor, message, timeout);
368 * Execute an operation on a remote actor asynchronously.
370 * @param actor the ActorSelection
371 * @param message the message to send
372 * @return a Future containing the eventual result
374 public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
375 return executeOperationAsync(actor, message, operationTimeout);
379 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
380 * reply (essentially set and forget).
382 * @param actor the ActorSelection
383 * @param message the message to send
385 public void sendOperationAsync(ActorSelection actor, Object message) {
386 Preconditions.checkArgument(actor != null, "actor must not be null");
387 Preconditions.checkArgument(message != null, "message must not be null");
389 LOG.debug("Sending message {} to {}", message.getClass(), actor);
391 actor.tell(message, ActorRef.noSender());
394 @SuppressWarnings("checkstyle:IllegalCatch")
395 public void shutdown() {
396 FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
398 Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
399 } catch (Exception e) {
400 LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
404 public ClusterWrapper getClusterWrapper() {
405 return clusterWrapper;
408 public MemberName getCurrentMemberName() {
409 return clusterWrapper.getCurrentMemberName();
413 * Send the message to each and every shard.
415 public void broadcast(final Function<Short, Object> messageSupplier, Class<?> messageClass) {
416 for (final String shardName : configuration.getAllShardNames()) {
418 Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
419 primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
421 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
422 if (failure != null) {
423 LOG.warn("broadcast failed to send message {} to shard {}: {}",
424 messageClass.getSimpleName(), shardName, failure);
426 Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
427 primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
430 }, getClientDispatcher());
434 public FiniteDuration getOperationDuration() {
435 return operationDuration;
438 public Timeout getOperationTimeout() {
439 return operationTimeout;
442 public boolean isPathLocal(String path) {
443 if (Strings.isNullOrEmpty(path)) {
447 int pathAtIndex = path.indexOf('@');
448 if (pathAtIndex == -1) {
449 //if the path is of local format, then its local and is co-located
452 } else if (selfAddressHostPort != null) {
453 // self-address and tx actor path, both are of remote path format
454 int slashIndex = path.indexOf('/', pathAtIndex);
456 if (slashIndex == -1) {
460 String hostPort = path.substring(pathAtIndex + 1, slashIndex);
461 return hostPort.equals(selfAddressHostPort);
464 // self address is local format and tx actor path is remote format
470 * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
471 * us to create a timer for pretty much anything.
473 * @param operationName the name of the operation
474 * @return the Timer instance
476 public Timer getOperationTimer(String operationName) {
477 return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
480 public Timer getOperationTimer(String dataStoreType, String operationName) {
481 final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
482 operationName, METRIC_RATE);
483 return metricRegistry.timer(rate);
487 * Get the name of the data store to which this ActorContext belongs.
489 * @return the data store name
491 public String getDataStoreName() {
492 return datastoreContext.getDataStoreName();
496 * Get the current transaction creation rate limit.
498 * @return the rate limit
500 public double getTxCreationLimit() {
501 return txRateLimiter.getTxCreationLimit();
505 * Try to acquire a transaction creation permit. Will block if no permits are available.
507 public void acquireTxCreationPermit() {
508 txRateLimiter.acquire();
512 * Returns the operation timeout to be used when committing transactions.
514 * @return the operation timeout
516 public Timeout getTransactionCommitOperationTimeout() {
517 return transactionCommitOperationTimeout;
521 * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
522 * code on the datastore.
524 * @return the dispatcher
526 public ExecutionContext getClientDispatcher() {
527 return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
530 public String getNotificationDispatcherPath() {
531 return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
534 public Configuration getConfiguration() {
535 return configuration;
538 public ShardStrategyFactory getShardStrategyFactory() {
539 return shardStrategyFactory;
542 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
543 return ask(actorRef, message, timeout);
546 protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout) {
547 return ask(actorRef, message, timeout);
550 public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
551 return primaryShardInfoCache;