Merge "Bug 963 - OSGi error in Topology manager component - After exiting mininet...
authorTony Tkacik <ttkacik@cisco.com>
Mon, 22 Sep 2014 18:28:40 +0000 (18:28 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 22 Sep 2014 18:28:40 +0000 (18:28 +0000)
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java
opendaylight/md-sal/topology-manager/src/test/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporterTest.java

index 361373d78da93f114f51c887ab95c78bc6ab3265..d8602c2ddded47d2708bd89bff5b6cd7a8d42bf7 100644 (file)
@@ -7,17 +7,8 @@
  */
 package org.opendaylight.md.controller.topology.manager;
 
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeConnectorKey;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeKey;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPoint;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPointId;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyLink;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -48,12 +39,16 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
+import java.util.Collections;
+import java.util.List;
+
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeConnectorKey;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeKey;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPoint;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPointId;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyLink;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
 
 class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, OpendaylightInventoryListener {
 
@@ -73,11 +68,20 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         final NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeRef()).getId());
         final InstanceIdentifier<Node> nodeInstance = toNodeIdentifier(notification.getNodeRef());
 
+
         processor.enqueueOperation(new TopologyOperation() {
             @Override
             public void applyOperation(ReadWriteTransaction transaction) {
-                removeAffectedLinks(nodeId, transaction);
-                transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
+                Optional<Node> nodeOptional = Optional.absent();
+                try {
+                    nodeOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, nodeInstance).checkedGet();
+                } catch (ReadFailedException e) {
+                    LOG.error("Error occured when trying to read Node ", e);
+                }
+                if (nodeOptional.isPresent()) {
+                    removeAffectedLinks(nodeId, transaction);
+                    transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
+                }
             }
 
             @Override
@@ -119,8 +123,16 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         processor.enqueueOperation(new TopologyOperation() {
             @Override
             public void applyOperation(ReadWriteTransaction transaction) {
-                removeAffectedLinks(tpId, transaction);
-                transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance);
+                Optional<TerminationPoint> terminationPointOptional = Optional.absent();
+                try {
+                    terminationPointOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, tpInstance).checkedGet();
+                } catch (ReadFailedException e) {
+                    LOG.error("Error occured when trying to read NodeConnector ", e);
+                }
+                if (terminationPointOptional.isPresent()) {
+                    removeAffectedLinks(tpId, transaction);
+                    transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance);
+                }
             }
 
             @Override
@@ -164,7 +176,7 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
             public void applyOperation(final ReadWriteTransaction transaction) {
                 final Link link = toTopologyLink(notification);
                 final InstanceIdentifier<Link> path = linkPath(link);
-                transaction.merge(LogicalDatastoreType.OPERATIONAL, path, link, true);
+                transaction.put(LogicalDatastoreType.OPERATIONAL, path, link, true);
             }
 
             @Override
@@ -184,7 +196,17 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         processor.enqueueOperation(new TopologyOperation() {
             @Override
             public void applyOperation(final ReadWriteTransaction transaction) {
-                transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(toTopologyLink(notification)));
+                Optional<Link> linkOptional = Optional.absent();
+                try {
+                    // read that checks if link exists (if we do not do this we might get an exception on delete)
+                    linkOptional = transaction.read(LogicalDatastoreType.OPERATIONAL,
+                            linkPath(toTopologyLink(notification))).checkedGet();
+                } catch (ReadFailedException e) {
+                    LOG.error("Error occured when trying to read Link ", e);
+                }
+                if (linkOptional.isPresent()) {
+                    transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(toTopologyLink(notification)));
+                }
             }
 
             @Override
@@ -194,6 +216,7 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         });
     }
 
+
     @Override
     public void onLinkUtilizationNormal(final LinkUtilizationNormal notification) {
         // NOOP
@@ -212,89 +235,57 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
     }
 
     private void removeAffectedLinks(final NodeId id, final ReadWriteTransaction transaction) {
-        CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture =
-                transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
-        Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
-            @Override
-            public void onSuccess(Optional<Topology> topologyOptional) {
-                removeAffectedLinks(id, topologyOptional);
-            }
-
-            @Override
-            public void onFailure(Throwable throwable) {
-                LOG.error("Error reading topology data for topology {}", topology, throwable);
-            }
-        });
+        Optional<Topology> topologyOptional = Optional.absent();
+        try {
+            topologyOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
+        } catch (ReadFailedException e) {
+            LOG.error("Error reading topology data for topology {}", topology, e);
+        }
+        if (topologyOptional.isPresent()) {
+            removeAffectedLinks(id, topologyOptional, transaction);
+        }
     }
 
