Hotfix bug 1556 - FRM: NPE thrown in notification
[controller.git] / opendaylight / md-sal / topology-manager / src / main / java / org / opendaylight / md / controller / topology / manager / OperationProcessor.java
index d60c88032dbcc7015fc064a791ca9a16921d7332..3800413eb1b9964d00207f526177862eb69c5885 100644 (file)
@@ -7,30 +7,27 @@
  */
 package org.opendaylight.md.controller.topology.manager;
 
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
-
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
-import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 final class OperationProcessor implements Runnable {
     private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
     private static final int MAX_TRANSACTION_OPERATIONS = 100;
     private static final int OPERATION_QUEUE_DEPTH = 500;
 
     private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
-    // FIXME: Flow capable topology exporter should use transaction chaining API
-    private final DataProviderService dataService;
+    private final DataBroker dataBroker;
 
-    OperationProcessor(final DataProviderService dataService) {
-        this.dataService = Preconditions.checkNotNull(dataService);
+    OperationProcessor(final DataBroker dataBroker) {
+        this.dataBroker = Preconditions.checkNotNull(dataBroker);
     }
 
     void enqueueOperation(final TopologyOperation task) {
@@ -44,11 +41,11 @@ final class OperationProcessor implements Runnable {
     @Override
     public void run() {
         try {
-            for (;;) {
+            for (; ; ) {
                 TopologyOperation op = queue.take();
 
                 LOG.debug("New operations available, starting transaction");
-                final DataModificationTransaction tx = dataService.beginTransaction();
+                final ReadWriteTransaction tx = dataBroker.newReadWriteTransaction();
 
                 int ops = 0;
                 do {
@@ -64,14 +61,18 @@ final class OperationProcessor implements Runnable {
 
                 LOG.debug("Processed {} operations, submitting transaction", ops);
 
-                try {
-                    final RpcResult<TransactionStatus> s = tx.commit().get();
-                    if (!s.isSuccessful()) {
-                        LOG.error("Topology export failed for Tx:{}", tx.getIdentifier());
+                final CheckedFuture txResultFuture = tx.submit();
+                Futures.addCallback(txResultFuture, new FutureCallback() {
+                    @Override
+                    public void onSuccess(Object o) {
+                        LOG.debug("Topology export successful for tx :{}", tx.getIdentifier());
+                    }
+
+                    @Override
+                    public void onFailure(Throwable throwable) {
+                        LOG.error("Topology export transaction {} failed", tx.getIdentifier(), throwable.getCause());
                     }
-                } catch (ExecutionException e) {
-                    LOG.error("Topology export transaction {} failed", tx.getIdentifier(), e.getCause());
-                }
+                });
             }
         } catch (InterruptedException e) {
             LOG.info("Interrupted processing, terminating", e);