Changed key actors to use bounded mailbox 67/10267/5
authorAbhishek Kumar <abhishk2@cisco.com>
Fri, 22 Aug 2014 21:18:19 +0000 (14:18 -0700)
committerAbhishek Kumar <abhishk2@cisco.com>
Fri, 29 Aug 2014 18:33:35 +0000 (11:33 -0700)
1. Changed key actors in clustering components to use
bounded mailbox.

2. Implementation of Bounded mailbox queue changed to
 use deque as its required by Shard Actors.

3. Cleanup of hard coded values.

Change-Id: I9fb87e1857f2aa9fee0819c4dff63ca3c967ec60
Signed-off-by: Abhishek Kumar <abhishk2@cisco.com>
24 files changed:
opendaylight/commons/opendaylight/pom.xml
opendaylight/md-sal/sal-clustering-commons/pom.xml
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
opendaylight/md-sal/sal-distributed-datastore/pom.xml
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
opendaylight/md-sal/sal-remoterpc-connector/pom.xml
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RouteRpcListenerTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/utils/LatestEntryRoutingLogicTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf

index 425e968cf7ee9255cde33a7440834ac0ae05a576..b812fcbd8b8e12746509bf09ef293a821f70c61a 100644 (file)
@@ -39,6 +39,7 @@
     <clustering.test.version>0.4.2-SNAPSHOT</clustering.test.version>
     <commmons.northbound.version>0.4.2-SNAPSHOT</commmons.northbound.version>
     <!-- Third Party Versions -->
+    <codahale.metrics.version>3.0.1</codahale.metrics.version>
     <commons.catalina>7.0.32.v201211201336</commons.catalina>
     <commons.catalina.ha>7.0.32.v201211201952</commons.catalina.ha>
     <commons.catalina.tribes>7.0.32.v201211201952</commons.catalina.tribes>
         <artifactId>logback-core</artifactId>
         <version>${logback.version}</version>
       </dependency>
