Bug 1688: Fix IllegalStateEx from FlowCapableTopologyExporter 58/11058/5
authortpantelis <tpanteli@brocade.com>
Sat, 30 Aug 2014 09:29:31 +0000 (05:29 -0400)
committertpantelis <tpanteli@brocade.com>
Mon, 1 Sep 2014 11:26:26 +0000 (07:26 -0400)
Modified code to enqueue the deletes on the OperationProcessor in
removeAffectedLinks.

Also added unit tests..

Change-Id: Ie7764303e6c18ff020567739e17e51aed76a8762
Signed-off-by: tpantelis <tpanteli@brocade.com>
opendaylight/md-sal/topology-manager/pom.xml
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporter.java
opendaylight/md-sal/topology-manager/src/main/java/org/opendaylight/md/controller/topology/manager/OperationProcessor.java
opendaylight/md-sal/topology-manager/src/test/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporterTest.java [new file with mode: 0644]

index fe1813a19943dfe7f3446b55f9c49b1986df6729..57313d2948960b038791d75a5fdb9287cce41530 100644 (file)
       <artifactId>org.osgi.core</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
index c1996f4691632637abc9fc7dffacce0bcb12f2ad..361373d78da93f114f51c887ab95c78bc6ab3265 100644 (file)
@@ -15,9 +15,9 @@ import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMap
 import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNode;
 import static org.opendaylight.md.controller.topology.manager.FlowCapableNodeMapping.toTopologyNodeId;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -50,17 +50,19 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 
 class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, OpendaylightInventoryListener {
 
-    private final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
+    private static final Logger LOG = LoggerFactory.getLogger(FlowCapableTopologyExporter.class);
     private final InstanceIdentifier<Topology> topology;
     private final OperationProcessor processor;
 
-    FlowCapableTopologyExporter(final OperationProcessor processor, final InstanceIdentifier<Topology> topology) {
+    FlowCapableTopologyExporter(final OperationProcessor processor,
+            final InstanceIdentifier<Topology> topology) {
         this.processor = Preconditions.checkNotNull(processor);
         this.topology = Preconditions.checkNotNull(topology);
     }
@@ -73,15 +75,14 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
 
         processor.enqueueOperation(new TopologyOperation() {
             @Override
-            public void applyOperation(final ReadWriteTransaction transaction) {
-                removeAffectedLinks(nodeId);
+            public void applyOperation(ReadWriteTransaction transaction) {
+                removeAffectedLinks(nodeId, transaction);
+                transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
             }
-        });
 
-        processor.enqueueOperation(new TopologyOperation() {
             @Override
-            public void applyOperation(ReadWriteTransaction transaction) {
-                transaction.delete(LogicalDatastoreType.OPERATIONAL, nodeInstance);
+            public String toString() {
+                return "onNodeRemoved";
             }
         });
     }
@@ -97,6 +98,11 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
                     final InstanceIdentifier<Node> path = getNodePath(toTopologyNodeId(notification.getId()));
                     transaction.merge(LogicalDatastoreType.OPERATIONAL, path, node, true);
                 }
+
+                @Override
+                public String toString() {
+                    return "onNodeUpdated";
+                }
             });
         }
     }
@@ -104,28 +110,30 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
     @Override
     public void onNodeConnectorRemoved(final NodeConnectorRemoved notification) {
 
-        final InstanceIdentifier<TerminationPoint> tpInstance = toTerminationPointIdentifier(notification
-                .getNodeConnectorRef());
+        final InstanceIdentifier<TerminationPoint> tpInstance = toTerminationPointIdentifier(
+                notification.getNodeConnectorRef());
 
-        processor.enqueueOperation(new TopologyOperation() {
-            @Override
-            public void applyOperation(final ReadWriteTransaction transaction) {
-                final TpId tpId = toTerminationPointId(getNodeConnectorKey(notification.getNodeConnectorRef()).getId());
-                removeAffectedLinks(tpId);
-            }
-        });
+        final TpId tpId = toTerminationPointId(getNodeConnectorKey(
+                notification.getNodeConnectorRef()).getId());
 
         processor.enqueueOperation(new TopologyOperation() {
             @Override
             public void applyOperation(ReadWriteTransaction transaction) {
+                removeAffectedLinks(tpId, transaction);
                 transaction.delete(LogicalDatastoreType.OPERATIONAL, tpInstance);
             }
+
+            @Override
+            public String toString() {
+                return "onNodeConnectorRemoved";
+            }
         });
     }
 
     @Override
     public void onNodeConnectorUpdated(final NodeConnectorUpdated notification) {
-        final FlowCapableNodeConnectorUpdated fcncu = notification.getAugmentation(FlowCapableNodeConnectorUpdated.class);
+        final FlowCapableNodeConnectorUpdated fcncu = notification.getAugmentation(
+                FlowCapableNodeConnectorUpdated.class);
         if (fcncu != null) {
             processor.enqueueOperation(new TopologyOperation() {
                 @Override
@@ -137,9 +145,14 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
                     transaction.merge(LogicalDatastoreType.OPERATIONAL, path, point, true);
                     if ((fcncu.getState() != null && fcncu.getState().isLinkDown())
                             || (fcncu.getConfiguration() != null && fcncu.getConfiguration().isPORTDOWN())) {
-                        removeAffectedLinks(point.getTpId());
+                        removeAffectedLinks(point.getTpId(), transaction);
                     }
                 }
+
+                @Override
+                public String toString() {
+                    return "onNodeConnectorUpdated";
+                }
             });
         }
     }
