X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContext.java;h=7b5588cb196a66fa68de947fecc58137977275ae;hp=b87dc4f608b191c21d5fa34b8bf4a9744864f96a;hb=9f61e98b036119694dfef0759a7cafc56aae6e86;hpb=15fa131be8b16703089a6d8508546120cf15d45d diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index b87dc4f608..7b5588cb19 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -13,8 +13,8 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.PoisonPill; +import akka.pattern.Patterns; import akka.util.Timeout; - import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; @@ -27,7 +27,6 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -176,7 +175,8 @@ public class ActorContext { */ public Object executeRemoteOperation(ActorSelection actor, Object message) { - LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString()); + LOG.debug("Sending remote message {} to {}", message.getClass().toString(), + actor.toString()); Future future = ask(actor, message, operationTimeout); @@ -213,6 +213,13 @@ public class ActorContext { actor.tell(message, ActorRef.noSender()); } + public void sendShardOperationAsync(String shardName, Object message) { + ActorSelection primary = findPrimary(shardName); + + primary.tell(message, ActorRef.noSender()); + } + + /** * Execute an operation on the primary for a given shard *

@@ -258,6 +265,30 @@ public class ActorContext { } + /** + * Execute an operation on the the local shard only asynchronously + * + *

+ * This method first finds the address of the local shard if any. It then + * executes the operation on it. + *

+ * + * @param shardName the name of the shard on which the operation needs to be executed + * @param message the message that needs to be sent to the shard + * @param timeout the amount of time that this method should wait for a response before timing out + * @return null if the shard could not be located else a future on which the caller can wait + * + */ + public Future executeLocalShardOperationAsync(String shardName, Object message, Timeout timeout) { + ActorRef local = findLocalShard(shardName); + if(local == null){ + return null; + } + return Patterns.ask(local, message, timeout); + } + + + public void shutdown() { shardManager.tell(PoisonPill.getInstance(), null); actorSystem.shutdown(); @@ -295,4 +326,22 @@ public class ActorContext { return clusterWrapper.getCurrentMemberName(); } + /** + * Send the message to each and every shard + * + * @param message + */ + public void broadcast(Object message){ + for(String shardName : configuration.getAllShardNames()){ + try { + sendShardOperationAsync(shardName, message); + } catch(Exception e){ + LOG.warn("broadcast failed to send message " + message.getClass().getSimpleName() + " to shard " + shardName, e); + } + } + } + + public FiniteDuration getOperationDuration() { + return operationDuration; + } }