Eliminate TransactionInvokerImpl.successfulTransactionQueue
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / transactions / md / TransactionInvokerImpl.java
index 49472f75b8b068ad88c86391a194274c3c8ff9cd..e7fc86de8d7c863593fe7b6662646ca4af050e68 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.ovsdb.southbound.transactions.md;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -22,6 +23,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -38,14 +40,15 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     private final DataBroker db;
     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
     private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
-    private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
-            new LinkedBlockingQueue<>(QUEUE_SIZE);
     private final ExecutorService executor;
 
     private final AtomicBoolean runTask = new AtomicBoolean(true);
 
-    private Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
-    private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+    @GuardedBy("this")
+    private final Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
+    @GuardedBy("this")
+    private final List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+
     private BindingTransactionChain chain;
 
     public TransactionInvokerImpl(final DataBroker db) {
@@ -79,9 +82,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     @Override
     public void run() {
         while (runTask.get()) {
-            forgetSuccessfulTransactions();
-
-            List<TransactionCommand> commands = null;
+            final List<TransactionCommand> commands;
             try {
                 commands = extractCommands();
             } catch (InterruptedException e) {
@@ -92,26 +93,25 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
             ReadWriteTransaction transactionInFlight = null;
             try {
                 for (TransactionCommand command: commands) {
-                    final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
-                    transactionInFlight = transaction;
-                    recordPendingTransaction(command, transaction);
-                    command.execute(transaction);
-                    Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
-                        @Override
-                        public void onSuccess(final Void result) {
-                            if (!successfulTransactionQueue.offer(transaction)) {
-                                LOG.error("successfulTransactionQueue is full (size: {}) - could not offer {}",
-                                        successfulTransactionQueue.size(), transaction);
+                    synchronized (this) {
+                        final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
+                        transactionInFlight = transaction;
+                        recordPendingTransaction(command, transaction);
+                        command.execute(transaction);
+                        Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
+                            @Override
+                            public void onSuccess(final Void result) {
+                                forgetSuccessfulTransaction(transaction);
+                                command.onSuccess();
                             }
-                            command.onSuccess();
-                        }
-
-                        @Override
-                        public void onFailure(final Throwable throwable) {
-                            command.onFailure(throwable);
-                            // NOOP - handled by failure of transaction chain
-                        }
-                    }, MoreExecutors.directExecutor());
+
+                            @Override
+                            public void onFailure(final Throwable throwable) {
+                                command.onFailure(throwable);
+                                // NOOP - handled by failure of transaction chain
+                            }
+                        }, MoreExecutors.directExecutor());
+                    }
                 }
             } catch (IllegalStateException e) {
                 if (transactionInFlight != null) {
@@ -132,7 +132,8 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         }
     }
 
-    private List<TransactionCommand> extractResubmitCommands() {
+    @VisibleForTesting
+    synchronized List<TransactionCommand> extractResubmitCommands() {
         AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
         List<TransactionCommand> commands = new ArrayList<>();
         if (transaction != null) {
@@ -147,16 +148,21 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         return commands;
     }
 
-    private void resetTransactionQueue() {
+    @VisibleForTesting
+    synchronized void resetTransactionQueue() {
         chain.close();
         chain = db.createTransactionChain(this);
-        pendingTransactions = new ArrayList<>();
-        transactionToCommand = new HashMap<>();
+        pendingTransactions.clear();
+        transactionToCommand.clear();
         failedTransactionQueue.clear();
-        successfulTransactionQueue.clear();
     }
 
-    private void recordPendingTransaction(final TransactionCommand command,
+    synchronized void forgetSuccessfulTransaction(final ReadWriteTransaction transaction) {
+        pendingTransactions.remove(transaction);
+        transactionToCommand.remove(transaction);
+    }
+
+    private synchronized void recordPendingTransaction(final TransactionCommand command,
             final ReadWriteTransaction transaction) {
         transactionToCommand.put(transaction, command);
         pendingTransactions.add(transaction);
@@ -176,15 +182,6 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         return result;
     }
 
-    private void forgetSuccessfulTransactions() {
-        ReadWriteTransaction transaction = successfulTransactionQueue.poll();
-        while (transaction != null) {
-            pendingTransactions.remove(transaction);
-            transactionToCommand.remove(transaction);
-            transaction = successfulTransactionQueue.poll();
-        }
-    }
-
     @Override
     public void close() throws InterruptedException {
         this.chain.close();