import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.util.Timeout;
+
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
private static final Logger
LOG = LoggerFactory.getLogger(ActorContext.class);
- public static final FiniteDuration ASK_DURATION =
- Duration.create(5, TimeUnit.SECONDS);
- public static final Duration AWAIT_DURATION =
- Duration.create(5, TimeUnit.SECONDS);
+ private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
+
+ public static final String MAILBOX = "bounded-mailbox";
private final ActorSystem actorSystem;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
-
- private SchemaContext schemaContext = null;
+ private volatile SchemaContext schemaContext;
+ private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
+ private Timeout operationTimeout = new Timeout(operationDuration);
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper,
return actorSystem.actorSelection(actorPath);
}
+ public void setSchemaContext(SchemaContext schemaContext) {
+ this.schemaContext = schemaContext;
+
+ if(shardManager != null) {
+ shardManager.tell(new UpdateSchemaContext(schemaContext), null);
+ }
+ }
+
+ public void setOperationTimeout(int timeoutInSeconds) {
+ operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
+ operationTimeout = new Timeout(operationDuration);
+ }
+
+ public SchemaContext getSchemaContext() {
+ return schemaContext;
+ }
/**
* Finds the primary for a given shard
*/
public ActorRef findLocalShard(String shardName) {
Object result = executeLocalOperation(shardManager,
- new FindLocalShard(shardName), ASK_DURATION);
+ new FindLocalShard(shardName));
if (result instanceof LocalShardFound) {
LocalShardFound found = (LocalShardFound) result;
public String findPrimaryPath(String shardName) {
Object result = executeLocalOperation(shardManager,
- new FindPrimary(shardName).toSerializable(), ASK_DURATION);
+ new FindPrimary(shardName).toSerializable());
if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
PrimaryFound found = PrimaryFound.fromSerializable(result);
*
* @param actor
* @param message
- * @param duration
* @return The response of the operation
*/
- public Object executeLocalOperation(ActorRef actor, Object message,
- FiniteDuration duration) {
- Future<Object> future =
- ask(actor, message, new Timeout(duration));
+ public Object executeLocalOperation(ActorRef actor, Object message) {
+ Future<Object> future = ask(actor, message, operationTimeout);
try {
- return Await.result(future, AWAIT_DURATION);
+ return Await.result(future, operationDuration);
} catch (Exception e) {
throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
}
*
* @param actor
* @param message
- * @param duration
* @return
*/
- public Object executeRemoteOperation(ActorSelection actor, Object message,
- FiniteDuration duration) {
+ public Object executeRemoteOperation(ActorSelection actor, Object message) {
LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
- Future<Object> future =
- ask(actor, message, new Timeout(duration));
+ Future<Object> future = ask(actor, message, operationTimeout);
try {
- return Await.result(future, AWAIT_DURATION);
+ return Await.result(future, operationDuration);
} catch (Exception e) {
- throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
+ throw new TimeoutException("Sending message " + message.getClass().toString() +
+ " to actor " + actor.toString() + " failed" , e);
}
}
+ /**
+ * Execute an operation on a remote actor asynchronously.
+ *
+ * @param actor the ActorSelection
+ * @param message the message to send
+ * @return a Future containing the eventual result
+ */
+ public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message) {
+
+ LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+
+ return ask(actor, message, operationTimeout);
+ }
+
+ /**
+ * Sends an operation to be executed by a remote actor asynchronously without waiting for a
+ * reply (essentially set and forget).
+ *
+ * @param actor the ActorSelection
+ * @param message the message to send
+ */
+ public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
+ actor.tell(message, ActorRef.noSender());
+ }
+
/**
* Execute an operation on the primary for a given shard
* <p>
*
* @param shardName
* @param message
- * @param duration
* @return
* @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
* @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
*/
- public Object executeShardOperation(String shardName, Object message,
- FiniteDuration duration) {
+ public Object executeShardOperation(String shardName, Object message) {
ActorSelection primary = findPrimary(shardName);
- return executeRemoteOperation(primary, message, duration);
+ return executeRemoteOperation(primary, message);
}
/**
*
* @param shardName the name of the shard on which the operation needs to be executed
* @param message the message that needs to be sent to the shard
- * @param duration the time duration in which this operation should complete
* @return the message that was returned by the local actor on which the
* the operation was executed. If a local shard was not found then
* null is returned
* @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException
* if the operation does not complete in a specified time duration
*/
- public Object executeLocalShardOperation(String shardName, Object message,
- FiniteDuration duration) {
+ public Object executeLocalShardOperation(String shardName, Object message) {
ActorRef local = findLocalShard(shardName);
if(local != null) {
- return executeLocalOperation(local, message, duration);
+ return executeLocalOperation(local, message);
}
return null;