package org.opendaylight.controller.cluster.sharding;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
-import akka.actor.PoisonPill;
import akka.cluster.Cluster;
-import akka.cluster.ddata.DistributedData;
import akka.testkit.JavaTestKit;
import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
-import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
-import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
-import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
-import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
-import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Ignore("Needs to have the configuration backend switched from distributed-data")
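+/**
+ * Tests distributed shard creation, producer registration and writes across a
+ * two-node cluster, exercising both the leader and the follower member.
+ */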
public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
private static final DOMDataTreeIdentifier TEST_ID =
new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+ private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf";
+
private ActorSystem leaderSystem;
private ActorSystem followerSystem;
private final Builder leaderDatastoreContextBuilder =
- DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
private final DatastoreContext.Builder followerDatastoreContextBuilder =
- DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
- .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
+
+ private DistributedDataStore leaderConfigDatastore;
+ private DistributedDataStore leaderOperDatastore;
+
+ private DistributedDataStore followerConfigDatastore;
+ private DistributedDataStore followerOperDatastore;
+
- private DistributedDataStore followerDistributedDataStore;
- private DistributedDataStore leaderDistributedDataStore;
private IntegrationTestKit followerTestKit;
private IntegrationTestKit leaderTestKit;
-
private DistributedShardedDOMDataTree leaderShardFactory;
- private DistributedShardedDOMDataTree followerShardFactory;
+ private DistributedShardedDOMDataTree followerShardFactory;
private ActorSystemProvider leaderSystemProvider;
private ActorSystemProvider followerSystemProvider;
@Before
public void setUp() {
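+// start each test from empty in-memory journal and snapshot stores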
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);

// the follower mirrors the leader's setup; the "Member2" config name is assumed by symmetry
followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);

leaderSystemProvider = Mockito.mock(ActorSystemProvider.class);
Mockito.doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem();

followerSystemProvider = Mockito.mock(ActorSystemProvider.class);
Mockito.doReturn(followerSystem).when(followerSystemProvider).getActorSystem();
}
@After
public void tearDown() {
- if (followerDistributedDataStore != null) {
- followerDistributedDataStore.close();
+ if (leaderConfigDatastore != null) {
+ leaderConfigDatastore.close();
+ }
+ if (leaderOperDatastore != null) {
+ leaderOperDatastore.close();
}
- if (leaderDistributedDataStore != null) {
- leaderDistributedDataStore.close();
+
+ if (followerConfigDatastore != null) {
+ followerConfigDatastore.close();
+ }
+ if (followerOperDatastore != null) {
+ followerOperDatastore.close();
}
- DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
- DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
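+// tear both actor systems down and wait for their termination before the next test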
+ JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
+ JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
- JavaTestKit.shutdownActorSystem(leaderSystem);
- JavaTestKit.shutdownActorSystem(followerSystem);
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
}
- private void initEmptyDatastores(final String type) {
+ private void initEmptyDatastores() throws Exception {
leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
- leaderDistributedDataStore =
- leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
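+// each member now gets a config and an operational datastore backed by module-shards-default.conf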
+ leaderConfigDatastore = leaderTestKit.setupDistributedDataStore(
+ "config", MODULE_SHARDS_CONFIG, true,
+ SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
+ leaderOperDatastore = leaderTestKit.setupDistributedDataStore(
+ "operational", MODULE_SHARDS_CONFIG, true,
+ SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
+
+ leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
+ leaderOperDatastore,
+ leaderConfigDatastore);
followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
- followerDistributedDataStore =
- followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
- leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
- leaderDistributedDataStore,
- leaderDistributedDataStore);
+ followerConfigDatastore = followerTestKit.setupDistributedDataStore(
+ "config", MODULE_SHARDS_CONFIG, true, SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
+ followerOperDatastore = followerTestKit.setupDistributedDataStore(
+ "operational", MODULE_SHARDS_CONFIG, true,
+ SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
- followerDistributedDataStore,
- followerDistributedDataStore);
+ followerOperDatastore,
+ followerConfigDatastore);
+
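+// make sure the follower sees the leader before the shard factories are initialized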
+ followerTestKit.waitForMembersUp("member-1");
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderShardFactory.init();
+ followerShardFactory.init();
+
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
+ ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
+
+ leaderTestKit.waitUntilLeader(leaderOperDatastore.getActorContext(),
ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
}
@Test
- @Ignore("Needs different shard creation handling due to replicas")
public void testProducerRegistrations() throws Exception {
- initEmptyDatastores("config");
+ LOG.info("testProducerRegistrations starting");
+ initEmptyDatastores();
leaderTestKit.waitForMembersUp("member-2");
+ // TODO: refactor shard creation and verification into its own method
final DistributedShardRegistration shardRegistration =
- leaderShardFactory.createDistributedShard(
- TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+ waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+ TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+ DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
- final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+ final ActorRef leaderShardManager = leaderConfigDatastore.getActorContext().getShardManager();
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
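+ // the leader-side shard should report exactly one peer replica, hosted on the follower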
+ final Set<String> peers = new HashSet<>();
+ IntegrationTestKit.verifyShardState(leaderConfigDatastore,
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
+ peers.addAll(onDemandShardState.getPeerAddresses().values()));
+ assertEquals(1, peers.size());
+
final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
try {
followerShardFactory.createProducer(Collections.singleton(TEST_ID));
fail("Producer should be already registered on the other node");
} catch (final IllegalArgumentException e) {
// assumes the sharding layer rejects a second producer for an attached prefix with this message
assertTrue(e.getMessage().contains("is attached to producer"));
}
producer.close();

// once the leader's producer is closed, the follower can attach its own
final DOMDataTreeProducer followerProducer =
followerShardFactory.createProducer(Collections.singleton(TEST_ID));
followerProducer.close();
// try to create a shard on an already-registered prefix on the follower
try {
- followerShardFactory.createDistributedShard(TEST_ID,
- Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+ waitOnAsyncTask(followerShardFactory.createDistributedShard(
+ TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+ DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
fail("This prefix already should have a shard registration that was forwarded from the other node");
} catch (final DOMDataTreeShardingConflictException e) {
- assertTrue(e.getMessage().contains("is already occupied by shard"));
+ assertTrue(e.getMessage().contains("is already occupied by another shard"));
}
+
+ shardRegistration.close().toCompletableFuture().get();
+
+ LOG.info("testProducerRegistrations ending");
}
@Test
- @Ignore("Needs different shard creation handling due to replicas")
public void testWriteIntoMultipleShards() throws Exception {
- initEmptyDatastores("config");
+ LOG.info("testWriteIntoMultipleShards starting");
+ initEmptyDatastores();
leaderTestKit.waitForMembersUp("member-2");
- LOG.warn("registering first shard");
+ LOG.debug("registering first shard");
final DistributedShardRegistration shardRegistration =
- leaderShardFactory.createDistributedShard(TEST_ID,
- Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+ waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+ TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+ DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
- findLocalShard(followerDistributedDataStore.getActorContext(),
+ findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
- LOG.warn("Got after waiting for nonleader");
- final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
-
- new JavaTestKit(leaderSystem) {
- {
- leaderShardManager.tell(
- new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
- expectMsgClass(duration("5 seconds"), LocalShardFound.class);
-
- final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
-
- followerShardManager.tell(new FindLocalShard(
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
- followerTestKit.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
- LOG.warn("Found follower shard");
-
- leaderDistributedDataStore.getActorContext().getShardManager().tell(
- new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
- expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
- }
- };
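+ // verify replication through the shard state rather than FindLocalShard/FindPrimary messages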
+ final Set<String> peers = new HashSet<>();
+ IntegrationTestKit.verifyShardState(leaderConfigDatastore,
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
+ peers.addAll(onDemandShardState.getPeerAddresses().values()));
+ assertEquals(1, peers.size());
+ LOG.debug("Got after waiting for nonleader");
final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
assertNotNull(cursor);
// illustrative payload: write a sample leaf so the submit below carries a real modification
cursor.write(new NodeIdentifier(TestModel.NAME_QNAME),
ImmutableLeafNodeBuilder.<String>create()
.withNodeIdentifier(new NodeIdentifier(TestModel.NAME_QNAME))
.withValue("Test Value").build());
cursor.close();
LOG.debug("Got to pre submit");
- tx.submit();
+ tx.submit().checkedGet();
+
+ shardRegistration.close().toCompletableFuture().get();
+
+ LOG.info("testWriteIntoMultipleShards ending");
}
@Test
public void testMultipleShardRegistrations() throws Exception {
- initEmptyDatastores("config");
+ LOG.info("testMultipleShardRegistrations starting");
+ initEmptyDatastores();
- final DistributedShardRegistration reg1 = leaderShardFactory
- .createDistributedShard(TEST_ID,
- Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+ final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+ TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+ DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- final DistributedShardRegistration reg2 = leaderShardFactory
- .createDistributedShard(
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
- Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+ final DistributedShardRegistration reg2 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+ DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- final DistributedShardRegistration reg3 = leaderShardFactory
- .createDistributedShard(
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
- Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+ final DistributedShardRegistration reg3 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+ DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- final DistributedShardRegistration reg4 = leaderShardFactory
- .createDistributedShard(
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
- Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+ final DistributedShardRegistration reg4 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+ DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
// check leader has local shards
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
// check follower has local shards
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
-
LOG.debug("Closing registrations");
- reg1.close();
- reg2.close();
- reg3.close();
- reg4.close();
+ reg1.close().toCompletableFuture().get();
+ reg2.close().toCompletableFuture().get();
+ reg3.close().toCompletableFuture().get();
+ reg4.close().toCompletableFuture().get();
- waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
- waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
- waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
LOG.debug("All leader shards gone");
- waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
- waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
- waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
LOG.debug("All follower shards gone");
+ LOG.info("testMultipleShardRegistrations ending");
}
@Test
public void testMultipleRegistrationsAtOnePrefix() throws Exception {
- initEmptyDatastores("config");
+ LOG.info("testMultipleRegistrationsAtOnePrefix starting");
+ initEmptyDatastores();
- for (int i = 0; i < 10; i++) {
- LOG.debug("Round {}", i);
- final DistributedShardRegistration reg1 = leaderShardFactory
- .createDistributedShard(TEST_ID,
- Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+ for (int i = 0; i < 5; i++) {
+ LOG.info("Round {}", i);
+ final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
+ TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
+ DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
- reg1.close();
- waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
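+ // on each round the re-created shard should again report a single peer replica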
+ final Set<String> peers = new HashSet<>();
+ IntegrationTestKit.verifyShardState(leaderConfigDatastore,
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
+ peers.addAll(onDemandShardState.getPeerAddresses().values()));
+ assertEquals(1, peers.size());
+
+ waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
+
+ waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
}
+
+ LOG.info("testMultipleRegistrationsAtOnePrefix ending");
}
}