From: tpantelis Date: Sat, 1 Nov 2014 21:55:34 +0000 (-0400) Subject: Bug 2337: Fix Tx already sealed failure on Tx commit X-Git-Tag: release/lithium~944 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=1a8ace932ef318601c17e0fa52b0a9dbd02e3005 Bug 2337: Fix Tx already sealed failure on Tx commit Details outlined in bug 2337. Moved the publishing of the TransactonContext instance in TransactionFutureCallback#onComplete after the cached operations are executed. Also fixed a timing issue in DistributedDataStoreIntegrationTest#testChangeListenerRegistration that caused intermittent failures. This was an issue with the test. The listener registration is done async in the shard so the notification for the first write commit could occur in the initial notification on registration which screwed up the test. So I moved the first write commit before the registration so we expect an initial notification. Change-Id: Ied2af93be8165208b853c48e57312c3a5acbdaea Signed-off-by: tpantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 40880d9075..d93bae22e0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -236,6 +236,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return recordedOperationFutures; } + @VisibleForTesting + boolean hasTransactionContext() { + for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) { + TransactionContext transactionContext = txFutureCallback.getTransactionContext(); + if(transactionContext != null) { + return true; + } + } + + return false; + } + @Override public CheckedFuture>, ReadFailedException> read( final YangInstanceIdentifier path) { @@ -640,29 +652,42 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // respect to #addTxOperationOnComplete to handle timing issues and ensure no // TransactionOperation is missed and that they are processed in the order they occurred. synchronized(txOperationsOnComplete) { + // Store the new TransactionContext locally until we've completed invoking the + // TransactionOperations. This avoids thread timing issues which could cause + // out-of-order TransactionOperations. Eg, on a modification operation, if the + // TransactionContext is non-null, then we directly call the TransactionContext. + // However, at the same time, the code may be executing the cached + // TransactionOperations. So to avoid thus timing, we don't publish the + // TransactionContext until after we've executed all cached TransactionOperations. + TransactionContext localTransactionContext; if(failure != null) { LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier, failure.getMessage()); - transactionContext = new NoOpTransactionContext(failure, identifier); + localTransactionContext = new NoOpTransactionContext(failure, identifier); } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { - createValidTransactionContext(CreateTransactionReply.fromSerializable(response)); + localTransactionContext = createValidTransactionContext( + CreateTransactionReply.fromSerializable(response)); } else { IllegalArgumentException exception = new IllegalArgumentException(String.format( "Invalid reply type %s for CreateTransaction", response.getClass())); - transactionContext = new NoOpTransactionContext(exception, identifier); + localTransactionContext = new NoOpTransactionContext(exception, identifier); } for(TransactionOperation oper: txOperationsOnComplete) { - oper.invoke(transactionContext); + oper.invoke(localTransactionContext); } txOperationsOnComplete.clear(); + + // We're done invoking the TransactionOperations so we can now publish the + // TransactionContext. + transactionContext = localTransactionContext; } } - private void createValidTransactionContext(CreateTransactionReply reply) { + private TransactionContext createValidTransactionContext(CreateTransactionReply reply) { String transactionPath = reply.getTransactionPath(); LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath); @@ -683,7 +708,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Check if TxActor is created in the same node boolean isTxActorLocal = actorContext.isLocalPath(transactionPath); - transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier, + return new TransactionContextImpl(transactionPath, transactionActor, identifier, actorContext, schemaContext, isTxActorLocal, reply.getVersion()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java index 25915b198c..12c566d33d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java @@ -77,10 +77,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { DOMStoreThreePhaseCommitCohort cohort = writeTx.ready(); - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); + doCommit(cohort); // Verify the data in the store @@ -131,10 +128,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // 5. Commit the Tx - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); + doCommit(cohort); // 6. Verify the data in the store @@ -219,9 +213,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // Wait for the Tx commit to complete. - assertEquals("canCommit", true, txCohort.get().canCommit().get(5, TimeUnit.SECONDS)); - txCohort.get().preCommit().get(5, TimeUnit.SECONDS); - txCohort.get().commit().get(5, TimeUnit.SECONDS); + doCommit(txCohort.get()); // Verify the data in the store @@ -552,10 +544,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { @Test public void testTransactionChain() throws Exception{ - System.setProperty("shard.persistent", "true"); new IntegrationTestKit(getSystem()) {{ - DistributedDataStore dataStore = - setupDistributedDataStore("transactionChainIntegrationTest", "test-1"); + DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1"); // 1. Create a Tx chain and write-only Tx @@ -637,13 +627,6 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { assertEquals("Data node", outerNode, optional.get()); cleanup(dataStore); - } - - private void doCommit(final DOMStoreThreePhaseCommitCohort cohort1) throws Exception { - Boolean canCommit = cohort1.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort1.preCommit().get(5, TimeUnit.SECONDS); - cohort1.commit().get(5, TimeUnit.SECONDS); }}; } @@ -653,7 +636,10 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { DistributedDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration", "test-1"); - MockDataChangeListener listener = new MockDataChangeListener(3); + testWriteTransaction(dataStore, TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + + MockDataChangeListener listener = new MockDataChangeListener(1); ListenerRegistration listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener, @@ -661,8 +647,13 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { assertNotNull("registerChangeListener returned null", listenerReg); - testWriteTransaction(dataStore, TestModel.TEST_PATH, - ImmutableNodes.containerNode(TestModel.TEST_QNAME)); + // Wait for the initial notification + + listener.waitForChangeEvents(TestModel.TEST_PATH); + + listener.reset(2); + + // Write 2 updates. testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); @@ -672,7 +663,9 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { testWriteTransaction(dataStore, listPath, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)); - listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath ); + // Wait for the 2 updates. + + listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath); listenerReg.close(); @@ -747,10 +740,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { // 4. Commit the Tx - Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); - assertEquals("canCommit", true, canCommit); - cohort.preCommit().get(5, TimeUnit.SECONDS); - cohort.commit().get(5, TimeUnit.SECONDS); + doCommit(cohort); // 5. Verify the data in the store @@ -761,6 +751,13 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest { assertEquals("Data node", nodeToWrite, optional.get()); } + void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception { + Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS); + assertEquals("canCommit", true, canCommit); + cohort.preCommit().get(5, TimeUnit.SECONDS); + cohort.commit().get(5, TimeUnit.SECONDS); + } + void cleanup(DistributedDataStore dataStore) { dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataChangeListener.java index f2f49d1bf3..5bbdcae93c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataChangeListener.java @@ -9,6 +9,10 @@ package org.opendaylight.controller.cluster.datastore.utils; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -16,8 +20,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Uninterruptibles; /** * A mock DataChangeListener implementation. @@ -27,14 +29,21 @@ import com.google.common.util.concurrent.Uninterruptibles; public class MockDataChangeListener implements AsyncDataChangeListener> { - private final List>> - changeList = Lists.newArrayList(); - private final CountDownLatch changeLatch; - private final int expChangeEventCount; + private final List>> changeList = + Collections.synchronizedList(Lists.>>newArrayList()); + + private volatile CountDownLatch changeLatch; + private int expChangeEventCount; public MockDataChangeListener(int expChangeEventCount) { + reset(expChangeEventCount); + } + + public void reset(int expChangeEventCount) { changeLatch = new CountDownLatch(expChangeEventCount); this.expChangeEventCount = expChangeEventCount; + changeList.clear(); } @Override @@ -44,8 +53,13 @@ public class MockDataChangeListener implements } public void waitForChangeEvents(YangInstanceIdentifier... expPaths) { - assertEquals("Change notifications complete", true, - Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS)); + boolean done = Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS); + if(!done) { + fail(String.format("Missing change notifications. Expected: %d. Actual: %d", + expChangeEventCount, (expChangeEventCount - changeLatch.getCount()))); + } + + assertEquals("Change notifications complete", true, done); for(int i = 0; i < expPaths.length; i++) { assertTrue(String.format("Change %d does not contain %s", (i+1), expPaths[i]),