Bug 7271: Use mdsal AbstractDOMStoreTreeChangePublisher in CDS 03/48903/4
authorTom Pantelis <tpanteli@brocade.com>
Thu, 1 Dec 2016 18:32:29 +0000 (13:32 -0500)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 15 Dec 2016 12:24:07 +0000 (12:24 +0000)
Modified DefaultShardDataTreeChangeListenerPublisher to derive from
the AbstractDOMStoreTreeChangePublisher in the mdsal project to get
the DataTreeCandidate batching.

Change-Id: Ic86da04a80e9db56dd234549b88f4c958b1a708c
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardDataTreeChangeListenerPublisher.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataTreeChangeListener.java

index b539443f6ef72ad3ebe3c2c315efd4174f10afba..a63859e9e695516488129a496319d1070e9b3804 100644 (file)
@@ -8,14 +8,13 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import java.util.Collection;
 package org.opendaylight.controller.cluster.datastore;
 
 import java.util.Collection;
-import java.util.Collections;
 import javax.annotation.concurrent.NotThreadSafe;
 import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
-import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTreeChangePublisher;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
+import org.opendaylight.mdsal.dom.spi.store.AbstractDOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,18 +40,29 @@ final class DefaultShardDataTreeChangeListenerPublisher extends AbstractDOMStore
     }
 
     @Override
     }
 
     @Override
-    protected void notifyListeners(final Collection<AbstractDOMDataTreeChangeListenerRegistration<?>> registrations,
-            final YangInstanceIdentifier path, final DataTreeCandidateNode node) {
-        final Collection<DataTreeCandidate> changes = Collections.<DataTreeCandidate>singleton(
-                DataTreeCandidates.newDataTreeCandidate(path, node));
-
-        for (AbstractDOMDataTreeChangeListenerRegistration<?> reg : registrations) {
-            reg.getInstance().onDataTreeChanged(changes);
-        }
+    protected void notifyListener(AbstractDOMDataTreeChangeListenerRegistration<?> registration,
+            Collection<DataTreeCandidate> changes) {
+        registration.getInstance().onDataTreeChanged(changes);
     }
 
     @Override
     protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
         LOG.debug("Registration {} removed", registration);
     }
     }
 
     @Override
     protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
         LOG.debug("Registration {} removed", registration);
     }
+
+    @Override
+    public <L extends org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener> ListenerRegistration<L>
+            registerTreeChangeListener(final YangInstanceIdentifier treeId, final L listener) {
+        final AbstractDOMDataTreeChangeListenerRegistration<DOMDataTreeChangeListener> registration =
+            super.registerTreeChangeListener(treeId, (org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener)
+                changes -> listener.onDataTreeChanged(changes));
+
+        return new org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration<L>(
+                listener) {
+            @Override
+            protected void removeRegistration() {
+                registration.close();
+            }
+        };
+    }
 }
 }
index ab7e8f040159471c3290713797d2105a321584a8..3de3e9ac222f9a8e429c94dbc16f90a9e1a1aa0a 100644 (file)
@@ -313,7 +313,7 @@ public abstract class AbstractShardTest extends AbstractActorTest {
         BatchedModifications batched = newBatchedModifications(nextTransactionId(), id, node, true, true, 1);
         DataTreeModification modification = store.getDataTree().takeSnapshot().newModification();
         batched.apply(modification);
         BatchedModifications batched = newBatchedModifications(nextTransactionId(), id, node, true, true, 1);
         DataTreeModification modification = store.getDataTree().takeSnapshot().newModification();
         batched.apply(modification);
-        store.commit(modification);
+        store.notifyListeners(store.commit(modification));
     }
 
     public static void writeToStore(final DataTree store, final YangInstanceIdentifier id,
     }
 
     public static void writeToStore(final DataTree store, final YangInstanceIdentifier id,
@@ -337,7 +337,7 @@ public abstract class AbstractShardTest extends AbstractActorTest {
 
         DataTreeModification modification = store.getDataTree().takeSnapshot().newModification();
         batched.apply(modification);
 
         DataTreeModification modification = store.getDataTree().takeSnapshot().newModification();
         batched.apply(modification);
-        store.commit(modification);
+        store.notifyListeners(store.commit(modification));
     }
 
     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
     }
 
     DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
