Bug 963 - OSGi error in Topology manager component - After exiting 33/11233/6
authorJan Hajnar <jhajnar@cisco.com>
Tue, 16 Sep 2014 13:07:17 +0000 (15:07 +0200)
committerJan Hajnar <jhajnar@cisco.com>
Sun, 21 Sep 2014 23:42:13 +0000 (01:42 +0200)
mininet of13 simulation

* added check in onLinkRemoved() to check if link exists before
atempting to delete it (without check, lots of datastore transaction
errors were displayed on mininet shutdown because links were already
removed with nodes)
* added checks to check if node, connector or link exists before trying
to remove it (reads are cheap and delete exception can break the
transaction chain)
* made reads synchronous
* added illegalState exception handling to operation processor
* made affected links removal to be in the same transaction as
Node/NodeConnector removal(creating and applying snapshot for each link
is expensive)
* updated tests

Change-Id: Icef03a02bbfb88a7cebd0fe35e4b713dcfeb317c
Signed-off-by: Jan Hajnar <jhajnar@cisco.com>
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java
opendaylight/md-sal/topology-manager/src/test/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporterTest.java

index 361373d78da93f114f51c887ab95c78bc6ab3265..d8602c2ddded47d2708bd89bff5b6cd7a8d42bf7 100644 (file)
@@ -7,17 +7,8 @@
  */
 package org.opendaylight.md.controller.topology.manager;
 
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeConnectorKey;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeKey;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPoint;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPointId;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyLink;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
-import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -48,12 +39,16 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
+import java.util.Collections;
+import java.util.List;
+
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeConnectorKey;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.getNodeKey;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPoint;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTerminationPointId;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyLink;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
+import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
 
 class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, OpendaylightInventoryListener {
 
@@ -73,11 +68,20 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         final NodeId nodeId = toTopologyNodeId(getNodeKey(notification.getNodeRef()).getId());
         final InstanceIdentifier<Node> nodeInstance = toNodeIdentifier(notification.getNodeRef());
 
+
         processor.enqueueOperation(new TopologyOperation() {
             @Override
             public void applyOperation(ReadWriteTransaction transaction) {
-                removeAffectedLinks(nodeId, transaction);
-                transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
+                Optional<Node> nodeOptional = Optional.absent();
+                try {
+                    nodeOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, nodeInstance).checkedGet();
+                } catch (ReadFailedException e) {
+                    LOG.error("Error occured when trying to read Node ", e);
+                }
+                if (nodeOptional.isPresent()) {
+                    removeAffectedLinks(nodeId, transaction);
+                    transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
+                }
             }
 
             @Override
@@ -119,8 +123,16 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         processor.enqueueOperation(new TopologyOperation() {
             @Override
             public void applyOperation(ReadWriteTransaction transaction) {
-                removeAffectedLinks(tpId, transaction);
-                transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance);
+                Optional<TerminationPoint> terminationPointOptional = Optional.absent();
+                try {
+                    terminationPointOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, tpInstance).checkedGet();
+                } catch (ReadFailedException e) {
+                    LOG.error("Error occured when trying to read NodeConnector ", e);
+                }
+                if (terminationPointOptional.isPresent()) {
+                    removeAffectedLinks(tpId, transaction);
+                    transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance);
+                }
             }
 
             @Override
@@ -164,7 +176,7 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
             public void applyOperation(final ReadWriteTransaction transaction) {
                 final Link link = toTopologyLink(notification);
                 final InstanceIdentifier<Link> path = linkPath(link);
-                transaction.merge(LogicalDatastoreType.OPERATIONAL, path, link, true);
+                transaction.put(LogicalDatastoreType.OPERATIONAL, path, link, true);
             }
 
             @Override
@@ -184,7 +196,17 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         processor.enqueueOperation(new TopologyOperation() {
             @Override
             public void applyOperation(final ReadWriteTransaction transaction) {
-                transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(toTopologyLink(notification)));
+                Optional<Link> linkOptional = Optional.absent();
+                try {
+                    // read that checks if link exists (if we do not do this we might get an exception on delete)
+                    linkOptional = transaction.read(LogicalDatastoreType.OPERATIONAL,
+                            linkPath(toTopologyLink(notification))).checkedGet();
+                } catch (ReadFailedException e) {
+                    LOG.error("Error occured when trying to read Link ", e);
+                }
+                if (linkOptional.isPresent()) {
+                    transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(toTopologyLink(notification)));
+                }
             }
 
             @Override
@@ -194,6 +216,7 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         });
     }
 
