2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Mapper;
19 import akka.pattern.AskTimeoutException;
20 import akka.util.Timeout;
21 import com.codahale.metrics.JmxReporter;
22 import com.codahale.metrics.MetricRegistry;
23 import com.codahale.metrics.Timer;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import com.google.common.util.concurrent.RateLimiter;
28 import java.util.concurrent.TimeUnit;
29 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
30 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.Configuration;
32 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
33 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
34 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
35 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
36 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
37 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
38 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
39 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
40 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
41 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
43 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
45 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import scala.concurrent.Await;
50 import scala.concurrent.ExecutionContext;
51 import scala.concurrent.Future;
52 import scala.concurrent.duration.Duration;
53 import scala.concurrent.duration.FiniteDuration;
56 * The ActorContext class contains utility methods which could be used by
57 * non-actors (like DistributedDataStore) to work with actors a little more
58 * easily. An ActorContext can be freely passed around to local object instances
59 * but should not be passed to actors especially remote actors
61 public class ActorContext {
62 private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
63 private static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
64 private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
65 private static final String METRIC_RATE = "rate";
66 private static final String DOMAIN = "org.opendaylight.controller.cluster.datastore";
67 private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
68 new Mapper<Throwable, Throwable>() {
70 public Throwable apply(Throwable failure) {
71 Throwable actualFailure = failure;
72 if(failure instanceof AskTimeoutException) {
73 // A timeout exception most likely means the shard isn't initialized.
74 actualFailure = new NotInitializedException(
75 "Timed out trying to find the primary shard. Most likely cause is the " +
76 "shard is not initialized yet.");
82 public static final String MAILBOX = "bounded-mailbox";
84 private final ActorSystem actorSystem;
85 private final ActorRef shardManager;
86 private final ClusterWrapper clusterWrapper;
87 private final Configuration configuration;
88 private final DatastoreContext datastoreContext;
89 private final FiniteDuration operationDuration;
90 private final Timeout operationTimeout;
91 private final String selfAddressHostPort;
92 private final RateLimiter txRateLimiter;
93 private final MetricRegistry metricRegistry = new MetricRegistry();
94 private final JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).inDomain(DOMAIN).build();
95 private final int transactionOutstandingOperationLimit;
96 private final Timeout transactionCommitOperationTimeout;
97 private final Dispatchers dispatchers;
99 private volatile SchemaContext schemaContext;
101 public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
102 ClusterWrapper clusterWrapper, Configuration configuration) {
103 this(actorSystem, shardManager, clusterWrapper, configuration,
104 DatastoreContext.newBuilder().build());
107 public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
108 ClusterWrapper clusterWrapper, Configuration configuration,
109 DatastoreContext datastoreContext) {
110 this.actorSystem = actorSystem;
111 this.shardManager = shardManager;
112 this.clusterWrapper = clusterWrapper;
113 this.configuration = configuration;
114 this.datastoreContext = datastoreContext;
115 this.txRateLimiter = RateLimiter.create(datastoreContext.getTransactionCreationInitialRateLimit());
116 this.dispatchers = new Dispatchers(actorSystem.dispatchers());
118 operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
119 operationTimeout = new Timeout(operationDuration);
120 transactionCommitOperationTimeout = new Timeout(Duration.create(getDatastoreContext().getShardTransactionCommitTimeoutInSeconds(),
124 Address selfAddress = clusterWrapper.getSelfAddress();
125 if (selfAddress != null && !selfAddress.host().isEmpty()) {
126 selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
128 selfAddressHostPort = null;
131 transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
136 public DatastoreContext getDatastoreContext() {
137 return datastoreContext;
140 public ActorSystem getActorSystem() {
144 public ActorRef getShardManager() {
148 public ActorSelection actorSelection(String actorPath) {
149 return actorSystem.actorSelection(actorPath);
152 public ActorSelection actorSelection(ActorPath actorPath) {
153 return actorSystem.actorSelection(actorPath);
156 public void setSchemaContext(SchemaContext schemaContext) {
157 this.schemaContext = schemaContext;
159 if(shardManager != null) {
160 shardManager.tell(new UpdateSchemaContext(schemaContext), null);
164 public SchemaContext getSchemaContext() {
165 return schemaContext;
169 * Finds the primary shard for the given shard name
174 public Optional<ActorSelection> findPrimaryShard(String shardName) {
175 String path = findPrimaryPathOrNull(shardName);
177 return Optional.absent();
179 return Optional.of(actorSystem.actorSelection(path));
182 public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
183 Future<Object> future = executeOperationAsync(shardManager,
184 new FindPrimary(shardName, true).toSerializable(),
185 datastoreContext.getShardInitializationTimeout());
187 return future.transform(new Mapper<Object, ActorSelection>() {
189 public ActorSelection checkedApply(Object response) throws Exception {
190 if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
191 PrimaryFound found = PrimaryFound.fromSerializable(response);
193 LOG.debug("Primary found {}", found.getPrimaryPath());
194 return actorSystem.actorSelection(found.getPrimaryPath());
195 } else if(response instanceof ActorNotInitialized) {
196 throw new NotInitializedException(
197 String.format("Found primary shard %s but it's not initialized yet. " +
198 "Please try again later", shardName));
199 } else if(response instanceof PrimaryNotFound) {
200 throw new PrimaryNotFoundException(
201 String.format("No primary shard found for %S.", shardName));
204 throw new UnknownMessageException(String.format(
205 "FindPrimary returned unkown response: %s", response));
207 }, FIND_PRIMARY_FAILURE_TRANSFORMER, getClientDispatcher());
211 * Finds a local shard given its shard name and return it's ActorRef
213 * @param shardName the name of the local shard that needs to be found
214 * @return a reference to a local shard actor which represents the shard
215 * specified by the shardName
217 public Optional<ActorRef> findLocalShard(String shardName) {
218 Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
220 if (result instanceof LocalShardFound) {
221 LocalShardFound found = (LocalShardFound) result;
222 LOG.debug("Local shard found {}", found.getPath());
223 return Optional.of(found.getPath());
226 return Optional.absent();
230 * Finds a local shard async given its shard name and return a Future from which to obtain the
233 * @param shardName the name of the local shard that needs to be found
235 public Future<ActorRef> findLocalShardAsync( final String shardName) {
236 Future<Object> future = executeOperationAsync(shardManager,
237 new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
239 return future.map(new Mapper<Object, ActorRef>() {
241 public ActorRef checkedApply(Object response) throws Throwable {
242 if(response instanceof LocalShardFound) {
243 LocalShardFound found = (LocalShardFound)response;
244 LOG.debug("Local shard found {}", found.getPath());
245 return found.getPath();
246 } else if(response instanceof ActorNotInitialized) {
247 throw new NotInitializedException(
248 String.format("Found local shard for %s but it's not initialized yet.",
250 } else if(response instanceof LocalShardNotFound) {
251 throw new LocalShardNotFoundException(
252 String.format("Local shard for %s does not exist.", shardName));
255 throw new UnknownMessageException(String.format(
256 "FindLocalShard returned unkown response: %s", response));
258 }, getClientDispatcher());
261 private String findPrimaryPathOrNull(String shardName) {
262 Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
264 if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
265 PrimaryFound found = PrimaryFound.fromSerializable(result);
267 LOG.debug("Primary found {}", found.getPrimaryPath());
268 return found.getPrimaryPath();
270 } else if (result.getClass().equals(ActorNotInitialized.class)){
271 throw new NotInitializedException(
272 String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
282 * Executes an operation on a local actor and wait for it's response
286 * @return The response of the operation
288 public Object executeOperation(ActorRef actor, Object message) {
289 Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
292 return Await.result(future, operationDuration);
293 } catch (Exception e) {
294 throw new TimeoutException("Sending message " + message.getClass().toString() +
295 " to actor " + actor.toString() + " failed. Try again later.", e);
299 public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
300 Preconditions.checkArgument(actor != null, "actor must not be null");
301 Preconditions.checkArgument(message != null, "message must not be null");
303 LOG.debug("Sending message {} to {}", message.getClass(), actor);
304 return ask(actor, message, timeout);
308 * Execute an operation on a remote actor and wait for it's response
314 public Object executeOperation(ActorSelection actor, Object message) {
315 Future<Object> future = executeOperationAsync(actor, message);
318 return Await.result(future, operationDuration);
319 } catch (Exception e) {
320 throw new TimeoutException("Sending message " + message.getClass().toString() +
321 " to actor " + actor.toString() + " failed. Try again later.", e);
326 * Execute an operation on a remote actor asynchronously.
328 * @param actor the ActorSelection
329 * @param message the message to send
330 * @param timeout the operation timeout
331 * @return a Future containing the eventual result
333 public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
335 Preconditions.checkArgument(actor != null, "actor must not be null");
336 Preconditions.checkArgument(message != null, "message must not be null");
338 LOG.debug("Sending message {} to {}", message.getClass(), actor);
340 return ask(actor, message, timeout);
344 * Execute an operation on a remote actor asynchronously.
346 * @param actor the ActorSelection
347 * @param message the message to send
348 * @return a Future containing the eventual result
350 public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
351 return executeOperationAsync(actor, message, operationTimeout);
355 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
356 * reply (essentially set and forget).
358 * @param actor the ActorSelection
359 * @param message the message to send
361 public void sendOperationAsync(ActorSelection actor, Object message) {
362 Preconditions.checkArgument(actor != null, "actor must not be null");
363 Preconditions.checkArgument(message != null, "message must not be null");
365 LOG.debug("Sending message {} to {}", message.getClass(), actor);
367 actor.tell(message, ActorRef.noSender());
370 public void shutdown() {
371 shardManager.tell(PoisonPill.getInstance(), null);
372 actorSystem.shutdown();
375 public ClusterWrapper getClusterWrapper() {
376 return clusterWrapper;
379 public String getCurrentMemberName(){
380 return clusterWrapper.getCurrentMemberName();
384 * Send the message to each and every shard
388 public void broadcast(Object message){
389 for(String shardName : configuration.getAllShardNames()){
391 Optional<ActorSelection> primary = findPrimaryShard(shardName);
392 if (primary.isPresent()) {
393 primary.get().tell(message, ActorRef.noSender());
395 LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
396 message.getClass().getSimpleName(), shardName);
401 public FiniteDuration getOperationDuration() {
402 return operationDuration;
405 public boolean isPathLocal(String path) {
406 if (Strings.isNullOrEmpty(path)) {
410 int pathAtIndex = path.indexOf('@');
411 if (pathAtIndex == -1) {
412 //if the path is of local format, then its local and is co-located
415 } else if (selfAddressHostPort != null) {
416 // self-address and tx actor path, both are of remote path format
417 int slashIndex = path.indexOf('/', pathAtIndex);
419 if (slashIndex == -1) {
423 String hostPort = path.substring(pathAtIndex + 1, slashIndex);
424 return hostPort.equals(selfAddressHostPort);
427 // self address is local format and tx actor path is remote format
433 * @deprecated This method is present only to support backward compatibility with Helium and should not be
438 * @param localPathOfRemoteActor
442 public String resolvePath(final String primaryPath,
443 final String localPathOfRemoteActor) {
444 StringBuilder builder = new StringBuilder();
445 String[] primaryPathElements = primaryPath.split("/");
446 builder.append(primaryPathElements[0]).append("//")
447 .append(primaryPathElements[1]).append(primaryPathElements[2]);
448 String[] remotePathElements = localPathOfRemoteActor.split("/");
449 for (int i = 3; i < remotePathElements.length; i++) {
450 builder.append("/").append(remotePathElements[i]);
453 return builder.toString();
457 * Get the maximum number of operations that are to be permitted within a transaction before the transaction
458 * should begin throttling the operations
460 * Parking reading this configuration here because we need to get to the actor system settings
464 public int getTransactionOutstandingOperationLimit(){
465 return transactionOutstandingOperationLimit;
469 * This is a utility method that lets us get a Timer object for any operation. This is a little open-ended to allow
470 * us to create a timer for pretty much anything.
472 * @param operationName
475 public Timer getOperationTimer(String operationName){
476 final String rate = MetricRegistry.name(DISTRIBUTED_DATA_STORE_METRIC_REGISTRY, datastoreContext.getDataStoreType(), operationName, METRIC_RATE);
477 return metricRegistry.timer(rate);
481 * Get the type of the data store to which this ActorContext belongs
485 public String getDataStoreType() {
486 return datastoreContext.getDataStoreType();
490 * Set the number of transaction creation permits that are to be allowed
492 * @param permitsPerSecond
494 public void setTxCreationLimit(double permitsPerSecond){
495 txRateLimiter.setRate(permitsPerSecond);
499 * Get the current transaction creation rate limit
502 public double getTxCreationLimit(){
503 return txRateLimiter.getRate();
507 * Try to acquire a transaction creation permit. Will block if no permits are available.
509 public void acquireTxCreationPermit(){
510 txRateLimiter.acquire();
514 * Return the operation timeout to be used when committing transactions
517 public Timeout getTransactionCommitOperationTimeout(){
518 return transactionCommitOperationTimeout;
522 * An akka dispatcher that is meant to be used when processing ask Futures which were triggered by client
523 * code on the datastore
526 public ExecutionContext getClientDispatcher() {
527 return this.dispatchers.getDispatcher(Dispatchers.DispatcherType.Client);
530 public String getNotificationDispatcherPath(){
531 return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);