2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Futures;
19 import akka.dispatch.Mapper;
20 import akka.dispatch.OnComplete;
21 import akka.pattern.AskTimeoutException;
22 import akka.util.Timeout;
23 import com.codahale.metrics.JmxReporter;
24 import com.codahale.metrics.MetricRegistry;
25 import com.codahale.metrics.Timer;
26 import com.google.common.annotations.VisibleForTesting;
27 import com.google.common.base.Optional;
28 import com.google.common.base.Preconditions;
29 import com.google.common.base.Strings;
30 import com.google.common.cache.Cache;
31 import com.google.common.cache.CacheBuilder;
32 import com.google.common.util.concurrent.RateLimiter;
33 import java.util.concurrent.TimeUnit;
34 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
35 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
36 import org.opendaylight.controller.cluster.datastore.Configuration;
37 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
38 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
42 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
43 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
44 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
45 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
46 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
48 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
49 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
50 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
51 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import scala.concurrent.Await;
56 import scala.concurrent.ExecutionContext;
57 import scala.concurrent.Future;
58 import scala.concurrent.duration.Duration;
59 import scala.concurrent.duration.FiniteDuration;
62 * The ActorContext class contains utility methods which could be used by
63 * non-actors (like DistributedDataStore) to work with actors a little more
64 * easily. An ActorContext can be freely passed around to local object instances
65 * but should not be passed to actors especially remote actors
67 public class ActorContext {
/** Class-wide SLF4J logger. */
68 private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
// Not referenced anywhere in the visible part of this file.
69 private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
// First and last components of the metric name assembled in getOperationTimer().
70 private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
71 private static final String METRIC_RATE = "rate";
// Domain passed to JmxReporter.inDomain() when the jmxReporter field is built.
72 private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
// Failure mapper handed to Future.transform() in findPrimaryShardAsync(). An
// AskTimeoutException is replaced by a NotInitializedException with an explanatory
// message; any other failure is left unchanged in actualFailure.
// NOTE(review): the return statement and the closing braces of this anonymous class are
// not visible in this listing; presumably actualFailure is returned - confirm.
73 private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
74 new Mapper<Throwable, Throwable>() {
76 public Throwable apply(Throwable failure) {
77 Throwable actualFailure = failure;
78 if(failure instanceof AskTimeoutException) {
79 // A timeout exception most likely means the shard isn't initialized.
80 actualFailure = new NotInitializedException(
81 "Timed out trying to find the primary shard. Most likely cause is the " +
82 "shard is not initialized yet.");
// Not referenced in the visible code. NOTE(review): presumably the name of a mailbox
// configuration used by actors elsewhere - confirm.
88 public static final String MAILBOX = "bounded-mailbox";
// Collaborators supplied through the constructor; never reassigned.
90 private final ActorSystem actorSystem;
91 private final ActorRef shardManager;
92 private final ClusterWrapper clusterWrapper;
93 private final Configuration configuration;
// Replaceable at runtime through setDatastoreContext().
94 private DatastoreContext datastoreContext;
// The next two are derived from datastoreContext in setCachedProperties().
95 private FiniteDuration operationDuration;
96 private Timeout operationTimeout;
// "host:port" of the local cluster address, or null (see the five-argument constructor);
// compared against actor paths in isPathLocal().
97 private final String selfAddressHostPort;
// Limiter used by acquireTxCreationPermit(); recreated in setCachedProperties().
98 private RateLimiter txRateLimiter;
// Registry backing getOperationTimer().
99 private final MetricRegistry metricRegistry = new MetricRegistry();
// NOTE(review): no start()/stop() call on the reporter is visible in this listing.
100 private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
// Mailbox capacity read from CommonConfig once, in the constructor.
101 private final int transactionOutstandingOperationLimit;
// Both timeouts below are derived from datastoreContext in setCachedProperties().
102 private Timeout transactionCommitOperationTimeout;
103 private Timeout shardInitializationTimeout;
104 private final Dispatchers dispatchers;
// Keyed by shard name; populated in findPrimaryShardAsync() and rebuilt in
// setCachedProperties().
105 private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
107 private volatile SchemaContext schemaContext;
// Per the comment in setDatastoreContext(), this volatile is written only to publish the
// non-volatile cached fields above; its value is not read in the visible code.
108 private volatile boolean updated;
/**
 * Convenience constructor: delegates to the five-argument constructor, passing a
 * DatastoreContext obtained from DatastoreContext.newBuilder().build().
 */
110 public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
111 ClusterWrapper clusterWrapper, Configuration configuration) {
112 this(actorSystem, shardManager, clusterWrapper, configuration,
113 DatastoreContext.newBuilder().build());
/**
 * Stores the supplied collaborators, derives the cached timeouts, rate limiter and
 * primary-shard cache from the datastore context, and records this node's "host:port".
 *
 * @param actorSystem the actor system used for selections and dispatchers
 * @param shardManager the shard manager actor; other methods in this class null-check it
 * @param clusterWrapper source of the self address and current member name
 * @param configuration source of the shard names used by broadcast()
 * @param datastoreContext initial datastore settings
 */
116 public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
117 ClusterWrapper clusterWrapper, Configuration configuration,
118 DatastoreContext datastoreContext) {
119 this.actorSystem = actorSystem;
120 this.shardManager = shardManager;
121 this.clusterWrapper = clusterWrapper;
122 this.configuration = configuration;
123 this.datastoreContext = datastoreContext;
124 this.dispatchers = new Dispatchers(actorSystem.dispatchers());
// Must run after datastoreContext is assigned: everything it computes is read from it.
126 setCachedProperties();
128 Address selfAddress = clusterWrapper.getSelfAddress();
// host:port is recorded only when an address with a non-empty host is available.
129 if (selfAddress != null && !selfAddress.host().isEmpty()) {
130 selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
// NOTE(review): the "else" line is not visible in this listing; the null assignment
// below is presumably the alternative branch.
132 selfAddressHostPort = null;
// Mailbox capacity from the actor system's configuration, read once.
135 transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
/**
 * Recomputes every field derived from the current datastoreContext. Called from the
 * constructor and from setDatastoreContext().
 */
140 private void setCachedProperties() {
// A new limiter is created each time, starting at the configured initial rate.
141 txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
143 operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
144 operationTimeout = new Timeout(operationDuration);
146 transactionCommitOperationTimeout = new Timeout(Duration.create(
147 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
// Twice the configured shard initialization timeout.
149 shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
// A fresh cache replaces the previous one; entries expire, measured from their write,
// after the configured shard leader election timeout.
// NOTE(review): the terminating build() call is not visible in this listing.
151 primaryShardActorSelectionCache = CacheBuilder.newBuilder()
152 .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
/**
 * @return the datastore context currently held by this instance (the constructor value
 *         or the latest one passed to setDatastoreContext())
 */
156 public DatastoreContext getDatastoreContext() {
157 return datastoreContext;
/**
 * Accessor for the ActorSystem.
 * NOTE(review): the method body is not visible in this listing; presumably it returns the
 * actorSystem field - confirm.
 */
160 public ActorSystem getActorSystem() {
/**
 * Accessor for the shard manager actor.
 * NOTE(review): the method body is not visible in this listing; presumably it returns the
 * shardManager field - confirm.
 */
164 public ActorRef getShardManager() {
/**
 * Creates an actor selection from a path string by delegating to the actor system.
 *
 * @param actorPath the actor path, as a string
 * @return the selection produced by ActorSystem.actorSelection(String)
 */
168 public ActorSelection actorSelection(String actorPath) {
169 return actorSystem.actorSelection(actorPath);
/**
 * Creates an actor selection from an ActorPath by delegating to the actor system.
 *
 * @param actorPath the actor path
 * @return the selection produced by ActorSystem.actorSelection(ActorPath)
 */
172 public ActorSelection actorSelection(ActorPath actorPath) {
173 return actorSystem.actorSelection(actorPath);
/**
 * Stores the schema context in the volatile field and, when a shard manager is present,
 * sends it an UpdateSchemaContext message with no sender.
 *
 * @param schemaContext the new schema context
 */
176 public void setSchemaContext(SchemaContext schemaContext) {
177 this.schemaContext = schemaContext;
// The shard manager may be null; the notification is skipped in that case.
179 if(shardManager != null) {
180 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
/**
 * Replaces the datastore context, recomputes the fields derived from it, and, when a
 * shard manager is present, sends the new context to it with no sender.
 *
 * @param context the new datastore context
 */
184 public void setDatastoreContext(DatastoreContext context) {
185 this.datastoreContext = context;
// Rebuilds the rate limiter, timeouts and primary-shard cache from the new context.
186 setCachedProperties();
188 // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
189 // will be published immediately even though they may not be immediately visible to other
190 // threads due to unsynchronized reads. That's OK though - we're going for eventual
191 // consistency here as immediately visible updates to these members aren't critical. These
192 // members could've been made volatile but wanted to avoid volatile reads as these are
193 // accessed often and updates will be infrequent.
// NOTE(review): the write to 'updated' described above is not visible in this listing.
197 if(shardManager != null) {
198 shardManager.tell(context, ActorRef.noSender());
/**
 * @return the schema context last passed to setSchemaContext(); read from a volatile field
 */
202 public SchemaContext getSchemaContext() {
203 return schemaContext;
206 public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
207 Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
211 Future<Object> future = executeOperationAsync(shardManager,
212 new FindPrimary(shardName, true).toSerializable(), shardInitializationTimeout);
214 return future.transform(new Mapper<Object, ActorSelection>() {
216 public ActorSelection checkedApply(Object response) throws Exception {
217 if(PrimaryFound.SERIALIZABLE_CLASS.isInstance(response)) {
218 PrimaryFound found = PrimaryFound.fromSerializable(response);
220 LOG.debug("Primary found {}", found.getPrimaryPath());
221 ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
222 primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
223 return actorSelection;
224 } else if(response instanceof ActorNotInitialized) {
225 throw new NotInitializedException(
226 String.format("Found primary shard %s but it's not initialized yet. " +
227 "Please try again later", shardName));
228 } else if(response instanceof PrimaryNotFound) {
229 throw new PrimaryNotFoundException(
230 String.format("No primary shard found for %S.", shardName));
231 } else if(response instanceof NoShardLeaderException) {
232 throw (NoShardLeaderException)response;
235 throw new UnknownMessageException(String.format(
236 "FindPrimary returned unkown response: %s", response));
238 }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
242 * Finds a local shard given its shard name and returns its ActorRef
244 * @param shardName the name of the local shard that needs to be found
245 * @return a reference to a local shard actor which represents the shard
246 * specified by the shardName
248 public Optional<ActorRef> findLocalShard(String shardName) {
// Synchronous variant: executeOperation() waits for the shard manager's reply. The second
// FindLocalShard argument is false here, whereas findLocalShardAsync() passes true.
249 Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
251 if (result instanceof LocalShardFound) {
252 LocalShardFound found = (LocalShardFound) result;
253 LOG.debug("Local shard found {}", found.getPath());
254 return Optional.of(found.getPath());
// Any reply other than LocalShardFound is reported as absent.
257 return Optional.absent();
261 * Finds a local shard async given its shard name and return a Future from which to obtain the
264 * @param shardName the name of the local shard that needs to be found
266 public Future<ActorRef> findLocalShardAsync( final String shardName) {
// Asks the shard manager using the shard initialization timeout rather than the regular
// operation timeout.
267 Future<Object> future = executeOperationAsync(shardManager,
268 new FindLocalShard(shardName, true), shardInitializationTimeout);
// The reply is mapped on the client dispatcher; every reply other than LocalShardFound
// turns into a failed Future.
270 return future.map(new Mapper<Object, ActorRef>() {
272 public ActorRef checkedApply(Object response) throws Throwable {
273 if(response instanceof LocalShardFound) {
274 LocalShardFound found = (LocalShardFound)response;
275 LOG.debug("Local shard found {}", found.getPath());
276 return found.getPath();
277 } else if(response instanceof ActorNotInitialized) {
// NOTE(review): the line supplying the format argument is not visible in this listing.
278 throw new NotInitializedException(
279 String.format("Found local shard for %s but it's not initialized yet.",
281 } else if(response instanceof LocalShardNotFound) {
282 throw new LocalShardNotFoundException(
283 String.format("Local shard for %s does not exist.", shardName));
// Reached for any reply type not handled above.
286 throw new UnknownMessageException(String.format(
287 "FindLocalShard returned unkown response: %s", response));
289 }, getClientDispatcher());
293 * Executes an operation on a local actor and waits for its response
297 * @return The response of the operation
299 public Object executeOperation(ActorRef actor, Object message) {
// The ask uses operationTimeout; the wait below uses the same duration.
300 Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
// Blocks the calling thread until the reply arrives or operationDuration elapses.
303 return Await.result(future, operationDuration);
// Every Exception, not only a timeout, is rethrown as the project's TimeoutException
// with the original exception attached as the cause.
304 } catch (Exception e) {
305 throw new TimeoutException("Sending message " + message.getClass().toString() +
306 " to actor " + actor.toString() + " failed. Try again later.", e);
/**
 * Sends a message to an actor with the ask pattern and returns the reply Future without
 * waiting for it.
 *
 * @param actor the target actor; must not be null
 * @param message the message to send; must not be null
 * @param timeout the ask timeout
 * @return a Future for the reply, as produced by doAsk()
 * @throws IllegalArgumentException if actor or message is null
 */
310 public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
311 Preconditions.checkArgument(actor != null, "actor must not be null");
312 Preconditions.checkArgument(message != null, "message must not be null");
314 LOG.debug("Sending message {} to {}", message.getClass(), actor);
// Goes through the protected doAsk() hook rather than calling ask() directly.
315 return doAsk(actor, message, timeout);
319 * Executes an operation on a remote actor and waits for its response
325 public Object executeOperation(ActorSelection actor, Object message) {
// The two-argument async overload applies operationTimeout to the ask.
326 Future<Object> future = executeOperationAsync(actor, message);
// Blocks the calling thread until the reply arrives or operationDuration elapses.
329 return Await.result(future, operationDuration);
// Every Exception, not only a timeout, is rethrown as the project's TimeoutException
// with the original exception attached as the cause.
330 } catch (Exception e) {
331 throw new TimeoutException("Sending message " + message.getClass().toString() +
332 " to actor " + actor.toString() + " failed. Try again later.", e);
337 * Execute an operation on a remote actor asynchronously.
339 * @param actor the ActorSelection
340 * @param message the message to send
341 * @param timeout the operation timeout
342 * @return a Future containing the eventual result
// NOTE(review): the continuation line declaring the timeout parameter is not visible in
// this listing; the body below reads a variable named timeout.
344 public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
// Null arguments are rejected with IllegalArgumentException.
346 Preconditions.checkArgument(actor != null, "actor must not be null");
347 Preconditions.checkArgument(message != null, "message must not be null");
349 LOG.debug("Sending message {} to {}", message.getClass(), actor);
// Goes through the protected doAsk() hook rather than calling ask() directly.
351 return doAsk(actor, message, timeout);
355 * Execute an operation on a remote actor asynchronously.
357 * @param actor the ActorSelection
358 * @param message the message to send
359 * @return a Future containing the eventual result
361 public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
// Same as the three-argument overload, with the cached operationTimeout as the timeout.
362 return executeOperationAsync(actor, message, operationTimeout);
366 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
367 * reply (essentially set and forget).
369 * @param actor the ActorSelection
370 * @param message the message to send
372 public void sendOperationAsync(ActorSelection actor, Object message) {
// Null arguments are rejected with IllegalArgumentException.
373 Preconditions.checkArgument(actor != null, "actor must not be null");
374 Preconditions.checkArgument(message != null, "message must not be null");
376 LOG.debug("Sending message {} to {}", message.getClass(), actor);
// tell() with no sender: no reply is requested or awaited.
378 actor.tell(message, ActorRef.noSender());
/**
 * Sends a PoisonPill to the shard manager and then shuts down the actor system.
 * NOTE(review): unlike setSchemaContext() and setDatastoreContext(), shardManager is not
 * null-checked here, and the sender is passed as a literal null rather than
 * ActorRef.noSender() - confirm whether that is intentional.
 */
381 public void shutdown() {
382 shardManager.tell(PoisonPill.getInstance(), null);
383 actorSystem.shutdown();
/**
 * @return the ClusterWrapper supplied to the constructor
 */
386 public ClusterWrapper getClusterWrapper() {
387 return clusterWrapper;
/**
 * @return the current member name as reported by the ClusterWrapper
 */
390 public String getCurrentMemberName(){
391 return clusterWrapper.getCurrentMemberName();
395 * Send the message to each and every shard
399 public void broadcast(final Object message){
// One asynchronous primary lookup per shard name listed in the configuration.
400 for(final String shardName : configuration.getAllShardNames()){
402 Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
// The callback runs on the client dispatcher once the lookup completes.
403 primaryFuture.onComplete(new OnComplete<ActorSelection>() {
405 public void onComplete(Throwable failure, ActorSelection primaryShard) {
// A failed lookup is only logged; it does not stop delivery to the other shards.
406 if(failure != null) {
407 LOG.warn("broadcast failed to send message {} to shard {}: {}",
408 message.getClass().getSimpleName(), shardName, failure);
// NOTE(review): the "else" line is not visible in this listing; the tell below is
// presumably the success branch.
410 primaryShard.tell(message, ActorRef.noSender());
413 }, getClientDispatcher());
/**
 * @return the operation timeout as a FiniteDuration, as computed in setCachedProperties()
 *         from the datastore context's operation timeout in seconds
 */
417 public FiniteDuration getOperationDuration() {
418 return operationDuration;
/**
 * Decides whether an actor path string refers to this node by comparing the host:port
 * section of the path with selfAddressHostPort.
 * NOTE(review): the return statements of several branches are not visible in this
 * listing; the notes below state only what the visible lines and existing comments show.
 *
 * @param path the actor path to examine; null or empty is handled by the first check
 * @return whether the path is considered local
 */
421 public boolean isPathLocal(String path) {
422 if (Strings.isNullOrEmpty(path)) {
// A path without '@' carries no host section.
426 int pathAtIndex = path.indexOf('@');
427 if (pathAtIndex == -1) {
428 //if the path is of local format, then its local and is co-located
431 } else if (selfAddressHostPort != null) {
432 // self-address and tx actor path, both are of remote path format
// The host:port section runs from just after '@' up to the next '/'.
433 int slashIndex = path.indexOf('/', pathAtIndex);
435 if (slashIndex == -1) {
439 String hostPort = path.substring(pathAtIndex + 1, slashIndex);
440 return hostPort.equals(selfAddressHostPort);
443 // self address is local format and tx actor path is remote format
449 * @deprecated This method is present only to support backward compatibility with Helium and should not be
454 * @param localPathOfRemoteActor
458 public String resolvePath(final String primaryPath,
459 final String localPathOfRemoteActor) {
460 StringBuilder builder = new StringBuilder();
// Prefix: element 0 of primaryPath, then "//", then elements 1 and 2, where the elements
// come from splitting primaryPath on "/". Throws ArrayIndexOutOfBoundsException when the
// split yields fewer than three elements.
461 String[] primaryPathElements = primaryPath.split("/");
462 builder.append(primaryPathElements[0]).append("//")
463 .append(primaryPathElements[1]).append(primaryPathElements[2]);
// Suffix: every element of localPathOfRemoteActor from index 3 on, each preceded by "/";
// elements 0-2 of that path are discarded.
464 String[] remotePathElements = localPathOfRemoteActor.split("/");
465 for (int i = 3; i < remotePathElements.length; i++) {
466 builder.append("/").append(remotePathElements[i]);
469 return builder.toString();
473 * Get the maximum number of operations that are to be permitted within a transaction before the transaction
474 * should begin throttling the operations
476 * Parking reading this configuration here because we need to get to the actor system settings
480 public int getTransactionOutstandingOperationLimit(){
// Read once in the constructor from CommonConfig.getMailBoxCapacity(); never recomputed.
481 return transactionOutstandingOperationLimit;
485 * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
486 * us to create a timer for pretty much anything.
488 * @param operationName
491 public Timer getOperationTimer(String operationName){
// Metric name components, in order: the registry constant, the data store type from the
// current datastoreContext, the operation name, and the "rate" constant.
492 final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
493 return metricRegistry.timer(rate);
497 * Get the type of the data store to which this ActorContext belongs
501 public String getDataStoreType() {
// Read from the current datastoreContext on each call, so it follows setDatastoreContext().
502 return datastoreContext.getDataStoreType();
506 * Set the number of transaction creation permits that are to be allowed
508 * @param permitsPerSecond
510 public void setTxCreationLimit(double permitsPerSecond){
// Adjusts the existing limiter in place; setCachedProperties() replaces the limiter
// object with a new one at the configured initial rate.
511 txRateLimiter.setRate(permitsPerSecond);
515 * Get the current transaction creation rate limit
518 public double getTxCreationLimit(){
// Reports the rate currently set on the limiter.
519 return txRateLimiter.getRate();
523 * Try to acquire a transaction creation permit. Will block if no permits are available.
525 public void acquireTxCreationPermit(){
// Delegates to RateLimiter.acquire() on the transaction-creation limiter.
526 txRateLimiter.acquire();
530 * Return the operation timeout to be used when committing transactions
533 public Timeout getTransactionCommitOperationTimeout(){
// Derived in setCachedProperties() from the shard transaction commit timeout in seconds.
534 return transactionCommitOperationTimeout;
538 * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
539 * code on the datastore
542 public ExecutionContext getClientDispatcher() {
// Looks up the dispatcher registered under DispatcherType.Client.
543 return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
/**
 * @return the dispatcher path that Dispatchers reports for DispatcherType.Notification
 */
546 public String getNotificationDispatcherPath(){
547 return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
/**
 * Thin wrapper around the statically imported Patterns.ask for an ActorRef target; the
 * executeOperationAsync(ActorRef, ...) method routes through it.
 *
 * @param actorRef the target actor
 * @param message the message to send
 * @param timeout the ask timeout
 * @return the Future returned by ask()
 */
550 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
551 return ask(actorRef, message, timeout);
/**
 * Thin wrapper around the statically imported Patterns.ask for an ActorSelection target;
 * the executeOperationAsync(ActorSelection, ...) methods route through it.
 *
 * @param actorRef the target selection
 * @param message the message to send
 * @param timeout the ask timeout
 * @return the Future returned by ask()
 */
554 protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
555 return ask(actorRef, message, timeout);
// Accessor declared without an access modifier; returns the cache instance currently
// held, which setCachedProperties() replaces whenever the datastore context changes.
559 Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
560 return primaryShardActorSelectionCache;