import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.util.Timeout;
+
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
private static final Logger
LOG = LoggerFactory.getLogger(ActorContext.class);
- public static final FiniteDuration ASK_DURATION =
- Duration.create(5, TimeUnit.SECONDS);
- public static final Duration AWAIT_DURATION =
- Duration.create(5, TimeUnit.SECONDS);
+ private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
+
+ public static final String MAILBOX = "bounded-mailbox";
private final ActorSystem actorSystem;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
-
- private SchemaContext schemaContext = null;
+ private volatile SchemaContext schemaContext;
+ private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
+ private Timeout operationTimeout = new Timeout(operationDuration);
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper,
return actorSystem.actorSelection(actorPath);
}
+ public void setSchemaContext(SchemaContext schemaContext) {
+ this.schemaContext = schemaContext;
+
+ if(shardManager != null) {
+ shardManager.tell(new UpdateSchemaContext(schemaContext), null);
+ }
+ }
+
+ public void setOperationTimeout(int timeoutInSeconds) {
+ operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
+ operationTimeout = new Timeout(operationDuration);
+ }
+
+ public SchemaContext getSchemaContext() {
+ return schemaContext;
+ }
/**
* Finds the primary for a given shard
return actorSystem.actorSelection(path);
}
+ /**
+ * Finds a local shard given it's shard name and return it's ActorRef
+ *
+ * @param shardName the name of the local shard that needs to be found
+ * @return a reference to a local shard actor which represents the shard
+ * specified by the shardName
+ */
+ public ActorRef findLocalShard(String shardName) {
+ Object result = executeLocalOperation(shardManager,
+ new FindLocalShard(shardName));
+
+ if (result instanceof LocalShardFound) {
+ LocalShardFound found = (LocalShardFound) result;
+
+ LOG.debug("Local shard found {}", found.getPath());
+
+ return found.getPath();
+ }
+
+ return null;
+ }
+
+
public String findPrimaryPath(String shardName) {
Object result = executeLocalOperation(shardManager,
- new FindPrimary(shardName).toSerializable(), ASK_DURATION);
+ new FindPrimary(shardName).toSerializable());
if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
PrimaryFound found = PrimaryFound.fromSerializable(result);
*
* @param actor
* @param message
- * @param duration
* @return The response of the operation
*/
- public Object executeLocalOperation(ActorRef actor, Object message,
- FiniteDuration duration) {
- Future<Object> future =
- ask(actor, message, new Timeout(duration));
+ public Object executeLocalOperation(ActorRef actor, Object message) {
+ Future<Object> future = ask(actor, message, operationTimeout);
try {
- return Await.result(future, AWAIT_DURATION);
+ return Await.result(future, operationDuration);
} catch (Exception e) {
throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
}
*
* @param actor
* @param message
- * @param duration
* @return
*/
- public Object executeRemoteOperation(ActorSelection actor, Object message,
- FiniteDuration duration) {
+ public Object executeRemoteOperation(ActorSelection actor, Object message) {
LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
- Future<Object> future =
- ask(actor, message, new Timeout(duration));
+ Future<Object> future = ask(actor, message, operationTimeout);
try {
- return Await.result(future, AWAIT_DURATION);
+ return Await.result(future, operationDuration);
} catch (Exception e) {
- throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
+ throw new TimeoutException("Sending message " + message.getClass().toString() +
+ " to actor " + actor.toString() + " failed" , e);
}
}
+ /**
+ * Execute an operation on a remote actor asynchronously.
+ *
+ * @param actor the ActorSelection
+ * @param message the message to send
+ * @return a Future containing the eventual result
+ */
+ public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
+
+ LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+
+ return ask(actor, message, operationTimeout);
+ }
+
+ /**
+ * Sends an operation to be executed by a remote actor asynchronously without waiting for a
+ * reply (essentially set and forget).
+ *
+ * @param actor the ActorSelection
+ * @param message the message to send
+ */
+ public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
+ actor.tell(message, ActorRef.noSender());
+ }
+
/**
* Execute an operation on the primary for a given shard
* <p>
*
* @param shardName
* @param message
- * @param duration
* @return
* @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
* @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
*/
- public Object executeShardOperation(String shardName, Object message,
- FiniteDuration duration) {
+ public Object executeShardOperation(String shardName, Object message) {
ActorSelection primary = findPrimary(shardName);
- return executeRemoteOperation(primary, message, duration);
+ return executeRemoteOperation(primary, message);
+ }
+
+ /**
+ * Execute an operation on the the local shard only
+ * <p>
+ * This method first finds the address of the local shard if any. It then
+ * executes the operation on it.
+ * </p>
+ *
+ * @param shardName the name of the shard on which the operation needs to be executed
+ * @param message the message that needs to be sent to the shard
+ * @return the message that was returned by the local actor on which the
+ * the operation was executed. If a local shard was not found then
+ * null is returned
+ * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
+ * if the operation does not complete in a specified time duration
+ */
+ public Object executeLocalShardOperation(String shardName, Object message) {
+ ActorRef local = findLocalShard(shardName);
+
+ if(local != null) {
+ return executeLocalOperation(local, message);
+ }
+
+ return null;
}
+
public void shutdown() {
shardManager.tell(PoisonPill.getInstance(), null);
actorSystem.shutdown();