2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
18 import org.opendaylight.controller.cluster.datastore.Configuration;
19 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
20 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
21 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
22 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
23 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
24 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
25 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.Await;
29 import scala.concurrent.Future;
30 import scala.concurrent.duration.Duration;
31 import scala.concurrent.duration.FiniteDuration;
33 import java.util.concurrent.TimeUnit;
35 import static akka.pattern.Patterns.ask;
38 * The ActorContext class contains utility methods which could be used by
39 * non-actors (like DistributedDataStore) to work with actors a little more
40 * easily. An ActorContext can be freely passed around to local object instances
41 * but should not be passed to actors, especially remote actors.
43 public class ActorContext {
44 private static final Logger
45 LOG = LoggerFactory.getLogger(ActorContext.class);
47 public static final FiniteDuration ASK_DURATION =
48 Duration.create(5, TimeUnit.SECONDS);
49 public static final Duration AWAIT_DURATION =
50 Duration.create(5, TimeUnit.SECONDS);
52 private final ActorSystem actorSystem;
53 private final ActorRef shardManager;
54 private final ClusterWrapper clusterWrapper;
55 private final Configuration configuration;
57 private SchemaContext schemaContext = null;
/**
 * Creates a context bound to the given actor system and collaborators.
 *
 * @param actorSystem    the actor system used to build actor selections and resolve paths
 * @param shardManager   the shard manager actor that shard lookups are sent to
 * @param clusterWrapper the source of the current cluster member name
 * @param configuration  the configuration retained by this context
 */
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
    ClusterWrapper clusterWrapper, Configuration configuration) {
    this.configuration = configuration;
    this.clusterWrapper = clusterWrapper;
    this.shardManager = shardManager;
    this.actorSystem = actorSystem;
}
68 public ActorSystem getActorSystem() {
72 public ActorRef getShardManager() {
76 public ActorSelection actorSelection(String actorPath) {
77 return actorSystem.actorSelection(actorPath);
80 public ActorSelection actorSelection(ActorPath actorPath) {
81 return actorSystem.actorSelection(actorPath);
86 * Finds the primary for a given shard
91 public ActorSelection findPrimary(String shardName) {
92 String path = findPrimaryPath(shardName);
93 return actorSystem.actorSelection(path);
97 * Finds a local shard given it's shard name and return it's ActorRef
99 * @param shardName the name of the local shard that needs to be found
100 * @return a reference to a local shard actor which represents the shard
101 * specified by the shardName
103 public ActorRef findLocalShard(String shardName) {
104 Object result = executeLocalOperation(shardManager,
105 new FindLocalShard(shardName), ASK_DURATION);
107 if (result instanceof LocalShardFound) {
108 LocalShardFound found = (LocalShardFound) result;
110 LOG.debug("Local shard found {}", found.getPath());
112 return found.getPath();
119 public String findPrimaryPath(String shardName) {
120 Object result = executeLocalOperation(shardManager,
121 new FindPrimary(shardName).toSerializable(), ASK_DURATION);
123 if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
124 PrimaryFound found = PrimaryFound.fromSerializable(result);
126 LOG.debug("Primary found {}", found.getPrimaryPath());
128 return found.getPrimaryPath();
130 throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
135 * Executes an operation on a local actor and wait for it's response
140 * @return The response of the operation
142 public Object executeLocalOperation(ActorRef actor, Object message,
143 FiniteDuration duration) {
144 Future<Object> future =
145 ask(actor, message, new Timeout(duration));
148 return Await.result(future, AWAIT_DURATION);
149 } catch (Exception e) {
150 throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
155 * Execute an operation on a remote actor and wait for it's response
162 public Object executeRemoteOperation(ActorSelection actor, Object message,
163 FiniteDuration duration) {
165 LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
167 Future<Object> future =
168 ask(actor, message, new Timeout(duration));
171 return Await.result(future, AWAIT_DURATION);
172 } catch (Exception e) {
173 throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
178 * Execute an operation on the primary for a given shard
180 * This method first finds the primary for a given shard ,then sends
181 * the message to the remote shard and waits for a response
188 * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
189 * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
191 public Object executeShardOperation(String shardName, Object message,
192 FiniteDuration duration) {
193 ActorSelection primary = findPrimary(shardName);
195 return executeRemoteOperation(primary, message, duration);
199 * Execute an operation on the the local shard only
201 * This method first finds the address of the local shard if any. It then
202 * executes the operation on it.
205 * @param shardName the name of the shard on which the operation needs to be executed
206 * @param message the message that needs to be sent to the shard
207 * @param duration the time duration in which this operation should complete
208 * @return the message that was returned by the local actor on which the
209 * the operation was executed. If a local shard was not found then
211 * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
212 * if the operation does not complete in a specified time duration
214 public Object executeLocalShardOperation(String shardName, Object message,
215 FiniteDuration duration) {
216 ActorRef local = findLocalShard(shardName);
219 return executeLocalOperation(local, message, duration);
226 public void shutdown() {
227 shardManager.tell(PoisonPill.getInstance(), null);
228 actorSystem.shutdown();
232 * @deprecated Need to stop using this method. There are ways to send a
233 * remote ActorRef as a string which should be used instead of this hack
236 * @param localPathOfRemoteActor
240 public String resolvePath(final String primaryPath,
241 final String localPathOfRemoteActor) {
242 StringBuilder builder = new StringBuilder();
243 String[] primaryPathElements = primaryPath.split("/");
244 builder.append(primaryPathElements[0]).append("//")
245 .append(primaryPathElements[1]).append(primaryPathElements[2]);
246 String[] remotePathElements = localPathOfRemoteActor.split("/");
247 for (int i = 3; i < remotePathElements.length; i++) {
248 builder.append("/").append(remotePathElements[i]);
251 return builder.toString();
255 public ActorPath actorFor(String path){
256 return actorSystem.actorFor(path).path();
259 public String getCurrentMemberName(){
260 return clusterWrapper.getCurrentMemberName();