2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import static akka.pattern.Patterns.ask;
13 import akka.actor.ActorPath;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSelection;
16 import akka.actor.ActorSystem;
17 import akka.actor.Address;
18 import akka.dispatch.Mapper;
19 import akka.dispatch.OnComplete;
20 import akka.pattern.AskTimeoutException;
21 import akka.pattern.Patterns;
22 import akka.util.Timeout;
23 import com.codahale.metrics.MetricRegistry;
24 import com.codahale.metrics.Timer;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Preconditions;
27 import com.google.common.base.Strings;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.Function;
30 import org.opendaylight.controller.cluster.access.concepts.MemberName;
31 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
32 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
33 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
35 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
36 import org.opendaylight.controller.cluster.datastore.config.Configuration;
37 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
42 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
43 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
44 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
48 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
49 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
50 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
51 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
52 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
53 import org.opendaylight.controller.cluster.reporting.MetricsReporter;
54 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
55 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
56 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59 import scala.concurrent.Await;
60 import scala.concurrent.ExecutionContext;
61 import scala.concurrent.Future;
62 import scala.concurrent.duration.Duration;
63 import scala.concurrent.duration.FiniteDuration;
/**
 * The ActorContext class contains utility methods which can be used by
 * non-actors (like DistributedDataStore) to work with actors a little more
 * easily. An ActorContext can be freely passed around to local object instances
 * but should not be passed to actors, especially remote actors.
 */
