Merge "FlowCapableTopologyExporter uses merge instead of push when link discovered"
[controller.git] / opendaylight / md-sal / topology-manager / src / main / java / org / opendaylight / md / controller / topology / manager / FlowCapableTopologyExporter.java
index 542e972deb2d7585348876219b5f82dab6ceca14..c1996f4691632637abc9fc7dffacce0bcb12f2ad 100644 (file)
@@ -15,12 +15,12 @@ import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMap
 import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
 import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
 
-import java.util.concurrent.Future;
+import java.util.Collections;
+import java.util.List;
 
-import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.FlowTopologyDiscoveryListener;
@@ -36,146 +36,150 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRem
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdated;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.OpendaylightInventoryListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
-import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
 
-class FlowCapableTopologyExporter implements //
-        FlowTopologyDiscoveryListener, //
-        OpendaylightInventoryListener //
-{
+class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, OpendaylightInventoryListener {
 
-    protected final static Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
-    public static TopologyKey topology = new TopologyKey(new TopologyId("flow:1"));
+    private final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
+    private final InstanceIdentifier<Topology> topology;
+    private final OperationProcessor processor;
 
-    // FIXME: Flow capable topology exporter should use transaction chaining API
-    private DataProviderService dataService;
-
-    public DataProviderService getDataService() {
-        return dataService;
+    FlowCapableTopologyExporter(final OperationProcessor processor, final InstanceIdentifier<Topology> topology) {
+        this.processor = Preconditions.checkNotNull(processor);
+        this.topology = Preconditions.checkNotNull(topology);
     }
 
-    public void setDataService(final DataProviderService dataService) {
-        this.dataService = dataService;
-    }
+    @Override
+    public void onNodeRemoved(final NodeRemoved notification) {
 
-    private InstanceIdentifier<Topology> topologyPath;
+        final NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeRef()).getId());
+        final InstanceIdentifier<Node> nodeInstance = toNodeIdentifier(notification.getNodeRef());
 
-    public void start() {
-        TopologyBuilder tb = new TopologyBuilder();
-        tb.setKey(topology);
-        topologyPath = InstanceIdentifier.builder(NetworkTopology.class).child(Topology.class, topology).build();
-        Topology top = tb.build();
-        DataModificationTransaction tx = dataService.beginTransaction();
-        tx.putOperationalData(topologyPath, top);
-        listenOnTransactionState(tx.getIdentifier(),tx.commit());
-    }
-
-    @Override
-    public synchronized void onNodeRemoved(final NodeRemoved notification) {
-        NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeRef()).getId());
-        InstanceIdentifier<Node> nodeInstance = toNodeIdentifier(notification.getNodeRef());
+        processor.enqueueOperation(new TopologyOperation() {
+            @Override
+            public void applyOperation(final ReadWriteTransaction transaction) {
+                removeAffectedLinks(nodeId);
+            }
+        });
 
-        DataModificationTransaction tx = dataService.beginTransaction();
-        tx.removeOperationalData(nodeInstance);
-        removeAffectedLinks(tx, nodeId);
-        listenOnTransactionState(tx.getIdentifier(),tx.commit());
+        processor.enqueueOperation(new TopologyOperation() {
+            @Override
+            public void applyOperation(ReadWriteTransaction transaction) {
+                transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
+            }
+        });
     }
 
     @Override