+
     @Override
     public void onLinkUtilizationNormal(final LinkUtilizationNormal notification) {
         // NOOP
@@ -212,89 +235,57 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
     }
 
     private void removeAffectedLinks(final NodeId id, final ReadWriteTransaction transaction) {
-        CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture =
-                transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
-        Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
-            @Override
-            public void onSuccess(Optional<Topology> topologyOptional) {
-                removeAffectedLinks(id, topologyOptional);
-            }
-
-            @Override
-            public void onFailure(Throwable throwable) {
-                LOG.error("Error reading topology data for topology {}", topology, throwable);
-            }
-        });
+        Optional<Topology> topologyOptional = Optional.absent();
+        try {
+            topologyOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
+        } catch (ReadFailedException e) {
+            LOG.error("Error reading topology data for topology {}", topology, e);
+        }
+        if (topologyOptional.isPresent()) {
+            removeAffectedLinks(id, topologyOptional, transaction);
+        }
     }
 
-    private void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional) {
+    private void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional, ReadWriteTransaction transaction) {
         if (!topologyOptional.isPresent()) {
             return;
         }
 
         List<Link> linkList = topologyOptional.get().getLink() != null ?
                 topologyOptional.get().getLink() : Collections.<Link> emptyList();
-        final List<InstanceIdentifier<Link>> linkIDsToDelete = Lists.newArrayList();
         for (Link link : linkList) {
             if (id.equals(link.getSource().getSourceNode()) ||
                     id.equals(link.getDestination().getDestNode())) {
-                linkIDsToDelete.add(linkPath(link));
+                transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
             }
         }
-
-        enqueueLinkDeletes(linkIDsToDelete);
-    }
-
-    private void enqueueLinkDeletes(final Collection<InstanceIdentifier<Link>> linkIDsToDelete) {
-        if(!linkIDsToDelete.isEmpty()) {
-            processor.enqueueOperation(new TopologyOperation() {
-                @Override
-                public void applyOperation(ReadWriteTransaction transaction) {
-                    for(InstanceIdentifier<Link> linkID: linkIDsToDelete) {
-                        transaction.delete(LogicalDatastoreType.OPERATIONAL, linkID);
-                    }
-                }
-
-                @Override
-                public String toString() {
-                    return "Delete Links " + linkIDsToDelete.size();
-                }
-            });
-        }
     }
 
     private void removeAffectedLinks(final TpId id, final ReadWriteTransaction transaction) {
-        CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture =
-                transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
-        Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
-            @Override
-            public void onSuccess(Optional<Topology> topologyOptional) {
-                removeAffectedLinks(id, topologyOptional);
-            }
-
-            @Override
-            public void onFailure(Throwable throwable) {
-                LOG.error("Error reading topology data for topology {}", topology, throwable);
-            }
-        });
+        Optional<Topology> topologyOptional = Optional.absent();
+        try {
+            topologyOptional = transaction.read(LogicalDatastoreType.OPERATIONAL, topology).checkedGet();
+        } catch (ReadFailedException e) {
+            LOG.error("Error reading topology data for topology {}", topology, e);
+        }
+        if (topologyOptional.isPresent()) {
+            removeAffectedLinks(id, topologyOptional, transaction);
+        }
     }
 
