import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
}};
}
+ @Test
+ public void testChangeListenerRegistration() throws Exception{
+ new IntegrationTestKit(getSystem()) {{
+ DistributedDataStore dataStore =
+ setupDistributedDataStore("testChangeListenerRegistration", "test-1");
+
+ final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ changeList = Lists.newArrayList();
+ final CountDownLatch changeLatch = new CountDownLatch(3);
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
+ new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
+ @Override
+ public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier,
+ NormalizedNode<?, ?>> change) {
+ changeList.add(change);
+ changeLatch.countDown();
+ }
+ };
+
+ ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
+ DataChangeScope.SUBTREE);
+
+ assertNotNull("registerChangeListener returned null", listenerReg);
+
+ testWriteTransaction(dataStore, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+ YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
+ nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
+ testWriteTransaction(dataStore, listPath,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
+
+ assertEquals("Change notifications complete", true,
+ Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS));
+
+ assertTrue("Change 1 does not contain " + TestModel.TEST_PATH,
+ changeList.get(0).getCreatedData().containsKey(TestModel.TEST_PATH));
+
+ assertTrue("Change 2 does not contain " + TestModel.OUTER_LIST_PATH,
+ changeList.get(1).getCreatedData().containsKey(TestModel.OUTER_LIST_PATH));
+
+ assertTrue("Change 3 does not contain " + listPath,
+ changeList.get(2).getCreatedData().containsKey(listPath));
+
+ listenerReg.close();
+
+ testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH).
+ nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
+
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+ assertEquals("Received unexpected change after close", 3, changeList.size());
+
+ cleanup(dataStore);
+ }};
+ }
+
class IntegrationTestKit extends ShardTestKit {
IntegrationTestKit(ActorSystem actorSystem) {