X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FMemberNode.java;h=e6ea97124707cbd2cf85c605dba139bcf81628a4;hp=e966c95298a0a36fc1e0f031b4da762e40f966da;hb=7204c455a1636a7fc89bcd28fe9e9000eaa81b3b;hpb=01be99539d7b19743a237b6e72d2d870491daf7a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java index e966c95298..e6ea971247 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java @@ -7,30 +7,34 @@ */ package org.opendaylight.controller.cluster.datastore; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; + import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; import akka.cluster.Member; import akka.cluster.MemberStatus; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -44,11 +48,11 @@ import scala.concurrent.duration.Duration; * @author Thomas Pantelis */ public class MemberNode { - static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); + private static final String MEMBER_1_ADDRESS = "akka://cluster-test@127.0.0.1:2558"; private IntegrationTestKit kit; - private DistributedDataStore configDataStore; - private DistributedDataStore operDataStore; + private AbstractDataStore configDataStore; + private AbstractDataStore operDataStore; private DatastoreContext.Builder datastoreContextBuilder; private boolean cleanedUp; @@ -59,7 +63,7 @@ public class MemberNode { * callers to cleanup instances on test completion. * @return a Builder instance */ - public static Builder builder(List members) { + public static Builder builder(final List members) { return new Builder(members); } @@ -68,12 +72,12 @@ public class MemberNode { } - public DistributedDataStore configDataStore() { + public AbstractDataStore configDataStore() { return configDataStore; } - public DistributedDataStore operDataStore() { + public AbstractDataStore operDataStore() { return operDataStore; } @@ -81,14 +85,22 @@ public class MemberNode { return datastoreContextBuilder; } - public void waitForMembersUp(String... otherMembers) { - Set otherMembersSet = Sets.newHashSet(otherMembers); + public void waitForMembersUp(final String... otherMembers) { + kit.waitForMembersUp(otherMembers); + } + + public void waitForMemberDown(final String member) { Stopwatch sw = Stopwatch.createStarted(); - while(sw.elapsed(TimeUnit.SECONDS) <= 10) { + while (sw.elapsed(TimeUnit.SECONDS) <= 10) { CurrentClusterState state = Cluster.get(kit.getSystem()).state(); - for(Member m: state.getMembers()) { - if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) && - otherMembersSet.isEmpty()) { + for (Member m : state.getUnreachable()) { + if (member.equals(m.getRoles().iterator().next())) { + return; + } + } + + for (Member m : state.getMembers()) { + if (m.status() != MemberStatus.up() && member.equals(m.getRoles().iterator().next())) { return; } } @@ -96,20 +108,30 @@ public class MemberNode { Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); } - fail("Member(s) " + otherMembersSet + " are not Up"); + fail("Member " + member + " is now down"); } + @SuppressWarnings("checkstyle:IllegalCatch") public void cleanup() { - if(!cleanedUp) { + if (!cleanedUp) { cleanedUp = true; - kit.cleanup(configDataStore); - kit.cleanup(operDataStore); - kit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE); + if (configDataStore != null) { + configDataStore.close(); + } + if (operDataStore != null) { + operDataStore.close(); + } + + try { + IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE); + } catch (RuntimeException e) { + LoggerFactory.getLogger(MemberNode.class).warn("Failed to shutdown actor system", e); + } } } - public static void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier) - throws Exception { + public static void verifyRaftState(final AbstractDataStore datastore, final String shardName, + final RaftStateVerifier verifier) throws Exception { ActorContext actorContext = datastore.getActorContext(); Future future = actorContext.findLocalShardAsync(shardName); @@ -117,9 +139,9 @@ public class MemberNode { AssertionError lastError = null; Stopwatch sw = Stopwatch.createStarted(); - while(sw.elapsed(TimeUnit.SECONDS) <= 5) { - OnDemandRaftState raftState = (OnDemandRaftState)actorContext. - executeOperation(shardActor, GetOnDemandRaftState.INSTANCE); + while (sw.elapsed(TimeUnit.SECONDS) <= 5) { + OnDemandRaftState raftState = (OnDemandRaftState)actorContext + .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE); try { verifier.verify(raftState); @@ -133,36 +155,45 @@ public class MemberNode { throw lastError; } - public static void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName, - String... peerMemberNames) throws Exception { + public static void verifyRaftPeersPresent(final AbstractDataStore datastore, final String shardName, + final String... peerMemberNames) throws Exception { final Set peerIds = Sets.newHashSet(); - for(String p: peerMemberNames) { - peerIds.add(ShardIdentifier.builder().memberName(p).shardName(shardName). - type(datastore.getActorContext().getDataStoreType()).build().toString()); + for (String p: peerMemberNames) { + peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p), + datastore.getActorContext().getDataStoreName()).toString()); } - verifyRaftState(datastore, shardName, new RaftStateVerifier() { - @Override - public void verify(OnDemandRaftState raftState) { - assertTrue(String.format("Peer(s) %s not found for shard %s. Actual: %s", peerIds, shardName, - raftState.getPeerAddresses().keySet()), - raftState.getPeerAddresses().keySet().containsAll(peerIds)); + verifyRaftState(datastore, shardName, raftState -> assertEquals("Peers for shard " + shardName, peerIds, + raftState.getPeerAddresses().keySet())); + } + + public static void verifyNoShardPresent(final AbstractDataStore datastore, final String shardName) { + Stopwatch sw = Stopwatch.createStarted(); + while (sw.elapsed(TimeUnit.SECONDS) <= 5) { + Optional shardReply = datastore.getActorContext().findLocalShard(shardName); + if (!shardReply.isPresent()) { + return; } - }); + + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + + fail("Shard " + shardName + " is present"); } public static class Builder { private final List members; private String moduleShardsConfig; private String akkaConfig; + private boolean useAkkaArtery = true; private String[] waitForshardLeader = new String[0]; private String testName; private SchemaContext schemaContext; private boolean createOperDatastore = true; - private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). - shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30); + private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder() + .shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30); - Builder(List members) { + Builder(final List members) { this.members = members; } @@ -171,8 +202,8 @@ public class MemberNode { * * @return this Builder */ - public Builder moduleShardsConfig(String moduleShardsConfig) { - this.moduleShardsConfig = moduleShardsConfig; + public Builder moduleShardsConfig(final String newModuleShardsConfig) { + this.moduleShardsConfig = newModuleShardsConfig; return this; } @@ -181,8 +212,18 @@ public class MemberNode { * * @return this Builder */ - public Builder akkaConfig(String akkaConfig) { - this.akkaConfig = akkaConfig; + public Builder akkaConfig(final String newAkkaConfig) { + this.akkaConfig = newAkkaConfig; + return this; + } + + /** + * Specifies whether or not to use akka artery for remoting. Default is true. + * + * @return this Builder + */ + public Builder useAkkaArtery(final boolean newUseAkkaArtery) { + this.useAkkaArtery = newUseAkkaArtery; return this; } @@ -191,8 +232,8 @@ public class MemberNode { * * @return this Builder */ - public Builder testName(String testName) { - this.testName = testName; + public Builder testName(final String newTestName) { + this.testName = newTestName; return this; } @@ -201,7 +242,7 @@ public class MemberNode { * * @return this Builder */ - public Builder waitForShardLeader(String... shardNames) { + public Builder waitForShardLeader(final String... shardNames) { this.waitForshardLeader = shardNames; return this; } @@ -211,7 +252,7 @@ public class MemberNode { * * @return this Builder */ - public Builder createOperDatastore(boolean value) { + public Builder createOperDatastore(final boolean value) { this.createOperDatastore = value; return this; } @@ -221,8 +262,8 @@ public class MemberNode { * * @return this Builder */ - public Builder schemaContext(SchemaContext schemaContext) { - this.schemaContext = schemaContext; + public Builder schemaContext(final SchemaContext newSchemaContext) { + this.schemaContext = newSchemaContext; return this; } @@ -231,7 +272,7 @@ public class MemberNode { * * @return this Builder */ - public Builder datastoreContextBuilder(DatastoreContext.Builder builder) { + public Builder datastoreContextBuilder(final DatastoreContext.Builder builder) { datastoreContextBuilder = builder; return this; } @@ -241,24 +282,34 @@ public class MemberNode { Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified"); Preconditions.checkNotNull(testName, "testName must be specified"); - if(schemaContext == null) { + if (schemaContext == null) { schemaContext = SchemaContextHelper.full(); } MemberNode node = new MemberNode(); node.datastoreContextBuilder = datastoreContextBuilder; - ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig)); - Cluster.get(system).join(MEMBER_1_ADDRESS); + Config baseConfig = ConfigFactory.load(); + Config config; + if (useAkkaArtery) { + config = baseConfig.getConfig(akkaConfig); + } else { + config = baseConfig.getConfig(akkaConfig + "-without-artery") + .withFallback(baseConfig.getConfig(akkaConfig)); + } + + ActorSystem system = ActorSystem.create("cluster-test", config); + String member1Address = useAkkaArtery ? MEMBER_1_ADDRESS : MEMBER_1_ADDRESS.replace("akka", "akka.tcp"); + Cluster.get(system).join(AddressFromURIString.parse(member1Address)); node.kit = new IntegrationTestKit(system, datastoreContextBuilder); - String memberName = new ClusterWrapperImpl(system).getCurrentMemberName(); + String memberName = new ClusterWrapperImpl(system).getCurrentMemberName().getName(); node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName); node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader); - if(createOperDatastore) { + if (createOperDatastore) { node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName); node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader); @@ -269,7 +320,7 @@ public class MemberNode { } } - public static interface RaftStateVerifier { + public interface RaftStateVerifier { void verify(OnDemandRaftState raftState); } -} \ No newline at end of file +}