Bug 2650: Fix ConcurrentModificationEx in TransactionProxy 91/14591/3
authortpantelis <tpanteli@brocade.com>
Fri, 23 Jan 2015 09:44:07 +0000 (04:44 -0500)
committertpantelis <tpanteli@brocade.com>
Thu, 29 Jan 2015 03:50:42 +0000 (22:50 -0500)
There were a couple patches proposed by other folks. Gary's patch
https://git.opendaylight.org/gerrit/#/c/14577/ to use a BlockingQueue
would work and is slightly simpler code but I prefer to avoid the unneeded synchronization
overhead that BlockingQueue carries as we still need explicit
synchronization for atomic access to txOperationsOnComplete and
transactionContext.

I implemented something similar to Robert's propopsal in
https://git.opendaylight.org/gerrit/#/c/14565/ .

I added a unit test that does a transaction put after an async read but
unfortunately it didn't reproduce the issue due to the behavior of the
akka dispatcher in the unit tests. The read future callback is batched
by the dispatcher even when using the CallingThreadDispatcher so the
synchronous read callback isn't achieved. Somehow the threading behavior
is different in the producton system. Regardless I kept the unit test
anyway - better than not haviing it.

Change-Id: I70f4e2507411c63cff99d03bf046c65e78a8138e
Signed-off-by: tpantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java

index f28a1e5f73d4823fe31ba20d14fa4d986f9700da..d79cd6f69f4b0e3e4f1171a332b45c36adbbd515 100644 (file)
@@ -21,6 +21,8 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -565,15 +567,19 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Adds a TransactionOperation to be executed after the CreateTransaction completes.
          */
         void addTxOperationOnComplete(TransactionOperation operation) {
          * Adds a TransactionOperation to be executed after the CreateTransaction completes.
          */
         void addTxOperationOnComplete(TransactionOperation operation) {
+            boolean invokeOperation = true;
             synchronized(txOperationsOnComplete) {
                 if(transactionContext == null) {
                     LOG.debug("Tx {} Adding operation on complete {}", identifier);
 
             synchronized(txOperationsOnComplete) {
                 if(transactionContext == null) {
                     LOG.debug("Tx {} Adding operation on complete {}", identifier);
 
+                    invokeOperation = false;
                     txOperationsOnComplete.add(operation);
                     txOperationsOnComplete.add(operation);
-                } else {
-                    operation.invoke(transactionContext);
                 }
             }
                 }
             }
+
+            if(invokeOperation) {
+                operation.invoke(transactionContext);
+            }
         }
 
 
         }
 
 
@@ -678,43 +684,63 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             }
 
                 }
             }
 
-            // Create the TransactionContext from the response or failure and execute delayed
-            // TransactionOperations. This entire section is done atomically (ie synchronized) with
-            // respect to #addTxOperationOnComplete to handle timing issues and ensure no
-            // TransactionOperation is missed and that they are processed in the order they occurred.
-            synchronized(txOperationsOnComplete) {
-                // Store the new TransactionContext locally until we've completed invoking the
-                // TransactionOperations. This avoids thread timing issues which could cause
-                // out-of-order TransactionOperations. Eg, on a modification operation, if the
-                // TransactionContext is non-null, then we directly call the TransactionContext.
-                // However, at the same time, the code may be executing the cached
-                // TransactionOperations. So to avoid thus timing, we don't publish the
-                // TransactionContext until after we've executed all cached TransactionOperations.
-                TransactionContext localTransactionContext;
-                if(failure != null) {
-                    LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
-                            failure.getMessage());
-
-                    localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
-                } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
-                    localTransactionContext = createValidTransactionContext(
-                            CreateTransactionReply.fromSerializable(response));
-                } else {
-                    IllegalArgumentException exception = new IllegalArgumentException(String.format(
+            // Create the TransactionContext from the response or failure. Store the new
+            // TransactionContext locally until we've completed invoking the
+            // TransactionOperations. This avoids thread timing issues which could cause
+            // out-of-order TransactionOperations. Eg, on a modification operation, if the
+            // TransactionContext is non-null, then we directly call the TransactionContext.
+            // However, at the same time, the code may be executing the cached
+            // TransactionOperations. So to avoid thus timing, we don't publish the
+            // TransactionContext until after we've executed all cached TransactionOperations.
+            TransactionContext localTransactionContext;
+            if(failure != null) {
+                LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
+                        failure.getMessage());
+
+                localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
+            } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
+                localTransactionContext = createValidTransactionContext(
+                        CreateTransactionReply.fromSerializable(response));
+            } else {
+                IllegalArgumentException exception = new IllegalArgumentException(String.format(
                         "Invalid reply type %s for CreateTransaction", response.getClass()));
 
                         "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-                    localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter);
+                localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter);
+            }
+
+            executeTxOperatonsOnComplete(localTransactionContext);
+        }
+
+        private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
+            while(true) {
+                // Access to txOperationsOnComplete and transactionContext must be protected and atomic
+                // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
+                // issues and ensure no TransactionOperation is missed and that they are processed
+                // in the order they occurred.
+
+                // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
+                // in case a TransactionOperation results in another transaction operation being
+                // queued (eg a put operation from a client read Future callback that is notified
+                // synchronously).
+                Collection<TransactionOperation> operationsBatch = null;
+                synchronized(txOperationsOnComplete) {
+                    if(txOperationsOnComplete.isEmpty()) {
+                        // We're done invoking the TransactionOperations so we can now publish the
+                        // TransactionContext.
+                        transactionContext = localTransactionContext;
+                        break;
+                    }
+
+                    operationsBatch = new ArrayList<>(txOperationsOnComplete);
+                    txOperationsOnComplete.clear();
                 }
 
                 }
 
