Mods/improvements to NetconfNodeActorTest et al 82/71982/4
authorTom Pantelis <tompantelis@gmail.com>
Sat, 12 May 2018 18:29:44 +0000 (14:29 -0400)
committerTom Pantelis <tompantelis@gmail.com>
Mon, 14 May 2018 23:54:32 +0000 (19:54 -0400)
Increased code coverage in NetconfNodeActor et al via new test
cases and mods to existing tests cases in NetconfNodeActorTest.

Also made other mods - see inline comments.

Change-Id: Id268d747110fe5c1b05a0c2959138661b966d0f6
Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMRpcService.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyYangTextSourceProvider.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/NormalizedNodeMessage.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/SchemaPathMessage.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessage.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/rpc/InvokeRpcMessageReply.java
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/NetconfNodeActorTest.java

index 33fa7b96e7443f83e4e0bb1fe133c5b4699a8657..f972e4568454e32207e24980d97aa2f6ab6db527 100644 (file)
@@ -13,7 +13,6 @@ import akka.actor.ActorSystem;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
-import com.google.common.base.Function;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
@@ -66,58 +65,39 @@ public class ProxyDOMRpcService implements DOMRpcService {
 
         final NormalizedNodeMessage normalizedNodeMessage =
                 new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY, input);
-        final Future<Object> scalaFuture =
-                Patterns.ask(masterActorRef,
-                        new InvokeRpcMessage(new SchemaPathMessage(type), normalizedNodeMessage),
-                        actorResponseWaitTime);
+        final Future<Object> scalaFuture = Patterns.ask(masterActorRef,
+                new InvokeRpcMessage(new SchemaPathMessage(type), normalizedNodeMessage), actorResponseWaitTime);
 
         final SettableFuture<DOMRpcResult> settableFuture = SettableFuture.create();
 
-        final CheckedFuture<DOMRpcResult, DOMRpcException> checkedFuture = Futures.makeChecked(settableFuture,
-                new Function<Exception, DOMRpcException>() {
-
-                @Nullable
-                @Override
-                public DOMRpcException apply(@Nullable final Exception exception) {
-                    return new ClusteringRpcException(id + ": Exception during remote rpc invocation.",
-                            exception);
-                }
-            });
-
         scalaFuture.onComplete(new OnComplete<Object>() {
             @Override
-            public void onComplete(final Throwable failure, final Object success) throws Throwable {
+            public void onComplete(final Throwable failure, final Object response) {
                 if (failure != null) {
                     settableFuture.setException(failure);
                     return;
                 }
-                if (success instanceof Throwable) {
-                    settableFuture.setException((Throwable) success);
-                    return;
-                }
-                if (success instanceof EmptyResultResponse || success == null) {
+
+                if (response instanceof EmptyResultResponse) {
                     settableFuture.set(null);
                     return;
                 }
-                final Collection<RpcError> errors = ((InvokeRpcMessageReply) success).getRpcErrors();
+
+                final Collection<RpcError> errors = ((InvokeRpcMessageReply) response).getRpcErrors();
                 final NormalizedNodeMessage normalizedNodeMessageResult =
-                        ((InvokeRpcMessageReply) success).getNormalizedNodeMessage();
+                        ((InvokeRpcMessageReply) response).getNormalizedNodeMessage();
                 final DOMRpcResult result;
                 if (normalizedNodeMessageResult == null) {
                     result = new DefaultDOMRpcResult(errors);
                 } else {
-                    if (errors == null) {
-                        result = new DefaultDOMRpcResult(normalizedNodeMessageResult.getNode());
-                    } else {
-                        result = new DefaultDOMRpcResult(normalizedNodeMessageResult.getNode(), errors);
-                    }
+                    result = new DefaultDOMRpcResult(normalizedNodeMessageResult.getNode(), errors);
                 }
                 settableFuture.set(result);
             }
         }, actorSystem.dispatcher());
 
-        return checkedFuture;
-
+        return Futures.makeChecked(settableFuture,
+            ex -> new ClusteringRpcException(id + ": Exception during remote rpc invocation.", ex));
     }
 
     @Nonnull
index d4f0d79712bcdcd3018ef4b7aa607f961261da29..e9ee161437a629d1292725c5267a1659a7848084 100644 (file)
@@ -8,7 +8,6 @@
 
 package org.opendaylight.netconf.topology.singleton.impl;
 
-import akka.actor.ActorContext;
 import akka.actor.ActorRef;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
@@ -21,19 +20,20 @@ import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceP
 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.impl.Promise;
 
 public class ProxyYangTextSourceProvider implements RemoteYangTextSourceProvider {
 
     private final ActorRef masterRef;
-    private final ActorContext actorContext;
+    private final ExecutionContext executionContext;
     private final Timeout actorResponseWaitTime;
 
-    public ProxyYangTextSourceProvider(final ActorRef masterRef, final ActorContext actorContext,
+    public ProxyYangTextSourceProvider(final ActorRef masterRef, final ExecutionContext executionContext,
                                        final Timeout actorResponseWaitTime) {
         this.masterRef = masterRef;
-        this.actorContext = actorContext;
+        this.executionContext = executionContext;
         this.actorResponseWaitTime = actorResponseWaitTime;
     }
 
@@ -59,15 +59,11 @@ public class ProxyYangTextSourceProvider implements RemoteYangTextSourceProvider
                     promise.failure(failure);
                     return;
                 }
-                if (success instanceof Throwable) {
-                    promise.failure((Throwable) success);
-                    return;
-                }
+
                 promise.success((YangTextSchemaSourceSerializationProxy) success);
             }
-        }, actorContext.dispatcher());
+        }, executionContext);
 
         return promise.future();