-    public synchronized void onNodeUpdated(final NodeUpdated notification) {
+    public void onNodeUpdated(final NodeUpdated notification) {
         FlowCapableNodeUpdated fcnu = notification.getAugmentation(FlowCapableNodeUpdated.class);
         if (fcnu != null) {
-            Node node = toTopologyNode(toTopologyNodeId(notification.getId()), notification.getNodeRef());
-            InstanceIdentifier<Node> path = getNodePath(toTopologyNodeId(notification.getId()));
-            DataModificationTransaction tx = dataService.beginTransaction();
-            tx.putOperationalData(path, node);
-            listenOnTransactionState(tx.getIdentifier(),tx.commit());
+            processor.enqueueOperation(new TopologyOperation() {
+                @Override
+                public void applyOperation(final ReadWriteTransaction transaction) {
+                    final Node node = toTopologyNode(toTopologyNodeId(notification.getId()), notification.getNodeRef());
+                    final InstanceIdentifier<Node> path = getNodePath(toTopologyNodeId(notification.getId()));
+                    transaction.merge(LogicalDatastoreType.OPERATIONAL, path, node, true);
+                }
+            });
         }
     }
 
     @Override
-    public synchronized void onNodeConnectorRemoved(final NodeConnectorRemoved notification) {
-        InstanceIdentifier<TerminationPoint> tpInstance = toTerminationPointIdentifier(notification
+    public void onNodeConnectorRemoved(final NodeConnectorRemoved notification) {
+
+        final InstanceIdentifier<TerminationPoint> tpInstance = toTerminationPointIdentifier(notification
                 .getNodeConnectorRef());
-        TpId tpId = toTerminationPointId(getNodeConnectorKey(notification.getNodeConnectorRef()).getId());
-        DataModificationTransaction tx = dataService.beginTransaction();
-        tx.removeOperationalData(tpInstance);
-        removeAffectedLinks(tx, tpId);
-        listenOnTransactionState(tx.getIdentifier(),tx.commit());
 
+        processor.enqueueOperation(new TopologyOperation() {
+            @Override
+            public void applyOperation(final ReadWriteTransaction transaction) {
+                final TpId tpId = toTerminationPointId(getNodeConnectorKey(notification.getNodeConnectorRef()).getId());
+                removeAffectedLinks(tpId);
+            }
+        });
+
+        processor.enqueueOperation(new TopologyOperation() {
+            @Override
+            public void applyOperation(ReadWriteTransaction transaction) {
+                transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance);
+            }
+        });
     }
 
     @Override
-    public synchronized void onNodeConnectorUpdated(final NodeConnectorUpdated notification) {
-        FlowCapableNodeConnectorUpdated fcncu = notification.getAugmentation(FlowCapableNodeConnectorUpdated.class);
+    public void onNodeConnectorUpdated(final NodeConnectorUpdated notification) {
+        final FlowCapableNodeConnectorUpdated fcncu = notification.getAugmentation(FlowCapableNodeConnectorUpdated.class);
         if (fcncu != null) {
-            NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeConnectorRef()).getId());
-            TerminationPoint point = toTerminationPoint(toTerminationPointId(notification.getId()),
-                    notification.getNodeConnectorRef());
-            InstanceIdentifier<TerminationPoint> path = tpPath(nodeId, point.getKey().getTpId());
-
-            DataModificationTransaction tx = dataService.beginTransaction();
-            tx.putOperationalData(path, point);
-            if ((fcncu.getState() != null && fcncu.getState().isLinkDown())
-                    || (fcncu.getConfiguration() != null && fcncu.getConfiguration().isPORTDOWN())) {
-                removeAffectedLinks(tx, point.getTpId());
-            }
-            listenOnTransactionState(tx.getIdentifier(),tx.commit());
+            processor.enqueueOperation(new TopologyOperation() {
+                @Override
+                public void applyOperation(final ReadWriteTransaction transaction) {
+                    final NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeConnectorRef()).getId());
+                    TerminationPoint point = toTerminationPoint(toTerminationPointId(notification.getId()),
+                            notification.getNodeConnectorRef());
+                    final InstanceIdentifier<TerminationPoint> path = tpPath(nodeId, point.getKey().getTpId());
+                    transaction.merge(LogicalDatastoreType.OPERATIONAL, path, point, true);
+                    if ((fcncu.getState() != null && fcncu.getState().isLinkDown())
+                            || (fcncu.getConfiguration() != null && fcncu.getConfiguration().isPORTDOWN())) {
+                        removeAffectedLinks(point.getTpId());
+                    }
+                }
+            });
         }
     }
 
     @Override
