<clustering.test.version>0.4.2-SNAPSHOT</clustering.test.version>
<commmons.northbound.version>0.4.2-SNAPSHOT</commmons.northbound.version>
<!-- Third Party Versions -->
+ <codahale.metrics.version>3.0.1</codahale.metrics.version>
<commons.catalina>7.0.32.v201211201336</commons.catalina>
<commons.catalina.ha>7.0.32.v201211201952</commons.catalina.ha>
<commons.catalina.tribes>7.0.32.v201211201952</commons.catalina.tribes>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>${codahale.metrics.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ <version>${codahale.metrics.version}</version>
+ </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<artifactId>jsr305</artifactId>
<version>2.0.1</version>
</dependency>
-
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
- <version>3.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
</dependency>
</dependencies>
<build>
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.dispatch.BoundedMailbox;
+import akka.dispatch.BoundedDequeBasedMailbox;
import akka.dispatch.MailboxType;
-import akka.dispatch.MessageQueue;
import akka.dispatch.ProducesMessageQueue;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import java.util.concurrent.TimeUnit;
-public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<BoundedMailbox.MessageQueue> {
+public class MeteredBoundedMailbox implements MailboxType, ProducesMessageQueue<MeteredBoundedMailbox.MeteredMessageQueue> {
private MeteredMessageQueue queue;
private Integer capacity;
private MetricsReporter reporter;
private final String QUEUE_SIZE = "queue-size";
+ private final String CAPACITY = "mailbox-capacity";
+ private final String TIMEOUT = "mailbox-push-timeout-time";
private final Long DEFAULT_TIMEOUT = 10L;
public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
Preconditions.checkArgument( config.hasPath("mailbox-capacity"), "Missing configuration [mailbox-capacity]" );
- this.capacity = config.getInt("mailbox-capacity");
+ this.capacity = config.getInt(CAPACITY);
Preconditions.checkArgument( this.capacity > 0, "mailbox-capacity must be > 0");
Long timeout = -1L;
- if ( config.hasPath("mailbox-push-timeout-time") ){
- timeout = config.getDuration("mailbox-push-timeout-time", TimeUnit.NANOSECONDS);
+ if ( config.hasPath(TIMEOUT) ){
+ timeout = config.getDuration(TIMEOUT, TimeUnit.NANOSECONDS);
} else {
timeout = DEFAULT_TIMEOUT;
}
@Override
- public MessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
+ public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
this.queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
monitorQueueSize(owner, this.queue);
return this.queue;
return; //there's no actor to monitor
}
actorPath = owner.get().path();
- MetricRegistry registry = reporter.getMetricsRegistry();
+ String actorInstanceId = Integer.toString(owner.get().hashCode());
- String actorName = registry.name(actorPath.toString(), QUEUE_SIZE);
+ MetricRegistry registry = reporter.getMetricsRegistry();
+ String actorName = registry.name(actorPath.toString(), actorInstanceId, QUEUE_SIZE);
if (registry.getMetrics().containsKey(actorName))
return; //already registered
- reporter.getMetricsRegistry().register(actorName,
+ registry.register(actorName,
new Gauge<Integer>() {
@Override
public Integer getValue() {
}
- public static class MeteredMessageQueue extends BoundedMailbox.MessageQueue {
+ public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
super(capacity, pushTimeOut);
odl-cluster-data {
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+ }
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
odl-cluster-rpc {
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+ }
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
- <version>3.0.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
</dependency>
<!-- Test Dependencies -->
<dependency>
Duration.create(dataStoreProperties.getShardTransactionIdleTimeoutInMinutes(),
TimeUnit.MINUTES));
- actorContext = new ActorContext(actorSystem, actorSystem
- .actorOf(ShardManager.props(type, cluster, configuration, shardContext),
- shardManagerId ), cluster, configuration);
+ actorContext
+ = new ActorContext(
+ actorSystem, actorSystem.actorOf(
+ ShardManager.props(type, cluster, configuration, shardContext).
+ withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration);
}
public DistributedDataStore(ActorContext actorContext) {
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import scala.concurrent.duration.Duration;
import java.util.ArrayList;
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
ActorRef actor = getContext()
- .actorOf(Shard.props(shardId, peerAddresses, shardContext),
- shardId.toString());
+ .actorOf(Shard.props(shardId, peerAddresses, shardContext).
+ withMailbox(ActorContext.MAILBOX), shardId.toString());
+
localShardActorNames.add(shardId.toString());
localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
}
public static final Duration AWAIT_DURATION =
Duration.create(5, TimeUnit.SECONDS);
+ public static final String MAILBOX = "bounded-mailbox";
+
private final ActorSystem actorSystem;
private final ActorRef shardManager;
private final ClusterWrapper clusterWrapper;
odl-cluster-data {
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+ }
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
cluster {
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
- new Within(duration("1 seconds")) {
+ new Within(duration("10 seconds")) {
@Override
protected void run() {
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
- new Within(duration("1 seconds")) {
+ new Within(duration("10 seconds")) {
@Override
protected void run() {
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
- new Within(duration("1 seconds")) {
+ new Within(duration("10 seconds")) {
@Override
protected void run() {
subject.tell(new FindLocalShard("inventory"), getRef());
- final String out = new ExpectMsg<String>(duration("1 seconds"), "find local") {
+ final String out = new ExpectMsg<String>(duration("10 seconds"), "find local") {
@Override
protected String match(Object in) {
if (in instanceof LocalShardNotFound) {
final TestActorRef<ShardManager> subject =
TestActorRef.create(system, props);
- new Within(duration("1 seconds")) {
+ new Within(duration("10 seconds")) {
@Override
protected void run() {
subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
- final ActorRef out = new ExpectMsg<ActorRef>(duration("1 seconds"), "find local") {
+ final ActorRef out = new ExpectMsg<ActorRef>(duration("10 seconds"), "find local") {
@Override
protected ActorRef match(Object in) {
if (in instanceof LocalShardFound) {
TestActorRef.create(system, props);
// the run() method needs to finish within 3 seconds
- new Within(duration("1 seconds")) {
+ new Within(duration("10 seconds")) {
@Override
protected void run() {
TestActorRef.create(system, props);
// the run() method needs to finish within 3 seconds
- new Within(duration("1 seconds")) {
+ new Within(duration("10 seconds")) {
@Override
protected void run() {
}
}
}
+bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+}
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-data-api</artifactId>
-
</dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-model-api</artifactId>
-
</dependency>
-
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-data-impl</artifactId>
-
</dependency>
-
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-common</artifactId>
-
</dependency>
-
-
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
</dependency>
-
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
-
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
- <version>3.0.1</version>
</dependency>
+
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-graphite</artifactId>
+ </dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>junit</groupId>
import akka.actor.SupervisorStrategy;
import akka.japi.Creator;
import akka.japi.Function;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.opendaylight.controller.remote.rpc.messages.UpdateSchemaContext;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
+import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.opendaylight.yangtools.yang.common.QName;
private void createRpcActors() {
LOG.debug("Create rpc registry and broker actors");
+ Config conf = ConfigFactory.load();
- rpcRegistry = getContext().actorOf(Props.create(RpcRegistry.class), ActorConstants.RPC_REGISTRY);
+ rpcRegistry =
+ getContext().actorOf(Props.create(RpcRegistry.class).
+ withMailbox(ActorUtil.MAILBOX), ActorConstants.RPC_REGISTRY);
+
+ rpcBroker =
+ getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext).
+ withMailbox(ActorUtil.MAILBOX),ActorConstants.RPC_BROKER);
- rpcBroker = getContext().actorOf(RpcBroker.props(brokerSession, rpcRegistry, schemaContext), ActorConstants.RPC_BROKER);
RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
rpcRegistry.tell(localRouter, self());
}
import com.google.common.base.Preconditions;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
+import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import scala.concurrent.Future;
Preconditions.checkState(localRouter != null, "Router must be set first");
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
futureReply.map(getMapperToAddRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
}
*/
private void receiveRemoveRoutes(RemoveRoutes msg) {
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), 1000);
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetLocalBucket(), ActorUtil.ASK_DURATION.toMillis());
futureReply.map(getMapperToRemoveRoutes(msg.getRouteIdentifiers()), getContext().dispatcher());
}
private void receiveGetRouter(FindRouters msg) {
final ActorRef sender = getSender();
- Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), 1000);
+ Future<Object> futureReply = Patterns.ask(bucketStore, new GetAllBuckets(), ActorUtil.ASK_DURATION.toMillis());
futureReply.map(getMapperToGetRouter(msg.getRouteIdentifier(), sender), getContext().dispatcher());
}
import akka.cluster.ClusterActorRefProvider;
import akka.event.Logging;
import akka.event.LoggingAdapter;
+import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
import org.opendaylight.controller.utils.ConditionalProbe;
import java.util.HashMap;
selfAddress = provider.getDefaultAddress();
if ( provider instanceof ClusterActorRefProvider)
- getContext().actorOf(Props.create(Gossiper.class), "gossiper");
+ getContext().actorOf(Props.create(Gossiper.class).withMailbox(ActorUtil.MAILBOX), "gossiper");
}
@Override
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
+import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
if (autoStartGossipTicks) {
gossipTask = getContext().system().scheduler().schedule(
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
- new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval
+ ActorUtil.GOSSIP_TICK_INTERVAL, //interval
getSelf(), //target
new Messages.GossiperMessages.GossipTick(), //message
getContext().dispatcher(), //execution context
return;
final ActorRef sender = getSender();
- Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+ Future<Object> futureReply =
+ Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
+
futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
}
*/
void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
- Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000);
+ Future<Object> futureReply =
+ Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), ActorUtil.ASK_DURATION.toMillis());
futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
}
void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
//Get local status from bucket store and send to remote
- Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+ Future<Object> futureReply =
+ Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
+
+ //Find gossiper on remote system
ActorSelection remoteRef = getContext().system().actorSelection(
remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
import static akka.pattern.Patterns.ask;
public class ActorUtil {
- public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS);
- public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS);
- public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS);
- public static final FiniteDuration LOCAL_AWAIT_DURATION = Duration.create(2, TimeUnit.SECONDS);
- public static final FiniteDuration REMOTE_AWAIT_DURATION = Duration.create(15, TimeUnit.SECONDS);
- public static final FiniteDuration AWAIT_DURATION = Duration.create(17, TimeUnit.SECONDS);
-
- /**
- * Executes an operation on a local actor and wait for it's response
- * @param actor
- * @param message
- * @param askDuration
- * @param awaitDuration
- * @return The response of the operation
- */
- public static Object executeOperation(ActorRef actor, Object message,
- FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception{
- Future<Object> future =
- ask(actor, message, new Timeout(askDuration));
-
- return Await.result(future, awaitDuration);
- }
+ public static final FiniteDuration LOCAL_ASK_DURATION = Duration.create(2, TimeUnit.SECONDS);
+ public static final FiniteDuration REMOTE_ASK_DURATION = Duration.create(15, TimeUnit.SECONDS);
+ public static final FiniteDuration ASK_DURATION = Duration.create(17, TimeUnit.SECONDS);
+ public static final FiniteDuration LOCAL_AWAIT_DURATION = Duration.create(2, TimeUnit.SECONDS);
+ public static final FiniteDuration REMOTE_AWAIT_DURATION = Duration.create(15, TimeUnit.SECONDS);
+ public static final FiniteDuration AWAIT_DURATION = Duration.create(17, TimeUnit.SECONDS);
+ public static final FiniteDuration GOSSIP_TICK_INTERVAL = Duration.create(500, TimeUnit.MILLISECONDS);
+ public static final String MAILBOX = "bounded-mailbox";
+
+
+ /**
+ * Executes an operation on a local actor and wait for it's response
+ *
+ * @param actor
+ * @param message
+ * @param askDuration
+ * @param awaitDuration
+ * @return The response of the operation
+ */
+ public static Object executeOperation(ActorRef actor, Object message,
+ FiniteDuration askDuration, FiniteDuration awaitDuration) throws Exception {
+ Future<Object> future =
+ ask(actor, message, new Timeout(askDuration));
+
+ return Await.result(future, awaitDuration);
+ }
}
}
odl-cluster-rpc {
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 100ms
+ }
+
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
-
}
remote {
log-remote-lifecycle-events = off
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.sal.core.api.Broker;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
-
import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.mock;
@BeforeClass
public static void setup() throws InterruptedException {
- system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster"));
+ system = ActorSystem.create("odl-cluster-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
}
@AfterClass
Duration.create(2, TimeUnit.SECONDS));
Assert.assertTrue(actorRef.path().toString().contains(ActorConstants.RPC_MANAGER_PATH));
}
-
-
-
}
@BeforeClass
public static void setup() throws InterruptedException {
- system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster"));
+ system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
}
@AfterClass
@BeforeClass
public static void setup() throws InterruptedException {
- system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster"));
+ system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
}
@AfterClass
import akka.testkit.JavaTestKit;
import com.google.common.base.Predicate;
import com.typesafe.config.ConfigFactory;
-
import org.junit.After;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.utils.ConditionalProbe;
import org.opendaylight.yangtools.yang.common.QName;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import javax.annotation.Nullable;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
public class RpcRegistryTest {
*/
@Test
public void testAddRemoveRpcOnSameNode() throws URISyntaxException, InterruptedException {
- validateSystemStartup();
final JavaTestKit mockBroker = new JavaTestKit(node1);
@Test
public void testRpcAddRemoveInCluster() throws URISyntaxException, InterruptedException {
- validateSystemStartup();
-
final JavaTestKit mockBroker1 = new JavaTestKit(node1);
//install probe on node2's bucket store
final JavaTestKit probe2 = createProbeForMessage(
node2, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
//Add rpc on node 1
registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
@Test
public void testRpcAddedOnMultiNodes() throws Exception {
- validateSystemStartup();
-
final JavaTestKit mockBroker1 = new JavaTestKit(node1);
final JavaTestKit mockBroker2 = new JavaTestKit(node2);
final JavaTestKit mockBroker3 = new JavaTestKit(node3);
}
- private void validateSystemStartup() throws InterruptedException {
-
- ActorPath gossiper1Path = new ChildActorPath(new ChildActorPath(registry1.path(), "store"), "gossiper");
- ActorPath gossiper2Path = new ChildActorPath(new ChildActorPath(registry2.path(), "store"), "gossiper");
- ActorPath gossiper3Path = new ChildActorPath(new ChildActorPath(registry3.path(), "store"), "gossiper");
-
- ActorSelection gossiper1 = node1.actorSelection(gossiper1Path);
- ActorSelection gossiper2 = node2.actorSelection(gossiper2Path);
- ActorSelection gossiper3 = node3.actorSelection(gossiper3Path);
-
-
- if (!resolveReference(gossiper1, gossiper2, gossiper3))
- Assert.fail("Could not find gossipers");
- }
-
- private Boolean resolveReference(ActorSelection... gossipers) {
-
- Boolean resolved = true;
- for (int i = 0; i < 5; i++) {
-
- resolved = true;
- System.out.println(System.currentTimeMillis() + " Resolving gossipers; trial #" + i);
-
- for (ActorSelection gossiper : gossipers) {
- ActorRef ref = null;
-
- try {
- Future<ActorRef> future = gossiper.resolveOne(new FiniteDuration(15000, TimeUnit.MILLISECONDS));
- ref = Await.result(future, new FiniteDuration(10000, TimeUnit.MILLISECONDS));
- } catch (Exception e) {
- System.out.println("Could not find gossiper in attempt#" + i + ". Got exception " + e.getMessage());
- }
-
- if (ref == null)
- resolved = false;
- }
-
- if (resolved) break;
-
- }
- return resolved;
- }
-
private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
return new AddOrUpdateRoutes(createRouteIds());
}
@BeforeClass
public static void setup() throws InterruptedException {
- system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster"));
+ system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
}
@AfterClass
-odl-cluster{
+odl-cluster-rpc{
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 10ms
+ }
+
akka {
- loglevel = "DEBUG"
+ loglevel = "INFO"
#log-config-on-start = on
actor {
loglevel = "INFO"
#loggers = ["akka.event.slf4j.Slf4jLogger"]
}
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 10ms
+ }
}
memberA{
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 10ms
+ }
akka {
loglevel = "INFO"
- loggers = ["akka.event.slf4j.Slf4jLogger"]
+ #loggers = ["akka.event.slf4j.Slf4jLogger"]
actor {
provider = "akka.cluster.ClusterActorRefProvider"
debug {
}
}
memberB{
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 10ms
+ }
akka {
loglevel = "INFO"
- loggers = ["akka.event.slf4j.Slf4jLogger"]
+ #loggers = ["akka.event.slf4j.Slf4jLogger"]
+
actor {
provider = "akka.cluster.ClusterActorRefProvider"
+ debug {
+ #lifecycle = on
+ }
}
remote {
log-received-messages = off
}
}
memberC{
+ bounded-mailbox {
+ mailbox-type = "org.opendaylight.controller.common.actor.MeteredBoundedMailbox"
+ mailbox-capacity = 1000
+ mailbox-push-timeout-time = 10ms
+ }
akka {
loglevel = "INFO"
- loggers = ["akka.event.slf4j.Slf4jLogger"]
+ #loggers = ["akka.event.slf4j.Slf4jLogger"]
actor {
provider = "akka.cluster.ClusterActorRefProvider"
+ debug {
+ #lifecycle = on
+ }
}
remote {
log-received-messages = off
auto-down-unreachable-after = 10s
}
}
-}
\ No newline at end of file
+}
+