-                for(TransactionOperation oper: txOperationsOnComplete) {
+                // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
+                // A slight down-side is that we need to re-acquire the lock below but this should
+                // be negligible.
+                for(TransactionOperation oper: operationsBatch) {
                     oper.invoke(localTransactionContext);
                 }
                     oper.invoke(localTransactionContext);
                 }
-
-                txOperationsOnComplete.clear();
-
-                // We're done invoking the TransactionOperations so we can now publish the
-                // TransactionContext.
-                transactionContext = localTransactionContext;
             }
         }
 
             }
         }
 
index 6d80bbb5b158eb2d962a87c0bb6bf08654be2bf8..9d8227a11b238b71b08133a8f8ea136188d07b93 100644 (file)
@@ -25,11 +25,15 @@ import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.util.List;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -72,6 +76,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
+import scala.concurrent.Promise;
 import scala.concurrent.duration.Duration;
 
 @SuppressWarnings("resource")
 import scala.concurrent.duration.Duration;
 
 @SuppressWarnings("resource")
@@ -394,8 +399,7 @@ public class TransactionProxyTest {
             .build();
     }
 
             .build();
     }
 
-    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
-            TransactionType type, int transactionVersion) {
+    private ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
         doReturn(actorSystem.actorSelection(actorRef.path())).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
         doReturn(actorSystem.actorSelection(actorRef.path())).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
@@ -403,10 +407,6 @@ public class TransactionProxyTest {
         doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
 
         doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
 
-        doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
-                executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
-                        eqCreateTransaction(memberName, type));
-
         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
 
         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
         doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
 
         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
@@ -414,6 +414,17 @@ public class TransactionProxyTest {
         return actorRef;
     }
 
         return actorRef;
     }
 
+    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
+            TransactionType type, int transactionVersion) {
+        ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem);
+
+        doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
+                        eqCreateTransaction(memberName, type));
+
+        return actorRef;
+    }
+
     private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
         return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
     }
     private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
         return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION);
     }
@@ -772,6 +783,62 @@ public class TransactionProxyTest {
                 WriteDataReply.class);
     }
 
                 WriteDataReply.class);
     }
 
+    @Test
+    public void testWriteAfterAsyncRead() throws Throwable {
+        ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem());
+
+        Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
+        doReturn(createTxPromise).when(mockActorContext).executeOperationAsync(
+                eq(getSystem().actorSelection(actorRef.path())),
+                eqCreateTransaction(memberName, READ_WRITE));
+
+        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData());
+
+        final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+
+        final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        final CountDownLatch readComplete = new CountDownLatch(1);
+        final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
+        com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH),
+                new  FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+                    @Override
+                    public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
+                        try {
+                            transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+                        } catch (Exception e) {
+                            caughtEx.set(e);
+                        } finally {
+                            readComplete.countDown();
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        caughtEx.set(t);
+                        readComplete.countDown();
+                    }
+                });
+
+        createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION));
+
+        Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
+
+        if(caughtEx.get() != null) {
+            throw caughtEx.get();
+        }
+
+        verify(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                WriteDataReply.class);
+    }
+
     @Test(expected=IllegalStateException.class)
     public void testWritePreConditionCheck() {
 
     @Test(expected=IllegalStateException.class)
     public void testWritePreConditionCheck() {