Merge "Bug 2055: Handle shard not initialized resiliently"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
index 395021d361c6d4b210fcf29bf20707802b6c590a..d35c36fb0a2c883649ec7fcccc42ac981f20e256 100644 (file)
@@ -4,9 +4,11 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
@@ -14,15 +16,21 @@ import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
@@ -204,6 +212,68 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testChangeListenerRegistration() throws Exception{
+        new IntegrationTestKit(getSystem()) {{
+            DistributedDataStore dataStore =
+                    setupDistributedDataStore("testChangeListenerRegistration", "test-1");
+
+            final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+                                                                changeList = Lists.newArrayList();
+            final CountDownLatch changeLatch = new CountDownLatch(3);
+            AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
+                    new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
+                @Override
+                public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier,
+                                                               NormalizedNode<?, ?>> change) {
+                    changeList.add(change);
+                    changeLatch.countDown();
+                }
+            };
+
+            ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+                    listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
+                            DataChangeScope.SUBTREE);
+
+            assertNotNull("registerChangeListener returned null", listenerReg);
+
+            testWriteTransaction(dataStore, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+            YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
+                    nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
+            testWriteTransaction(dataStore, listPath,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
+
+            assertEquals("Change notifications complete", true,
+                    Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS));
+
+            assertTrue("Change 1 does not contain " + TestModel.TEST_PATH,
+                    changeList.get(0).getCreatedData().containsKey(TestModel.TEST_PATH));
+
+            assertTrue("Change 2 does not contain " + TestModel.OUTER_LIST_PATH,
+                    changeList.get(1).getCreatedData().containsKey(TestModel.OUTER_LIST_PATH));
+
+            assertTrue("Change 3 does not contain " + listPath,
+                    changeList.get(2).getCreatedData().containsKey(listPath));
+
+            listenerReg.close();
+
+            testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
+                    nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
+
+            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+            assertEquals("Received unexpected change after close", 3, changeList.size());
+
+            cleanup(dataStore);
+        }};
+    }
+
     class IntegrationTestKit extends ShardTestKit {
 
         IntegrationTestKit(ActorSystem actorSystem) {