NetconfTopologyManager and UT mods/improvements 21/71921/2
authorTom Pantelis <tompantelis@gmail.com>
Fri, 11 May 2018 01:57:44 +0000 (21:57 -0400)
committerTom Pantelis <tompantelis@gmail.com>
Fri, 11 May 2018 02:25:38 +0000 (22:25 -0400)
NetconfTopologyManagerTest:

 - separated the test cases for successful and failed CSS registration
   to make it easier to understand/follow.

 - added additional scenarios/coverage/verification for DTCL
   create/update/replace/delete notifications.

 - removed the manipulation and direct verification of the
   NetconfTopologyManager internal maps via reflection. These
   are internal implementation details that the UTs shouldn't
   have knowledge of. The maps are indirectly verified via
   verification of the stored mocks.

 - used the real DataBroker to verify the initilization of the
   Topology nodes. This is easier than mocking a write trannsaction
   correctly.

NetconfTopologyManager:

 - changed the Maps to ConcurrentHashMap as they are accessed by
   multiple threads and could even be accessed concurrently.

 - added a hook for the UTs to create mock NetconfTopologyContexts

 - if CSS registration fails, it should just close the NetconfTopologyContext
   and not itself.

 - other minor changes.

NetconfTopologyContext:

 - implement AutoCloseable and rename closeFinal to the standard close.

 - Renamed the AtomicBooleans for clarity.

Change-Id: Id095a4ff9395d68912a92b0b59d725e062ddd9fc
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
netconf/netconf-topology-singleton/pom.xml
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyContext.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManager.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfTopologyManagerTest.java

index 2ed1044b38d02b773712ad472653f3558e355dcd..9f4fa8f7edd8fa3403dcd44121887d4fe2fcfc5c 100644 (file)
             <groupId>org.opendaylight.mdsal.model</groupId>
             <artifactId>ietf-topology</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-binding-broker-impl</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.controller</groupId>
+            <artifactId>sal-binding-broker-impl</artifactId>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.mockito</groupId>
index 11cd117b1ec3501015fe2689704d3762c1bda52a..d8df4c36f5f97a4672e9563cce20a011c2710872 100644 (file)
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
-class NetconfTopologyContext implements ClusterSingletonService {
+class NetconfTopologyContext implements ClusterSingletonService, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyContext.class);
 
