From: Tom Pantelis
Date: Wed, 18 Nov 2015 05:13:03 +0000 (-0500)
Subject: Bug 2187: Refactor MemberNode from ClusterAdminRpcServiceTest
X-Git-Tag: release/beryllium~122
X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=09892e9e17d9bff10178b4631678557b855d8a1e

Bug 2187: Refactor MemberNode from ClusterAdminRpcServiceTest

Moved the MemberNode class out of ClusterAdminRpcServiceTest and into its own
top-level class so it can be reused by DistributedEntityOwnershipIntegrationTest.
Also fixed log statements in a couple of other classes.

Change-Id: I66aaa44a30aeabef2e7e3d74f9566f464565995d
Signed-off-by: Tom Pantelis
---

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java
index ec0081d38b..0277271020 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java
@@ -89,7 +89,7 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
         @Override
         public void onComplete(Throwable failure, Object response) {
             if(failure != null) {
-                LOG.error("Failed to create {} shard", ENTITY_OWNERSHIP_SHARD_NAME);
+                LOG.error("Failed to create {} shard", ENTITY_OWNERSHIP_SHARD_NAME, failure);
             } else {
                 LOG.info("Successfully created {} shard", ENTITY_OWNERSHIP_SHARD_NAME);
             }
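The one-line fix above relies on SLF4J's convention that a Throwable passed as the final
argument, beyond those consumed by {} placeholders, is logged together with its stack
trace. A minimal sketch of the pattern (the class and method names here are hypothetical,
not from the patch):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ShardCreationLogger {
        private static final Logger LOG = LoggerFactory.getLogger(ShardCreationLogger.class);

        void onShardCreateComplete(String shardName, Throwable failure) {
            if (failure != null) {
                // One {} placeholder, two trailing args: shardName fills the placeholder,
                // and failure, a Throwable in the last position, is logged with its stack trace.
                LOG.error("Failed to create {} shard", shardName, failure);
            } else {
                LOG.info("Successfully created {} shard", shardName);
            }
        }
    }

Without the trailing Throwable, the original statement silently discarded the failure's
stack trace, which is what the patch corrects.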
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java
new file mode 100644
index 0000000000..33be6bee02
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java
@@ -0,0 +1,274 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.AddressFromURIString;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent.CurrentClusterState;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.ConfigFactory;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Class that represents a cluster member node for unit tests. It encapsulates an actor system with
+ * config and (optional) operational data store instances. The Builder is used to specify the setup
+ * parameters and create the data store instances. The actor system is automatically joined to address
+ * 127.0.0.1:2558 so one member must specify an akka cluster configuration with that address.
+ *
+ * @author Thomas Pantelis
+ */
+public class MemberNode {
+    static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+
+    private IntegrationTestKit kit;
+    private DistributedDataStore configDataStore;
+    private DistributedDataStore operDataStore;
+    private DatastoreContext.Builder datastoreContextBuilder;
+    private boolean cleanedUp;
+
+    /**
+     * Constructs a Builder.
+     *
+     * @param members the list to which the resulting MemberNode will be added. This makes it easier for
+     *                callers to clean up instances on test completion.
+     * @return a Builder instance
+     */
+    public static Builder builder(List<MemberNode> members) {
+        return new Builder(members);
+    }
+
+    public IntegrationTestKit kit() {
+        return kit;
+    }
+
+    public DistributedDataStore configDataStore() {
+        return configDataStore;
+    }
+
+    public DistributedDataStore operDataStore() {
+        return operDataStore;
+    }
+
+    public DatastoreContext.Builder datastoreContextBuilder() {
+        return datastoreContextBuilder;
+    }
+
+    public void waitForMembersUp(String... otherMembers) {
+        Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
+        Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
+            CurrentClusterState state = Cluster.get(kit.getSystem()).state();
+            for(Member m: state.getMembers()) {
+                if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) &&
+                        otherMembersSet.isEmpty()) {
+                    return;
+                }
+            }
+
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        }
+
+        fail("Member(s) " + otherMembersSet + " are not Up");
+    }
+
+    public void cleanup() {
+        if(!cleanedUp) {
+            cleanedUp = true;
+            kit.cleanup(configDataStore);
+            kit.cleanup(operDataStore);
+            kit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
+        }
+    }
+
+    public static void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier)
+            throws Exception {
+        ActorContext actorContext = datastore.getActorContext();
+
+        Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
+        ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
+
+        AssertionError lastError = null;
+        Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+            OnDemandRaftState raftState = (OnDemandRaftState)actorContext.
+                    executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
+
+            try {
+                verifier.verify(raftState);
+                return;
+            } catch (AssertionError e) {
+                lastError = e;
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        throw lastError;
+    }
+
+    public static void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName,
+            String... peerMemberNames) throws Exception {
+        final Set<String> peerIds = Sets.newHashSet();
+        for(String p: peerMemberNames) {
+            peerIds.add(ShardIdentifier.builder().memberName(p).shardName(shardName).
+                    type(datastore.getActorContext().getDataStoreType()).build().toString());
+        }
+
+        verifyRaftState(datastore, shardName, new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                assertTrue("Peer(s) " + peerIds + " not found for shard " + shardName,
+                        raftState.getPeerAddresses().keySet().containsAll(peerIds));
+            }
+        });
+    }
+
+    public static class Builder {
+        private final List<MemberNode> members;
+        private String moduleShardsConfig;
+        private String akkaConfig;
+        private String[] waitForshardLeader = new String[0];
+        private String testName;
+        private SchemaContext schemaContext;
+        private boolean createOperDatastore = true;
+        private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
+                shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
+
+        Builder(List<MemberNode> members) {
+            this.members = members;
+        }
+
+        /**
+         * Specifies the name of the module shards config file. This is required.
+         *
+         * @return this Builder
+         */
+        public Builder moduleShardsConfig(String moduleShardsConfig) {
+            this.moduleShardsConfig = moduleShardsConfig;
+            return this;
+        }
+
+        /**
+         * Specifies the name of the akka configuration. This is required.
+         *
+         * @return this Builder
+         */
+        public Builder akkaConfig(String akkaConfig) {
+            this.akkaConfig = akkaConfig;
+            return this;
+        }
+
+        /**
+         * Specifies the name of the test that is appended to the data store names. This is required.
+         *
+         * @return this Builder
+         */
+        public Builder testName(String testName) {
+            this.testName = testName;
+            return this;
+        }
+
+        /**
+         * Specifies the optional names of the shards to initially wait for a leader to be elected.
+         *
+         * @return this Builder
+         */
+        public Builder waitForShardLeader(String... shardNames) {
+            this.waitForshardLeader = shardNames;
+            return this;
+        }
+
+        /**
+         * Specifies whether or not to create an operational data store. Defaults to true.
+         *
+         * @return this Builder
+         */
+        public Builder createOperDatastore(boolean value) {
+            this.createOperDatastore = value;
+            return this;
+        }
+
+        /**
+         * Specifies the SchemaContext for the data stores. Defaults to SchemaContextHelper.full().
+         *
+         * @return this Builder
+         */
+        public Builder schemaContext(SchemaContext schemaContext) {
+            this.schemaContext = schemaContext;
+            return this;
+        }
+
+        /**
+         * Specifies the DatastoreContext Builder. If not specified, a default instance is used.
+         *
+         * @return this Builder
+         */
+        public Builder datastoreContextBuilder(DatastoreContext.Builder builder) {
+            datastoreContextBuilder = builder;
+            return this;
+        }
+
+        public MemberNode build() {
+            Preconditions.checkNotNull(moduleShardsConfig, "moduleShardsConfig must be specified");
+            Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified");
+            Preconditions.checkNotNull(testName, "testName must be specified");
+
+            if(schemaContext == null) {
+                schemaContext = SchemaContextHelper.full();
+            }
+
+            MemberNode node = new MemberNode();
+            node.datastoreContextBuilder = datastoreContextBuilder;
+
+            ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig));
+            Cluster.get(system).join(MEMBER_1_ADDRESS);
+
+            node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
+
+            String memberName = new ClusterWrapperImpl(system).getCurrentMemberName();
+            node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
+            node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig,
+                    true, schemaContext, waitForshardLeader);
+
+            if(createOperDatastore) {
+                node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
+                node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig,
+                        true, schemaContext, waitForshardLeader);
+            }
+
+            members.add(node);
+            return node;
+        }
+    }
+
+    public static interface RaftStateVerifier {
+        void verify(OnDemandRaftState raftState);
+    }
+}
\ No newline at end of file
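For orientation before the refactored tests below, this is roughly how a test can now
drive the extracted class. A minimal sketch only, assuming a JUnit 4 harness and test
resources named "Member1" (akka config) and "module-shards-cars.conf" (module shards
config) like those referenced by the real tests:

    import java.util.ArrayList;
    import java.util.List;
    import org.junit.After;
    import org.junit.Test;
    import org.opendaylight.controller.cluster.datastore.MemberNode;

    public class MemberNodeUsageSketch {
        private final List<MemberNode> memberNodes = new ArrayList<>();

        @After
        public void tearDown() {
            // The builder adds each node to memberNodes, so cleanup is a single loop.
            for (MemberNode m : memberNodes) {
                m.cleanup();
            }
        }

        @Test
        public void testSingleMember() throws Exception {
            MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1")
                    .moduleShardsConfig("module-shards-cars.conf").testName("testSingleMember")
                    .waitForShardLeader("cars").build();

            // Static helpers on MemberNode replace the private ones the diff below
            // removes from ClusterAdminRpcServiceTest.
            MemberNode.verifyRaftPeersPresent(node.configDataStore(), "cars");
        }
    }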
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java
index abb6adb295..a85f651823 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java
@@ -9,25 +9,16 @@ package org.opendaylight.controller.cluster.datastore.admin;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
+import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
 import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Address;
-import akka.actor.AddressFromURIString;
 import akka.actor.PoisonPill;
 import akka.cluster.Cluster;
-import akka.cluster.ClusterEvent.CurrentClusterState;
-import akka.cluster.Member;
-import akka.cluster.MemberStatus;
-import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.ConfigFactory;
 import java.io.File;
 import java.io.FileInputStream;
 import java.util.ArrayList;
@@ -37,16 +28,15 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
-import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.MemberNode;
+import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
@@ -55,9 +45,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
 
 /**
  * Unit tests for ClusterAdminRpcService.
@@ -65,10 +52,14 @@ import scala.concurrent.duration.Duration;
  * @author Thomas Pantelis
  */
 public class ClusterAdminRpcServiceTest {
-    private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
-
     private final List<MemberNode> memberNodes = new ArrayList<>();
 
+    @Before
+    public void setUp() {
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
+    }
+
     @After
     public void tearDown() {
         for(MemberNode m: memberNodes) {
@@ -85,7 +76,7 @@ public class ClusterAdminRpcServiceTest {
         String fileName = "target/testBackupDatastore";
         new File(fileName).delete();
 
-        ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore, node.operDataStore);
+        ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore());
 
         RpcResult rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().
                 setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
@@ -97,21 +88,21 @@
             ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
                     snapshots.get(1).getType(), snapshots.get(1));
-            verifyDatastoreSnapshot(node.configDataStore.getActorContext().getDataStoreType(),
-                    map.get(node.configDataStore.getActorContext().getDataStoreType()), "cars", "people");
+            verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreType(),
+                    map.get(node.configDataStore().getActorContext().getDataStoreType()), "cars", "people");
         } finally {
             new File(fileName).delete();
         }
 
         // Test failure by killing a shard.
-        node.configDataStore.getActorContext().getShardManager().tell(node.datastoreContextBuilder.
+        node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder().
                 shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
 
-        ActorRef carsShardActor = node.configDataStore.getActorContext().findLocalShard("cars").get();
-        node.kit.watch(carsShardActor);
+        ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get();
+        node.kit().watch(carsShardActor);
         carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
-        node.kit.expectTerminated(carsShardActor);
+        node.kit().expectTerminated(carsShardActor);
 
         rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build()).
                 get(5, TimeUnit.SECONDS);
@@ -153,15 +144,15 @@
 
         testAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
 
-        verifyRaftPeersPresent(newReplicaNode2.configDataStore, "cars", "member-1", "member-3");
-        verifyRaftPeersPresent(newReplicaNode2.operDataStore, "cars", "member-1", "member-3");
+        verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
+        verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
 
         // Write data to member-2's config datastore and read/verify via member-3
-        NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore,
-                newReplicaNode3.configDataStore);
+        NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
+                newReplicaNode3.configDataStore());
 
         // Write data to member-3's oper datastore and read/verify via member-2
-        writeCarsNodeAndVerify(newReplicaNode3.operDataStore, newReplicaNode2.operDataStore);
+        writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
 
         // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
         // 2 ServerConfigurationPayload entries and the transaction payload entry.
@@ -174,25 +165,25 @@
             }
         };
 
-        verifyRaftState(leaderNode1.configDataStore, "cars", verifier);
-        verifyRaftState(leaderNode1.operDataStore, "cars", verifier);
+        verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
+        verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
 
-        verifyRaftState(newReplicaNode2.configDataStore, "cars", verifier);
-        verifyRaftState(newReplicaNode2.operDataStore, "cars", verifier);
+        verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
+        verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
 
-        verifyRaftState(newReplicaNode3.configDataStore, "cars", verifier);
-        verifyRaftState(newReplicaNode3.operDataStore, "cars", verifier);
+        verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
+        verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
 
         // Restart member-3 and verify the cars config shard is re-instated.
-        Cluster.get(leaderNode1.kit.getSystem()).down(Cluster.get(newReplicaNode3.kit.getSystem()).selfAddress());
+        Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
         newReplicaNode3.cleanup();
 
         newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
                 moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
 
-        verifyRaftState(newReplicaNode3.configDataStore, "cars", verifier);
-        readCarsNodeAndVerify(newReplicaNode3.configDataStore, configCarsNode);
+        verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
+        readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
     }
 
     private NormalizedNode<?, ?> writeCarsNodeAndVerify(DistributedDataStore writeToStore,
@@ -223,61 +214,19 @@
             throws Exception {
         memberNode.waitForMembersUp(peerMemberNames);
 
-        ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore,
-                memberNode.operDataStore);
+        ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
+                memberNode.operDataStore());
 
         RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName).
                 build()).get(10, TimeUnit.SECONDS);
         checkSuccessfulRpcResult(rpcResult);
 
-        verifyRaftPeersPresent(memberNode.configDataStore, shardName, peerMemberNames);
-        verifyRaftPeersPresent(memberNode.operDataStore, shardName, peerMemberNames);
+        verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
+        verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
 
         service.close();
     }
 
-    private void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName, String... peerMemberNames)
-            throws Exception {
-        final Set<String> peerIds = Sets.newHashSet();
-        for(String p: peerMemberNames) {
-            peerIds.add(ShardIdentifier.builder().memberName(p).shardName(shardName).
-                    type(datastore.getActorContext().getDataStoreType()).build().toString());
-        }
-
-        verifyRaftState(datastore, shardName, new RaftStateVerifier() {
-            @Override
-            public void verify(OnDemandRaftState raftState) {
-                assertTrue("Peer(s) " + peerIds + " not found for shard " + shardName,
-                        raftState.getPeerAddresses().keySet().containsAll(peerIds));
-            }
-        });
-    }
-
-    private void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier)
-            throws Exception {
-        ActorContext actorContext = datastore.getActorContext();
-
-        Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
-        ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
-
-        AssertionError lastError = null;
-        Stopwatch sw = Stopwatch.createStarted();
-        while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
-            OnDemandRaftState raftState = (OnDemandRaftState)actorContext.
-                    executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
-
-            try {
-                verifier.verify(raftState);
-                return;
-            } catch (AssertionError e) {
-                lastError = e;
-                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
-            }
-        }
-
-        throw lastError;
-    }
-
     private void checkSuccessfulRpcResult(RpcResult rpcResult) {
         if(!rpcResult.isSuccessful()) {
             if(rpcResult.getErrors().size() > 0) {
@@ -313,108 +262,4 @@
     public void testConvertMembersToNonvotingForAllShards() {
         // TODO implement
     }
-
-    private static class MemberNode {
-        IntegrationTestKit kit;
-        DistributedDataStore configDataStore;
-        DistributedDataStore operDataStore;
-        final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
-                shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
-        boolean cleanedUp;
-
-        static Builder builder(List<MemberNode> members) {
-            return new Builder(members);
-        }
-
-        void waitForMembersUp(String... otherMembers) {
-            Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
-            Stopwatch sw = Stopwatch.createStarted();
-            while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
-                CurrentClusterState state = Cluster.get(kit.getSystem()).state();
-                for(Member m: state.getMembers()) {
-                    if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) &&
-                            otherMembersSet.isEmpty()) {
-                        return;
-                    }
-                }
-
-                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-            }
-
-            fail("Member(s) " + otherMembersSet + " are not Up");
-        }
-
-        void cleanup() {
-            if(!cleanedUp) {
-                cleanedUp = true;
-                kit.cleanup(configDataStore);
-                kit.cleanup(operDataStore);
-                JavaTestKit.shutdownActorSystem(kit.getSystem());
-            }
-        }
-
-        static class Builder {
-            List<MemberNode> members;
-            String moduleShardsConfig;
-            String akkaConfig;
-            String[] waitForshardLeader = new String[0];
-            String testName;
-            boolean createOperDatastore = true;
-
-            Builder(List<MemberNode> members) {
-                this.members = members;
-            }
-
-            Builder moduleShardsConfig(String moduleShardsConfig) {
-                this.moduleShardsConfig = moduleShardsConfig;
-                return this;
-            }
-
-            Builder akkaConfig(String akkaConfig) {
-                this.akkaConfig = akkaConfig;
-                return this;
-            }
-
-            Builder testName(String testName) {
-                this.testName = testName;
-                return this;
-            }
-
-            Builder waitForShardLeader(String... shardNames) {
-                this.waitForshardLeader = shardNames;
-                return this;
-            }
-
-            Builder createOperDatastore(boolean value) {
-                this.createOperDatastore = value;
-                return this;
-            }
-
-            MemberNode build() {
-                MemberNode node = new MemberNode();
-                ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig));
-                Cluster.get(system).join(MEMBER_1_ADDRESS);
-
-                node.kit = new IntegrationTestKit(system, node.datastoreContextBuilder);
-
-                String memberName = new ClusterWrapperImpl(system).getCurrentMemberName();
-                node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
-                node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig,
-                        true, waitForshardLeader);
-
-                if(createOperDatastore) {
-                    node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
-                    node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig,
-                            true, waitForshardLeader);
-                }
-
-                members.add(node);
-                return node;
-            }
-        }
-    }
-
-    private static interface RaftStateVerifier {
-        void verify(OnDemandRaftState raftState);
-    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java
index ecae2fa1d8..46ca5f284c 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java
@@ -19,21 +19,17 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Abst
 import static org.opendaylight.controller.cluster.datastore.entityownership.DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
-import akka.actor.ActorSystem;
-import akka.actor.Address;
-import akka.actor.AddressFromURIString;
 import akka.actor.Status.Failure;
 import akka.actor.Status.Success;
-import akka.cluster.Cluster;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.ConfigFactory;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
@@ -44,15 +40,18 @@ import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
+import org.opendaylight.controller.cluster.datastore.MemberNode;
 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -67,7 +66,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * @author Thomas Pantelis
  */
 public class DistributedEntityOwnershipIntegrationTest {
-    private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
     private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf";
     private static final String MODULE_SHARDS_MEMBER_1_CONFIG = "module-shards-default-member-1.conf";
     private static final String ENTITY_TYPE1 = "entityType1";
@@ -79,25 +77,14 @@ public class DistributedEntityOwnershipIntegrationTest {
     private static final Entity ENTITY4 = new Entity(ENTITY_TYPE1, "entity4");
     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
 
-    private ActorSystem leaderSystem;
-    private ActorSystem follower1System;
-    private ActorSystem follower2System;
-
     private final DatastoreContext.Builder leaderDatastoreContextBuilder =
-            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
+            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5).
+                    shardIsolatedLeaderCheckIntervalInMillis(1000000);
 
     private final DatastoreContext.Builder followerDatastoreContextBuilder =
             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10000);
 
-    private DistributedDataStore leaderDistributedDataStore;
-    private DistributedDataStore follower1DistributedDataStore;
-    private DistributedDataStore follower2DistributedDataStore;
-
-    private DistributedEntityOwnershipService leaderEntityOwnershipService;
-    private DistributedEntityOwnershipService follower1EntityOwnershipService;
-    private DistributedEntityOwnershipService follower2EntityOwnershipService;
-
-    private IntegrationTestKit leaderTestKit;
-    private IntegrationTestKit follower1TestKit;
-    private IntegrationTestKit follower2TestKit;
+    private final List<MemberNode> memberNodes = new ArrayList<>();
 
     @Mock
     private EntityOwnershipListener leaderMockListener;
@@ -114,96 +101,50 @@
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
     }
 
     @After
     public void tearDown() {
-        if(leaderSystem != null) {
-            JavaTestKit.shutdownActorSystem(leaderSystem);
-        }
-
-        if(follower1System != null) {
-            JavaTestKit.shutdownActorSystem(follower1System);
-        }
-
-        if(follower2System != null) {
-            JavaTestKit.shutdownActorSystem(follower2System);
+        for(MemberNode m: memberNodes) {
+            m.cleanup();
         }
     }
 
-    private void startAllSystems() {
-        startLeaderSystem();
-        startFollower1System();
-        startFollower2System();
-    }
-
-    private void startFollower2System() {
-        follower2System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member3"));
-        Cluster.get(follower2System).join(MEMBER_1_ADDRESS);
-    }
-
-    private void startFollower1System() {
-        follower1System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
-        Cluster.get(follower1System).join(MEMBER_1_ADDRESS);
+    private DistributedEntityOwnershipService newOwnershipService(DistributedDataStore datastore) {
+        DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(datastore,
+                EntityOwnerSelectionStrategyConfig.newBuilder().build());
+        service.start();
+        return service;
     }
 
-    private void startLeaderSystem() {
-        leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
-        Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
-    }
+    @Test
+    public void testFunctionalityWithThreeNodes() throws Exception {
+        String name = "test";
+        MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name).
+                moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+                datastoreContextBuilder(leaderDatastoreContextBuilder).build();
 
-    private void initDatastores(String type) {
-        initLeaderDatastore(type, MODULE_SHARDS_CONFIG);
+        MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+                datastoreContextBuilder(followerDatastoreContextBuilder).build();
 
-        initFollower1Datastore(type, MODULE_SHARDS_CONFIG);
+        MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+                datastoreContextBuilder(followerDatastoreContextBuilder).build();
 
-        follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder);
-        follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(
-                type, MODULE_SHARDS_CONFIG, false, SCHEMA_CONTEXT);
+        DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
 
         leaderDistributedDataStore.waitTillReady();
-        follower1DistributedDataStore.waitTillReady();
-        follower2DistributedDataStore.waitTillReady();
-
-        startLeaderService();
-
-        startFollower1Service();
-
-        follower2EntityOwnershipService = new DistributedEntityOwnershipService(follower2DistributedDataStore,
-                EntityOwnerSelectionStrategyConfig.newBuilder().build());
-        follower2EntityOwnershipService.start();
-
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
-    }
-
-    private void startFollower1Service() {
-        follower1EntityOwnershipService = new DistributedEntityOwnershipService(follower1DistributedDataStore,
-                EntityOwnerSelectionStrategyConfig.newBuilder().build());
-        follower1EntityOwnershipService.start();
-    }
-
-    private void startLeaderService() {
-        leaderEntityOwnershipService = new DistributedEntityOwnershipService(leaderDistributedDataStore,
-                EntityOwnerSelectionStrategyConfig.newBuilder().build());
-        leaderEntityOwnershipService.start();
-    }
+        follower1Node.configDataStore().waitTillReady();
+        follower2Node.configDataStore().waitTillReady();
 
-    private void initFollower1Datastore(String type, String moduleConfig) {
-        follower1TestKit = new IntegrationTestKit(follower1System, followerDatastoreContextBuilder);
-        follower1DistributedDataStore = follower1TestKit.setupDistributedDataStore(
-                type, moduleConfig, false, SCHEMA_CONTEXT);
-    }
+        EntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
+        EntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
+        EntityOwnershipService follower2EntityOwnershipService = newOwnershipService(follower2Node.configDataStore());
 
-    private void initLeaderDatastore(String type, String moduleConfig) {
-        leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
-        leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(
-                type, moduleConfig, false, SCHEMA_CONTEXT);
-    }
-
-    @Test
-    public void test() throws Exception {
-        startAllSystems();
-        initDatastores("test");
+        leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
 
         leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
         leaderEntityOwnershipService.registerListener(ENTITY_TYPE2, leaderMockListener2);
@@ -236,7 +177,7 @@
 
         // Register follower1 candidate for entity2 and verify it becomes owner
-        follower1EntityOwnershipService.registerCandidate(ENTITY2);
+        EntityOwnershipCandidateRegistration follower1Entity2Reg = follower1EntityOwnershipService.registerCandidate(ENTITY2);
         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, false, true));
         reset(leaderMockListener, follower1MockListener);
@@ -250,7 +191,7 @@
 
         // Unregister follower1 candidate for entity2 and verify follower2 becomes owner
-        follower1EntityOwnershipService.unregisterCandidate(ENTITY2);
+        follower1Entity2Reg.close();
         verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-3");
         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-3");
         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, true, false, true));
@@ -289,7 +230,7 @@
 
         // Shutdown follower2 and verify its owned entities (entity 2 & 4) get re-assigned
         reset(leaderMockListener, follower1MockListener);
-        JavaTestKit.shutdownActorSystem(follower2System);
+        follower2Node.cleanup();
 
         verify(follower1MockListener, timeout(15000).times(2)).ownershipChanged(or(ownershipChange(ENTITY4, false, true, true),
                 ownershipChange(ENTITY2, false, false, false)));
@@ -299,13 +240,13 @@
 
         // Register leader candidate for entity2 and verify it becomes owner
-        leaderEntityOwnershipService.registerCandidate(ENTITY2);
+        EntityOwnershipCandidateRegistration leaderEntity2Reg = leaderEntityOwnershipService.registerCandidate(ENTITY2);
         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
 
         // Unregister leader candidate for entity2 and verify the owner is cleared
-        leaderEntityOwnershipService.unregisterCandidate(ENTITY2);
+        leaderEntity2Reg.close();
         verifyOwner(leaderDistributedDataStore, ENTITY2, "");
         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, true, false, false));
         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, false, false));
@@ -318,8 +259,30 @@
      */
     @Test
     public void testCloseCandidateRegistrationInQuickSuccession() throws CandidateAlreadyRegisteredException {
-        startAllSystems();
-        initDatastores("testCloseCandidateRegistrationInQuickSuccession");
+        String name = "testCloseCandidateRegistrationInQuickSuccession";
+        MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name).
+                moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+                datastoreContextBuilder(leaderDatastoreContextBuilder).build();
+
+        MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+                datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+        MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+                moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+                datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+        DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+
+        leaderDistributedDataStore.waitTillReady();
+        follower1Node.configDataStore().waitTillReady();
+        follower2Node.configDataStore().waitTillReady();
+
+        EntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
+        EntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
+        EntityOwnershipService follower2EntityOwnershipService = newOwnershipService(follower2Node.configDataStore());
+
+        leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
 
         leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
         follower1EntityOwnershipService.registerListener(ENTITY_TYPE1, follower1MockListener);
@@ -368,40 +331,48 @@
      */
     @Test
     public void testEntityOwnershipShardBootstrapping() throws Throwable {
-        startLeaderSystem();
-        startFollower1System();
-        String type = "testEntityOwnershipShardBootstrapping";
-        initLeaderDatastore(type, MODULE_SHARDS_MEMBER_1_CONFIG);
-        initFollower1Datastore(type, MODULE_SHARDS_MEMBER_1_CONFIG);
+        String name = "testEntityOwnershipShardBootstrapping";
+        String moduleShardsConfig = MODULE_SHARDS_MEMBER_1_CONFIG;
+        MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name).
+                moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+                datastoreContextBuilder(leaderDatastoreContextBuilder).build();
 
-        leaderDistributedDataStore.waitTillReady();
+        DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+        EntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
+
+        leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
+
+        MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+                datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+        DistributedDataStore follower1DistributedDataStore = follower1Node.configDataStore();
         follower1DistributedDataStore.waitTillReady();
 
-        startLeaderService();
-        startFollower1Service();
+        leaderNode.waitForMembersUp("member-2");
+        follower1Node.waitForMembersUp("member-1");
 
-        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
+        EntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1DistributedDataStore);
 
         leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
 
         // Register a candidate for follower1 - should get queued since follower1 has no leader
-        follower1EntityOwnershipService.registerCandidate(ENTITY1);
+        EntityOwnershipCandidateRegistration candidateReg = follower1EntityOwnershipService.registerCandidate(ENTITY1);
         verify(leaderMockListener, timeout(300).never()).ownershipChanged(ownershipChange(ENTITY1));
 
         // Add replica in follower1
         AddShardReplica addReplica = new AddShardReplica(ENTITY_OWNERSHIP_SHARD_NAME);
-        follower1DistributedDataStore.getActorContext().getShardManager().tell(addReplica, follower1TestKit.getRef());
-        Object reply = follower1TestKit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), Success.class, Failure.class);
+        follower1DistributedDataStore.getActorContext().getShardManager().tell(addReplica, follower1Node.kit().getRef());
+        Object reply = follower1Node.kit().expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), Success.class, Failure.class);
         if(reply instanceof Failure) {
-            throw ((Failure)reply).cause();
+            throw new AssertionError("AddShardReplica failed", ((Failure)reply).cause());
         }
 
         // The queued candidate registration should proceed
-        verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1));
-
+        verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
     }
 
-    private static void verifyGetOwnershipState(DistributedEntityOwnershipService service, Entity entity,
+    private static void verifyGetOwnershipState(EntityOwnershipService service, Entity entity,
             boolean isOwner, boolean hasOwner) {
         Optional<EntityOwnershipState> state = service.getOwnershipState(entity);
         assertEquals("getOwnershipState present", true, state.isPresent());
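For reference, the assertion pattern that verifyGetOwnershipState applies can be read in
isolation as follows. A sketch against the MD-SAL clustering API of this release; the
helper class name is invented, and the EntityOwnershipState accessor names are assumed
from the usage above:

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertTrue;
    import com.google.common.base.Optional;
    import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
    import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
    import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;

    final class OwnershipStateAssert {
        static void assertOwnershipState(EntityOwnershipService service, Entity entity,
                boolean expIsOwner, boolean expHasOwner) {
            // Query the service for the entity's current ownership state; the Optional
            // is absent when the entity is unknown to the service.
            Optional<EntityOwnershipState> state = service.getOwnershipState(entity);
            assertTrue("getOwnershipState present", state.isPresent());
            // Check both flags: whether this node owns the entity, and whether any owner exists.
            assertEquals("isOwner", expIsOwner, state.get().isOwner());
            assertEquals("hasOwner", expHasOwner, state.get().hasOwner());
        }
    }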