The while() loop here is an open-coded Queue.drainTo(). Read the API
and lead a happy life (knowing the thread contention is much lower
now).
This should help with our ability to drain the input queue more
quickly as we will have less cacheline thrashing.
JIRA: OVSDB-428
Change-Id: I53f3b24fb354dd0b727de26cc55890a70994ae8f
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
import com.google.common.util.concurrent.FutureCallback;
package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
import com.google.common.util.concurrent.FutureCallback;
Thread.UncaughtExceptionHandler {
private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
private static final int QUEUE_SIZE = 10000;
Thread.UncaughtExceptionHandler {
private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
private static final int QUEUE_SIZE = 10000;
- private BindingTransactionChain chain;
private final DataBroker db;
private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
private final DataBroker db;
private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
- private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
- = new LinkedBlockingQueue<>(QUEUE_SIZE);
- private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
- = new LinkedBlockingQueue<>(QUEUE_SIZE);
+ private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+ private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
+ new LinkedBlockingQueue<>(QUEUE_SIZE);
private final ExecutorService executor;
private final ExecutorService executor;
- private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
- = new HashMap<>();
+
+ private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<>();
private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+ private BindingTransactionChain chain;
//This is made volatile as it is accessed from uncaught exception handler thread also
private volatile ReadWriteTransaction transactionInFlight = null;
private Iterator<TransactionCommand> commandIterator = null;
//This is made volatile as it is accessed from uncaught exception handler thread also
private volatile ReadWriteTransaction transactionInFlight = null;
private Iterator<TransactionCommand> commandIterator = null;
- public TransactionInvokerImpl(DataBroker db) {
+ public TransactionInvokerImpl(final DataBroker db) {
this.db = db;
this.chain = db.createTransactionChain(this);
ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d")
this.db = db;
this.chain = db.createTransactionChain(this);
ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d")
- public void onTransactionChainFailed(TransactionChain<?, ?> txChain,
- AsyncTransaction<?, ?> transaction, Throwable cause) {
+ public void onTransactionChainFailed(final TransactionChain<?, ?> txChain,
+ final AsyncTransaction<?, ?> transaction, final Throwable cause) {
offerFailedTransaction(transaction);
}
@Override
offerFailedTransaction(transaction);
}
@Override
- public void onTransactionChainSuccessful(TransactionChain<?, ?> txChain) {
+ public void onTransactionChainSuccessful(final TransactionChain<?, ?> txChain) {
- private void offerFailedTransaction(AsyncTransaction<?, ?> transaction) {
+ private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
if (!failedTransactionQueue.offer(transaction)) {
LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
}
if (!failedTransactionQueue.offer(transaction)) {
LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
}
successfulTransactionQueue.clear();
}
successfulTransactionQueue.clear();
}
- private void recordPendingTransaction(TransactionCommand command,
+ private void recordPendingTransaction(final TransactionCommand command,
final ReadWriteTransaction transaction) {
transactionToCommand.put(transaction, command);
pendingTransactions.add(transaction);
final ReadWriteTransaction transaction) {
transactionToCommand.put(transaction, command);
pendingTransactions.add(transaction);
private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
List<TransactionCommand> result = new ArrayList<>();
TransactionCommand command = inputQueue.take();
private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
List<TransactionCommand> result = new ArrayList<>();
TransactionCommand command = inputQueue.take();
- while (command != null) {
- result.add(command);
- command = inputQueue.poll();
- }
+ result.add(command);
+ inputQueue.drainTo(result);
- public void uncaughtException(Thread thread, Throwable ex) {
+ public void uncaughtException(final Thread thread, final Throwable ex) {
LOG.error("Failed to execute hwvtep transact command, re-submitting the transaction again", ex);
if (transactionInFlight != null) {
offerFailedTransaction(transactionInFlight);
LOG.error("Failed to execute hwvtep transact command, re-submitting the transaction again", ex);
if (transactionInFlight != null) {
offerFailedTransaction(transactionInFlight);
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.ovsdb.southbound.transactions.md;
import com.google.common.util.concurrent.FutureCallback;
package org.opendaylight.ovsdb.southbound.transactions.md;
import com.google.common.util.concurrent.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
private static final int QUEUE_SIZE = 10000;
public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
private static final int QUEUE_SIZE = 10000;
- private BindingTransactionChain chain;
private final DataBroker db;
private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
private final DataBroker db;
private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
- private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
- = new LinkedBlockingQueue<>(QUEUE_SIZE);
- private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
- = new LinkedBlockingQueue<>(QUEUE_SIZE);
+ private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+ private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
+ new LinkedBlockingQueue<>(QUEUE_SIZE);
private final ExecutorService executor;
private final ExecutorService executor;
- private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
- = new HashMap<>();
- private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
private final AtomicBoolean runTask = new AtomicBoolean(true);
private final AtomicBoolean runTask = new AtomicBoolean(true);
- public TransactionInvokerImpl(DataBroker db) {
+ private Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
+ private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+ private BindingTransactionChain chain;
+
+ public TransactionInvokerImpl(final DataBroker db) {
this.db = db;
this.chain = db.createTransactionChain(this);
ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
this.db = db;
this.chain = db.createTransactionChain(this);
ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
- public void onTransactionChainFailed(TransactionChain<?, ?> chainArg,
- AsyncTransaction<?, ?> transaction, Throwable cause) {
+ public void onTransactionChainFailed(final TransactionChain<?, ?> chainArg,
+ final AsyncTransaction<?, ?> transaction, final Throwable cause) {
LOG.error("Failed to write operational topology", cause);
offerFailedTransaction(transaction);
}
@Override
LOG.error("Failed to write operational topology", cause);
offerFailedTransaction(transaction);
}
@Override
- public void onTransactionChainSuccessful(TransactionChain<?, ?> chainArg) {
+ public void onTransactionChainSuccessful(final TransactionChain<?, ?> chainArg) {
- private void offerFailedTransaction(AsyncTransaction<?, ?> transaction) {
+ private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
if (!failedTransactionQueue.offer(transaction)) {
LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
}
if (!failedTransactionQueue.offer(transaction)) {
LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
}
successfulTransactionQueue.clear();
}
successfulTransactionQueue.clear();
}
- private void recordPendingTransaction(TransactionCommand command,
+ private void recordPendingTransaction(final TransactionCommand command,
final ReadWriteTransaction transaction) {
transactionToCommand.put(transaction, command);
pendingTransactions.add(transaction);
final ReadWriteTransaction transaction) {
transactionToCommand.put(transaction, command);
pendingTransactions.add(transaction);
private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
List<TransactionCommand> result = new ArrayList<>();
TransactionCommand command = inputQueue.take();
private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
List<TransactionCommand> result = new ArrayList<>();
TransactionCommand command = inputQueue.take();
- while (command != null) {
- result.add(command);
- command = inputQueue.poll();
- }
+ result.add(command);
+ inputQueue.drainTo(result);