*/
public class ExampleRoleChangeListener extends AbstractUntypedActor implements AutoCloseable{
// the akka url should be set to the notifier's actor system and domain.
- private static final String NOTIFIER_AKKA_URL = "akka.tcp://raft-test@127.0.0.1:2550/user/";
+ private static final String NOTIFIER_AKKA_URL = "akka://raft-test@127.0.0.1:2550/user/";
private Map<String, Boolean> notifierRegistrationStatus = new HashMap<>();
private Cancellable registrationSchedule = null;
}
remote {
- log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2550
- }
- }
+ log-remote-lifecycle-events = off
+ artery {
+ enabled = on
+ canonical.hostname = "127.0.0.1"
+ canonical.port = 2550
+ }
+ }
}
}
remote {
log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2554
+ artery {
+ enabled = on
+ canonical.hostname = "127.0.0.1"
+ canonical.port = 2554
}
}
+++ /dev/null
-akka {
-
- loglevel = "DEBUG"
-
- actor {
- # enable to test serialization only.
- # serialize-messages = on
-
- serializers {
- java = "akka.serialization.JavaSerializer"
- proto = "akka.remote.serialization.ProtobufSerializer"
- }
-
- serialization-bindings {
- "org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java
- "org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener" = java
- "com.google.protobuf.Message" = proto
- "com.google.protobuf.GeneratedMessage" = proto
- }
- }
-}
-
-raft-test {
- akka {
-
- loglevel = "DEBUG"
-
- actor {
- # enable to test serialization only.
- # serialize-messages = on
-
- provider = "akka.remote.RemoteActorRefProvider"
-
- serializers {
- java = "akka.serialization.JavaSerializer"
- proto = "akka.remote.serialization.ProtobufSerializer"
- }
-
- serialization-bindings {
- "org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry" = java
- "org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener" = java
- "com.google.protobuf.Message" = proto
- "com.google.protobuf.GeneratedMessage" = proto
- }
- }
-
- remote {
- log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2550
- }
- }
- }
-}
-
-raft-test-listener {
-
- akka {
- loglevel = "DEBUG"
-
- actor {
- provider = "akka.remote.RemoteActorRefProvider"
- }
-
- remote {
- log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2554
- }
- }
-
- member-id = "member-1"
- }
-}
-
-
-
odl-cluster-data {
akka {
remote {
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2550
+ artery {
+ enabled = on
+ canonical.hostname = "127.0.0.1"
+ canonical.port = 2550
}
}
cluster {
- seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550"]
+ seed-nodes = ["akka://opendaylight-cluster-data@127.0.0.1:2550"]
roles = [
"member-1"
}
remote {
log-remote-lifecycle-events = off
+
netty.tcp {
- hostname = "127.0.0.1"
- port = 2550
maximum-frame-size = 419430400
send-buffer-size = 52428800
receive-buffer-size = 52428800
}
+
+ artery {
+ advanced {
+ maximum-frame-size = 1 GiB
+ maximum-large-frame-size = 1 GiB
+ }
+ }
}
cluster {
- seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550"]
-
seed-node-timeout = 12s
# Following is an excerpt from Akka Cluster Documentation
#auto-down-unreachable-after = 30s
allow-weakly-up-members = on
-
- roles = [
- "member-1"
- ]
-
}
persistence {
+++ /dev/null
-ODLCluster{
- akka {
- actor {
- serialize-messages = on
-
- provider = "akka.cluster.ClusterActorRefProvider"
- serializers {
- java = "akka.serialization.JavaSerializer"
- proto = "akka.remote.serialization.ProtobufSerializer"
- }
-
- serialization-bindings {
- "com.google.protobuf.Message" = proto
- "com.google.protobuf.GeneratedMessage" = proto
- "com.google.protobuf.GeneratedMessage$GeneratedExtension" = proto
- "com.google.protobuf.FieldSet" = proto
- }
- }
- remote {
- log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2552
- maximum-frame-size = 2097152
- send-buffer-size = 52428800
- receive-buffer-size = 52428800
- }
- }
-
- cluster {
- seed-nodes = ["akka.tcp://opendaylight-cluster@127.0.0.1:2550"]
-
- auto-down-unreachable-after = 10s
- }
- }
-}
+++ /dev/null
-
-ODLCluster{
- akka {
- actor {
- serialize-messages = on
-
- provider = "akka.cluster.ClusterActorRefProvider"
- serializers {
- java = "akka.serialization.JavaSerializer"
- proto = "akka.remote.serialization.ProtobufSerializer"
- }
-
- serialization-bindings {
- "com.google.protobuf.Message" = proto
- "com.google.protobuf.GeneratedMessage" = proto
- "com.google.protobuf.GeneratedMessage$GeneratedExtension" = proto
- "com.google.protobuf.FieldSet" = proto
- }
- }
- remote {
- log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2550
- maximum-frame-size = 2097152
- send-buffer-size = 52428800
- receive-buffer-size = 52428800
- }
- }
-
- cluster {
- seed-nodes = ["akka.tcp://opendaylight-cluster@127.0.0.1:2550"]
-
- auto-down-unreachable-after = 10s
- }
- }
-}
+++ /dev/null
-
-odl-cluster-data {
- bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
- mailbox-capacity = 1000
- mailbox-push-timeout-time = 100ms
- }
-
- metric-capture-enabled = true
-
- akka {
- loggers = ["akka.event.slf4j.Slf4jLogger"]
- cluster {
- roles = [
- "member-1"
- ]
- }
- actor {
- provider = "akka.cluster.ClusterActorRefProvider"
- serializers {
- java = "akka.serialization.JavaSerializer"
- proto = "akka.remote.serialization.ProtobufSerializer"
- readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer"
- }
-
- serialization-bindings {
- "com.google.protobuf.Message" = proto
- "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
- }
- }
- remote {
- log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2550
- maximum-frame-size = 419430400
- send-buffer-size = 52428800
- receive-buffer-size = 52428800
- }
- }
-
- cluster {
- seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550"]
-
- auto-down-unreachable-after = 10s
- }
- }
-}
-
-odl-cluster-rpc {
- akka {
- loggers = ["akka.event.slf4j.Slf4jLogger"]
- actor {
- provider = "akka.cluster.ClusterActorRefProvider"
-
- }
- remote {
- log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2551
- }
- }
-
- cluster {
- seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@127.0.0.1:2551"]
-
- auto-down-unreachable-after = 10s
- }
- }
-}
+++ /dev/null
-module-shards = [
- {
- name = "default"
- shards = [
- {
- name="default",
- replicas = [
- "member-1",
- ]
- }
- ]
- },
- {
- name = "inventory"
- shards = [
- {
- name="inventory"
- replicas = [
- "member-1",
- ]
- }
- ]
- }
-
-]
+++ /dev/null
-modules = [
- {
- name = "inventory"
- namespace = "urn:opendaylight:inventory"
- shard-strategy = "module"
- }
-]
@BeforeClass
public static void setUpClass() throws IOException {
system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
- final Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+ final Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
Cluster.get(system).join(member1Address);
}
@BeforeClass
public static void setUpClass() throws IOException {
system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
- Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+ Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
Cluster.get(system).join(member1Address);
}
private static final String[] CARS = {"cars"};
private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse(
- "akka.tcp://cluster-test@127.0.0.1:2558");
+ "akka://cluster-test@127.0.0.1:2558");
private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse(
- "akka.tcp://cluster-test@127.0.0.1:2559");
+ "akka://cluster-test@127.0.0.1:2559");
private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf";
private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf";
String testName = "testLeadershipTransferOnShutdown";
initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
- IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder);
+ IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
+ DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(100));
try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName,
MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
@Test(expected = AskTimeoutException.class)
public void testTransactionWithShardLeaderNotResponding() throws Exception {
+ followerDatastoreContextBuilder.shardElectionTimeoutFactor(50);
initDatastoresWithCars("testTransactionWithShardLeaderNotResponding");
// Do an initial read to get the primary shard info cached.
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.actor.Address;
import akka.actor.AddressFromURIString;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.List;
import java.util.Set;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
* @author Thomas Pantelis
*/
public class MemberNode {
- static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+ private static final String MEMBER_1_ADDRESS = "akka://cluster-test@127.0.0.1:2558";
private IntegrationTestKit kit;
private AbstractDataStore configDataStore;
fail("Member " + member + " is now down");
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void cleanup() {
if (!cleanedUp) {
cleanedUp = true;
operDataStore.close();
}
- IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
+ try {
+ IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
+ } catch (RuntimeException e) {
+ LoggerFactory.getLogger(MemberNode.class).warn("Failed to shutdown actor system", e);
+ }
}
}
private final List<MemberNode> members;
private String moduleShardsConfig;
private String akkaConfig;
+ private boolean useAkkaArtery = true;
private String[] waitForshardLeader = new String[0];
private String testName;
private SchemaContext schemaContext;
return this;
}
+ /**
+ * Specifies whether or not to use akka artery for remoting. Default is true.
+ *
+ * @return this Builder
+ */
+ public Builder useAkkaArtery(final boolean newUseAkkaArtery) {
+ this.useAkkaArtery = newUseAkkaArtery;
+ return this;
+ }
+
/**
* Specifies the name of the test that is appended to the data store names. This is required.
*
MemberNode node = new MemberNode();
node.datastoreContextBuilder = datastoreContextBuilder;
- ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig));
- Cluster.get(system).join(MEMBER_1_ADDRESS);
+ Config baseConfig = ConfigFactory.load();
+ Config config;
+ if (useAkkaArtery) {
+ config = baseConfig.getConfig(akkaConfig);
+ } else {
+ config = baseConfig.getConfig(akkaConfig + "-without-artery")
+ .withFallback(baseConfig.getConfig(akkaConfig));
+ }
+
+ ActorSystem system = ActorSystem.create("cluster-test", config);
+ String member1Address = useAkkaArtery ? MEMBER_1_ADDRESS : MEMBER_1_ADDRESS.replace("akka", "akka.tcp");
+ Cluster.get(system).join(AddressFromURIString.parse(member1Address));
node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
// Create ActorSystem for member-1
final ActorSystem system1 = newActorSystem("Member1");
- Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+ Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
newTestShardMgrBuilder(new ConfigurationImpl(new EmptyModuleShardConfigProvider()))
final ActorSystem system2 = newActorSystem("Member2");
- Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+ Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
final TestActorRef<TestShardManager> shardManager2 = TestActorRef.create(system2,
newTestShardMgrBuilder()
actorSystems.add(system);
return system;
}
-}
\ No newline at end of file
+}
.customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
String name = "test";
- final MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
+ final MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1")
+ .useAkkaArtery(false).testName(name)
.moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
.createOperDatastore(false).datastoreContextBuilder(leaderDatastoreContextBuilder).build();
- final MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
+ final MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2")
+ .useAkkaArtery(false).testName(name)
.moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
.createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
- final MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
+ final MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3")
+ .useAkkaArtery(false).testName(name)
.moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
.createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
- final MemberNode follower3Node = MemberNode.builder(memberNodes).akkaConfig("Member4").testName(name)
+ final MemberNode follower3Node = MemberNode.builder(memberNodes).akkaConfig("Member4")
+ .useAkkaArtery(false).testName(name)
.moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
.createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
- final MemberNode follower4Node = MemberNode.builder(memberNodes).akkaConfig("Member5").testName(name)
+ final MemberNode follower4Node = MemberNode.builder(memberNodes).akkaConfig("Member5")
+ .useAkkaArtery(false).testName(name)
.moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
.createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
// Create an ActorSystem ShardManager actor for member-1.
final ActorSystem system1 = newActorSystem("Member1");
- Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+ Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
final TestActorRef<TestShardManager> shardManager1 = TestActorRef.create(system1,
newTestShardMgrBuilderWithMockShardActor().cluster(
final ActorSystem system2 = newActorSystem("Member2");
- Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+ Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
shardManager2.underlyingActor().verifyFindPrimary();
- Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+ Cluster.get(system2).down(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
shardManager1.underlyingActor().waitForMemberRemoved();
// Create an ActorSystem ShardManager actor for member-1.
final ActorSystem system1 = newActorSystem("Member1");
- Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+ Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
final ActorSystem system2 = newActorSystem("Member2");
- Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+ Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-default-config"));
shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
- "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
+ "akka://cluster-test@127.0.0.1:2558"), getRef());
shardManager1.underlyingActor().waitForUnreachableMember();
MessageCollectorActor.clearMessages(mockShardActor1);
shardManager1.tell(
- MockClusterWrapper.createMemberRemoved("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"),
+ MockClusterWrapper.createMemberRemoved("member-2", "akka://cluster-test@127.0.0.1:2558"),
getRef());
MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerDown.class);
expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
shardManager1.tell(
- MockClusterWrapper.createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"),
+ MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
getRef());
shardManager1.underlyingActor().waitForReachableMember();
assertTrue("Unexpected primary path " + path1, path1.contains("member-2-shard-default-config"));
shardManager1.tell(
- MockClusterWrapper.createMemberUp("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"),
+ MockClusterWrapper.createMemberUp("member-2", "akka://cluster-test@127.0.0.1:2558"),
getRef());
MessageCollectorActor.expectFirstMatching(mockShardActor1, PeerUp.class);
// Test FindPrimary wait succeeds after reachable member event.
shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
- "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
+ "akka://cluster-test@127.0.0.1:2558"), getRef());
shardManager1.underlyingActor().waitForUnreachableMember();
shardManager1.tell(new FindPrimary("default", true), getRef());
shardManager1.tell(
- MockClusterWrapper.createReachableMember("member-2", "akka.tcp://cluster-test@127.0.0.1:2558"),
+ MockClusterWrapper.createReachableMember("member-2", "akka://cluster-test@127.0.0.1:2558"),
getRef());
RemotePrimaryShardFound found2 = expectMsgClass(duration("5 seconds"), RemotePrimaryShardFound.class);
// Create an ActorSystem ShardManager actor for member-1.
final ActorSystem system1 = newActorSystem("Member1");
- Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+ Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
final ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
final ActorSystem system2 = newActorSystem("Member2");
- Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+ Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
final ActorRef mockShardActor2 = newMockShardActor(system2, Shard.DEFAULT_NAME, "member-2");
system1.actorSelection(mockShardActor1.path()), DataStoreVersions.CURRENT_VERSION));
shardManager1.tell(MockClusterWrapper.createUnreachableMember("member-2",
- "akka.tcp://cluster-test@127.0.0.1:2558"), getRef());
+ "akka://cluster-test@127.0.0.1:2558"), getRef());
shardManager1.underlyingActor().waitForUnreachableMember();
// Create an ActorSystem ShardManager actor for member-1.
final ActorSystem system1 = newActorSystem("Member1");
- Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+ Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
newTestShardMgrBuilder(mockConfig).shardActor(mockDefaultShardActor)
// Create an ActorSystem ShardManager actor for member-2.
final ActorSystem system2 = newActorSystem("Member2");
- Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+ Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
String name = ShardIdentifier.create("astronauts", MEMBER_2, "config").toString();
newReplicaShardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
MockClusterWrapper.sendMemberUp(newReplicaShardManager, "member-2",
- AddressFromURIString.parse("akka.tcp://non-existent@127.0.0.1:5").toString());
+ AddressFromURIString.parse("akka://non-existent@127.0.0.1:5").toString());
newReplicaShardManager.tell(new AddShardReplica("astronauts"), getRef());
Status.Failure resp = expectMsgClass(duration("5 seconds"), Status.Failure.class);
// Create an ActorSystem ShardManager actor for member-1.
final ActorSystem system1 = newActorSystem("Member1");
- Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+ Cluster.get(system1).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
ActorRef mockDefaultShardActor = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
final TestActorRef<TestShardManager> newReplicaShardManager = TestActorRef.create(system1,
// Create an ActorSystem ShardManager actor for member-2.
final ActorSystem system2 = newActorSystem("Member2");
- Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
+ Cluster.get(system2).join(AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"));
String name = ShardIdentifier.create("default", MEMBER_2, shardMrgIDSuffix).toString();
String memberId2 = "member-2-shard-default-" + shardMrgIDSuffix;
shardManagerID);
// Because mockShardLeaderActor is created at the top level of the actor system it has an address like so,
- // akka.tcp://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
+ // akka://cluster-test@127.0.0.1:2559/user/member-2-shard-default-config1
// However when a shard manager has a local shard which is a follower and a leader that is remote it will
// try to compute an address for the remote shard leader using the ShardPeerAddressResolver. This address will
// look like so,
- // akka.tcp://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
+ // akka://cluster-test@127.0.0.1:2559/user/shardmanager-config1/member-2-shard-default-config1
// In this specific case if we did a FindPrimary for shard default from member-1 we would come up
// with the address of an actor which does not exist, therefore any message sent to that actor would go to
// dead letters.
assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
// self address of remote format,but Tx path local format.
- clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
assertEquals(true, actorContext.isPathLocal(
"akka://system/user/shardmanager/shard/transaction"));
// self address of local format,but Tx path remote format.
- clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
+ clusterWrapper.setSelfAddress(new Address("akka", "system"));
actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
assertEquals(false, actorContext.isPathLocal(
"akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
//ip and port same
- clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
+ assertEquals(true, actorContext.isPathLocal("akka://system@127.0.0.1:2550/"));
// forward-slash missing in address
- clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
+ assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2550"));
//ips differ
- clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
+ assertEquals(false, actorContext.isPathLocal("akka://system@127.1.0.1:2550/"));
//ports differ
- clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
+ clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
- assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
+ assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2551/"));
}
@Test
public class MockClusterWrapper implements ClusterWrapper {
- private Address selfAddress = new Address("akka.tcp", "test", "127.0.0.1", 2550);
+ private Address selfAddress = new Address("akka", "test", "127.0.0.1", 2550);
private final MemberName currentMemberName;
public MockClusterWrapper() {
}
remote {
log-remote-lifecycle-events = off
+ artery {
+ enabled = on
+ canonical.hostname = "127.0.0.1"
+ canonical.port = 2558
+ }
+
netty.tcp {
hostname = "127.0.0.1"
port = 2558
}
cluster {
- auto-down-unreachable-after = 100s
retry-unsuccessful-join-after = 100ms
roles = [
}
remote {
log-remote-lifecycle-events = off
+ artery {
+ enabled = on
+ canonical.hostname = "127.0.0.1"
+ canonical.port = 2559
+ }
+
netty.tcp {
hostname = "127.0.0.1"
port = 2559
}
cluster {
- auto-down-unreachable-after = 100s
retry-unsuccessful-join-after = 100ms
roles = [
}
remote {
log-remote-lifecycle-events = off
+ artery {
+ enabled = on
+ canonical.hostname = "127.0.0.1"
+ canonical.port = 2557
+ }
+
netty.tcp {
hostname = "127.0.0.1"
port = 2557
}
cluster {
- auto-down-unreachable-after = 100s
retry-unsuccessful-join-after = 100ms
roles = [
}
remote {
log-remote-lifecycle-events = off
+ artery {
+ enabled = on
+ canonical.hostname = "127.0.0.1"
+ canonical.port = 2560
+ }
+
netty.tcp {
hostname = "127.0.0.1"
port = 2560
}
cluster {
- auto-down-unreachable-after = 100s
retry-unsuccessful-join-after = 100ms
roles = [
}
remote {
log-remote-lifecycle-events = off
+ artery {
+ enabled = on
+ canonical.hostname = "127.0.0.1"
+ canonical.port = 2561
+ }
+
netty.tcp {
hostname = "127.0.0.1"
port = 2561
}
cluster {
- auto-down-unreachable-after = 100s
retry-unsuccessful-join-after = 100ms
roles = [
}
}
}
+
+Member1-without-artery {
+ akka.remote.artery.enabled = off
+}
+
+Member2-without-artery {
+ akka.remote.artery.enabled = off
+}
+
+Member3-without-artery {
+ akka.remote.artery.enabled = off
+}
+
+Member4-without-artery {
+ akka.remote.artery.enabled = off
+}
+
+Member5-without-artery {
+ akka.remote.artery.enabled = off
+}
}
remote {
log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2553
- maximum-frame-size = 419430400
- send-buffer-size = 52428800
- receive-buffer-size = 52428800
+ artery {
+ enabled = on
+ canonical.hostname = "127.0.0.1"
+ canonical.port = 2553
}
}
cluster {
- seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550", "akka.tcp://opendaylight-cluster-data@127.0.0.1:2553"]
+ seed-nodes = ["akka://opendaylight-cluster-data@127.0.0.1:2550", "akka://opendaylight-cluster-data@127.0.0.1:2553"]
auto-down-unreachable-after = 10s
}
}
-}
\ No newline at end of file
+}
}
remote {
log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2554
- maximum-frame-size = 419430400
- send-buffer-size = 52428800
- receive-buffer-size = 52428800
+ artery {
+ enabled = on
+ canonical.hostname = "127.0.0.1"
+ canonical.port = 2554
}
}
cluster {
- seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550", "akka.tcp://opendaylight-cluster-data@127.0.0.1:2554"]
+ seed-nodes = ["akka://opendaylight-cluster-data@127.0.0.1:2550", "akka://opendaylight-cluster-data@127.0.0.1:2554"]
auto-down-unreachable-after = 10s
}
}
-}
\ No newline at end of file
+}
+++ /dev/null
-
-odl-cluster-data {
- akka {
- cluster {
- roles = [
- "member-1"
- ]
- }
- actor {
- provider = "akka.cluster.ClusterActorRefProvider"
- serializers {
- java = "akka.serialization.JavaSerializer"
- proto = "akka.remote.serialization.ProtobufSerializer"
- }
-
- serialization-bindings {
- "com.google.protobuf.Message" = proto
-
- }
- }
- remote {
- log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2550
- maximum-frame-size = 2097152
- send-buffer-size = 52428800
- receive-buffer-size = 52428800
- }
- }
-
- cluster {
- seed-nodes = ["akka.tcp://opendaylight-cluster-data@127.0.0.1:2550"]
-
- auto-down-unreachable-after = 10s
- }
- }
-}
-
-odl-cluster-rpc {
- bounded-mailbox {
- mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
- mailbox-capacity = 1000
- mailbox-push-timeout-time = 100ms
- }
-
- akka {
- actor {
- provider = "akka.cluster.ClusterActorRefProvider"
- }
- remote {
- log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2551
- }
- }
-
- cluster {
- seed-nodes = ["akka.tcp://opendaylight-cluster-rpc@127.0.0.1:2551"]
- auto-down-unreachable-after = 10s
- }
- }
-}
-odl-cluster-rpc{
+odl-cluster-rpc {
bounded-mailbox {
mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
log-sent-messages = on
log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2550
+ artery {
+ enabled = on
+ canonical.hostname = "127.0.0.1"
+ canonical.port = 2550
}
}
cluster {
- seed-nodes = ["akka.tcp://opendaylight-rpc@127.0.0.1:2550"]
+ seed-nodes = ["akka://opendaylight-rpc@127.0.0.1:2550"]
auto-down-unreachable-after = 10s
}
}
}
-unit-test{
+unit-test {
akka {
loglevel = "DEBUG"
#loggers = ["akka.event.slf4j.Slf4jLogger"]
}
}
-memberA{
+memberA {
bounded-mailbox {
mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
log-sent-messages = off
log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2551
+ artery {
+ enabled = on
+ canonical.hostname = "127.0.0.1"
+ canonical.port = 2551
}
}
cluster {
- seed-nodes = ["akka.tcp://opendaylight-rpc@127.0.0.1:2551"]
+ seed-nodes = ["akka://opendaylight-rpc@127.0.0.1:2551"]
auto-down-unreachable-after = 10s
}
}
}
-memberB{
+memberB {
bounded-mailbox {
mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
log-sent-messages = off
log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2552
+ artery {
+ enabled = on
+ canonical.hostname = "127.0.0.1"
+ canonical.port = 2552
}
}
cluster {
- seed-nodes = ["akka.tcp://opendaylight-rpc@127.0.0.1:2551"]
+ seed-nodes = ["akka://opendaylight-rpc@127.0.0.1:2551"]
auto-down-unreachable-after = 10s
}
}
}
-memberC{
+memberC {
bounded-mailbox {
mailbox-type = "org.opendaylight.controller.cluster.common.actor.MeteredBoundedMailbox"
mailbox-capacity = 1000
log-sent-messages = off
log-remote-lifecycle-events = off
- netty.tcp {
- hostname = "127.0.0.1"
- port = 2553
+ artery {
+ enabled = on
+ canonical.hostname = "127.0.0.1"
+ canonical.port = 2553
}
}
cluster {
- seed-nodes = ["akka.tcp://opendaylight-rpc@127.0.0.1:2551"]
+ seed-nodes = ["akka://opendaylight-rpc@127.0.0.1:2551"]
auto-down-unreachable-after = 10s
}