import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
DistributedDataStore dataStore =
setupDistributedDataStore("testChangeListenerRegistration", "test-1");
- final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>>
- changeList = Lists.newArrayList();
- final CountDownLatch changeLatch = new CountDownLatch(3);
- AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
- new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
- @Override
- public void onDataChanged(AsyncDataChangeEvent<YangInstanceIdentifier,
- NormalizedNode<?, ?>> change) {
- changeList.add(change);
- changeLatch.countDown();
- }
- };
+ MockDataChangeListener listener = new MockDataChangeListener(3);
- ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+ ListenerRegistration<MockDataChangeListener>
listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
DataChangeScope.SUBTREE);
testWriteTransaction(dataStore, listPath,
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
- assertEquals("Change notifications complete", true,
- Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS));
-
- assertTrue("Change 1 does not contain " + TestModel.TEST_PATH,
- changeList.get(0).getCreatedData().containsKey(TestModel.TEST_PATH));
-
- assertTrue("Change 2 does not contain " + TestModel.OUTER_LIST_PATH,
- changeList.get(1).getCreatedData().containsKey(TestModel.OUTER_LIST_PATH));
-
- assertTrue("Change 3 does not contain " + listPath,
- changeList.get(2).getCreatedData().containsKey(listPath));
+ listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath );
listenerReg.close();
nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-
- assertEquals("Received unexpected change after close", 3, changeList.size());
+ listener.expectNoMoreChanges("Received unexpected change after close");
cleanup(dataStore);
}};