Bug 1686 - Rework TopologyManager to use transaction chaining in order to not conflic...
[controller.git] / opendaylight / md-sal / inventory-manager / src / main / java / org / opendaylight / controller / md / inventory / manager / FlowCapableInventoryProvider.java
index 9724d31f9ae6cd3ab8eaef0c23212bf626cfca84..3db929b99d9c68065cec7875412901090570530d 100644 (file)
@@ -7,34 +7,39 @@
  */
 package org.opendaylight.controller.md.inventory.manager;
 
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingDeque;
-
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
-class FlowCapableInventoryProvider implements AutoCloseable, Runnable {
+class FlowCapableInventoryProvider implements AutoCloseable, Runnable, TransactionChainListener {
     private static final Logger LOG = LoggerFactory.getLogger(FlowCapableInventoryProvider.class);
     private static final int QUEUE_DEPTH = 500;
     private static final int MAX_BATCH = 100;
 
     private final BlockingQueue<InventoryOperation> queue = new LinkedBlockingDeque<>(QUEUE_DEPTH);
     private final NotificationProviderService notificationService;
-    private final DataProviderService dataService;
+
+    private final DataBroker dataBroker;
+    private BindingTransactionChain txChain;
     private ListenerRegistration<?> listenerRegistration;
     private Thread thread;
 
-    FlowCapableInventoryProvider(final DataProviderService dataService, final NotificationProviderService notificationService) {
-        this.dataService = Preconditions.checkNotNull(dataService);
+    FlowCapableInventoryProvider(final DataBroker dataBroker, final NotificationProviderService notificationService) {
+        this.dataBroker = Preconditions.checkNotNull(dataBroker);
         this.notificationService = Preconditions.checkNotNull(notificationService);
     }
 
@@ -42,6 +47,7 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable {
         final NodeChangeCommiter changeCommiter = new NodeChangeCommiter(FlowCapableInventoryProvider.this);
         this.listenerRegistration = this.notificationService.registerNotificationListener(changeCommiter);
 
+        this.txChain = dataBroker.createTransactionChain(this);
         thread = new Thread(this);
         thread.setDaemon(true);
         thread.setName("FlowCapableInventoryProvider");
@@ -75,6 +81,10 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable {
             thread.join();
             thread = null;
         }
+        if (txChain != null) {
+            txChain.close();
+            txChain = null;
+        }
 
 
     }
@@ -82,10 +92,10 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable {
     @Override
     public void run() {
         try {
-            for (;;) {
+            for (; ; ) {
                 InventoryOperation op = queue.take();
 
-                final DataModificationTransaction tx = dataService.beginTransaction();
+                final ReadWriteTransaction tx = txChain.newReadWriteTransaction();
                 LOG.debug("New operations available, starting transaction {}", tx.getIdentifier());
 
                 int ops = 0;
@@ -102,14 +112,18 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable {
 
                 LOG.debug("Processed {} operations, submitting transaction {}", ops, tx.getIdentifier());
 
-                try {
-                    final RpcResult<TransactionStatus> result = tx.commit().get();
-                    if(!result.isSuccessful()) {
-                        LOG.error("Transaction {} failed", tx.getIdentifier());
+                final CheckedFuture<Void, TransactionCommitFailedException> result = tx.submit();
+                Futures.addCallback(result, new FutureCallback<Void>() {
+                    @Override
+                    public void onSuccess(final Void aVoid) {
+                        //NOOP
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable throwable) {
+                        LOG.error("Transaction {} failed.", tx.getIdentifier(), throwable);
                     }
-                } catch (ExecutionException e) {
-                    LOG.warn("Failed to commit inventory change", e.getCause());
-                }
+                });
             }
         } catch (InterruptedException e) {
             LOG.info("Processing interrupted, terminating", e);
@@ -120,4 +134,16 @@ class FlowCapableInventoryProvider implements AutoCloseable, Runnable {
             queue.poll();
         }
     }
+
+    @Override
+    public void onTransactionChainFailed(final TransactionChain<?, ?> chain, final AsyncTransaction<?, ?> transaction,
+                                         final Throwable cause) {
+        LOG.error("Failed to export Flow Capable Inventory, Transaction {} failed.", transaction.getIdentifier(), cause);
+
+    }
+
+    @Override
+    public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+        // NOOP
+    }
 }