@@ -47,8 +47,8 @@ class NetconfTopologyContext implements ClusterSingletonService {
     private RemoteDeviceConnector remoteDeviceConnector;
     private NetconfNodeManager netconfNodeManager;
     private ActorRef masterActorRef;
-    private final AtomicBoolean finalClose = new AtomicBoolean(false);
     private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicBoolean stopped = new AtomicBoolean(false);
     private volatile boolean isMaster;
 
     NetconfTopologyContext(final NetconfTopologySetup netconfTopologyDeviceSetup,
@@ -80,7 +80,7 @@ class NetconfTopologyContext implements ClusterSingletonService {
             netconfNodeManager = null;
         }
 
-        if (!finalClose.get()) {
+        if (!closed.get()) {
             final String masterAddress =
                     Cluster.get(netconfTopologyDeviceSetup.getActorSystem()).selfAddress().toString();
             masterActorRef = netconfTopologyDeviceSetup.getActorSystem().actorOf(NetconfNodeActor.props(
@@ -97,13 +97,13 @@ class NetconfTopologyContext implements ClusterSingletonService {
     @Override
     public ListenableFuture<Void> closeServiceInstance() {
 
-        if (!finalClose.get()) {
+        if (!closed.get()) {
             // in case that master changes role to slave, new NodeDeviceManager must be created and listener registered
             netconfNodeManager = createNodeDeviceManager();
         }
         stopDeviceConnectorAndActor();
 
-        return Futures.immediateCheckedFuture(null);
+        return Futures.immediateFuture(null);
     }
 
     @Override
@@ -120,8 +120,9 @@ class NetconfTopologyContext implements ClusterSingletonService {
         return ndm;
     }
 
-    void closeFinal() throws Exception {
-        if (!finalClose.compareAndSet(false, true)) {
+    @Override
+    public void close() throws Exception {
+        if (!closed.compareAndSet(false, true)) {
             return;
         }
 
@@ -168,7 +169,7 @@ class NetconfTopologyContext implements ClusterSingletonService {
     }
 
     private void stopDeviceConnectorAndActor() {
-        if (!closed.compareAndSet(false, true)) {
+        if (!stopped.compareAndSet(false, true)) {
             return;
         }
         if (remoteDeviceConnector != null) {
index 82e340d9171649d57752af5c733b570764542f13..4096153d1237d3e7bb8f32c349763d8f46bf187a 100644 (file)
@@ -10,14 +10,15 @@ package org.opendaylight.netconf.topology.singleton.impl;
 
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.util.concurrent.EventExecutor;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
@@ -62,9 +63,9 @@ public class NetconfTopologyManager
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
 
-    private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new HashMap<>();
+    private final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts = new ConcurrentHashMap<>();
     private final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>
-            clusterRegistrations = new HashMap<>();
+            clusterRegistrations = new ConcurrentHashMap<>();
 
     private final DataBroker dataBroker;
     private final RpcProviderRegistry rpcProviderRegistry;
@@ -163,9 +164,8 @@ public class NetconfTopologyManager
         final ServiceGroupIdentifier serviceGroupIdent =
                 ServiceGroupIdentifier.create(instanceIdentifier.toString());
 
-        final NetconfTopologyContext newNetconfTopologyContext =
-                new NetconfTopologyContext(createSetup(instanceIdentifier, node), serviceGroupIdent,
-                        actorResponseWaitTime, mountPointService);
+        final NetconfTopologyContext newNetconfTopologyContext = newNetconfTopologyContext(
+                createSetup(instanceIdentifier, node), serviceGroupIdent, actorResponseWaitTime);
 
         int tries = 3;
         while (true) {
@@ -181,53 +181,52 @@ public class NetconfTopologyManager
                 if (--tries <= 0) {
                     LOG.error("Unable to register cluster singleton service {} - done trying, closing topology context",
                             newNetconfTopologyContext, e);
-                    close();
+                    close(newNetconfTopologyContext);
                     break;
                 }
             }
         }
-
     }
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
     private void stopNetconfDeviceContext(final InstanceIdentifier<Node> instanceIdentifier) {
-        if (contexts.containsKey(instanceIdentifier)) {
-            try {
-                clusterRegistrations.get(instanceIdentifier).close();
-                contexts.get(instanceIdentifier).closeFinal();
-            } catch (final Exception e) {
-                LOG.warn("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier);
-            }
-            contexts.remove(instanceIdentifier);
-            clusterRegistrations.remove(instanceIdentifier);
+        final NetconfTopologyContext netconfTopologyContext = contexts.remove(instanceIdentifier);
+        if (netconfTopologyContext != null) {
+            close(clusterRegistrations.remove(instanceIdentifier));
+            close(netconfTopologyContext);
         }
     }
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
+    @VisibleForTesting
+    protected NetconfTopologyContext newNetconfTopologyContext(NetconfTopologySetup setup,
+            ServiceGroupIdentifier serviceGroupIdent, Timeout actorResponseWaitTime) {
+        return new NetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime, mountPointService);
+    }
+
     @Override
     public void close() {
         if (dataChangeListenerRegistration != null) {
             dataChangeListenerRegistration.close();
             dataChangeListenerRegistration = null;
         }
-        contexts.forEach((instanceIdentifier, netconfTopologyContext) -> {
-            try {
-                netconfTopologyContext.closeFinal();
-            } catch (final Exception e) {
-                LOG.error("Error at closing topology context. InstanceIdentifier: " + instanceIdentifier, e);
-            }
-        });
-        clusterRegistrations.forEach((instanceIdentifier, clusterSingletonServiceRegistration) -> {
-            try {
-                clusterSingletonServiceRegistration.close();
-            } catch (final Exception e) {
-                LOG.error("Error at unregistering from cluster. InstanceIdentifier: " + instanceIdentifier, e);
-            }
-        });
+
+        contexts.values().forEach(netconfTopologyContext -> close(netconfTopologyContext));
+
+        clusterRegistrations.values().forEach(
+            clusterSingletonServiceRegistration -> close(clusterSingletonServiceRegistration));
+
         contexts.clear();
         clusterRegistrations.clear();
     }
 
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void close(AutoCloseable closeable) {
+        try {
+            closeable.close();
+        } catch (Exception e) {
+            LOG.warn("Error closing {}", closeable, e);
+        }
+    }
+
     /**
      * Sets the private key path from location specified in configuration file using blueprint.
      */
index b3d4be1cde2c7f9b1931acdf42ef80f541944cde..01a7749a123e69677f22afea57f370e68dea70a6 100644 (file)
@@ -8,29 +8,37 @@
 
 package org.opendaylight.netconf.topology.singleton.impl;
 
-import static junit.framework.TestCase.assertFalse;
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.MockitoAnnotations.initMocks;
 import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.DELETE;
+import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.SUBTREE_MODIFIED;
 import static org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType.WRITE;
 
-import com.google.common.util.concurrent.Futures;
+import akka.util.Timeout;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
 import io.netty.util.concurrent.EventExecutor;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import javax.annotation.Nonnull;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
 import org.opendaylight.controller.cluster.ActorSystemProvider;
@@ -40,13 +48,16 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.test.ConstantSchemaAbstractDataBrokerTest;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
 import org.opendaylight.netconf.client.NetconfClientDispatcher;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
@@ -56,28 +67,49 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev15
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.Config;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.topology.singleton.config.rev170419.ConfigBuilder;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
-import org.opendaylight.yangtools.yang.binding.Identifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.YangModuleInfo;
+import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
 
 public class NetconfTopologyManagerTest {
+    private static final int ACTOR_RESPONSE_WAIT_TIME = 10;
+    private static final String TOPOLOGY_ID = "topologyID";
 
-    private final String topologyId = "topologyID";
     private NetconfTopologyManager netconfTopologyManager;
 
     @Mock
-    private DataBroker dataBroker;
+    private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
 
     @Mock
-    private ClusterSingletonServiceProvider clusterSingletonServiceProvider;
+    private ListenerRegistration<?> mockListenerReg;
+
+    private DataBroker dataBroker;
+
+    private final Map<InstanceIdentifier<Node>, Function<NetconfTopologySetup, NetconfTopologyContext>>
+            mockContextMap = new HashMap<>();
 
     @Before
-    public void setUp() {
+    public void setUp() throws Exception {
         initMocks(this);
 
+        ConstantSchemaAbstractDataBrokerTest dataBrokerTest = new ConstantSchemaAbstractDataBrokerTest(false) {
+            @Override
+            protected Iterable<YangModuleInfo> getModuleInfos() throws Exception {
+                return ImmutableSet.of(BindingReflections.getModuleInfo(NetworkTopology.class),
+                        BindingReflections.getModuleInfo(Topology.class));
+            }
+        };
+
+        dataBrokerTest.setup();
+        dataBroker = spy(dataBrokerTest.getDataBroker());
+
         final RpcProviderRegistry rpcProviderRegistry = mock(RpcProviderRegistry.class);
         final ScheduledThreadPool keepaliveExecutor = mock(ScheduledThreadPool.class);
         final ThreadPool processingExecutor = mock(ThreadPool.class);
@@ -90,188 +122,251 @@ public class NetconfTopologyManagerTest {
         final Config config = new ConfigBuilder().setWriteTransactionIdleTimeout(0).build();
         netconfTopologyManager = new NetconfTopologyManager(dataBroker, rpcProviderRegistry,
                 clusterSingletonServiceProvider, keepaliveExecutor, processingExecutor,
-                actorSystemProvider, eventExecutor, clientDispatcher, topologyId, config,
-                mountPointService, encryptionService);
+                actorSystemProvider, eventExecutor, clientDispatcher, TOPOLOGY_ID, config,
+                mountPointService, encryptionService) {
+            @Override
+            protected NetconfTopologyContext newNetconfTopologyContext(NetconfTopologySetup setup,
+                    ServiceGroupIdentifier serviceGroupIdent, Timeout actorResponseWaitTime) {
+                assertEquals(ACTOR_RESPONSE_WAIT_TIME, actorResponseWaitTime.duration().toSeconds());
+                return Objects.requireNonNull(mockContextMap.get(setup.getInstanceIdentifier()),
+                        "No mock context for " + setup.getInstanceIdentifier()).apply(setup);
+            }
+        };
+
+        doNothing().when(mockListenerReg).close();
+        doReturn(mockListenerReg).when(dataBroker).registerDataTreeChangeListener(any(), any());
     }
 
     @Test
-    public void testWriteConfiguration() throws Exception {
-        writeConfiguration(false);
-    }
+    public void testRegisterDataTreeChangeListener() throws Exception {
 
-    @Test
-    public void testWriteConfigurationFail() throws Exception {
-        writeConfiguration(true);
+        netconfTopologyManager.init();
+
+        await().atMost(5, TimeUnit.SECONDS).until(() -> {
+            ReadOnlyTransaction readTx = dataBroker.newReadOnlyTransaction();
+            Optional<Topology> config = readTx.read(LogicalDatastoreType.CONFIGURATION,
+                    NetconfTopologyUtils.createTopologyListPath(TOPOLOGY_ID)).get(3, TimeUnit.SECONDS);
+            Optional<Topology> oper = readTx.read(LogicalDatastoreType.OPERATIONAL,
+                    NetconfTopologyUtils.createTopologyListPath(TOPOLOGY_ID)).get(3, TimeUnit.SECONDS);
+            return config.isPresent() && oper.isPresent();
+        });
+
+        // verify registration is called with right parameters
+
+        verify(dataBroker).registerDataTreeChangeListener(
+                new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, NetconfTopologyUtils
+                        .createTopologyListPath(TOPOLOGY_ID).child(Node.class)), netconfTopologyManager);
+
+        netconfTopologyManager.close();
+        verify(mockListenerReg).close();
+
+        netconfTopologyManager.close();
+        verifyNoMoreInteractions(mockListenerReg);
     }
 
+    @SuppressWarnings("unchecked")
     @Test
-    public void testRegisterDataTreeChangeListener() {
+    public void testOnDataTreeChanged() throws Exception {
 
-        final WriteTransaction wtx = mock(WriteTransaction.class);
+        // Notify of 2 created Node objects.
 
-        doReturn(wtx).when(dataBroker).newWriteOnlyTransaction();
-        doNothing().when(wtx).merge(any(), any(), any());
-        doReturn(Futures.immediateCheckedFuture(null)).when(wtx).submit();
+        final NodeId nodeId1 = new NodeId("node-id-1");
+        final InstanceIdentifier<Node> nodeInstanceId1 = NetconfTopologyUtils.createTopologyNodeListPath(
+                new NodeKey(nodeId1), TOPOLOGY_ID);
 
-        netconfTopologyManager.init();
+        final NodeId nodeId2 = new NodeId("node-id-2");
+        final InstanceIdentifier<Node> nodeInstanceId2 = NetconfTopologyUtils.createTopologyNodeListPath(
+                new NodeKey(nodeId2), TOPOLOGY_ID);
 
-        // verify if listener is called with right parameters = registered on right path
+        final NetconfNode netconfNode1 = new NetconfNodeBuilder()
+                .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+                .setPort(new PortNumber(1111))
+                .setActorResponseWaitTime(ACTOR_RESPONSE_WAIT_TIME)
+                .build();
+        final Node node1 = new NodeBuilder().setNodeId(nodeId1).addAugmentation(NetconfNode.class,
+                netconfNode1).build();
 
-        verify(dataBroker, times(1)).registerDataTreeChangeListener(
-                new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, NetconfTopologyUtils
-                        .createTopologyListPath(topologyId).child(Node.class)), netconfTopologyManager);
+        final DataObjectModification<Node> dataObjectModification1 = mock(DataObjectModification.class);
+        doReturn(WRITE).when(dataObjectModification1).getModificationType();
+        doReturn(node1).when(dataObjectModification1).getDataAfter();
+        doReturn(new InstanceIdentifier.IdentifiableItem<>(Node.class, new NodeKey(nodeId1)))
+                .when(dataObjectModification1).getIdentifier();
 
-    }
+        final NetconfNode netconfNode2 = new NetconfNodeBuilder()
+                .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+                .setPort(new PortNumber(2222))
+                .setActorResponseWaitTime(ACTOR_RESPONSE_WAIT_TIME)
+                .build();
+        final Node node2 = new NodeBuilder().setNodeId(nodeId2).addAugmentation(NetconfNode.class,
+                netconfNode2).build();
 
-    @Test
-    public void testClose() throws Exception {
+        final DataObjectModification<Node> dataObjectModification2 = mock(DataObjectModification.class);
+        doReturn(WRITE).when(dataObjectModification2).getModificationType();
+        doReturn(node2).when(dataObjectModification2).getDataAfter();
+        doReturn(new InstanceIdentifier.IdentifiableItem<>(Node.class, new NodeKey(nodeId2)))
+                .when(dataObjectModification2).getIdentifier();
 
-        final Field fieldContexts = NetconfTopologyManager.class.getDeclaredField("contexts");
-        fieldContexts.setAccessible(true);
-        @SuppressWarnings("unchecked") final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts =
-                (Map<InstanceIdentifier<Node>, NetconfTopologyContext>) fieldContexts.get(netconfTopologyManager);
+        final NetconfTopologyContext mockContext1 = mock(NetconfTopologyContext.class);
+        mockContextMap.put(nodeInstanceId1, setup -> {
+            assertEquals(node1, setup.getNode());
+            assertEquals(TOPOLOGY_ID, setup.getTopologyId());
+            return mockContext1;
+        });
 
-        final Field fieldClusterRegistrations =
-                NetconfTopologyManager.class.getDeclaredField("clusterRegistrations");
-        fieldClusterRegistrations.setAccessible(true);
-        @SuppressWarnings("unchecked")
-        final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration> clusterRegistrations =
-                (Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>)
-                        fieldClusterRegistrations.get(netconfTopologyManager);
+        final NetconfTopologyContext mockContext2 = mock(NetconfTopologyContext.class);
+        mockContextMap.put(nodeInstanceId2, setup -> {
+            assertEquals(node2, setup.getNode());
+            assertEquals(TOPOLOGY_ID, setup.getTopologyId());
+            return mockContext2;
+        });
 
-        final InstanceIdentifier<Node> instanceIdentifier = NetconfTopologyUtils.createTopologyNodeListPath(
-                new NodeKey(new NodeId("node-id-1")), "topology-1");
+        ClusterSingletonServiceRegistration mockClusterRegistration1 = mock(ClusterSingletonServiceRegistration.class);
+        ClusterSingletonServiceRegistration mockClusterRegistration2 = mock(ClusterSingletonServiceRegistration.class);
 
+        doReturn(mockClusterRegistration1).when(clusterSingletonServiceProvider)
+                .registerClusterSingletonService(mockContext1);
+        doReturn(mockClusterRegistration2).when(clusterSingletonServiceProvider)
+                .registerClusterSingletonService(mockContext2);
 
-        final NetconfTopologyContext context = mock(NetconfTopologyContext.class);
-        final ClusterSingletonServiceRegistration clusterRegistration =
-                mock(ClusterSingletonServiceRegistration.class);
-        contexts.put(instanceIdentifier, context);
-        clusterRegistrations.put(instanceIdentifier, clusterRegistration);
+        netconfTopologyManager.onDataTreeChanged(Arrays.asList(
+                new CustomTreeModification(new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
+                        nodeInstanceId1), dataObjectModification1),
+                new CustomTreeModification(new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
+                        nodeInstanceId2), dataObjectModification2)));
 
-        doNothing().when(context).closeFinal();
-        doNothing().when(clusterRegistration).close();
+        verify(clusterSingletonServiceProvider).registerClusterSingletonService(mockContext1);
+        verify(clusterSingletonServiceProvider).registerClusterSingletonService(mockContext2);
 
-        netconfTopologyManager.close();
-        verify(context, times(1)).closeFinal();
-        verify(clusterRegistration, times(1)).close();
+        // Notify of Node 1 replaced and Node 2 subtree modified.
 
-        assertTrue(contexts.isEmpty());
-        assertTrue(clusterRegistrations.isEmpty());
+        mockContextMap.clear();
 
-    }
+        final NetconfNode updatedNetconfNode1 = new NetconfNodeBuilder(netconfNode1)
+                .setPort(new PortNumber(33333)).build();
+        final Node updatedNode1 = new NodeBuilder().setNodeId(nodeId1).addAugmentation(NetconfNode.class,
+                updatedNetconfNode1).build();
 
-    private void writeConfiguration(final boolean fail) throws Exception {
+        doReturn(WRITE).when(dataObjectModification1).getModificationType();
+        doReturn(node1).when(dataObjectModification1).getDataBefore();
+        doReturn(updatedNode1).when(dataObjectModification1).getDataAfter();
 
-        final ClusterSingletonServiceRegistration clusterRegistration = mock(ClusterSingletonServiceRegistration.class);
+        doReturn(SUBTREE_MODIFIED).when(dataObjectModification2).getModificationType();
+        doReturn(node2).when(dataObjectModification2).getDataBefore();
+        doReturn(node2).when(dataObjectModification2).getDataAfter();
 
-        final Field fieldContexts = NetconfTopologyManager.class.getDeclaredField("contexts");
-        fieldContexts.setAccessible(true);
-        @SuppressWarnings("unchecked") final Map<InstanceIdentifier<Node>, NetconfTopologyContext> contexts =
-                (Map<InstanceIdentifier<Node>, NetconfTopologyContext>) fieldContexts.get(netconfTopologyManager);
+        doNothing().when(mockContext1).refresh(any());
+        doNothing().when(mockContext2).refresh(any());
 
-        final Field fieldClusterRegistrations =
-                NetconfTopologyManager.class.getDeclaredField("clusterRegistrations");
-        fieldClusterRegistrations.setAccessible(true);
-        @SuppressWarnings("unchecked")
-        final Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration> clusterRegistrations =
-                (Map<InstanceIdentifier<Node>, ClusterSingletonServiceRegistration>)
-                        fieldClusterRegistrations.get(netconfTopologyManager);
+        netconfTopologyManager.onDataTreeChanged(Arrays.asList(
+                new CustomTreeModification(new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
+                        nodeInstanceId1), dataObjectModification1),
+                new CustomTreeModification(new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
+                        nodeInstanceId2), dataObjectModification2)));
 
-        final Collection<DataTreeModification<Node>> changes = new ArrayList<>();
+        ArgumentCaptor<NetconfTopologySetup> mockContext1Setup = ArgumentCaptor.forClass(NetconfTopologySetup.class);
+        verify(mockContext1).refresh(mockContext1Setup.capture());
+        assertEquals(updatedNode1, mockContext1Setup.getValue().getNode());
 
-        final InstanceIdentifier<Node> instanceIdentifier = NetconfTopologyUtils.createTopologyNodeListPath(
-                new NodeKey(new NodeId("node-id-1")), "topology-1");
+        verify(mockContext2).refresh(any());
 
-        final InstanceIdentifier<Node> instanceIdentifierDiferent = NetconfTopologyUtils.createTopologyNodeListPath(
-                new NodeKey(new NodeId("node-id-2")), "topology-2");
+        verifyNoMoreInteractions(clusterSingletonServiceProvider);
 
-        final DataTreeIdentifier<Node> rootIdentifier =
-                new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, instanceIdentifier);
+        // Notify of Node 1 deleted.
 
-        final DataTreeIdentifier<Node> rootIdentifierDifferent =
-                new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, instanceIdentifierDiferent);
+        doReturn(DELETE).when(dataObjectModification1).getModificationType();
+        doReturn(updatedNode1).when(dataObjectModification1).getDataBefore();
+        doReturn(null).when(dataObjectModification1).getDataAfter();
 
-        @SuppressWarnings("unchecked") final DataObjectModification<Node> objectModification =
-                mock(DataObjectModification.class);
+        netconfTopologyManager.onDataTreeChanged(Arrays.asList(
+                new CustomTreeModification(new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
+                        nodeInstanceId1), dataObjectModification1)));
 
-        final NetconfNode netconfNode = new NetconfNodeBuilder()
-                .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
-                .setPort(new PortNumber(9999))
-                .setReconnectOnChangedSchema(true)
-                .setDefaultRequestTimeoutMillis(1000L)
-                .setBetweenAttemptsTimeoutMillis(100)
-                .setSchemaless(false)
-                .setTcpOnly(false)
-                .setActorResponseWaitTime(10)
-                .build();
-        final Node node = new NodeBuilder().setNodeId(new NodeId("node-id"))
-                .addAugmentation(NetconfNode.class, netconfNode).build();
+        verify(mockClusterRegistration1).close();
+        verify(mockContext1).close();
+        verifyNoMoreInteractions(clusterSingletonServiceProvider, mockClusterRegistration2, mockContext2);
 
-        final Identifier key = new NodeKey(new NodeId("node-id"));
+        // Notify of Node 1 created again.
 
-        @SuppressWarnings("unchecked") final InstanceIdentifier.IdentifiableItem<Node, NodeKey> pathArgument =
-                new InstanceIdentifier.IdentifiableItem(Node.class, key);
+        reset(clusterSingletonServiceProvider);
 
+        final NetconfTopologyContext newMockContext1 = mock(NetconfTopologyContext.class);
+        final ClusterSingletonServiceRegistration newMockClusterRegistration1 =
+                mock(ClusterSingletonServiceRegistration.class);
 
-        // testing WRITE on two identical rootIdentifiers and one different
-        if (fail) {
-            changes.add(new CustomTreeModification(rootIdentifier, objectModification));
-        } else {
-            changes.add(new CustomTreeModification(rootIdentifier, objectModification));
-            changes.add(new CustomTreeModification(rootIdentifier, objectModification));
-            changes.add(new CustomTreeModification(rootIdentifierDifferent, objectModification));
-        }
-        doReturn(WRITE).when(objectModification).getModificationType();
-        doReturn(node).when(objectModification).getDataAfter();
-        doReturn(pathArgument).when(objectModification).getIdentifier();
-
-        if (fail) {
-            doThrow(new RuntimeException("error")).when(clusterSingletonServiceProvider)
-                    .registerClusterSingletonService(any());
-        } else {
-            doReturn(clusterRegistration).when(clusterSingletonServiceProvider).registerClusterSingletonService(any());
-        }
-        netconfTopologyManager.onDataTreeChanged(changes);
+        doThrow(new RuntimeException("mock error")).doReturn(newMockClusterRegistration1)
+                .when(clusterSingletonServiceProvider).registerClusterSingletonService(newMockContext1);
 
-        if (fail) {
-            verify(clusterSingletonServiceProvider, times(3))
-                    .registerClusterSingletonService(any());
-            assertTrue(contexts.isEmpty());
-            assertTrue(clusterRegistrations.isEmpty());
-        } else {
-            verify(clusterSingletonServiceProvider, times(2))
-                    .registerClusterSingletonService(any());
+        doReturn(WRITE).when(dataObjectModification1).getModificationType();
+        doReturn(null).when(dataObjectModification1).getDataBefore();
+        doReturn(node1).when(dataObjectModification1).getDataAfter();
 
-            // only two created contexts
-            assertEquals(2, contexts.size());
-            assertTrue(contexts.containsKey(rootIdentifier.getRootIdentifier()));
-            assertTrue(contexts.containsKey(rootIdentifierDifferent.getRootIdentifier()));
+        mockContextMap.put(nodeInstanceId1, setup -> {
+            assertEquals(node1, setup.getNode());
+            assertEquals(TOPOLOGY_ID, setup.getTopologyId());
+            return newMockContext1;
+        });
 
-            // only two created cluster registrations
-            assertEquals(2, clusterRegistrations.size());
-            assertTrue(clusterRegistrations.containsKey(rootIdentifier.getRootIdentifier()));
-            assertTrue(clusterRegistrations.containsKey(rootIdentifierDifferent.getRootIdentifier()));
+        netconfTopologyManager.onDataTreeChanged(Arrays.asList(
+                new CustomTreeModification(new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
+                        nodeInstanceId1), dataObjectModification1)));
 
-            // after delete there should be no context and clustered registrations
-            doReturn(DELETE).when(objectModification).getModificationType();
+        verify(clusterSingletonServiceProvider, times(2)).registerClusterSingletonService(newMockContext1);
 
-            doNothing().when(clusterRegistration).close();
+        verifyNoMoreInteractions(mockClusterRegistration1, mockContext1, mockClusterRegistration2, mockContext2,
+                newMockContext1, newMockClusterRegistration1, clusterSingletonServiceProvider);
 
-            netconfTopologyManager.onDataTreeChanged(changes);
+        // Test close.
 
-            verify(clusterRegistration, times(2)).close();
+        netconfTopologyManager.close();
 
-            // empty map of contexts
-            assertTrue(contexts.isEmpty());
-            assertFalse(contexts.containsKey(rootIdentifier.getRootIdentifier()));
-            assertFalse(contexts.containsKey(rootIdentifierDifferent.getRootIdentifier()));
+        verify(newMockClusterRegistration1).close();
+        verify(newMockContext1).close();
+        verify(mockClusterRegistration2).close();
+        verify(mockContext2).close();
 
-            // empty map of clustered registrations
-            assertTrue(clusterRegistrations.isEmpty());
-            assertFalse(clusterRegistrations.containsKey(rootIdentifier.getRootIdentifier()));
-            assertFalse(clusterRegistrations.containsKey(rootIdentifierDifferent.getRootIdentifier()));
-        }
+        netconfTopologyManager.close();
+
+        verifyNoMoreInteractions(mockClusterRegistration1, mockContext1, mockClusterRegistration2, mockContext2,
+                newMockContext1, newMockClusterRegistration1, clusterSingletonServiceProvider);
+    }
+
+    @Test
+    public void testClusterSingletonServiceRegistrationFailure() throws Exception {
+        final NodeId nodeId = new NodeId("node-id");
+        final InstanceIdentifier<Node> nodeInstanceId = NetconfTopologyUtils.createTopologyNodeListPath(
+                new NodeKey(nodeId), TOPOLOGY_ID);
+
+        final NetconfNode netconfNode = new NetconfNodeBuilder()
+                .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
+                .setPort(new PortNumber(10))
+                .setActorResponseWaitTime(ACTOR_RESPONSE_WAIT_TIME).build();
+        final Node node = new NodeBuilder().setNodeId(nodeId).addAugmentation(NetconfNode.class,
+                netconfNode).build();
+
+        final DataObjectModification<Node> dataObjectModification = mock(DataObjectModification.class);
+        doReturn(WRITE).when(dataObjectModification).getModificationType();
+        doReturn(node).when(dataObjectModification).getDataAfter();
+        doReturn(new InstanceIdentifier.IdentifiableItem<>(Node.class, new NodeKey(nodeId)))
+                .when(dataObjectModification).getIdentifier();
+
+        final NetconfTopologyContext mockContext = mock(NetconfTopologyContext.class);
+        mockContextMap.put(nodeInstanceId, setup -> mockContext);
+
+        doThrow(new RuntimeException("mock error")).when(clusterSingletonServiceProvider)
+                .registerClusterSingletonService(mockContext);
+
+        netconfTopologyManager.init();
+
+        netconfTopologyManager.onDataTreeChanged(Arrays.asList(
+                new CustomTreeModification(new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
+                        nodeInstanceId), dataObjectModification)));
+
+        verify(clusterSingletonServiceProvider, times(3)).registerClusterSingletonService(mockContext);
+        verify(mockContext).close();
+        verifyNoMoreInteractions(mockListenerReg);
+
+        netconfTopologyManager.close();
+        verifyNoMoreInteractions(mockContext);
     }
 
     static class CustomTreeModification  implements DataTreeModification<Node> {