2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.pattern.Patterns;
17 import akka.util.Timeout;
18 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
19 import org.opendaylight.controller.cluster.datastore.Configuration;
20 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
22 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
23 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
24 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
25 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
26 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
27 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.concurrent.Await;
31 import scala.concurrent.Future;
32 import scala.concurrent.duration.Duration;
33 import scala.concurrent.duration.FiniteDuration;
35 import java.util.concurrent.TimeUnit;
37 import static akka.pattern.Patterns.ask;
40 * The ActorContext class contains utility methods which could be used by
41 * non-actors (like DistributedDataStore) to work with actors a little more
42 * easily. An ActorContext can be freely passed around to local object instances
43 * but should not be passed to actors especially remote actors
45 public class ActorContext {
46 private static final Logger
47 LOG = LoggerFactory.getLogger(ActorContext.class);
49 private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
51 public static final String MAILBOX = "bounded-mailbox";
53 private final ActorSystem actorSystem;
54 private final ActorRef shardManager;
55 private final ClusterWrapper clusterWrapper;
56 private final Configuration configuration;
57 private volatile SchemaContext schemaContext;
58 private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
59 private Timeout operationTimeout = new Timeout(operationDuration);
61 public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
62 ClusterWrapper clusterWrapper,
63 Configuration configuration) {
64 this.actorSystem = actorSystem;
65 this.shardManager = shardManager;
66 this.clusterWrapper = clusterWrapper;
67 this.configuration = configuration;
70 public ActorSystem getActorSystem() {
74 public ActorRef getShardManager() {
78 public ActorSelection actorSelection(String actorPath) {
79 return actorSystem.actorSelection(actorPath);
82 public ActorSelection actorSelection(ActorPath actorPath) {
83 return actorSystem.actorSelection(actorPath);
86 public void setSchemaContext(SchemaContext schemaContext) {
87 this.schemaContext = schemaContext;
89 if(shardManager != null) {
90 shardManager.tell(new UpdateSchemaContext(schemaContext), null);
94 public void setOperationTimeout(int timeoutInSeconds) {
95 operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
96 operationTimeout = new Timeout(operationDuration);
99 public SchemaContext getSchemaContext() {
100 return schemaContext;
104 * Finds the primary for a given shard
109 public ActorSelection findPrimary(String shardName) {
110 String path = findPrimaryPath(shardName);
111 return actorSystem.actorSelection(path);
115 * Finds a local shard given it's shard name and return it's ActorRef
117 * @param shardName the name of the local shard that needs to be found
118 * @return a reference to a local shard actor which represents the shard
119 * specified by the shardName
121 public ActorRef findLocalShard(String shardName) {
122 Object result = executeLocalOperation(shardManager,
123 new FindLocalShard(shardName));
125 if (result instanceof LocalShardFound) {
126 LocalShardFound found = (LocalShardFound) result;
128 if(LOG.isDebugEnabled()) {
129 LOG.debug("Local shard found {}", found.getPath());
131 return found.getPath();
138 public String findPrimaryPath(String shardName) {
139 Object result = executeLocalOperation(shardManager,
140 new FindPrimary(shardName).toSerializable());
142 if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
143 PrimaryFound found = PrimaryFound.fromSerializable(result);
145 if(LOG.isDebugEnabled()) {
146 LOG.debug("Primary found {}", found.getPrimaryPath());
148 return found.getPrimaryPath();
150 throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
155 * Executes an operation on a local actor and wait for it's response
159 * @return The response of the operation
161 public Object executeLocalOperation(ActorRef actor, Object message) {
162 Future<Object> future = ask(actor, message, operationTimeout);
165 return Await.result(future, operationDuration);
166 } catch (Exception e) {
167 throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
172 * Execute an operation on a remote actor and wait for it's response
178 public Object executeRemoteOperation(ActorSelection actor, Object message) {
180 if(LOG.isDebugEnabled()) {
181 LOG.debug("Sending remote message {} to {}", message.getClass().toString(),
184 Future<Object> future = ask(actor, message, operationTimeout);
187 return Await.result(future, operationDuration);
188 } catch (Exception e) {
189 throw new TimeoutException("Sending message " + message.getClass().toString() +
190 " to actor " + actor.toString() + " failed" , e);
195 * Execute an operation on a remote actor asynchronously.
197 * @param actor the ActorSelection
198 * @param message the message to send
199 * @return a Future containing the eventual result
201 public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
203 if(LOG.isDebugEnabled()) {
204 LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
206 return ask(actor, message, operationTimeout);
210 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
211 * reply (essentially set and forget).
213 * @param actor the ActorSelection
214 * @param message the message to send
216 public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
217 actor.tell(message, ActorRef.noSender());
220 public void sendShardOperationAsync(String shardName, Object message) {
221 ActorSelection primary = findPrimary(shardName);
223 primary.tell(message, ActorRef.noSender());
228 * Execute an operation on the primary for a given shard
230 * This method first finds the primary for a given shard ,then sends
231 * the message to the remote shard and waits for a response
237 * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
238 * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
240 public Object executeShardOperation(String shardName, Object message) {
241 ActorSelection primary = findPrimary(shardName);
243 return executeRemoteOperation(primary, message);
247 * Execute an operation on the the local shard only
249 * This method first finds the address of the local shard if any. It then
250 * executes the operation on it.
253 * @param shardName the name of the shard on which the operation needs to be executed
254 * @param message the message that needs to be sent to the shard
255 * @return the message that was returned by the local actor on which the
256 * the operation was executed. If a local shard was not found then
258 * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
259 * if the operation does not complete in a specified time duration
261 public Object executeLocalShardOperation(String shardName, Object message) {
262 ActorRef local = findLocalShard(shardName);
265 return executeLocalOperation(local, message);
273 * Execute an operation on the the local shard only asynchronously
276 * This method first finds the address of the local shard if any. It then
277 * executes the operation on it.
280 * @param shardName the name of the shard on which the operation needs to be executed
281 * @param message the message that needs to be sent to the shard
282 * @param timeout the amount of time that this method should wait for a response before timing out
283 * @return null if the shard could not be located else a future on which the caller can wait
286 public Future executeLocalShardOperationAsync(String shardName, Object message, Timeout timeout) {
287 ActorRef local = findLocalShard(shardName);
291 return Patterns.ask(local, message, timeout);
296 public void shutdown() {
297 shardManager.tell(PoisonPill.getInstance(), null);
298 actorSystem.shutdown();
302 * @deprecated Need to stop using this method. There are ways to send a
303 * remote ActorRef as a string which should be used instead of this hack
306 * @param localPathOfRemoteActor
310 public String resolvePath(final String primaryPath,
311 final String localPathOfRemoteActor) {
312 StringBuilder builder = new StringBuilder();
313 String[] primaryPathElements = primaryPath.split("/");
314 builder.append(primaryPathElements[0]).append("//")
315 .append(primaryPathElements[1]).append(primaryPathElements[2]);
316 String[] remotePathElements = localPathOfRemoteActor.split("/");
317 for (int i = 3; i < remotePathElements.length; i++) {
318 builder.append("/").append(remotePathElements[i]);
321 return builder.toString();
325 public ActorPath actorFor(String path){
326 return actorSystem.actorFor(path).path();
329 public String getCurrentMemberName(){
330 return clusterWrapper.getCurrentMemberName();
334 * Send the message to each and every shard
338 public void broadcast(Object message){
339 for(String shardName : configuration.getAllShardNames()){
341 sendShardOperationAsync(shardName, message);
342 } catch(Exception e){
343 LOG.warn("broadcast failed to send message " + message.getClass().getSimpleName() + " to shard " + shardName, e);
348 public FiniteDuration getOperationDuration() {
349 return operationDuration;