-    private void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional) {
+    private void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional, ReadWriteTransaction transaction) {
         if (!topologyOptional.isPresent()) {
             return;
         }
 
         List<Link> linkList = topologyOptional.get().getLink() != null ?
                 topologyOptional.get().getLink() : Collections.<Link> emptyList();
-        final List<InstanceIdentifier<Link>> linkIDsToDelete = Lists.newArrayList();
         for (Link link : linkList) {
             if (id.equals(link.getSource().getSourceNode()) ||
                     id.equals(link.getDestination().getDestNode())) {
-                linkIDsToDelete.add(linkPath(link));
+                transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
             }
         }
-
-        enqueueLinkDeletes(linkIDsToDelete);
-    }
-
-    private void enqueueLinkDeletes(final Collection<InstanceIdentifier<Link>> linkIDsToDelete) {
-        if(!linkIDsToDelete.isEmpty()) {
-            processor.enqueueOperation(new TopologyOperation() {
-                @Override
-                public void applyOperation(ReadWriteTransaction transaction) {
-                    for(InstanceIdentifier<Link> linkID: linkIDsToDelete) {
-                        transaction.delete(LogicalDatastoreType.OPERATIONAL, linkID);
-                    }
-                }
-
-                @Override
-                public String toString() {
-                    return "Delete Links " + linkIDsToDelete.size();
-                }
-            });
-        }
     }
 
     private void removeAffectedLinks(final TpId id, final ReadWriteTransaction transaction) {
-        CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture =
-                transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
-        Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
-            @Override
-            public void onSuccess(Optional<Topology> topologyOptional) {
-                removeAffectedLinks(id, topologyOptional);
-            }
-
-            @Override
-            public void onFailure(Throwable throwable) {
-                LOG.error("Error reading topology data for topology {}", topology, throwable);
-            }
-        });
+        Optional<Topology> topologyOptional = Optional.absent();
+        try {
+            topologyOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
+        } catch (ReadFailedException e) {
+            LOG.error("Error reading topology data for topology {}", topology, e);
+        }
+        if (topologyOptional.isPresent()) {
+            removeAffectedLinks(id, topologyOptional, transaction);
+        }
     }
 