index a69179eef6f3f5c485f7f79f4cd1c3eb7187f764..6d09d1cfac3cd4a36003fc7b4bbb8f3aae53c81d 100644 (file)
@@ -21,14 +21,27 @@ import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.o
 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.testNodeWithOuter;
 
 import akka.actor.ActorRef;
 import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.testNodeWithOuter;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Throwables;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 
 /**
  * Unit tests for DataTreeChangeListenerSupport.
 
 /**
  * Unit tests for DataTreeChangeListenerSupport.
@@ -37,12 +50,13 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
  */
 public class DataTreeChangeListenerSupportTest extends AbstractShardTest {
     private Shard shard;
  */
 public class DataTreeChangeListenerSupportTest extends AbstractShardTest {
     private Shard shard;
-    private DataTreeChangeListenerSupport support;
+    private TestActorRef<Shard> shardActor;
 
 
+    @Override
     @Before
     @Before
-    public void setup() {
-        shard = createShard();
-        support = new DataTreeChangeListenerSupport(shard);
+    public void setUp() {
+        super.setUp();
+        createShard();
     }
 
     @Override
     }
 
     @Override
@@ -54,7 +68,7 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest {
 
     @Test
     public void testChangeListenerWithNoInitialData() throws Exception {
 
     @Test
     public void testChangeListenerWithNoInitialData() throws Exception {
-        MockDataTreeChangeListener listener = registerChangeListener(TEST_PATH, 0, true);
+        MockDataTreeChangeListener listener = registerChangeListener(TEST_PATH, 0).getKey();
 
         listener.expectNoMoreChanges("Unexpected initial change event");
     }
 
         listener.expectNoMoreChanges("Unexpected initial change event");
     }
@@ -63,17 +77,32 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest {
     public void testInitialChangeListenerEventWithContainerPath() throws Exception {
         writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));
 
     public void testInitialChangeListenerEventWithContainerPath() throws Exception {
         writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));
 
-        MockDataTreeChangeListener listener = registerChangeListener(TEST_PATH, 1, true);
+        Entry<MockDataTreeChangeListener, ActorSelection> entry = registerChangeListener(TEST_PATH, 1);
+        MockDataTreeChangeListener listener = entry.getKey();
 
         listener.waitForChangeEvents();
         listener.verifyNotifiedData(TEST_PATH);
 
         listener.waitForChangeEvents();
         listener.verifyNotifiedData(TEST_PATH);
+
+        listener.reset(1);
+
+        writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));
+        listener.waitForChangeEvents();
+        listener.verifyNotifiedData(TEST_PATH);
+
+        listener.reset(1);
+        JavaTestKit kit = new JavaTestKit(getSystem());
+        entry.getValue().tell(CloseDataTreeChangeListenerRegistration.getInstance(), kit.getRef());
+        kit.expectMsgClass(JavaTestKit.duration("5 seconds"), CloseDataTreeChangeListenerRegistrationReply.class);
+
+        writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));
+        listener.verifyNoNotifiedData(TEST_PATH);
     }
 
     @Test
     public void testInitialChangeListenerEventWithListPath() throws Exception {
         mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(1, 2));
 
     }
 
     @Test
     public void testInitialChangeListenerEventWithListPath() throws Exception {
         mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(1, 2));
 
-        MockDataTreeChangeListener listener = registerChangeListener(OUTER_LIST_PATH, 1, true);
+        MockDataTreeChangeListener listener = registerChangeListener(OUTER_LIST_PATH, 1).getKey();
 
         listener.waitForChangeEvents();
         listener.verifyNotifiedData(OUTER_LIST_PATH);
 
         listener.waitForChangeEvents();
         listener.verifyNotifiedData(OUTER_LIST_PATH);
