From: Robert Varga Date: Thu, 5 Dec 2019 20:52:08 +0000 (+0100) Subject: Speed up inputQueue interaction X-Git-Tag: release/neon-sr3~2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=c6337f51aaa8097bc16fac345315c47d3e28b882;p=ovsdb.git Speed up inputQueue interaction The while() loop here is an open-coded Queue.drainTo(). Read the API and lead a happy life (knowing the thread contention is much lower now). This should help with our ability to drain the input queue more quickly as we will have less cacheline thrashing. JIRA: OVSDB-428 Change-Id: I53f3b24fb354dd0b727de26cc55890a70994ae8f Signed-off-by: Robert Varga --- diff --git a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transactions/md/TransactionInvokerImpl.java b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transactions/md/TransactionInvokerImpl.java index cca624d29..201e6b815 100644 --- a/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transactions/md/TransactionInvokerImpl.java +++ b/hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transactions/md/TransactionInvokerImpl.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md; import com.google.common.util.concurrent.FutureCallback; @@ -40,22 +39,22 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha Thread.UncaughtExceptionHandler { private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class); private static final int QUEUE_SIZE = 10000; - private BindingTransactionChain chain; + private final DataBroker db; private final BlockingQueue inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); - private final BlockingQueue successfulTransactionQueue - = new LinkedBlockingQueue<>(QUEUE_SIZE); - private final BlockingQueue> failedTransactionQueue - = new LinkedBlockingQueue<>(QUEUE_SIZE); + private final BlockingQueue> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); + private final BlockingQueue successfulTransactionQueue = + new LinkedBlockingQueue<>(QUEUE_SIZE); private final ExecutorService executor; - private Map transactionToCommand - = new HashMap<>(); + + private Map transactionToCommand = new HashMap<>(); private List pendingTransactions = new ArrayList<>(); + private BindingTransactionChain chain; //This is made volatile as it is accessed from uncaught exception handler thread also private volatile ReadWriteTransaction transactionInFlight = null; private Iterator commandIterator = null; - public TransactionInvokerImpl(DataBroker db) { + public TransactionInvokerImpl(final DataBroker db) { this.db = db; this.chain = db.createTransactionChain(this); ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d") @@ -75,13 +74,13 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha } @Override - public void onTransactionChainFailed(TransactionChain txChain, - AsyncTransaction transaction, Throwable cause) { + public void onTransactionChainFailed(final TransactionChain txChain, + final AsyncTransaction transaction, final Throwable cause) { offerFailedTransaction(transaction); } @Override - public void onTransactionChainSuccessful(TransactionChain txChain) { + public void onTransactionChainSuccessful(final TransactionChain txChain) { // NO OP } @@ -137,7 +136,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha } } - private void offerFailedTransaction(AsyncTransaction transaction) { + private void offerFailedTransaction(final AsyncTransaction transaction) { if (!failedTransactionQueue.offer(transaction)) { LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size()); } @@ -175,7 +174,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha successfulTransactionQueue.clear(); } - private void recordPendingTransaction(TransactionCommand command, + private void recordPendingTransaction(final TransactionCommand command, final ReadWriteTransaction transaction) { transactionToCommand.put(transaction, command); pendingTransactions.add(transaction); @@ -195,10 +194,8 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha private List extractCommandsFromQueue() throws InterruptedException { List result = new ArrayList<>(); TransactionCommand command = inputQueue.take(); - while (command != null) { - result.add(command); - command = inputQueue.poll(); - } + result.add(command); + inputQueue.drainTo(result); return result; } @@ -218,7 +215,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha } @Override - public void uncaughtException(Thread thread, Throwable ex) { + public void uncaughtException(final Thread thread, final Throwable ex) { LOG.error("Failed to execute hwvtep transact command, re-submitting the transaction again", ex); if (transactionInFlight != null) { offerFailedTransaction(transactionInFlight); diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java index 46b793462..49472f75b 100644 --- a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java +++ b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/transactions/md/TransactionInvokerImpl.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.ovsdb.southbound.transactions.md; import com.google.common.util.concurrent.FutureCallback; @@ -32,24 +31,24 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListen import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class); private static final int QUEUE_SIZE = 10000; - private BindingTransactionChain chain; + private final DataBroker db; private final BlockingQueue inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); - private final BlockingQueue successfulTransactionQueue - = new LinkedBlockingQueue<>(QUEUE_SIZE); - private final BlockingQueue> failedTransactionQueue - = new LinkedBlockingQueue<>(QUEUE_SIZE); + private final BlockingQueue> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); + private final BlockingQueue successfulTransactionQueue = + new LinkedBlockingQueue<>(QUEUE_SIZE); private final ExecutorService executor; - private Map transactionToCommand - = new HashMap<>(); - private List pendingTransactions = new ArrayList<>(); + private final AtomicBoolean runTask = new AtomicBoolean(true); - public TransactionInvokerImpl(DataBroker db) { + private Map transactionToCommand = new HashMap<>(); + private List pendingTransactions = new ArrayList<>(); + private BindingTransactionChain chain; + + public TransactionInvokerImpl(final DataBroker db) { this.db = db; this.chain = db.createTransactionChain(this); ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build(); @@ -66,14 +65,14 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha } @Override - public void onTransactionChainFailed(TransactionChain chainArg, - AsyncTransaction transaction, Throwable cause) { + public void onTransactionChainFailed(final TransactionChain chainArg, + final AsyncTransaction transaction, final Throwable cause) { LOG.error("Failed to write operational topology", cause); offerFailedTransaction(transaction); } @Override - public void onTransactionChainSuccessful(TransactionChain chainArg) { + public void onTransactionChainSuccessful(final TransactionChain chainArg) { // NO OP } @@ -127,7 +126,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha } } - private void offerFailedTransaction(AsyncTransaction transaction) { + private void offerFailedTransaction(final AsyncTransaction transaction) { if (!failedTransactionQueue.offer(transaction)) { LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size()); } @@ -157,7 +156,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha successfulTransactionQueue.clear(); } - private void recordPendingTransaction(TransactionCommand command, + private void recordPendingTransaction(final TransactionCommand command, final ReadWriteTransaction transaction) { transactionToCommand.put(transaction, command); pendingTransactions.add(transaction); @@ -172,10 +171,8 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha private List extractCommandsFromQueue() throws InterruptedException { List result = new ArrayList<>(); TransactionCommand command = inputQueue.take(); - while (command != null) { - result.add(command); - command = inputQueue.poll(); - } + result.add(command); + inputQueue.drainTo(result); return result; }