BUG-509 transaction future result listening
[controller.git] / opendaylight / md-sal / topology-manager / src / main / java / org / opendaylight / md / controller / topology / manager / FlowCapableTopologyExporter.java
index 54f1fc0bb4fbcd0889fa185fd82248907594c3ec..f4685d2bd918b7a3aa0653e6ed809e010662abc9 100644 (file)
@@ -15,9 +15,10 @@ import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMap
 import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
 import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
 
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdated;
@@ -48,15 +49,20 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+
 class FlowCapableTopologyExporter implements //
         FlowTopologyDiscoveryListener, //
         OpendaylightInventoryListener //
 {
 
-    private final static Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
+    protected final static Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
     public static TopologyKey topology = new TopologyKey(new TopologyId("flow:1"));
 
     // FIXME: Flow capable topology exporter should use transaction chaining API
@@ -79,7 +85,7 @@ class FlowCapableTopologyExporter implements //
         Topology top = tb.build();
         DataModificationTransaction tx = dataService.beginTransaction();
         tx.putOperationalData(topologyPath, top);
-        tx.commit();
+        listenOnTransactionState(tx.getIdentifier(),tx.commit());
     }
 
     @Override
@@ -90,11 +96,7 @@ class FlowCapableTopologyExporter implements //
         DataModificationTransaction tx = dataService.beginTransaction();
         tx.removeOperationalData(nodeInstance);
         removeAffectedLinks(tx, nodeId);
-        try {
-            tx.commit().get();
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("Topology state export not successful. ",e);
-        }
+        listenOnTransactionState(tx.getIdentifier(),tx.commit());
     }
 
     @Override
@@ -105,11 +107,7 @@ class FlowCapableTopologyExporter implements //
             InstanceIdentifier<Node> path = getNodePath(toTopologyNodeId(notification.getId()));
             DataModificationTransaction tx = dataService.beginTransaction();
             tx.putOperationalData(path, node);
-            try {
-                tx.commit().get();
-            } catch (InterruptedException | ExecutionException e) {
-                LOG.error("Topology state export not successful. ",e);
-            }
+            listenOnTransactionState(tx.getIdentifier(),tx.commit());
         }
     }
 
@@ -121,11 +119,7 @@ class FlowCapableTopologyExporter implements //
         DataModificationTransaction tx = dataService.beginTransaction();
         tx.removeOperationalData(tpInstance);
         removeAffectedLinks(tx, tpId);
-        try {
-            tx.commit().get();
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("Topology state export not successful. ",e);
-        }
+        listenOnTransactionState(tx.getIdentifier(),tx.commit());
 
     }
 
@@ -144,11 +138,7 @@ class FlowCapableTopologyExporter implements //
                     || (fcncu.getConfiguration() != null && fcncu.getConfiguration().isPORTDOWN())) {
                 removeAffectedLinks(tx, point.getTpId());
             }
-            try {
-                tx.commit().get();
-            } catch (InterruptedException | ExecutionException e) {
-                LOG.error("Topology state export not successful. ",e);
-            }
+            listenOnTransactionState(tx.getIdentifier(),tx.commit());
         }
     }
 
@@ -158,11 +148,8 @@ class FlowCapableTopologyExporter implements //
         InstanceIdentifier<Link> path = linkPath(link);
         DataModificationTransaction tx = dataService.beginTransaction();
         tx.putOperationalData(path, link);
-        try {
-            tx.commit().get();
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("Topology state export not successful. ",e);
-        }
+        listenOnTransactionState(tx.getIdentifier(),tx.commit());
+
     }
 
     @Override
@@ -175,7 +162,7 @@ class FlowCapableTopologyExporter implements //
         InstanceIdentifier<Link> path = linkPath(toTopologyLink(notification));
         DataModificationTransaction tx = dataService.beginTransaction();
         tx.removeOperationalData(path);
-        ;
+        listenOnTransactionState(tx.getIdentifier(),tx.commit());
     }
 
     @Override
@@ -246,4 +233,26 @@ class FlowCapableTopologyExporter implements //
                 .child(Topology.class, topology).child(Link.class, link.getKey()).build();
         return linkInstanceId;
     }
+    
+    /**
+     * @param txId transaction identificator
+     * @param future transaction result
+     */
+    private static void listenOnTransactionState(final Object txId, Future<RpcResult<TransactionStatus>> future) {
+        Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallback<RpcResult<TransactionStatus>>() {
+            
+            @Override
+            public void onFailure(Throwable t) {
+                LOG.error("Topology export failed for Tx:{}", txId, t);
+                
+            }
+            
+            @Override
+            public void onSuccess(RpcResult<TransactionStatus> result) {
+                if(!result.isSuccessful()) {
+                    LOG.error("Topology export failed for Tx:{}", txId);
+                }
+            }
+        });
+    }
 }