-    private void removeAffectedLinks(final TpId id, Optional<Topology> topologyOptional) {
+    private void removeAffectedLinks(final TpId id, Optional<Topology> topologyOptional, ReadWriteTransaction transaction) {
         if (!topologyOptional.isPresent()) {
             return;
         }
 
         List<Link> linkList = topologyOptional.get().getLink() != null
                 ? topologyOptional.get().getLink() : Collections.<Link> emptyList();
-        final List<InstanceIdentifier<Link>> linkIDsToDelete = Lists.newArrayList();
         for (Link link : linkList) {
             if (id.equals(link.getSource().getSourceTp()) ||
                     id.equals(link.getDestination().getDestTp())) {
-                linkIDsToDelete.add(linkPath(link));
+                transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
             }
         }
-
-        enqueueLinkDeletes(linkIDsToDelete);
     }
 
     private InstanceIdentifier<Node> getNodePath(final NodeId nodeId) {
index f09da0045930cf7cc843de1a924e64841f2db508..41162d30463d54338d687dfd904ef1f7646dbec6 100644 (file)
@@ -8,13 +8,6 @@
 package org.opendaylight.md.controller.topology.manager;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -25,6 +18,9 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFaile
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
     private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
     private static final int MAX_TRANSACTION_OPERATIONS = 100;
@@ -32,7 +28,7 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh
 
     private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
     private final DataBroker dataBroker;
-    private final BindingTransactionChain transactionChain;
+    private BindingTransactionChain transactionChain;
 
     OperationProcessor(final DataBroker dataBroker) {
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
@@ -73,23 +69,32 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh
 
                 LOG.debug("Processed {} operations, submitting transaction", ops);
 
-                CheckedFuture<Void, TransactionCommitFailedException> txResultFuture = tx.submit();
-                Futures.addCallback(txResultFuture, new FutureCallback<Void>() {
-                    @Override
-                    public void onSuccess(Void notUsed) {
-                        LOG.debug("Topology export successful for tx :{}", tx.getIdentifier());
-                    }
-
-                    @Override
-                    public void onFailure(Throwable throwable) {
-                        LOG.error("Topology export transaction {} failed", tx.getIdentifier(), throwable.getCause());
-                    }
-                });
+                try {
+                    tx.submit().checkedGet();
+                } catch (final TransactionCommitFailedException e) {
+                    LOG.warn("Stat DataStoreOperation unexpected State!", e);
+                    transactionChain.close();
+                    transactionChain = dataBroker.createTransactionChain(this);
+                    cleanDataStoreOperQueue();
+                }
             }
-        } catch (InterruptedException e) {
-            LOG.info("Interrupted processing, terminating", e);
+        } catch (final IllegalStateException e) {
+            LOG.warn("Stat DataStoreOperation unexpected State!", e);
+            transactionChain.close();
+            transactionChain = dataBroker.createTransactionChain(this);
+            cleanDataStoreOperQueue();
+        } catch (final InterruptedException e) {
+            LOG.warn("Stat Manager DS Operation thread interupted!", e);
+        } catch (final Exception e) {
+            LOG.warn("Stat DataStore Operation executor fail!", e);
         }
 
+        // Drain all events, making sure any blocked threads are unblocked
+        cleanDataStoreOperQueue();
+
+    }
+
+    private void cleanDataStoreOperQueue() {
         // Drain all events, making sure any blocked threads are unblocked
         while (!queue.isEmpty()) {
             queue.poll();
index b7a56a489019bc56cc78cde3513d8f7a16f7bc4d..d07d42f2ec2d1d486d427d7e5dfa2762722d663f 100644 (file)
@@ -8,29 +8,11 @@
 
 package org.opendaylight.md.controller.topology.manager;
 
-import static org.junit.Assert.fail;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -82,17 +64,36 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.LinkBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.LinkKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 
 public class FlowCapableTopologyExporterTest {
 
@@ -135,8 +136,12 @@ public class FlowCapableTopologyExporterTest {
     @Test
     public void testOnNodeRemoved() {
 
+        NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
+        InstanceIdentifier<Node> topoNodeII = topologyIID.child(Node.class, topoNodeKey);
+        Node topoNode = new NodeBuilder().setKey(topoNodeKey).build();
+
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
-                                                                nodeKey = newInvNodeKey("node1");
+                                                                nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
         InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
                 org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
                 nodeKey);
@@ -154,10 +159,16 @@ public class FlowCapableTopologyExporterTest {
             };
 
         SettableFuture<Optional<Topology>> readFuture = SettableFuture.create();
+        readFuture.set(Optional.of(topology));
         ReadWriteTransaction mockTx1 = mock(ReadWriteTransaction.class);
         doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1)
                 .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
 
+        SettableFuture<Optional<Node>> readFutureNode = SettableFuture.create();
+        readFutureNode.set(Optional.of(topoNode));
+        doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx1)
+                .read(LogicalDatastoreType.OPERATIONAL, topoNodeII);
+
         CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1);
 
         int expDeleteCalls = expDeletedIIDs.length;
@@ -166,11 +177,7 @@ public class FlowCapableTopologyExporterTest {
                 ArgumentCaptor.forClass(InstanceIdentifier.class);
         setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch);
 
-        ReadWriteTransaction mockTx2 = mock(ReadWriteTransaction.class);
-        setupStubbedDeletes(mockTx2, deletedLinkIDs, deleteLatch);
-        CountDownLatch submitLatch2 = setupStubbedSubmit(mockTx2);
-
-        doReturn(mockTx1).doReturn(mockTx2).when(mockTxChain).newReadWriteTransaction();
+        doReturn(mockTx1).when(mockTxChain).newReadWriteTransaction();
 
         exporter.onNodeRemoved(new NodeRemovedBuilder().setNodeRef(new NodeRef(invNodeID)).build());
 
@@ -180,20 +187,21 @@ public class FlowCapableTopologyExporterTest {
 
         waitForDeletes(expDeleteCalls, deleteLatch);
 
-        waitForSubmit(submitLatch2);
-
         assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
 
         verifyMockTx(mockTx1);
-        verifyMockTx(mockTx2);
     }
 
     @SuppressWarnings({ "rawtypes" })
     @Test
     public void testOnNodeRemovedWithNoTopology() {
 
+        NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
+        InstanceIdentifier<Node> topoNodeII = topologyIID.child(Node.class, topoNodeKey);
+        Node topoNode = new NodeBuilder().setKey(topoNodeKey).build();
+
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
-                                                                nodeKey = newInvNodeKey("node1");
+                nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
         InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
                 org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
                 nodeKey);
@@ -207,6 +215,11 @@ public class FlowCapableTopologyExporterTest {
                 .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
         CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
 
+        SettableFuture<Optional<Node>> readFutureNode = SettableFuture.create();
+        readFutureNode.set(Optional.of(topoNode));
+        doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx)
+                .read(LogicalDatastoreType.OPERATIONAL, topoNodeII);
+
         CountDownLatch deleteLatch = new CountDownLatch(1);
         ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
                 ArgumentCaptor.forClass(InstanceIdentifier.class);
@@ -227,11 +240,18 @@ public class FlowCapableTopologyExporterTest {
     @Test
     public void testOnNodeConnectorRemoved() {
 
+        NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
+        TerminationPointKey terminationPointKey = new TerminationPointKey(new TpId("tp1"));
+
+        InstanceIdentifier<TerminationPoint> topoTermPointII = topologyIID.child(Node.class, topoNodeKey)
+                .child(TerminationPoint.class, terminationPointKey);
+        TerminationPoint topoTermPoint = new TerminationPointBuilder().setKey(terminationPointKey).build();
+
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
-                                                                  nodeKey = newInvNodeKey("node1");
+                                                                  nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
 
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
-                newInvNodeConnKey("tp1");
+                newInvNodeConnKey(terminationPointKey.getTpId().getValue());
 
         InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
 
@@ -249,10 +269,16 @@ public class FlowCapableTopologyExporterTest {
             };
 
         final SettableFuture<Optional<Topology>> readFuture = SettableFuture.create();
+        readFuture.set(Optional.of(topology));
         ReadWriteTransaction mockTx1 = mock(ReadWriteTransaction.class);
         doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1)
                 .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
 
+        SettableFuture<Optional<TerminationPoint>> readFutureNode = SettableFuture.create();
+        readFutureNode.set(Optional.of(topoTermPoint));
+        doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx1)
+                .read(LogicalDatastoreType.OPERATIONAL, topoTermPointII);
+
         CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1);
 
         int expDeleteCalls = expDeletedIIDs.length;
