import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
-import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
+import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
@Before
public void setUp() throws IOException {
+ InMemorySnapshotStore.clear();
+ InMemoryJournal.clear();
system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
Cluster.get(system).join(member1Address);
@Test
public void testChainedTransactionFailureWithSingleShard() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
-
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
try (final AbstractDataStore dataStore = setupAbstractDataStore(
@Test
public void testChainedTransactionFailureWithMultipleShards() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
-
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
try (final AbstractDataStore dataStore = setupAbstractDataStore(
assertNotNull("registerChangeListener returned null", listenerReg);
+ IntegrationTestKit.verifyShardState(dataStore, "test-1",
+ state -> assertEquals("getDataChangeListenerActors", 1,
+ state.getDataChangeListenerActors().size()));
+
// Wait for the initial notification
listener.waitForChangeEvents(TestModel.TEST_PATH);
listener.reset(2);
listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
listenerReg.close();
+ IntegrationTestKit.verifyShardState(dataStore, "test-1",
+ state -> assertEquals("getDataChangeListenerActors", 0,
+ state.getDataChangeListenerActors().size()));
+
+ testWriteTransaction(dataStore,
+ YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2));
+
+ listener.expectNoMoreChanges("Received unexpected change after close");
+ }
+ }
+ };
+ }
+
+ @Test
+ public void testDataTreeChangeListenerRegistration() throws Exception {
+ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
+ {
+ try (final AbstractDataStore dataStore = setupAbstractDataStore(
+ testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
+
+ testWriteTransaction(dataStore, TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+
+ ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
+ .registerTreeChangeListener(TestModel.TEST_PATH, listener);
+
+ assertNotNull("registerTreeChangeListener returned null", listenerReg);
+
+ IntegrationTestKit.verifyShardState(dataStore, "test-1",
+ state -> assertEquals("getTreeChangeListenerActors", 1,
+ state.getTreeChangeListenerActors().size()));
+
+ // Wait for the initial notification
+ listener.waitForChangeEvents(TestModel.TEST_PATH);
+ listener.reset(2);
+
+ // Write 2 updates.
+ testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+ YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
+ testWriteTransaction(dataStore, listPath,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
+
+ // Wait for the 2 updates.
+ listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
+ listenerReg.close();
+
+ IntegrationTestKit.verifyShardState(dataStore, "test-1",
+ state -> assertEquals("getTreeChangeListenerActors", 0,
+ state.getTreeChangeListenerActors().size()));
+
testWriteTransaction(dataStore,
YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(),