-    private void removeAffectedLinks(final TpId id, Optional<Topology> topologyOptional) {
+    private void removeAffectedLinks(final TpId id, Optional<Topology> topologyOptional, ReadWriteTransaction transaction) {
         if (!topologyOptional.isPresent()) {
             return;
         }
 
         List<Link> linkList = topologyOptional.get().getLink() != null
                 ? topologyOptional.get().getLink() : Collections.<Link> emptyList();
-        final List<InstanceIdentifier<Link>> linkIDsToDelete = Lists.newArrayList();
         for (Link link : linkList) {
             if (id.equals(link.getSource().getSourceTp()) ||
                     id.equals(link.getDestination().getDestTp())) {
-                linkIDsToDelete.add(linkPath(link));
+                transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
             }
         }
-
-        enqueueLinkDeletes(linkIDsToDelete);
     }
 
     private InstanceIdentifier<Node> getNodePath(final NodeId nodeId) {
index f09da0045930cf7cc843de1a924e64841f2db508..41162d30463d54338d687dfd904ef1f7646dbec6 100644 (file)
@@ -8,13 +8,6 @@
 package org.opendaylight.md.controller.topology.manager;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
@@ -25,6 +18,9 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFaile
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 final class OperationProcessor implements AutoCloseable, Runnable, TransactionChainListener {
     private static final Logger LOG = LoggerFactory.getLogger(OperationProcessor.class);
     private static final int MAX_TRANSACTION_OPERATIONS = 100;
@@ -32,7 +28,7 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh
 
     private final BlockingQueue<TopologyOperation> queue = new LinkedBlockingQueue<>(OPERATION_QUEUE_DEPTH);
     private final DataBroker dataBroker;
-    private final BindingTransactionChain transactionChain;
+    private BindingTransactionChain transactionChain;
 
     OperationProcessor(final DataBroker dataBroker) {
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
@@ -73,23 +69,32 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh
 
                 LOG.debug("Processed {} operations, submitting transaction", ops);
 
-                CheckedFuture<Void, TransactionCommitFailedException> txResultFuture = tx.submit();
-                Futures.addCallback(txResultFuture, new FutureCallback<Void>() {
-                    @Override
-                    public void onSuccess(Void notUsed) {
-                        LOG.debug("Topology export successful for tx :{}", tx.getIdentifier());
-                    }
-
-                    @Override
-                    public void onFailure(Throwable throwable) {
-                        LOG.error("Topology export transaction {} failed", tx.getIdentifier(), throwable.getCause());
-                    }
-                });
+                try {
+                    tx.submit().checkedGet();
+                } catch (final TransactionCommitFailedException e) {
+                    LOG.warn("Stat DataStoreOperation unexpected State!", e);
+                    transactionChain.close();
+                    transactionChain = dataBroker.createTransactionChain(this);
+                    cleanDataStoreOperQueue();
+                }
             }
-        } catch (InterruptedException e) {
-            LOG.info("Interrupted processing, terminating", e);
+        } catch (final IllegalStateException e) {
+            LOG.warn("Stat DataStoreOperation unexpected State!", e);
+            transactionChain.close();
+            transactionChain = dataBroker.createTransactionChain(this);
+            cleanDataStoreOperQueue();
+        } catch (final InterruptedException e) {
+            LOG.warn("Stat Manager DS Operation thread interupted!", e);
+        } catch (final Exception e) {
+            LOG.warn("Stat DataStore Operation executor fail!", e);
         }
 
+        // Drain all events, making sure any blocked threads are unblocked
+        cleanDataStoreOperQueue();
+
+    }
+
+    private void cleanDataStoreOperQueue() {
         // Drain all events, making sure any blocked threads are unblocked
         while (!queue.isEmpty()) {
             queue.poll();
index b7a56a489019bc56cc78cde3513d8f7a16f7bc4d..d07d42f2ec2d1d486d427d7e5dfa2762722d663f 100644 (file)
@@ -8,29 +8,11 @@
 
 package org.opendaylight.md.controller.topology.manager;
 
-import static org.junit.Assert.fail;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -82,17 +64,36 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.LinkBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.LinkKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 
 public class FlowCapableTopologyExporterTest {
 
@@ -135,8 +136,12 @@ public class FlowCapableTopologyExporterTest {
     @Test
     public void testOnNodeRemoved() {
 
+        NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
+        InstanceIdentifier<Node> topoNodeII = topologyIID.child(Node.class, topoNodeKey);
+        Node topoNode = new NodeBuilder().setKey(topoNodeKey).build();
+
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
-                                                                nodeKey = newInvNodeKey("node1");
+                                                                nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
         InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
                 org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
                 nodeKey);
@@ -154,10 +159,16 @@ public class FlowCapableTopologyExporterTest {
             };
 
         SettableFuture<Optional<Topology>> readFuture = SettableFuture.create();
+        readFuture.set(Optional.of(topology));
         ReadWriteTransaction mockTx1 = mock(ReadWriteTransaction.class);
         doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1)
                 .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
 
+        SettableFuture<Optional<Node>> readFutureNode = SettableFuture.create();
+        readFutureNode.set(Optional.of(topoNode));
+        doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx1)
+                .read(LogicalDatastoreType.OPERATIONAL, topoNodeII);
+
         CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1);
 
         int expDeleteCalls = expDeletedIIDs.length;
@@ -166,11 +177,7 @@ public class FlowCapableTopologyExporterTest {
                 ArgumentCaptor.forClass(InstanceIdentifier.class);
         setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch);
 
-        ReadWriteTransaction mockTx2 = mock(ReadWriteTransaction.class);
-        setupStubbedDeletes(mockTx2, deletedLinkIDs, deleteLatch);
-        CountDownLatch submitLatch2 = setupStubbedSubmit(mockTx2);
-
-        doReturn(mockTx1).doReturn(mockTx2).when(mockTxChain).newReadWriteTransaction();
+        doReturn(mockTx1).when(mockTxChain).newReadWriteTransaction();
 
         exporter.onNodeRemoved(new NodeRemovedBuilder().setNodeRef(new NodeRef(invNodeID)).build());
 
@@ -180,20 +187,21 @@ public class FlowCapableTopologyExporterTest {
 
         waitForDeletes(expDeleteCalls, deleteLatch);
 
-        waitForSubmit(submitLatch2);
-
         assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
 
         verifyMockTx(mockTx1);
-        verifyMockTx(mockTx2);
     }
 
     @SuppressWarnings({ "rawtypes" })
     @Test
     public void testOnNodeRemovedWithNoTopology() {
 
+        NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
+        InstanceIdentifier<Node> topoNodeII = topologyIID.child(Node.class, topoNodeKey);
+        Node topoNode = new NodeBuilder().setKey(topoNodeKey).build();
+
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
-                                                                nodeKey = newInvNodeKey("node1");
+                nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
         InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
                 org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
                 nodeKey);
@@ -207,6 +215,11 @@ public class FlowCapableTopologyExporterTest {
                 .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
         CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
 
+        SettableFuture<Optional<Node>> readFutureNode = SettableFuture.create();
+        readFutureNode.set(Optional.of(topoNode));
+        doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx)
+                .read(LogicalDatastoreType.OPERATIONAL, topoNodeII);
+
         CountDownLatch deleteLatch = new CountDownLatch(1);
         ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
                 ArgumentCaptor.forClass(InstanceIdentifier.class);
@@ -227,11 +240,18 @@ public class FlowCapableTopologyExporterTest {
     @Test
     public void testOnNodeConnectorRemoved() {
 
+        NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
+        TerminationPointKey terminationPointKey = new TerminationPointKey(new TpId("tp1"));
+
+        InstanceIdentifier<TerminationPoint> topoTermPointII = topologyIID.child(Node.class, topoNodeKey)
+                .child(TerminationPoint.class, terminationPointKey);
+        TerminationPoint topoTermPoint = new TerminationPointBuilder().setKey(terminationPointKey).build();
+
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
-                                                                  nodeKey = newInvNodeKey("node1");
+                                                                  nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
 
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
-                newInvNodeConnKey("tp1");
+                newInvNodeConnKey(terminationPointKey.getTpId().getValue());
 
         InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
 
@@ -249,10 +269,16 @@ public class FlowCapableTopologyExporterTest {
             };
 
         final SettableFuture<Optional<Topology>> readFuture = SettableFuture.create();
+        readFuture.set(Optional.of(topology));
         ReadWriteTransaction mockTx1 = mock(ReadWriteTransaction.class);
         doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1)
                 .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
 
+        SettableFuture<Optional<TerminationPoint>> readFutureNode = SettableFuture.create();
+        readFutureNode.set(Optional.of(topoTermPoint));
+        doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx1)
+                .read(LogicalDatastoreType.OPERATIONAL, topoTermPointII);
+
         CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1);
 
         int expDeleteCalls = expDeletedIIDs.length;
@@ -261,11 +287,8 @@ public class FlowCapableTopologyExporterTest {
                 ArgumentCaptor.forClass(InstanceIdentifier.class);
         setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch);
 
-        ReadWriteTransaction mockTx2 = mock(ReadWriteTransaction.class);
-        setupStubbedDeletes(mockTx2, deletedLinkIDs, deleteLatch);
-        CountDownLatch submitLatch2 = setupStubbedSubmit(mockTx2);
 
-        doReturn(mockTx1).doReturn(mockTx2).when(mockTxChain).newReadWriteTransaction();
+        doReturn(mockTx1).when(mockTxChain).newReadWriteTransaction();
 
         exporter.onNodeConnectorRemoved(new NodeConnectorRemovedBuilder().setNodeConnectorRef(
                 new NodeConnectorRef(invNodeConnID)).build());
@@ -276,23 +299,27 @@ public class FlowCapableTopologyExporterTest {
 
         waitForDeletes(expDeleteCalls, deleteLatch);
 
-        waitForSubmit(submitLatch2);
-
         assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
 
         verifyMockTx(mockTx1);
-        verifyMockTx(mockTx2);
     }
 
     @SuppressWarnings("rawtypes")
     @Test
     public void testOnNodeConnectorRemovedWithNoTopology() {
 
+        NodeKey topoNodeKey = new NodeKey(new NodeId("node1"));
+        TerminationPointKey terminationPointKey = new TerminationPointKey(new TpId("tp1"));
+
+        InstanceIdentifier<TerminationPoint> topoTermPointII = topologyIID.child(Node.class, topoNodeKey)
+                .child(TerminationPoint.class, terminationPointKey);
+        TerminationPoint topoTermPoint = new TerminationPointBuilder().setKey(terminationPointKey).build();
+
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
-                                                                  nodeKey = newInvNodeKey("node1");
+                nodeKey = newInvNodeKey(topoNodeKey.getNodeId().getValue());
 
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
-                newInvNodeConnKey("tp1");
+                newInvNodeConnKey(terminationPointKey.getTpId().getValue());
 
         InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
 
@@ -306,6 +333,11 @@ public class FlowCapableTopologyExporterTest {
                 .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
         CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
 
+        SettableFuture<Optional<TerminationPoint>> readFutureNode = SettableFuture.create();
+        readFutureNode.set(Optional.of(topoTermPoint));
+        doReturn(Futures.makeChecked(readFutureNode, ReadFailedException.MAPPER)).when(mockTx)
+                .read(LogicalDatastoreType.OPERATIONAL, topoTermPointII);
+
         CountDownLatch deleteLatch = new CountDownLatch(1);
         ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
                 ArgumentCaptor.forClass(InstanceIdentifier.class);
@@ -510,8 +542,8 @@ public class FlowCapableTopologyExporterTest {
         waitForSubmit(submitLatch);
 
         ArgumentCaptor<Link> mergedNode = ArgumentCaptor.forClass(Link.class);
-        verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(topologyIID.child(
-                Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId())))),
+        verify(mockTx).put(eq(LogicalDatastoreType.OPERATIONAL), eq(topologyIID.child(
+                        Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId())))),
                 mergedNode.capture(), eq(true));
         assertEquals("Source node ID", "sourceNode",
                 mergedNode.getValue().getSource().getSourceNode().getValue());