@@ -153,6 +166,11 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
                 final InstanceIdentifier<Link> path = linkPath(link);
                 transaction.merge(LogicalDatastoreType.OPERATIONAL, path, link, true);
             }
+
+            @Override
+            public String toString() {
+                return "onLinkDiscovered";
+            }
         });
     }
 
@@ -168,6 +186,11 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
             public void applyOperation(final ReadWriteTransaction transaction) {
                 transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(toTopologyLink(notification)));
             }
+
+            @Override
+            public String toString() {
+                return "onLinkRemoved";
+            }
         });
     }
 
@@ -188,62 +211,92 @@ class FlowCapableTopologyExporter implements FlowTopologyDiscoveryListener, Open
         return tpPath(toTopologyNodeId(invNodeKey.getId()), toTerminationPointId(invNodeConnectorKey.getId()));
     }
 
-    private void removeAffectedLinks(final NodeId id) {
-        processor.enqueueOperation(new TopologyOperation() {
+    private void removeAffectedLinks(final NodeId id, final ReadWriteTransaction transaction) {
+        CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture =
+                transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
+        Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
             @Override
-            public void applyOperation(final ReadWriteTransaction transaction) {
-                CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture = transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
-                Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
-                    @Override
-                    public void onSuccess(Optional<Topology> topologyOptional) {
-                        if (topologyOptional.isPresent()) {
-                            List<Link> linkList = topologyOptional.get().getLink() != null
-                                    ? topologyOptional.get().getLink() : Collections.<Link> emptyList();
-                            for (Link link : linkList) {
-                                if (id.equals(link.getSource().getSourceNode()) || id.equals(link.getDestination().getDestNode())) {
-                                    transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
-                                }
-                            }
-                        }
-                    }
+            public void onSuccess(Optional<Topology> topologyOptional) {
+                removeAffectedLinks(id, topologyOptional);
+            }
 
-                    @Override
-                    public void onFailure(Throwable throwable) {
-                        LOG.error("Error reading topology data for topology {}", topology, throwable);
-                    }
-                });
+            @Override
+            public void onFailure(Throwable throwable) {
+                LOG.error("Error reading topology data for topology {}", topology, throwable);
             }
         });
     }
 
