X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContext.java;h=c9fdf389311f73c70ca2e0f16dae8e86b7cc0a05;hb=77f32f904a0338742dd5357b07e2d4ed465eb394;hp=904dcdf43989bebfeb180b5f56c489cbf035621a;hpb=f5a373c5378af41f62a2c36ced4046fbdb77e00b;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index 904dcdf439..c9fdf38931 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -8,16 +8,21 @@ package org.opendaylight.controller.cluster.datastore.utils; +import static akka.pattern.Patterns.ask; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.ActorSystem; +import akka.actor.Address; import akka.actor.PoisonPill; import akka.dispatch.Mapper; import akka.pattern.AskTimeoutException; import akka.util.Timeout; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; @@ -41,8 +46,6 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import java.util.concurrent.TimeUnit; -import static akka.pattern.Patterns.ask; /** * The ActorContext class contains utility methods which could be used by @@ -80,6 +83,8 @@ public class ActorContext { private volatile SchemaContext schemaContext; private final FiniteDuration operationDuration; private final Timeout operationTimeout; + private final String selfAddressHostPort; + private final int transactionOutstandingOperationLimit; public ActorContext(ActorSystem actorSystem, ActorRef shardManager, ClusterWrapper clusterWrapper, Configuration configuration) { @@ -99,6 +104,15 @@ public class ActorContext { operationDuration = Duration.create(datastoreContext.getOperationTimeoutInSeconds(), TimeUnit.SECONDS); operationTimeout = new Timeout(operationDuration); + + Address selfAddress = clusterWrapper.getSelfAddress(); + if (selfAddress != null && !selfAddress.host().isEmpty()) { + selfAddressHostPort = selfAddress.host().get() + ":" + selfAddress.port().get(); + } else { + selfAddressHostPort = null; + } + + transactionOutstandingOperationLimit = new CommonConfig(this.getActorSystem().settings().config()).getMailBoxCapacity(); } public DatastoreContext getDatastoreContext() { @@ -268,7 +282,7 @@ public class ActorContext { Preconditions.checkArgument(actor != null, "actor must not be null"); Preconditions.checkArgument(message != null, "message must not be null"); - LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString()); + LOG.debug("Sending message {} to {}", message.getClass(), actor); return ask(actor, message, timeout); } @@ -303,7 +317,7 @@ public class ActorContext { Preconditions.checkArgument(actor != null, "actor must not be null"); Preconditions.checkArgument(message != null, "message must not be null"); - LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString()); + LOG.debug("Sending message {} to {}", message.getClass(), actor); return ask(actor, message, timeout); } @@ -330,7 +344,7 @@ public class ActorContext { Preconditions.checkArgument(actor != null, "actor must not be null"); Preconditions.checkArgument(message != null, "message must not be null"); - LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString()); + LOG.debug("Sending message {} to {}", message.getClass(), actor); actor.tell(message, ActorRef.noSender()); } @@ -370,30 +384,31 @@ public class ActorContext { return operationDuration; } - public boolean isLocalPath(String path) { - String selfAddress = clusterWrapper.getSelfAddress(); - if (path == null || selfAddress == null) { + public boolean isPathLocal(String path) { + if (Strings.isNullOrEmpty(path)) { return false; } - int atIndex1 = path.indexOf("@"); - int atIndex2 = selfAddress.indexOf("@"); + int pathAtIndex = path.indexOf('@'); + if (pathAtIndex == -1) { + //if the path is of local format, then its local and is co-located + return true; - if (atIndex1 == -1 || atIndex2 == -1) { - return false; - } + } else if (selfAddressHostPort != null) { + // self-address and tx actor path, both are of remote path format + int slashIndex = path.indexOf('/', pathAtIndex); + + if (slashIndex == -1) { + return false; + } - int slashIndex1 = path.indexOf("/", atIndex1); - int slashIndex2 = selfAddress.indexOf("/", atIndex2); + String hostPort = path.substring(pathAtIndex + 1, slashIndex); + return hostPort.equals(selfAddressHostPort); - if (slashIndex1 == -1 || slashIndex2 == -1) { + } else { + // self address is local format and tx actor path is remote format return false; } - - String hostPort1 = path.substring(atIndex1, slashIndex1); - String hostPort2 = selfAddress.substring(atIndex2, slashIndex2); - - return hostPort1.equals(hostPort2); } /** @@ -419,4 +434,16 @@ public class ActorContext { return builder.toString(); } + + /** + * Get the maximum number of operations that are to be permitted within a transaction before the transaction + * should begin throttling the operations + * + * Parking reading this configuration here because we need to get to the actor system settings + * + * @return + */ + public int getTransactionOutstandingOperationLimit(){ + return transactionOutstandingOperationLimit; + } }