import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.util.Timeout;
+
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.Configuration;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
private final Configuration configuration;
-
- private SchemaContext schemaContext = null;
+ private volatile SchemaContext schemaContext;
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
ClusterWrapper clusterWrapper,
return actorSystem.actorSelection(actorPath);
}
+ public void setSchemaContext(SchemaContext schemaContext) {
+ this.schemaContext = schemaContext;
+
+ if(shardManager != null) {
+ shardManager.tell(new UpdateSchemaContext(schemaContext), null);
+ }
+ }
+
+ public SchemaContext getSchemaContext() {
+ return schemaContext;
+ }
/**
* Finds the primary for a given shard
}
}
+ /**
+ * Execute an operation on a remote actor asynchronously.
+ *
+ * @param actor the ActorSelection
+ * @param message the message to send
+ * @param duration the maximum amount of time to send he message
+ * @return a Future containing the eventual result
+ */
+ public Future<Object> executeRemoteOperationAsync(ActorSelection actor, Object message,
+ FiniteDuration duration) {
+
+ LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
+
+ return ask(actor, message, new Timeout(duration));
+ }
+
+ /**
+ * Sends an operation to be executed by a remote actor asynchronously without waiting for a
+ * reply (essentially set and forget).
+ *
+ * @param actor the ActorSelection
+ * @param message the message to send
+ */
+ public void sendRemoteOperationAsync(ActorSelection actor, Object message) {
+ actor.tell(message, ActorRef.noSender());
+ }
+
/**
* Execute an operation on the primary for a given shard
* <p>