-
     }
 }
index 6c297261d8b7d3eb28fc4d225c14e1a0d85fcb3a..2af981783e4e0882dae9bf28e55c5c44adc4e625 100644 (file)
@@ -216,22 +216,22 @@ public class NetconfNodeActor extends AbstractUntypedActor {
     }
 
     private void sendYangTextSchemaSourceProxy(final SourceIdentifier sourceIdentifier, final ActorRef sender) {
-        final ListenableFuture<@NonNull YangTextSchemaSource> yangTextSchemaSource =
+        final ListenableFuture<@NonNull YangTextSchemaSource> schemaSourceFuture =
                 schemaRepository.getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
 
-        Futures.addCallback(yangTextSchemaSource, new FutureCallback<YangTextSchemaSource>() {
+        Futures.addCallback(schemaSourceFuture, new FutureCallback<YangTextSchemaSource>() {
             @Override
             public void onSuccess(final YangTextSchemaSource yangTextSchemaSource) {
                 try {
                     sender.tell(new YangTextSchemaSourceSerializationProxy(yangTextSchemaSource), getSelf());
-                } catch (final IOException exception) {
-                    sender.tell(exception.getCause(), getSelf());
+                } catch (IOException e) {
+                    sender.tell(new Failure(e), getSelf());
                 }
             }
 
             @Override
             public void onFailure(@Nonnull final Throwable throwable) {
-                sender.tell(throwable, getSelf());
+                sender.tell(new Failure(throwable), getSelf());
             }
         }, MoreExecutors.directExecutor());
     }
@@ -259,7 +259,7 @@ public class NetconfNodeActor extends AbstractUntypedActor {
 
             @Override
             public void onFailure(@Nonnull final Throwable throwable) {
-                recipient.tell(throwable, getSelf());
+                recipient.tell(new Failure(throwable), getSelf());
             }
         }, MoreExecutors.directExecutor());
     }
@@ -278,7 +278,7 @@ public class NetconfNodeActor extends AbstractUntypedActor {
 
     private SchemaContextFactory createSchemaContextFactory(final ActorRef masterReference) {
         final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
-                new ProxyYangTextSourceProvider(masterReference, getContext(), actorResponseWaitTime);
+                new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime);
         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider,
                 getContext().dispatcher());
 
@@ -325,6 +325,7 @@ public class NetconfNodeActor extends AbstractUntypedActor {
                         } else {
                             LOG.error("{}: Failed to resolve schema context - unable to register slave mount point",
                                     id, throwable);
+                            closeSchemaSourceRegistrations();
                         }
                     }
                 });
index a27089e6acf3b2368ccec3b6873ba064a1fe5633..c9ed78508f50b43a0d7a98fec397fb8e176635bc 100644 (file)
@@ -52,6 +52,11 @@ public class NormalizedNodeMessage implements Externalizable {
         SerializationUtils.deserializePathAndNode(in, this, APPLIER);
     }
 
