X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fadmin%2FClusterAdminRpcServiceTest.java;h=abb6adb295d2dafccf5cf774727010a0dc2dced3;hp=1f23f68a84222225ab219bc7bf0a81a3ffbd2b00;hb=769ef0f950f2ed6cfc14d274e6a8edc583a36a96;hpb=d207a2f677545357095f2e5f145a5ecc8d3a60dd diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index 1f23f68a84..abb6adb295 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -10,35 +10,54 @@ package org.opendaylight.controller.cluster.datastore.admin; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.actor.PoisonPill; import akka.cluster.Cluster; +import akka.cluster.ClusterEvent.CurrentClusterState; +import akka.cluster.Member; +import akka.cluster.MemberStatus; import akka.testkit.JavaTestKit; +import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.io.File; import java.io.FileInputStream; -import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.SerializationUtils; import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.IntegrationTestKit; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder; +import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; /** * Unit tests for ClusterAdminRpcService. @@ -46,57 +65,31 @@ import org.opendaylight.yangtools.yang.common.RpcResult; * @author Thomas Pantelis */ public class ClusterAdminRpcServiceTest { - private static ActorSystem system; + private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); - private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(); - - private IntegrationTestKit kit; - private DistributedDataStore configDataStore; - private DistributedDataStore operDataStore; - private ClusterAdminRpcService service; - - @BeforeClass - public static void setUpClass() throws IOException { - system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); - Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); - Cluster.get(system).join(member1Address); - } - - @AfterClass - public static void tearDownClass() throws IOException { - JavaTestKit.shutdownActorSystem(system); - system = null; - } + private final List memberNodes = new ArrayList<>(); @After public void tearDown() { - if(kit != null) { - kit.cleanup(configDataStore); - kit.cleanup(operDataStore); + for(MemberNode m: memberNodes) { + m.cleanup(); } } - private void setup(String testName, String... shardNames) { - kit = new IntegrationTestKit(system, datastoreContextBuilder); - - configDataStore = kit.setupDistributedDataStore(testName + "Config", "module-shards-member1.conf", - true, shardNames); - operDataStore = kit.setupDistributedDataStore(testName + "Oper", "module-shards-member1.conf", - true, shardNames); - - service = new ClusterAdminRpcService(configDataStore, operDataStore); - } - @Test public void testBackupDatastore() throws Exception { - setup("testBackupDatastore", "cars", "people"); + MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1"). + moduleShardsConfig("module-shards-member1.conf"). + waitForShardLeader("cars", "people").testName("testBackupDatastore").build(); String fileName = "target/testBackupDatastore"; new File(fileName).delete(); - RpcResult rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder(). + ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore, node.operDataStore); + + RpcResult rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder(). setFilePath(fileName).build()).get(5, TimeUnit.SECONDS); - assertEquals("isSuccessful", true, rpcResult.isSuccessful()); + checkSuccessfulRpcResult(rpcResult); try(FileInputStream fis = new FileInputStream(fileName)) { List snapshots = SerializationUtils.deserialize(fis); @@ -104,28 +97,28 @@ public class ClusterAdminRpcServiceTest { ImmutableMap map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0), snapshots.get(1).getType(), snapshots.get(1)); - verifyDatastoreSnapshot(configDataStore.getActorContext().getDataStoreType(), - map.get(configDataStore.getActorContext().getDataStoreType()), "cars", "people"); + verifyDatastoreSnapshot(node.configDataStore.getActorContext().getDataStoreType(), + map.get(node.configDataStore.getActorContext().getDataStoreType()), "cars", "people"); } finally { new File(fileName).delete(); } // Test failure by killing a shard. - configDataStore.getActorContext().getShardManager().tell(datastoreContextBuilder. + node.configDataStore.getActorContext().getShardManager().tell(node.datastoreContextBuilder. shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender()); - ActorRef carsShardActor = configDataStore.getActorContext().findLocalShard("cars").get(); - kit.watch(carsShardActor); + ActorRef carsShardActor = node.configDataStore.getActorContext().findLocalShard("cars").get(); + node.kit.watch(carsShardActor); carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - kit.expectTerminated(carsShardActor); + node.kit.expectTerminated(carsShardActor); rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build()). get(5, TimeUnit.SECONDS); assertEquals("isSuccessful", false, rpcResult.isSuccessful()); assertEquals("getErrors", 1, rpcResult.getErrors().size()); - assertTrue("Expected error cause TimeoutException", - rpcResult.getErrors().iterator().next().getCause() instanceof TimeoutException); + + service.close(); } private void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) { @@ -139,8 +132,161 @@ public class ClusterAdminRpcServiceTest { } @Test - public void testAddShardReplica() { - // TODO implement + public void testAddShardReplica() throws Exception { + String name = "testAddShardReplica"; + String moduleShardsConfig = "module-shards-cars-member-1.conf"; + MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build(); + + MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + leaderNode1.waitForMembersUp("member-2"); + + testAddShardReplica(newReplicaNode2, "cars", "member-1"); + + MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + leaderNode1.waitForMembersUp("member-3"); + newReplicaNode2.waitForMembersUp("member-3"); + + testAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2"); + + verifyRaftPeersPresent(newReplicaNode2.configDataStore, "cars", "member-1", "member-3"); + verifyRaftPeersPresent(newReplicaNode2.operDataStore, "cars", "member-1", "member-3"); + + // Write data to member-2's config datastore and read/verify via member-3 + NormalizedNode configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore, + newReplicaNode3.configDataStore); + + // Write data to member-3's oper datastore and read/verify via member-2 + writeCarsNodeAndVerify(newReplicaNode3.operDataStore, newReplicaNode2.operDataStore); + + // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 - + // 2 ServerConfigurationPayload entries and the transaction payload entry. + + RaftStateVerifier verifier = new RaftStateVerifier() { + @Override + public void verify(OnDemandRaftState raftState) { + assertEquals("Commit index", 2, raftState.getCommitIndex()); + assertEquals("Last applied index", 2, raftState.getLastApplied()); + } + }; + + verifyRaftState(leaderNode1.configDataStore, "cars", verifier); + verifyRaftState(leaderNode1.operDataStore, "cars", verifier); + + verifyRaftState(newReplicaNode2.configDataStore, "cars", verifier); + verifyRaftState(newReplicaNode2.operDataStore, "cars", verifier); + + verifyRaftState(newReplicaNode3.configDataStore, "cars", verifier); + verifyRaftState(newReplicaNode3.operDataStore, "cars", verifier); + + // Restart member-3 and verify the cars config shard is re-instated. + + Cluster.get(leaderNode1.kit.getSystem()).down(Cluster.get(newReplicaNode3.kit.getSystem()).selfAddress()); + newReplicaNode3.cleanup(); + + newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build(); + + verifyRaftState(newReplicaNode3.configDataStore, "cars", verifier); + readCarsNodeAndVerify(newReplicaNode3.configDataStore, configCarsNode); + } + + private NormalizedNode writeCarsNodeAndVerify(DistributedDataStore writeToStore, + DistributedDataStore readFromStore) throws Exception { + DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction(); + NormalizedNode carsNode = CarsModel.create(); + writeTx.write(CarsModel.BASE_PATH, carsNode); + + DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + Boolean canCommit = cohort .canCommit().get(7, TimeUnit.SECONDS); + assertEquals("canCommit", true, canCommit); + cohort.preCommit().get(5, TimeUnit.SECONDS); + cohort.commit().get(5, TimeUnit.SECONDS); + + readCarsNodeAndVerify(readFromStore, carsNode); + return carsNode; + } + + private void readCarsNodeAndVerify(DistributedDataStore readFromStore, + NormalizedNode expCarsNode) throws Exception { + Optional> optional = readFromStore.newReadOnlyTransaction(). + read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", expCarsNode, optional.get()); + } + + private void testAddShardReplica(MemberNode memberNode, String shardName, String... peerMemberNames) + throws Exception { + memberNode.waitForMembersUp(peerMemberNames); + + ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore, + memberNode.operDataStore); + + RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName). + build()).get(10, TimeUnit.SECONDS); + checkSuccessfulRpcResult(rpcResult); + + verifyRaftPeersPresent(memberNode.configDataStore, shardName, peerMemberNames); + verifyRaftPeersPresent(memberNode.operDataStore, shardName, peerMemberNames); + + service.close(); + } + + private void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName, String... peerMemberNames) + throws Exception { + final Set peerIds = Sets.newHashSet(); + for(String p: peerMemberNames) { + peerIds.add(ShardIdentifier.builder().memberName(p).shardName(shardName). + type(datastore.getActorContext().getDataStoreType()).build().toString()); + } + + verifyRaftState(datastore, shardName, new RaftStateVerifier() { + @Override + public void verify(OnDemandRaftState raftState) { + assertTrue("Peer(s) " + peerIds + " not found for shard " + shardName, + raftState.getPeerAddresses().keySet().containsAll(peerIds)); + } + }); + } + + private void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier) + throws Exception { + ActorContext actorContext = datastore.getActorContext(); + + Future future = actorContext.findLocalShardAsync(shardName); + ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS)); + + AssertionError lastError = null; + Stopwatch sw = Stopwatch.createStarted(); + while(sw.elapsed(TimeUnit.SECONDS) <= 5) { + OnDemandRaftState raftState = (OnDemandRaftState)actorContext. + executeOperation(shardActor, GetOnDemandRaftState.INSTANCE); + + try { + verifier.verify(raftState); + return; + } catch (AssertionError e) { + lastError = e; + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + } + + throw lastError; + } + + private void checkSuccessfulRpcResult(RpcResult rpcResult) { + if(!rpcResult.isSuccessful()) { + if(rpcResult.getErrors().size() > 0) { + RpcError error = Iterables.getFirst(rpcResult.getErrors(), null); + throw new AssertionError("Rpc failed with error: " + error, error.getCause()); + } + + fail("Rpc failed with no error"); + } } @Test @@ -167,4 +313,108 @@ public class ClusterAdminRpcServiceTest { public void testConvertMembersToNonvotingForAllShards() { // TODO implement } + + private static class MemberNode { + IntegrationTestKit kit; + DistributedDataStore configDataStore; + DistributedDataStore operDataStore; + final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30); + boolean cleanedUp; + + static Builder builder(List members) { + return new Builder(members); + } + + void waitForMembersUp(String... otherMembers) { + Set otherMembersSet = Sets.newHashSet(otherMembers); + Stopwatch sw = Stopwatch.createStarted(); + while(sw.elapsed(TimeUnit.SECONDS) <= 10) { + CurrentClusterState state = Cluster.get(kit.getSystem()).state(); + for(Member m: state.getMembers()) { + if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) && + otherMembersSet.isEmpty()) { + return; + } + } + + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + + fail("Member(s) " + otherMembersSet + " are not Up"); + } + + void cleanup() { + if(!cleanedUp) { + cleanedUp = true; + kit.cleanup(configDataStore); + kit.cleanup(operDataStore); + JavaTestKit.shutdownActorSystem(kit.getSystem()); + } + } + + static class Builder { + List members; + String moduleShardsConfig; + String akkaConfig; + String[] waitForshardLeader = new String[0]; + String testName; + boolean createOperDatastore = true; + + Builder(List members) { + this.members = members; + } + + Builder moduleShardsConfig(String moduleShardsConfig) { + this.moduleShardsConfig = moduleShardsConfig; + return this; + } + + Builder akkaConfig(String akkaConfig) { + this.akkaConfig = akkaConfig; + return this; + } + + Builder testName(String testName) { + this.testName = testName; + return this; + } + + Builder waitForShardLeader(String... shardNames) { + this.waitForshardLeader = shardNames; + return this; + } + + Builder createOperDatastore(boolean value) { + this.createOperDatastore = value; + return this; + } + + MemberNode build() { + MemberNode node = new MemberNode(); + ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig)); + Cluster.get(system).join(MEMBER_1_ADDRESS); + + node.kit = new IntegrationTestKit(system, node.datastoreContextBuilder); + + String memberName = new ClusterWrapperImpl(system).getCurrentMemberName(); + node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName); + node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig, + true, waitForshardLeader); + + if(createOperDatastore) { + node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName); + node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig, + true, waitForshardLeader); + } + + members.add(node); + return node; + } + } + } + + private static interface RaftStateVerifier { + void verify(OnDemandRaftState raftState); + } }