From 769ef0f950f2ed6cfc14d274e6a8edc583a36a96 Mon Sep 17 00:00:00 2001
From: Tom Pantelis
Date: Sat, 14 Nov 2015 00:08:25 -0500
Subject: [PATCH 1/1] Bug 2187: Implement add-shard-replica RPC

The unit test creates 3 actor systems, each with its own datastores. Now that
the ShardManager persists shard info, and because the InMemorySnapshotStore is
static, each ShardManager needs a unique persistenceId; otherwise the
equivalent ShardManagers' persistence Ids would clash. Therefore I added a
shardManagerPersistenceId field to the DatastoreContext so the unit test can
provide a unique Id based on the member name.

Change-Id: I907cd568d64f43586ffc1ec8581e4208f46db327
Signed-off-by: Tom Pantelis
---
 .../sal/common/util/jmx/AbstractMXBean.java   |   8 +-
 .../cluster/datastore/DatastoreContext.java   |  16 +
 .../cluster/datastore/ShardManager.java       |   7 +-
 .../admin/ClusterAdminRpcService.java         |  64 +++-
 .../cluster/datastore/IntegrationTestKit.java |  11 +-
 .../admin/ClusterAdminRpcServiceTest.java     | 352 +++++++++++++++---
 .../src/test/resources/application.conf       |   3 +
 .../module-shards-cars-member-1.conf          |  13 +
 8 files changed, 394 insertions(+), 80 deletions(-)
 create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-cars-member-1.conf

diff --git a/opendaylight/md-sal/sal-common-util/src/main/java/org/opendaylight/controller/md/sal/common/util/jmx/AbstractMXBean.java b/opendaylight/md-sal/sal-common-util/src/main/java/org/opendaylight/controller/md/sal/common/util/jmx/AbstractMXBean.java
index a2db29d1e8..c24c8dc068 100644
--- a/opendaylight/md-sal/sal-common-util/src/main/java/org/opendaylight/controller/md/sal/common/util/jmx/AbstractMXBean.java
+++ b/opendaylight/md-sal/sal-common-util/src/main/java/org/opendaylight/controller/md/sal/common/util/jmx/AbstractMXBean.java
@@ -8,8 +8,8 @@
 package org.opendaylight.controller.md.sal.common.util.jmx;
 
+import com.google.common.annotations.Beta;
 import java.lang.management.ManagementFactory;
-
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.management.InstanceNotFoundException;
@@ -17,12 +17,9 @@ import javax.management.MBeanRegistrationException;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.Beta;
-
 /**
  * Abstract base for an MXBean implementation class.
  *

@@ -122,8 +119,7 @@ public abstract class AbstractMXBean {
             unregisterMBean(mbeanName);
             unregister = true;
         } catch(Exception e) {
-
-            LOG.error("Failed when unregistering MBean {}", e);
+            LOG.debug("Failed when unregistering MBean {}", e);
         }
 
         return unregister;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
index 7b91705bf3..4f7d68ea52 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -69,6 +70,7 @@ public class DatastoreContext {
     private boolean writeOnlyTransactionOptimizationsEnabled = true;
     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
     private boolean transactionDebugContextEnabled = false;
+    private String shardManagerPersistenceId;
 
     public static Set getGlobalDatastoreTypes() {
         return globalDatastoreTypes;
     }
@@ -101,6 +103,7 @@ public class DatastoreContext {
         this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
         this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
         this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
+        this.shardManagerPersistenceId = other.shardManagerPersistenceId;
 
         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
@@ -177,6 +180,10 @@ public class DatastoreContext {
         return transactionCreationInitialRateLimit;
     }
 
+    public String getShardManagerPersistenceId() {
+        return shardManagerPersistenceId;
+    }
+
     private void setPeerAddressResolver(PeerAddressResolver resolver) {
         raftConfig.setPeerAddressResolver(resolver);
     }
@@ -422,6 +429,15 @@ public class DatastoreContext {
             return this;
         }
 
+        /**
+         * For unit tests only.
+         */
+        @VisibleForTesting
+        public Builder shardManagerPersistenceId(String id) {
+            datastoreContext.shardManagerPersistenceId = id;
+            return this;
+        }
+
         public DatastoreContext build() {
             datastoreContext.dataStoreProperties = InMemoryDOMDataStoreConfigProperties.create(
                     maxShardDataChangeExecutorPoolSize, maxShardDataChangeExecutorQueueSize,
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
index 814f117f31..bbac0e3e25 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
@@ -138,7 +138,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final Set shardReplicaOperationsInProgress = new HashSet<>();
 
-    private final String id;
+    private final String persistenceId;
 
     /**
      */
@@ -154,7 +154,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.primaryShardInfoCache = builder.primaryShardInfoCache;
         this.restoreFromSnapshot = builder.restoreFromSnapshot;
 
-        id = "shard-manager-" + type;
+        String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
+        persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
 
         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
 
@@ -827,7 +828,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     public String persistenceId() {
-        return id;
+        return persistenceId;
     }
 
     @VisibleForTesting
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
index c6aa0dca87..ba84296140 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java
@@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore.admin;
 
 import akka.actor.ActorRef;
+import akka.actor.Status.Success;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
@@ -22,6 +23,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshotList;
 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
@@ -70,10 +72,31 @@ public class ClusterAdminRpcService implements ClusterAdminService, AutoCloseabl
     }
 
     @Override
-    public Future<RpcResult<Void>> addShardReplica(AddShardReplicaInput input) {
-        // TODO implement
-        return RpcResultBuilder.failed().withError(ErrorType.APPLICATION, "operation-not-supported",
-                "Not implemented yet").buildFuture();
+    public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
+        final String shardName = input.getShardName();
+        if(Strings.isNullOrEmpty(shardName)) {
+            return newFailedRpcResultBuilder("A valid shard name must be specified").buildFuture();
+        }
+
+        LOG.info("Adding replica for shard {}", shardName);
+
+        final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
+        ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(new AddShardReplica(shardName));
+        Futures.addCallback(future, new FutureCallback<List<DatastoreSnapshot>>() {
+            @Override
+            public void onSuccess(List<DatastoreSnapshot> snapshots) {
+                LOG.info("Successfully added replica for shard {}", shardName);
+                returnFuture.set(newSuccessfulResult());
+            }
+
+            @Override
+            public void onFailure(Throwable failure) {
+                onMessageFailure(String.format("Failed to add replica for shard %s", shardName),
+                        returnFuture, failure);
+            }
+        });
+
+        return returnFuture;
     }
 
     @Override
@@ -112,7 +135,6 @@
                 "Not implemented yet").buildFuture();
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public Future<RpcResult<Void>> backupDatastore(final BackupDatastoreInput input) {
         LOG.debug("backupDatastore: {}", input);
@@ -121,14 +143,9 @@
             return newFailedRpcResultBuilder("A valid file path must be specified").buildFuture();
         }
 
-        Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
-        ListenableFuture configFuture = ask(configDataStore.getActorContext().getShardManager(),
-                GetSnapshot.INSTANCE, timeout);
-        ListenableFuture operFuture = ask(operDataStore.getActorContext().getShardManager(),
-                GetSnapshot.INSTANCE, timeout);
-
         final SettableFuture<RpcResult<Void>> returnFuture = SettableFuture.create();
-        Futures.addCallback(Futures.allAsList(configFuture, operFuture), new FutureCallback<List<DatastoreSnapshot>>() {
+        ListenableFuture<List<DatastoreSnapshot>> future = sendMessageToShardManagers(GetSnapshot.INSTANCE);
+        Futures.addCallback(future, new FutureCallback<List<DatastoreSnapshot>>() {
             @Override
             public void onSuccess(List<DatastoreSnapshot> snapshots) {
                 saveSnapshotsToFile(new DatastoreSnapshotList(snapshots), input.getFilePath(), returnFuture);
             }
 
             @Override
             public void onFailure(Throwable failure) {
-                onDatastoreBackupFilure(input.getFilePath(), returnFuture, failure);
+                onDatastoreBackupFailure(input.getFilePath(), returnFuture, failure);
             }
         });
 
         return returnFuture;
     }
 
+    @SuppressWarnings("unchecked")
+    private ListenableFuture<List<DatastoreSnapshot>> sendMessageToShardManagers(Object message) {
+        Timeout timeout = new Timeout(1, TimeUnit.MINUTES);
+        ListenableFuture configFuture = ask(configDataStore.getActorContext().getShardManager(), message, timeout);
+        ListenableFuture operFuture = ask(operDataStore.getActorContext().getShardManager(), message, timeout);
+
+        return Futures.allAsList(configFuture, operFuture);
+    }
+
     private static void saveSnapshotsToFile(DatastoreSnapshotList snapshots, String fileName,
             SettableFuture<RpcResult<Void>> returnFuture) {
         try(FileOutputStream fos = new FileOutputStream(fileName)) {
 
             returnFuture.set(newSuccessfulResult());
             LOG.info("Successfully backed up datastore to file {}", fileName);
         } catch(Exception e) {
-            onDatastoreBackupFilure(fileName, returnFuture, e);
+            onDatastoreBackupFailure(fileName, returnFuture, e);
         }
     }
 
-    private static void onDatastoreBackupFilure(String fileName, final SettableFuture<RpcResult<Void>> returnFuture,
+    private static void onDatastoreBackupFailure(String fileName, SettableFuture<RpcResult<Void>> returnFuture,
+            Throwable failure) {
+        onMessageFailure(String.format("Failed to back up datastore to file %s", fileName), returnFuture, failure);
+    }
+
+    private static void onMessageFailure(String msg, final SettableFuture<RpcResult<Void>> returnFuture,
             Throwable failure) {
-        String msg = String.format("Failed to back up datastore to file %s", fileName);
         LOG.error(msg, failure);
-        returnFuture.set(newFailedRpcResultBuilder(msg, failure).build());
+        returnFuture.set(newFailedRpcResultBuilder(String.format("%s: %s", msg, failure.getMessage())).build());
     }
 
     private ListenableFuture ask(ActorRef actor, Object message, Timeout timeout) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
index 40d123e7ed..9cc6d838c8 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
@@ -35,14 +35,18 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public class IntegrationTestKit extends ShardTestKit {
 
-    DatastoreContext.Builder datastoreContextBuilder;
-    DatastoreSnapshot restoreFromSnapshot;
+    protected DatastoreContext.Builder datastoreContextBuilder;
+    protected DatastoreSnapshot restoreFromSnapshot;
 
     public IntegrationTestKit(ActorSystem actorSystem, Builder datastoreContextBuilder) {
         super(actorSystem);
         this.datastoreContextBuilder = datastoreContextBuilder;
     }
 
+    public DatastoreContext.Builder getDatastoreContextBuilder() {
+        return datastoreContextBuilder;
+    }
+
     public DistributedDataStore setupDistributedDataStore(String typeName, String...
shardNames) { return setupDistributedDataStore(typeName, "module-shards.conf", true, SchemaContextHelper.full(), shardNames); } @@ -80,6 +84,7 @@ public class IntegrationTestKit extends ShardTestKit { waitUntilLeader(dataStore.getActorContext(), shardNames); } + datastoreContextBuilder = DatastoreContext.newBuilderFrom(datastoreContext); return dataStore; } @@ -143,7 +148,7 @@ public class IntegrationTestKit extends ShardTestKit { assertEquals("Data node", nodeToWrite, optional.get()); } - void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception { + public void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception { Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS); assertEquals("canCommit", true, canCommit); cohort.preCommit().get(5, TimeUnit.SECONDS); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java index 1f23f68a84..abb6adb295 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java @@ -10,35 +10,54 @@ package org.opendaylight.controller.cluster.datastore.admin; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.actor.PoisonPill; import akka.cluster.Cluster; +import akka.cluster.ClusterEvent.CurrentClusterState; +import akka.cluster.Member; +import akka.cluster.MemberStatus; import akka.testkit.JavaTestKit; +import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.io.File; import java.io.FileInputStream; -import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.SerializationUtils; import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.IntegrationTestKit; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.utils.ActorContext; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import 
org.opendaylight.controller.md.cluster.datastore.model.CarsModel; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.BackupDatastoreInputBuilder; +import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; /** * Unit tests for ClusterAdminRpcService. @@ -46,57 +65,31 @@ import org.opendaylight.yangtools.yang.common.RpcResult; * @author Thomas Pantelis */ public class ClusterAdminRpcServiceTest { - private static ActorSystem system; + private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); - private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(); - - private IntegrationTestKit kit; - private DistributedDataStore configDataStore; - private DistributedDataStore operDataStore; - private ClusterAdminRpcService service; - - @BeforeClass - public static void setUpClass() throws IOException { - system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); - Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"); - Cluster.get(system).join(member1Address); - } - - @AfterClass - public static void tearDownClass() throws IOException { - JavaTestKit.shutdownActorSystem(system); - system = null; - } + private final List memberNodes = new ArrayList<>(); @After public void tearDown() { - if(kit != null) { - kit.cleanup(configDataStore); - kit.cleanup(operDataStore); + for(MemberNode m: memberNodes) { + m.cleanup(); } } - private void setup(String testName, String... shardNames) { - kit = new IntegrationTestKit(system, datastoreContextBuilder); - - configDataStore = kit.setupDistributedDataStore(testName + "Config", "module-shards-member1.conf", - true, shardNames); - operDataStore = kit.setupDistributedDataStore(testName + "Oper", "module-shards-member1.conf", - true, shardNames); - - service = new ClusterAdminRpcService(configDataStore, operDataStore); - } - @Test public void testBackupDatastore() throws Exception { - setup("testBackupDatastore", "cars", "people"); + MemberNode node = MemberNode.builder(memberNodes).akkaConfig("Member1"). + moduleShardsConfig("module-shards-member1.conf"). + waitForShardLeader("cars", "people").testName("testBackupDatastore").build(); String fileName = "target/testBackupDatastore"; new File(fileName).delete(); - RpcResult rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder(). + ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore, node.operDataStore); + + RpcResult rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder(). 
setFilePath(fileName).build()).get(5, TimeUnit.SECONDS); - assertEquals("isSuccessful", true, rpcResult.isSuccessful()); + checkSuccessfulRpcResult(rpcResult); try(FileInputStream fis = new FileInputStream(fileName)) { List snapshots = SerializationUtils.deserialize(fis); @@ -104,28 +97,28 @@ public class ClusterAdminRpcServiceTest { ImmutableMap map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0), snapshots.get(1).getType(), snapshots.get(1)); - verifyDatastoreSnapshot(configDataStore.getActorContext().getDataStoreType(), - map.get(configDataStore.getActorContext().getDataStoreType()), "cars", "people"); + verifyDatastoreSnapshot(node.configDataStore.getActorContext().getDataStoreType(), + map.get(node.configDataStore.getActorContext().getDataStoreType()), "cars", "people"); } finally { new File(fileName).delete(); } // Test failure by killing a shard. - configDataStore.getActorContext().getShardManager().tell(datastoreContextBuilder. + node.configDataStore.getActorContext().getShardManager().tell(node.datastoreContextBuilder. shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender()); - ActorRef carsShardActor = configDataStore.getActorContext().findLocalShard("cars").get(); - kit.watch(carsShardActor); + ActorRef carsShardActor = node.configDataStore.getActorContext().findLocalShard("cars").get(); + node.kit.watch(carsShardActor); carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender()); - kit.expectTerminated(carsShardActor); + node.kit.expectTerminated(carsShardActor); rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build()). get(5, TimeUnit.SECONDS); assertEquals("isSuccessful", false, rpcResult.isSuccessful()); assertEquals("getErrors", 1, rpcResult.getErrors().size()); - assertTrue("Expected error cause TimeoutException", - rpcResult.getErrors().iterator().next().getCause() instanceof TimeoutException); + + service.close(); } private void verifyDatastoreSnapshot(String type, DatastoreSnapshot datastoreSnapshot, String... expShardNames) { @@ -139,8 +132,161 @@ public class ClusterAdminRpcServiceTest { } @Test - public void testAddShardReplica() { - // TODO implement + public void testAddShardReplica() throws Exception { + String name = "testAddShardReplica"; + String moduleShardsConfig = "module-shards-cars-member-1.conf"; + MemberNode leaderNode1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ). + moduleShardsConfig(moduleShardsConfig).waitForShardLeader("cars").build(); + + MemberNode newReplicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name). + moduleShardsConfig(moduleShardsConfig).build(); + + leaderNode1.waitForMembersUp("member-2"); + + testAddShardReplica(newReplicaNode2, "cars", "member-1"); + + MemberNode newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). 
+ moduleShardsConfig(moduleShardsConfig).build(); + + leaderNode1.waitForMembersUp("member-3"); + newReplicaNode2.waitForMembersUp("member-3"); + + testAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2"); + + verifyRaftPeersPresent(newReplicaNode2.configDataStore, "cars", "member-1", "member-3"); + verifyRaftPeersPresent(newReplicaNode2.operDataStore, "cars", "member-1", "member-3"); + + // Write data to member-2's config datastore and read/verify via member-3 + NormalizedNode configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore, + newReplicaNode3.configDataStore); + + // Write data to member-3's oper datastore and read/verify via member-2 + writeCarsNodeAndVerify(newReplicaNode3.operDataStore, newReplicaNode2.operDataStore); + + // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 - + // 2 ServerConfigurationPayload entries and the transaction payload entry. + + RaftStateVerifier verifier = new RaftStateVerifier() { + @Override + public void verify(OnDemandRaftState raftState) { + assertEquals("Commit index", 2, raftState.getCommitIndex()); + assertEquals("Last applied index", 2, raftState.getLastApplied()); + } + }; + + verifyRaftState(leaderNode1.configDataStore, "cars", verifier); + verifyRaftState(leaderNode1.operDataStore, "cars", verifier); + + verifyRaftState(newReplicaNode2.configDataStore, "cars", verifier); + verifyRaftState(newReplicaNode2.operDataStore, "cars", verifier); + + verifyRaftState(newReplicaNode3.configDataStore, "cars", verifier); + verifyRaftState(newReplicaNode3.operDataStore, "cars", verifier); + + // Restart member-3 and verify the cars config shard is re-instated. + + Cluster.get(leaderNode1.kit.getSystem()).down(Cluster.get(newReplicaNode3.kit.getSystem()).selfAddress()); + newReplicaNode3.cleanup(); + + newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name). + moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build(); + + verifyRaftState(newReplicaNode3.configDataStore, "cars", verifier); + readCarsNodeAndVerify(newReplicaNode3.configDataStore, configCarsNode); + } + + private NormalizedNode writeCarsNodeAndVerify(DistributedDataStore writeToStore, + DistributedDataStore readFromStore) throws Exception { + DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction(); + NormalizedNode carsNode = CarsModel.create(); + writeTx.write(CarsModel.BASE_PATH, carsNode); + + DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); + Boolean canCommit = cohort .canCommit().get(7, TimeUnit.SECONDS); + assertEquals("canCommit", true, canCommit); + cohort.preCommit().get(5, TimeUnit.SECONDS); + cohort.commit().get(5, TimeUnit.SECONDS); + + readCarsNodeAndVerify(readFromStore, carsNode); + return carsNode; + } + + private void readCarsNodeAndVerify(DistributedDataStore readFromStore, + NormalizedNode expCarsNode) throws Exception { + Optional> optional = readFromStore.newReadOnlyTransaction(). + read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", expCarsNode, optional.get()); + } + + private void testAddShardReplica(MemberNode memberNode, String shardName, String... 
peerMemberNames) + throws Exception { + memberNode.waitForMembersUp(peerMemberNames); + + ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore, + memberNode.operDataStore); + + RpcResult rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName). + build()).get(10, TimeUnit.SECONDS); + checkSuccessfulRpcResult(rpcResult); + + verifyRaftPeersPresent(memberNode.configDataStore, shardName, peerMemberNames); + verifyRaftPeersPresent(memberNode.operDataStore, shardName, peerMemberNames); + + service.close(); + } + + private void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName, String... peerMemberNames) + throws Exception { + final Set peerIds = Sets.newHashSet(); + for(String p: peerMemberNames) { + peerIds.add(ShardIdentifier.builder().memberName(p).shardName(shardName). + type(datastore.getActorContext().getDataStoreType()).build().toString()); + } + + verifyRaftState(datastore, shardName, new RaftStateVerifier() { + @Override + public void verify(OnDemandRaftState raftState) { + assertTrue("Peer(s) " + peerIds + " not found for shard " + shardName, + raftState.getPeerAddresses().keySet().containsAll(peerIds)); + } + }); + } + + private void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier) + throws Exception { + ActorContext actorContext = datastore.getActorContext(); + + Future future = actorContext.findLocalShardAsync(shardName); + ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS)); + + AssertionError lastError = null; + Stopwatch sw = Stopwatch.createStarted(); + while(sw.elapsed(TimeUnit.SECONDS) <= 5) { + OnDemandRaftState raftState = (OnDemandRaftState)actorContext. + executeOperation(shardActor, GetOnDemandRaftState.INSTANCE); + + try { + verifier.verify(raftState); + return; + } catch (AssertionError e) { + lastError = e; + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + } + } + + throw lastError; + } + + private void checkSuccessfulRpcResult(RpcResult rpcResult) { + if(!rpcResult.isSuccessful()) { + if(rpcResult.getErrors().size() > 0) { + RpcError error = Iterables.getFirst(rpcResult.getErrors(), null); + throw new AssertionError("Rpc failed with error: " + error, error.getCause()); + } + + fail("Rpc failed with no error"); + } } @Test @@ -167,4 +313,108 @@ public class ClusterAdminRpcServiceTest { public void testConvertMembersToNonvotingForAllShards() { // TODO implement } + + private static class MemberNode { + IntegrationTestKit kit; + DistributedDataStore configDataStore; + DistributedDataStore operDataStore; + final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder(). + shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30); + boolean cleanedUp; + + static Builder builder(List members) { + return new Builder(members); + } + + void waitForMembersUp(String... 
otherMembers) { + Set otherMembersSet = Sets.newHashSet(otherMembers); + Stopwatch sw = Stopwatch.createStarted(); + while(sw.elapsed(TimeUnit.SECONDS) <= 10) { + CurrentClusterState state = Cluster.get(kit.getSystem()).state(); + for(Member m: state.getMembers()) { + if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) && + otherMembersSet.isEmpty()) { + return; + } + } + + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + + fail("Member(s) " + otherMembersSet + " are not Up"); + } + + void cleanup() { + if(!cleanedUp) { + cleanedUp = true; + kit.cleanup(configDataStore); + kit.cleanup(operDataStore); + JavaTestKit.shutdownActorSystem(kit.getSystem()); + } + } + + static class Builder { + List members; + String moduleShardsConfig; + String akkaConfig; + String[] waitForshardLeader = new String[0]; + String testName; + boolean createOperDatastore = true; + + Builder(List members) { + this.members = members; + } + + Builder moduleShardsConfig(String moduleShardsConfig) { + this.moduleShardsConfig = moduleShardsConfig; + return this; + } + + Builder akkaConfig(String akkaConfig) { + this.akkaConfig = akkaConfig; + return this; + } + + Builder testName(String testName) { + this.testName = testName; + return this; + } + + Builder waitForShardLeader(String... shardNames) { + this.waitForshardLeader = shardNames; + return this; + } + + Builder createOperDatastore(boolean value) { + this.createOperDatastore = value; + return this; + } + + MemberNode build() { + MemberNode node = new MemberNode(); + ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig)); + Cluster.get(system).join(MEMBER_1_ADDRESS); + + node.kit = new IntegrationTestKit(system, node.datastoreContextBuilder); + + String memberName = new ClusterWrapperImpl(system).getCurrentMemberName(); + node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName); + node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig, + true, waitForshardLeader); + + if(createOperDatastore) { + node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName); + node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig, + true, waitForshardLeader); + } + + members.add(node); + return node; + } + } + } + + private static interface RaftStateVerifier { + void verify(OnDemandRaftState raftState); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf index 18e5b4d643..42fcd0ba40 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf @@ -81,6 +81,7 @@ Member1 { cluster { auto-down-unreachable-after = 100s + retry-unsuccessful-join-after = 100ms roles = [ "member-1" @@ -133,6 +134,7 @@ Member2 { cluster { auto-down-unreachable-after = 100s + retry-unsuccessful-join-after = 100ms roles = [ "member-2" @@ -187,6 +189,7 @@ Member3 { cluster { auto-down-unreachable-after = 100s + retry-unsuccessful-join-after = 100ms roles = [ "member-3" diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-cars-member-1.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-cars-member-1.conf new file mode 
100644
index 0000000000..42a518b949
--- /dev/null
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-cars-member-1.conf
@@ -0,0 +1,13 @@
+module-shards = [
+    {
+        name = "cars"
+        shards = [
+            {
+                name="cars"
+                replicas = [
+                    "member-1"
+                ]
+            }
+        ]
+    }
+]
\ No newline at end of file
-- 
2.36.6
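
Usage sketch (illustrative, not part of the patch): the commit message's rationale is that each ShardManager in the multi-member unit test needs its own persistence id, because the InMemorySnapshotStore is static and therefore shared by the three in-process actor systems. The new DatastoreContext.Builder method above is how a test supplies that id; the snippet below mirrors MemberNode.Builder.build() in ClusterAdminRpcServiceTest, and the member name shown is only an example.

    // Give each simulated cluster member its own ShardManager persistence id so its
    // journal/snapshot entries do not collide in the statically shared InMemorySnapshotStore.
    String memberName = "member-1"; // illustrative; the test derives this from the member's Akka cluster role
    DatastoreContext.Builder builder = DatastoreContext.newBuilder()
            .shardManagerPersistenceId("shard-manager-config-" + memberName);
    // The test uses "shard-manager-oper-" + memberName before creating the oper datastore,
    // and ShardManager prefers the supplied id over its default "shard-manager-" + type.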
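A second sketch, drawn from testAddShardReplica() in the test above, showing how the new add-shard-replica RPC is invoked; the "cars" shard name, the 10-second timeout, and the node fields come from that test and are assumptions outside of it.

    ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore, node.operDataStore);
    RpcResult<Void> rpcResult = service.addShardReplica(
            new AddShardReplicaInputBuilder().setShardName("cars").build()).get(10, TimeUnit.SECONDS);
    // A successful result means AddShardReplica was delivered to the shard managers of both the
    // config and oper datastores; a failed result carries an application error built from the
    // underlying cause's message.
    service.close();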