Speed up inputQueue interaction
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / transactions / md / TransactionInvokerImpl.java
index 46b7934622a7023ac3c4d848c76186b2f89c4d05..49472f75b8b068ad88c86391a194274c3c8ff9cd 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.ovsdb.southbound.transactions.md;
 
 import com.google.common.util.concurrent.FutureCallback;
@@ -32,24 +31,24 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListen
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
     private static final int QUEUE_SIZE = 10000;
-    private BindingTransactionChain chain;
+
     private final DataBroker db;
     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
-    private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
-        = new LinkedBlockingQueue<>(QUEUE_SIZE);
-    private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
-        = new LinkedBlockingQueue<>(QUEUE_SIZE);
+    private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+    private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
+            new LinkedBlockingQueue<>(QUEUE_SIZE);
     private final ExecutorService executor;
-    private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
-        = new HashMap<>();
-    private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+
     private final AtomicBoolean runTask = new AtomicBoolean(true);
 
-    public TransactionInvokerImpl(DataBroker db) {
+    private Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
+    private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+    private BindingTransactionChain chain;
+
+    public TransactionInvokerImpl(final DataBroker db) {
         this.db = db;
         this.chain = db.createTransactionChain(this);
         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
@@ -66,14 +65,14 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     }
 
     @Override
-    public void onTransactionChainFailed(TransactionChain<?, ?> chainArg,
-            AsyncTransaction<?, ?> transaction, Throwable cause) {
+    public void onTransactionChainFailed(final TransactionChain<?, ?> chainArg,
+            final AsyncTransaction<?, ?> transaction, final Throwable cause) {
         LOG.error("Failed to write operational topology", cause);
         offerFailedTransaction(transaction);
     }
 
     @Override
-    public void onTransactionChainSuccessful(TransactionChain<?, ?> chainArg) {
+    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chainArg) {
         // NO OP
     }
 
@@ -127,7 +126,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         }
     }
 
-    private void offerFailedTransaction(AsyncTransaction<?, ?> transaction) {
+    private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
         if (!failedTransactionQueue.offer(transaction)) {
             LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
         }
@@ -157,7 +156,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         successfulTransactionQueue.clear();
     }
 
-    private void recordPendingTransaction(TransactionCommand command,
+    private void recordPendingTransaction(final TransactionCommand command,
             final ReadWriteTransaction transaction) {
         transactionToCommand.put(transaction, command);
         pendingTransactions.add(transaction);
@@ -172,10 +171,8 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
         List<TransactionCommand> result = new ArrayList<>();
         TransactionCommand command = inputQueue.take();
-        while (command != null) {
-            result.add(command);
-            command = inputQueue.poll();
-        }
+        result.add(command);
+        inputQueue.drainTo(result);
         return result;
     }