+    @Override
+    public String toString() {
+        return "NormalizedNodeMessage [identifier=" + identifier + ", node=" + node + "]";
+    }
+
     private static final SerializationUtils.Applier<NormalizedNodeMessage> APPLIER = (instance, path, node) -> {
         instance.identifier = path;
         instance.node = node;
index a34697885e7d37b1ac3843b1df5c3cc536ce9dd1..2e7e0619b9b578f465a83787e6252b033f41d9ec 100644 (file)
@@ -36,6 +36,11 @@ public class SchemaPathMessage implements Serializable {
         return new Proxy(this);
     }
 
+    @Override
+    public String toString() {
+        return "SchemaPathMessage [schemaPath=" + schemaPath + "]";
+    }
+
     private static class Proxy implements Externalizable {
         private static final long serialVersionUID = 2L;
 
index d540e23f8d6b3dbf066dc5daedcc5c195d1970d4..e8ef986f36a8c7de1094fe460fc3ed77f857abdf 100644 (file)
@@ -45,6 +45,12 @@ public class InvokeRpcMessage implements Serializable {
         return new Proxy(this);
     }
 
+    @Override
+    public String toString() {
+        return "InvokeRpcMessage [schemaPathMessage=" + schemaPathMessage + ", normalizedNodeMessage="
+                + normalizedNodeMessage + "]";
+    }
+
     private static class Proxy implements Externalizable {
         private static final long serialVersionUID = 2L;
 
index b70582354cc6cd476fd72e93d0303a0211d7babf..2f05f006a4aa00e0ab9cad073015d6a78399f297 100644 (file)
@@ -16,6 +16,9 @@ import java.io.ObjectOutput;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.Objects;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
 import org.opendaylight.yangtools.yang.common.RpcError;
 
@@ -26,16 +29,18 @@ public class InvokeRpcMessageReply implements Serializable {
     private final Collection<RpcError> rpcErrors;
     private final NormalizedNodeMessage normalizedNodeMessage;
 
-    public InvokeRpcMessageReply(final NormalizedNodeMessage normalizedNodeMessage,
-                                 final Collection<RpcError> rpcErrors) {
+    public InvokeRpcMessageReply(@Nullable final NormalizedNodeMessage normalizedNodeMessage,
+                                 @Nonnull final Collection<RpcError> rpcErrors) {
         this.normalizedNodeMessage = normalizedNodeMessage;
-        this.rpcErrors = rpcErrors;
+        this.rpcErrors = Objects.requireNonNull(rpcErrors);
     }
 
+    @Nullable
     public NormalizedNodeMessage getNormalizedNodeMessage() {
         return normalizedNodeMessage;
     }
 
+    @Nonnull
     public Collection<RpcError> getRpcErrors() {
         return rpcErrors;
     }
index 1daa70be8ba2858847f19a07d2306a9f44fcfeac..5abb6ce320a87034961515730249c803a8321c80 100644 (file)
@@ -8,52 +8,64 @@
 
 package org.opendaylight.netconf.topology.singleton.impl;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.MockitoAnnotations.initMocks;
-import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
 
-import akka.actor.ActorContext;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import akka.actor.Status.Failure;
+import akka.actor.Status.Success;
+import akka.pattern.AskTimeoutException;
 import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import akka.testkit.javadsl.TestKit;
 import akka.util.Timeout;
-import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.io.ByteSource;
 import com.google.common.net.InetAddresses;
-import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.util.Collections;
 import java.util.List;
+import java.util.Scanner;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
@@ -62,11 +74,15 @@ import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
+import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
+import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
@@ -81,13 +97,14 @@ import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
-import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
+import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
+import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToASTTransformer;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -95,217 +112,328 @@ import scala.concurrent.duration.Duration;
 public class NetconfNodeActorTest {
 
     private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
-    private static ActorSystem system;
+    private static final RevisionSourceIdentifier SOURCE_IDENTIFIER1 = RevisionSourceIdentifier.create("yang1");
+    private static final RevisionSourceIdentifier SOURCE_IDENTIFIER2 = RevisionSourceIdentifier.create("yang2");
+
+    private ActorSystem system = ActorSystem.create();
+    private final TestKit testKit = new TestKit(system);
 
     @Rule
     public final ExpectedException exception = ExpectedException.none();
 
     private ActorRef masterRef;
     private RemoteDeviceId remoteDeviceId;
+    private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
+
+    @Mock
+    private DOMRpcService mockDOMRpcService;
+
+    @Mock
+    private DOMMountPointService mockMountPointService;
+
+    @Mock
+    private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
+
+    @Mock
+    private ObjectRegistration<DOMMountPoint> mockMountPointReg;
+
+    @Mock
+    private DOMDataBroker mockDOMDataBroker;
+
+    @Mock
+    private SchemaSourceRegistration<?> mockSchemaSourceReg1;
+
+    @Mock
+    private SchemaSourceRegistration<?> mockSchemaSourceReg2;
 
     @Mock
-    private DOMRpcService domRpcService;
+    private SchemaSourceRegistry mockRegistry;
+
     @Mock
-    private DOMMountPointService mountPointService;
+    private SchemaContextFactory mockSchemaContextFactory;
+
     @Mock
-    private DataBroker dataBroker;
+    private SchemaRepository mockSchemaRepository;
+
+    @Mock
+    private SchemaContext mockSchemaContext;
 
     @Before
     public void setup() {
         initMocks(this);
+
         remoteDeviceId = new RemoteDeviceId("netconf-topology",
                 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
-        final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
 
-        final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
-                DEFAULT_SCHEMA_REPOSITORY, TIMEOUT, mountPointService);
+        masterSchemaRepository.registerSchemaSourceListener(
+                TextToASTTransformer.create(masterSchemaRepository, masterSchemaRepository));
+
+        final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
+                .setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS)).build();
 
-        system = ActorSystem.create();
+        final Props props = NetconfNodeActor.props(setup, remoteDeviceId, masterSchemaRepository,
+                masterSchemaRepository, TIMEOUT, mockMountPointService);
 
         masterRef = TestActorRef.create(system, props, "master_messages");
+
+        resetMountPointMocks();
+
+        doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
+
+        doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
+        doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
+
+        doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
+                .createSchemaContextFactory(any(SchemaSourceFilter.class));
     }
 
     @After
     public void teardown() {
-        JavaTestKit.shutdownActorSystem(system);
+        TestKit.shutdownActorSystem(system, Boolean.TRUE);
         system = null;
     }
 
     @Test
-    public void testInitDataMessages() throws Exception {
+    public void testInitializeAndRefreshMasterData() {
 
-        final DOMDataBroker domDataBroker = mock(DOMDataBroker.class);
-        final List<SourceIdentifier> sourceIdentifiers = Lists.newArrayList();
+        // Test CreateInitialMasterActorData.
 
-        /* Test init master data */
+        initializeMaster(Lists.newArrayList());
 
-        final Future<Object> initialDataToActor =
-                Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers,
-                                domRpcService), TIMEOUT);
+        // Test RefreshSetupMasterActorData.
 
-        final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
-        assertTrue(success instanceof MasterActorDataInitialized);
+        final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
+                new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
 
+        final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create().setActorSystem(system).build();
 
-        /* Test refresh master data */
+        masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
 
-        final RemoteDeviceId remoteDeviceId2 = new RemoteDeviceId("netconf-topology2",
-                new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
+        testKit.expectMsgClass(MasterActorDataInitialized.class);
+    }
 
-        final NetconfTopologySetup setup2 = mock(NetconfTopologySetup.class);
+    @Test
+    public void tesAskForMasterMountPoint() {
+
+        // Test with master not setup yet.
+
+        final TestKit kit = new TestKit(system);
+
+        masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
+
+        final Failure failure = kit.expectMsgClass(Failure.class);
+        assertTrue(failure.cause() instanceof NotMasterException);
+
+        // Now initialize - master should send the RegisterMountPoint message.
 
-        final Future<Object> refreshDataToActor =
-                Patterns.ask(masterRef, new RefreshSetupMasterActorData(setup2, remoteDeviceId2),
-                        TIMEOUT);
+        List<SourceIdentifier> sourceIdentifiers = Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
+        initializeMaster(sourceIdentifiers);
 
-        final Object success2 = Await.result(refreshDataToActor, TIMEOUT.duration());
-        assertTrue(success2 instanceof MasterActorDataInitialized);
+        masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
 
+        final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
+
+        assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
     }
 
     @Test
-    public void testRegisterMountPointMessage() throws Exception {
+    public void testRegisterAndUnregisterMountPoint() throws Exception {
 
-        final DOMDataBroker domDataBroker = mock(DOMDataBroker.class);
-        final List<SourceIdentifier> sourceIdentifiers =
-                Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
+        ActorRef slaveRef = registerSlaveMountPoint();
 
-        // init master data
+        // Unregister
 
-        final Future<Object> initialDataToActor =
-                Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers,
-                                domRpcService), TIMEOUT);
+        slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
 
-        final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration());
+        verify(mockMountPointReg, timeout(5000)).close();
+        verify(mockSchemaSourceReg1, timeout(1000)).close();
+        verify(mockSchemaSourceReg2, timeout(1000)).close();
 
-        assertTrue(successInit instanceof MasterActorDataInitialized);
+        // Test registration with another interleaved registration that completes while the first registration
+        // is resolving the schema context.
 
-        // test if slave get right identifiers from master
+        reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
+        resetMountPointMocks();
 
-        final TestKit kit = new TestKit(system);
+        doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
 
-        masterRef.tell(new AskForMasterMountPoint(kit.getRef()), ActorRef.noSender());
+        doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
+                .createSchemaContextFactory(any(SchemaSourceFilter.class));
 
-        final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
+        final SchemaSourceRegistration<?> newMockSchemaSourceReg = mock(SchemaSourceRegistration.class);
 
-        assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
+        final SchemaContextFactory newMockSchemaContextFactory = mock(SchemaContextFactory.class);
+        doReturn(Futures.immediateFuture(mockSchemaContext))
+                .when(newMockSchemaContextFactory).createSchemaContext(any());
 
-    }
+        doAnswer(unused -> {
+            SettableFuture<SchemaContext> future = SettableFuture.create();
+            new Thread(() -> {
+                doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
+                        withSourceId(SOURCE_IDENTIFIER1));
+
+                doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
+                        .createSchemaContextFactory(any(SchemaSourceFilter.class));
+
+                slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
+                        testKit.getRef());
+
+                future.set(mockSchemaContext);
+            }).start();
+            return future;
+        }).when(mockSchemaContextFactory).createSchemaContext(any());
+
+        doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
+                .createSchemaContextFactory(any(SchemaSourceFilter.class));
+
+        slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
+
+        verify(mockMountPointBuilder, timeout(5000)).register();
+        verify(mockMountPointBuilder, after(500)).addInitialSchemaContext(mockSchemaContext);
+        verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
+        verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
+        verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
+        verify(mockSchemaSourceReg1).close();
+        verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
+        verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class));
+        verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
+
+        // Stop the slave actor and verify schema source registrations are closed.
 
