+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.sharding;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
+import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.AddressFromURIString;
+import akka.actor.PoisonPill;
+import akka.cluster.Cluster;
+import akka.cluster.ddata.DistributedData;
+import akka.testkit.JavaTestKit;
+import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
+import java.util.Collections;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractTest;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.sharding.DistributedShardFactory.DistributedShardRegistration;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Ignore("Needs to have the configuration backend switched from distributed-data")
+public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
+
+ private static final Address MEMBER_1_ADDRESS =
+ AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
+
+ private static final DOMDataTreeIdentifier TEST_ID =
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+
+ private ActorSystem leaderSystem;
+ private ActorSystem followerSystem;
+
+
+ private final Builder leaderDatastoreContextBuilder =
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
+
+ private final DatastoreContext.Builder followerDatastoreContextBuilder =
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
+ .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+ private DistributedDataStore followerDistributedDataStore;
+ private DistributedDataStore leaderDistributedDataStore;
+ private IntegrationTestKit followerTestKit;
+ private IntegrationTestKit leaderTestKit;
+
+ private DistributedShardedDOMDataTree leaderShardFactory;
+ private DistributedShardedDOMDataTree followerShardFactory;
+
+ @Before
+ public void setUp() {
+
+ leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+ Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
+
+ followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+ Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
+
+ }
+
+ @After
+ public void tearDown() {
+ if (followerDistributedDataStore != null) {
+ followerDistributedDataStore.close();
+ }
+ if (leaderDistributedDataStore != null) {
+ leaderDistributedDataStore.close();
+ }
+
+ DistributedData.get(leaderSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ DistributedData.get(followerSystem).replicator().tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+ JavaTestKit.shutdownActorSystem(leaderSystem);
+ JavaTestKit.shutdownActorSystem(followerSystem);
+ }
+
+ private void initEmptyDatastores(final String type) {
+ leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
+
+ leaderDistributedDataStore =
+ leaderTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
+
+ followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
+ followerDistributedDataStore =
+ followerTestKit.setupDistributedDataStoreWithoutConfig(type, SchemaContextHelper.full());
+
+ leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystem,
+ leaderDistributedDataStore,
+ leaderDistributedDataStore);
+
+ followerShardFactory = new DistributedShardedDOMDataTree(followerSystem,
+ followerDistributedDataStore,
+ followerDistributedDataStore);
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));
+ }
+
+ @Test
+ @Ignore("Needs different shard creation handling due to replicas")
+ public void testProducerRegistrations() throws Exception {
+ initEmptyDatastores("config");
+
+ leaderTestKit.waitForMembersUp("member-2");
+
+ final DistributedShardRegistration shardRegistration =
+ leaderShardFactory.createDistributedShard(
+ TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+
+ final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+
+ assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
+
+ assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
+
+ final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
+ try {
+ followerShardFactory.createProducer(Collections.singleton(TEST_ID));
+ fail("Producer should be already registered on the other node");
+ } catch (final IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("is attached to producer"));
+ }
+
+ producer.close();
+
+ final DOMDataTreeProducer followerProducer =
+ followerShardFactory.createProducer(Collections.singleton(TEST_ID));
+ try {
+ leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
+ fail("Producer should be already registered on the other node");
+ } catch (final IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("is attached to producer"));
+ }
+
+ followerProducer.close();
+ // try to create a shard on an already registered prefix on follower
+ try {
+ followerShardFactory.createDistributedShard(TEST_ID,
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+ fail("This prefix already should have a shard registration that was forwarded from the other node");
+ } catch (final DOMDataTreeShardingConflictException e) {
+ assertTrue(e.getMessage().contains("is already occupied by shard"));
+ }
+ }
+
+ @Test
+ @Ignore("Needs different shard creation handling due to replicas")
+ public void testWriteIntoMultipleShards() throws Exception {
+ initEmptyDatastores("config");
+
+ leaderTestKit.waitForMembersUp("member-2");
+
+ LOG.warn("registering first shard");
+ final DistributedShardRegistration shardRegistration =
+ leaderShardFactory.createDistributedShard(TEST_ID,
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+ findLocalShard(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
+
+ LOG.warn("Got after waiting for nonleader");
+ final ActorRef leaderShardManager = leaderDistributedDataStore.getActorContext().getShardManager();
+
+ new JavaTestKit(leaderSystem) {
+ {
+ leaderShardManager.tell(
+ new FindLocalShard(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+ expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+
+ final ActorRef followerShardManager = followerDistributedDataStore.getActorContext().getShardManager();
+
+ followerShardManager.tell(new FindLocalShard(
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), followerTestKit.getRef());
+ followerTestKit.expectMsgClass(duration("5 seconds"), LocalShardFound.class);
+ LOG.warn("Found follower shard");
+
+ leaderDistributedDataStore.getActorContext().getShardManager().tell(
+ new FindPrimary(ClusterUtils.getCleanShardName(TestModel.TEST_PATH), true), getRef());
+ expectMsgClass(duration("5 seconds"), LocalPrimaryShardFound.class);
+ }
+ };
+
+ final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
+
+ final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
+ final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
+ Assert.assertNotNull(cursor);
+ final YangInstanceIdentifier nameId =
+ YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
+ cursor.write(nameId.getLastPathArgument(),
+ ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
+ new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
+
+ cursor.close();
+ LOG.warn("Got to pre submit");
+
+ tx.submit();
+ }
+
+ @Test
+ public void testMultipleShardRegistrations() throws Exception {
+ initEmptyDatastores("config");
+
+ final DistributedShardRegistration reg1 = leaderShardFactory
+ .createDistributedShard(TEST_ID,
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+ final DistributedShardRegistration reg2 = leaderShardFactory
+ .createDistributedShard(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+ final DistributedShardRegistration reg3 = leaderShardFactory
+ .createDistributedShard(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+ final DistributedShardRegistration reg4 = leaderShardFactory
+ .createDistributedShard(
+ new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
+
+ // check leader has local shards
+ assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
+
+ assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
+
+ assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
+
+ assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
+
+ // check follower has local shards
+ assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
+
+ assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
+
+ assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
+
+ assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
+
+
+ LOG.debug("Closing registrations");
+
+ reg1.close();
+ reg2.close();
+ reg3.close();
+ reg4.close();
+
+ waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+ waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
+
+ waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
+
+ waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
+
+ LOG.debug("All leader shards gone");
+
+ waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+ waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
+
+ waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
+
+ waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
+
+ LOG.debug("All follower shards gone");
+ }
+
+ @Test
+ public void testMultipleRegistrationsAtOnePrefix() throws Exception {
+ initEmptyDatastores("config");
+
+ for (int i = 0; i < 10; i++) {
+ LOG.debug("Round {}", i);
+ final DistributedShardRegistration reg1 = leaderShardFactory
+ .createDistributedShard(TEST_ID,
+ Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME));
+
+ leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+ assertNotNull(findLocalShard(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
+
+ assertNotNull(findLocalShard(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
+
+ reg1.close();
+
+ waitUntilShardIsDown(leaderDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+
+ waitUntilShardIsDown(followerDistributedDataStore.getActorContext(),
+ ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
+ }
+ }
+}