* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
import com.google.common.util.concurrent.FutureCallback;
Thread.UncaughtExceptionHandler {
private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
private static final int QUEUE_SIZE = 10000;
- private BindingTransactionChain chain;
+
private final DataBroker db;
private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
- private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
- = new LinkedBlockingQueue<>(QUEUE_SIZE);
- private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
- = new LinkedBlockingQueue<>(QUEUE_SIZE);
+ private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+ private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
+ new LinkedBlockingQueue<>(QUEUE_SIZE);
private final ExecutorService executor;
- private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
- = new HashMap<>();
+
+ private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand = new HashMap<>();
private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+ private BindingTransactionChain chain;
//This is made volatile as it is accessed from uncaught exception handler thread also
private volatile ReadWriteTransaction transactionInFlight = null;
private Iterator<TransactionCommand> commandIterator = null;
- public TransactionInvokerImpl(DataBroker db) {
+ public TransactionInvokerImpl(final DataBroker db) {
this.db = db;
this.chain = db.createTransactionChain(this);
ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d")
}
@Override
- public void onTransactionChainFailed(TransactionChain<?, ?> txChain,
- AsyncTransaction<?, ?> transaction, Throwable cause) {
+ public void onTransactionChainFailed(final TransactionChain<?, ?> txChain,
+ final AsyncTransaction<?, ?> transaction, final Throwable cause) {
offerFailedTransaction(transaction);
}
@Override
- public void onTransactionChainSuccessful(TransactionChain<?, ?> txChain) {
+ public void onTransactionChainSuccessful(final TransactionChain<?, ?> txChain) {
// NO OP
}
}
}
- private void offerFailedTransaction(AsyncTransaction<?, ?> transaction) {
+ private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
if (!failedTransactionQueue.offer(transaction)) {
LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
}
successfulTransactionQueue.clear();
}
- private void recordPendingTransaction(TransactionCommand command,
+ private void recordPendingTransaction(final TransactionCommand command,
final ReadWriteTransaction transaction) {
transactionToCommand.put(transaction, command);
pendingTransactions.add(transaction);
private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
List<TransactionCommand> result = new ArrayList<>();
TransactionCommand command = inputQueue.take();
- while (command != null) {
- result.add(command);
- command = inputQueue.poll();
- }
+ result.add(command);
+ inputQueue.drainTo(result);
return result;
}
}
@Override
- public void uncaughtException(Thread thread, Throwable ex) {
+ public void uncaughtException(final Thread thread, final Throwable ex) {
LOG.error("Failed to execute hwvtep transact command, re-submitting the transaction again", ex);
if (transactionInFlight != null) {
offerFailedTransaction(transactionInFlight);