From: Abhishek Kumar Date: Fri, 22 Aug 2014 21:18:19 +0000 (-0700) Subject: Changed key actors to use bounded mailbox X-Git-Tag: release/helium~176 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=refs%2Fchanges%2F67%2F10267%2F5 Changed key actors to use bounded mailbox 1. Changed key actors in clustering components to use bounded mailbox. 2. Implementation of Bounded mailbox queue changed to use deque as its required by Shard Actors. 3. Cleanup of hard coded values. Change-Id: I9fb87e1857f2aa9fee0819c4dff63ca3c967ec60 Signed-off-by: Abhishek Kumar --- diff --git a/opendaylight/commons/opendaylight/pom.xml b/opendaylight/commons/opendaylight/pom.xml index 425e968cf7..b812fcbd8b 100644 --- a/opendaylight/commons/opendaylight/pom.xml +++ b/opendaylight/commons/opendaylight/pom.xml @@ -39,6 +39,7 @@ 0.4.2-SNAPSHOT 0.4.2-SNAPSHOT + 3.0.1 7.0.32.v201211201336 7.0.32.v201211201952 7.0.32.v201211201952 @@ -253,6 +254,16 @@ logback-core ${logback.version} + + com.codahale.metrics + metrics-core + ${codahale.metrics.version} + + + com.codahale.metrics + metrics-graphite + ${codahale.metrics.version} + com.fasterxml.jackson.core jackson-annotations diff --git a/opendaylight/md-sal/sal-clustering-commons/pom.xml b/opendaylight/md-sal/sal-clustering-commons/pom.xml index 2bf1a55973..b8980cd0be 100644 --- a/opendaylight/md-sal/sal-clustering-commons/pom.xml +++ b/opendaylight/md-sal/sal-clustering-commons/pom.xml @@ -199,11 +199,13 @@ jsr305 2.0.1 - com.codahale.metrics metrics-core - 3.0.1 + + + com.codahale.metrics + metrics-graphite diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java index 646431522e..c6d3625ac3 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/common/actor/MeteredBoundedMailbox.java @@ -11,9 +11,8 @@ package org.opendaylight.controller.common.actor; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.dispatch.BoundedMailbox; +import akka.dispatch.BoundedDequeBasedMailbox; import akka.dispatch.MailboxType; -import akka.dispatch.MessageQueue; import akka.dispatch.ProducesMessageQueue; import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; @@ -24,7 +23,7 @@ import scala.concurrent.duration.FiniteDuration; import java.util.concurrent.TimeUnit; -public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue { +public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue { private MeteredMessageQueue queue; private Integer capacity; @@ -33,16 +32,18 @@ public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue< private MetricsReporter reporter; private final String QUEUE_SIZE = "queue-size"; + private final String CAPACITY = "mailbox-capacity"; + private final String TIMEOUT = "mailbox-push-timeout-time"; private final Long DEFAULT_TIMEOUT = 10L; public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) { Preconditions.checkArgument( config.hasPath("mailbox-capacity"), "Missing configuration [mailbox-capacity]" ); - this.capacity = config.getInt("mailbox-capacity"); + this.capacity = config.getInt(CAPACITY); Preconditions.checkArgument( this.capacity > 0, "mailbox-capacity must be > 0"); Long timeout = -1L; - if ( config.hasPath("mailbox-push-timeout-time") ){ - timeout = config.getDuration("mailbox-push-timeout-time", TimeUnit.NANOSECONDS); + if ( config.hasPath(TIMEOUT) ){ + timeout = config.getDuration(TIMEOUT, TimeUnit.NANOSECONDS); } else { timeout = DEFAULT_TIMEOUT; } @@ -54,7 +55,7 @@ public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue< @Override - public MessageQueue create(final scala.Option owner, scala.Option system) { + public MeteredMessageQueue create(final scala.Option owner, scala.Option system) { this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut); monitorQueueSize(owner, this.queue); return this.queue; @@ -65,14 +66,15 @@ public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue< return; //there's no actor to monitor } actorPath = owner.get().path(); - MetricRegistry registry = reporter.getMetricsRegistry(); + String actorInstanceId = Integer.toString(owner.get().hashCode()); - String actorName = registry.name(actorPath.toString(), QUEUE_SIZE); + MetricRegistry registry = reporter.getMetricsRegistry(); + String actorName = registry.name(actorPath.toString(), actorInstanceId, QUEUE_SIZE); if (registry.getMetrics().containsKey(actorName)) return; //already registered - reporter.getMetricsRegistry().register(actorName, + registry.register(actorName, new Gauge() { @Override public Integer getValue() { @@ -82,7 +84,7 @@ public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue< } - public static class MeteredMessageQueue extends BoundedMailbox.MessageQueue { + public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue { public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) { super(capacity, pushTimeOut); diff --git a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf index 0535179aad..05322137aa 100644 --- a/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf +++ b/opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf @@ -1,5 +1,10 @@ odl-cluster-data { + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms + } akka { actor { provider = "akka.cluster.ClusterActorRefProvider" @@ -33,6 +38,11 @@ odl-cluster-data { } odl-cluster-rpc { + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms + } akka { actor { provider = "akka.cluster.ClusterActorRefProvider" diff --git a/opendaylight/md-sal/sal-distributed-datastore/pom.xml b/opendaylight/md-sal/sal-distributed-datastore/pom.xml index 4669fbdb61..31b0bb80c0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/pom.xml +++ b/opendaylight/md-sal/sal-distributed-datastore/pom.xml @@ -138,7 +138,11 @@ com.codahale.metrics metrics-core - 3.0.1 + + + + com.codahale.metrics + metrics-graphite diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 51f3735f81..b971648392 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -67,9 +67,11 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au Duration.create(dataStoreProperties.getShardTransactionIdleTimeoutInMinutes(), TimeUnit.MINUTES)); - actorContext = new ActorContext(actorSystem, actorSystem - .actorOf(ShardManager.props(type, cluster, configuration, shardContext), - shardManagerId ), cluster, configuration); + actorContext + = new ActorContext( + actorSystem, actorSystem.actorOf( + ShardManager.props(type, cluster, configuration, shardContext). + withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration); } public DistributedDataStore(ActorContext actorContext) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index c9b7c07e9a..186f2cff41 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -33,6 +33,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import scala.concurrent.duration.Duration; import java.util.ArrayList; @@ -244,8 +245,9 @@ public class ShardManager extends AbstractUntypedActor { ShardIdentifier shardId = getShardIdentifier(memberName, shardName); Map peerAddresses = getPeerAddresses(shardName); ActorRef actor = getContext() - .actorOf(Shard.props(shardId, peerAddresses, shardContext), - shardId.toString()); + .actorOf(Shard.props(shardId, peerAddresses, shardContext). + withMailbox(ActorContext.MAILBOX), shardId.toString()); + localShardActorNames.add(shardId.toString()); localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses)); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index f76430f5a1..818a8ca8b3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -52,6 +52,8 @@ public class ActorContext { public static final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS); + public static final String MAILBOX = "bounded-mailbox"; + private final ActorSystem actorSystem; private final ActorRef shardManager; private final ClusterWrapper clusterWrapper; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf index 8af9bd07d7..c29f93bb07 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf @@ -1,5 +1,10 @@ odl-cluster-data { + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms + } akka { loggers = ["akka.event.slf4j.Slf4jLogger"] cluster { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index e6f68c032a..d16e4f53de 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -46,7 +46,7 @@ public class ShardManagerTest { final TestActorRef subject = TestActorRef.create(system, props); - new Within(duration("1 seconds")) { + new Within(duration("10 seconds")) { @Override protected void run() { @@ -71,7 +71,7 @@ public class ShardManagerTest { final TestActorRef subject = TestActorRef.create(system, props); - new Within(duration("1 seconds")) { + new Within(duration("10 seconds")) { @Override protected void run() { @@ -95,13 +95,13 @@ public class ShardManagerTest { final TestActorRef subject = TestActorRef.create(system, props); - new Within(duration("1 seconds")) { + new Within(duration("10 seconds")) { @Override protected void run() { subject.tell(new FindLocalShard("inventory"), getRef()); - final String out = new ExpectMsg(duration("1 seconds"), "find local") { + final String out = new ExpectMsg(duration("10 seconds"), "find local") { @Override protected String match(Object in) { if (in instanceof LocalShardNotFound) { @@ -132,13 +132,13 @@ public class ShardManagerTest { final TestActorRef subject = TestActorRef.create(system, props); - new Within(duration("1 seconds")) { + new Within(duration("10 seconds")) { @Override protected void run() { subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef()); - final ActorRef out = new ExpectMsg(duration("1 seconds"), "find local") { + final ActorRef out = new ExpectMsg(duration("10 seconds"), "find local") { @Override protected ActorRef match(Object in) { if (in instanceof LocalShardFound) { @@ -169,7 +169,7 @@ public class ShardManagerTest { TestActorRef.create(system, props); // the run() method needs to finish within 3 seconds - new Within(duration("1 seconds")) { + new Within(duration("10 seconds")) { @Override protected void run() { @@ -209,7 +209,7 @@ public class ShardManagerTest { TestActorRef.create(system, props); // the run() method needs to finish within 3 seconds - new Within(duration("1 seconds")) { + new Within(duration("10 seconds")) { @Override protected void run() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index 27b0374bac..6851b1b72c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -14,3 +14,8 @@ akka { } } } +bounded-mailbox { + mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml index 6e92a22037..08450b3e78 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/pom.xml +++ b/opendaylight/md-sal/sal-remoterpc-connector/pom.xml @@ -81,37 +81,27 @@ org.opendaylight.yangtools yang-data-api - org.opendaylight.yangtools yang-model-api - - org.opendaylight.yangtools yang-data-impl - - org.opendaylight.yangtools yang-common - - - org.osgi org.osgi.core - org.slf4j slf4j-api - org.scala-lang scala-library @@ -120,8 +110,12 @@ com.codahale.metrics metrics-core - 3.0.1 + + + com.codahale.metrics + metrics-graphite + junit diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java index 96f2472428..d4da226b9d 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/RpcManager.java @@ -15,8 +15,11 @@ import akka.actor.Props; import akka.actor.SupervisorStrategy; import akka.japi.Creator; import akka.japi.Function; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext; import org.opendaylight.controller.remote.rpc.registry.RpcRegistry; +import org.opendaylight.controller.remote.rpc.utils.ActorUtil; import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry; import org.opendaylight.yangtools.yang.common.QName; @@ -69,10 +72,16 @@ public class RpcManager extends AbstractUntypedActor { private void createRpcActors() { LOG.debug("Create rpc registry and broker actors"); + Config conf = ConfigFactory.load(); - rpcRegistry = getContext().actorOf(Props.create(RpcRegistry.class), ActorConstants.RPC_REGISTRY); + rpcRegistry = + getContext().actorOf(Props.create(RpcRegistry.class). + withMailbox(ActorUtil.MAILBOX), ActorConstants.RPC_REGISTRY); + + rpcBroker = + getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext). + withMailbox(ActorUtil.MAILBOX),ActorConstants.RPC_BROKER); - rpcBroker = getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), ActorConstants.RPC_BROKER); RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker); rpcRegistry.tell(localRouter, self()); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java index 76f5930457..5109d31644 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistry.java @@ -20,6 +20,7 @@ import akka.pattern.Patterns; import com.google.common.base.Preconditions; import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket; import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore; +import org.opendaylight.controller.remote.rpc.utils.ActorUtil; import org.opendaylight.controller.sal.connector.api.RpcRouter; import scala.concurrent.Future; @@ -107,7 +108,7 @@ public class RpcRegistry extends UntypedActor { Preconditions.checkState(localRouter != null, "Router must be set first"); - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000); + Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis()); futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher()); } @@ -116,7 +117,7 @@ public class RpcRegistry extends UntypedActor { */ private void receiveRemoveRoutes(RemoveRoutes msg) { - Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000); + Future futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis()); futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher()); } @@ -129,7 +130,7 @@ public class RpcRegistry extends UntypedActor { private void receiveGetRouter(FindRouters msg) { final ActorRef sender = getSender(); - Future futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000); + Future futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), ActorUtil.ASK_DURATION.toMillis()); futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher()); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java index 3b078aa062..ff51f4fcfa 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -16,6 +16,7 @@ import akka.actor.UntypedActor; import akka.cluster.ClusterActorRefProvider; import akka.event.Logging; import akka.event.LoggingAdapter; +import org.opendaylight.controller.remote.rpc.utils.ActorUtil; import org.opendaylight.controller.utils.ConditionalProbe; import java.util.HashMap; @@ -76,7 +77,7 @@ public class BucketStore extends UntypedActor { selfAddress = provider.getDefaultAddress(); if ( provider instanceof ClusterActorRefProvider) - getContext().actorOf(Props.create(Gossiper.class), "gossiper"); + getContext().actorOf(Props.create(Gossiper.class).withMailbox(ActorUtil.MAILBOX), "gossiper"); } @Override diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index a8bc25c40b..f6ce5e55f3 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -21,6 +21,7 @@ import akka.dispatch.Mapper; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.pattern.Patterns; +import org.opendaylight.controller.remote.rpc.utils.ActorUtil; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -105,7 +106,7 @@ public class Gossiper extends UntypedActor { if (autoStartGossipTicks) { gossipTask = getContext().system().scheduler().schedule( new FiniteDuration(1, TimeUnit.SECONDS), //initial delay - new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval + ActorUtil.GOSSIP_TICK_INTERVAL, //interval getSelf(), //target new Messages.GossiperMessages.GossipTick(), //message getContext().dispatcher(), //execution context @@ -227,7 +228,9 @@ public class Gossiper extends UntypedActor { return; final ActorRef sender = getSender(); - Future futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000); + Future futureReply = + Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis()); + futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher()); } @@ -267,7 +270,8 @@ public class Gossiper extends UntypedActor { */ void sendGossipTo(final ActorRef remote, final Set
addresses){ - Future futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000); + Future futureReply = + Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), ActorUtil.ASK_DURATION.toMillis()); futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher()); } @@ -279,7 +283,10 @@ public class Gossiper extends UntypedActor { void getLocalStatusAndSendTo(Address remoteActorSystemAddress){ //Get local status from bucket store and send to remote - Future futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000); + Future futureReply = + Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis()); + + //Find gossiper on remote system ActorSelection remoteRef = getContext().system().actorSelection( remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress()); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java index b7b2216a08..ca14fecb4c 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/utils/ActorUtil.java @@ -20,28 +20,32 @@ import java.util.concurrent.TimeUnit; import static akka.pattern.Patterns.ask; public class ActorUtil { - public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS); - public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS); - public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS); - public static final FiniteDuration LOCAL_AWAIT_DURATION = Duration.create(2, TimeUnit.SECONDS); - public static final FiniteDuration REMOTE_AWAIT_DURATION = Duration.create(15, TimeUnit.SECONDS); - public static final FiniteDuration AWAIT_DURATION = Duration.create(17, TimeUnit.SECONDS); - - /** - * Executes an operation on a local actor and wait for it's response - * @param actor - * @param message - * @param askDuration - * @param awaitDuration - * @return The response of the operation - */ - public static Object executeOperation(ActorRef actor, Object message, - FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception{ - Future future = - ask(actor, message, new Timeout(askDuration)); - - return Await.result(future, awaitDuration); - } + public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS); + public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS); + public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS); + public static final FiniteDuration LOCAL_AWAIT_DURATION = Duration.create(2, TimeUnit.SECONDS); + public static final FiniteDuration REMOTE_AWAIT_DURATION = Duration.create(15, TimeUnit.SECONDS); + public static final FiniteDuration AWAIT_DURATION = Duration.create(17, TimeUnit.SECONDS); + public static final FiniteDuration GOSSIP_TICK_INTERVAL = Duration.create(500, TimeUnit.MILLISECONDS); + public static final String MAILBOX = "bounded-mailbox"; + + + /** + * Executes an operation on a local actor and wait for it's response + * + * @param actor + * @param message + * @param askDuration + * @param awaitDuration + * @return The response of the operation + */ + public static Object executeOperation(ActorRef actor, Object message, + FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception { + Future future = + ask(actor, message, new Timeout(askDuration)); + + return Await.result(future, awaitDuration); + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf b/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf index 711ae1c48b..266832a0ab 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/resources/application.conf @@ -38,10 +38,15 @@ odl-cluster-data { } odl-cluster-rpc { + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 100ms + } + akka { actor { provider = "akka.cluster.ClusterActorRefProvider" - } remote { log-remote-lifecycle-events = off diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java index 17ad237ad7..8a7e4a0398 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RemoteRpcProviderTest.java @@ -14,8 +14,8 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; import com.typesafe.config.ConfigFactory; -import junit.framework.Assert; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.sal.core.api.Broker; @@ -25,7 +25,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.duration.Duration; - import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.mock; @@ -38,7 +37,7 @@ public class RemoteRpcProviderTest { @BeforeClass public static void setup() throws InterruptedException { - system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster")); + system = ActorSystem.create("odl-cluster-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc")); } @AfterClass @@ -59,7 +58,4 @@ public class RemoteRpcProviderTest { Duration.create(2, TimeUnit.SECONDS)); Assert.assertTrue(actorRef.path().toString().contains(ActorConstants.RPC_MANAGER_PATH)); } - - - } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RouteRpcListenerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RouteRpcListenerTest.java index 9b6215addd..98a33bf4e6 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RouteRpcListenerTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RouteRpcListenerTest.java @@ -32,7 +32,7 @@ public class RouteRpcListenerTest { @BeforeClass public static void setup() throws InterruptedException { - system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster")); + system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc")); } @AfterClass diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java index 7b5a968866..956e159990 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/RpcListenerTest.java @@ -28,7 +28,7 @@ public class RpcListenerTest { @BeforeClass public static void setup() throws InterruptedException { - system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster")); + system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc")); } @AfterClass diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java index e6793741a3..83f52930b2 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java @@ -10,10 +10,8 @@ import akka.actor.Props; import akka.testkit.JavaTestKit; import com.google.common.base.Predicate; import com.typesafe.config.ConfigFactory; - import org.junit.After; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -22,8 +20,6 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.utils.ConditionalProbe; import org.opendaylight.yangtools.yang.common.QName; -import scala.concurrent.Await; -import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import javax.annotation.Nullable; @@ -33,9 +29,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; -import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes; import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes; +import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter; public class RpcRegistryTest { @@ -95,7 +91,6 @@ public class RpcRegistryTest { */ @Test public void testAddRemoveRpcOnSameNode() throws URISyntaxException, InterruptedException { - validateSystemStartup(); final JavaTestKit mockBroker = new JavaTestKit(node1); @@ -137,8 +132,6 @@ public class RpcRegistryTest { @Test public void testRpcAddRemoveInCluster() throws URISyntaxException, InterruptedException { - validateSystemStartup(); - final JavaTestKit mockBroker1 = new JavaTestKit(node1); //install probe on node2's bucket store @@ -146,7 +139,6 @@ public class RpcRegistryTest { final JavaTestKit probe2 = createProbeForMessage( node2, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class); - //Add rpc on node 1 registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef()); registry1.tell(getAddRouteMessage(), mockBroker1.getRef()); @@ -175,8 +167,6 @@ public class RpcRegistryTest { @Test public void testRpcAddedOnMultiNodes() throws Exception { - validateSystemStartup(); - final JavaTestKit mockBroker1 = new JavaTestKit(node1); final JavaTestKit mockBroker2 = new JavaTestKit(node2); final JavaTestKit mockBroker3 = new JavaTestKit(node3); @@ -225,49 +215,6 @@ public class RpcRegistryTest { } - private void validateSystemStartup() throws InterruptedException { - - ActorPath gossiper1Path = new ChildActorPath(new ChildActorPath(registry1.path(), "store"), "gossiper"); - ActorPath gossiper2Path = new ChildActorPath(new ChildActorPath(registry2.path(), "store"), "gossiper"); - ActorPath gossiper3Path = new ChildActorPath(new ChildActorPath(registry3.path(), "store"), "gossiper"); - - ActorSelection gossiper1 = node1.actorSelection(gossiper1Path); - ActorSelection gossiper2 = node2.actorSelection(gossiper2Path); - ActorSelection gossiper3 = node3.actorSelection(gossiper3Path); - - - if (!resolveReference(gossiper1, gossiper2, gossiper3)) - Assert.fail("Could not find gossipers"); - } - - private Boolean resolveReference(ActorSelection... gossipers) { - - Boolean resolved = true; - for (int i = 0; i < 5; i++) { - - resolved = true; - System.out.println(System.currentTimeMillis() + " Resolving gossipers; trial #" + i); - - for (ActorSelection gossiper : gossipers) { - ActorRef ref = null; - - try { - Future future = gossiper.resolveOne(new FiniteDuration(15000, TimeUnit.MILLISECONDS)); - ref = Await.result(future, new FiniteDuration(10000, TimeUnit.MILLISECONDS)); - } catch (Exception e) { - System.out.println("Could not find gossiper in attempt#" + i + ". Got exception " + e.getMessage()); - } - - if (ref == null) - resolved = false; - } - - if (resolved) break; - - } - return resolved; - } - private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException { return new AddOrUpdateRoutes(createRouteIds()); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/utils/LatestEntryRoutingLogicTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/utils/LatestEntryRoutingLogicTest.java index b21f0f0069..cd031a1d0a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/utils/LatestEntryRoutingLogicTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/utils/LatestEntryRoutingLogicTest.java @@ -30,7 +30,7 @@ public class LatestEntryRoutingLogicTest { @BeforeClass public static void setup() throws InterruptedException { - system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster")); + system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc")); } @AfterClass diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf index 8100ed35ab..5c4af8d3da 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/resources/application.conf @@ -1,6 +1,12 @@ -odl-cluster{ +odl-cluster-rpc{ + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 10ms + } + akka { - loglevel = "DEBUG" + loglevel = "INFO" #log-config-on-start = on actor { @@ -34,12 +40,22 @@ unit-test{ loglevel = "INFO" #loggers = ["akka.event.slf4j.Slf4jLogger"] } + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 10ms + } } memberA{ + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 10ms + } akka { loglevel = "INFO" - loggers = ["akka.event.slf4j.Slf4jLogger"] + #loggers = ["akka.event.slf4j.Slf4jLogger"] actor { provider = "akka.cluster.ClusterActorRefProvider" debug { @@ -65,11 +81,20 @@ memberA{ } } memberB{ + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 10ms + } akka { loglevel = "INFO" - loggers = ["akka.event.slf4j.Slf4jLogger"] + #loggers = ["akka.event.slf4j.Slf4jLogger"] + actor { provider = "akka.cluster.ClusterActorRefProvider" + debug { + #lifecycle = on + } } remote { log-received-messages = off @@ -90,11 +115,19 @@ memberB{ } } memberC{ + bounded-mailbox { + mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox" + mailbox-capacity = 1000 + mailbox-push-timeout-time = 10ms + } akka { loglevel = "INFO" - loggers = ["akka.event.slf4j.Slf4jLogger"] + #loggers = ["akka.event.slf4j.Slf4jLogger"] actor { provider = "akka.cluster.ClusterActorRefProvider" + debug { + #lifecycle = on + } } remote { log-received-messages = off @@ -113,4 +146,5 @@ memberC{ auto-down-unreachable-after = 10s } } -} \ No newline at end of file +} +