-    @Test
-    public void testReceiveRegisterMountpoint() throws Exception {
-        final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
-        final RevisionSourceIdentifier yang1 = RevisionSourceIdentifier.create("yang1");
-        final RevisionSourceIdentifier yang2 = RevisionSourceIdentifier.create("yang2");
-        final SchemaSourceRegistry registry = mock(SchemaSourceRegistry.class);
-        final SchemaRepository schemaRepository = mock(SchemaRepository.class);
-        final SchemaSourceRegistration<?> regYang1 = mock(SchemaSourceRegistration.class);
-        final SchemaSourceRegistration<?> regYang2 = mock(SchemaSourceRegistration.class);
-        doReturn(regYang1).when(registry).registerSchemaSource(any(), withSourceId(yang1));
-        doReturn(regYang2).when(registry).registerSchemaSource(any(), withSourceId(yang2));
-        final SchemaContextFactory schemaContextFactory = mock(SchemaContextFactory.class);
-        doReturn(schemaContextFactory).when(schemaRepository).createSchemaContextFactory(any(SchemaSourceFilter.class));
-        final SettableFuture<SchemaContext> schemaContextFuture = SettableFuture.create();
-        final CheckedFuture<SchemaContext, SchemaResolutionException> checkedFuture =
-                Futures.makeChecked(schemaContextFuture, e -> new SchemaResolutionException("fail", e));
-        doReturn(checkedFuture).when(schemaContextFactory).createSchemaContext(any());
-        final ActorRef slaveRef =
-                system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, registry, schemaRepository, TIMEOUT,
-                        mountPointService));
-        final List<SourceIdentifier> sources = ImmutableList.of(yang1, yang2);
-        slaveRef.tell(new RegisterMountPoint(sources, masterRef), masterRef);
-
-        verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang1));
-        verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang2));
-        //stop actor
         final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
         Await.result(stopFuture, TIMEOUT.duration());
