X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreIntegrationTest.java;h=2cd2b29c8d3e38d65f241d164f93570f560a4e9a;hp=20dd17d15c7e70b49d1e99c427151c215a18db4f;hb=e78622411319748472b5d9edab14eb6dc92cf6b1;hpb=aeabc9205320987968f20f19119b2591ac6c8d6a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 20dd17d15c..2cd2b29c8d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -45,7 +45,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; -import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -64,6 +63,7 @@ import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardData import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; +import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; @@ -118,6 +118,8 @@ public class DistributedDataStoreIntegrationTest { @Before public void setUp() throws IOException { + InMemorySnapshotStore.clear(); + InMemoryJournal.clear(); system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1")); Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558"); Cluster.get(system).join(member1Address); @@ -137,7 +139,7 @@ public class DistributedDataStoreIntegrationTest { public void testWriteTransactionWithSingleShard() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, "transactionIntegrationTest", "test-1")) { testWriteTransaction(dataStore, TestModel.TEST_PATH, @@ -154,7 +156,7 @@ public class DistributedDataStoreIntegrationTest { public void testWriteTransactionWithMultipleShards() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) { DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); @@ -203,7 +205,7 @@ public class DistributedDataStoreIntegrationTest { public void testReadWriteTransactionWithSingleShard() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) { // 1. Create a read-write Tx @@ -244,7 +246,7 @@ public class DistributedDataStoreIntegrationTest { public void testReadWriteTransactionWithMultipleShards() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) { DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); @@ -301,7 +303,7 @@ public class DistributedDataStoreIntegrationTest { public void testSingleTransactionsWritesInQuickSuccession() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) { final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); @@ -344,7 +346,7 @@ public class DistributedDataStoreIntegrationTest { final CountDownLatch blockRecoveryLatch = new CountDownLatch(1); InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, testName, false, shardName)) { // Create the write Tx @@ -444,7 +446,7 @@ public class DistributedDataStoreIntegrationTest { final CountDownLatch blockRecoveryLatch = new CountDownLatch(1); InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, testName, false, shardName)) { // Create the read-write Tx @@ -588,7 +590,7 @@ public class DistributedDataStoreIntegrationTest { InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence"); - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, testName, false, shardName)) { // Create the read-write Tx @@ -658,7 +660,7 @@ public class DistributedDataStoreIntegrationTest { datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1) .shardInitializationTimeout(200, TimeUnit.MILLISECONDS); - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, testName, false, shardName)) { final Object result = dataStore.getActorContext().executeOperation( @@ -666,7 +668,7 @@ public class DistributedDataStoreIntegrationTest { assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound); // Create the write Tx. - try (final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() + try (DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : dataStore.newReadWriteTransaction()) { assertNotNull("newReadWriteTransaction returned null", writeTx); @@ -729,7 +731,7 @@ public class DistributedDataStoreIntegrationTest { public void testTransactionAbort() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, "transactionAbortIntegrationTest", "test-1")) { final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); @@ -755,7 +757,7 @@ public class DistributedDataStoreIntegrationTest { public void testTransactionChainWithSingleShard() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, "testTransactionChainWithSingleShard", "test-1")) { // 1. Create a Tx chain and write-only Tx @@ -839,7 +841,7 @@ public class DistributedDataStoreIntegrationTest { public void testTransactionChainWithMultipleShards() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) { final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); @@ -907,7 +909,7 @@ public class DistributedDataStoreIntegrationTest { public void testCreateChainedTransactionsInQuickSuccession() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) { final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( @@ -956,7 +958,7 @@ public class DistributedDataStoreIntegrationTest { public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) { final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); @@ -981,7 +983,7 @@ public class DistributedDataStoreIntegrationTest { public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) { final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); @@ -1022,7 +1024,7 @@ public class DistributedDataStoreIntegrationTest { public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) { final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); @@ -1064,12 +1066,9 @@ public class DistributedDataStoreIntegrationTest { @Test public void testChainedTransactionFailureWithSingleShard() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) { final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( @@ -1110,12 +1109,9 @@ public class DistributedDataStoreIntegrationTest { @Test public void testChainedTransactionFailureWithMultipleShards() throws Exception { - //TODO remove when test passes also for ClientBackedDataStore - Assume.assumeTrue(testParameter.equals(DistributedDataStore.class)); - new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) { final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( @@ -1161,7 +1157,7 @@ public class DistributedDataStoreIntegrationTest { public void testChangeListenerRegistration() throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) { { - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, "testChangeListenerRegistration", "test-1")) { testWriteTransaction(dataStore, TestModel.TEST_PATH, @@ -1174,6 +1170,10 @@ public class DistributedDataStoreIntegrationTest { assertNotNull("registerChangeListener returned null", listenerReg); + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("getDataChangeListenerActors", 1, + state.getDataChangeListenerActors().size())); + // Wait for the initial notification listener.waitForChangeEvents(TestModel.TEST_PATH); listener.reset(2); @@ -1191,6 +1191,63 @@ public class DistributedDataStoreIntegrationTest { listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); listenerReg.close(); + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("getDataChangeListenerActors", 0, + state.getDataChangeListenerActors().size())); + + testWriteTransaction(dataStore, + YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)); + + listener.expectNoMoreChanges("Received unexpected change after close"); + } + } + }; + } + + @Test + public void testDataTreeChangeListenerRegistration() throws Exception { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) { + { + try (AbstractDataStore dataStore = setupAbstractDataStore( + testParameter, "testDataTreeChangeListenerRegistration", "test-1")) { + + testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1); + + ListenerRegistration listenerReg = dataStore + .registerTreeChangeListener(TestModel.TEST_PATH, listener); + + assertNotNull("registerTreeChangeListener returned null", listenerReg); + + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("getTreeChangeListenerActors", 1, + state.getTreeChangeListenerActors().size())); + + // Wait for the initial notification + listener.waitForChangeEvents(TestModel.TEST_PATH); + listener.reset(2); + + // Write 2 updates. + testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + + YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(); + testWriteTransaction(dataStore, listPath, + ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); + + // Wait for the 2 updates. + listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); + listenerReg.close(); + + IntegrationTestKit.verifyShardState(dataStore, "test-1", + state -> assertEquals("getTreeChangeListenerActors", 0, + state.getTreeChangeListenerActors().size())); + testWriteTransaction(dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), @@ -1237,7 +1294,7 @@ public class DistributedDataStoreIntegrationTest { new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot), new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot))); - try (final AbstractDataStore dataStore = setupAbstractDataStore( + try (AbstractDataStore dataStore = setupAbstractDataStore( testParameter, name, "module-shards-member1.conf", true, "cars", "people")) { final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); @@ -1273,7 +1330,7 @@ public class DistributedDataStoreIntegrationTest { MetadataShardDataTreeSnapshot shardSnapshot = new MetadataShardDataTreeSnapshot(root); final ByteArrayOutputStream bos = new ByteArrayOutputStream(); - try (final DataOutputStream dos = new DataOutputStream(bos)) { + try (DataOutputStream dos = new DataOutputStream(bos)) { PayloadVersion.BORON.writeTo(dos); try (ObjectOutputStream oos = new ObjectOutputStream(dos)) { oos.writeObject(shardSnapshot);