Speed up inputQueue interaction 72/86272/3
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 5 Dec 2019 20:52:08 +0000 (21:52 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 9 Dec 2019 09:50:14 +0000 (10:50 +0100)
The while() loop here is an open-coded Queue.drainTo(). Read the API
and lead a happy life (knowing the thread contention is much lower
now).

This should help with our ability to drain the input queue more
quickly as we will have less cacheline thrashing.

JIRA: OVSDB-428
Change-Id: I53f3b24fb354dd0b727de26cc55890a70994ae8f
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transactions/md/TransactionInvokerImpl.java
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java

index cca624d292b7452a8b3a2dd0e6b90e1104e3b457..201e6b8154230aef185a40b3933bd9370459f2c9 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
 
 import com.google.common.util.concurrent.FutureCallback;
@@ -40,22 +39,22 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         Thread.UncaughtExceptionHandler {
     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
     private static final int QUEUE_SIZE = 10000;
-    private BindingTransactionChain chain;
+
     private final DataBroker db;
     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
-    private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
-        = new LinkedBlockingQueue<>(QUEUE_SIZE);
-    private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
-        = new LinkedBlockingQueue<>(QUEUE_SIZE);
+    private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+    private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
+            new LinkedBlockingQueue<>(QUEUE_SIZE);
     private final ExecutorService executor;
-    private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
-        = new HashMap<>();
+
+    private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<>();
     private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+    private BindingTransactionChain chain;
     //This is made volatile as it is accessed from uncaught exception handler thread also
     private volatile ReadWriteTransaction transactionInFlight = null;
     private Iterator<TransactionCommand> commandIterator = null;
 
-    public TransactionInvokerImpl(DataBroker db) {
+    public TransactionInvokerImpl(final DataBroker db) {
         this.db = db;
         this.chain = db.createTransactionChain(this);
         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d")
@@ -75,13 +74,13 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     }
 
     @Override
-    public void onTransactionChainFailed(TransactionChain<?, ?> txChain,
-            AsyncTransaction<?, ?> transaction, Throwable cause) {
+    public void onTransactionChainFailed(final TransactionChain<?, ?> txChain,
+            final AsyncTransaction<?, ?> transaction, final Throwable cause) {
         offerFailedTransaction(transaction);
     }
 
     @Override
-    public void onTransactionChainSuccessful(TransactionChain<?, ?> txChain) {
+    public void onTransactionChainSuccessful(final TransactionChain<?, ?> txChain) {
         // NO OP
     }
 
@@ -137,7 +136,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         }
     }
 
-    private void offerFailedTransaction(AsyncTransaction<?, ?> transaction) {
+    private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
         if (!failedTransactionQueue.offer(transaction)) {
             LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
         }
@@ -175,7 +174,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         successfulTransactionQueue.clear();
     }
 
-    private void recordPendingTransaction(TransactionCommand command,
+    private void recordPendingTransaction(final TransactionCommand command,
             final ReadWriteTransaction transaction) {
         transactionToCommand.put(transaction, command);
         pendingTransactions.add(transaction);
@@ -195,10 +194,8 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
         List<TransactionCommand> result = new ArrayList<>();
         TransactionCommand command = inputQueue.take();
-        while (command != null) {
-            result.add(command);
-            command = inputQueue.poll();
-        }
+        result.add(command);
+        inputQueue.drainTo(result);
         return result;
     }
 
@@ -218,7 +215,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     }
 
     @Override
-    public void uncaughtException(Thread thread, Throwable ex) {
+    public void uncaughtException(final Thread thread, final Throwable ex) {
         LOG.error("Failed to execute hwvtep transact command, re-submitting the transaction again", ex);
         if (transactionInFlight != null) {
             offerFailedTransaction(transactionInFlight);
index 46b7934622a7023ac3c4d848c76186b2f89c4d05..49472f75b8b068ad88c86391a194274c3c8ff9cd 100644 (file)
@@ -5,7 +5,6 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.ovsdb.southbound.transactions.md;
 
 import com.google.common.util.concurrent.FutureCallback;
@@ -32,24 +31,24 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListen
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
     private static final int QUEUE_SIZE = 10000;
-    private BindingTransactionChain chain;
+
     private final DataBroker db;
     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
-    private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
-        = new LinkedBlockingQueue<>(QUEUE_SIZE);
-    private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
-        = new LinkedBlockingQueue<>(QUEUE_SIZE);
+    private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+    private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
+            new LinkedBlockingQueue<>(QUEUE_SIZE);
     private final ExecutorService executor;
-    private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
-        = new HashMap<>();
-    private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+
     private final AtomicBoolean runTask = new AtomicBoolean(true);
 
-    public TransactionInvokerImpl(DataBroker db) {
+    private Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
+    private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+    private BindingTransactionChain chain;
+
+    public TransactionInvokerImpl(final DataBroker db) {
         this.db = db;
         this.chain = db.createTransactionChain(this);
         ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
@@ -66,14 +65,14 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     }
 
     @Override
-    public void onTransactionChainFailed(TransactionChain<?, ?> chainArg,
-            AsyncTransaction<?, ?> transaction, Throwable cause) {
+    public void onTransactionChainFailed(final TransactionChain<?, ?> chainArg,
+            final AsyncTransaction<?, ?> transaction, final Throwable cause) {
         LOG.error("Failed to write operational topology", cause);
         offerFailedTransaction(transaction);
     }
 
     @Override
-    public void onTransactionChainSuccessful(TransactionChain<?, ?> chainArg) {
+    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chainArg) {
         // NO OP
     }
 
@@ -127,7 +126,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         }
     }
 
-    private void offerFailedTransaction(AsyncTransaction<?, ?> transaction) {
+    private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
         if (!failedTransactionQueue.offer(transaction)) {
             LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
         }
@@ -157,7 +156,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         successfulTransactionQueue.clear();
     }
 
-    private void recordPendingTransaction(TransactionCommand command,
+    private void recordPendingTransaction(final TransactionCommand command,
             final ReadWriteTransaction transaction) {
         transactionToCommand.put(transaction, command);
         pendingTransactions.add(transaction);
@@ -172,10 +171,8 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
         List<TransactionCommand> result = new ArrayList<>();
         TransactionCommand command = inputQueue.take();
-        while (command != null) {
-            result.add(command);
-            command = inputQueue.poll();
-        }
+        result.add(command);
+        inputQueue.drainTo(result);
         return result;
     }