-        //provider should be deregistered
-        verify(regYang1).close();
-        verify(regYang2).close();
+
+        verify(mockMountPointReg).close();
+        verify(newMockSchemaSourceReg).close();
     }
 
+    @SuppressWarnings("unchecked")
     @Test
-    public void testYangTextSchemaSourceRequestMessage() throws Exception {
-        final SchemaRepository schemaRepository = mock(SchemaRepository.class);
-        final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID");
-        final Props props = NetconfNodeActor.props(mock(NetconfTopologySetup.class), remoteDeviceId,
-                DEFAULT_SCHEMA_REPOSITORY, schemaRepository, TIMEOUT, mountPointService);
+    public void testRegisterMountPointWithSchemaFailures() throws Exception {
+        final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system).build();
 
-        final ActorRef actorRefSchemaRepo = TestActorRef.create(system, props, "master_mocked_schema_repository");
-        final ActorContext actorContext = mock(ActorContext.class);
-        doReturn(system.dispatcher()).when(actorContext).dispatcher();
+        final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, mockRegistry,
+                mockSchemaRepository, TIMEOUT, mockMountPointService));
 
-        final ProxyYangTextSourceProvider proxyYang =
-                new ProxyYangTextSourceProvider(actorRefSchemaRepo, actorContext, TIMEOUT);
-        // test if asking for source is resolved and sended back
+        // Test unrecoverable failure.
 
