Speed up inputQueue interaction
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / transactions / md / TransactionInvokerImpl.java
index cca624d292b7452a8b3a2dd0e6b90e1104e3b457..201e6b8154230aef185a40b3933bd9370459f2c9 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
 
 import com.google.common.util.concurrent.FutureCallback;
@@ -40,22 +39,22 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         Thread.UncaughtExceptionHandler {
     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
     private static final int QUEUE_SIZE = 10000;
-    private BindingTransactionChain chain;
+
     private final DataBroker db;
     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
-    private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
-        = new LinkedBlockingQueue<>(QUEUE_SIZE);
-    private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
-        = new LinkedBlockingQueue<>(QUEUE_SIZE);
+    private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+    private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
+            new LinkedBlockingQueue<>(QUEUE_SIZE);
     private final ExecutorService executor;
-    private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
-        = new HashMap<>();
+
+    private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<>();
     private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+    private BindingTransactionChain chain;
     //This is made volatile as it is accessed from uncaught exception handler thread also
     private volatile ReadWriteTransaction transactionInFlight = null;
     private Iterator<TransactionCommand> commandIterator = null;
 
-    public TransactionInvokerImpl(DataBroker db) {
+    public TransactionInvokerImpl(final DataBroker db) {
         this.db = db;
         this.chain = db.createTransactionChain(this);
         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d")
@@ -75,13 +74,13 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     }
 
     @Override
-    public void onTransactionChainFailed(TransactionChain<?, ?> txChain,
-            AsyncTransaction<?, ?> transaction, Throwable cause) {
+    public void onTransactionChainFailed(final TransactionChain<?, ?> txChain,
+            final AsyncTransaction<?, ?> transaction, final Throwable cause) {
         offerFailedTransaction(transaction);
     }
 
     @Override
-    public void onTransactionChainSuccessful(TransactionChain<?, ?> txChain) {
+    public void onTransactionChainSuccessful(final TransactionChain<?, ?> txChain) {
         // NO OP
     }
 
@@ -137,7 +136,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         }
     }
 
-    private void offerFailedTransaction(AsyncTransaction<?, ?> transaction) {
+    private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
         if (!failedTransactionQueue.offer(transaction)) {
             LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
         }
@@ -175,7 +174,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         successfulTransactionQueue.clear();
     }
 
-    private void recordPendingTransaction(TransactionCommand command,
+    private void recordPendingTransaction(final TransactionCommand command,
             final ReadWriteTransaction transaction) {
         transactionToCommand.put(transaction, command);
         pendingTransactions.add(transaction);
@@ -195,10 +194,8 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
         List<TransactionCommand> result = new ArrayList<>();
         TransactionCommand command = inputQueue.take();
-        while (command != null) {
-            result.add(command);
-            command = inputQueue.poll();
-        }
+        result.add(command);
+        inputQueue.drainTo(result);
         return result;
     }
 
@@ -218,7 +215,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     }
 
     @Override
-    public void uncaughtException(Thread thread, Throwable ex) {
+    public void uncaughtException(final Thread thread, final Throwable ex) {
         LOG.error("Failed to execute hwvtep transact command, re-submitting the transaction again", ex);
         if (transactionInFlight != null) {
             offerFailedTransaction(transactionInFlight);