X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FMemberNode.java;h=e6ea97124707cbd2cf85c605dba139bcf81628a4;hp=db6c5e664d49786293cbb4668656737fc0426cfd;hb=7204c455a1636a7fc89bcd28fe9e9000eaa81b3b;hpb=5464f50be733df1bbbe31cf05665d542d3b7c5e7 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java index db6c5e664d..e6ea971247 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java @@ -12,7 +12,6 @@ import static org.junit.Assert.fail; import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.cluster.ClusterEvent.CurrentClusterState; @@ -23,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; +import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.util.List; import java.util.Set; @@ -34,6 +34,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftS import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -47,11 +48,11 @@ import scala.concurrent.duration.Duration; * @author Thomas Pantelis */ public class MemberNode { - static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); + private static final String MEMBER_1_ADDRESS = "akka://cluster-test@127.0.0.1:2558"; private IntegrationTestKit kit; - private DistributedDataStore configDataStore; - private DistributedDataStore operDataStore; + private AbstractDataStore configDataStore; + private AbstractDataStore operDataStore; private DatastoreContext.Builder datastoreContextBuilder; private boolean cleanedUp; @@ -62,7 +63,7 @@ public class MemberNode { * callers to cleanup instances on test completion. * @return a Builder instance */ - public static Builder builder(List members) { + public static Builder builder(final List members) { return new Builder(members); } @@ -71,12 +72,12 @@ public class MemberNode { } - public DistributedDataStore configDataStore() { + public AbstractDataStore configDataStore() { return configDataStore; } - public DistributedDataStore operDataStore() { + public AbstractDataStore operDataStore() { return operDataStore; } @@ -84,11 +85,11 @@ public class MemberNode { return datastoreContextBuilder; } - public void waitForMembersUp(String... otherMembers) { + public void waitForMembersUp(final String... otherMembers) { kit.waitForMembersUp(otherMembers); } - public void waitForMemberDown(String member) { + public void waitForMemberDown(final String member) { Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 10) { CurrentClusterState state = Cluster.get(kit.getSystem()).state(); @@ -110,6 +111,7 @@ public class MemberNode { fail("Member " + member + " is now down"); } + @SuppressWarnings("checkstyle:IllegalCatch") public void cleanup() { if (!cleanedUp) { cleanedUp = true; @@ -120,12 +122,16 @@ public class MemberNode { operDataStore.close(); } - IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE); + try { + IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE); + } catch (RuntimeException e) { + LoggerFactory.getLogger(MemberNode.class).warn("Failed to shutdown actor system", e); + } } } - public static void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier) - throws Exception { + public static void verifyRaftState(final AbstractDataStore datastore, final String shardName, + final RaftStateVerifier verifier) throws Exception { ActorContext actorContext = datastore.getActorContext(); Future future = actorContext.findLocalShardAsync(shardName); @@ -149,8 +155,8 @@ public class MemberNode { throw lastError; } - public static void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName, - String... peerMemberNames) throws Exception { + public static void verifyRaftPeersPresent(final AbstractDataStore datastore, final String shardName, + final String... peerMemberNames) throws Exception { final Set peerIds = Sets.newHashSet(); for (String p: peerMemberNames) { peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p), @@ -161,7 +167,7 @@ public class MemberNode { raftState.getPeerAddresses().keySet())); } - public static void verifyNoShardPresent(DistributedDataStore datastore, String shardName) { + public static void verifyNoShardPresent(final AbstractDataStore datastore, final String shardName) { Stopwatch sw = Stopwatch.createStarted(); while (sw.elapsed(TimeUnit.SECONDS) <= 5) { Optional shardReply = datastore.getActorContext().findLocalShard(shardName); @@ -179,6 +185,7 @@ public class MemberNode { private final List members; private String moduleShardsConfig; private String akkaConfig; + private boolean useAkkaArtery = true; private String[] waitForshardLeader = new String[0]; private String testName; private SchemaContext schemaContext; @@ -186,7 +193,7 @@ public class MemberNode { private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder() .shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30); - Builder(List members) { + Builder(final List members) { this.members = members; } @@ -195,7 +202,7 @@ public class MemberNode { * * @return this Builder */ - public Builder moduleShardsConfig(String newModuleShardsConfig) { + public Builder moduleShardsConfig(final String newModuleShardsConfig) { this.moduleShardsConfig = newModuleShardsConfig; return this; } @@ -205,17 +212,27 @@ public class MemberNode { * * @return this Builder */ - public Builder akkaConfig(String newAkkaConfig) { + public Builder akkaConfig(final String newAkkaConfig) { this.akkaConfig = newAkkaConfig; return this; } + /** + * Specifies whether or not to use akka artery for remoting. Default is true. + * + * @return this Builder + */ + public Builder useAkkaArtery(final boolean newUseAkkaArtery) { + this.useAkkaArtery = newUseAkkaArtery; + return this; + } + /** * Specifies the name of the test that is appended to the data store names. This is required. * * @return this Builder */ - public Builder testName(String newTestName) { + public Builder testName(final String newTestName) { this.testName = newTestName; return this; } @@ -225,7 +242,7 @@ public class MemberNode { * * @return this Builder */ - public Builder waitForShardLeader(String... shardNames) { + public Builder waitForShardLeader(final String... shardNames) { this.waitForshardLeader = shardNames; return this; } @@ -235,7 +252,7 @@ public class MemberNode { * * @return this Builder */ - public Builder createOperDatastore(boolean value) { + public Builder createOperDatastore(final boolean value) { this.createOperDatastore = value; return this; } @@ -245,7 +262,7 @@ public class MemberNode { * * @return this Builder */ - public Builder schemaContext(SchemaContext newSchemaContext) { + public Builder schemaContext(final SchemaContext newSchemaContext) { this.schemaContext = newSchemaContext; return this; } @@ -255,7 +272,7 @@ public class MemberNode { * * @return this Builder */ - public Builder datastoreContextBuilder(DatastoreContext.Builder builder) { + public Builder datastoreContextBuilder(final DatastoreContext.Builder builder) { datastoreContextBuilder = builder; return this; } @@ -272,8 +289,18 @@ public class MemberNode { MemberNode node = new MemberNode(); node.datastoreContextBuilder = datastoreContextBuilder; - ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig)); - Cluster.get(system).join(MEMBER_1_ADDRESS); + Config baseConfig = ConfigFactory.load(); + Config config; + if (useAkkaArtery) { + config = baseConfig.getConfig(akkaConfig); + } else { + config = baseConfig.getConfig(akkaConfig + "-without-artery") + .withFallback(baseConfig.getConfig(akkaConfig)); + } + + ActorSystem system = ActorSystem.create("cluster-test", config); + String member1Address = useAkkaArtery ? MEMBER_1_ADDRESS : MEMBER_1_ADDRESS.replace("akka", "akka.tcp"); + Cluster.get(system).join(AddressFromURIString.parse(member1Address)); node.kit = new IntegrationTestKit(system, datastoreContextBuilder);