-        final YangTextSchemaSource yangTextSchemaSource = new YangTextSchemaSource(sourceIdentifier) {
-            @Override
-            protected MoreObjects.ToStringHelper addToStringAttributes(
-                    final MoreObjects.ToStringHelper toStringHelper) {
-                return null;
-            }
+        doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock")))
+                .when(mockSchemaContextFactory).createSchemaContext(any());
 
-            @Override
-            public InputStream openStream() throws IOException {
-                return new ByteArrayInputStream("YANG".getBytes());
-            }
-        };
+        slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
+                masterRef), testKit.getRef());
 
+        testKit.expectMsgClass(Success.class);
 
-        final ListenableFuture<YangTextSchemaSource> result = Futures.immediateFuture(yangTextSchemaSource);
+        verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
+        verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
 
-        doReturn(result).when(schemaRepository).getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
+        verify(mockMountPointBuilder, after(1000).never()).register();
+        verify(mockSchemaSourceReg1, timeout(1000)).close();
+        verify(mockSchemaSourceReg2, timeout(1000)).close();
 
-        final Future<YangTextSchemaSourceSerializationProxy> resolvedSchema =
-                proxyYang.getYangTextSchemaSource(sourceIdentifier);
+        // Test recoverable AskTimeoutException - schema context resolution should be retried.
 
-        final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchema, TIMEOUT.duration());
+        reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
 
-        assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
-        assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
+        doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock",
+                new AskTimeoutException("timeout"))))
+            .doReturn(Futures.immediateFuture(mockSchemaContext))
+            .when(mockSchemaContextFactory).createSchemaContext(any());
 
+        slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
+                masterRef), testKit.getRef());
 
-        // test if asking for source is missing
-        exception.expect(MissingSchemaSourceException.class);
+        testKit.expectMsgClass(Success.class);
+
+        verify(mockMountPointBuilder, timeout(5000)).register();
+        verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
+
+        // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
+        // attempt should not be retried.
 
-        final SchemaSourceException schemaSourceException =
-                new MissingSchemaSourceException("Fail", sourceIdentifier);
+        reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
+        resetMountPointMocks();
 
-        final CheckedFuture<YangTextSchemaSource, SchemaSourceException> resultFail =
-                Futures.immediateFailedCheckedFuture(schemaSourceException);
+        final SchemaContextFactory mockSchemaContextFactorySuccess = mock(SchemaContextFactory.class);
+        doReturn(Futures.immediateFuture(mockSchemaContext))
+                .when(mockSchemaContextFactorySuccess).createSchemaContext(any());
 
-        doReturn(resultFail).when(schemaRepository).getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
+        doAnswer(unused -> {
+            SettableFuture<SchemaContext> future = SettableFuture.create();
+            new Thread(() -> {
+                doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
+                        .createSchemaContextFactory(any(SchemaSourceFilter.class));
 
-        final Future<YangTextSchemaSourceSerializationProxy> failedSchema =
-                proxyYang.getYangTextSchemaSource(sourceIdentifier);
+                slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
+                        masterRef), testKit.getRef());
 
-        Await.result(failedSchema, TIMEOUT.duration());
+                future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout")));
+            }).start();
+            return future;
+        }).when(mockSchemaContextFactory).createSchemaContext(any());
 
