2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.dispatch.Mapper;
19 import akka.pattern.AskTimeoutException;
20 import akka.util.Timeout;
21 import com.google.common.base.Optional;
22 import com.google.common.base.Preconditions;
23 import com.google.common.base.Strings;
24 import java.util.concurrent.TimeUnit;
25 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
26 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
27 import org.opendaylight.controller.cluster.datastore.Configuration;
28 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
29 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
30 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
31 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
32 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
33 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
34 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
35 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
36 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
37 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
38 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
39 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
40 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
41 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
42 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import scala.concurrent.Await;
46 import scala.concurrent.Future;
47 import scala.concurrent.duration.Duration;
48 import scala.concurrent.duration.FiniteDuration;
51 * The ActorContext class contains utility methods which could be used by
52 * non-actors (like DistributedDataStore) to work with actors a little more
53 * easily. An ActorContext can be freely passed around to local object instances
54 * but should not be passed to actors especially remote actors
56 public class ActorContext {
57 private static final Logger
58 LOG = LoggerFactory.getLogger(ActorContext.class);
60 public static final String MAILBOX = "bounded-mailbox";
62 private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
63 new Mapper<Throwable, Throwable>() {
65 public Throwable apply(Throwable failure) {
66 Throwable actualFailure = failure;
67 if(failure instanceof AskTimeoutException) {
68 // A timeout exception most likely means the shard isn't initialized.
69 actualFailure = new NotInitializedException(
70 "Timed out trying to find the primary shard. Most likely cause is the " +
71 "shard is not initialized yet.");
78 private final ActorSystem actorSystem;
79 private final ActorRef shardManager;
80 private final ClusterWrapper clusterWrapper;
81 private final Configuration configuration;
82 private final DatastoreContext datastoreContext;
83 private volatile SchemaContext schemaContext;
84 private final FiniteDuration operationDuration;
85 private final Timeout operationTimeout;
86 private final String selfAddressHostPort;
87 private final int transactionOutstandingOperationLimit;
89 public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
90 ClusterWrapper clusterWrapper, Configuration configuration) {
91 this(actorSystem, shardManager, clusterWrapper, configuration,
92 DatastoreContext.newBuilder().build());
95 public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
96 ClusterWrapper clusterWrapper, Configuration configuration,
97 DatastoreContext datastoreContext) {
98 this.actorSystem = actorSystem;
99 this.shardManager = shardManager;
100 this.clusterWrapper = clusterWrapper;
101 this.configuration = configuration;
102 this.datastoreContext = datastoreContext;
104 operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(),
106 operationTimeout = new Timeout(operationDuration);
108 Address selfAddress = clusterWrapper.getSelfAddress();
109 if (selfAddress != null && !selfAddress.host().isEmpty()) {
110 selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get();
112 selfAddressHostPort = null;
115 transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity();
118 public DatastoreContext getDatastoreContext() {
119 return datastoreContext;
122 public ActorSystem getActorSystem() {
126 public ActorRef getShardManager() {
130 public ActorSelection actorSelection(String actorPath) {
131 return actorSystem.actorSelection(actorPath);
134 public ActorSelection actorSelection(ActorPath actorPath) {
135 return actorSystem.actorSelection(actorPath);
138 public void setSchemaContext(SchemaContext schemaContext) {
139 this.schemaContext = schemaContext;
141 if(shardManager != null) {
142 shardManager.tell(new UpdateSchemaContext(schemaContext), null);
146 public SchemaContext getSchemaContext() {
147 return schemaContext;
151 * Finds the primary shard for the given shard name
156 public Optional<ActorSelection> findPrimaryShard(String shardName) {
157 String path = findPrimaryPathOrNull(shardName);
159 return Optional.absent();
161 return Optional.of(actorSystem.actorSelection(path));
164 public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
165 Future<Object> future = executeOperationAsync(shardManager,
166 new FindPrimary(shardName, true).toSerializable(),
167 datastoreContext.getShardInitializationTimeout());
169 return future.transform(new Mapper<Object, ActorSelection>() {
171 public ActorSelection checkedApply(Object response) throws Exception {
172 if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
173 PrimaryFound found = PrimaryFound.fromSerializable(response);
175 LOG.debug("Primary found {}", found.getPrimaryPath());
176 return actorSystem.actorSelection(found.getPrimaryPath());
177 } else if(response instanceof ActorNotInitialized) {
178 throw new NotInitializedException(
179 String.format("Found primary shard %s but it's not initialized yet. " +
180 "Please try again later", shardName));
181 } else if(response instanceof PrimaryNotFound) {
182 throw new PrimaryNotFoundException(
183 String.format("No primary shard found for %S.", shardName));
186 throw new UnknownMessageException(String.format(
187 "FindPrimary returned unkown response: %s", response));
189 }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
193 * Finds a local shard given its shard name and return it's ActorRef
195 * @param shardName the name of the local shard that needs to be found
196 * @return a reference to a local shard actor which represents the shard
197 * specified by the shardName
199 public Optional<ActorRef> findLocalShard(String shardName) {
200 Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
202 if (result instanceof LocalShardFound) {
203 LocalShardFound found = (LocalShardFound) result;
204 LOG.debug("Local shard found {}", found.getPath());
205 return Optional.of(found.getPath());
208 return Optional.absent();
212 * Finds a local shard async given its shard name and return a Future from which to obtain the
215 * @param shardName the name of the local shard that needs to be found
217 public Future<ActorRef> findLocalShardAsync( final String shardName) {
218 Future<Object> future = executeOperationAsync(shardManager,
219 new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
221 return future.map(new Mapper<Object, ActorRef>() {
223 public ActorRef checkedApply(Object response) throws Throwable {
224 if(response instanceof LocalShardFound) {
225 LocalShardFound found = (LocalShardFound)response;
226 LOG.debug("Local shard found {}", found.getPath());
227 return found.getPath();
228 } else if(response instanceof ActorNotInitialized) {
229 throw new NotInitializedException(
230 String.format("Found local shard for %s but it's not initialized yet.",
232 } else if(response instanceof LocalShardNotFound) {
233 throw new LocalShardNotFoundException(
234 String.format("Local shard for %s does not exist.", shardName));
237 throw new UnknownMessageException(String.format(
238 "FindLocalShard returned unkown response: %s", response));
240 }, getActorSystem().dispatcher());
243 private String findPrimaryPathOrNull(String shardName) {
244 Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
246 if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
247 PrimaryFound found = PrimaryFound.fromSerializable(result);
249 LOG.debug("Primary found {}", found.getPrimaryPath());
250 return found.getPrimaryPath();
252 } else if (result.getClass().equals(ActorNotInitialized.class)){
253 throw new NotInitializedException(
254 String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
264 * Executes an operation on a local actor and wait for it's response
268 * @return The response of the operation
270 public Object executeOperation(ActorRef actor, Object message) {
271 Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
274 return Await.result(future, operationDuration);
275 } catch (Exception e) {
276 throw new TimeoutException("Sending message " + message.getClass().toString() +
277 " to actor " + actor.toString() + " failed. Try again later.", e);
281 public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
282 Preconditions.checkArgument(actor != null, "actor must not be null");
283 Preconditions.checkArgument(message != null, "message must not be null");
285 LOG.debug("Sending message {} to {}", message.getClass(), actor);
286 return ask(actor, message, timeout);
290 * Execute an operation on a remote actor and wait for it's response
296 public Object executeOperation(ActorSelection actor, Object message) {
297 Future<Object> future = executeOperationAsync(actor, message);
300 return Await.result(future, operationDuration);
301 } catch (Exception e) {
302 throw new TimeoutException("Sending message " + message.getClass().toString() +
303 " to actor " + actor.toString() + " failed. Try again later.", e);
308 * Execute an operation on a remote actor asynchronously.
310 * @param actor the ActorSelection
311 * @param message the message to send
312 * @param timeout the operation timeout
313 * @return a Future containing the eventual result
315 public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
317 Preconditions.checkArgument(actor != null, "actor must not be null");
318 Preconditions.checkArgument(message != null, "message must not be null");
320 LOG.debug("Sending message {} to {}", message.getClass(), actor);
322 return ask(actor, message, timeout);
326 * Execute an operation on a remote actor asynchronously.
328 * @param actor the ActorSelection
329 * @param message the message to send
330 * @return a Future containing the eventual result
332 public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
333 return executeOperationAsync(actor, message, operationTimeout);
337 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
338 * reply (essentially set and forget).
340 * @param actor the ActorSelection
341 * @param message the message to send
343 public void sendOperationAsync(ActorSelection actor, Object message) {
344 Preconditions.checkArgument(actor != null, "actor must not be null");
345 Preconditions.checkArgument(message != null, "message must not be null");
347 LOG.debug("Sending message {} to {}", message.getClass(), actor);
349 actor.tell(message, ActorRef.noSender());
352 public void shutdown() {
353 shardManager.tell(PoisonPill.getInstance(), null);
354 actorSystem.shutdown();
357 public ClusterWrapper getClusterWrapper() {
358 return clusterWrapper;
361 public String getCurrentMemberName(){
362 return clusterWrapper.getCurrentMemberName();
366 * Send the message to each and every shard
370 public void broadcast(Object message){
371 for(String shardName : configuration.getAllShardNames()){
373 Optional<ActorSelection> primary = findPrimaryShard(shardName);
374 if (primary.isPresent()) {
375 primary.get().tell(message, ActorRef.noSender());
377 LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
378 message.getClass().getSimpleName(), shardName);
383 public FiniteDuration getOperationDuration() {
384 return operationDuration;
387 public boolean isPathLocal(String path) {
388 if (Strings.isNullOrEmpty(path)) {
392 int pathAtIndex = path.indexOf('@');
393 if (pathAtIndex == -1) {
394 //if the path is of local format, then its local and is co-located
397 } else if (selfAddressHostPort != null) {
398 // self-address and tx actor path, both are of remote path format
399 int slashIndex = path.indexOf('/', pathAtIndex);
401 if (slashIndex == -1) {
405 String hostPort = path.substring(pathAtIndex + 1, slashIndex);
406 return hostPort.equals(selfAddressHostPort);
409 // self address is local format and tx actor path is remote format
415 * @deprecated This method is present only to support backward compatibility with Helium and should not be
420 * @param localPathOfRemoteActor
424 public String resolvePath(final String primaryPath,
425 final String localPathOfRemoteActor) {
426 StringBuilder builder = new StringBuilder();
427 String[] primaryPathElements = primaryPath.split("/");
428 builder.append(primaryPathElements[0]).append("//")
429 .append(primaryPathElements[1]).append(primaryPathElements[2]);
430 String[] remotePathElements = localPathOfRemoteActor.split("/");
431 for (int i = 3; i < remotePathElements.length; i++) {
432 builder.append("/").append(remotePathElements[i]);
435 return builder.toString();
439 * Get the maximum number of operations that are to be permitted within a transaction before the transaction
440 * should begin throttling the operations
442 * Parking reading this configuration here because we need to get to the actor system settings
446 public int getTransactionOutstandingOperationLimit(){
447 return transactionOutstandingOperationLimit;