X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FMemberNode.java;h=e6ea97124707cbd2cf85c605dba139bcf81628a4;hp=3aaf027b227577a1128f6387d0477f5396547a1a;hb=20f8f30f4bbf1e982672c1f883a6a18b0e4539de;hpb=dea3effede98cfb561c44d66b24c2d71a44b10a3 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java index 3aaf027b22..e6ea971247 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java @@ -9,9 +9,9 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; + import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.util.List; import java.util.Set; @@ -33,6 +34,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftS import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -46,11 +48,11 @@ import scala.concurrent.duration.Duration; * @author Thomas Pantelis */ public class MemberNode { - static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); + private static final String MEMBER_1_ADDRESS = "akka://cluster-test@127.0.0.1:2558"; private IntegrationTestKit kit; - private DistributedDataStore configDataStore; - private DistributedDataStore operDataStore; + private AbstractDataStore configDataStore; + private AbstractDataStore operDataStore; private DatastoreContext.Builder datastoreContextBuilder; private boolean cleanedUp; @@ -61,7 +63,7 @@ public class MemberNode { * callers to cleanup instances on test completion. * @return a Builder instance */ - public static Builder builder(List members) { + public static Builder builder(final List members) { return new Builder(members); } @@ -70,12 +72,12 @@ public class MemberNode { } - public DistributedDataStore configDataStore() { + public AbstractDataStore configDataStore() { return configDataStore; } - public DistributedDataStore operDataStore() { + public AbstractDataStore operDataStore() { return operDataStore; } @@ -83,22 +85,22 @@ public class MemberNode { return datastoreContextBuilder; } - public void waitForMembersUp(String... otherMembers) { + public void waitForMembersUp(final String... otherMembers) { kit.waitForMembersUp(otherMembers); } - public void waitForMemberDown(String member) { + public void waitForMemberDown(final String member) { Stopwatch sw = Stopwatch.createStarted(); - while(sw.elapsed(TimeUnit.SECONDS) <= 10) { + while (sw.elapsed(TimeUnit.SECONDS) <= 10) { CurrentClusterState state = Cluster.get(kit.getSystem()).state(); - for(Member m: state.getUnreachable()) { - if(member.equals(m.getRoles().iterator().next())) { + for (Member m : state.getUnreachable()) { + if (member.equals(m.getRoles().iterator().next())) { return; } } - for(Member m: state.getMembers()) { - if(m.status() != MemberStatus.up() && member.equals(m.getRoles().iterator().next())) { + for (Member m : state.getMembers()) { + if (m.status() != MemberStatus.up() && member.equals(m.getRoles().iterator().next())) { return; } } @@ -109,8 +111,9 @@ public class MemberNode { fail("Member " + member + " is now down"); } + @SuppressWarnings("checkstyle:IllegalCatch") public void cleanup() { - if(!cleanedUp) { + if (!cleanedUp) { cleanedUp = true; if (configDataStore != null) { configDataStore.close(); @@ -119,12 +122,16 @@ public class MemberNode { operDataStore.close(); } - IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE); + try { + IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE); + } catch (RuntimeException e) { + LoggerFactory.getLogger(MemberNode.class).warn("Failed to shutdown actor system", e); + } } } - public static void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier) - throws Exception { + public static void verifyRaftState(final AbstractDataStore datastore, final String shardName, + final RaftStateVerifier verifier) throws Exception { ActorContext actorContext = datastore.getActorContext(); Future future = actorContext.findLocalShardAsync(shardName); @@ -132,9 +139,9 @@ public class MemberNode { AssertionError lastError = null; Stopwatch sw = Stopwatch.createStarted(); - while(sw.elapsed(TimeUnit.SECONDS) <= 5) { - OnDemandRaftState raftState = (OnDemandRaftState)actorContext. - executeOperation(shardActor, GetOnDemandRaftState.INSTANCE); + while (sw.elapsed(TimeUnit.SECONDS) <= 5) { + OnDemandRaftState raftState = (OnDemandRaftState)actorContext + .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE); try { verifier.verify(raftState); @@ -148,10 +155,10 @@ public class MemberNode { throw lastError; } - public static void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName, - String... peerMemberNames) throws Exception { + public static void verifyRaftPeersPresent(final AbstractDataStore datastore, final String shardName, + final String... peerMemberNames) throws Exception { final Set peerIds = Sets.newHashSet(); - for(String p: peerMemberNames) { + for (String p: peerMemberNames) { peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p), datastore.getActorContext().getDataStoreName()).toString()); } @@ -160,11 +167,11 @@ public class MemberNode { raftState.getPeerAddresses().keySet())); } - public static void verifyNoShardPresent(DistributedDataStore datastore, String shardName) { + public static void verifyNoShardPresent(final AbstractDataStore datastore, final String shardName) { Stopwatch sw = Stopwatch.createStarted(); - while(sw.elapsed(TimeUnit.SECONDS) <= 5) { + while (sw.elapsed(TimeUnit.SECONDS) <= 5) { Optional shardReply = datastore.getActorContext().findLocalShard(shardName); - if(!shardReply.isPresent()) { + if (!shardReply.isPresent()) { return; } @@ -178,14 +185,15 @@ public class MemberNode { private final List members; private String moduleShardsConfig; private String akkaConfig; + private boolean useAkkaArtery = true; private String[] waitForshardLeader = new String[0]; private String testName; private SchemaContext schemaContext; private boolean createOperDatastore = true; - private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). - shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30); + private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder() + .shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30); - Builder(List members) { + Builder(final List members) { this.members = members; } @@ -194,8 +202,8 @@ public class MemberNode { * * @return this Builder */ - public Builder moduleShardsConfig(String moduleShardsConfig) { - this.moduleShardsConfig = moduleShardsConfig; + public Builder moduleShardsConfig(final String newModuleShardsConfig) { + this.moduleShardsConfig = newModuleShardsConfig; return this; } @@ -204,8 +212,18 @@ public class MemberNode { * * @return this Builder */ - public Builder akkaConfig(String akkaConfig) { - this.akkaConfig = akkaConfig; + public Builder akkaConfig(final String newAkkaConfig) { + this.akkaConfig = newAkkaConfig; + return this; + } + + /** + * Specifies whether or not to use akka artery for remoting. Default is true. + * + * @return this Builder + */ + public Builder useAkkaArtery(final boolean newUseAkkaArtery) { + this.useAkkaArtery = newUseAkkaArtery; return this; } @@ -214,8 +232,8 @@ public class MemberNode { * * @return this Builder */ - public Builder testName(String testName) { - this.testName = testName; + public Builder testName(final String newTestName) { + this.testName = newTestName; return this; } @@ -224,7 +242,7 @@ public class MemberNode { * * @return this Builder */ - public Builder waitForShardLeader(String... shardNames) { + public Builder waitForShardLeader(final String... shardNames) { this.waitForshardLeader = shardNames; return this; } @@ -234,7 +252,7 @@ public class MemberNode { * * @return this Builder */ - public Builder createOperDatastore(boolean value) { + public Builder createOperDatastore(final boolean value) { this.createOperDatastore = value; return this; } @@ -244,8 +262,8 @@ public class MemberNode { * * @return this Builder */ - public Builder schemaContext(SchemaContext schemaContext) { - this.schemaContext = schemaContext; + public Builder schemaContext(final SchemaContext newSchemaContext) { + this.schemaContext = newSchemaContext; return this; } @@ -254,7 +272,7 @@ public class MemberNode { * * @return this Builder */ - public Builder datastoreContextBuilder(DatastoreContext.Builder builder) { + public Builder datastoreContextBuilder(final DatastoreContext.Builder builder) { datastoreContextBuilder = builder; return this; } @@ -264,15 +282,25 @@ public class MemberNode { Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified"); Preconditions.checkNotNull(testName, "testName must be specified"); - if(schemaContext == null) { + if (schemaContext == null) { schemaContext = SchemaContextHelper.full(); } MemberNode node = new MemberNode(); node.datastoreContextBuilder = datastoreContextBuilder; - ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig)); - Cluster.get(system).join(MEMBER_1_ADDRESS); + Config baseConfig = ConfigFactory.load(); + Config config; + if (useAkkaArtery) { + config = baseConfig.getConfig(akkaConfig); + } else { + config = baseConfig.getConfig(akkaConfig + "-without-artery") + .withFallback(baseConfig.getConfig(akkaConfig)); + } + + ActorSystem system = ActorSystem.create("cluster-test", config); + String member1Address = useAkkaArtery ? MEMBER_1_ADDRESS : MEMBER_1_ADDRESS.replace("akka", "akka.tcp"); + Cluster.get(system).join(AddressFromURIString.parse(member1Address)); node.kit = new IntegrationTestKit(system, datastoreContextBuilder); @@ -281,7 +309,7 @@ public class MemberNode { node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader); - if(createOperDatastore) { + if (createOperDatastore) { node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName); node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader); @@ -295,4 +323,4 @@ public class MemberNode { public interface RaftStateVerifier { void verify(OnDemandRaftState raftState); } -} \ No newline at end of file +}