*/
package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
+import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import org.checkerframework.checker.lock.qual.GuardedBy;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
+import org.opendaylight.mdsal.binding.api.Transaction;
+import org.opendaylight.mdsal.binding.api.TransactionChain;
+import org.opendaylight.mdsal.binding.api.TransactionChainListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final DataBroker db;
private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
- private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+ private final BlockingQueue<Transaction> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
private final ExecutorService executor;
@GuardedBy("this")
@GuardedBy("this")
private final List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
- private BindingTransactionChain chain;
+ private TransactionChain chain;
//This is made volatile as it is accessed from uncaught exception handler thread also
private volatile ReadWriteTransaction transactionInFlight = null;
private Iterator<TransactionCommand> commandIterator = null;
}
@Override
- public void onTransactionChainFailed(final TransactionChain<?, ?> txChain,
- final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+ public void onTransactionChainFailed(final TransactionChain txChain,
+ final Transaction transaction, final Throwable cause) {
offerFailedTransaction(transaction);
}
@Override
- public void onTransactionChainSuccessful(final TransactionChain<?, ?> txChain) {
+ public void onTransactionChainSuccessful(final TransactionChain txChain) {
// NO OP
}
commandIterator = commands.iterator();
try {
while (commandIterator.hasNext()) {
- TransactionCommand command = commandIterator.next();
- synchronized (this) {
- final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
- transactionInFlight = transaction;
- recordPendingTransaction(command, transaction);
- command.execute(transaction);
- ListenableFuture<Void> ft = transaction.submit();
- command.setTransactionResultFuture(ft);
- Futures.addCallback(ft, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- forgetSuccessfulTransaction(transaction);
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- // NOOP - handled by failure of transaction chain
- }
- }, MoreExecutors.directExecutor());
- }
+ executeCommand(commandIterator.next());
}
transactionInFlight = null;
} catch (IllegalStateException e) {
}
}
- private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
+ private synchronized void executeCommand(final TransactionCommand command) {
+ final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
+ transactionInFlight = transaction;
+ recordPendingTransaction(command, transaction);
+ command.execute(transaction);
+ FluentFuture<?> ft = transaction.commit();
+ command.setTransactionResultFuture(ft);
+ ft.addCallback(new FutureCallback<Object>() {
+ @Override
+ public void onSuccess(final Object result) {
+ forgetSuccessfulTransaction(transaction);
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ // NOOP - handled by failure of transaction chain
+ }
+ }, MoreExecutors.directExecutor());
+ }
+
+ private void offerFailedTransaction(final Transaction transaction) {
if (!failedTransactionQueue.offer(transaction)) {
LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
}
private List<TransactionCommand> extractResubmitCommands() {
List<TransactionCommand> commands = new ArrayList<>();
synchronized (this) {
- AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
+ Transaction transaction = failedTransactionQueue.poll();
if (transaction != null) {
int index = pendingTransactions.lastIndexOf(transaction);
//This logic needs to be revisited. Is it ok to resubmit these things again ?