BUG-509 transaction future result listening 43/6743/1
authorMichal Rehak <mirehak@cisco.com>
Tue, 6 May 2014 13:09:27 +0000 (15:09 +0200)
committerMichal Rehak <mirehak@cisco.com>
Tue, 6 May 2014 13:09:27 +0000 (15:09 +0200)
- replaced transactionResult.get() with future listener

Change-Id: I6f047a3c65fbb2cf669cb8912920aacc0c2848d3
Signed-off-by: Michal Rehak <mirehak@cisco.com>
opendaylight/md-sal/inventory-manager/src/main/java/org/opendaylight/controller/md/inventory/manager/NodeChangeCommiter.java
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java

index 03cdf97..2a3d8fd 100644 (file)
@@ -7,7 +7,6 @@
  */
 package org.opendaylight.controller.md.inventory.manager;
 
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
@@ -37,10 +36,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
 
 public class NodeChangeCommiter implements OpendaylightInventoryListener {
 
-    private final static Logger LOG = LoggerFactory.getLogger(NodeChangeCommiter.class);
+    protected final static Logger LOG = LoggerFactory.getLogger(NodeChangeCommiter.class);
 
     private final FlowCapableInventoryProvider manager;
 
@@ -57,15 +59,10 @@ public class NodeChangeCommiter implements OpendaylightInventoryListener {
 
         final NodeConnectorRef ref = connector.getNodeConnectorRef();
         final DataModificationTransaction it = this.getManager().startChange();
-        NodeChangeCommiter.LOG.debug("removing node connector {} ", ref.getValue());
+        LOG.debug("removing node connector {} ", ref.getValue());
         it.removeOperationalData(ref.getValue());
         Future<RpcResult<TransactionStatus>> commitResult = it.commit();
-        try {
-            commitResult.get();
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("Node Connector {} not removed.", ref.getValue(), e);
-        }
-
+        listenOnTransactionState(it.getIdentifier(), commitResult, "nodeConnector removal", ref.getValue());
     }
 
     @Override
@@ -85,16 +82,11 @@ public class NodeChangeCommiter implements OpendaylightInventoryListener {
             data.addAugmentation(FlowCapableNodeConnector.class, augment);
         }
         InstanceIdentifier<? extends Object> value = ref.getValue();
-        NodeChangeCommiter.LOG.debug("updating node connector : {}.", value);
+        LOG.debug("updating node connector : {}.", value);
         NodeConnector build = data.build();
         it.putOperationalData((value), build);
         Future<RpcResult<TransactionStatus>> commitResult = it.commit();
-        try {
-            commitResult.get();
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("Node Connector {} not updated.", ref.getValue(), e);
-        }
-
+        listenOnTransactionState(it.getIdentifier(), commitResult, "nodeConnector update", ref.getValue());
     }
 
     @Override
@@ -102,14 +94,10 @@ public class NodeChangeCommiter implements OpendaylightInventoryListener {
 
         final NodeRef ref = node.getNodeRef();
         final DataModificationTransaction it = this.manager.startChange();
-        NodeChangeCommiter.LOG.debug("removing node : {}", ref.getValue());
+        LOG.debug("removing node : {}", ref.getValue());
         it.removeOperationalData((ref.getValue()));
         Future<RpcResult<TransactionStatus>> commitResult = it.commit();
-        try {
-            commitResult.get();
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("Node {} not removed.", ref.getValue(), e);
-        }
+        listenOnTransactionState(it.getIdentifier(), commitResult, "node removal", ref.getValue());
     }
 
     @Override
@@ -128,19 +116,39 @@ public class NodeChangeCommiter implements OpendaylightInventoryListener {
         final FlowCapableNode augment = InventoryMapping.toInventoryAugment(flowNode);
         nodeBuilder.addAugmentation(FlowCapableNode.class, augment);
         InstanceIdentifier<? extends Object> value = ref.getValue();
-        InstanceIdentifierBuilder<Node> builder = InstanceIdentifier.<Node> builder(((InstanceIdentifier<Node>) value));
+        InstanceIdentifierBuilder<Node> builder = ((InstanceIdentifier<Node>) value).builder();
         InstanceIdentifierBuilder<FlowCapableNode> augmentation = builder
                 .<FlowCapableNode> augmentation(FlowCapableNode.class);
         final InstanceIdentifier<FlowCapableNode> path = augmentation.build();
-        NodeChangeCommiter.LOG.debug("updating node :{} ", path);
+        LOG.debug("updating node :{} ", path);
         it.putOperationalData(path, augment);
 
         Future<RpcResult<TransactionStatus>> commitResult = it.commit();
-        try {
-            commitResult.get();
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("Node  {} not updated.", ref.getValue(), e);
-        }
-
+        listenOnTransactionState(it.getIdentifier(), commitResult, "node update", ref.getValue());
+    }
+    
+    /**
+     * @param txId transaction identificator
+     * @param future transaction result
+     * @param action performed by transaction
+     * @param nodeConnectorPath target value
+     */
+    private static void listenOnTransactionState(final Object txId, Future<RpcResult<TransactionStatus>> future,
+            final String action, final InstanceIdentifier<?> nodeConnectorPath) {
+        Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallback<RpcResult<TransactionStatus>>() {
+            
+            @Override
+            public void onFailure(Throwable t) {
+                LOG.error("Action {} [{}] failed for Tx:{}", action, nodeConnectorPath, txId, t);
+                
+            }
+            
+            @Override
+            public void onSuccess(RpcResult<TransactionStatus> result) {
+                if(!result.isSuccessful()) {
+                    LOG.error("Action {} [{}] failed for Tx:{}", action, nodeConnectorPath, txId);
+                }
+            }
+        });
     }
 }
