MD-SAL API integration
[ovsdb.git] / hwvtepsouthbound / hwvtepsouthbound-impl / src / main / java / org / opendaylight / ovsdb / hwvtepsouthbound / transactions / md / TransactionInvokerImpl.java
index e9d2dc348a3040ddf407ee3c77b273dcdd1236e1..251e4dc6bc0f7c55d09b9c169cee97d2d45e6e23 100644 (file)
@@ -7,9 +7,8 @@
  */
 package org.opendaylight.ovsdb.hwvtepsouthbound.transactions.md;
 
+import com.google.common.util.concurrent.FluentFuture;
 import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.ArrayList;
@@ -23,12 +22,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import org.checkerframework.checker.lock.qual.GuardedBy;
-import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
+import org.opendaylight.mdsal.binding.api.Transaction;
+import org.opendaylight.mdsal.binding.api.TransactionChain;
+import org.opendaylight.mdsal.binding.api.TransactionChainListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +41,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
 
     private final DataBroker db;
     private final BlockingQueue<TransactionCommand> inputQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
-    private final BlockingQueue<AsyncTransaction<?, ?>> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
+    private final BlockingQueue<Transaction> failedTransactionQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
     private final ExecutorService executor;
 
     @GuardedBy("this")
@@ -51,7 +49,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     @GuardedBy("this")
     private final List<ReadWriteTransaction> pendingTransactions = new ArrayList<>();
 
-    private BindingTransactionChain chain;
+    private TransactionChain chain;
     //This is made volatile as it is accessed from uncaught exception handler thread also
     private volatile ReadWriteTransaction transactionInFlight = null;
     private Iterator<TransactionCommand> commandIterator = null;
@@ -76,13 +74,13 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     }
 
     @Override
-    public void onTransactionChainFailed(final TransactionChain<?, ?> txChain,
-            final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+    public void onTransactionChainFailed(final TransactionChain txChain,
+            final Transaction transaction, final Throwable cause) {
         offerFailedTransaction(transaction);
     }
 
     @Override
-    public void onTransactionChainSuccessful(final TransactionChain<?, ?> txChain) {
+    public void onTransactionChainSuccessful(final TransactionChain txChain) {
         // NO OP
     }
 
@@ -99,26 +97,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
             commandIterator = commands.iterator();
             try {
                 while (commandIterator.hasNext()) {
-                    TransactionCommand command = commandIterator.next();
-                    synchronized (this) {
-                        final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
-                        transactionInFlight = transaction;
-                        recordPendingTransaction(command, transaction);
-                        command.execute(transaction);
-                        ListenableFuture<Void> ft = transaction.submit();
-                        command.setTransactionResultFuture(ft);
-                        Futures.addCallback(ft, new FutureCallback<Void>() {
-                            @Override
-                            public void onSuccess(final Void result) {
-                                forgetSuccessfulTransaction(transaction);
-                            }
-
-                            @Override
-                            public void onFailure(final Throwable throwable) {
-                                // NOOP - handled by failure of transaction chain
-                            }
-                        }, MoreExecutors.directExecutor());
-                    }
+                    executeCommand(commandIterator.next());
                 }
                 transactionInFlight = null;
             } catch (IllegalStateException e) {
@@ -135,7 +114,27 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
         }
     }
 
-    private void offerFailedTransaction(final AsyncTransaction<?, ?> transaction) {
+    private synchronized void executeCommand(final TransactionCommand command) {
+        final ReadWriteTransaction transaction = chain.newReadWriteTransaction();
+        transactionInFlight = transaction;
+        recordPendingTransaction(command, transaction);
+        command.execute(transaction);
+        FluentFuture<?> ft = transaction.commit();
+        command.setTransactionResultFuture(ft);
+        ft.addCallback(new FutureCallback<Object>() {
+            @Override
+            public void onSuccess(final Object result) {
+                forgetSuccessfulTransaction(transaction);
+            }
+
+            @Override
+            public void onFailure(final Throwable throwable) {
+                // NOOP - handled by failure of transaction chain
+            }
+        }, MoreExecutors.directExecutor());
+    }
+
+    private void offerFailedTransaction(final Transaction transaction) {
         if (!failedTransactionQueue.offer(transaction)) {
             LOG.warn("failedTransactionQueue is full (size: {})", failedTransactionQueue.size());
         }
@@ -144,7 +143,7 @@ public class TransactionInvokerImpl implements TransactionInvoker,TransactionCha
     private List<TransactionCommand> extractResubmitCommands() {
         List<TransactionCommand> commands = new ArrayList<>();
         synchronized (this) {
-            AsyncTransaction<?, ?> transaction = failedTransactionQueue.poll();
+            Transaction transaction = failedTransactionQueue.poll();
             if (transaction != null) {
                 int index = pendingTransactions.lastIndexOf(transaction);
                 //This logic needs to be revisited. Is it ok to resubmit these things again ?