import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
-import akka.actor.PoisonPill;
import akka.cluster.Cluster;
-import akka.cluster.ddata.DistributedData;
import akka.testkit.JavaTestKit;
import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Ignore("Needs to have the configuration backend switched from distributed-data")
public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
private static final DOMDataTreeIdentifier TEST_ID =
new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+ private static final String MODULE_SHARDS_CONFIG = "module-shards-cars-member-1-and-2.conf";
+
private ActorSystem leaderSystem;
private ActorSystem followerSystem;
.logicalStoreType(
org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION);
- private DistributedDataStore followerDistributedDataStore;
- private DistributedDataStore leaderDistributedDataStore;
+ private DistributedDataStore leaderConfigDatastore;
+ private DistributedDataStore leaderOperDatastore;
+
+ private DistributedDataStore followerConfigDatastore;
+ private DistributedDataStore followerOperDatastore;
+
+
private IntegrationTestKit followerTestKit;
private IntegrationTestKit leaderTestKit;
-
private DistributedShardedDOMDataTree leaderShardFactory;
- private DistributedShardedDOMDataTree followerShardFactory;
+ private DistributedShardedDOMDataTree followerShardFactory;
private ActorSystemProvider leaderSystemProvider;
private ActorSystemProvider followerSystemProvider;
@Before
public void setUp() {
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
@After
public void tearDown() {
- if (followerDistributedDataStore != null) {
- followerDistributedDataStore.close();
+ if (leaderConfigDatastore != null) {
+ leaderConfigDatastore.close();
}
- if (leaderDistributedDataStore != null) {
- leaderDistributedDataStore.close();
+ if (leaderOperDatastore != null) {
+ leaderOperDatastore.close();
}
- DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
- DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ if (followerConfigDatastore != null) {
+ followerConfigDatastore.close();
+ }
+ if (followerOperDatastore != null) {
+ followerOperDatastore.close();
+ }
JavaTestKit.shutdownActorSystem(leaderSystem);
JavaTestKit.shutdownActorSystem(followerSystem);
+
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
}
- private void initEmptyDatastores(final String type) {
+ private void initEmptyDatastores() {
leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
- leaderDistributedDataStore =
- leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
+ leaderConfigDatastore = (DistributedDataStore) leaderTestKit.setupDistributedDataStore(
+ "config", MODULE_SHARDS_CONFIG, true,
+ SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
+ leaderOperDatastore = (DistributedDataStore) leaderTestKit.setupDistributedDataStore(
+ "operational", MODULE_SHARDS_CONFIG, true,
+ SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
+
+ leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
+ leaderOperDatastore,
+ leaderConfigDatastore);
followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
- followerDistributedDataStore =
- followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
- leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
- leaderDistributedDataStore,
- leaderDistributedDataStore);
+ followerConfigDatastore = (DistributedDataStore) followerTestKit.setupDistributedDataStore(
+ "config", MODULE_SHARDS_CONFIG, true, SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
+ followerOperDatastore = (DistributedDataStore) followerTestKit.setupDistributedDataStore(
+ "operational", MODULE_SHARDS_CONFIG, true,
+ SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
- followerDistributedDataStore,
- followerDistributedDataStore);
+ followerOperDatastore,
+ followerConfigDatastore);
+
+ leaderShardFactory.init();
+ followerShardFactory.init();
+
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
+ ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderOperDatastore.getActorContext(),
ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
+
}
@Test
public void testProducerRegistrations() throws Exception {
- initEmptyDatastores("config");
+ initEmptyDatastores();
leaderTestKit.waitForMembersUp("member-2");
TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
- final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+ final ActorRef leaderShardManager = leaderConfigDatastore.getActorContext().getShardManager();
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
} catch (final DOMDataTreeShardingConflictException e) {
assertTrue(e.getMessage().contains("is already occupied by another shard"));
}
+
+ shardRegistration.close().toCompletableFuture().get();
}
@Test
public void testWriteIntoMultipleShards() throws Exception {
- initEmptyDatastores("config");
+ initEmptyDatastores();
leaderTestKit.waitForMembersUp("member-2");
DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
- findLocalShard(followerDistributedDataStore.getActorContext(),
+ findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
LOG.debug("Got after waiting for nonleader");
LOG.warn("Got to pre submit");
tx.submit().checkedGet();
+
+ shardRegistration.close().toCompletableFuture().get();
}
@Test
public void testMultipleShardRegistrations() throws Exception {
- initEmptyDatastores("config");
+ initEmptyDatastores();
final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
// check leader has local shards
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
// check follower has local shards
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
LOG.debug("Closing registrations");
- reg1.close();
- reg2.close();
- reg3.close();
- reg4.close();
+ reg1.close().toCompletableFuture().get();
+ reg2.close().toCompletableFuture().get();
+ reg3.close().toCompletableFuture().get();
+ reg4.close().toCompletableFuture().get();
- waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
- waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
- waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
LOG.debug("All leader shards gone");
- waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
- waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
- waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
LOG.debug("All follower shards gone");
@Test
public void testMultipleRegistrationsAtOnePrefix() throws Exception {
- initEmptyDatastores("config");
+ initEmptyDatastores();
for (int i = 0; i < 10; i++) {
LOG.debug("Round {}", i);
TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
- assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ assertNotNull(findLocalShard(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(leaderConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ waitUntilShardIsDown(followerConfigDatastore.getActorContext(),
ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
}
+
}
}