@@ -524,7 +556,7 @@ public class FlowCapableTopologyExporterTest {
     }
 
     @Test
-    public void testOnLinkRemoved() {
+    public void testOnLinkRemovedLinkExists() {
 
         org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
                 sourceNodeKey = newInvNodeKey("sourceNode");
@@ -538,13 +570,18 @@ public class FlowCapableTopologyExporterTest {
                 destNodeConnKey = newInvNodeConnKey("destTP");
         InstanceIdentifier<?> destConnID = newNodeConnID(destNodeKey, destNodeConnKey);
 
+        Link link = newLink(sourceNodeConnKey.getId().getValue(), newSourceTp(sourceNodeConnKey.getId().getValue()),
+                newDestTp(destNodeConnKey.getId().getValue()));
+
         ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
         CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
         doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+        doReturn(Futures.immediateCheckedFuture(Optional.of(link))).when(mockTx).read(LogicalDatastoreType.OPERATIONAL, topologyIID.child(
+                Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
 
         exporter.onLinkRemoved(new LinkRemovedBuilder().setSource(
                 new NodeConnectorRef(sourceConnID)).setDestination(
-                        new NodeConnectorRef(destConnID)).build());
+                new NodeConnectorRef(destConnID)).build());
 
         waitForSubmit(submitLatch);
 
@@ -552,6 +589,37 @@ public class FlowCapableTopologyExporterTest {
                 Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
     }
 
+    @Test
+    public void testOnLinkRemovedLinkDoesNotExist() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                sourceNodeKey = newInvNodeKey("sourceNode");
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+                sourceNodeConnKey = newInvNodeConnKey("sourceTP");
+        InstanceIdentifier<?> sourceConnID = newNodeConnID(sourceNodeKey, sourceNodeConnKey);
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                destNodeKey = newInvNodeKey("destNode");
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+                destNodeConnKey = newInvNodeConnKey("destTP");
+        InstanceIdentifier<?> destConnID = newNodeConnID(destNodeKey, destNodeConnKey);
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+        doReturn(Futures.immediateCheckedFuture(Optional.<Link>absent())).when(mockTx).read(LogicalDatastoreType.OPERATIONAL, topologyIID.child(
+                Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
+
+        exporter.onLinkRemoved(new LinkRemovedBuilder().setSource(
+                new NodeConnectorRef(sourceConnID)).setDestination(
+                new NodeConnectorRef(destConnID)).build());
+
+        waitForSubmit(submitLatch);
+
+        verify(mockTx, never()).delete(LogicalDatastoreType.OPERATIONAL, topologyIID.child(
+                Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
+    }
+
     private void verifyMockTx(ReadWriteTransaction mockTx) {
         InOrder inOrder = inOrder(mockTx);
         inOrder.verify(mockTx, atLeast(0)).submit();