@@ -261,11 +287,8 @@ public class FlowCapableTopologyExporterTest {
                 ArgumentCaptor.forClass(InstanceIdentifier.class);
         setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch);
 
-        ReadWriteTransaction mockTx2 = mock(ReadWriteTransaction.class);
-        setupStubbedDeletes(mockTx2, deletedLinkIDs, deleteLatch);
-        CountDownLatch submitLatch2 = setupStubbedSubmit(mockTx2);
 
-        doReturn(mockTx1).doReturn(mockTx2).when(mockTxChain).newReadWriteTransaction();
+        doReturn(mockTx1).when(mockTxChain).newReadWriteTransaction();
 
         exporter.onNodeConnectorRemoved(new NodeConnectorRemovedBuilder().setNodeConnectorRef(
                 new NodeConnectorRef(invNodeConnID)).build());
@@ -276,23 +299,27 @@ public class FlowCapableTopologyExporterTest {
 
         waitForDeletes(expDeleteCalls, deleteLatch);
 
-        waitForSubmit(submitLatch2);
-
         assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
 
         verifyMockTx(mockTx1);
-        verifyMockTx(mockTx2);
     }
 
     @SuppressWarnings("rawtypes")
     @Test
     public void testOnNodeConnectorRemovedWithNoTopology() {
 
+        NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
+        TerminationPointKey terminationPointKey = new TerminationPointKey(new TpId("tp1"));
+
+        InstanceIdentifier<TerminationPoint> topoTermPointII = topologyIID.child(Node.class, topoNodeKey)
+                .child(TerminationPoint.class, terminationPointKey);
+        TerminationPoint topoTermPoint = new TerminationPointBuilder().setKey(terminationPointKey).build();
+
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
-                                                                  nodeKey = newInvNodeKey("node1");
+                nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
 
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
-                newInvNodeConnKey("tp1");
+                newInvNodeConnKey(terminationPointKey.getTpId().getValue());
 
         InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
 
@@ -306,6 +333,11 @@ public class FlowCapableTopologyExporterTest {
                 .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
         CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
 
+        SettableFuture<Optional<TerminationPoint>> readFutureNode = SettableFuture.create();
+        readFutureNode.set(Optional.of(topoTermPoint));
+        doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx)
+                .read(LogicalDatastoreType.OPERATIONAL, topoTermPointII);
+
         CountDownLatch deleteLatch = new CountDownLatch(1);
         ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
                 ArgumentCaptor.forClass(InstanceIdentifier.class);
@@ -510,8 +542,8 @@ public class FlowCapableTopologyExporterTest {
         waitForSubmit(submitLatch);
 
         ArgumentCaptor<Link> mergedNode = ArgumentCaptor.forClass(Link.class);
-        verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(topologyIID.child(
-                Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId())))),
+        verify(mockTx).put(eq(LogicalDatastoreType.OPERATIONAL), eq(topologyIID.child(
+                        Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId())))),
                 mergedNode.capture(), eq(true));
         assertEquals("Source node ID", "sourceNode",
                 mergedNode.getValue().getSource().getSourceNode().getValue());
