import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.checkerframework.checker.lock.qual.GuardedBy;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
+import org.opendaylight.mdsal.binding.api.Transaction;
+import org.opendaylight.mdsal.binding.api.TransactionChain;
+import org.opendaylight.mdsal.binding.api.TransactionChainListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final DataBroker db;
private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
- private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+ private final BlockingQueue<Transaction> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
private final ExecutorService executor;
private final AtomicBoolean runTask = new AtomicBoolean(true);
@GuardedBy("this")
private final Queue<Entry<ReadWriteTransaction, TransactionCommand>> pendingTransactions = new ArrayDeque<>();
@GuardedBy("this")
- private BindingTransactionChain chain;
+ private TransactionChain chain;
public TransactionInvokerImpl(final DataBroker db) {
this.db = db;
}
@Override
- public void onTransactionChainFailed(final TransactionChain<?, ?> chainArg,
- final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+ public void onTransactionChainFailed(final TransactionChain chainArg,
+ final Transaction transaction, final Throwable cause) {
LOG.error("Failed to write operational topology", cause);
offerFailedTransaction(transaction);
}
@Override
- public void onTransactionChainSuccessful(final TransactionChain<?, ?> chainArg) {
+ public void onTransactionChainSuccessful(final TransactionChain chainArg) {
// NO OP
}
transactionInFlight = transaction;
recordPendingTransaction(command, transaction);
command.execute(transaction);
- Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
+ Futures.addCallback(transaction.commit(), new FutureCallback<Object>() {
@Override
- public void onSuccess(final Void result) {
+ public void onSuccess(final Object result) {
forgetSuccessfulTransaction(transaction);
command.onSuccess();
}
}
}
- private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
+ private void offerFailedTransaction(final Transaction transaction) {
if (!failedTransactionQueue.offer(transaction)) {
LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
}
@VisibleForTesting
synchronized List<TransactionCommand> extractResubmitCommands() {
- AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
+ Transaction transaction = failedTransactionQueue.poll();
List<TransactionCommand> commands = new ArrayList<>();
if (transaction != null) {
// Process all pending transactions, looking for the failed one...