+      <dependency>
+        <groupId>com.codahale.metrics</groupId>
+        <artifactId>metrics-core</artifactId>
+        <version>${codahale.metrics.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.codahale.metrics</groupId>
+        <artifactId>metrics-graphite</artifactId>
+        <version>${codahale.metrics.version}</version>
+      </dependency>
       <dependency>
         <groupId>com.fasterxml.jackson.core</groupId>
         <artifactId>jackson-annotations</artifactId>
index 2bf1a5597362c9eeee9a312ea179dda1a093815f..b8980cd0bea46b22676ccec1cb38def91239d275 100644 (file)
           <artifactId>jsr305</artifactId>
           <version>2.0.1</version>
       </dependency>
-
       <dependency>
           <groupId>com.codahale.metrics</groupId>
           <artifactId>metrics-core</artifactId>
-          <version>3.0.1</version>
+      </dependency>
+      <dependency>
+          <groupId>com.codahale.metrics</groupId>
+          <artifactId>metrics-graphite</artifactId>
       </dependency>
   </dependencies>
   <build>
index 646431522e3c9d4ee3b7b3703839505b38f39bfd..c6d3625ac3ca4ba2cb5e093a64a8681e5c1b4b81 100644 (file)
@@ -11,9 +11,8 @@ package org.opendaylight.controller.common.actor;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.dispatch.BoundedMailbox;
+import akka.dispatch.BoundedDequeBasedMailbox;
 import akka.dispatch.MailboxType;
-import akka.dispatch.MessageQueue;
 import akka.dispatch.ProducesMessageQueue;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.MetricRegistry;
@@ -24,7 +23,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.util.concurrent.TimeUnit;
 
-public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<BoundedMailbox.MessageQueue> {
+public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
 
     private MeteredMessageQueue queue;
     private Integer capacity;
@@ -33,16 +32,18 @@ public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<
     private MetricsReporter reporter;
 
     private final String QUEUE_SIZE = "queue-size";
+    private final String CAPACITY = "mailbox-capacity";
+    private final String TIMEOUT  = "mailbox-push-timeout-time";
     private final Long DEFAULT_TIMEOUT = 10L;
 
     public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
         Preconditions.checkArgument( config.hasPath("mailbox-capacity"), "Missing configuration [mailbox-capacity]" );
-        this.capacity = config.getInt("mailbox-capacity");
+        this.capacity = config.getInt(CAPACITY);
         Preconditions.checkArgument( this.capacity > 0, "mailbox-capacity must be > 0");
 
         Long timeout = -1L;
-        if ( config.hasPath("mailbox-push-timeout-time") ){
-            timeout = config.getDuration("mailbox-push-timeout-time", TimeUnit.NANOSECONDS);
+        if ( config.hasPath(TIMEOUT) ){
+            timeout = config.getDuration(TIMEOUT, TimeUnit.NANOSECONDS);
         } else {
             timeout = DEFAULT_TIMEOUT;
         }
@@ -54,7 +55,7 @@ public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<
 
 
     @Override
-    public MessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
+    public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
         this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
         monitorQueueSize(owner, this.queue);
         return this.queue;
@@ -65,14 +66,15 @@ public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<
             return; //there's no actor to monitor
         }
         actorPath = owner.get().path();
-        MetricRegistry registry = reporter.getMetricsRegistry();
+        String actorInstanceId = Integer.toString(owner.get().hashCode());
 
-        String actorName = registry.name(actorPath.toString(), QUEUE_SIZE);
+        MetricRegistry registry = reporter.getMetricsRegistry();
+        String actorName = registry.name(actorPath.toString(), actorInstanceId, QUEUE_SIZE);
 
         if (registry.getMetrics().containsKey(actorName))
             return; //already registered
 
-        reporter.getMetricsRegistry().register(actorName,
+        registry.register(actorName,
                 new Gauge<Integer>() {
                     @Override
                     public Integer getValue() {
@@ -82,7 +84,7 @@ public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<
     }
 
 
-    public static class MeteredMessageQueue extends BoundedMailbox.MessageQueue {
+    public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
 
         public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
             super(capacity, pushTimeOut);
index 0535179aadf79c4196a8f606433c1cd8b4e89949..05322137aafc2d62b6b18e548a419be91446aeeb 100644 (file)
@@ -1,5 +1,10 @@
 
 odl-cluster-data {
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 100ms
+  }    
   akka {
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
@@ -33,6 +38,11 @@ odl-cluster-data {
 }
 
 odl-cluster-rpc {
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 100ms
+  }
   akka {
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
index 4669fbdb61ec7a28c67af91cb9157cfbaa81965b..31b0bb80c05ca221fc865b6d8de467a4e43ebac8 100644 (file)
       <dependency>
           <groupId>com.codahale.metrics</groupId>
           <artifactId>metrics-core</artifactId>
-          <version>3.0.1</version>
+      </dependency>
+
+      <dependency>
+          <groupId>com.codahale.metrics</groupId>
+          <artifactId>metrics-graphite</artifactId>
       </dependency>
     <!-- Test Dependencies -->
     <dependency>
index 51f3735f81ee6a9f4a5ad192cb6d1f1c27ebd031..b97164839277279e35479ea8f69b5cd8c459d418 100644 (file)
@@ -67,9 +67,11 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
                 Duration.create(dataStoreProperties.getShardTransactionIdleTimeoutInMinutes(),
                         TimeUnit.MINUTES));
 
-        actorContext = new ActorContext(actorSystem, actorSystem
-            .actorOf(ShardManager.props(type, cluster, configuration, shardContext),
-                shardManagerId ), cluster, configuration);
+        actorContext
+                = new ActorContext(
+                    actorSystem, actorSystem.actorOf(
+                        ShardManager.props(type, cluster, configuration, shardContext).
+                            withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
     }
 
     public DistributedDataStore(ActorContext actorContext) {
index c9b7c07e9a3a44d1b0c64425122e53a5b3199543..186f2cff41351a97f9d1852396a149047bd345cc 100644 (file)
@@ -33,6 +33,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import scala.concurrent.duration.Duration;
 
 import java.util.ArrayList;
@@ -244,8 +245,9 @@ public class ShardManager extends AbstractUntypedActor {
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
             ActorRef actor = getContext()
-                .actorOf(Shard.props(shardId, peerAddresses, shardContext),
-                    shardId.toString());
+                .actorOf(Shard.props(shardId, peerAddresses, shardContext).
+                    withMailbox(ActorContext.MAILBOX), shardId.toString());
+
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
         }
index f76430f5a1cb1cdb9e84f3b92f85c923a7111967..818a8ca8b390ec2fcb33f2cad91ad597fa7ca758 100644 (file)
@@ -52,6 +52,8 @@ public class ActorContext {
     public static final Duration AWAIT_DURATION =
         Duration.create(5, TimeUnit.SECONDS);
 
+    public static final String MAILBOX = "bounded-mailbox";
+
     private final ActorSystem actorSystem;
     private final ActorRef shardManager;
     private final ClusterWrapper clusterWrapper;
index 8af9bd07d793cfb0eb0dae33414b6631d109e4a9..c29f93bb073cb712df045bf4839cc247a2b49111 100644 (file)
@@ -1,5 +1,10 @@
 
 odl-cluster-data {
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 100ms
+  }
   akka {
     loggers = ["akka.event.slf4j.Slf4jLogger"]
     cluster {
index e6f68c032a325f59bea291e2f17e129fd47da766..d16e4f53de7552bd23a1be0ca8d022071bd07245 100644 (file)
@@ -46,7 +46,7 @@ public class ShardManagerTest {
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
-            new Within(duration("1 seconds")) {
+            new Within(duration("10 seconds")) {
                 @Override
                 protected void run() {
 
@@ -71,7 +71,7 @@ public class ShardManagerTest {
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
-            new Within(duration("1 seconds")) {
+            new Within(duration("10 seconds")) {
                 @Override
                 protected void run() {
 
@@ -95,13 +95,13 @@ public class ShardManagerTest {
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
-            new Within(duration("1 seconds")) {
+            new Within(duration("10 seconds")) {
                 @Override
                 protected void run() {
 
                     subject.tell(new FindLocalShard("inventory"), getRef());
 
-                    final String out = new ExpectMsg<String>(duration("1 seconds"), "find local") {
+                    final String out = new ExpectMsg<String>(duration("10 seconds"), "find local") {
                         @Override
                         protected String match(Object in) {
                             if (in instanceof LocalShardNotFound) {
@@ -132,13 +132,13 @@ public class ShardManagerTest {
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
-            new Within(duration("1 seconds")) {
+            new Within(duration("10 seconds")) {
                 @Override
                 protected void run() {
 
                     subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
 
-                    final ActorRef out = new ExpectMsg<ActorRef>(duration("1 seconds"), "find local") {
+                    final ActorRef out = new ExpectMsg<ActorRef>(duration("10 seconds"), "find local") {
                         @Override
                         protected ActorRef match(Object in) {
                             if (in instanceof LocalShardFound) {
@@ -169,7 +169,7 @@ public class ShardManagerTest {
                 TestActorRef.create(system, props);
 
             // the run() method needs to finish within 3 seconds
-            new Within(duration("1 seconds")) {
+            new Within(duration("10 seconds")) {
                 @Override
                 protected void run() {
 
@@ -209,7 +209,7 @@ public class ShardManagerTest {
                 TestActorRef.create(system, props);
 
             // the run() method needs to finish within 3 seconds
-            new Within(duration("1 seconds")) {
+            new Within(duration("10 seconds")) {
                 @Override
                 protected void run() {
 
index 27b0374bacbaa6b224c52ceaf7d33a83ffa554fb..6851b1b72ce39d4c011cb0771b95014bb31ade32 100644 (file)
@@ -14,3 +14,8 @@ akka {
         }
     }
 }
+bounded-mailbox {
+  mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+  mailbox-capacity = 1000
+  mailbox-push-timeout-time = 100ms
+}
index 6e92a2203792f6c262dd7aec4101a07d843b4e48..08450b3e789892429bc37aa1f68b015fae2271fb 100644 (file)
     <dependency>
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-data-api</artifactId>
-
     </dependency>
       <dependency>
           <groupId>org.opendaylight.yangtools</groupId>
           <artifactId>yang-model-api</artifactId>
-
       </dependency>
-
     <dependency>
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-data-impl</artifactId>
-
     </dependency>
-
     <dependency>
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-common</artifactId>
-
     </dependency>
-
-
     <dependency>
       <groupId>org.osgi</groupId>
       <artifactId>org.osgi.core</artifactId>
     </dependency>
-
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
-
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
     <dependency>
       <groupId>com.codahale.metrics</groupId>
       <artifactId>metrics-core</artifactId>
-      <version>3.0.1</version>
     </dependency>
+
+      <dependency>
+          <groupId>com.codahale.metrics</groupId>
+          <artifactId>metrics-graphite</artifactId>
+      </dependency>
     <!-- Test Dependencies -->
     <dependency>
       <groupId>junit</groupId>
index 96f24724286192131c08177943a572c45dba5a96..d4da226b9dc4278cd508e83082283a3163c6615c 100644 (file)
@@ -15,8 +15,11 @@ import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.japi.Creator;
 import akka.japi.Function;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -69,10 +72,16 @@ public class RpcManager extends AbstractUntypedActor {
   private void createRpcActors() {
     LOG.debug("Create rpc registry and broker actors");
 
+      Config conf = ConfigFactory.load();
 
-    rpcRegistry = getContext().actorOf(Props.create(RpcRegistry.class), ActorConstants.RPC_REGISTRY);
+    rpcRegistry =
+            getContext().actorOf(Props.create(RpcRegistry.class).
+                withMailbox(ActorUtil.MAILBOX), ActorConstants.RPC_REGISTRY);
+
+    rpcBroker =
+            getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext).
+                withMailbox(ActorUtil.MAILBOX),ActorConstants.RPC_BROKER);
 
-    rpcBroker = getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), ActorConstants.RPC_BROKER);
     RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
     rpcRegistry.tell(localRouter, self());
   }
index 76f59304576b731f1a15df9c277628ad9ff611d6..5109d316446b13158e3739824e653e0259929135 100644 (file)
@@ -20,6 +20,7 @@ import akka.pattern.Patterns;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
 import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
+import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import scala.concurrent.Future;
 
@@ -107,7 +108,7 @@ public class RpcRegistry extends UntypedActor {
 
         Preconditions.checkState(localRouter != null, "Router must be set first");
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
+        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
         futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
     }
 
@@ -116,7 +117,7 @@ public class RpcRegistry extends UntypedActor {
      */
     private void receiveRemoveRoutes(RemoveRoutes msg) {
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
+        Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
         futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
 
     }
@@ -129,7 +130,7 @@ public class RpcRegistry extends UntypedActor {
     private void receiveGetRouter(FindRouters msg) {
         final ActorRef sender = getSender();
 
-        Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000);
+        Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), ActorUtil.ASK_DURATION.toMillis());
         futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
     }
 
index 3b078aa062fdf1fc3feaca7fa10147de8c3503cc..ff51f4fcfa671ff4ae71be72c6403b741e3de8dd 100644 (file)
@@ -16,6 +16,7 @@ import akka.actor.UntypedActor;
 import akka.cluster.ClusterActorRefProvider;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
 import org.opendaylight.controller.utils.ConditionalProbe;
 
 import java.util.HashMap;
@@ -76,7 +77,7 @@ public class BucketStore extends UntypedActor {
         selfAddress = provider.getDefaultAddress();
 
         if ( provider instanceof ClusterActorRefProvider)
-            getContext().actorOf(Props.create(Gossiper.class), "gossiper");
+            getContext().actorOf(Props.create(Gossiper.class).withMailbox(ActorUtil.MAILBOX), "gossiper");
     }
 
     @Override
index a8bc25c40ba14b2ecf8c45926f7871f189dbdafa..f6ce5e55f3ee63602fc92529e92d6e93d0ff9bb3 100644 (file)
@@ -21,6 +21,7 @@ import akka.dispatch.Mapper;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.pattern.Patterns;
+import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -105,7 +106,7 @@ public class Gossiper extends UntypedActor {
         if (autoStartGossipTicks) {
             gossipTask = getContext().system().scheduler().schedule(
                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
-                    new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval
+                    ActorUtil.GOSSIP_TICK_INTERVAL,                 //interval
                     getSelf(),                                       //target
                     new Messages.GossiperMessages.GossipTick(),      //message
                     getContext().dispatcher(),                       //execution context
@@ -227,7 +228,9 @@ public class Gossiper extends UntypedActor {
             return;
 
         final ActorRef sender = getSender();
-        Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+        Future<Object> futureReply =
+                Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
+
         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
 
     }
@@ -267,7 +270,8 @@ public class Gossiper extends UntypedActor {
      */
     void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
 
-        Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000);
+        Future<Object> futureReply =
+                Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), ActorUtil.ASK_DURATION.toMillis());
         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
     }
 
@@ -279,7 +283,10 @@ public class Gossiper extends UntypedActor {
     void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
 
         //Get local status from bucket store and send to remote
-        Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+        Future<Object> futureReply =
+                Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
+
+        //Find gossiper on remote system
         ActorSelection remoteRef = getContext().system().actorSelection(
                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
 
index b7b2216a08b0293fccc733836355ed8e4bd5767a..ca14fecb4c4bc65d4d56f66b67b2533fd802ce2a 100644 (file)
@@ -20,28 +20,32 @@ import java.util.concurrent.TimeUnit;
 import static akka.pattern.Patterns.ask;
 
 public class ActorUtil {
-  public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS);
-  public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS);
-  public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS);
-  public static final FiniteDuration LOCAL_AWAIT_DURATION = Duration.create(2, TimeUnit.SECONDS);
-  public static final FiniteDuration REMOTE_AWAIT_DURATION = Duration.create(15, TimeUnit.SECONDS);
-  public static final FiniteDuration AWAIT_DURATION = Duration.create(17, TimeUnit.SECONDS);
-
-  /**
-   * Executes an operation on a local actor and wait for it's response
-   * @param actor
-   * @param message
-   * @param askDuration
-   * @param awaitDuration
-   * @return The response of the operation
-   */
-  public static Object executeOperation(ActorRef actor, Object message,
-                                        FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception{
-    Future<Object> future =
-        ask(actor, message, new Timeout(askDuration));
-
-      return Await.result(future, awaitDuration);
-  }
+    public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS);
+    public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS);
+    public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS);
+    public static final FiniteDuration LOCAL_AWAIT_DURATION = Duration.create(2, TimeUnit.SECONDS);
+    public static final FiniteDuration REMOTE_AWAIT_DURATION = Duration.create(15, TimeUnit.SECONDS);
+    public static final FiniteDuration AWAIT_DURATION = Duration.create(17, TimeUnit.SECONDS);
+    public static final FiniteDuration GOSSIP_TICK_INTERVAL = Duration.create(500, TimeUnit.MILLISECONDS);
+    public static final String MAILBOX = "bounded-mailbox";
+
+
+    /**
+     * Executes an operation on a local actor and wait for it's response
+     *
+     * @param actor
+     * @param message
+     * @param askDuration
+     * @param awaitDuration
+     * @return The response of the operation
+     */
+    public static Object executeOperation(ActorRef actor, Object message,
+                                          FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception {
+        Future<Object> future =
+                ask(actor, message, new Timeout(askDuration));
+
+        return Await.result(future, awaitDuration);
+    }
 
 
 }
index 711ae1c48b75c794bad3cb93de12e397632bcbfd..266832a0ab0491dfd5d58d2c0414c2733ef0300b 100644 (file)
@@ -38,10 +38,15 @@ odl-cluster-data {
 }
 
 odl-cluster-rpc {
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 100ms
+  }
+
   akka {
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
-
     }
     remote {
       log-remote-lifecycle-events = off
index 17ad237ad7e4302065c08ff8796a058cd78d7885..8a7e4a039846205846e1b54b21981f78af843783 100644 (file)
@@ -14,8 +14,8 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.sal.core.api.Broker;
@@ -25,7 +25,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.duration.Duration;
 
-
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Mockito.mock;
@@ -38,7 +37,7 @@ public class RemoteRpcProviderTest {
 
   @BeforeClass
   public static void setup() throws InterruptedException {
-    system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster"));
+    system = ActorSystem.create("odl-cluster-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
   }
 
   @AfterClass
@@ -59,7 +58,4 @@ public class RemoteRpcProviderTest {
         Duration.create(2, TimeUnit.SECONDS));
     Assert.assertTrue(actorRef.path().toString().contains(ActorConstants.RPC_MANAGER_PATH));
   }
-
-
-
 }
index 9b6215addd841633cf1dd57ce42c635d1dae89b7..98a33bf4e64a680adcc40d2dd3e92823a86c5fb6 100644 (file)
@@ -32,7 +32,7 @@ public class RouteRpcListenerTest {
 
   @BeforeClass
   public static void setup() throws InterruptedException {
-    system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster"));
+    system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
   }
 
   @AfterClass
index 7b5a968866ca1ecca306b42fc09d76b0a6aaf20a..956e1599904ccc52303ff380892bc3531861cd5b 100644 (file)
@@ -28,7 +28,7 @@ public class RpcListenerTest {
 
   @BeforeClass
   public static void setup() throws InterruptedException {
-    system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster"));
+    system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
   }
 
   @AfterClass
index e6793741a3ec2ef9030a2e469058ab92de93c153..83f52930b2b07ac32c75f83f437f1048132b54df 100644 (file)
@@ -10,10 +10,8 @@ import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Predicate;
 import com.typesafe.config.ConfigFactory;
-
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -22,8 +20,6 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.utils.ConditionalProbe;
 import org.opendaylight.yangtools.yang.common.QName;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import javax.annotation.Nullable;
@@ -33,9 +29,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
 
 public class RpcRegistryTest {
 
@@ -95,7 +91,6 @@ public class RpcRegistryTest {
    */
   @Test
   public void testAddRemoveRpcOnSameNode() throws URISyntaxException, InterruptedException {
-    validateSystemStartup();
 
     final JavaTestKit mockBroker = new JavaTestKit(node1);
 
@@ -137,8 +132,6 @@ public class RpcRegistryTest {
   @Test
   public void testRpcAddRemoveInCluster() throws URISyntaxException, InterruptedException {
 
-    validateSystemStartup();
-
     final JavaTestKit mockBroker1 = new JavaTestKit(node1);
 
     //install probe on node2's bucket store
@@ -146,7 +139,6 @@ public class RpcRegistryTest {
     final JavaTestKit probe2 = createProbeForMessage(
         node2, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
 
-
     //Add rpc on node 1
     registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
     registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
@@ -175,8 +167,6 @@ public class RpcRegistryTest {
   @Test
   public void testRpcAddedOnMultiNodes() throws Exception {
 
-    validateSystemStartup();
-
     final JavaTestKit mockBroker1 = new JavaTestKit(node1);
     final JavaTestKit mockBroker2 = new JavaTestKit(node2);
     final JavaTestKit mockBroker3 = new JavaTestKit(node3);
@@ -225,49 +215,6 @@ public class RpcRegistryTest {
 
   }
 
-  private void validateSystemStartup() throws InterruptedException {
-
-    ActorPath gossiper1Path = new ChildActorPath(new ChildActorPath(registry1.path(), "store"), "gossiper");
-    ActorPath gossiper2Path = new ChildActorPath(new ChildActorPath(registry2.path(), "store"), "gossiper");
-    ActorPath gossiper3Path = new ChildActorPath(new ChildActorPath(registry3.path(), "store"), "gossiper");
-
-    ActorSelection gossiper1 = node1.actorSelection(gossiper1Path);
-    ActorSelection gossiper2 = node2.actorSelection(gossiper2Path);
-    ActorSelection gossiper3 = node3.actorSelection(gossiper3Path);
-
-
-    if (!resolveReference(gossiper1, gossiper2, gossiper3))
-      Assert.fail("Could not find gossipers");
-  }
-
-  private Boolean resolveReference(ActorSelection... gossipers) {
-
-    Boolean resolved = true;
-    for (int i = 0; i < 5; i++) {
-
-      resolved = true;
-      System.out.println(System.currentTimeMillis() + " Resolving gossipers; trial #" + i);
-
-      for (ActorSelection gossiper : gossipers) {
-        ActorRef ref = null;
-
-        try {
-          Future<ActorRef> future = gossiper.resolveOne(new FiniteDuration(15000, TimeUnit.MILLISECONDS));
-          ref = Await.result(future, new FiniteDuration(10000, TimeUnit.MILLISECONDS));
-        } catch (Exception e) {
-          System.out.println("Could not find gossiper in attempt#" + i + ". Got exception " + e.getMessage());
-        }
-
-        if (ref == null)
-          resolved = false;
-      }
-
-      if (resolved) break;
-
-    }
-    return resolved;
-  }
-
   private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
     return new AddOrUpdateRoutes(createRouteIds());
   }
index b21f0f0069e3db4d62a7e024c7c3d5ff513067d2..cd031a1d0abcf330b3c63c90a576b65dc2cd2d1d 100644 (file)
@@ -30,7 +30,7 @@ public class LatestEntryRoutingLogicTest {
 
   @BeforeClass
   public static void setup() throws InterruptedException {
-    system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster"));
+    system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
   }
 
   @AfterClass
index 8100ed35abd580a13367922ff255e42c61369f93..5c4af8d3da457c99344c0a26942c9d3456694f75 100644 (file)
@@ -1,6 +1,12 @@
-odl-cluster{
+odl-cluster-rpc{
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 10ms
+  }
+
   akka {
-    loglevel = "DEBUG"
+    loglevel = "INFO"
     #log-config-on-start = on
 
     actor {
@@ -34,12 +40,22 @@ unit-test{
     loglevel = "INFO"
     #loggers = ["akka.event.slf4j.Slf4jLogger"]
   }
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 10ms
+  }
 }
 
 memberA{
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 10ms
+  }
   akka {
     loglevel = "INFO"
-    loggers = ["akka.event.slf4j.Slf4jLogger"]
+    #loggers = ["akka.event.slf4j.Slf4jLogger"]
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
       debug {
@@ -65,11 +81,20 @@ memberA{
   }
 }
 memberB{
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 10ms
+  }
   akka {
     loglevel = "INFO"
-    loggers = ["akka.event.slf4j.Slf4jLogger"]
+    #loggers = ["akka.event.slf4j.Slf4jLogger"]
+
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
+      debug {
+        #lifecycle = on
+      }
     }
     remote {
       log-received-messages = off
@@ -90,11 +115,19 @@ memberB{
   }
 }
 memberC{
+  bounded-mailbox {
+    mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+    mailbox-capacity = 1000
+    mailbox-push-timeout-time = 10ms
+  }
   akka {
     loglevel = "INFO"
-    loggers = ["akka.event.slf4j.Slf4jLogger"]
+    #loggers = ["akka.event.slf4j.Slf4jLogger"]
     actor {
       provider = "akka.cluster.ClusterActorRefProvider"
+      debug {
+        #lifecycle = on
+      }
     }
     remote {
       log-received-messages = off
@@ -113,4 +146,5 @@ memberC{
       auto-down-unreachable-after = 10s
     }
   }
-}
\ No newline at end of file
+}
+