X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreIntegrationTest.java;h=7b78da29234706a56848031169be919a2f3458a7;hp=76ae3c71566bdce5663918e05a7f1d6cf54e352b;hb=a14ed47823d85d2547fbfd7e6c42649962913d34;hpb=103ceecd0195cca6c87fbd62a687d8addf128784 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 76ae3c7156..7b78da2923 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -1,20 +1,39 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.testkit.JavaTestKit; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -23,16 +42,31 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot; +import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; +import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; +import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; +import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; +import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; +import org.opendaylight.controller.sal.core.spi.data.DOMStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -40,10 +74,13 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; public class DistributedDataStoreIntegrationTest { @@ -237,6 +274,38 @@ public class DistributedDataStoreIntegrationTest { }}; } + @Test + public void testSingleTransactionsWritesInQuickSuccession() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testSingleTransactionsWritesInQuickSuccession", "cars-1"); + + DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + doCommit(writeTx.ready()); + + writeTx = txChain.newWriteOnlyTransaction(); + + int nCars = 5; + for(int i = 0; i < nCars; i++) { + writeTx.write(CarsModel.newCarPath("car" + i), + CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); + } + + doCommit(writeTx.ready()); + + Optional> optional = txChain.newReadOnlyTransaction().read( + CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("# cars", nCars, ((Collection)optional.get().getValue()).size()); + + cleanup(dataStore); + }}; + } + private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly) throws Exception { new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ @@ -428,6 +497,8 @@ public class DistributedDataStoreIntegrationTest { CountDownLatch blockRecoveryLatch = new CountDownLatch(1); InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence"); + DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); // Create the write Tx @@ -498,6 +569,8 @@ public class DistributedDataStoreIntegrationTest { CountDownLatch blockRecoveryLatch = new CountDownLatch(1); InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch); + InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence"); + DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); // Create the read-write Tx @@ -555,23 +628,24 @@ public class DistributedDataStoreIntegrationTest { }}; } - private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable { + private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName) throws Throwable { new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ - String testName = "testTransactionCommitFailureWithNoShardLeader"; String shardName = "default"; - // We don't want the shard to become the leader so prevent shard election from completing - // by setting the election timeout, which is based on the heartbeat interval, really high. + // We don't want the shard to become the leader so prevent shard elections. + datastoreContextBuilder.customRaftPolicyImplementation( + "org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy"); - datastoreContextBuilder.shardHeartbeatIntervalInMillis(30000); - datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS); - - // Set the leader election timeout low for the test. - - datastoreContextBuilder.shardLeaderElectionTimeout(1, TimeUnit.MILLISECONDS); + // The ShardManager uses the election timeout for FindPrimary so reset it low so it will timeout quickly. + datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1). + shardInitializationTimeout(200, TimeUnit.MILLISECONDS); DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName); + Object result = dataStore.getActorContext().executeOperation(dataStore.getActorContext().getShardManager(), + new FindLocalShard(shardName, true)); + assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound); + // Create the write Tx. final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction() : @@ -627,17 +701,16 @@ public class DistributedDataStoreIntegrationTest { @Test(expected=NoShardLeaderException.class) public void testWriteOnlyTransactionCommitFailureWithNoShardLeader() throws Throwable { datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true); - testTransactionCommitFailureWithNoShardLeader(true); + testTransactionCommitFailureWithNoShardLeader(true, "testWriteOnlyTransactionCommitFailureWithNoShardLeader"); } @Test(expected=NoShardLeaderException.class) public void testReadWriteTransactionCommitFailureWithNoShardLeader() throws Throwable { - testTransactionCommitFailureWithNoShardLeader(false); + testTransactionCommitFailureWithNoShardLeader(false, "testReadWriteTransactionCommitFailureWithNoShardLeader"); } @Test public void testTransactionAbort() throws Exception{ - System.setProperty("shard.persistent", "true"); new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore("transactionAbortIntegrationTest", "test-1"); @@ -819,29 +892,45 @@ public class DistributedDataStoreIntegrationTest { public void testCreateChainedTransactionsInQuickSuccession() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ DistributedDataStore dataStore = setupDistributedDataStore( - "testCreateChainedTransactionsInQuickSuccession", "test-1"); + "testCreateChainedTransactionsInQuickSuccession", "cars-1"); - DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder().put( + LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); - NormalizedNode testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + DOMTransactionChain txChain = broker.createTransactionChain(listener); + + List> futures = new ArrayList<>(); - int nTxs = 20; - List cohorts = new ArrayList<>(nTxs); - for(int i = 0; i < nTxs; i++) { - DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer()); + writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + futures.add(writeTx.submit()); - rwTx.merge(TestModel.TEST_PATH, testNode); + int nCars = 100; + for(int i = 0; i < nCars; i++) { + DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); - cohorts.add(rwTx.ready()); + rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i), + CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000))); + futures.add(rwTx.submit()); } - for(DOMStoreThreePhaseCommitCohort cohort: cohorts) { - doCommit(cohort); + for(CheckedFuture f: futures) { + f.checkedGet(); } + Optional> optional = txChain.newReadOnlyTransaction().read( + LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("# cars", nCars, ((Collection)optional.get().getValue()).size()); + txChain.close(); + broker.close(); + cleanup(dataStore); }}; } @@ -905,6 +994,127 @@ public class DistributedDataStoreIntegrationTest { }}; } + @Test + public void testChainWithReadOnlyTxAfterPreviousReady() throws Throwable { + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testChainWithReadOnlyTxAfterPreviousReady", "test-1"); + + final DOMStoreTransactionChain txChain = dataStore.createTransactionChain(); + + // Create a write tx and submit. + + DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready(); + + // Create read-only tx's and issue a read. + + CheckedFuture>, ReadFailedException> readFuture1 = + txChain.newReadOnlyTransaction().read(TestModel.TEST_PATH); + + CheckedFuture>, ReadFailedException> readFuture2 = + txChain.newReadOnlyTransaction().read(TestModel.TEST_PATH); + + // Create another write tx and issue the write. + + DOMStoreWriteTransaction writeTx2 = txChain.newWriteOnlyTransaction(); + writeTx2.write(TestModel.OUTER_LIST_PATH, + ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + + // Ensure the reads succeed. + + assertEquals("isPresent", true, readFuture1.checkedGet(5, TimeUnit.SECONDS).isPresent()); + assertEquals("isPresent", true, readFuture2.checkedGet(5, TimeUnit.SECONDS).isPresent()); + + // Ensure the writes succeed. + + DOMStoreThreePhaseCommitCohort cohort2 = writeTx2.ready(); + + doCommit(cohort1); + doCommit(cohort2); + + assertEquals("isPresent", true, txChain.newReadOnlyTransaction().read(TestModel.OUTER_LIST_PATH). + checkedGet(5, TimeUnit.SECONDS).isPresent()); + }}; + } + + @Test + public void testChainedTransactionFailureWithSingleShard() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testChainedTransactionFailureWithSingleShard", "cars-1"); + + ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder().put( + LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); + + TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + DOMTransactionChain txChain = broker.createTransactionChain(listener); + + DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction(); + + ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + + rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + + try { + rwTx.submit().checkedGet(5, TimeUnit.SECONDS); + fail("Expected TransactionCommitFailedException"); + } catch (TransactionCommitFailedException e) { + // Expected + } + + verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(rwTx), any(Throwable.class)); + + txChain.close(); + broker.close(); + cleanup(dataStore); + }}; + } + + @Test + public void testChainedTransactionFailureWithMultipleShards() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + DistributedDataStore dataStore = setupDistributedDataStore( + "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1"); + + ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker( + ImmutableMap.builder().put( + LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor()); + + TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); + DOMTransactionChain txChain = broker.createTransactionChain(listener); + + DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + + writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); + + ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build(); + + // Note that merge will validate the data and fail but put succeeds b/c deep validation is not + // done for put for performance reasons. + writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); + + try { + writeTx.submit().checkedGet(5, TimeUnit.SECONDS); + fail("Expected TransactionCommitFailedException"); + } catch (TransactionCommitFailedException e) { + // Expected + } + + verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class)); + + txChain.close(); + broker.close(); + cleanup(dataStore); + }}; + } + @Test public void testChangeListenerRegistration() throws Exception{ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ @@ -953,4 +1163,52 @@ public class DistributedDataStoreIntegrationTest { cleanup(dataStore); }}; } + + @Test + public void testRestoreFromDatastoreSnapshot() throws Exception{ + new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{ + String name = "transactionIntegrationTest"; + + ContainerNode carsNode = CarsModel.newCarsNode(CarsModel.newCarsMapNode( + CarsModel.newCarEntry("optima", BigInteger.valueOf(20000L)), + CarsModel.newCarEntry("sportage", BigInteger.valueOf(30000L)))); + + ShardDataTree dataTree = new ShardDataTree(SchemaContextHelper.full(), TreeType.OPERATIONAL); + AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode); + NormalizedNode root = AbstractShardTest.readStore(dataTree.getDataTree(), + YangInstanceIdentifier.builder().build()); + + Snapshot carsSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root), + Collections.emptyList(), 2, 1, 2, 1, 1, "member-1"); + + NormalizedNode peopleNode = PeopleModel.create(); + dataTree = new ShardDataTree(SchemaContextHelper.full(), TreeType.OPERATIONAL); + AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode); + root = AbstractShardTest.readStore(dataTree.getDataTree(), YangInstanceIdentifier.builder().build()); + + Snapshot peopleSnapshot = Snapshot.create(SerializationUtils.serializeNormalizedNode(root), + Collections.emptyList(), 2, 1, 2, 1, 1, "member-1"); + + restoreFromSnapshot = new DatastoreSnapshot(name, null, Arrays.asList( + new DatastoreSnapshot.ShardSnapshot("cars", + org.apache.commons.lang3.SerializationUtils.serialize(carsSnapshot)), + new DatastoreSnapshot.ShardSnapshot("people", + org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot)))); + + DistributedDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf", + true, "cars", "people"); + + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + + Optional> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", carsNode, optional.get()); + + optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS); + assertEquals("isPresent", true, optional.isPresent()); + assertEquals("Data node", peopleNode, optional.get()); + + cleanup(dataStore); + }}; + } }