MD-SAL API integration
[ovsdb.git] / southbound / southbound-impl / src / main / java / org / opendaylight / ovsdb / southbound / transactions / md / TransactionInvokerImpl.java
index 6773d43f8ca7766c9c37642094f8529ec741a5f6..bc41602e23b82851971b5c77a4752c414a76e6fc 100644 (file)
@@ -28,12 +28,11 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.checkerframework.checker.lock.qual.GuardedBy;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
+import org.opendaylight.mdsal.binding.api.Transaction;
+import org.opendaylight.mdsal.binding.api.TransactionChain;
+import org.opendaylight.mdsal.binding.api.TransactionChainListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +42,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
 
     private final DataBroker db;
     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
-    private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+    private final BlockingQueue<Transaction> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
     private final ExecutorService executor;
 
     private final AtomicBoolean runTask = new AtomicBoolean(true);
@@ -51,7 +50,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     @GuardedBy("this")
     private final Queue<Entry<ReadWriteTransaction, TransactionCommand>> pendingTransactions = new ArrayDeque<>();
     @GuardedBy("this")
-    private BindingTransactionChain chain;
+    private TransactionChain chain;
 
     public TransactionInvokerImpl(final DataBroker db) {
         this.db = db;
@@ -94,14 +93,14 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     }
 
     @Override
-    public void onTransactionChainFailed(final TransactionChain<?, ?> chainArg,
-            final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+    public void onTransactionChainFailed(final TransactionChain chainArg,
+            final Transaction transaction, final Throwable cause) {
         LOG.error("Failed to write operational topology", cause);
         offerFailedTransaction(transaction);
     }
 
     @Override
-    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chainArg) {
+    public void onTransactionChainSuccessful(final TransactionChain chainArg) {
         // NO OP
     }
 
@@ -127,9 +126,9 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
             transactionInFlight = transaction;
             recordPendingTransaction(command, transaction);
             command.execute(transaction);
-            Futures.addCallback(transaction.submit(), new FutureCallback<Void>() {
+            Futures.addCallback(transaction.commit(), new FutureCallback<Object>() {
                 @Override
-                public void onSuccess(final Void result) {
+                public void onSuccess(final Object result) {
                     forgetSuccessfulTransaction(transaction);
                     command.onSuccess();
                 }
@@ -152,7 +151,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         }
     }
 
-    private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
+    private void offerFailedTransaction(final Transaction transaction) {
         if (!failedTransactionQueue.offer(transaction)) {
             LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
         }
@@ -160,7 +159,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
 
     @VisibleForTesting
     synchronized List<TransactionCommand> extractResubmitCommands() {
-        AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
+        Transaction transaction = failedTransactionQueue.poll();
         List<TransactionCommand> commands = new ArrayList<>();
         if (transaction != null) {
             // Process all pending transactions, looking for the failed one...