import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
import akka.util.Timeout;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.Configuration;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
private static final Logger
LOG = LoggerFactory.getLogger(ActorContext.class);
- public static final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS);
- public static final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS);
+ public static final FiniteDuration ASK_DURATION =
+ Duration.create(5, TimeUnit.SECONDS);
+ public static final Duration AWAIT_DURATION =
+ Duration.create(5, TimeUnit.SECONDS);
private final ActorSystem actorSystem;
private final ActorRef shardManager;
+ private final ClusterWrapper clusterWrapper;
+ private final Configuration configuration;
- public ActorContext(ActorSystem actorSystem, ActorRef shardManager){
+ private SchemaContext schemaContext = null;
+
+ public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
+ ClusterWrapper clusterWrapper,
+ Configuration configuration) {
this.actorSystem = actorSystem;
this.shardManager = shardManager;
+ this.clusterWrapper = clusterWrapper;
+ this.configuration = configuration;
}
public ActorSystem getActorSystem() {
return shardManager;
}
- public ActorSelection actorSelection(String actorPath){
+ public ActorSelection actorSelection(String actorPath) {
return actorSystem.actorSelection(actorPath);
}
- public ActorSelection actorSelection(ActorPath actorPath){
+ public ActorSelection actorSelection(ActorPath actorPath) {
return actorSystem.actorSelection(actorPath);
}
* @return
*/
public ActorSelection findPrimary(String shardName) {
+ String path = findPrimaryPath(shardName);
+ return actorSystem.actorSelection(path);
+ }
+
+ public String findPrimaryPath(String shardName) {
Object result = executeLocalOperation(shardManager,
- new FindPrimary(shardName), ASK_DURATION);
+ new FindPrimary(shardName).toSerializable(), ASK_DURATION);
- if(result instanceof PrimaryFound){
- PrimaryFound found = (PrimaryFound) result;
+ if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
+ PrimaryFound found = PrimaryFound.fromSerializable(result);
- LOG.error("Primary found {}", found.getPrimaryPath());
+ LOG.debug("Primary found {}", found.getPrimaryPath());
- return actorSystem.actorSelection(found.getPrimaryPath());
+ return found.getPrimaryPath();
}
- throw new RuntimeException("primary was not found");
+ throw new PrimaryNotFoundException();
}
+
/**
* Executes an operation on a local actor and wait for it's response
+ *
* @param actor
* @param message
* @param duration
* @return The response of the operation
*/
public Object executeLocalOperation(ActorRef actor, Object message,
- FiniteDuration duration){
+ FiniteDuration duration) {
Future<Object> future =
ask(actor, message, new Timeout(duration));
try {
return Await.result(future, AWAIT_DURATION);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new TimeoutException(e);
}
}
/**
* Execute an operation on a remote actor and wait for it's response
+ *
* @param actor
* @param message
* @param duration
* @return
*/
public Object executeRemoteOperation(ActorSelection actor, Object message,
- FiniteDuration duration){
+ FiniteDuration duration) {
+
+ LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+
Future<Object> future =
ask(actor, message, new Timeout(duration));
try {
return Await.result(future, AWAIT_DURATION);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new TimeoutException(e);
}
}
/**
* Execute an operation on the primary for a given shard
* <p>
- * This method first finds the primary for a given shard ,then sends
- * the message to the remote shard and waits for a response
+ * This method first finds the primary for a given shard ,then sends
+ * the message to the remote shard and waits for a response
* </p>
+ *
* @param shardName
* @param message
* @param duration
- * @throws java.lang.RuntimeException when a primary is not found or if the message to the remote shard fails or times out
- *
* @return
+ * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
+ * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
*/
- public Object executeShardOperation(String shardName, Object message, FiniteDuration duration){
+ public Object executeShardOperation(String shardName, Object message,
+ FiniteDuration duration) {
ActorSelection primary = findPrimary(shardName);
return executeRemoteOperation(primary, message, duration);
}
+ public void shutdown() {
+ shardManager.tell(PoisonPill.getInstance(), null);
+ actorSystem.shutdown();
+ }
+
+ public String getRemoteActorPath(final String shardName,
+ final String localPathOfRemoteActor) {
+ final String path = findPrimaryPath(shardName);
+
+ LoadingCache<String, String> graphs = CacheBuilder.newBuilder()
+ .expireAfterAccess(2, TimeUnit.SECONDS)
+ .build(
+ new CacheLoader<String, String>() {
+ public String load(String key) {
+ return resolvePath(path, localPathOfRemoteActor);
+ }
+ }
+ );
+ return graphs.getUnchecked(localPathOfRemoteActor);
+ }
+
+ public String resolvePath(final String primaryPath,
+ final String localPathOfRemoteActor) {
+ StringBuilder builder = new StringBuilder();
+ String[] primaryPathElements = primaryPath.split("/");
+ builder.append(primaryPathElements[0]).append("//")
+ .append(primaryPathElements[1]).append(primaryPathElements[2]);
+ String[] remotePathElements = localPathOfRemoteActor.split("/");
+ for (int i = 3; i < remotePathElements.length; i++) {
+ builder.append("/").append(remotePathElements[i]);
+ }
+
+ return builder.toString();
+
+ }
+
+ public ActorPath actorFor(String path){
+ return actorSystem.actorFor(path).path();
+ }
+
+ public String getCurrentMemberName(){
+ return clusterWrapper.getCurrentMemberName();
+ }
+
}