index 54f1fc0..f4685d2 100644 (file)
@@ -15,9 +15,10 @@ import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMap
 import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
 import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
 
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
 import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdated;
@@ -48,15 +49,20 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+
 class FlowCapableTopologyExporter implements //
         FlowTopologyDiscoveryListener, //
         OpendaylightInventoryListener //
 {
 
-    private final static Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
+    protected final static Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
     public static TopologyKey topology = new TopologyKey(new TopologyId("flow:1"));
 
     // FIXME: Flow capable topology exporter should use transaction chaining API
@@ -79,7 +85,7 @@ class FlowCapableTopologyExporter implements //
         Topology top = tb.build();
         DataModificationTransaction tx = dataService.beginTransaction();
         tx.putOperationalData(topologyPath, top);
-        tx.commit();
+        listenOnTransactionState(tx.getIdentifier(),tx.commit());
     }
 
     @Override
@@ -90,11 +96,7 @@ class FlowCapableTopologyExporter implements //
         DataModificationTransaction tx = dataService.beginTransaction();
         tx.removeOperationalData(nodeInstance);
         removeAffectedLinks(tx, nodeId);
-        try {
-            tx.commit().get();
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("Topology state export not successful. ",e);
-        }
+        listenOnTransactionState(tx.getIdentifier(),tx.commit());
     }
 
     @Override
@@ -105,11 +107,7 @@ class FlowCapableTopologyExporter implements //
             InstanceIdentifier<Node> path = getNodePath(toTopologyNodeId(notification.getId()));
             DataModificationTransaction tx = dataService.beginTransaction();
             tx.putOperationalData(path, node);
-            try {
-                tx.commit().get();
-            } catch (InterruptedException | ExecutionException e) {
-                LOG.error("Topology state export not successful. ",e);
-            }
+            listenOnTransactionState(tx.getIdentifier(),tx.commit());
         }
     }
 
@@ -121,11 +119,7 @@ class FlowCapableTopologyExporter implements //
         DataModificationTransaction tx = dataService.beginTransaction();
         tx.removeOperationalData(tpInstance);
         removeAffectedLinks(tx, tpId);
-        try {
-            tx.commit().get();
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("Topology state export not successful. ",e);
-        }
+        listenOnTransactionState(tx.getIdentifier(),tx.commit());
 
     }
 
@@ -144,11 +138,7 @@ class FlowCapableTopologyExporter implements //
                     || (fcncu.getConfiguration() != null && fcncu.getConfiguration().isPORTDOWN())) {
                 removeAffectedLinks(tx, point.getTpId());
             }
-            try {
-                tx.commit().get();
-            } catch (InterruptedException | ExecutionException e) {
-                LOG.error("Topology state export not successful. ",e);
-            }
+            listenOnTransactionState(tx.getIdentifier(),tx.commit());
         }
     }
 
@@ -158,11 +148,8 @@ class FlowCapableTopologyExporter implements //
         InstanceIdentifier<Link> path = linkPath(link);
         DataModificationTransaction tx = dataService.beginTransaction();
         tx.putOperationalData(path, link);
-        try {
-            tx.commit().get();
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.error("Topology state export not successful. ",e);
-        }
+        listenOnTransactionState(tx.getIdentifier(),tx.commit());
+
     }
 
     @Override
@@ -175,7 +162,7 @@ class FlowCapableTopologyExporter implements //
         InstanceIdentifier<Link> path = linkPath(toTopologyLink(notification));
         DataModificationTransaction tx = dataService.beginTransaction();
         tx.removeOperationalData(path);
-        ;
+        listenOnTransactionState(tx.getIdentifier(),tx.commit());
     }
 
     @Override
@@ -246,4 +233,26 @@ class FlowCapableTopologyExporter implements //
                 .child(Topology.class, topology).child(Link.class, link.getKey()).build();
         return linkInstanceId;
     }
+    
+    /**
+     * @param txId transaction identificator
+     * @param future transaction result
+     */
+    private static void listenOnTransactionState(final Object txId, Future<RpcResult<TransactionStatus>> future) {
+        Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallback<RpcResult<TransactionStatus>>() {
+            
+            @Override
+            public void onFailure(Throwable t) {
+                LOG.error("Topology export failed for Tx:{}", txId, t);
+                
+            }
+            
+            @Override
+            public void onSuccess(RpcResult<TransactionStatus> result) {
+                if(!result.isSuccessful()) {
+                    LOG.error("Topology export failed for Tx:{}", txId);
+                }
+            }
+        });
+    }
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.