2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.dispatch.Mapper;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.AskTimeoutException;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Function;
29 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
30 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
31 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
32 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
33 import org.opendaylight.controller.cluster.datastore.config.Configuration;
34 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
35 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
40 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
41 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
45 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
46 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
47 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
48 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
49 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
50 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
51 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import scala.concurrent.Await;
56 import scala.concurrent.ExecutionContext;
57 import scala.concurrent.Future;
58 import scala.concurrent.duration.Duration;
59 import scala.concurrent.duration.FiniteDuration;
62 * The ActorContext class contains utility methods which could be used by
63 * non-actors (like DistributedDataStore) to work with actors a little more
64 * easily. An ActorContext can be freely passed around to local object instances
65 * but should not be passed to actors especially remote actors
67 public class ActorContext {
68 private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
69 private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
70 private static final String METRIC_RATE = "rate";
71 private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
72 new Mapper<Throwable, Throwable>() {
74 public Throwable apply(Throwable failure) {
75 Throwable actualFailure = failure;
76 if(failure instanceof AskTimeoutException) {
77 // A timeout exception most likely means the shard isn't initialized.
78 actualFailure = new NotInitializedException(
79 "Timed out trying to find the primary shard. Most likely cause is the " +
80 "shard is not initialized yet.");
86 public static final String BOUNDED_MAILBOX = "bounded-mailbox";
87 public static final String COMMIT = "commit";
89 private final ActorSystem actorSystem;
90 private final ActorRef shardManager;
91 private final ClusterWrapper clusterWrapper;
92 private final Configuration configuration;
93 private DatastoreContext datastoreContext;
94 private FiniteDuration operationDuration;
95 private Timeout operationTimeout;
96 private final String selfAddressHostPort;
97 private TransactionRateLimiter txRateLimiter;
98 private Timeout transactionCommitOperationTimeout;
99 private Timeout shardInitializationTimeout;
100 private final Dispatchers dispatchers;
102 private volatile SchemaContext schemaContext;
103 private volatile boolean updated;
104 private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
106 private final PrimaryShardInfoFutureCache primaryShardInfoCache;
107 private final ShardStrategyFactory shardStrategyFactory;
109 public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
110 ClusterWrapper clusterWrapper, Configuration configuration) {
111 this(actorSystem, shardManager, clusterWrapper, configuration,
112 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
115 public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
116 ClusterWrapper clusterWrapper, Configuration configuration,
117 DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
118 this.actorSystem = actorSystem;
119 this.shardManager = shardManager;
120 this.clusterWrapper = clusterWrapper;
121 this.configuration = configuration;
122 this.datastoreContext = datastoreContext;
123 this.dispatchers = new Dispatchers(actorSystem.dispatchers());
124 this.primaryShardInfoCache = primaryShardInfoCache;
125 this.shardStrategyFactory = new ShardStrategyFactory(configuration);
127 setCachedProperties();
129 Address selfAddress = clusterWrapper.getSelfAddress();
130 if (selfAddress != null && !selfAddress.host().isEmpty()) {
131 selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
133 selfAddressHostPort = null;
138 private void setCachedProperties() {
139 txRateLimiter = new TransactionRateLimiter(this);
141 operationDuration = Duration.create(datastoreContext.getOperationTimeoutInMillis(), TimeUnit.MILLISECONDS);
142 operationTimeout = new Timeout(operationDuration);
144 transactionCommitOperationTimeout = new Timeout(Duration.create(
145 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
147 shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
150 public DatastoreContext getDatastoreContext() {
151 return datastoreContext;
154 public ActorSystem getActorSystem() {
158 public ActorRef getShardManager() {
162 public ActorSelection actorSelection(String actorPath) {
163 return actorSystem.actorSelection(actorPath);
166 public ActorSelection actorSelection(ActorPath actorPath) {
167 return actorSystem.actorSelection(actorPath);
170 public void setSchemaContext(SchemaContext schemaContext) {
171 this.schemaContext = schemaContext;
173 if(shardManager != null) {
174 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
178 public void setDatastoreContext(DatastoreContextFactory contextFactory) {
179 this.datastoreContext = contextFactory.getBaseDatastoreContext();
180 setCachedProperties();
182 // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
183 // will be published immediately even though they may not be immediately visible to other
184 // threads due to unsynchronized reads. That's OK though - we're going for eventual
185 // consistency here as immediately visible updates to these members aren't critical. These
186 // members could've been made volatile but wanted to avoid volatile reads as these are
187 // accessed often and updates will be infrequent.
191 if(shardManager != null) {
192 shardManager.tell(contextFactory, ActorRef.noSender());
196 public SchemaContext getSchemaContext() {
197 return schemaContext;
200 public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
201 Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
205 Future<Object> future = executeOperationAsync(shardManager,
206 new FindPrimary(shardName, true), shardInitializationTimeout);
208 return future.transform(new Mapper<Object, PrimaryShardInfo>() {
210 public PrimaryShardInfo checkedApply(Object response) throws Exception {
211 if(response instanceof RemotePrimaryShardFound) {
212 LOG.debug("findPrimaryShardAsync received: {}", response);
213 RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
214 return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
215 } else if(response instanceof LocalPrimaryShardFound) {
216 LOG.debug("findPrimaryShardAsync received: {}", response);
217 LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
218 return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
219 found.getLocalShardDataTree());
220 } else if(response instanceof NotInitializedException) {
221 throw (NotInitializedException)response;
222 } else if(response instanceof PrimaryNotFoundException) {
223 throw (PrimaryNotFoundException)response;
224 } else if(response instanceof NoShardLeaderException) {
225 throw (NoShardLeaderException)response;
228 throw new UnknownMessageException(String.format(
229 "FindPrimary returned unkown response: %s", response));
231 }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
234 private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
235 short primaryVersion, DataTree localShardDataTree) {
236 ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
237 PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
238 new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
239 primaryShardInfoCache.putSuccessful(shardName, info);
244 * Finds a local shard given its shard name and return it's ActorRef
246 * @param shardName the name of the local shard that needs to be found
247 * @return a reference to a local shard actor which represents the shard
248 * specified by the shardName
250 public Optional<ActorRef> findLocalShard(String shardName) {
251 Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
253 if (result instanceof LocalShardFound) {
254 LocalShardFound found = (LocalShardFound) result;
255 LOG.debug("Local shard found {}", found.getPath());
256 return Optional.of(found.getPath());
259 return Optional.absent();
263 * Finds a local shard async given its shard name and return a Future from which to obtain the
266 * @param shardName the name of the local shard that needs to be found
268 public Future<ActorRef> findLocalShardAsync( final String shardName) {
269 Future<Object> future = executeOperationAsync(shardManager,
270 new FindLocalShard(shardName, true), shardInitializationTimeout);
272 return future.map(new Mapper<Object, ActorRef>() {
274 public ActorRef checkedApply(Object response) throws Throwable {
275 if(response instanceof LocalShardFound) {
276 LocalShardFound found = (LocalShardFound)response;
277 LOG.debug("Local shard found {}", found.getPath());
278 return found.getPath();
279 } else if(response instanceof NotInitializedException) {
280 throw (NotInitializedException)response;
281 } else if(response instanceof LocalShardNotFound) {
282 throw new LocalShardNotFoundException(
283 String.format("Local shard for %s does not exist.", shardName));
286 throw new UnknownMessageException(String.format(
287 "FindLocalShard returned unkown response: %s", response));
289 }, getClientDispatcher());
293 * Executes an operation on a local actor and wait for it's response
297 * @return The response of the operation
299 public Object executeOperation(ActorRef actor, Object message) {
300 Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
303 return Await.result(future, operationDuration);
304 } catch (Exception e) {
305 throw new TimeoutException("Sending message " + message.getClass().toString() +
306 " to actor " + actor.toString() + " failed. Try again later.", e);
310 public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
311 Preconditions.checkArgument(actor != null, "actor must not be null");
312 Preconditions.checkArgument(message != null, "message must not be null");
314 LOG.debug("Sending message {} to {}", message.getClass(), actor);
315 return doAsk(actor, message, timeout);
319 * Execute an operation on a remote actor and wait for it's response
325 public Object executeOperation(ActorSelection actor, Object message) {
326 Future<Object> future = executeOperationAsync(actor, message);
329 return Await.result(future, operationDuration);
330 } catch (Exception e) {
331 throw new TimeoutException("Sending message " + message.getClass().toString() +
332 " to actor " + actor.toString() + " failed. Try again later.", e);
337 * Execute an operation on a remote actor asynchronously.
339 * @param actor the ActorSelection
340 * @param message the message to send
341 * @param timeout the operation timeout
342 * @return a Future containing the eventual result
344 public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
346 Preconditions.checkArgument(actor != null, "actor must not be null");
347 Preconditions.checkArgument(message != null, "message must not be null");
349 LOG.debug("Sending message {} to {}", message.getClass(), actor);
351 return doAsk(actor, message, timeout);
355 * Execute an operation on a remote actor asynchronously.
357 * @param actor the ActorSelection
358 * @param message the message to send
359 * @return a Future containing the eventual result
361 public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
362 return executeOperationAsync(actor, message, operationTimeout);
366 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
367 * reply (essentially set and forget).
369 * @param actor the ActorSelection
370 * @param message the message to send
372 public void sendOperationAsync(ActorSelection actor, Object message) {
373 Preconditions.checkArgument(actor != null, "actor must not be null");
374 Preconditions.checkArgument(message != null, "message must not be null");
376 LOG.debug("Sending message {} to {}", message.getClass(), actor);
378 actor.tell(message, ActorRef.noSender());
381 public void shutdown() {
382 FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
384 Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
385 } catch(Exception e) {
386 LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
390 public ClusterWrapper getClusterWrapper() {
391 return clusterWrapper;
394 public String getCurrentMemberName(){
395 return clusterWrapper.getCurrentMemberName();
399 * Send the message to each and every shard
403 public void broadcast(final Function<Short, Object> messageSupplier){
404 for(final String shardName : configuration.getAllShardNames()){
406 Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
407 primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
409 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
410 Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
411 if(failure != null) {
412 LOG.warn("broadcast failed to send message {} to shard {}: {}",
413 message.getClass().getSimpleName(), shardName, failure);
415 primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
418 }, getClientDispatcher());
422 public FiniteDuration getOperationDuration() {
423 return operationDuration;
426 public Timeout getOperationTimeout() {
427 return operationTimeout;
430 public boolean isPathLocal(String path) {
431 if (Strings.isNullOrEmpty(path)) {
435 int pathAtIndex = path.indexOf('@');
436 if (pathAtIndex == -1) {
437 //if the path is of local format, then its local and is co-located
440 } else if (selfAddressHostPort != null) {
441 // self-address and tx actor path, both are of remote path format
442 int slashIndex = path.indexOf('/', pathAtIndex);
444 if (slashIndex == -1) {
448 String hostPort = path.substring(pathAtIndex + 1, slashIndex);
449 return hostPort.equals(selfAddressHostPort);
452 // self address is local format and tx actor path is remote format
458 * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
459 * us to create a timer for pretty much anything.
461 * @param operationName
464 public Timer getOperationTimer(String operationName){
465 return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
468 public Timer getOperationTimer(String dataStoreType, String operationName){
469 final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
470 operationName, METRIC_RATE);
471 return metricRegistry.timer(rate);
475 * Get the name of the data store to which this ActorContext belongs
479 public String getDataStoreName() {
480 return datastoreContext.getDataStoreName();
484 * Get the current transaction creation rate limit
487 public double getTxCreationLimit(){
488 return txRateLimiter.getTxCreationLimit();
492 * Try to acquire a transaction creation permit. Will block if no permits are available.
494 public void acquireTxCreationPermit(){
495 txRateLimiter.acquire();
499 * Return the operation timeout to be used when committing transactions
502 public Timeout getTransactionCommitOperationTimeout(){
503 return transactionCommitOperationTimeout;
507 * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
508 * code on the datastore
511 public ExecutionContext getClientDispatcher() {
512 return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
515 public String getNotificationDispatcherPath(){
516 return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
519 public Configuration getConfiguration() {
520 return configuration;
523 public ShardStrategyFactory getShardStrategyFactory() {
524 return shardStrategyFactory;
527 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
528 return ask(actorRef, message, timeout);
531 protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
532 return ask(actorRef, message, timeout);
535 public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
536 return primaryShardInfoCache;