-    public synchronized void onLinkDiscovered(final LinkDiscovered notification) {
-        Link link = toTopologyLink(notification);
-        InstanceIdentifier<Link> path = linkPath(link);
-        DataModificationTransaction tx = dataService.beginTransaction();
-        tx.putOperationalData(path, link);
-        listenOnTransactionState(tx.getIdentifier(),tx.commit());
-
+    public void onLinkDiscovered(final LinkDiscovered notification) {
+        processor.enqueueOperation(new TopologyOperation() {
+            @Override
+            public void applyOperation(final ReadWriteTransaction transaction) {
+                final Link link = toTopologyLink(notification);
+                final InstanceIdentifier<Link> path = linkPath(link);
+                transaction.merge(LogicalDatastoreType.OPERATIONAL, path, link, true);
+            }
+        });
     }
 
     @Override
-    public synchronized void onLinkOverutilized(final LinkOverutilized notification) {
+    public void onLinkOverutilized(final LinkOverutilized notification) {
         // NOOP
     }
 
     @Override
-    public synchronized void onLinkRemoved(final LinkRemoved notification) {
-        InstanceIdentifier<Link> path = linkPath(toTopologyLink(notification));
-        DataModificationTransaction tx = dataService.beginTransaction();
-        tx.removeOperationalData(path);
-        listenOnTransactionState(tx.getIdentifier(),tx.commit());
+    public void onLinkRemoved(final LinkRemoved notification) {
+        processor.enqueueOperation(new TopologyOperation() {
+            @Override
+            public void applyOperation(final ReadWriteTransaction transaction) {
+                transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(toTopologyLink(notification)));
+            }
+        });
     }
 
     @Override
-    public synchronized void onLinkUtilizationNormal(final LinkUtilizationNormal notification) {
+    public void onLinkUtilizationNormal(final LinkUtilizationNormal notification) {
         // NOOP
     }
 
-    private static InstanceIdentifier<Node> toNodeIdentifier(final NodeRef ref) {
+    private InstanceIdentifier<Node> toNodeIdentifier(final NodeRef ref) {
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey invNodeKey = getNodeKey(ref);
-
         NodeKey nodeKey = new NodeKey(toTopologyNodeId(invNodeKey.getId()));
-        return InstanceIdentifier.builder(NetworkTopology.class).child(Topology.class, topology)
-                .child(Node.class, nodeKey).build();
+        return topology.child(Node.class, nodeKey);
     }
 
     private InstanceIdentifier<TerminationPoint> toTerminationPointIdentifier(final NodeConnectorRef ref) {
@@ -184,73 +188,73 @@ class FlowCapableTopologyExporter implements //
         return tpPath(toTopologyNodeId(invNodeKey.getId()), toTerminationPointId(invNodeConnectorKey.getId()));
     }
 
-    private void removeAffectedLinks(final DataModificationTransaction transaction, final NodeId id) {
-        TypeSafeDataReader reader = TypeSafeDataReader.forReader(transaction);
-
-        Topology topologyData = reader.readOperationalData(topologyPath);
-        if (topologyData == null) {
-            return;
-        }
-        for (Link link : topologyData.getLink()) {
-            if (id.equals(link.getSource().getSourceNode()) || id.equals(link.getDestination().getDestNode())) {
-                InstanceIdentifier<Link> path = topologyPath.child(Link.class, link.getKey());
-                transaction.removeOperationalData(path);
+    private void removeAffectedLinks(final NodeId id) {
+        processor.enqueueOperation(new TopologyOperation() {
+            @Override
+            public void applyOperation(final ReadWriteTransaction transaction) {
+                CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture = transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
+                Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
+                    @Override
+                    public void onSuccess(Optional<Topology> topologyOptional) {
+                        if (topologyOptional.isPresent()) {
+                            List<Link> linkList = topologyOptional.get().getLink() != null
+                                    ? topologyOptional.get().getLink() : Collections.<Link> emptyList();
+                            for (Link link : linkList) {
+                                if (id.equals(link.getSource().getSourceNode()) || id.equals(link.getDestination().getDestNode())) {
+                                    transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
+                                }
+                            }
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable throwable) {
+                        LOG.error("Error reading topology data for topology {}", topology, throwable);
+                    }
+                });
             }
-        }
+        });
     }
 
-    private void removeAffectedLinks(final DataModificationTransaction transaction, final TpId id) {
-        TypeSafeDataReader reader = TypeSafeDataReader.forReader(transaction);
-        Topology topologyData = reader.readOperationalData(topologyPath);
-        if (topologyData == null) {
-            return;
-        }
-        for (Link link : topologyData.getLink()) {
-            if (id.equals(link.getSource().getSourceTp()) || id.equals(link.getDestination().getDestTp())) {
-                InstanceIdentifier<Link> path = topologyPath.child(Link.class, link.getKey());
-                transaction.removeOperationalData(path);
+    private void removeAffectedLinks(final TpId id) {
+        processor.enqueueOperation(new TopologyOperation() {
+            @Override
+            public void applyOperation(final ReadWriteTransaction transaction) {
+                CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture = transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
+                Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
+                    @Override
+                    public void onSuccess(Optional<Topology> topologyOptional) {
+                        if (topologyOptional.isPresent()) {
+                            List<Link> linkList = topologyOptional.get().getLink() != null
+                                    ? topologyOptional.get().getLink() : Collections.<Link> emptyList();
+                            for (Link link : linkList) {
+                                if (id.equals(link.getSource().getSourceTp()) || id.equals(link.getDestination().getDestTp())) {
+                                    transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
+                                }
+                            }
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable throwable) {
+                        LOG.error("Error reading topology data for topology {}", topology, throwable);
+                    }
+                });
             }
-        }
+        });
     }
 
     private InstanceIdentifier<Node> getNodePath(final NodeId nodeId) {
-        NodeKey nodeKey = new NodeKey(nodeId);
-        return InstanceIdentifier.builder(NetworkTopology.class).child(Topology.class, topology)
-                .child(Node.class, nodeKey).build();
+        return topology.child(Node.class, new NodeKey(nodeId));
     }
 
     private InstanceIdentifier<TerminationPoint> tpPath(final NodeId nodeId, final TpId tpId) {
         NodeKey nodeKey = new NodeKey(nodeId);
         TerminationPointKey tpKey = new TerminationPointKey(tpId);
-        return InstanceIdentifier.builder(NetworkTopology.class).child(Topology.class, topology)
-                .child(Node.class, nodeKey).child(TerminationPoint.class, tpKey).build();
+        return topology.child(Node.class, nodeKey).child(TerminationPoint.class, tpKey);
     }
 
     private InstanceIdentifier<Link> linkPath(final Link link) {
-        InstanceIdentifier<Link> linkInstanceId = InstanceIdentifier.builder(NetworkTopology.class)
-                .child(Topology.class, topology).child(Link.class, link.getKey()).build();
-        return linkInstanceId;
-    }
-
-    /**
-     * @param txId transaction identificator
-     * @param future transaction result
-     */
-    private static void listenOnTransactionState(final Object txId, Future<RpcResult<TransactionStatus>> future) {
-        Futures.addCallback(JdkFutureAdapters.listenInPoolThread(future),new FutureCallback<RpcResult<TransactionStatus>>() {
-
-            @Override
-            public void onFailure(Throwable t) {
-                LOG.error("Topology export failed for Tx:{}", txId, t);
-
-            }
-
-            @Override
-            public void onSuccess(RpcResult<TransactionStatus> result) {
-                if(!result.isSuccessful()) {
-                    LOG.error("Topology export failed for Tx:{}", txId);
-                }
-            }
-        });
+        return topology.child(Link.class, link.getKey());
     }
 }