+        doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
+                .createSchemaContextFactory(any(SchemaSourceFilter.class));
+
+        slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
+                masterRef), testKit.getRef());
+
+        verify(mockMountPointBuilder, timeout(5000)).register();
+        verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class));
     }
 
     @Test
-    public void testProxyDOMRpcService() throws Exception {
+    public void testYangTextSchemaSourceRequest() throws Exception {
+        final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID");
 
-        final DOMDataBroker domDataBroker = mock(DOMDataBroker.class);
-        final List<SourceIdentifier> sourceIdentifiers =
-                Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
+        final ProxyYangTextSourceProvider proxyYangProvider =
+                new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
+
+        final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForByteSource(sourceIdentifier,
+                ByteSource.wrap("YANG".getBytes(UTF_8)));
+
+        // Test success.
 
-        // init master data
+        final SchemaSourceRegistration<YangTextSchemaSource> schemaSourceReg = masterSchemaRepository
+                .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource),
+                     PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1));
 
-        final Future<Object> initialDataToActor =
-                Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers,
-                        domRpcService), TIMEOUT);
+        final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
+                proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
 
-        final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration());
+        final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
 
-        assertTrue(successInit instanceof MasterActorDataInitialized);
+        assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
+        assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
+
+        // Test missing source failure.
+
+        exception.expect(MissingSchemaSourceException.class);
+
+        schemaSourceReg.close();
+
+        final Future<YangTextSchemaSourceSerializationProxy> failedSchemaFuture =
+                proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
+
+        Await.result(failedSchemaFuture, TIMEOUT.duration());
+    }
+
+    @Test
+    @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"})
+    public void testSlaveInvokeRpc() throws Throwable {
+
+        final List<SourceIdentifier> sourceIdentifiers =
+                Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
 
-        // test if slave get right identifiers from master
+        initializeMaster(sourceIdentifiers);
+        registerSlaveMountPoint();
 
-        final ProxyDOMRpcService slaveDomRPCService =
-                new ProxyDOMRpcService(system, masterRef, remoteDeviceId, TIMEOUT);
+        ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
+        verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
+
+        final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
+        assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService);
 
         final QName testQName = QName.create("", "TestQname");
         final SchemaPath schemaPath = SchemaPath.create(true, testQName);
@@ -313,83 +441,138 @@ public class NetconfNodeActorTest {
                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(testQName))
                 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
         final RpcError rpcError = RpcResultBuilder.newError(RpcError.ErrorType.RPC, null, "Rpc invocation failed.");
-        // EmptyResultResponse message
 
-        doReturn(Futures.immediateCheckedFuture(null)).when(domRpcService).invokeRpc(any(), any());
+        // RPC with no response output.
 
-        final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureEmpty =
-                slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+        doReturn(Futures.immediateCheckedFuture(null)).when(mockDOMRpcService).invokeRpc(any(), any());
 
-        final Object resultNull = resultFutureEmpty.checkedGet(2, TimeUnit.SECONDS);
+        DOMRpcResult result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
 
-        assertEquals(null, resultNull);
+        assertEquals(null, result);
 
-        // InvokeRpcMessageReply message
+        // RPC with response output.
 
         doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode)))
-                .when(domRpcService).invokeRpc(any(), any());
-
-        final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureNn =
-                slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+                .when(mockDOMRpcService).invokeRpc(any(), any());
 
-        final DOMRpcResult resultNn = resultFutureNn.checkedGet(2, TimeUnit.SECONDS);
+        result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
 
-        assertEquals(outputNode, resultNn.getResult());
-        assertTrue(resultNn.getErrors().isEmpty());
+        assertEquals(outputNode, result.getResult());
+        assertTrue(result.getErrors().isEmpty());
 
-        // InvokeRpcMessageReply message only error
+        // RPC with response error.
 
         doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(rpcError)))
-                .when(domRpcService).invokeRpc(any(), any());
+                .when(mockDOMRpcService).invokeRpc(any(), any());
 
-        final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureError =
-                slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+        result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
 
-        final DOMRpcResult resultError = resultFutureError.checkedGet(2, TimeUnit.SECONDS);
+        assertNull(result.getResult());
+        assertEquals(rpcError, result.getErrors().iterator().next());
 
