From: tpantelis Date: Fri, 23 Jan 2015 09:44:07 +0000 (-0500) Subject: Bug 2650: Fix ConcurrentModificationEx in TransactionProxy X-Git-Tag: release/lithium~647 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=166c432bc0611288abf2e13ef8f184cfbb2c101a;hp=37f0504d391efd8b7d61403759fcc22a6dd3a093 Bug 2650: Fix ConcurrentModificationEx in TransactionProxy There were a couple patches proposed by other folks. Gary's patch https://git.opendaylight.org/gerrit/#/c/14577/ to use a BlockingQueue would work and is slightly simpler code but I prefer to avoid the unneeded synchronization overhead that BlockingQueue carries as we still need explicit synchronization for atomic access to txOperationsOnComplete and transactionContext. I implemented something similar to Robert's propopsal in https://git.opendaylight.org/gerrit/#/c/14565/ . I added a unit test that does a transaction put after an async read but unfortunately it didn't reproduce the issue due to the behavior of the akka dispatcher in the unit tests. The read future callback is batched by the dispatcher even when using the CallingThreadDispatcher so the synchronous read callback isn't achieved. Somehow the threading behavior is different in the producton system. Regardless I kept the unit test anyway - better than not haviing it. Change-Id: I70f4e2507411c63cff99d03bf046c65e78a8138e Signed-off-by: tpantelis --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index f28a1e5f73..d79cd6f69f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -21,6 +21,8 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -565,15 +567,19 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { * Adds a TransactionOperation to be executed after the CreateTransaction completes. */ void addTxOperationOnComplete(TransactionOperation operation) { + boolean invokeOperation = true; synchronized(txOperationsOnComplete) { if(transactionContext == null) { LOG.debug("Tx {} Adding operation on complete {}", identifier); + invokeOperation = false; txOperationsOnComplete.add(operation); - } else { - operation.invoke(transactionContext); } } + + if(invokeOperation) { + operation.invoke(transactionContext); + } } @@ -678,43 +684,63 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } } - // Create the TransactionContext from the response or failure and execute delayed - // TransactionOperations. This entire section is done atomically (ie synchronized) with - // respect to #addTxOperationOnComplete to handle timing issues and ensure no - // TransactionOperation is missed and that they are processed in the order they occurred. - synchronized(txOperationsOnComplete) { - // Store the new TransactionContext locally until we've completed invoking the - // TransactionOperations. This avoids thread timing issues which could cause - // out-of-order TransactionOperations. Eg, on a modification operation, if the - // TransactionContext is non-null, then we directly call the TransactionContext. - // However, at the same time, the code may be executing the cached - // TransactionOperations. So to avoid thus timing, we don't publish the - // TransactionContext until after we've executed all cached TransactionOperations. - TransactionContext localTransactionContext; - if(failure != null) { - LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier, - failure.getMessage()); - - localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter); - } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { - localTransactionContext = createValidTransactionContext( - CreateTransactionReply.fromSerializable(response)); - } else { - IllegalArgumentException exception = new IllegalArgumentException(String.format( + // Create the TransactionContext from the response or failure. Store the new + // TransactionContext locally until we've completed invoking the + // TransactionOperations. This avoids thread timing issues which could cause + // out-of-order TransactionOperations. Eg, on a modification operation, if the + // TransactionContext is non-null, then we directly call the TransactionContext. + // However, at the same time, the code may be executing the cached + // TransactionOperations. So to avoid thus timing, we don't publish the + // TransactionContext until after we've executed all cached TransactionOperations. + TransactionContext localTransactionContext; + if(failure != null) { + LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier, + failure.getMessage()); + + localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter); + } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { + localTransactionContext = createValidTransactionContext( + CreateTransactionReply.fromSerializable(response)); + } else { + IllegalArgumentException exception = new IllegalArgumentException(String.format( "Invalid reply type %s for CreateTransaction", response.getClass())); - localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter); + localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter); + } + + executeTxOperatonsOnComplete(localTransactionContext); + } + + private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) { + while(true) { + // Access to txOperationsOnComplete and transactionContext must be protected and atomic + // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing + // issues and ensure no TransactionOperation is missed and that they are processed + // in the order they occurred. + + // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy + // in case a TransactionOperation results in another transaction operation being + // queued (eg a put operation from a client read Future callback that is notified + // synchronously). + Collection operationsBatch = null; + synchronized(txOperationsOnComplete) { + if(txOperationsOnComplete.isEmpty()) { + // We're done invoking the TransactionOperations so we can now publish the + // TransactionContext. + transactionContext = localTransactionContext; + break; + } + + operationsBatch = new ArrayList<>(txOperationsOnComplete); + txOperationsOnComplete.clear(); } - for(TransactionOperation oper: txOperationsOnComplete) { + // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. + // A slight down-side is that we need to re-acquire the lock below but this should + // be negligible. + for(TransactionOperation oper: operationsBatch) { oper.invoke(localTransactionContext); } - - txOperationsOnComplete.clear(); - - // We're done invoking the TransactionOperations so we can now publish the - // TransactionContext. - transactionContext = localTransactionContext; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 6d80bbb5b1..9d8227a11b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -25,11 +25,15 @@ import akka.testkit.JavaTestKit; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.io.IOException; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -72,6 +76,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.Promise; import scala.concurrent.duration.Duration; @SuppressWarnings("resource") @@ -394,8 +399,7 @@ public class TransactionProxyTest { .build(); } - private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, - TransactionType type, int transactionVersion) { + private ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) { ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); doReturn(actorSystem.actorSelection(actorRef.path())). when(mockActorContext).actorSelection(actorRef.path().toString()); @@ -403,10 +407,6 @@ public class TransactionProxyTest { doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); - doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext). - executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())), - eqCreateTransaction(memberName, type)); - doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString()); doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit(); @@ -414,6 +414,17 @@ public class TransactionProxyTest { return actorRef; } + private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, + TransactionType type, int transactionVersion) { + ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem); + + doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext). + executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())), + eqCreateTransaction(memberName, type)); + + return actorRef; + } + private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) { return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION); } @@ -772,6 +783,62 @@ public class TransactionProxyTest { WriteDataReply.class); } + @Test + public void testWriteAfterAsyncRead() throws Throwable { + ActorRef actorRef = setupActorContextWithoutInitialCreateTransaction(getSystem()); + + Promise createTxPromise = akka.dispatch.Futures.promise(); + doReturn(createTxPromise).when(mockActorContext).executeOperationAsync( + eq(getSystem().actorSelection(actorRef.path())), + eqCreateTransaction(memberName, READ_WRITE)); + + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + + final NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE); + + final CountDownLatch readComplete = new CountDownLatch(1); + final AtomicReference caughtEx = new AtomicReference<>(); + com.google.common.util.concurrent.Futures.addCallback(transactionProxy.read(TestModel.TEST_PATH), + new FutureCallback>>() { + @Override + public void onSuccess(Optional> result) { + try { + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + } catch (Exception e) { + caughtEx.set(e); + } finally { + readComplete.countDown(); + } + } + + @Override + public void onFailure(Throwable t) { + caughtEx.set(t); + readComplete.countDown(); + } + }); + + createTxPromise.success(createTransactionReply(actorRef, DataStoreVersions.CURRENT_VERSION)); + + Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS); + + if(caughtEx.get() != null) { + throw caughtEx.get(); + } + + verify(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.class); + } + @Test(expected=IllegalStateException.class) public void testWritePreConditionCheck() {