Bug 2650: Fix ConcurrentModificationEx in TransactionProxy
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
index f28a1e5f73d4823fe31ba20d14fa4d986f9700da..d79cd6f69f4b0e3e4f1171a332b45c36adbbd515 100644 (file)
@@ -21,6 +21,8 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -565,15 +567,19 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
          * Adds a TransactionOperation to be executed after the CreateTransaction completes.
          */
         void addTxOperationOnComplete(TransactionOperation operation) {
+            boolean invokeOperation = true;
             synchronized(txOperationsOnComplete) {
                 if(transactionContext == null) {
                     LOG.debug("Tx {} Adding operation on complete {}", identifier);
 
+                    invokeOperation = false;
                     txOperationsOnComplete.add(operation);
-                } else {
-                    operation.invoke(transactionContext);
                 }
             }
+
+            if(invokeOperation) {
+                operation.invoke(transactionContext);
+            }
         }
 
 
@@ -678,43 +684,63 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
                 }
             }
 
-            // Create the TransactionContext from the response or failure and execute delayed
-            // TransactionOperations. This entire section is done atomically (ie synchronized) with
-            // respect to #addTxOperationOnComplete to handle timing issues and ensure no
-            // TransactionOperation is missed and that they are processed in the order they occurred.
-            synchronized(txOperationsOnComplete) {
-                // Store the new TransactionContext locally until we've completed invoking the
-                // TransactionOperations. This avoids thread timing issues which could cause
-                // out-of-order TransactionOperations. Eg, on a modification operation, if the
-                // TransactionContext is non-null, then we directly call the TransactionContext.
-                // However, at the same time, the code may be executing the cached
-                // TransactionOperations. So to avoid thus timing, we don't publish the
-                // TransactionContext until after we've executed all cached TransactionOperations.
-                TransactionContext localTransactionContext;
-                if(failure != null) {
-                    LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
-                            failure.getMessage());
-
-                    localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
-                } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
-                    localTransactionContext = createValidTransactionContext(
-                            CreateTransactionReply.fromSerializable(response));
-                } else {
-                    IllegalArgumentException exception = new IllegalArgumentException(String.format(
+            // Create the TransactionContext from the response or failure. Store the new
+            // TransactionContext locally until we've completed invoking the
+            // TransactionOperations. This avoids thread timing issues which could cause
+            // out-of-order TransactionOperations. Eg, on a modification operation, if the
+            // TransactionContext is non-null, then we directly call the TransactionContext.
+            // However, at the same time, the code may be executing the cached
+            // TransactionOperations. So to avoid thus timing, we don't publish the
+            // TransactionContext until after we've executed all cached TransactionOperations.
+            TransactionContext localTransactionContext;
+            if(failure != null) {
+                LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
+                        failure.getMessage());
+
+                localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
+            } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
+                localTransactionContext = createValidTransactionContext(
+                        CreateTransactionReply.fromSerializable(response));
+            } else {
+                IllegalArgumentException exception = new IllegalArgumentException(String.format(
                         "Invalid reply type %s for CreateTransaction", response.getClass()));
 
-                    localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter);
+                localTransactionContext = new NoOpTransactionContext(exception, identifier, operationLimiter);
+            }
+
+            executeTxOperatonsOnComplete(localTransactionContext);
+        }
+
+        private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
+            while(true) {
+                // Access to txOperationsOnComplete and transactionContext must be protected and atomic
+                // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
+                // issues and ensure no TransactionOperation is missed and that they are processed
+                // in the order they occurred.
+
+                // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
+                // in case a TransactionOperation results in another transaction operation being
+                // queued (eg a put operation from a client read Future callback that is notified
+                // synchronously).
+                Collection<TransactionOperation> operationsBatch = null;
+                synchronized(txOperationsOnComplete) {
+                    if(txOperationsOnComplete.isEmpty()) {
+                        // We're done invoking the TransactionOperations so we can now publish the
+                        // TransactionContext.
+                        transactionContext = localTransactionContext;
+                        break;
+                    }
+
+                    operationsBatch = new ArrayList<>(txOperationsOnComplete);
+                    txOperationsOnComplete.clear();
                 }
 
-                for(TransactionOperation oper: txOperationsOnComplete) {
+                // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
+                // A slight down-side is that we need to re-acquire the lock below but this should
+                // be negligible.
+                for(TransactionOperation oper: operationsBatch) {
                     oper.invoke(localTransactionContext);
                 }
-
-                txOperationsOnComplete.clear();
-
-                // We're done invoking the TransactionOperations so we can now publish the
-                // TransactionContext.
-                transactionContext = localTransactionContext;
             }
         }