* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.ovsdb.southbound.transactions.md;
import com.google.common.util.concurrent.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class TransactionInvokerImpl implements TransactionInvoker,TransactionChainListener, Runnable, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(TransactionInvokerImpl.class);
private static final int QUEUE_SIZE = 10000;
- private BindingTransactionChain chain;
+
private final DataBroker db;
private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
- private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue
- = new LinkedBlockingQueue<>(QUEUE_SIZE);
- private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue
- = new LinkedBlockingQueue<>(QUEUE_SIZE);
+ private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+ private final BlockingQueue<ReadWriteTransaction> successfulTransactionQueue =
+ new LinkedBlockingQueue<>(QUEUE_SIZE);
private final ExecutorService executor;
- private Map<ReadWriteTransaction,TransactionCommand> transactionToCommand
- = new HashMap<>();
- private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+
private final AtomicBoolean runTask = new AtomicBoolean(true);
- public TransactionInvokerImpl(DataBroker db) {
+ private Map<ReadWriteTransaction, TransactionCommand> transactionToCommand = new HashMap<>();
+ private List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
+ private BindingTransactionChain chain;
+
+ public TransactionInvokerImpl(final DataBroker db) {
this.db = db;
this.chain = db.createTransactionChain(this);
ThreadFactory threadFact = new ThreadFactoryBuilder().setNameFormat("transaction-invoker-impl-%d").build();
}
@Override
- public void onTransactionChainFailed(TransactionChain<?, ?> chainArg,
- AsyncTransaction<?, ?> transaction, Throwable cause) {
+ public void onTransactionChainFailed(final TransactionChain<?, ?> chainArg,
+ final AsyncTransaction<?, ?> transaction, final Throwable cause) {
LOG.error("Failed to write operational topology", cause);
offerFailedTransaction(transaction);
}
@Override
- public void onTransactionChainSuccessful(TransactionChain<?, ?> chainArg) {
+ public void onTransactionChainSuccessful(final TransactionChain<?, ?> chainArg) {
// NO OP
}
}
}
- private void offerFailedTransaction(AsyncTransaction<?, ?> transaction) {
+ private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
if (!failedTransactionQueue.offer(transaction)) {
LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
}
successfulTransactionQueue.clear();
}
- private void recordPendingTransaction(TransactionCommand command,
+ private void recordPendingTransaction(final TransactionCommand command,
final ReadWriteTransaction transaction) {
transactionToCommand.put(transaction, command);
pendingTransactions.add(transaction);
private List<TransactionCommand> extractCommandsFromQueue() throws InterruptedException {
List<TransactionCommand> result = new ArrayList<>();
TransactionCommand command = inputQueue.take();
- while (command != null) {
- result.add(command);
- command = inputQueue.poll();
- }
+ result.add(command);
+ inputQueue.drainTo(result);
return result;
}