Bug 2337: Fix Tx already sealed failure on Tx commit 81/12581/2
authortpantelis <tpanteli@brocade.com>
Sat, 1 Nov 2014 21:55:34 +0000 (17:55 -0400)
committertpantelis <tpanteli@brocade.com>
Sun, 2 Nov 2014 02:53:16 +0000 (22:53 -0400)
Details outlined in bug 2337.

Moved the publishing of the TransactonContext instance in
TransactionFutureCallback#onComplete after the cached operations are
executed.

Also fixed a timing issue in
DistributedDataStoreIntegrationTest#testChangeListenerRegistration that
caused intermittent failures. This was an issue with the test. The
listener registration is done async in the shard so the notification for
the first write commit could occur in the initial notification on
registration which screwed up the test. So I moved the first write
commit before the registration so we expect an initial notification.

Change-Id: Ied2af93be8165208b853c48e57312c3a5acbdaea
Signed-off-by: tpantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataChangeListener.java

index 40880d907580bbaec6a81756d07b3b8b37210ade..d93bae22e08d9fddb3f1ac7d9c12aa4a278d0ff6 100644 (file)
@@ -236,6 +236,18 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         return recordedOperationFutures;
     }
 
+    @VisibleForTesting
+    boolean hasTransactionContext() {
+        for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
+            TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+            if(transactionContext != null) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
             final YangInstanceIdentifier path) {
@@ -640,29 +652,42 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // respect to #addTxOperationOnComplete to handle timing issues and ensure no
             // TransactionOperation is missed and that they are processed in the order they occurred.
             synchronized(txOperationsOnComplete) {
+                // Store the new TransactionContext locally until we've completed invoking the
+                // TransactionOperations. This avoids thread timing issues which could cause
+                // out-of-order TransactionOperations. Eg, on a modification operation, if the
+                // TransactionContext is non-null, then we directly call the TransactionContext.
+                // However, at the same time, the code may be executing the cached
+                // TransactionOperations. So to avoid thus timing, we don't publish the
+                // TransactionContext until after we've executed all cached TransactionOperations.
+                TransactionContext localTransactionContext;
                 if(failure != null) {
                     LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
                             failure.getMessage());
 
-                    transactionContext = new NoOpTransactionContext(failure, identifier);
+                    localTransactionContext = new NoOpTransactionContext(failure, identifier);
                 } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
-                    createValidTransactionContext(CreateTransactionReply.fromSerializable(response));
+                    localTransactionContext = createValidTransactionContext(
+                            CreateTransactionReply.fromSerializable(response));
                 } else {
                     IllegalArgumentException exception = new IllegalArgumentException(String.format(
                         "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-                    transactionContext = new NoOpTransactionContext(exception, identifier);
+                    localTransactionContext = new NoOpTransactionContext(exception, identifier);
                 }
 
                 for(TransactionOperation oper: txOperationsOnComplete) {
-                    oper.invoke(transactionContext);
+                    oper.invoke(localTransactionContext);
                 }
 
                 txOperationsOnComplete.clear();
+
+                // We're done invoking the TransactionOperations so we can now publish the
+                // TransactionContext.
+                transactionContext = localTransactionContext;
             }
         }
 
-        private void createValidTransactionContext(CreateTransactionReply reply) {
+        private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
             String transactionPath = reply.getTransactionPath();
 
             LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
@@ -683,7 +708,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Check if TxActor is created in the same node
             boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
 
-            transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier,
+            return new TransactionContextImpl(transactionPath, transactionActor, identifier,
                 actorContext, schemaContext, isTxActorLocal, reply.getVersion());
         }
     }
index 25915b198ca3cda368a92771356aaaca92ed57e4..12c566d33de786db62bfad3ec86a56939edbe1c2 100644 (file)
@@ -77,10 +77,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
 
-            Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
-            assertEquals("canCommit", true, canCommit);
-            cohort.preCommit().get(5, TimeUnit.SECONDS);
-            cohort.commit().get(5, TimeUnit.SECONDS);
+            doCommit(cohort);
 
             // Verify the data in the store
 
@@ -131,10 +128,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             // 5. Commit the Tx
 
-            Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
-            assertEquals("canCommit", true, canCommit);
-            cohort.preCommit().get(5, TimeUnit.SECONDS);
-            cohort.commit().get(5, TimeUnit.SECONDS);
+            doCommit(cohort);
 
             // 6. Verify the data in the store
 
@@ -219,9 +213,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             // Wait for the Tx commit to complete.
 
-            assertEquals("canCommit", true, txCohort.get().canCommit().get(5, TimeUnit.SECONDS));
-            txCohort.get().preCommit().get(5, TimeUnit.SECONDS);
-            txCohort.get().commit().get(5, TimeUnit.SECONDS);
+            doCommit(txCohort.get());
 
             // Verify the data in the store
 
