2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Futures;
19 import akka.dispatch.Mapper;
20 import akka.dispatch.OnComplete;
21 import akka.pattern.AskTimeoutException;
22 import akka.util.Timeout;
23 import com.codahale.metrics.MetricRegistry;
24 import com.codahale.metrics.Timer;
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.common.base.Optional;
27 import com.google.common.base.Preconditions;
28 import com.google.common.base.Strings;
29 import com.google.common.cache.Cache;
30 import com.google.common.cache.CacheBuilder;
31 import com.google.common.cache.RemovalListener;
32 import com.google.common.cache.RemovalNotification;
33 import java.util.ArrayList;
34 import java.util.Collection;
35 import java.util.concurrent.TimeUnit;
36 import javax.annotation.concurrent.GuardedBy;
37 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
38 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
39 import org.opendaylight.controller.cluster.datastore.Configuration;
40 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
41 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
42 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
43 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
44 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
45 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
46 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
47 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
48 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
49 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
50 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
51 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
52 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
53 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
55 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
56 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
57 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60 import scala.concurrent.Await;
61 import scala.concurrent.ExecutionContext;
62 import scala.concurrent.Future;
63 import scala.concurrent.duration.Duration;
64 import scala.concurrent.duration.FiniteDuration;
67 * The ActorContext class contains utility methods which could be used by
68 * non-actors (like DistributedDataStore) to work with actors a little more
69 * easily. An ActorContext can be freely passed around to local object instances
70 * but should not be passed to actors especially remote actors
72 public class ActorContext implements RemovalListener<String, Future<PrimaryShardInfo>> {
73 private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
74 private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
75 private static final String METRIC_RATE = "rate";
76 private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
77 new Mapper<Throwable, Throwable>() {
79 public Throwable apply(Throwable failure) {
80 Throwable actualFailure = failure;
81 if(failure instanceof AskTimeoutException) {
82 // A timeout exception most likely means the shard isn't initialized.
83 actualFailure = new NotInitializedException(
84 "Timed out trying to find the primary shard. Most likely cause is the " +
85 "shard is not initialized yet.");
91 public static final String MAILBOX = "bounded-mailbox";
92 public static final String COMMIT = "commit";
94 private final ActorSystem actorSystem;
95 private final ActorRef shardManager;
96 private final ClusterWrapper clusterWrapper;
97 private final Configuration configuration;
98 private DatastoreContext datastoreContext;
99 private FiniteDuration operationDuration;
100 private Timeout operationTimeout;
101 private final String selfAddressHostPort;
102 private TransactionRateLimiter txRateLimiter;
103 private final int transactionOutstandingOperationLimit;
104 private Timeout transactionCommitOperationTimeout;
105 private Timeout shardInitializationTimeout;
106 private final Dispatchers dispatchers;
107 private Cache<String, Future<PrimaryShardInfo>> primaryShardInfoCache;
109 private volatile SchemaContext schemaContext;
110 private volatile boolean updated;
111 private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
112 @GuardedBy("shardInfoListeners")
113 private final Collection<ShardInfoListenerRegistration<?>> shardInfoListeners = new ArrayList<>();
115 public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
116 ClusterWrapper clusterWrapper, Configuration configuration) {
117 this(actorSystem, shardManager, clusterWrapper, configuration,
118 DatastoreContext.newBuilder().build());
121 public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
122 ClusterWrapper clusterWrapper, Configuration configuration,
123 DatastoreContext datastoreContext) {
124 this.actorSystem = actorSystem;
125 this.shardManager = shardManager;
126 this.clusterWrapper = clusterWrapper;
127 this.configuration = configuration;
128 this.datastoreContext = datastoreContext;
129 this.dispatchers = new Dispatchers(actorSystem.dispatchers());
131 setCachedProperties();
133 Address selfAddress = clusterWrapper.getSelfAddress();
134 if (selfAddress != null && !selfAddress.host().isEmpty()) {
135 selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
137 selfAddressHostPort = null;
140 transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
143 private void setCachedProperties() {
144 txRateLimiter = new TransactionRateLimiter(this);
146 operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
147 operationTimeout = new Timeout(operationDuration);
149 transactionCommitOperationTimeout = new Timeout(Duration.create(
150 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
152 shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
154 primaryShardInfoCache = CacheBuilder.newBuilder()
155 .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
156 .removalListener(this)
160 public DatastoreContext getDatastoreContext() {
161 return datastoreContext;
164 public ActorSystem getActorSystem() {
168 public ActorRef getShardManager() {
172 public ActorSelection actorSelection(String actorPath) {
173 return actorSystem.actorSelection(actorPath);
176 public ActorSelection actorSelection(ActorPath actorPath) {
177 return actorSystem.actorSelection(actorPath);
180 public void setSchemaContext(SchemaContext schemaContext) {
181 this.schemaContext = schemaContext;
183 if(shardManager != null) {
184 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
188 public void setDatastoreContext(DatastoreContext context) {
189 this.datastoreContext = context;
190 setCachedProperties();
192 // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
193 // will be published immediately even though they may not be immediately visible to other
194 // threads due to unsynchronized reads. That's OK though - we're going for eventual
195 // consistency here as immediately visible updates to these members aren't critical. These
196 // members could've been made volatile but wanted to avoid volatile reads as these are
197 // accessed often and updates will be infrequent.
201 if(shardManager != null) {
202 shardManager.tell(context, ActorRef.noSender());
206 public SchemaContext getSchemaContext() {
207 return schemaContext;
210 public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
211 Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
215 Future<Object> future = executeOperationAsync(shardManager,
216 new FindPrimary(shardName, true), shardInitializationTimeout);
218 return future.transform(new Mapper<Object, PrimaryShardInfo>() {
220 public PrimaryShardInfo checkedApply(Object response) throws Exception {
221 if(response instanceof RemotePrimaryShardFound) {
222 LOG.debug("findPrimaryShardAsync received: {}", response);
223 return onPrimaryShardFound(shardName, ((RemotePrimaryShardFound)response).getPrimaryPath(), null);
224 } else if(response instanceof LocalPrimaryShardFound) {
225 LOG.debug("findPrimaryShardAsync received: {}", response);
226 LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
227 return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getLocalShardDataTree());
228 } else if(response instanceof NotInitializedException) {
229 throw (NotInitializedException)response;
230 } else if(response instanceof PrimaryNotFoundException) {
231 throw (PrimaryNotFoundException)response;
232 } else if(response instanceof NoShardLeaderException) {
233 throw (NoShardLeaderException)response;
236 throw new UnknownMessageException(String.format(
237 "FindPrimary returned unkown response: %s", response));
239 }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
242 private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
243 DataTree localShardDataTree) {
244 ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
245 PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree));
246 primaryShardInfoCache.put(shardName, Futures.successful(info));
248 synchronized (shardInfoListeners) {
249 for (ShardInfoListenerRegistration<?> reg : shardInfoListeners) {
250 reg.getInstance().onShardInfoUpdated(shardName, info);
257 * Finds a local shard given its shard name and return it's ActorRef
259 * @param shardName the name of the local shard that needs to be found
260 * @return a reference to a local shard actor which represents the shard
261 * specified by the shardName
263 public Optional<ActorRef> findLocalShard(String shardName) {
264 Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
266 if (result instanceof LocalShardFound) {
267 LocalShardFound found = (LocalShardFound) result;
268 LOG.debug("Local shard found {}", found.getPath());
269 return Optional.of(found.getPath());
272 return Optional.absent();
276 * Finds a local shard async given its shard name and return a Future from which to obtain the
279 * @param shardName the name of the local shard that needs to be found
281 public Future<ActorRef> findLocalShardAsync( final String shardName) {
282 Future<Object> future = executeOperationAsync(shardManager,
283 new FindLocalShard(shardName, true), shardInitializationTimeout);
285 return future.map(new Mapper<Object, ActorRef>() {
287 public ActorRef checkedApply(Object response) throws Throwable {
288 if(response instanceof LocalShardFound) {
289 LocalShardFound found = (LocalShardFound)response;
290 LOG.debug("Local shard found {}", found.getPath());
291 return found.getPath();
292 } else if(response instanceof NotInitializedException) {
293 throw (NotInitializedException)response;
294 } else if(response instanceof LocalShardNotFound) {
295 throw new LocalShardNotFoundException(
296 String.format("Local shard for %s does not exist.", shardName));
299 throw new UnknownMessageException(String.format(
300 "FindLocalShard returned unkown response: %s", response));
302 }, getClientDispatcher());
306 * Executes an operation on a local actor and wait for it's response
310 * @return The response of the operation
312 public Object executeOperation(ActorRef actor, Object message) {
313 Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
316 return Await.result(future, operationDuration);
317 } catch (Exception e) {
318 throw new TimeoutException("Sending message " + message.getClass().toString() +
319 " to actor " + actor.toString() + " failed. Try again later.", e);
323 public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
324 Preconditions.checkArgument(actor != null, "actor must not be null");
325 Preconditions.checkArgument(message != null, "message must not be null");
327 LOG.debug("Sending message {} to {}", message.getClass(), actor);
328 return doAsk(actor, message, timeout);
332 * Execute an operation on a remote actor and wait for it's response
338 public Object executeOperation(ActorSelection actor, Object message) {
339 Future<Object> future = executeOperationAsync(actor, message);
342 return Await.result(future, operationDuration);
343 } catch (Exception e) {
344 throw new TimeoutException("Sending message " + message.getClass().toString() +
345 " to actor " + actor.toString() + " failed. Try again later.", e);
350 * Execute an operation on a remote actor asynchronously.
352 * @param actor the ActorSelection
353 * @param message the message to send
354 * @param timeout the operation timeout
355 * @return a Future containing the eventual result
357 public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
359 Preconditions.checkArgument(actor != null, "actor must not be null");
360 Preconditions.checkArgument(message != null, "message must not be null");
362 LOG.debug("Sending message {} to {}", message.getClass(), actor);
364 return doAsk(actor, message, timeout);
368 * Execute an operation on a remote actor asynchronously.
370 * @param actor the ActorSelection
371 * @param message the message to send
372 * @return a Future containing the eventual result
374 public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
375 return executeOperationAsync(actor, message, operationTimeout);
379 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
380 * reply (essentially set and forget).
382 * @param actor the ActorSelection
383 * @param message the message to send
385 public void sendOperationAsync(ActorSelection actor, Object message) {
386 Preconditions.checkArgument(actor != null, "actor must not be null");
387 Preconditions.checkArgument(message != null, "message must not be null");
389 LOG.debug("Sending message {} to {}", message.getClass(), actor);
391 actor.tell(message, ActorRef.noSender());
394 public void shutdown() {
395 shardManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
398 public ClusterWrapper getClusterWrapper() {
399 return clusterWrapper;
402 public String getCurrentMemberName(){
403 return clusterWrapper.getCurrentMemberName();
407 * Send the message to each and every shard
411 public void broadcast(final Object message){
412 for(final String shardName : configuration.getAllShardNames()){
414 Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
415 primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
417 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
418 if(failure != null) {
419 LOG.warn("broadcast failed to send message {} to shard {}: {}",
420 message.getClass().getSimpleName(), shardName, failure);
422 primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
425 }, getClientDispatcher());
429 public FiniteDuration getOperationDuration() {
430 return operationDuration;
433 public boolean isPathLocal(String path) {
434 if (Strings.isNullOrEmpty(path)) {
438 int pathAtIndex = path.indexOf('@');
439 if (pathAtIndex == -1) {
440 //if the path is of local format, then its local and is co-located
443 } else if (selfAddressHostPort != null) {
444 // self-address and tx actor path, both are of remote path format
445 int slashIndex = path.indexOf('/', pathAtIndex);
447 if (slashIndex == -1) {
451 String hostPort = path.substring(pathAtIndex + 1, slashIndex);
452 return hostPort.equals(selfAddressHostPort);
455 // self address is local format and tx actor path is remote format
461 * @deprecated This method is present only to support backward compatibility with Helium and should not be
466 * @param localPathOfRemoteActor
470 public String resolvePath(final String primaryPath,
471 final String localPathOfRemoteActor) {
472 StringBuilder builder = new StringBuilder();
473 String[] primaryPathElements = primaryPath.split("/");
474 builder.append(primaryPathElements[0]).append("//")
475 .append(primaryPathElements[1]).append(primaryPathElements[2]);
476 String[] remotePathElements = localPathOfRemoteActor.split("/");
477 for (int i = 3; i < remotePathElements.length; i++) {
478 builder.append("/").append(remotePathElements[i]);
481 return builder.toString();
485 * Get the maximum number of operations that are to be permitted within a transaction before the transaction
486 * should begin throttling the operations
488 * Parking reading this configuration here because we need to get to the actor system settings
492 public int getTransactionOutstandingOperationLimit(){
493 return transactionOutstandingOperationLimit;
497 * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
498 * us to create a timer for pretty much anything.
500 * @param operationName
503 public Timer getOperationTimer(String operationName){
504 return getOperationTimer(datastoreContext.getDataStoreType(), operationName);
507 public Timer getOperationTimer(String dataStoreType, String operationName){
508 final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
509 operationName, METRIC_RATE);
510 return metricRegistry.timer(rate);
514 * Get the type of the data store to which this ActorContext belongs
518 public String getDataStoreType() {
519 return datastoreContext.getDataStoreType();
523 * Get the current transaction creation rate limit
526 public double getTxCreationLimit(){
527 return txRateLimiter.getTxCreationLimit();
531 * Try to acquire a transaction creation permit. Will block if no permits are available.
533 public void acquireTxCreationPermit(){
534 txRateLimiter.acquire();
538 * Return the operation timeout to be used when committing transactions
541 public Timeout getTransactionCommitOperationTimeout(){
542 return transactionCommitOperationTimeout;
546 * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
547 * code on the datastore
550 public ExecutionContext getClientDispatcher() {
551 return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
554 public String getNotificationDispatcherPath(){
555 return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
558 public Configuration getConfiguration() {
559 return configuration;
562 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
563 return ask(actorRef, message, timeout);
566 protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
567 return ask(actorRef, message, timeout);
571 Cache<String, Future<PrimaryShardInfo>> getPrimaryShardInfoCache() {
572 return primaryShardInfoCache;
575 public <T extends ShardInfoListener> ShardInfoListenerRegistration<T> registerShardInfoListener(final T listener) {
576 final ShardInfoListenerRegistration<T> reg = new ShardInfoListenerRegistration<T>(listener, this);
578 synchronized (shardInfoListeners) {
579 shardInfoListeners.add(reg);
584 protected void removeShardInfoListener(final ShardInfoListenerRegistration<?> registration) {
585 synchronized (shardInfoListeners) {
586 shardInfoListeners.remove(registration);
591 public void onRemoval(final RemovalNotification<String, Future<PrimaryShardInfo>> notification) {
592 synchronized (shardInfoListeners) {
593 for (ShardInfoListenerRegistration<?> reg : shardInfoListeners) {
594 reg.getInstance().onShardInfoUpdated(notification.getKey(), null);