@@ -524,7 +556,7 @@ public class FlowCapableTopologyExporterTest {
     }
 
     @Test
-    public void testOnLinkRemoved() {
+    public void testOnLinkRemovedLinkExists() {
 
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
                 sourceNodeKey = newInvNodeKey("sourceNode");
@@ -538,13 +570,18 @@ public class FlowCapableTopologyExporterTest {
                 destNodeConnKey = newInvNodeConnKey("destTP");
         InstanceIdentifier<?> destConnID = newNodeConnID(destNodeKey, destNodeConnKey);
 
+        Link link = newLink(sourceNodeConnKey.getId().getValue(), newSourceTp(sourceNodeConnKey.getId().getValue()),
+                newDestTp(destNodeConnKey.getId().getValue()));
+
         ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
         CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
         doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+        doReturn(Futures.immediateCheckedFuture(Optional.of(link))).when(mockTx).read(LogicalDatastoreType.OPERATIONAL, topologyIID.child(
+                Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
 
         exporter.onLinkRemoved(new LinkRemovedBuilder().setSource(
                 new NodeConnectorRef(sourceConnID)).setDestination(
-                        new NodeConnectorRef(destConnID)).build());
+                new NodeConnectorRef(destConnID)).build());
 
         waitForSubmit(submitLatch);
 
@@ -552,6 +589,37 @@ public class FlowCapableTopologyExporterTest {
                 Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
     }
 
+    @Test
+    public void testOnLinkRemovedLinkDoesNotExist() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                sourceNodeKey = newInvNodeKey("sourceNode");
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+                sourceNodeConnKey = newInvNodeConnKey("sourceTP");
+        InstanceIdentifier<?> sourceConnID = newNodeConnID(sourceNodeKey, sourceNodeConnKey);
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                destNodeKey = newInvNodeKey("destNode");
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+                destNodeConnKey = newInvNodeConnKey("destTP");
+        InstanceIdentifier<?> destConnID = newNodeConnID(destNodeKey, destNodeConnKey);
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+        doReturn(Futures.immediateCheckedFuture(Optional.<Link>absent())).when(mockTx).read(LogicalDatastoreType.OPERATIONAL, topologyIID.child(
+                Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
+
+        exporter.onLinkRemoved(new LinkRemovedBuilder().setSource(
+                new NodeConnectorRef(sourceConnID)).setDestination(
+                new NodeConnectorRef(destConnID)).build());
+
+        waitForSubmit(submitLatch);
+
+        verify(mockTx, never()).delete(LogicalDatastoreType.OPERATIONAL, topologyIID.child(
+                Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
+    }
+
     private void verifyMockTx(ReadWriteTransaction mockTx) {
         InOrder inOrder = inOrder(mockTx);
         inOrder.verify(mockTx, atLeast(0)).submit();