71 public class ActorContext {
72 private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
73 private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
74 private static final String METRIC_RATE = "rate";
75 private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
76 new Mapper<Throwable, Throwable>() {
78 public Throwable apply(Throwable failure) {
79 Throwable actualFailure = failure;
80 if (failure instanceof AskTimeoutException) {
81 // A timeout exception most likely means the shard isn't initialized.
82 actualFailure = new NotInitializedException(
83 "Timed out trying to find the primary shard. Most likely cause is the "
84 + "shard is not initialized yet.");
90 public static final String BOUNDED_MAILBOX = "bounded-mailbox";
91 public static final String COMMIT = "commit";
93 private final ActorSystem actorSystem;
94 private final ActorRef shardManager;
95 private final ClusterWrapper clusterWrapper;
96 private final Configuration configuration;
97 private DatastoreContext datastoreContext;
98 private FiniteDuration operationDuration;
99 private Timeout operationTimeout;
100 private final String selfAddressHostPort;
101 private TransactionRateLimiter txRateLimiter;
102 private Timeout transactionCommitOperationTimeout;
103 private Timeout shardInitializationTimeout;
104 private final Dispatchers dispatchers;
106 private volatile SchemaContext schemaContext;
108 // Used as a write memory barrier.
109 @SuppressWarnings("unused")
110 private volatile boolean updated;
112 private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN)
113 .getMetricsRegistry();
115 private final PrimaryShardInfoFutureCache primaryShardInfoCache;
116 private final ShardStrategyFactory shardStrategyFactory;
/**
 * Creates an ActorContext with a default-built {@link DatastoreContext} and a new
 * {@link PrimaryShardInfoFutureCache}.
 *
 * @param actorSystem the actor system used to resolve actor selections and dispatchers
 * @param shardManager the shard manager actor
 * @param clusterWrapper provides the self address and the current member name
 * @param configuration the shard configuration
 */
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
        ClusterWrapper clusterWrapper, Configuration configuration) {
    this(actorSystem, shardManager, clusterWrapper, configuration,
            DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
}
/**
 * Creates an ActorContext.
 *
 * @param actorSystem the actor system used to resolve actor selections and dispatchers
 * @param shardManager the shard manager actor
 * @param clusterWrapper provides the self address and the current member name
 * @param configuration the shard configuration
 * @param datastoreContext the initial datastore context from which the cached timeouts are derived
 * @param primaryShardInfoCache the cache consulted and populated by findPrimaryShardAsync
 */
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
        ClusterWrapper clusterWrapper, Configuration configuration,
        DatastoreContext datastoreContext, PrimaryShardInfoFutureCache primaryShardInfoCache) {
    this.actorSystem = actorSystem;
    this.shardManager = shardManager;
    this.clusterWrapper = clusterWrapper;
    this.configuration = configuration;
    this.datastoreContext = datastoreContext;
    this.dispatchers = new Dispatchers(actorSystem.dispatchers());
    this.primaryShardInfoCache = primaryShardInfoCache;

    final LogicalDatastoreType convertedType =
            LogicalDatastoreType.valueOf(datastoreContext.getLogicalStoreType().name());
    this.shardStrategyFactory = new ShardStrategyFactory(configuration, convertedType);

    setCachedProperties();

    // Remember "host:port" of our own address so isPathLocal() can compare remote-format paths against it.
    Address selfAddress = clusterWrapper.getSelfAddress();
    if (selfAddress != null && !selfAddress.host().isEmpty()) {
        selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
    } else {
        selfAddressHostPort = null;
    }
}
150 private void setCachedProperties() {
151 txRateLimiter = new TransactionRateLimiter(this);
153 operationDuration = Duration.create(datastoreContext.getOperationTimeoutInMillis(), TimeUnit.MILLISECONDS);
154 operationTimeout = new Timeout(operationDuration);
156 transactionCommitOperationTimeout = new Timeout(Duration.create(
157 datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS));
159 shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
162 public DatastoreContext getDatastoreContext() {
163 return datastoreContext;
166 public ActorSystem getActorSystem() {
170 public ActorRef getShardManager() {
174 public ActorSelection actorSelection(String actorPath) {
175 return actorSystem.actorSelection(actorPath);
178 public ActorSelection actorSelection(ActorPath actorPath) {
179 return actorSystem.actorSelection(actorPath);
182 public void setSchemaContext(SchemaContext schemaContext) {
183 this.schemaContext = schemaContext;
185 if (shardManager != null) {
186 shardManager.tell(new UpdateSchemaContext(schemaContext), ActorRef.noSender());
190 public void setDatastoreContext(DatastoreContextFactory contextFactory) {
191 this.datastoreContext = contextFactory.getBaseDatastoreContext();
192 setCachedProperties();
194 // We write the 'updated' volatile to trigger a write memory barrier so that the writes above
195 // will be published immediately even though they may not be immediately visible to other
196 // threads due to unsynchronized reads. That's OK though - we're going for eventual
197 // consistency here as immediately visible updates to these members aren't critical. These
198 // members could've been made volatile but wanted to avoid volatile reads as these are
199 // accessed often and updates will be infrequent.
203 if (shardManager != null) {
204 shardManager.tell(contextFactory, ActorRef.noSender());
208 public SchemaContext getSchemaContext() {
209 return schemaContext;
212 public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
213 Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
217 Future<Object> future = executeOperationAsync(shardManager,
218 new FindPrimary(shardName, true), shardInitializationTimeout);
220 return future.transform(new Mapper<Object, PrimaryShardInfo>() {
222 public PrimaryShardInfo checkedApply(Object response) throws UnknownMessageException {
223 if (response instanceof RemotePrimaryShardFound) {
224 LOG.debug("findPrimaryShardAsync received: {}", response);
225 RemotePrimaryShardFound found = (RemotePrimaryShardFound)response;
226 return onPrimaryShardFound(shardName, found.getPrimaryPath(), found.getPrimaryVersion(), null);
227 } else if (response instanceof LocalPrimaryShardFound) {
228 LOG.debug("findPrimaryShardAsync received: {}", response);
229 LocalPrimaryShardFound found = (LocalPrimaryShardFound)response;
230 return onPrimaryShardFound(shardName, found.getPrimaryPath(), DataStoreVersions.CURRENT_VERSION,
231 found.getLocalShardDataTree());
232 } else if (response instanceof NotInitializedException) {
233 throw (NotInitializedException)response;
234 } else if (response instanceof PrimaryNotFoundException) {
235 throw (PrimaryNotFoundException)response;
236 } else if (response instanceof NoShardLeaderException) {
237 throw (NoShardLeaderException)response;
240 throw new UnknownMessageException(String.format(
241 "FindPrimary returned unkown response: %s", response));
243 }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
246 private PrimaryShardInfo onPrimaryShardFound(String shardName, String primaryActorPath,
247 short primaryVersion, DataTree localShardDataTree) {
248 ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
249 PrimaryShardInfo info = localShardDataTree == null ? new PrimaryShardInfo(actorSelection, primaryVersion) :
250 new PrimaryShardInfo(actorSelection, primaryVersion, localShardDataTree);
251 primaryShardInfoCache.putSuccessful(shardName, info);
256 * Finds a local shard given its shard name and return it's ActorRef.
258 * @param shardName the name of the local shard that needs to be found
259 * @return a reference to a local shard actor which represents the shard
260 * specified by the shardName
262 public Optional<ActorRef> findLocalShard(String shardName) {
263 Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
265 if (result instanceof LocalShardFound) {
266 LocalShardFound found = (LocalShardFound) result;
267 LOG.debug("Local shard found {}", found.getPath());
268 return Optional.of(found.getPath());
271 return Optional.absent();
275 * Finds a local shard async given its shard name and return a Future from which to obtain the
278 * @param shardName the name of the local shard that needs to be found
280 public Future<ActorRef> findLocalShardAsync(final String shardName) {
281 Future<Object> future = executeOperationAsync(shardManager,
282 new FindLocalShard(shardName, true), shardInitializationTimeout);
284 return future.map(new Mapper<Object, ActorRef>() {
286 public ActorRef checkedApply(Object response) throws Throwable {
287 if (response instanceof LocalShardFound) {
288 LocalShardFound found = (LocalShardFound)response;
289 LOG.debug("Local shard found {}", found.getPath());
290 return found.getPath();
291 } else if (response instanceof NotInitializedException) {
292 throw (NotInitializedException)response;
293 } else if (response instanceof LocalShardNotFound) {
294 throw new LocalShardNotFoundException(
295 String.format("Local shard for %s does not exist.", shardName));
298 throw new UnknownMessageException(String.format(
299 "FindLocalShard returned unkown response: %s", response));
301 }, getClientDispatcher());
305 * Executes an operation on a local actor and wait for it's response.
307 * @param actor the actor
308 * @param message the message to send
309 * @return The response of the operation
311 @SuppressWarnings("checkstyle:IllegalCatch")
312 public Object executeOperation(ActorRef actor, Object message) {
313 Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
316 return Await.result(future, operationDuration);
317 } catch (Exception e) {
318 throw new TimeoutException("Sending message " + message.getClass().toString()
319 + " to actor " + actor.toString() + " failed. Try again later.", e);
324 * Execute an operation on a remote actor and wait for it's response.
326 * @param actor the actor
327 * @param message the message
328 * @return the response message
330 @SuppressWarnings("checkstyle:IllegalCatch")
331 public Object executeOperation(ActorSelection actor, Object message) {
332 Future<Object> future = executeOperationAsync(actor, message);
335 return Await.result(future, operationDuration);
336 } catch (Exception e) {
337 throw new TimeoutException("Sending message " + message.getClass().toString()
338 + " to actor " + actor.toString() + " failed. Try again later.", e);
342 public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
343 Preconditions.checkArgument(actor != null, "actor must not be null");
344 Preconditions.checkArgument(message != null, "message must not be null");
346 LOG.debug("Sending message {} to {}", message.getClass(), actor);
347 return doAsk(actor, message, timeout);
351 * Execute an operation on a remote actor asynchronously.
353 * @param actor the ActorSelection
354 * @param message the message to send
355 * @param timeout the operation timeout
356 * @return a Future containing the eventual result
358 public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
360 Preconditions.checkArgument(actor != null, "actor must not be null");
361 Preconditions.checkArgument(message != null, "message must not be null");
363 LOG.debug("Sending message {} to {}", message.getClass(), actor);
365 return doAsk(actor, message, timeout);
369 * Execute an operation on a remote actor asynchronously.
371 * @param actor the ActorSelection
372 * @param message the message to send
373 * @return a Future containing the eventual result
375 public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
376 return executeOperationAsync(actor, message, operationTimeout);
380 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
381 * reply (essentially set and forget).
383 * @param actor the ActorSelection
384 * @param message the message to send
386 public void sendOperationAsync(ActorSelection actor, Object message) {
387 Preconditions.checkArgument(actor != null, "actor must not be null");
388 Preconditions.checkArgument(message != null, "message must not be null");
390 LOG.debug("Sending message {} to {}", message.getClass(), actor);
392 actor.tell(message, ActorRef.noSender());
395 @SuppressWarnings("checkstyle:IllegalCatch")
396 public void shutdown() {
397 FiniteDuration duration = datastoreContext.getShardRaftConfig().getElectionTimeOutInterval().$times(3);
399 Await.ready(Patterns.gracefulStop(shardManager, duration, Shutdown.INSTANCE), duration);
400 } catch (Exception e) {
401 LOG.warn("ShardManager for {} data store did not shutdown gracefully", getDataStoreName(), e);
405 public ClusterWrapper getClusterWrapper() {
406 return clusterWrapper;
409 public MemberName getCurrentMemberName() {
410 return clusterWrapper.getCurrentMemberName();
414 * Send the message to each and every shard.
416 public void broadcast(final Function<Short, Object> messageSupplier, Class<?> messageClass) {
417 for (final String shardName : configuration.getAllShardNames()) {
419 Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
420 primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
422 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
423 if (failure != null) {
424 LOG.warn("broadcast failed to send message {} to shard {}: {}",
425 messageClass.getSimpleName(), shardName, failure);
427 Object message = messageSupplier.apply(primaryShardInfo.getPrimaryShardVersion());
428 primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
431 }, getClientDispatcher());
435 public FiniteDuration getOperationDuration() {
436 return operationDuration;
439 public Timeout getOperationTimeout() {
440 return operationTimeout;
443 public boolean isPathLocal(String path) {
444 if (Strings.isNullOrEmpty(path)) {
448 int pathAtIndex = path.indexOf('@');
449 if (pathAtIndex == -1) {
450 //if the path is of local format, then its local and is co-located
453 } else if (selfAddressHostPort != null) {
454 // self-address and tx actor path, both are of remote path format
455 int slashIndex = path.indexOf('/', pathAtIndex);
457 if (slashIndex == -1) {
461 String hostPort = path.substring(pathAtIndex + 1, slashIndex);
462 return hostPort.equals(selfAddressHostPort);
465 // self address is local format and tx actor path is remote format
471 * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
472 * us to create a timer for pretty much anything.
474 * @param operationName the name of the operation
475 * @return the Timer instance
477 public Timer getOperationTimer(String operationName) {
478 return getOperationTimer(datastoreContext.getDataStoreName(), operationName);
481 public Timer getOperationTimer(String dataStoreType, String operationName) {
482 final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, dataStoreType,
483 operationName, METRIC_RATE);
484 return metricRegistry.timer(rate);
488 * Get the name of the data store to which this ActorContext belongs.
490 * @return the data store name
492 public String getDataStoreName() {
493 return datastoreContext.getDataStoreName();
497 * Get the current transaction creation rate limit.
499 * @return the rate limit
501 public double getTxCreationLimit() {
502 return txRateLimiter.getTxCreationLimit();
506 * Try to acquire a transaction creation permit. Will block if no permits are available.
508 public void acquireTxCreationPermit() {
509 txRateLimiter.acquire();
513 * Returns the operation timeout to be used when committing transactions.
515 * @return the operation timeout
517 public Timeout getTransactionCommitOperationTimeout() {
518 return transactionCommitOperationTimeout;
522 * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
523 * code on the datastore.
525 * @return the dispatcher
527 public ExecutionContext getClientDispatcher() {
528 return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
531 public String getNotificationDispatcherPath() {
532 return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
535 public Configuration getConfiguration() {
536 return configuration;
539 public ShardStrategyFactory getShardStrategyFactory() {
540 return shardStrategyFactory;
543 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
544 return ask(actorRef, message, timeout);
547 protected Future<Object> doAsk(ActorSelection actorRef, Object message, Timeout timeout) {
548 return ask(actorRef, message, timeout);
551 public PrimaryShardInfoFutureCache getPrimaryShardInfoCache() {
552 return primaryShardInfoCache;