2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.dispatch.Mapper;
17 import akka.pattern.AskTimeoutException;
18 import akka.util.Timeout;
19 import com.google.common.base.Optional;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
22 import org.opendaylight.controller.cluster.datastore.Configuration;
23 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
24 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
25 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
26 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
27 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
28 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
29 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
30 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
31 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
32 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
33 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
34 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
35 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
36 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
37 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import scala.concurrent.Await;
41 import scala.concurrent.Future;
42 import scala.concurrent.duration.Duration;
43 import scala.concurrent.duration.FiniteDuration;
44 import java.util.concurrent.TimeUnit;
45 import static akka.pattern.Patterns.ask;
/**
 * The ActorContext class contains utility methods which can be used by
 * non-actors (like DistributedDataStore) to work with actors a little more
 * easily. An ActorContext can be freely passed around to local object instances,
 * but it should not be passed to actors, especially remote actors.
 */
53 public class ActorContext {
54 private static final Logger
55 LOG = LoggerFactory.getLogger(ActorContext.class);
57 public static final String MAILBOX = "bounded-mailbox";
59 private static final Mapper<Throwable, Throwable> FIND_PRIMARY_FAILURE_TRANSFORMER =
60 new Mapper<Throwable, Throwable>() {
62 public Throwable apply(Throwable failure) {
63 Throwable actualFailure = failure;
64 if(failure instanceof AskTimeoutException) {
65 // A timeout exception most likely means the shard isn't initialized.
66 actualFailure = new NotInitializedException(
67 "Timed out trying to find the primary shard. Most likely cause is the " +
68 "shard is not initialized yet.");
75 private final ActorSystem actorSystem;
76 private final ActorRef shardManager;
77 private final ClusterWrapper clusterWrapper;
78 private final Configuration configuration;
79 private final DatastoreContext datastoreContext;
80 private volatile SchemaContext schemaContext;
81 private final FiniteDuration operationDuration;
82 private final Timeout operationTimeout;
/**
 * Creates an ActorContext that uses the {@link DatastoreContext} produced by
 * {@code DatastoreContext.newBuilder().build()}.
 *
 * @param actorSystem the actor system used to resolve actor selections
 * @param shardManager the shard manager actor that find/update requests are sent to
 * @param clusterWrapper the cluster wrapper used for member name and self address lookups
 * @param configuration the configuration that supplies the shard names
 */
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
        ClusterWrapper clusterWrapper, Configuration configuration) {
    this(actorSystem, shardManager, clusterWrapper, configuration, DatastoreContext.newBuilder().build());
}
/**
 * Creates an ActorContext with an explicit {@link DatastoreContext}.
 *
 * @param actorSystem the actor system used to resolve actor selections
 * @param shardManager the shard manager actor that find/update requests are sent to
 * @param clusterWrapper the cluster wrapper used for member name and self address lookups
 * @param configuration the configuration that supplies the shard names
 * @param datastoreContext supplies the operation timeout and the shard initialization timeout
 */
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
        ClusterWrapper clusterWrapper, Configuration configuration,
        DatastoreContext datastoreContext) {
    this.actorSystem = actorSystem;
    this.shardManager = shardManager;
    this.clusterWrapper = clusterWrapper;
    this.configuration = configuration;
    this.datastoreContext = datastoreContext;

    // The blocking wait duration and the ask timeout are derived from the same configured value.
    this.operationDuration = Duration.create(
            datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS);
    this.operationTimeout = new Timeout(this.operationDuration);
}
104 public DatastoreContext getDatastoreContext() {
105 return datastoreContext;
108 public ActorSystem getActorSystem() {
112 public ActorRef getShardManager() {
116 public ActorSelection actorSelection(String actorPath) {
117 return actorSystem.actorSelection(actorPath);
120 public ActorSelection actorSelection(ActorPath actorPath) {
121 return actorSystem.actorSelection(actorPath);
124 public void setSchemaContext(SchemaContext schemaContext) {
125 this.schemaContext = schemaContext;
127 if(shardManager != null) {
128 shardManager.tell(new UpdateSchemaContext(schemaContext), null);
132 public SchemaContext getSchemaContext() {
133 return schemaContext;
137 * Finds the primary shard for the given shard name
142 public Optional<ActorSelection> findPrimaryShard(String shardName) {
143 String path = findPrimaryPathOrNull(shardName);
145 return Optional.absent();
147 return Optional.of(actorSystem.actorSelection(path));
150 public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
151 Future<Object> future = executeOperationAsync(shardManager,
152 new FindPrimary(shardName, true).toSerializable(),
153 datastoreContext.getShardInitializationTimeout());
155 return future.transform(new Mapper<Object, ActorSelection>() {
157 public ActorSelection checkedApply(Object response) throws Exception {
158 if(response.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
159 PrimaryFound found = PrimaryFound.fromSerializable(response);
161 LOG.debug("Primary found {}", found.getPrimaryPath());
162 return actorSystem.actorSelection(found.getPrimaryPath());
163 } else if(response instanceof ActorNotInitialized) {
164 throw new NotInitializedException(
165 String.format("Found primary shard %s but it's not initialized yet. " +
166 "Please try again later", shardName));
167 } else if(response instanceof PrimaryNotFound) {
168 throw new PrimaryNotFoundException(
169 String.format("No primary shard found for %S.", shardName));
172 throw new UnknownMessageException(String.format(
173 "FindPrimary returned unkown response: %s", response));
175 }, FIND_PRIMARY_FAILURE_TRANSFORMER, getActorSystem().dispatcher());
179 * Finds a local shard given its shard name and return it's ActorRef
181 * @param shardName the name of the local shard that needs to be found
182 * @return a reference to a local shard actor which represents the shard
183 * specified by the shardName
185 public Optional<ActorRef> findLocalShard(String shardName) {
186 Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
188 if (result instanceof LocalShardFound) {
189 LocalShardFound found = (LocalShardFound) result;
190 LOG.debug("Local shard found {}", found.getPath());
191 return Optional.of(found.getPath());
194 return Optional.absent();
198 * Finds a local shard async given its shard name and return a Future from which to obtain the
201 * @param shardName the name of the local shard that needs to be found
203 public Future<ActorRef> findLocalShardAsync( final String shardName) {
204 Future<Object> future = executeOperationAsync(shardManager,
205 new FindLocalShard(shardName, true), datastoreContext.getShardInitializationTimeout());
207 return future.map(new Mapper<Object, ActorRef>() {
209 public ActorRef checkedApply(Object response) throws Throwable {
210 if(response instanceof LocalShardFound) {
211 LocalShardFound found = (LocalShardFound)response;
212 LOG.debug("Local shard found {}", found.getPath());
213 return found.getPath();
214 } else if(response instanceof ActorNotInitialized) {
215 throw new NotInitializedException(
216 String.format("Found local shard for %s but it's not initialized yet.",
218 } else if(response instanceof LocalShardNotFound) {
219 throw new LocalShardNotFoundException(
220 String.format("Local shard for %s does not exist.", shardName));
223 throw new UnknownMessageException(String.format(
224 "FindLocalShard returned unkown response: %s", response));
226 }, getActorSystem().dispatcher());
229 private String findPrimaryPathOrNull(String shardName) {
230 Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
232 if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
233 PrimaryFound found = PrimaryFound.fromSerializable(result);
235 LOG.debug("Primary found {}", found.getPrimaryPath());
236 return found.getPrimaryPath();
238 } else if (result.getClass().equals(ActorNotInitialized.class)){
239 throw new NotInitializedException(
240 String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
250 * Executes an operation on a local actor and wait for it's response
254 * @return The response of the operation
256 public Object executeOperation(ActorRef actor, Object message) {
257 Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
260 return Await.result(future, operationDuration);
261 } catch (Exception e) {
262 throw new TimeoutException("Sending message " + message.getClass().toString() +
263 " to actor " + actor.toString() + " failed. Try again later.", e);
267 public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
268 Preconditions.checkArgument(actor != null, "actor must not be null");
269 Preconditions.checkArgument(message != null, "message must not be null");
271 LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
272 return ask(actor, message, timeout);
276 * Execute an operation on a remote actor and wait for it's response
282 public Object executeOperation(ActorSelection actor, Object message) {
283 Future<Object> future = executeOperationAsync(actor, message);
286 return Await.result(future, operationDuration);
287 } catch (Exception e) {
288 throw new TimeoutException("Sending message " + message.getClass().toString() +
289 " to actor " + actor.toString() + " failed. Try again later.", e);
294 * Execute an operation on a remote actor asynchronously.
296 * @param actor the ActorSelection
297 * @param message the message to send
298 * @param timeout the operation timeout
299 * @return a Future containing the eventual result
301 public Future<Object> executeOperationAsync(ActorSelection actor, Object message,
303 Preconditions.checkArgument(actor != null, "actor must not be null");
304 Preconditions.checkArgument(message != null, "message must not be null");
306 LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
308 return ask(actor, message, timeout);
312 * Execute an operation on a remote actor asynchronously.
314 * @param actor the ActorSelection
315 * @param message the message to send
316 * @return a Future containing the eventual result
318 public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
319 return executeOperationAsync(actor, message, operationTimeout);
323 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
324 * reply (essentially set and forget).
326 * @param actor the ActorSelection
327 * @param message the message to send
329 public void sendOperationAsync(ActorSelection actor, Object message) {
330 Preconditions.checkArgument(actor != null, "actor must not be null");
331 Preconditions.checkArgument(message != null, "message must not be null");
333 LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
335 actor.tell(message, ActorRef.noSender());
338 public void shutdown() {
339 shardManager.tell(PoisonPill.getInstance(), null);
340 actorSystem.shutdown();
343 public ClusterWrapper getClusterWrapper() {
344 return clusterWrapper;
347 public String getCurrentMemberName(){
348 return clusterWrapper.getCurrentMemberName();
352 * Send the message to each and every shard
356 public void broadcast(Object message){
357 for(String shardName : configuration.getAllShardNames()){
359 Optional<ActorSelection> primary = findPrimaryShard(shardName);
360 if (primary.isPresent()) {
361 primary.get().tell(message, ActorRef.noSender());
363 LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
364 message.getClass().getSimpleName(), shardName);
369 public FiniteDuration getOperationDuration() {
370 return operationDuration;
373 public boolean isLocalPath(String path) {
374 String selfAddress = clusterWrapper.getSelfAddress();
375 if (path == null || selfAddress == null) {
379 int atIndex1 = path.indexOf("@");
380 int atIndex2 = selfAddress.indexOf("@");
382 if (atIndex1 == -1 || atIndex2 == -1) {
386 int slashIndex1 = path.indexOf("/", atIndex1);
387 int slashIndex2 = selfAddress.indexOf("/", atIndex2);
389 if (slashIndex1 == -1 || slashIndex2 == -1) {
393 String hostPort1 = path.substring(atIndex1, slashIndex1);
394 String hostPort2 = selfAddress.substring(atIndex2, slashIndex2);
396 return hostPort1.equals(hostPort2);
400 * @deprecated This method is present only to support backward compatibility with Helium and should not be
405 * @param localPathOfRemoteActor
409 public String resolvePath(final String primaryPath,
410 final String localPathOfRemoteActor) {
411 StringBuilder builder = new StringBuilder();
412 String[] primaryPathElements = primaryPath.split("/");
413 builder.append(primaryPathElements[0]).append("//")
414 .append(primaryPathElements[1]).append(primaryPathElements[2]);
415 String[] remotePathElements = localPathOfRemoteActor.split("/");
416 for (int i = 3; i < remotePathElements.length; i++) {
417 builder.append("/").append(remotePathElements[i]);
420 return builder.toString();