Eliminate TransactionInvokerImpl.successfulTransactionQueue
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / transactions / md / TransactionInvokerImpl.java
index 201e6b8154230aef185a40b3933bd9370459f2c9..e9d2dc348a3040ddf407ee3c77b273dcdd1236e1 100644 (file)
@@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
+import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -43,12 +44,13 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     private final DataBroker db;
     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
     private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
-    private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
-            new LinkedBlockingQueue<>(QUEUE_SIZE);
     private final ExecutorService executor;
 
-    private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<>();
-    private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+    @GuardedBy("this")
+    private final Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<>();
+    @GuardedBy("this")
+    private final List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+
     private BindingTransactionChain chain;
     //This is made volatile as it is accessed from uncaught exception handler thread also
     private volatile ReadWriteTransaction transactionInFlight = null;
@@ -87,9 +89,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     @Override
     public void run() {
         while (true) {
-            forgetSuccessfulTransactions();
-
-            List<TransactionCommand> commands = null;
+            final List<TransactionCommand> commands;
             try {
                 commands = extractCommands();
             } catch (InterruptedException e) {
@@ -100,26 +100,25 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
             try {
                 while (commandIterator.hasNext()) {
                     TransactionCommand command = commandIterator.next();
-                    final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
-                    transactionInFlight = transaction;
-                    recordPendingTransaction(command, transaction);
-                    command.execute(transaction);
-                    ListenableFuture<Void> ft = transaction.submit();
-                    command.setTransactionResultFuture(ft);
-                    Futures.addCallback(ft, new FutureCallback<Void>() {
-                        @Override
-                        public void onSuccess(final Void result) {
-                            if (!successfulTransactionQueue.offer(transaction)) {
-                                LOG.error("successfulTransactionQueue is full (size: {}) - could not offer {}",
-                                        successfulTransactionQueue.size(), transaction);
+                    synchronized (this) {
+                        final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
+                        transactionInFlight = transaction;
+                        recordPendingTransaction(command, transaction);
+                        command.execute(transaction);
+                        ListenableFuture<Void> ft = transaction.submit();
+                        command.setTransactionResultFuture(ft);
+                        Futures.addCallback(ft, new FutureCallback<Void>() {
+                            @Override
+                            public void onSuccess(final Void result) {
+                                forgetSuccessfulTransaction(transaction);
                             }
-                        }
 
-                        @Override
-                        public void onFailure(final Throwable throwable) {
-                            // NOOP - handled by failure of transaction chain
-                        }
-                    }, MoreExecutors.directExecutor());
+                            @Override
+                            public void onFailure(final Throwable throwable) {
+                                // NOOP - handled by failure of transaction chain
+                            }
+                        }, MoreExecutors.directExecutor());
+                    }
                 }
                 transactionInFlight = null;
             } catch (IllegalStateException e) {
@@ -143,19 +142,21 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     }
 
     private List<TransactionCommand> extractResubmitCommands() {
-        AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
         List<TransactionCommand> commands = new ArrayList<>();
-        if (transaction != null) {
-            int index = pendingTransactions.lastIndexOf(transaction);
-            //This logic needs to be revisited. Is it ok to resubmit these things again ?
-            //are these operations idempotent ?
-            //Does the transaction chain execute n+1th if nth one threw error ?
-            List<ReadWriteTransaction> transactions =
-                    pendingTransactions.subList(index, pendingTransactions.size() - 1);
-            for (ReadWriteTransaction tx: transactions) {
-                commands.add(transactionToCommand.get(tx));
+        synchronized (this) {
+            AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
+            if (transaction != null) {
+                int index = pendingTransactions.lastIndexOf(transaction);
+                //This logic needs to be revisited. Is it ok to resubmit these things again ?
+                //are these operations idempotent ?
+                //Does the transaction chain execute n+1th if nth one threw error ?
+                List<ReadWriteTransaction> transactions =
+                        pendingTransactions.subList(index, pendingTransactions.size() - 1);
+                for (ReadWriteTransaction tx: transactions) {
+                    commands.add(transactionToCommand.get(tx));
+                }
+                resetTransactionQueue();
             }
-            resetTransactionQueue();
         }
         if (commandIterator != null) {
             while (commandIterator.hasNext()) {
@@ -168,13 +169,17 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     private void resetTransactionQueue() {
         chain.close();
         chain = db.createTransactionChain(this);
-        pendingTransactions = new ArrayList<>();
-        transactionToCommand = new HashMap<>();
+        pendingTransactions.clear();
+        transactionToCommand.clear();
         failedTransactionQueue.clear();
-        successfulTransactionQueue.clear();
     }
 
-    private void recordPendingTransaction(final TransactionCommand command,
+    synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
+        pendingTransactions.remove(transaction);
+        transactionToCommand.remove(transaction);
+    }
+
+    private synchronized void recordPendingTransaction(final TransactionCommand command,
             final ReadWriteTransaction transaction) {
         transactionToCommand.put(transaction, command);
         pendingTransactions.add(transaction);
@@ -199,15 +204,6 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         return result;
     }
 
-    private void forgetSuccessfulTransactions() {
-        ReadWriteTransaction transaction = successfulTransactionQueue.poll();
-        while (transaction != null) {
-            pendingTransactions.remove(transaction);
-            transactionToCommand.remove(transaction);
-            transaction = successfulTransactionQueue.poll();
-        }
-    }
-
     @Override
     public void close() throws Exception {
         this.chain.close();