import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.util.Timeout;
+
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
-
- private SchemaContext schemaContext = null;
+ private volatile SchemaContext schemaContext;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper,
return actorSystem.actorSelection(actorPath);
}
+ public void setSchemaContext(SchemaContext schemaContext) {
+ this.schemaContext = schemaContext;
+
+ if(shardManager != null) {
+ shardManager.tell(new UpdateSchemaContext(schemaContext), null);
+ }
+ }
+
+ public SchemaContext getSchemaContext() {
+ return schemaContext;
+ }
/**
* Finds the primary for a given shard
return actorSystem.actorSelection(path);
}
+ /**
+ * Finds a local shard given it's shard name and return it's ActorRef
+ *
+ * @param shardName the name of the local shard that needs to be found
+ * @return a reference to a local shard actor which represents the shard
+ * specified by the shardName
+ */
+ public ActorRef findLocalShard(String shardName) {
+ Object result = executeLocalOperation(shardManager,
+ new FindLocalShard(shardName), ASK_DURATION);
+
+ if (result instanceof LocalShardFound) {
+ LocalShardFound found = (LocalShardFound) result;
+
+ LOG.debug("Local shard found {}", found.getPath());
+
+ return found.getPath();
+ }
+
+ return null;
+ }
+
+
public String findPrimaryPath(String shardName) {
Object result = executeLocalOperation(shardManager,
new FindPrimary(shardName).toSerializable(), ASK_DURATION);
}
}
+ /**
+ * Execute an operation on a remote actor asynchronously.
+ *
+ * @param actor the ActorSelection
+ * @param message the message to send
+ * @param duration the maximum amount of time to send he message
+ * @return a Future containing the eventual result
+ */
+ public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message,
+ FiniteDuration duration) {
+
+ LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+
+ return ask(actor, message, new Timeout(duration));
+ }
+
+ /**
+ * Sends an operation to be executed by a remote actor asynchronously without waiting for a
+ * reply (essentially set and forget).
+ *
+ * @param actor the ActorSelection
+ * @param message the message to send
+ */
+ public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
+ actor.tell(message, ActorRef.noSender());
+ }
+
/**
* Execute an operation on the primary for a given shard
* <p>
return executeRemoteOperation(primary, message, duration);
}
+ /**
+ * Execute an operation on the the local shard only
+ * <p>
+ * This method first finds the address of the local shard if any. It then
+ * executes the operation on it.
+ * </p>
+ *
+ * @param shardName the name of the shard on which the operation needs to be executed
+ * @param message the message that needs to be sent to the shard
+ * @param duration the time duration in which this operation should complete
+ * @return the message that was returned by the local actor on which the
+ * the operation was executed. If a local shard was not found then
+ * null is returned
+ * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
+ * if the operation does not complete in a specified time duration
+ */
+ public Object executeLocalShardOperation(String shardName, Object message,
+ FiniteDuration duration) {
+ ActorRef local = findLocalShard(shardName);
+
+ if(local != null) {
+ return executeLocalOperation(local, message, duration);
+ }
+
+ return null;
+ }
+
+
public void shutdown() {
shardManager.tell(PoisonPill.getInstance(), null);
actorSystem.shutdown();