2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Mapper;
19 import akka.dispatch.OnComplete;
20 import akka.pattern.AskTimeoutException;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import java.util.concurrent.TimeUnit;
28 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
29 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
30 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
31 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
32 import org.opendaylight.controller.cluster.datastore.config.Configuration;
33 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
34 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
35 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
39 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
40 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
41 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
45 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
47 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
48 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
49 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
50 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 import scala.concurrent.Await;
54 import scala.concurrent.ExecutionContext;
55 import scala.concurrent.Future;
56 import scala.concurrent.duration.Duration;
57 import scala.concurrent.duration.FiniteDuration;
/**
 * The ActorContext class contains utility methods which can be used by
 * non-actors (like DistributedDataStore) to work with actors a little more
 * easily. An ActorContext can be freely passed around to local object instances,
 * but it should not be passed to actors, especially remote actors.
 */
65 public class ActorContext {
66 private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
67 private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
68 private static final String METRIC_RATE = "rate";
69 private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
70 new Mapper<Throwable, Throwable>() {
72 public Throwable apply(Throwable failure) {
73 Throwable actualFailure = failure;
74 if(failure instanceof AskTimeoutException) {
75 // A timeout exception most likely means the shard isn't initialized.
76 actualFailure = new NotInitializedException(
77 "Timed out trying to find the primary shard. Most likely cause is the " +
78 "shard is not initialized yet.");
84 public static final String MAILBOX = "bounded-mailbox";
85 public static final String COMMIT = "commit";
87 private final ActorSystem actorSystem;
88 private final ActorRef shardManager;
89 private final ClusterWrapper clusterWrapper;
90 private final Configuration configuration;
91 private DatastoreContext datastoreContext;
92 private FiniteDuration operationDuration;
93 private Timeout operationTimeout;
94 private final String selfAddressHostPort;
95 private TransactionRateLimiter txRateLimiter;
96 private Timeout transactionCommitOperationTimeout;
97 private Timeout shardInitializationTimeout;
98 private final Dispatchers dispatchers;
100 private volatile SchemaContext schemaContext;
101 private volatile boolean updated;
102 private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
104 private final PrimaryShardInfoFutureCache primaryShardInfoCache;
105 private final ShardStrategyFactory shardStrategyFactory;
/**
 * Creates an ActorContext with a default-built {@link DatastoreContext} and a fresh
 * {@link PrimaryShardInfoFutureCache}.
 *
 * @param actorSystem the actor system used to resolve actor selections and dispatchers
 * @param shardManager the shard manager actor that receives lookups and updates
 * @param clusterWrapper source of this node's cluster address and member name
 * @param configuration the shard configuration
 */
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
        ClusterWrapper clusterWrapper, Configuration configuration) {
    this(actorSystem, shardManager, clusterWrapper, configuration,
            DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
}
113 public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
114 ClusterWrapper clusterWrapper, Configuration configuration,
115 DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
116 this.actorSystem = actorSystem;
117 this.shardManager = shardManager;
118 this.clusterWrapper = clusterWrapper;
119 this.configuration = configuration;
120 this.datastoreContext = datastoreContext;
121 this.dispatchers = new Dispatchers(actorSystem.dispatchers());
122 this.primaryShardInfoCache = primaryShardInfoCache;
123 this.shardStrategyFactory = new ShardStrategyFactory(configuration);
125 setCachedProperties();
127 Address selfAddress = clusterWrapper.getSelfAddress();
128 if (selfAddress != null && !selfAddress.host().isEmpty()) {
129 selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
131 selfAddressHostPort = null;
136 private void setCachedProperties() {
137 txRateLimiter = new TransactionRateLimiter(this);
139 operationDuration = Duration.create(datastoreContext.getOperationTimeoutInMillis(), TimeUnit.MILLISECONDS);
140 operationTimeout = new Timeout(operationDuration);
142 transactionCommitOperationTimeout = new Timeout(Duration.create(
143 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
145 shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
148 public DatastoreContext getDatastoreContext() {
149 return datastoreContext;
152 public ActorSystem getActorSystem() {
156 public ActorRef getShardManager() {
160 public ActorSelection actorSelection(String actorPath) {
161 return actorSystem.actorSelection(actorPath);
164 public ActorSelection actorSelection(ActorPath actorPath) {
165 return actorSystem.actorSelection(actorPath);
168 public void setSchemaContext(SchemaContext schemaContext) {
169 this.schemaContext = schemaContext;
171 if(shardManager != null) {
172 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
176 public void setDatastoreContext(DatastoreContextFactory contextFactory) {
177 this.datastoreContext = contextFactory.getBaseDatastoreContext();
178 setCachedProperties();
180 // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
181 // will be published immediately even though they may not be immediately visible to other
182 // threads due to unsynchronized reads. That's OK though - we're going for eventual
183 // consistency here as immediately visible updates to these members aren't critical. These
184 // members could've been made volatile but wanted to avoid volatile reads as these are
185 // accessed often and updates will be infrequent.
189 if(shardManager != null) {
190 shardManager.tell(contextFactory, ActorRef.noSender());
194 public SchemaContext getSchemaContext() {
195 return schemaContext;
198 public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
199 Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
203 Future<Object> future = executeOperationAsync(shardManager,
204 new FindPrimary(shardName, true), shardInitializationTimeout);
206 return future.transform(new Mapper<Object, PrimaryShardInfo>() {
208 public PrimaryShardInfo checkedApply(Object response) throws Exception {
209 if(response instanceof RemotePrimaryShardFound) {
210 LOG.debug("findPrimaryShardAsync received: {}", response);
211 RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
212 return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
213 } else if(response instanceof LocalPrimaryShardFound) {
214 LOG.debug("findPrimaryShardAsync received: {}", response);
215 LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
216 return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
217 found.getLocalShardDataTree());
218 } else if(response instanceof NotInitializedException) {
219 throw (NotInitializedException)response;
220 } else if(response instanceof PrimaryNotFoundException) {
221 throw (PrimaryNotFoundException)response;
222 } else if(response instanceof NoShardLeaderException) {
223 throw (NoShardLeaderException)response;
226 throw new UnknownMessageException(String.format(
227 "FindPrimary returned unkown response: %s", response));
229 }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
232 private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
233 short primaryVersion, DataTree localShardDataTree) {
234 ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
235 PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, primaryVersion,
236 Optional.fromNullable(localShardDataTree));
237 primaryShardInfoCache.putSuccessful(shardName, info);
242 * Finds a local shard given its shard name and return it's ActorRef
244 * @param shardName the name of the local shard that needs to be found
245 * @return a reference to a local shard actor which represents the shard
246 * specified by the shardName
248 public Optional<ActorRef> findLocalShard(String shardName) {
249 Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
251 if (result instanceof LocalShardFound) {
252 LocalShardFound found = (LocalShardFound) result;
253 LOG.debug("Local shard found {}", found.getPath());
254 return Optional.of(found.getPath());
257 return Optional.absent();
261 * Finds a local shard async given its shard name and return a Future from which to obtain the
264 * @param shardName the name of the local shard that needs to be found
266 public Future<ActorRef> findLocalShardAsync( final String shardName) {
267 Future<Object> future = executeOperationAsync(shardManager,
268 new FindLocalShard(shardName, true), shardInitializationTimeout);
270 return future.map(new Mapper<Object, ActorRef>() {
272 public ActorRef checkedApply(Object response) throws Throwable {
273 if(response instanceof LocalShardFound) {
274 LocalShardFound found = (LocalShardFound)response;
275 LOG.debug("Local shard found {}", found.getPath());
276 return found.getPath();
277 } else if(response instanceof NotInitializedException) {
278 throw (NotInitializedException)response;
279 } else if(response instanceof LocalShardNotFound) {
280 throw new LocalShardNotFoundException(
281 String.format("Local shard for %s does not exist.", shardName));
284 throw new UnknownMessageException(String.format(
285 "FindLocalShard returned unkown response: %s", response));
287 }, getClientDispatcher());
291 * Executes an operation on a local actor and wait for it's response
295 * @return The response of the operation
297 public Object executeOperation(ActorRef actor, Object message) {
298 Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
301 return Await.result(future, operationDuration);
302 } catch (Exception e) {
303 throw new TimeoutException("Sending message " + message.getClass().toString() +
304 " to actor " + actor.toString() + " failed. Try again later.", e);
308 public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
309 Preconditions.checkArgument(actor != null, "actor must not be null");
310 Preconditions.checkArgument(message != null, "message must not be null");
312 LOG.debug("Sending message {} to {}", message.getClass(), actor);
313 return doAsk(actor, message, timeout);
317 * Execute an operation on a remote actor and wait for it's response
323 public Object executeOperation(ActorSelection actor, Object message) {
324 Future<Object> future = executeOperationAsync(actor, message);
327 return Await.result(future, operationDuration);
328 } catch (Exception e) {
329 throw new TimeoutException("Sending message " + message.getClass().toString() +
330 " to actor " + actor.toString() + " failed. Try again later.", e);
335 * Execute an operation on a remote actor asynchronously.
337 * @param actor the ActorSelection
338 * @param message the message to send
339 * @param timeout the operation timeout
340 * @return a Future containing the eventual result
342 public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
344 Preconditions.checkArgument(actor != null, "actor must not be null");
345 Preconditions.checkArgument(message != null, "message must not be null");
347 LOG.debug("Sending message {} to {}", message.getClass(), actor);
349 return doAsk(actor, message, timeout);
353 * Execute an operation on a remote actor asynchronously.
355 * @param actor the ActorSelection
356 * @param message the message to send
357 * @return a Future containing the eventual result
359 public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
360 return executeOperationAsync(actor, message, operationTimeout);
364 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
365 * reply (essentially set and forget).
367 * @param actor the ActorSelection
368 * @param message the message to send
370 public void sendOperationAsync(ActorSelection actor, Object message) {
371 Preconditions.checkArgument(actor != null, "actor must not be null");
372 Preconditions.checkArgument(message != null, "message must not be null");
374 LOG.debug("Sending message {} to {}", message.getClass(), actor);
376 actor.tell(message, ActorRef.noSender());
379 public void shutdown() {
380 shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
383 public ClusterWrapper getClusterWrapper() {
384 return clusterWrapper;
387 public String getCurrentMemberName(){
388 return clusterWrapper.getCurrentMemberName();
392 * Send the message to each and every shard
396 public void broadcast(final Object message){
397 for(final String shardName : configuration.getAllShardNames()){
399 Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
400 primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
402 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
403 if(failure != null) {
404 LOG.warn("broadcast failed to send message {} to shard {}: {}",
405 message.getClass().getSimpleName(), shardName, failure);
407 primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
410 }, getClientDispatcher());
414 public FiniteDuration getOperationDuration() {
415 return operationDuration;
418 public Timeout getOperationTimeout() {
419 return operationTimeout;
422 public boolean isPathLocal(String path) {
423 if (Strings.isNullOrEmpty(path)) {
427 int pathAtIndex = path.indexOf('@');
428 if (pathAtIndex == -1) {
429 //if the path is of local format, then its local and is co-located
432 } else if (selfAddressHostPort != null) {
433 // self-address and tx actor path, both are of remote path format
434 int slashIndex = path.indexOf('/', pathAtIndex);
436 if (slashIndex == -1) {
440 String hostPort = path.substring(pathAtIndex + 1, slashIndex);
441 return hostPort.equals(selfAddressHostPort);
444 // self address is local format and tx actor path is remote format
450 * @deprecated This method is present only to support backward compatibility with Helium and should not be
455 * @param localPathOfRemoteActor
459 public String resolvePath(final String primaryPath,
460 final String localPathOfRemoteActor) {
461 StringBuilder builder = new StringBuilder();
462 String[] primaryPathElements = primaryPath.split("/");
463 builder.append(primaryPathElements[0]).append("//")
464 .append(primaryPathElements[1]).append(primaryPathElements[2]);
465 String[] remotePathElements = localPathOfRemoteActor.split("/");
466 for (int i = 3; i < remotePathElements.length; i++) {
467 builder.append("/").append(remotePathElements[i]);
470 return builder.toString();
474 * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
475 * us to create a timer for pretty much anything.
477 * @param operationName
480 public Timer getOperationTimer(String operationName){
481 return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
484 public Timer getOperationTimer(String dataStoreType, String operationName){
485 final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
486 operationName, METRIC_RATE);
487 return metricRegistry.timer(rate);
491 * Get the name of the data store to which this ActorContext belongs
495 public String getDataStoreName() {
496 return datastoreContext.getDataStoreName();
500 * Get the type of the data store to which this ActorContext belongs
503 * @deprecated Use {@link #getDataStoreName()} instead.
505 public String getDataStoreType() {
506 return datastoreContext.getDataStoreName();
510 * Get the current transaction creation rate limit
513 public double getTxCreationLimit(){
514 return txRateLimiter.getTxCreationLimit();
518 * Try to acquire a transaction creation permit. Will block if no permits are available.
520 public void acquireTxCreationPermit(){
521 txRateLimiter.acquire();
525 * Return the operation timeout to be used when committing transactions
528 public Timeout getTransactionCommitOperationTimeout(){
529 return transactionCommitOperationTimeout;
533 * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
534 * code on the datastore
537 public ExecutionContext getClientDispatcher() {
538 return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
541 public String getNotificationDispatcherPath(){
542 return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
545 public Configuration getConfiguration() {
546 return configuration;
549 public ShardStrategyFactory getShardStrategyFactory() {
550 return shardStrategyFactory;
553 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
554 return ask(actorRef, message, timeout);
557 protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
558 return ask(actorRef, message, timeout);
561 public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
562 return primaryShardInfoCache;