@@ -83,7 +112,8 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest {
     public void testInitialChangeListenerEventWithWildcardedListPath() throws Exception {
         mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(1, 2));
 
     public void testInitialChangeListenerEventWithWildcardedListPath() throws Exception {
         mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(1, 2));
 
-        MockDataTreeChangeListener listener = registerChangeListener(OUTER_LIST_PATH.node(OUTER_LIST_QNAME), 2, true);
+        MockDataTreeChangeListener listener =
+                registerChangeListener(OUTER_LIST_PATH.node(OUTER_LIST_QNAME), 1).getKey();
 
         listener.waitForChangeEvents();
         listener.verifyNotifiedData(outerEntryPath(1), outerEntryPath(2));
 
         listener.waitForChangeEvents();
         listener.verifyNotifiedData(outerEntryPath(1), outerEntryPath(2));
@@ -95,7 +125,7 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest {
                 outerNodeEntry(1, innerNode("one", "two")), outerNodeEntry(2, innerNode("three", "four")))));
 
         MockDataTreeChangeListener listener = registerChangeListener(
                 outerNodeEntry(1, innerNode("one", "two")), outerNodeEntry(2, innerNode("three", "four")))));
 
         MockDataTreeChangeListener listener = registerChangeListener(
-                OUTER_LIST_PATH.node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), 4, true);
+                OUTER_LIST_PATH.node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), 1).getKey();
 
         listener.waitForChangeEvents();
         listener.verifyNotifiedData(innerEntryPath(1, "one"), innerEntryPath(1, "two"), innerEntryPath(2, "three"),
 
         listener.waitForChangeEvents();
         listener.verifyNotifiedData(innerEntryPath(1, "one"), innerEntryPath(1, "two"), innerEntryPath(2, "three"),
@@ -104,24 +134,46 @@ public class DataTreeChangeListenerSupportTest extends AbstractShardTest {
         // Register for a specific outer list entry
 
         MockDataTreeChangeListener listener2 = registerChangeListener(
         // Register for a specific outer list entry
 
         MockDataTreeChangeListener listener2 = registerChangeListener(
-                OUTER_LIST_PATH.node(outerEntryKey(1)).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), 2, true);
+                OUTER_LIST_PATH.node(outerEntryKey(1)).node(INNER_LIST_QNAME).node(INNER_LIST_QNAME), 1).getKey();
 
         listener2.waitForChangeEvents();
         listener2.verifyNotifiedData(innerEntryPath(1, "one"), innerEntryPath(1, "two"));
         listener2.verifyNoNotifiedData(innerEntryPath(2, "three"), innerEntryPath(2, "four"));
 
         listener2.waitForChangeEvents();
         listener2.verifyNotifiedData(innerEntryPath(1, "one"), innerEntryPath(1, "two"));
         listener2.verifyNoNotifiedData(innerEntryPath(2, "three"), innerEntryPath(2, "four"));
+
+        listener.reset(1);
+        listener2.reset(1);
+
+        mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(outerNode(
+                outerNodeEntry(1, innerNode("three")))));
+
+        listener.waitForChangeEvents();
+        listener.verifyNotifiedData(innerEntryPath(1, "three"));
+
+        listener2.waitForChangeEvents();
+        listener2.verifyNotifiedData(innerEntryPath(1, "three"));
     }
 
     }
 
-    private MockDataTreeChangeListener registerChangeListener(final YangInstanceIdentifier path,
-            final int expectedEvents, final boolean isLeader) {
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private Entry<MockDataTreeChangeListener, ActorSelection> registerChangeListener(final YangInstanceIdentifier path,
+            final int expectedEvents) {
         MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents);
         ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener));
         MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents);
         ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener));