-    private void removeAffectedLinks(final TpId id) {
-        processor.enqueueOperation(new TopologyOperation() {
-            @Override
-            public void applyOperation(final ReadWriteTransaction transaction) {
-                CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture = transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
-                Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
-                    @Override
-                    public void onSuccess(Optional<Topology> topologyOptional) {
-                        if (topologyOptional.isPresent()) {
-                            List<Link> linkList = topologyOptional.get().getLink() != null
-                                    ? topologyOptional.get().getLink() : Collections.<Link> emptyList();
-                            for (Link link : linkList) {
-                                if (id.equals(link.getSource().getSourceTp()) || id.equals(link.getDestination().getDestTp())) {
-                                    transaction.delete(LogicalDatastoreType.OPERATIONAL, linkPath(link));
-                                }
-                            }
-                        }
-                    }
+    private void removeAffectedLinks(final NodeId id, Optional<Topology> topologyOptional) {
+        if (!topologyOptional.isPresent()) {
+            return;
+        }
+
+        List<Link> linkList = topologyOptional.get().getLink() != null ?
+                topologyOptional.get().getLink() : Collections.<Link> emptyList();
+        final List<InstanceIdentifier<Link>> linkIDsToDelete = Lists.newArrayList();
+        for (Link link : linkList) {
+            if (id.equals(link.getSource().getSourceNode()) ||
+                    id.equals(link.getDestination().getDestNode())) {
+                linkIDsToDelete.add(linkPath(link));
+            }
+        }
+
+        enqueueLinkDeletes(linkIDsToDelete);
+    }
 
-                    @Override
-                    public void onFailure(Throwable throwable) {
-                        LOG.error("Error reading topology data for topology {}", topology, throwable);
+    private void enqueueLinkDeletes(final Collection<InstanceIdentifier<Link>> linkIDsToDelete) {
+        if(!linkIDsToDelete.isEmpty()) {
+            processor.enqueueOperation(new TopologyOperation() {
+                @Override
+                public void applyOperation(ReadWriteTransaction transaction) {
+                    for(InstanceIdentifier<Link> linkID: linkIDsToDelete) {
+                        transaction.delete(LogicalDatastoreType.OPERATIONAL, linkID);
                     }
-                });
+                }
+
+                @Override
+                public String toString() {
+                    return "Delete Links " + linkIDsToDelete.size();
+                }
+            });
+        }
+    }
+
+    private void removeAffectedLinks(final TpId id, final ReadWriteTransaction transaction) {
+        CheckedFuture<Optional<Topology>, ReadFailedException> topologyDataFuture =
+                transaction.read(LogicalDatastoreType.OPERATIONAL, topology);
+        Futures.addCallback(topologyDataFuture, new FutureCallback<Optional<Topology>>() {
+            @Override
+            public void onSuccess(Optional<Topology> topologyOptional) {
+                removeAffectedLinks(id, topologyOptional);
+            }
+
+            @Override
+            public void onFailure(Throwable throwable) {
+                LOG.error("Error reading topology data for topology {}", topology, throwable);
             }
         });
     }
 
+    private void removeAffectedLinks(final TpId id, Optional<Topology> topologyOptional) {
+        if (!topologyOptional.isPresent()) {
+            return;
+        }
+
+        List<Link> linkList = topologyOptional.get().getLink() != null
+                ? topologyOptional.get().getLink() : Collections.<Link> emptyList();
+        final List<InstanceIdentifier<Link>> linkIDsToDelete = Lists.newArrayList();
+        for (Link link : linkList) {
+            if (id.equals(link.getSource().getSourceTp()) ||
+                    id.equals(link.getDestination().getDestTp())) {
+                linkIDsToDelete.add(linkPath(link));
+            }
+        }
+
+        enqueueLinkDeletes(linkIDsToDelete);
+    }
+
     private InstanceIdentifier<Node> getNodePath(final NodeId nodeId) {
         return topology.child(Node.class, new NodeKey(nodeId));
     }
index 1cf648eb975c521d6c81e22b927e8f276f888e99..f09da0045930cf7cc843de1a924e64841f2db508 100644 (file)
@@ -11,14 +11,17 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,9 +53,9 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh
             for (; ; ) {
                 TopologyOperation op = queue.take();
 
-                LOG.debug("New operations available, starting transaction");
-                final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
+                LOG.debug("New {} operation available, starting transaction", op);
 
+                final ReadWriteTransaction tx = transactionChain.newReadWriteTransaction();
 
                 int ops = 0;
                 do {
@@ -64,14 +67,16 @@ final class OperationProcessor implements AutoCloseable, Runnable, TransactionCh
                     } else {
                         op = null;
                     }
+
+                    LOG.debug("Next operation {}", op);
                 } while (op != null);
 
                 LOG.debug("Processed {} operations, submitting transaction", ops);
 
-                final CheckedFuture txResultFuture = tx.submit();
-                Futures.addCallback(txResultFuture, new FutureCallback() {
+                CheckedFuture<Void, TransactionCommitFailedException> txResultFuture = tx.submit();
+                Futures.addCallback(txResultFuture, new FutureCallback<Void>() {
                     @Override
-                    public void onSuccess(Object o) {
+                    public void onSuccess(Void notUsed) {
                         LOG.debug("Topology export successful for tx :{}", tx.getIdentifier());
                     }
 
diff --git a/opendaylight/md-sal/topology-manager/src/test/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporterTest.java b/opendaylight/md-sal/topology-manager/src/test/java/org/opendaylight/md/controller/topology/manager/FlowCapableTopologyExporterTest.java
new file mode 100644 (file)
index 0000000..b7a56a4
--- /dev/null
@@ -0,0 +1,666 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.md.controller.topology.manager;
+
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnectorUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdated;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkDiscoveredBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.topology.discovery.rev130819.LinkRemovedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.PortConfig;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.port.rev130925.flow.capable.port.StateBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRemovedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemovedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdatedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.topology.inventory.rev131030.InventoryNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.model.topology.inventory.rev131030.InventoryNodeConnector;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.LinkId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TpId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.link.attributes.Destination;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.link.attributes.DestinationBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.link.attributes.Source;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.link.attributes.SourceBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Link;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.LinkBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.LinkKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPoint;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.node.TerminationPointKey;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+public class FlowCapableTopologyExporterTest {
+
+    @Mock
+    private DataBroker mockDataBroker;
+
+    @Mock
+    private BindingTransactionChain mockTxChain;
+
+    private OperationProcessor processor;
+
+    private FlowCapableTopologyExporter exporter;
+
+    private InstanceIdentifier<Topology> topologyIID;
+
+    private final ExecutorService executor = Executors.newFixedThreadPool(1);
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
+        doReturn(mockTxChain).when(mockDataBroker)
+                .createTransactionChain(any(TransactionChainListener.class));
+
+        processor = new OperationProcessor(mockDataBroker);
+
+        topologyIID = InstanceIdentifier.create(NetworkTopology.class)
+                .child(Topology.class, new TopologyKey(new TopologyId("test")));
+        exporter = new FlowCapableTopologyExporter(processor, topologyIID);
+
+        executor.execute(processor);
+    }
+
+    @After
+    public void tearDown() {
+        executor.shutdownNow();
+    }
+
+    @SuppressWarnings({ "rawtypes" })
+    @Test
+    public void testOnNodeRemoved() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                                nodeKey = newInvNodeKey("node1");
+        InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
+                org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
+                nodeKey);
+
+        List<Link> linkList = Arrays.asList(
+                newLink("link1", newSourceNode("node1"), newDestNode("dest")),
+                newLink("link2", newSourceNode("source"), newDestNode("node1")),
+                newLink("link2", newSourceNode("source2"), newDestNode("dest2")));
+        final Topology topology = new TopologyBuilder().setLink(linkList).build();
+
+        InstanceIdentifier[] expDeletedIIDs = {
+                topologyIID.child(Link.class, linkList.get(0).getKey()),
+                topologyIID.child(Link.class, linkList.get(1).getKey()),
+                topologyIID.child(Node.class, new NodeKey(new NodeId("node1")))
+            };
+
+        SettableFuture<Optional<Topology>> readFuture = SettableFuture.create();
+        ReadWriteTransaction mockTx1 = mock(ReadWriteTransaction.class);
+        doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1)
+                .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+
+        CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1);
+
+        int expDeleteCalls = expDeletedIIDs.length;
+        CountDownLatch deleteLatch = new CountDownLatch(expDeleteCalls);
+        ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+                ArgumentCaptor.forClass(InstanceIdentifier.class);
+        setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch);
+
+        ReadWriteTransaction mockTx2 = mock(ReadWriteTransaction.class);
+        setupStubbedDeletes(mockTx2, deletedLinkIDs, deleteLatch);
+        CountDownLatch submitLatch2 = setupStubbedSubmit(mockTx2);
+
+        doReturn(mockTx1).doReturn(mockTx2).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onNodeRemoved(new NodeRemovedBuilder().setNodeRef(new NodeRef(invNodeID)).build());
+
+        waitForSubmit(submitLatch1);
+
+        setReadFutureAsync(topology, readFuture);
+
+        waitForDeletes(expDeleteCalls, deleteLatch);
+
+        waitForSubmit(submitLatch2);
+
+        assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
+
+        verifyMockTx(mockTx1);
+        verifyMockTx(mockTx2);
+    }
+
+    @SuppressWarnings({ "rawtypes" })
+    @Test
+    public void testOnNodeRemovedWithNoTopology() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                                nodeKey = newInvNodeKey("node1");
+        InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
+                org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
+                nodeKey);
+
+        InstanceIdentifier[] expDeletedIIDs = {
+                topologyIID.child(Node.class, new NodeKey(new NodeId("node1")))
+            };
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        doReturn(Futures.immediateCheckedFuture(Optional.absent())).when(mockTx)
+                .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+        CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+
+        CountDownLatch deleteLatch = new CountDownLatch(1);
+        ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+                ArgumentCaptor.forClass(InstanceIdentifier.class);
+        setupStubbedDeletes(mockTx, deletedLinkIDs, deleteLatch);
+
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onNodeRemoved(new NodeRemovedBuilder().setNodeRef(new NodeRef(invNodeID)).build());
+
+        waitForSubmit(submitLatch);
+
+        waitForDeletes(1, deleteLatch);
+
+        assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testOnNodeConnectorRemoved() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                                  nodeKey = newInvNodeKey("node1");
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
+                newInvNodeConnKey("tp1");
+
+        InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
+
+        List<Link> linkList = Arrays.asList(
+                newLink("link1", newSourceTp("tp1"), newDestTp("dest")),
+                newLink("link2", newSourceTp("source"), newDestTp("tp1")),
+                newLink("link3", newSourceTp("source2"), newDestTp("dest2")));
+        final Topology topology = new TopologyBuilder().setLink(linkList).build();
+
+        InstanceIdentifier[] expDeletedIIDs = {
+                topologyIID.child(Link.class, linkList.get(0).getKey()),
+                topologyIID.child(Link.class, linkList.get(1).getKey()),
+                topologyIID.child(Node.class, new NodeKey(new NodeId("node1")))
+                        .child(TerminationPoint.class, new TerminationPointKey(new TpId("tp1")))
+            };
+
+        final SettableFuture<Optional<Topology>> readFuture = SettableFuture.create();
+        ReadWriteTransaction mockTx1 = mock(ReadWriteTransaction.class);
+        doReturn(Futures.makeChecked(readFuture, ReadFailedException.MAPPER)).when(mockTx1)
+                .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+
+        CountDownLatch submitLatch1 = setupStubbedSubmit(mockTx1);
+
+        int expDeleteCalls = expDeletedIIDs.length;
+        CountDownLatch deleteLatch = new CountDownLatch(expDeleteCalls);
+        ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+                ArgumentCaptor.forClass(InstanceIdentifier.class);
+        setupStubbedDeletes(mockTx1, deletedLinkIDs, deleteLatch);
+
+        ReadWriteTransaction mockTx2 = mock(ReadWriteTransaction.class);
+        setupStubbedDeletes(mockTx2, deletedLinkIDs, deleteLatch);
+        CountDownLatch submitLatch2 = setupStubbedSubmit(mockTx2);
+
+        doReturn(mockTx1).doReturn(mockTx2).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onNodeConnectorRemoved(new NodeConnectorRemovedBuilder().setNodeConnectorRef(
+                new NodeConnectorRef(invNodeConnID)).build());
+
+        waitForSubmit(submitLatch1);
+
+        setReadFutureAsync(topology, readFuture);
+
+        waitForDeletes(expDeleteCalls, deleteLatch);
+
+        waitForSubmit(submitLatch2);
+
+        assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
+
+        verifyMockTx(mockTx1);
+        verifyMockTx(mockTx2);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testOnNodeConnectorRemovedWithNoTopology() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                                  nodeKey = newInvNodeKey("node1");
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
+                newInvNodeConnKey("tp1");
+
+        InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
+
+        InstanceIdentifier[] expDeletedIIDs = {
+                topologyIID.child(Node.class, new NodeKey(new NodeId("node1")))
+                        .child(TerminationPoint.class, new TerminationPointKey(new TpId("tp1")))
+            };
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        doReturn(Futures.immediateCheckedFuture(Optional.absent())).when(mockTx)
+                .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+        CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+
+        CountDownLatch deleteLatch = new CountDownLatch(1);
+        ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+                ArgumentCaptor.forClass(InstanceIdentifier.class);
+        setupStubbedDeletes(mockTx, deletedLinkIDs, deleteLatch);
+
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onNodeConnectorRemoved(new NodeConnectorRemovedBuilder().setNodeConnectorRef(
+                new NodeConnectorRef(invNodeConnID)).build());
+
+        waitForSubmit(submitLatch);
+
+        waitForDeletes(1, deleteLatch);
+
+        assertDeletedIDs(expDeletedIIDs, deletedLinkIDs);
+    }
+
+    @Test
+    public void testOnNodeUpdated() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                            nodeKey = newInvNodeKey("node1");
+        InstanceIdentifier<?> invNodeID = InstanceIdentifier.create(Nodes.class).child(
+                org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
+                nodeKey);
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onNodeUpdated(new NodeUpdatedBuilder().setNodeRef(new NodeRef(invNodeID))
+                .setId(nodeKey.getId()).addAugmentation(FlowCapableNodeUpdated.class,
+                        new FlowCapableNodeUpdatedBuilder().build()).build());
+
+        waitForSubmit(submitLatch);
+
+        ArgumentCaptor<Node> mergedNode = ArgumentCaptor.forClass(Node.class);
+        NodeId expNodeId = new NodeId("node1");
+        verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(topologyIID.child(Node.class,
+                new NodeKey(expNodeId))), mergedNode.capture(), eq(true));
+        assertEquals("getNodeId", expNodeId, mergedNode.getValue().getNodeId());
+        InventoryNode augmentation = mergedNode.getValue().getAugmentation(InventoryNode.class);
+        assertNotNull("Missing augmentation", augmentation);
+        assertEquals("getInventoryNodeRef", new NodeRef(invNodeID), augmentation.getInventoryNodeRef());
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testOnNodeConnectorUpdated() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                                 nodeKey = newInvNodeKey("node1");
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
+                newInvNodeConnKey("tp1");
+
+        InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onNodeConnectorUpdated(new NodeConnectorUpdatedBuilder().setNodeConnectorRef(
+                new NodeConnectorRef(invNodeConnID)).setId(ncKey.getId()).addAugmentation(
+                        FlowCapableNodeConnectorUpdated.class,
+                        new FlowCapableNodeConnectorUpdatedBuilder().build()).build());
+
+        waitForSubmit(submitLatch);
+
+        ArgumentCaptor<TerminationPoint> mergedNode = ArgumentCaptor.forClass(TerminationPoint.class);
+        NodeId expNodeId = new NodeId("node1");
+        TpId expTpId = new TpId("tp1");
+        InstanceIdentifier<TerminationPoint> expTpPath = topologyIID.child(
+                Node.class, new NodeKey(expNodeId)).child(TerminationPoint.class,
+                        new TerminationPointKey(expTpId));
+        verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(expTpPath),
+                mergedNode.capture(), eq(true));
+        assertEquals("getTpId", expTpId, mergedNode.getValue().getTpId());
+        InventoryNodeConnector augmentation = mergedNode.getValue().getAugmentation(
+                InventoryNodeConnector.class);
+        assertNotNull("Missing augmentation", augmentation);
+        assertEquals("getInventoryNodeConnectorRef", new NodeConnectorRef(invNodeConnID),
+                augmentation.getInventoryNodeConnectorRef());
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testOnNodeConnectorUpdatedWithLinkStateDown() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                                 nodeKey = newInvNodeKey("node1");
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
+                newInvNodeConnKey("tp1");
+
+        InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
+
+        List<Link> linkList = Arrays.asList(newLink("link1", newSourceTp("tp1"), newDestTp("dest")));
+        Topology topology = new TopologyBuilder().setLink(linkList).build();
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        doReturn(Futures.immediateCheckedFuture(Optional.of(topology))).when(mockTx)
+                .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+        setupStubbedSubmit(mockTx);
+
+        CountDownLatch deleteLatch = new CountDownLatch(1);
+        ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+                ArgumentCaptor.forClass(InstanceIdentifier.class);
+        setupStubbedDeletes(mockTx, deletedLinkIDs, deleteLatch);
+
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onNodeConnectorUpdated(new NodeConnectorUpdatedBuilder().setNodeConnectorRef(
+                new NodeConnectorRef(invNodeConnID)).setId(ncKey.getId()).addAugmentation(
+                        FlowCapableNodeConnectorUpdated.class,
+                        new FlowCapableNodeConnectorUpdatedBuilder().setState(
+                                new StateBuilder().setLinkDown(true).build()).build()).build());
+
+        waitForDeletes(1, deleteLatch);
+
+        InstanceIdentifier<TerminationPoint> expTpPath = topologyIID.child(
+                Node.class, new NodeKey(new NodeId("node1"))).child(TerminationPoint.class,
+                        new TerminationPointKey(new TpId("tp1")));
+
+        verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(expTpPath),
+                any(TerminationPoint.class), eq(true));
+
+        assertDeletedIDs(new InstanceIdentifier[]{topologyIID.child(Link.class,
+                linkList.get(0).getKey())}, deletedLinkIDs);
+    }
+
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testOnNodeConnectorUpdatedWithPortDown() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                                 nodeKey = newInvNodeKey("node1");
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey =
+                newInvNodeConnKey("tp1");
+
+        InstanceIdentifier<?> invNodeConnID = newNodeConnID(nodeKey, ncKey);
+
+        List<Link> linkList = Arrays.asList(newLink("link1", newSourceTp("tp1"), newDestTp("dest")));
+        Topology topology = new TopologyBuilder().setLink(linkList).build();
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        doReturn(Futures.immediateCheckedFuture(Optional.of(topology))).when(mockTx)
+                .read(LogicalDatastoreType.OPERATIONAL, topologyIID);
+        setupStubbedSubmit(mockTx);
+
+        CountDownLatch deleteLatch = new CountDownLatch(1);
+        ArgumentCaptor<InstanceIdentifier> deletedLinkIDs =
+                ArgumentCaptor.forClass(InstanceIdentifier.class);
+        setupStubbedDeletes(mockTx, deletedLinkIDs, deleteLatch);
+
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onNodeConnectorUpdated(new NodeConnectorUpdatedBuilder().setNodeConnectorRef(
+                new NodeConnectorRef(invNodeConnID)).setId(ncKey.getId()).addAugmentation(
+                        FlowCapableNodeConnectorUpdated.class,
+                        new FlowCapableNodeConnectorUpdatedBuilder().setConfiguration(
+                                new PortConfig(true, true, true, true)).build()).build());
+
+        waitForDeletes(1, deleteLatch);
+
+        InstanceIdentifier<TerminationPoint> expTpPath = topologyIID.child(
+                Node.class, new NodeKey(new NodeId("node1"))).child(TerminationPoint.class,
+                        new TerminationPointKey(new TpId("tp1")));
+
+        verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(expTpPath),
+                any(TerminationPoint.class), eq(true));
+
+        assertDeletedIDs(new InstanceIdentifier[]{topologyIID.child(Link.class,
+                linkList.get(0).getKey())}, deletedLinkIDs);
+    }
+
+    @Test
+    public void testOnLinkDiscovered() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                sourceNodeKey = newInvNodeKey("sourceNode");
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+                sourceNodeConnKey = newInvNodeConnKey("sourceTP");
+        InstanceIdentifier<?> sourceConnID = newNodeConnID(sourceNodeKey, sourceNodeConnKey);
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                destNodeKey = newInvNodeKey("destNode");
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+                destNodeConnKey = newInvNodeConnKey("destTP");
+        InstanceIdentifier<?> destConnID = newNodeConnID(destNodeKey, destNodeConnKey);
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onLinkDiscovered(new LinkDiscoveredBuilder().setSource(
+                new NodeConnectorRef(sourceConnID)).setDestination(
+                        new NodeConnectorRef(destConnID)).build());
+
+        waitForSubmit(submitLatch);
+
+        ArgumentCaptor<Link> mergedNode = ArgumentCaptor.forClass(Link.class);
+        verify(mockTx).merge(eq(LogicalDatastoreType.OPERATIONAL), eq(topologyIID.child(
+                Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId())))),
+                mergedNode.capture(), eq(true));
+        assertEquals("Source node ID", "sourceNode",
+                mergedNode.getValue().getSource().getSourceNode().getValue());
+        assertEquals("Dest TP ID", "sourceTP",
+                mergedNode.getValue().getSource().getSourceTp().getValue());
+        assertEquals("Dest node ID", "destNode",
+                mergedNode.getValue().getDestination().getDestNode().getValue());
+        assertEquals("Dest TP ID", "destTP",
+                mergedNode.getValue().getDestination().getDestTp().getValue());
+    }
+
+    @Test
+    public void testOnLinkRemoved() {
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                sourceNodeKey = newInvNodeKey("sourceNode");
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+                sourceNodeConnKey = newInvNodeConnKey("sourceTP");
+        InstanceIdentifier<?> sourceConnID = newNodeConnID(sourceNodeKey, sourceNodeConnKey);
+
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                destNodeKey = newInvNodeKey("destNode");
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey
+                destNodeConnKey = newInvNodeConnKey("destTP");
+        InstanceIdentifier<?> destConnID = newNodeConnID(destNodeKey, destNodeConnKey);
+
+        ReadWriteTransaction mockTx = mock(ReadWriteTransaction.class);
+        CountDownLatch submitLatch = setupStubbedSubmit(mockTx);
+        doReturn(mockTx).when(mockTxChain).newReadWriteTransaction();
+
+        exporter.onLinkRemoved(new LinkRemovedBuilder().setSource(
+                new NodeConnectorRef(sourceConnID)).setDestination(
+                        new NodeConnectorRef(destConnID)).build());
+
+        waitForSubmit(submitLatch);
+
+        verify(mockTx).delete(LogicalDatastoreType.OPERATIONAL, topologyIID.child(
+                Link.class, new LinkKey(new LinkId(sourceNodeConnKey.getId()))));
+    }
+
+    private void verifyMockTx(ReadWriteTransaction mockTx) {
+        InOrder inOrder = inOrder(mockTx);
+        inOrder.verify(mockTx, atLeast(0)).submit();
+        inOrder.verify(mockTx, never()).delete(eq(LogicalDatastoreType.OPERATIONAL),
+              any(InstanceIdentifier.class));
+    }
+
+    @SuppressWarnings("rawtypes")
+    private void assertDeletedIDs(InstanceIdentifier[] expDeletedIIDs,
+            ArgumentCaptor<InstanceIdentifier> deletedLinkIDs) {
+        Set<InstanceIdentifier> actualIIDs = new HashSet<>(deletedLinkIDs.getAllValues());
+        for(InstanceIdentifier id: expDeletedIIDs) {
+            assertTrue("Missing expected deleted IID " + id, actualIIDs.contains(id));
+        }
+    }
+
+    private void setReadFutureAsync(final Topology topology,
+            final SettableFuture<Optional<Topology>> readFuture) {
+        new Thread() {
+            @Override
+            public void run() {
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+                readFuture.set(Optional.of(topology));
+            }
+
+        }.start();
+    }
+
+    private void waitForSubmit(CountDownLatch latch) {
+        assertEquals("Transaction submitted", true,
+                Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
+    }
+
+    private void waitForDeletes(int expDeleteCalls, final CountDownLatch latch) {
+        boolean done = Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS);
+        if(!done) {
+            fail("Expected " + expDeleteCalls + " delete calls. Actual: " +
+                    (expDeleteCalls - latch.getCount()));
+        }
+    }
+
+    private CountDownLatch setupStubbedSubmit(ReadWriteTransaction mockTx) {
+        final CountDownLatch latch = new CountDownLatch(1);
+        doAnswer(new Answer<CheckedFuture<Void, TransactionCommitFailedException>>() {
+            @Override
+            public CheckedFuture<Void, TransactionCommitFailedException> answer(
+                                                            InvocationOnMock invocation) {
+                latch.countDown();
+                return Futures.immediateCheckedFuture(null);
+            }
+        }).when(mockTx).submit();
+
+        return latch;
+    }
+
+    @SuppressWarnings("rawtypes")
+    private void setupStubbedDeletes(ReadWriteTransaction mockTx,
+            ArgumentCaptor<InstanceIdentifier> deletedLinkIDs, final CountDownLatch latch) {
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) {
+                latch.countDown();
+                return null;
+            }
+        }).when(mockTx).delete(eq(LogicalDatastoreType.OPERATIONAL), deletedLinkIDs.capture());
+    }
+
+    private org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey
+                                                                        newInvNodeKey(String id) {
+        org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey nodeKey =
+                new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey(
+                        new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.
+                                                                      rev130819.NodeId(id));
+        return nodeKey;
+    }
+
+    private NodeConnectorKey newInvNodeConnKey(String id) {
+        return new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey(
+                new org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.
+                                                               NodeConnectorId(id));
+    }
+
+    private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> newNodeConnID(
+            org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey nodeKey,
+            org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey ncKey) {
+        return InstanceIdentifier.create(Nodes.class).child(
+                org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class,
+                nodeKey).child(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.
+                        rev130819.node.NodeConnector.class, ncKey);
+    }
+
+    private Link newLink(String id, Source source, Destination dest) {
+        return new LinkBuilder().setLinkId(new LinkId(id))
+                .setSource(source).setDestination(dest).build();
+    }
+
+    private Destination newDestTp(String id) {
+        return new DestinationBuilder().setDestTp(new TpId(id)).build();
+    }
+
+    private Source newSourceTp(String id) {
+        return new SourceBuilder().setSourceTp(new TpId(id)).build();
+    }
+
+    private Destination newDestNode(String id) {
+        return new DestinationBuilder().setDestNode(new NodeId(id)).build();
+    }
+
+    private Source newSourceNode(String id) {
+        return new SourceBuilder().setSourceNode(new NodeId(id)).build();
+    }
+}