X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreIntegrationTest.java;h=5a45a9961afc6567586b506395a7f24d57a4671c;hb=refs%2Fchanges%2F15%2F12215%2F7;hp=d35c36fb0a2c883649ec7fcccc42ac981f20e256;hpb=c8e6e650e2dd0647dbfed665244445cf64c1e262;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index d35c36fb0a..5a45a9961a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -4,21 +4,23 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import com.google.common.base.Optional; -import com.google.common.collect.Lists; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Uninterruptibles; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; +import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; +import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -29,12 +31,16 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; public class DistributedDataStoreIntegrationTest extends AbstractActorTest { + private final DatastoreContext.Builder datastoreContextBuilder = + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100); + @Test public void testWriteTransactionWithSingleShard() throws Exception{ System.setProperty("shard.persistent", "true"); @@ -77,7 +83,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { cohort.preCommit().get(5, TimeUnit.SECONDS); cohort.commit().get(5, TimeUnit.SECONDS); - // 5. Verify the data in the store + // Verify the data in the store DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); @@ -100,7 +106,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { DistributedDataStore dataStore = setupDistributedDataStore("testReadWriteTransaction", "test-1"); - // 1. Create a read-write Tx + // 1. Create a read-write Tx DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); assertNotNull("newReadWriteTransaction returned null", readWriteTx); @@ -143,6 +149,383 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { }}; } + @Test + public void testTransactionWritesWithShardNotInitiallyReady() throws Exception{ + new IntegrationTestKit(getSystem()) {{ + String testName = "testTransactionWritesWithShardNotInitiallyReady"; + String shardName = "test-1"; + + // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't + // initialized until we create and submit the write the Tx. + String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + + DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + + // Create the write Tx + + final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newReadWriteTransaction returned null", writeTx); + + // Do some modification operations and ready the Tx on a separate thread. + + final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier.builder( + TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, + TestModel.ID_QNAME, 1).build(); + + final AtomicReference txCohort = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReady = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + writeTx.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + writeTx.merge(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder( + TestModel.OUTER_LIST_QNAME).build()); + + writeTx.write(listEntryPath, ImmutableNodes.mapEntry( + TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); + + writeTx.delete(listEntryPath); + + txCohort.set(writeTx.ready()); + } catch(Exception e) { + caughtEx.set(e); + return; + } finally { + txReady.countDown(); + } + } + }; + + txThread.start(); + + // Wait for the Tx operations to complete. + + boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertEquals("Tx ready", true, done); + + // At this point the Tx operations should be waiting for the shard to initialize so + // trigger the latch to let the shard recovery to continue. + + blockRecoveryLatch.countDown(); + + // Wait for the Tx commit to complete. + + assertEquals("canCommit", true, txCohort.get().canCommit().get(5, TimeUnit.SECONDS)); + txCohort.get().preCommit().get(5, TimeUnit.SECONDS); + txCohort.get().commit().get(5, TimeUnit.SECONDS); + + // Verify the data in the store + + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + Optional> optional = readTx.read(TestModel.TEST_PATH). + get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + + optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + + optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", false, optional.isPresent()); + + cleanup(dataStore); + }}; + } + + @Test + public void testTransactionReadsWithShardNotInitiallyReady() throws Exception{ + new IntegrationTestKit(getSystem()) {{ + String testName = "testTransactionReadsWithShardNotInitiallyReady"; + String shardName = "test-1"; + + // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't + // initialized until we create the Tx. + String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + + DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + + // Create the read-write Tx + + final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); + + // Do some reads on the Tx on a separate thread. + + final AtomicReference> txExistsFuture = + new AtomicReference<>(); + final AtomicReference>, ReadFailedException>> + txReadFuture = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReadsDone = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + readWriteTx.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH)); + + txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); + } catch(Exception e) { + caughtEx.set(e); + return; + } finally { + txReadsDone.countDown(); + } + } + }; + + txThread.start(); + + // Wait for the Tx operations to complete. + + boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS); + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertEquals("Tx reads done", true, done); + + // At this point the Tx operations should be waiting for the shard to initialize so + // trigger the latch to let the shard recovery to continue. + + blockRecoveryLatch.countDown(); + + // Wait for the reads to complete and verify. + + assertEquals("exists", true, txExistsFuture.get().checkedGet(5, TimeUnit.SECONDS)); + assertEquals("read", true, txReadFuture.get().checkedGet(5, TimeUnit.SECONDS).isPresent()); + + readWriteTx.close(); + + cleanup(dataStore); + }}; + } + + @Test(expected=NotInitializedException.class) + public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{ + new IntegrationTestKit(getSystem()) {{ + String testName = "testTransactionCommitFailureWithShardNotInitialized"; + String shardName = "test-1"; + + // Set the shard initialization timeout low for the test. + + datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); + + // Setup the InMemoryJournal to block shard recovery indefinitely. + + String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + + DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + + // Create the write Tx + + final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newReadWriteTransaction returned null", writeTx); + + // Do some modifications and ready the Tx on a separate thread. + + final AtomicReference txCohort = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReady = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + writeTx.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + txCohort.set(writeTx.ready()); + } catch(Exception e) { + caughtEx.set(e); + return; + } finally { + txReady.countDown(); + } + } + }; + + txThread.start(); + + // Wait for the Tx operations to complete. + + boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertEquals("Tx ready", true, done); + + // Wait for the commit to complete. Since the shard never initialized, the Tx should + // have timed out and throw an appropriate exception cause. + + try { + txCohort.get().canCommit().get(5, TimeUnit.SECONDS); + } catch(ExecutionException e) { + throw e.getCause(); + } finally { + blockRecoveryLatch.countDown(); + cleanup(dataStore); + } + }}; + } + + @Test(expected=NotInitializedException.class) + public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{ + new IntegrationTestKit(getSystem()) {{ + String testName = "testTransactionReadFailureWithShardNotInitialized"; + String shardName = "test-1"; + + // Set the shard initialization timeout low for the test. + + datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); + + // Setup the InMemoryJournal to block shard recovery indefinitely. + + String persistentID = String.format("member-1-shard-%s-%s", shardName, testName); + CountDownLatch blockRecoveryLatch = new CountDownLatch(1); + InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + + DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + + // Create the read-write Tx + + final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction(); + assertNotNull("newReadWriteTransaction returned null", readWriteTx); + + // Do a read on the Tx on a separate thread. + + final AtomicReference>, ReadFailedException>> + txReadFuture = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReadDone = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + readWriteTx.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH)); + + readWriteTx.close(); + } catch(Exception e) { + caughtEx.set(e); + return; + } finally { + txReadDone.countDown(); + } + } + }; + + txThread.start(); + + // Wait for the Tx operations to complete. + + boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS); + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertEquals("Tx read done", true, done); + + // Wait for the read to complete. Since the shard never initialized, the Tx should + // have timed out and throw an appropriate exception cause. + + try { + txReadFuture.get().checkedGet(5, TimeUnit.SECONDS); + } catch(ReadFailedException e) { + throw e.getCause(); + } finally { + blockRecoveryLatch.countDown(); + cleanup(dataStore); + } + }}; + } + + @Test(expected=NoShardLeaderException.class) + public void testTransactionCommitFailureWithNoShardLeader() throws Throwable{ + new IntegrationTestKit(getSystem()) {{ + String testName = "testTransactionCommitFailureWithNoShardLeader"; + String shardName = "test-1"; + + // We don't want the shard to become the leader so prevent shard election from completing + // by setting the election timeout, which is based on the heartbeat interval, really high. + + datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000); + + // Set the leader election timeout low for the test. + + datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS); + + DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + + // Create the write Tx. + + final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction(); + assertNotNull("newReadWriteTransaction returned null", writeTx); + + // Do some modifications and ready the Tx on a separate thread. + + final AtomicReference txCohort = new AtomicReference<>(); + final AtomicReference caughtEx = new AtomicReference<>(); + final CountDownLatch txReady = new CountDownLatch(1); + Thread txThread = new Thread() { + @Override + public void run() { + try { + writeTx.write(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + txCohort.set(writeTx.ready()); + } catch(Exception e) { + caughtEx.set(e); + return; + } finally { + txReady.countDown(); + } + } + }; + + txThread.start(); + + // Wait for the Tx operations to complete. + + boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS); + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + assertEquals("Tx ready", true, done); + + // Wait for the commit to complete. Since no shard leader was elected in time, the Tx + // should have timed out and throw an appropriate exception cause. + + try { + txCohort.get().canCommit().get(5, TimeUnit.SECONDS); + } catch(ExecutionException e) { + throw e.getCause(); + } finally { + cleanup(dataStore); + } + }}; + } + @Test public void testTransactionAbort() throws Exception{ System.setProperty("shard.persistent", "true"); @@ -218,20 +601,9 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { DistributedDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration", "test-1"); - final List>> - changeList = Lists.newArrayList(); - final CountDownLatch changeLatch = new CountDownLatch(3); - AsyncDataChangeListener> listener = - new AsyncDataChangeListener>() { - @Override - public void onDataChanged(AsyncDataChangeEvent> change) { - changeList.add(change); - changeLatch.countDown(); - } - }; + MockDataChangeListener listener = new MockDataChangeListener(3); - ListenerRegistration>> + ListenerRegistration listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener, DataChangeScope.SUBTREE); @@ -248,17 +620,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { testWriteTransaction(dataStore, listPath, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); - assertEquals("Change notifications complete", true, - Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS)); - - assertTrue("Change 1 does not contain " + TestModel.TEST_PATH, - changeList.get(0).getCreatedData().containsKey(TestModel.TEST_PATH)); - - assertTrue("Change 2 does not contain " + TestModel.OUTER_LIST_PATH, - changeList.get(1).getCreatedData().containsKey(TestModel.OUTER_LIST_PATH)); - - assertTrue("Change 3 does not contain " + listPath, - changeList.get(2).getCreatedData().containsKey(listPath)); + listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath ); listenerReg.close(); @@ -266,9 +628,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)); - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - - assertEquals("Received unexpected change after close", 3, changeList.size()); + listener.expectNoMoreChanges("Received unexpected change after close"); cleanup(dataStore); }}; @@ -281,31 +641,37 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { } DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) { + return setupDistributedDataStore(typeName, true, shardNames); + } + + DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader, + String... shardNames) { MockClusterWrapper cluster = new MockClusterWrapper(); Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf"); ShardStrategyFactory.setConfiguration(config); - DatastoreContext datastoreContext = DatastoreContext.newBuilder().build(); + DatastoreContext datastoreContext = datastoreContextBuilder.build(); DistributedDataStore dataStore = new DistributedDataStore(getSystem(), typeName, cluster, config, datastoreContext); SchemaContext schemaContext = SchemaContextHelper.full(); dataStore.onGlobalContextUpdated(schemaContext); - for(String shardName: shardNames) { - ActorRef shard = null; - for(int i = 0; i < 20 * 5 && shard == null; i++) { - Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); - Optional shardReply = dataStore.getActorContext().findLocalShard(shardName); - if(shardReply.isPresent()) { - shard = shardReply.get(); + if(waitUntilLeader) { + for(String shardName: shardNames) { + ActorRef shard = null; + for(int i = 0; i < 20 * 5 && shard == null; i++) { + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + Optional shardReply = dataStore.getActorContext().findLocalShard(shardName); + if(shardReply.isPresent()) { + shard = shardReply.get(); + } } - } - assertNotNull("Shard was not created", shard); + assertNotNull("Shard was not created", shard); - System.out.println("!!!!!!shard: "+shard.path().toString()); - waitUntilLeader(shard); + waitUntilLeader(shard); + } } return dataStore;