-        assertNull(resultError.getResult());
-        assertEquals(rpcError, resultError.getErrors().iterator().next());
-
-        // InvokeRpcMessageReply message error + result
+        // RPC with response output and error.
 
         doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
-                .when(domRpcService).invokeRpc(any(), any());
-
-        final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureOutputError =
-                slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+                .when(mockDOMRpcService).invokeRpc(any(), any());
 
-        final DOMRpcResult resultOutputError = resultFutureOutputError.checkedGet(2, TimeUnit.SECONDS);
+        final DOMRpcResult resultOutputError =
+                slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
 
         assertEquals(outputNode, resultOutputError.getResult());
         assertEquals(rpcError, resultOutputError.getErrors().iterator().next());
 
-        // Throwable message
+        // RPC failure.
 
         exception.expect(DOMRpcException.class);
 
-        doReturn(Futures.immediateFailedCheckedFuture(new ClusteringRpcException("")))
-                .when(domRpcService).invokeRpc(any(), any());
+        doReturn(Futures.immediateFailedCheckedFuture(new ClusteringRpcException("mock")))
+                .when(mockDOMRpcService).invokeRpc(any(), any());
+
+        try {
+            slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
+        } catch (ExecutionException e) {
+            throw e.getCause();
+        }
+    }
+
+    @Test
+    public void testSlaveNewTransactionRequests() {
+
+        doReturn(mock(DOMDataReadOnlyTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
+        doReturn(mock(DOMDataReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
+        doReturn(mock(DOMDataWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
+
+        initializeMaster(Collections.emptyList());
+        registerSlaveMountPoint();
+
+        ArgumentCaptor<DOMDataBroker> domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
+        verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
 
-        final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureThrowable =
-                slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+        final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
+        assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker);
 
-        resultFutureThrowable.checkedGet(2, TimeUnit.SECONDS);
+        slaveDOMDataBroker.newReadOnlyTransaction();
+        verify(mockDOMDataBroker).newReadOnlyTransaction();
 
+        slaveDOMDataBroker.newReadWriteTransaction();
+        verify(mockDOMDataBroker).newReadWriteTransaction();
+
+        slaveDOMDataBroker.newWriteOnlyTransaction();
+        verify(mockDOMDataBroker).newWriteOnlyTransaction();
+    }
+
+    private ActorRef registerSlaveMountPoint() {
+        final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(
+                NetconfTopologySetupBuilder.create().setActorSystem(system).build(), remoteDeviceId, mockRegistry,
+                mockSchemaRepository, TIMEOUT, mockMountPointService));
+
+        doReturn(Futures.immediateFuture(mockSchemaContext))
+                .when(mockSchemaContextFactory).createSchemaContext(any());
+
+        slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
+                masterRef), testKit.getRef());
+
+        verify(mockMountPointBuilder, timeout(5000)).register();
+        verify(mockMountPointBuilder).addInitialSchemaContext(mockSchemaContext);
+        verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
+        verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
+        verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
+
+        testKit.expectMsgClass(Success.class);
+
+        return slaveRef;
+    }
+
+    private void initializeMaster(List<SourceIdentifier> sourceIdentifiers) {
+        masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, sourceIdentifiers,
+                mockDOMRpcService), testKit.getRef());
+
+        testKit.expectMsgClass(MasterActorDataInitialized.class);
+    }
+
+    private void resetMountPointMocks() {
+        reset(mockMountPointReg, mockMountPointBuilder);
+
+        doNothing().when(mockMountPointReg).close();
+
+        doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addInitialSchemaContext(any());
+        doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
+        doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
     }
 
     private PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
-        return argThat(new ArgumentMatcher<PotentialSchemaSource>() {
+        return argThat(new ArgumentMatcher<PotentialSchemaSource<?>>() {
             @Override
             public boolean matches(final Object argument) {
-                final PotentialSchemaSource potentialSchemaSource = (PotentialSchemaSource) argument;
+                final PotentialSchemaSource<?> potentialSchemaSource = (PotentialSchemaSource<?>) argument;
                 return identifier.equals(potentialSchemaSource.getSourceIdentifier());
             }
         });
     }
 
-    private String convertStreamToString(final java.io.InputStream is) {
-        final java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
-        return s.hasNext() ? s.next() : "";
+    private String convertStreamToString(final InputStream is) {
+        try (Scanner scanner = new Scanner(is)) {
+            return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : "";
+        }
     }
-
 }