-        support.onMessage(new RegisterDataTreeChangeListener(path, dclActor, false), isLeader, true);
-        return listener;
+
+        try {
+            RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply)
+                Await.result(Patterns.ask(shardActor, new RegisterDataTreeChangeListener(path, dclActor, false),
+                    new Timeout(5, TimeUnit.SECONDS)), Duration.create(5, TimeUnit.SECONDS));
+            return new SimpleEntry<>(listener, getSystem().actorSelection(reply.getListenerRegistrationPath()));
+
+        } catch (Exception e) {
+            Throwables.propagate(e);
+            return null;
+        }
     }
 
     }
 
-    private Shard createShard() {
-        TestActorRef<Shard> actor = actorFactory.createTestActor(newShardProps());
-        ShardTestKit.waitUntilLeader(actor);
-        return actor.underlyingActor();
+    private void createShard() {
+        shardActor = actorFactory.createTestActor(newShardProps());
+        ShardTestKit.waitUntilLeader(shardActor);
+        shard = shardActor.underlyingActor();
     }
 }
     }
 }
index f5a61599df4e2a5a326b7870927b96d1b23b1c0e..4be431554118484f6760a0e78f72f0f82bfba86d 100644 (file)
@@ -27,8 +27,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 public class MockDataTreeChangeListener implements DOMDataTreeChangeListener {
 
 
 public class MockDataTreeChangeListener implements DOMDataTreeChangeListener {
 
-    private final List<Collection<DataTreeCandidate>> changeList =
-            Lists.<Collection<DataTreeCandidate>>newArrayList();
+    private final List<DataTreeCandidate> changeList = Lists.newArrayList();
 
     private volatile CountDownLatch changeLatch;
     private int expChangeEventCount;
 
     private volatile CountDownLatch changeLatch;
     private int expChangeEventCount;
@@ -47,10 +46,12 @@ public class MockDataTreeChangeListener implements DOMDataTreeChangeListener {
 
     @Override
     public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
 
     @Override
     public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
-        synchronized (changeList) {
-            changeList.add(changes);
+        if (changeLatch.getCount() > 0) {
+            synchronized (changeList) {
+                changeList.addAll(changes);
+            }
+            changeLatch.countDown();
         }
         }
-        changeLatch.countDown();
     }
 
     public void waitForChangeEvents() {
     }
 
     public void waitForChangeEvents() {
@@ -64,10 +65,8 @@ public class MockDataTreeChangeListener implements DOMDataTreeChangeListener {
     public void verifyNotifiedData(YangInstanceIdentifier... paths) {
         Set<YangInstanceIdentifier> pathSet = new HashSet<>(Arrays.asList(paths));
         synchronized (changeList) {
     public void verifyNotifiedData(YangInstanceIdentifier... paths) {
         Set<YangInstanceIdentifier> pathSet = new HashSet<>(Arrays.asList(paths));
         synchronized (changeList) {
-            for (Collection<DataTreeCandidate> list : changeList) {
-                for (DataTreeCandidate c : list) {
-                    pathSet.remove(c.getRootPath());
-                }
+            for (DataTreeCandidate c : changeList) {
+                pathSet.remove(c.getRootPath());
             }
         }
 
             }
         }
 
@@ -86,11 +85,9 @@ public class MockDataTreeChangeListener implements DOMDataTreeChangeListener {
     public void verifyNoNotifiedData(YangInstanceIdentifier... paths) {
         Set<YangInstanceIdentifier> pathSet = new HashSet<>(Arrays.asList(paths));
         synchronized (changeList) {
     public void verifyNoNotifiedData(YangInstanceIdentifier... paths) {
         Set<YangInstanceIdentifier> pathSet = new HashSet<>(Arrays.asList(paths));
         synchronized (changeList) {
-            for (Collection<DataTreeCandidate> list : changeList) {
-                for (DataTreeCandidate c : list) {
-                    assertFalse("Unexpected " + c.getRootPath() + " present in DataTreeCandidate",
-                            pathSet.contains(c.getRootPath()));
-                }
+            for (DataTreeCandidate c : changeList) {
+                assertFalse("Unexpected " + c.getRootPath() + " present in DataTreeCandidate",
+                        pathSet.contains(c.getRootPath()));
             }
         }
     }
             }
         }
     }