2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.dispatch.Mapper;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.AskTimeoutException;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Function;
29 import org.opendaylight.controller.cluster.access.concepts.MemberName;
30 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
32 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
33 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
34 import org.opendaylight.controller.cluster.datastore.config.Configuration;
35 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
41 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
42 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
46 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
47 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
48 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
49 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
50 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
51 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
52 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
53 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56 import scala.concurrent.Await;
57 import scala.concurrent.ExecutionContext;
58 import scala.concurrent.Future;
59 import scala.concurrent.duration.Duration;
60 import scala.concurrent.duration.FiniteDuration;
63 * The ActorContext class contains utility methods which could be used by
64 * non-actors (like DistributedDataStore) to work with actors a little more
65 * easily. An ActorContext can be freely passed around to local object instances
66 * but should not be passed to actors especially remote actors
68 public class ActorContext {
69 private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
70 private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
71 private static final String METRIC_RATE = "rate";
72 private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
73 new Mapper<Throwable, Throwable>() {
75 public Throwable apply(Throwable failure) {
76 Throwable actualFailure = failure;
77 if(failure instanceof AskTimeoutException) {
78 // A timeout exception most likely means the shard isn't initialized.
79 actualFailure = new NotInitializedException(
80 "Timed out trying to find the primary shard. Most likely cause is the " +
81 "shard is not initialized yet.");
87 public static final String BOUNDED_MAILBOX = "bounded-mailbox";
88 public static final String COMMIT = "commit";
90 private final ActorSystem actorSystem;
91 private final ActorRef shardManager;
92 private final ClusterWrapper clusterWrapper;
93 private final Configuration configuration;
94 private DatastoreContext datastoreContext;
95 private FiniteDuration operationDuration;
96 private Timeout operationTimeout;
97 private final String selfAddressHostPort;
98 private TransactionRateLimiter txRateLimiter;
99 private Timeout transactionCommitOperationTimeout;
100 private Timeout shardInitializationTimeout;
101 private final Dispatchers dispatchers;
103 private volatile SchemaContext schemaContext;
104 private volatile boolean updated;
105 private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
107 private final PrimaryShardInfoFutureCache primaryShardInfoCache;
108 private final ShardStrategyFactory shardStrategyFactory;
110 public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
111 ClusterWrapper clusterWrapper, Configuration configuration) {
112 this(actorSystem, shardManager, clusterWrapper, configuration,
113 DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
116 public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
117 ClusterWrapper clusterWrapper, Configuration configuration,
118 DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
119 this.actorSystem = actorSystem;
120 this.shardManager = shardManager;
121 this.clusterWrapper = clusterWrapper;
122 this.configuration = configuration;
123 this.datastoreContext = datastoreContext;
124 this.dispatchers = new Dispatchers(actorSystem.dispatchers());
125 this.primaryShardInfoCache = primaryShardInfoCache;
126 this.shardStrategyFactory = new ShardStrategyFactory(configuration);
128 setCachedProperties();
130 Address selfAddress = clusterWrapper.getSelfAddress();
131 if (selfAddress != null && !selfAddress.host().isEmpty()) {
132 selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
134 selfAddressHostPort = null;
139 private void setCachedProperties() {
140 txRateLimiter = new TransactionRateLimiter(this);
142 operationDuration = Duration.create(datastoreContext.getOperationTimeoutInMillis(), TimeUnit.MILLISECONDS);
143 operationTimeout = new Timeout(operationDuration);
145 transactionCommitOperationTimeout = new Timeout(Duration.create(
146 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
148 shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
151 public DatastoreContext getDatastoreContext() {
152 return datastoreContext;
155 public ActorSystem getActorSystem() {
159 public ActorRef getShardManager() {
163 public ActorSelection actorSelection(String actorPath) {
164 return actorSystem.actorSelection(actorPath);
167 public ActorSelection actorSelection(ActorPath actorPath) {
168 return actorSystem.actorSelection(actorPath);
171 public void setSchemaContext(SchemaContext schemaContext) {
172 this.schemaContext = schemaContext;
174 if(shardManager != null) {
175 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
179 public void setDatastoreContext(DatastoreContextFactory contextFactory) {
180 this.datastoreContext = contextFactory.getBaseDatastoreContext();
181 setCachedProperties();
183 // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
184 // will be published immediately even though they may not be immediately visible to other
185 // threads due to unsynchronized reads. That's OK though - we're going for eventual
186 // consistency here as immediately visible updates to these members aren't critical. These
187 // members could've been made volatile but wanted to avoid volatile reads as these are
188 // accessed often and updates will be infrequent.
192 if(shardManager != null) {
193 shardManager.tell(contextFactory, ActorRef.noSender());
197 public SchemaContext getSchemaContext() {
198 return schemaContext;
201 public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
202 Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
206 Future<Object> future = executeOperationAsync(shardManager,
207 new FindPrimary(shardName, true), shardInitializationTimeout);
209 return future.transform(new Mapper<Object, PrimaryShardInfo>() {
211 public PrimaryShardInfo checkedApply(Object response) throws UnknownMessageException {
212 if(response instanceof RemotePrimaryShardFound) {
213 LOG.debug("findPrimaryShardAsync received: {}", response);
214 RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
215 return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
216 } else if(response instanceof LocalPrimaryShardFound) {
217 LOG.debug("findPrimaryShardAsync received: {}", response);
218 LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
219 return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
220 found.getLocalShardDataTree());
221 } else if(response instanceof NotInitializedException) {
222 throw (NotInitializedException)response;
223 } else if(response instanceof PrimaryNotFoundException) {
224 throw (PrimaryNotFoundException)response;
225 } else if(response instanceof NoShardLeaderException) {
226 throw (NoShardLeaderException)response;
229 throw new UnknownMessageException(String.format(
230 "FindPrimary returned unkown response: %s", response));
232 }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
235 private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
236 short primaryVersion, DataTree localShardDataTree) {
237 ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
238 PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
239 new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
240 primaryShardInfoCache.putSuccessful(shardName, info);
245 * Finds a local shard given its shard name and return it's ActorRef
247 * @param shardName the name of the local shard that needs to be found
248 * @return a reference to a local shard actor which represents the shard
249 * specified by the shardName
251 public Optional<ActorRef> findLocalShard(String shardName) {
252 Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
254 if (result instanceof LocalShardFound) {
255 LocalShardFound found = (LocalShardFound) result;
256 LOG.debug("Local shard found {}", found.getPath());
257 return Optional.of(found.getPath());
260 return Optional.absent();
264 * Finds a local shard async given its shard name and return a Future from which to obtain the
267 * @param shardName the name of the local shard that needs to be found
269 public Future<ActorRef> findLocalShardAsync( final String shardName) {
270 Future<Object> future = executeOperationAsync(shardManager,
271 new FindLocalShard(shardName, true), shardInitializationTimeout);
273 return future.map(new Mapper<Object, ActorRef>() {
275 public ActorRef checkedApply(Object response) throws Throwable {
276 if(response instanceof LocalShardFound) {
277 LocalShardFound found = (LocalShardFound)response;
278 LOG.debug("Local shard found {}", found.getPath());
279 return found.getPath();
280 } else if(response instanceof NotInitializedException) {
281 throw (NotInitializedException)response;
282 } else if(response instanceof LocalShardNotFound) {
283 throw new LocalShardNotFoundException(
284 String.format("Local shard for %s does not exist.", shardName));
287 throw new UnknownMessageException(String.format(
288 "FindLocalShard returned unkown response: %s", response));
290 }, getClientDispatcher());
294 * Executes an operation on a local actor and wait for it's response
298 * @return The response of the operation
300 public Object executeOperation(ActorRef actor, Object message) {
301 Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
304 return Await.result(future, operationDuration);
305 } catch (Exception e) {
306 throw new TimeoutException("Sending message " + message.getClass().toString() +
307 " to actor " + actor.toString() + " failed. Try again later.", e);
311 public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
312 Preconditions.checkArgument(actor != null, "actor must not be null");
313 Preconditions.checkArgument(message != null, "message must not be null");
315 LOG.debug("Sending message {} to {}", message.getClass(), actor);
316 return doAsk(actor, message, timeout);
320 * Execute an operation on a remote actor and wait for it's response
326 public Object executeOperation(ActorSelection actor, Object message) {
327 Future<Object> future = executeOperationAsync(actor, message);
330 return Await.result(future, operationDuration);
331 } catch (Exception e) {
332 throw new TimeoutException("Sending message " + message.getClass().toString() +
333 " to actor " + actor.toString() + " failed. Try again later.", e);
338 * Execute an operation on a remote actor asynchronously.
340 * @param actor the ActorSelection
341 * @param message the message to send
342 * @param timeout the operation timeout
343 * @return a Future containing the eventual result
345 public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
347 Preconditions.checkArgument(actor != null, "actor must not be null");
348 Preconditions.checkArgument(message != null, "message must not be null");
350 LOG.debug("Sending message {} to {}", message.getClass(), actor);
352 return doAsk(actor, message, timeout);
356 * Execute an operation on a remote actor asynchronously.
358 * @param actor the ActorSelection
359 * @param message the message to send
360 * @return a Future containing the eventual result
362 public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
363 return executeOperationAsync(actor, message, operationTimeout);
367 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
368 * reply (essentially set and forget).
370 * @param actor the ActorSelection
371 * @param message the message to send
373 public void sendOperationAsync(ActorSelection actor, Object message) {
374 Preconditions.checkArgument(actor != null, "actor must not be null");
375 Preconditions.checkArgument(message != null, "message must not be null");
377 LOG.debug("Sending message {} to {}", message.getClass(), actor);
379 actor.tell(message, ActorRef.noSender());
382 public void shutdown() {
383 FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
385 Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
386 } catch(Exception e) {
387 LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
391 public ClusterWrapper getClusterWrapper() {
392 return clusterWrapper;
395 public MemberName getCurrentMemberName(){
396 return clusterWrapper.getCurrentMemberName();
400 * Send the message to each and every shard
404 public void broadcast(final Function<Short, Object> messageSupplier){
405 for(final String shardName : configuration.getAllShardNames()){
407 Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
408 primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
410 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
411 Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
412 if(failure != null) {
413 LOG.warn("broadcast failed to send message {} to shard {}: {}",
414 message.getClass().getSimpleName(), shardName, failure);
416 primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
419 }, getClientDispatcher());
423 public FiniteDuration getOperationDuration() {
424 return operationDuration;
427 public Timeout getOperationTimeout() {
428 return operationTimeout;
431 public boolean isPathLocal(String path) {
432 if (Strings.isNullOrEmpty(path)) {
436 int pathAtIndex = path.indexOf('@');
437 if (pathAtIndex == -1) {
438 //if the path is of local format, then its local and is co-located
441 } else if (selfAddressHostPort != null) {
442 // self-address and tx actor path, both are of remote path format
443 int slashIndex = path.indexOf('/', pathAtIndex);
445 if (slashIndex == -1) {
449 String hostPort = path.substring(pathAtIndex + 1, slashIndex);
450 return hostPort.equals(selfAddressHostPort);
453 // self address is local format and tx actor path is remote format
459 * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
460 * us to create a timer for pretty much anything.
462 * @param operationName
465 public Timer getOperationTimer(String operationName){
466 return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
469 public Timer getOperationTimer(String dataStoreType, String operationName){
470 final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
471 operationName, METRIC_RATE);
472 return metricRegistry.timer(rate);
476 * Get the name of the data store to which this ActorContext belongs
480 public String getDataStoreName() {
481 return datastoreContext.getDataStoreName();
485 * Get the current transaction creation rate limit
488 public double getTxCreationLimit(){
489 return txRateLimiter.getTxCreationLimit();
493 * Try to acquire a transaction creation permit. Will block if no permits are available.
495 public void acquireTxCreationPermit(){
496 txRateLimiter.acquire();
500 * Return the operation timeout to be used when committing transactions
503 public Timeout getTransactionCommitOperationTimeout(){
504 return transactionCommitOperationTimeout;
508 * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
509 * code on the datastore
512 public ExecutionContext getClientDispatcher() {
513 return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
516 public String getNotificationDispatcherPath(){
517 return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
520 public Configuration getConfiguration() {
521 return configuration;
524 public ShardStrategyFactory getShardStrategyFactory() {
525 return shardStrategyFactory;
528 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
529 return ask(actorRef, message, timeout);
532 protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout){
533 return ask(actorRef, message, timeout);
536 public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
537 return primaryShardInfoCache;