@@ -552,10 +544,8 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
     @Test
     public void testTransactionChain() throws Exception{
-        System.setProperty("shard.persistent", "true");
         new IntegrationTestKit(getSystem()) {{
-            DistributedDataStore dataStore =
-                    setupDistributedDataStore("transactionChainIntegrationTest", "test-1");
+            DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1");
 
             // 1. Create a Tx chain and write-only Tx
 
@@ -637,13 +627,6 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
             assertEquals("Data node", outerNode, optional.get());
 
             cleanup(dataStore);
-        }
-
-        private void doCommit(final DOMStoreThreePhaseCommitCohort cohort1) throws Exception {
-            Boolean canCommit = cohort1.canCommit().get(5, TimeUnit.SECONDS);
-            assertEquals("canCommit", true, canCommit);
-            cohort1.preCommit().get(5, TimeUnit.SECONDS);
-            cohort1.commit().get(5, TimeUnit.SECONDS);
         }};
     }
 
@@ -653,7 +636,10 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
             DistributedDataStore dataStore =
                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
 
-            MockDataChangeListener listener = new MockDataChangeListener(3);
+            testWriteTransaction(dataStore, TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            MockDataChangeListener listener = new MockDataChangeListener(1);
 
             ListenerRegistration<MockDataChangeListener>
                     listenerReg = dataStore.registerChangeListener(TestModel.TEST_PATH, listener,
@@ -661,8 +647,13 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             assertNotNull("registerChangeListener returned null", listenerReg);
 
-            testWriteTransaction(dataStore, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+            // Wait for the initial notification
+
+            listener.waitForChangeEvents(TestModel.TEST_PATH);
+
+            listener.reset(2);
+
+            // Write 2 updates.
 
             testWriteTransaction(dataStore, TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
@@ -672,7 +663,9 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
             testWriteTransaction(dataStore, listPath,
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
 
-            listener.waitForChangeEvents(TestModel.TEST_PATH, TestModel.OUTER_LIST_PATH, listPath );
+            // Wait for the 2 updates.
+
+            listener.waitForChangeEvents(TestModel.OUTER_LIST_PATH, listPath);
 
             listenerReg.close();
 
@@ -747,10 +740,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
             // 4. Commit the Tx
 
-            Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
-            assertEquals("canCommit", true, canCommit);
-            cohort.preCommit().get(5, TimeUnit.SECONDS);
-            cohort.commit().get(5, TimeUnit.SECONDS);
+            doCommit(cohort);
 
             // 5. Verify the data in the store
 
@@ -761,6 +751,13 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
             assertEquals("Data node", nodeToWrite, optional.get());
         }
 
+        void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
+            Boolean canCommit = cohort.canCommit().get(5, TimeUnit.SECONDS);
+            assertEquals("canCommit", true, canCommit);
+            cohort.preCommit().get(5, TimeUnit.SECONDS);
+            cohort.commit().get(5, TimeUnit.SECONDS);
+        }
+
         void cleanup(DistributedDataStore dataStore) {
             dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
         }
index f2f49d1bf3e21f048b393d5f5c14a6e9595e05d6..5bbdcae93c2241cd8b9e7a87613735b69841973d 100644 (file)
@@ -9,6 +9,10 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -16,8 +20,6 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Uninterruptibles;
 
 /**
  * A mock DataChangeListener implementation.
@@ -27,14 +29,21 @@ import com.google.common.util.concurrent.Uninterruptibles;
 public class MockDataChangeListener implements
                          AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> {
 
-    private final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-                                                               changeList = Lists.newArrayList();
-    private final CountDownLatch changeLatch;
-    private final int expChangeEventCount;
+    private final List<AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>>> changeList =
+            Collections.synchronizedList(Lists.<AsyncDataChangeEvent<YangInstanceIdentifier,
+                                                NormalizedNode<?, ?>>>newArrayList());
+
+    private volatile CountDownLatch changeLatch;
+    private int expChangeEventCount;
 
     public MockDataChangeListener(int expChangeEventCount) {
+        reset(expChangeEventCount);
+    }
+
+    public void reset(int expChangeEventCount) {
         changeLatch = new CountDownLatch(expChangeEventCount);
         this.expChangeEventCount = expChangeEventCount;
+        changeList.clear();
     }
 
     @Override
@@ -44,8 +53,13 @@ public class MockDataChangeListener implements
     }
 
     public void waitForChangeEvents(YangInstanceIdentifier... expPaths) {
-        assertEquals("Change notifications complete", true,
-                Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS));
+        boolean done = Uninterruptibles.awaitUninterruptibly(changeLatch, 5, TimeUnit.SECONDS);
+        if(!done) {
+            fail(String.format("Missing change notifications. Expected: %d. Actual: %d",
+                    expChangeEventCount, (expChangeEventCount - changeLatch.getCount())));
+        }
+
+        assertEquals("Change notifications complete", true, done);
 
         for(int i = 0; i < expPaths.length; i++) {
             assertTrue(String.format("Change %d does not contain %s", (i+1), expPaths[i]),