Integrate MRI projects for Neon
[netconf.git] / netconf / netconf-topology-singleton / src / test / java / org / opendaylight / netconf / topology / singleton / impl / NetconfNodeActorTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl;
10
11 import static java.nio.charset.StandardCharsets.UTF_8;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.ArgumentMatchers.any;
16 import static org.mockito.ArgumentMatchers.argThat;
17 import static org.mockito.ArgumentMatchers.eq;
18 import static org.mockito.Mockito.after;
19 import static org.mockito.Mockito.doAnswer;
20 import static org.mockito.Mockito.doNothing;
21 import static org.mockito.Mockito.doReturn;
22 import static org.mockito.Mockito.mock;
23 import static org.mockito.Mockito.reset;
24 import static org.mockito.Mockito.timeout;
25 import static org.mockito.Mockito.times;
26 import static org.mockito.Mockito.verify;
27 import static org.mockito.Mockito.verifyNoMoreInteractions;
28 import static org.mockito.MockitoAnnotations.initMocks;
29
30 import akka.actor.ActorRef;
31 import akka.actor.ActorSystem;
32 import akka.actor.Props;
33 import akka.actor.Status.Failure;
34 import akka.actor.Status.Success;
35 import akka.pattern.AskTimeoutException;
36 import akka.pattern.Patterns;
37 import akka.testkit.TestActorRef;
38 import akka.testkit.javadsl.TestKit;
39 import akka.util.Timeout;
40 import com.google.common.collect.ImmutableList;
41 import com.google.common.collect.Lists;
42 import com.google.common.io.ByteSource;
43 import com.google.common.net.InetAddresses;
44 import com.google.common.util.concurrent.Futures;
45 import com.google.common.util.concurrent.SettableFuture;
46 import java.io.InputStream;
47 import java.net.InetSocketAddress;
48 import java.util.Collections;
49 import java.util.List;
50 import java.util.Scanner;
51 import java.util.concurrent.ExecutionException;
52 import java.util.concurrent.TimeUnit;
53 import org.junit.After;
54 import org.junit.Before;
55 import org.junit.Rule;
56 import org.junit.Test;
57 import org.junit.rules.ExpectedException;
58 import org.mockito.ArgumentCaptor;
59 import org.mockito.Mock;
60 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
61 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
62 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
63 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
64 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
65 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
66 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
67 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
68 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
69 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
70 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
71 import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
72 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
73 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
74 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
75 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
76 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
77 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
78 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
79 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
80 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
81 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
82 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
83 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
84 import org.opendaylight.yangtools.concepts.ObjectRegistration;
85 import org.opendaylight.yangtools.yang.common.QName;
86 import org.opendaylight.yangtools.yang.common.RpcError;
87 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
88 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
89 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
90 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
91 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
92 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
93 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
94 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
95 import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier;
96 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
97 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
98 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
99 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
100 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
101 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
102 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
103 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
104 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
105 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
106 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToASTTransformer;
107 import scala.concurrent.Await;
108 import scala.concurrent.Future;
109 import scala.concurrent.duration.Duration;
110
111 public class NetconfNodeActorTest {
112
113     private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
114     private static final RevisionSourceIdentifier SOURCE_IDENTIFIER1 = RevisionSourceIdentifier.create("yang1");
115     private static final RevisionSourceIdentifier SOURCE_IDENTIFIER2 = RevisionSourceIdentifier.create("yang2");
116
117     private ActorSystem system = ActorSystem.create();
118     private final TestKit testKit = new TestKit(system);
119
120     @Rule
121     public final ExpectedException exception = ExpectedException.none();
122
123     private ActorRef masterRef;
124     private RemoteDeviceId remoteDeviceId;
125     private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
126
127     @Mock
128     private DOMRpcService mockDOMRpcService;
129
130     @Mock
131     private DOMMountPointService mockMountPointService;
132
133     @Mock
134     private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
135
136     @Mock
137     private ObjectRegistration<DOMMountPoint> mockMountPointReg;
138
139     @Mock
140     private DOMDataBroker mockDOMDataBroker;
141
142     @Mock
143     private SchemaSourceRegistration<?> mockSchemaSourceReg1;
144
145     @Mock
146     private SchemaSourceRegistration<?> mockSchemaSourceReg2;
147
148     @Mock
149     private SchemaSourceRegistry mockRegistry;
150
151     @Mock
152     private SchemaContextFactory mockSchemaContextFactory;
153
154     @Mock
155     private SchemaRepository mockSchemaRepository;
156
157     @Mock
158     private SchemaContext mockSchemaContext;
159
160     @Before
161     public void setup() {
162         initMocks(this);
163
164         remoteDeviceId = new RemoteDeviceId("netconf-topology",
165                 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
166
167         masterSchemaRepository.registerSchemaSourceListener(
168                 TextToASTTransformer.create(masterSchemaRepository, masterSchemaRepository));
169
170         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
171                 .setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS)).build();
172
173         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, masterSchemaRepository,
174                 masterSchemaRepository, TIMEOUT, mockMountPointService);
175
176         masterRef = TestActorRef.create(system, props, "master_messages");
177
178         resetMountPointMocks();
179
180         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
181
182         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
183         doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
184
185         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
186                 .createSchemaContextFactory(any(SchemaSourceFilter.class));
187     }
188
189     @After
190     public void teardown() {
191         TestKit.shutdownActorSystem(system, true);
192         system = null;
193     }
194
195     @Test
196     public void testInitializeAndRefreshMasterData() {
197
198         // Test CreateInitialMasterActorData.
199
200         initializeMaster(Lists.newArrayList());
201
202         // Test RefreshSetupMasterActorData.
203
204         final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
205                 new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
206
207         final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create().setActorSystem(system).build();
208
209         masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
210
211         testKit.expectMsgClass(MasterActorDataInitialized.class);
212     }
213
214     @Test
215     public void tesAskForMasterMountPoint() {
216
217         // Test with master not setup yet.
218
219         final TestKit kit = new TestKit(system);
220
221         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
222
223         final Failure failure = kit.expectMsgClass(Failure.class);
224         assertTrue(failure.cause() instanceof NotMasterException);
225
226         // Now initialize - master should send the RegisterMountPoint message.
227
228         List<SourceIdentifier> sourceIdentifiers = Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
229         initializeMaster(sourceIdentifiers);
230
231         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
232
233         final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
234
235         assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
236     }
237
238     @Test
239     public void testRegisterAndUnregisterMountPoint() throws Exception {
240
241         ActorRef slaveRef = registerSlaveMountPoint();
242
243         // Unregister
244
245         slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
246
247         verify(mockMountPointReg, timeout(5000)).close();
248         verify(mockSchemaSourceReg1, timeout(1000)).close();
249         verify(mockSchemaSourceReg2, timeout(1000)).close();
250
251         // Test registration with another interleaved registration that completes while the first registration
252         // is resolving the schema context.
253
254         reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
255         resetMountPointMocks();
256
257         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
258
259         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
260                 .createSchemaContextFactory(any(SchemaSourceFilter.class));
261
262         final SchemaSourceRegistration<?> newMockSchemaSourceReg = mock(SchemaSourceRegistration.class);
263
264         final SchemaContextFactory newMockSchemaContextFactory = mock(SchemaContextFactory.class);
265         doReturn(Futures.immediateFuture(mockSchemaContext))
266                 .when(newMockSchemaContextFactory).createSchemaContext(any());
267
268         doAnswer(unused -> {
269             SettableFuture<SchemaContext> future = SettableFuture.create();
270             new Thread(() -> {
271                 doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
272                         withSourceId(SOURCE_IDENTIFIER1));
273
274                 doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
275                         .createSchemaContextFactory(any(SchemaSourceFilter.class));
276
277                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
278                         testKit.getRef());
279
280                 future.set(mockSchemaContext);
281             }).start();
282             return future;
283         }).when(mockSchemaContextFactory).createSchemaContext(any());
284
285         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
286                 .createSchemaContextFactory(any(SchemaSourceFilter.class));
287
288         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
289
290         verify(mockMountPointBuilder, timeout(5000)).register();
291         verify(mockMountPointBuilder, after(500)).addInitialSchemaContext(mockSchemaContext);
292         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
293         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
294         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
295         verify(mockSchemaSourceReg1).close();
296         verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
297         verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class));
298         verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
299
300         // Stop the slave actor and verify schema source registrations are closed.
301
302         final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
303         Await.result(stopFuture, TIMEOUT.duration());
304
305         verify(mockMountPointReg).close();
306         verify(newMockSchemaSourceReg).close();
307     }
308
309     @SuppressWarnings("unchecked")
310     @Test
311     public void testRegisterMountPointWithSchemaFailures() throws Exception {
312         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system).build();
313
314         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, mockRegistry,
315                 mockSchemaRepository, TIMEOUT, mockMountPointService));
316
317         // Test unrecoverable failure.
318
319         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock")))
320                 .when(mockSchemaContextFactory).createSchemaContext(any());
321
322         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
323                 masterRef), testKit.getRef());
324
325         testKit.expectMsgClass(Success.class);
326
327         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
328         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
329
330         verify(mockMountPointBuilder, after(1000).never()).register();
331         verify(mockSchemaSourceReg1, timeout(1000)).close();
332         verify(mockSchemaSourceReg2, timeout(1000)).close();
333
334         // Test recoverable AskTimeoutException - schema context resolution should be retried.
335
336         reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
337
338         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock",
339                 new AskTimeoutException("timeout"))))
340             .doReturn(Futures.immediateFuture(mockSchemaContext))
341             .when(mockSchemaContextFactory).createSchemaContext(any());
342
343         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
344                 masterRef), testKit.getRef());
345
346         testKit.expectMsgClass(Success.class);
347
348         verify(mockMountPointBuilder, timeout(5000)).register();
349         verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
350
351         // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
352         // attempt should not be retried.
353
354         reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
355         resetMountPointMocks();
356
357         final SchemaContextFactory mockSchemaContextFactorySuccess = mock(SchemaContextFactory.class);
358         doReturn(Futures.immediateFuture(mockSchemaContext))
359                 .when(mockSchemaContextFactorySuccess).createSchemaContext(any());
360
361         doAnswer(unused -> {
362             SettableFuture<SchemaContext> future = SettableFuture.create();
363             new Thread(() -> {
364                 doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
365                         .createSchemaContextFactory(any(SchemaSourceFilter.class));
366
367                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
368                         masterRef), testKit.getRef());
369
370                 future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout")));
371             }).start();
372             return future;
373         }).when(mockSchemaContextFactory).createSchemaContext(any());
374
375         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
376                 .createSchemaContextFactory(any(SchemaSourceFilter.class));
377
378         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
379                 masterRef), testKit.getRef());
380
381         verify(mockMountPointBuilder, timeout(5000)).register();
382         verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class));
383     }
384
385     @Test
386     public void testYangTextSchemaSourceRequest() throws Exception {
387         final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID");
388
389         final ProxyYangTextSourceProvider proxyYangProvider =
390                 new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
391
392         final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForByteSource(sourceIdentifier,
393                 ByteSource.wrap("YANG".getBytes(UTF_8)));
394
395         // Test success.
396
397         final SchemaSourceRegistration<YangTextSchemaSource> schemaSourceReg = masterSchemaRepository
398                 .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource),
399                      PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1));
400
401         final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
402                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
403
404         final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
405
406         assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
407         assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
408
409         // Test missing source failure.
410
411         exception.expect(MissingSchemaSourceException.class);
412
413         schemaSourceReg.close();
414
415         final Future<YangTextSchemaSourceSerializationProxy> failedSchemaFuture =
416                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
417
418         Await.result(failedSchemaFuture, TIMEOUT.duration());
419     }
420
421     @Test
422     @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"})
423     public void testSlaveInvokeRpc() throws Throwable {
424
425         final List<SourceIdentifier> sourceIdentifiers =
426                 Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
427
428         initializeMaster(sourceIdentifiers);
429         registerSlaveMountPoint();
430
431         ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
432         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
433
434         final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
435         assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService);
436
437         final QName testQName = QName.create("", "TestQname");
438         final SchemaPath schemaPath = SchemaPath.create(true, testQName);
439         final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
440                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(testQName))
441                 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
442         final RpcError rpcError = RpcResultBuilder.newError(RpcError.ErrorType.RPC, null, "Rpc invocation failed.");
443
444         // RPC with no response output.
445
446         doReturn(Futures.immediateCheckedFuture(null)).when(mockDOMRpcService).invokeRpc(any(), any());
447
448         DOMRpcResult result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
449
450         assertEquals(null, result);
451
452         // RPC with response output.
453
454         doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode)))
455                 .when(mockDOMRpcService).invokeRpc(any(), any());
456
457         result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
458
459         assertEquals(outputNode, result.getResult());
460         assertTrue(result.getErrors().isEmpty());
461
462         // RPC with response error.
463
464         doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(rpcError)))
465                 .when(mockDOMRpcService).invokeRpc(any(), any());
466
467         result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
468
469         assertNull(result.getResult());
470         assertEquals(rpcError, result.getErrors().iterator().next());
471
472         // RPC with response output and error.
473
474         doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
475                 .when(mockDOMRpcService).invokeRpc(any(), any());
476
477         final DOMRpcResult resultOutputError =
478                 slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
479
480         assertEquals(outputNode, resultOutputError.getResult());
481         assertEquals(rpcError, resultOutputError.getErrors().iterator().next());
482
483         // RPC failure.
484
485         exception.expect(DOMRpcException.class);
486
487         doReturn(Futures.immediateFailedCheckedFuture(new ClusteringRpcException("mock")))
488                 .when(mockDOMRpcService).invokeRpc(any(), any());
489
490         try {
491             slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
492         } catch (ExecutionException e) {
493             throw e.getCause();
494         }
495     }
496
497     @Test
498     public void testSlaveNewTransactionRequests() {
499
500         doReturn(mock(DOMDataReadOnlyTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
501         doReturn(mock(DOMDataReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
502         doReturn(mock(DOMDataWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
503
504         initializeMaster(Collections.emptyList());
505         registerSlaveMountPoint();
506
507         ArgumentCaptor<DOMDataBroker> domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
508         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
509
510         final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
511         assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker);
512
513         slaveDOMDataBroker.newReadOnlyTransaction();
514         verify(mockDOMDataBroker).newReadOnlyTransaction();
515
516         slaveDOMDataBroker.newReadWriteTransaction();
517         verify(mockDOMDataBroker).newReadWriteTransaction();
518
519         slaveDOMDataBroker.newWriteOnlyTransaction();
520         verify(mockDOMDataBroker).newWriteOnlyTransaction();
521     }
522
523     private ActorRef registerSlaveMountPoint() {
524         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(
525                 NetconfTopologySetupBuilder.create().setActorSystem(system).build(), remoteDeviceId, mockRegistry,
526                 mockSchemaRepository, TIMEOUT, mockMountPointService));
527
528         doReturn(Futures.immediateFuture(mockSchemaContext))
529                 .when(mockSchemaContextFactory).createSchemaContext(any());
530
531         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
532                 masterRef), testKit.getRef());
533
534         verify(mockMountPointBuilder, timeout(5000)).register();
535         verify(mockMountPointBuilder).addInitialSchemaContext(mockSchemaContext);
536         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
537         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
538         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
539
540         testKit.expectMsgClass(Success.class);
541
542         return slaveRef;
543     }
544
545     private void initializeMaster(final List<SourceIdentifier> sourceIdentifiers) {
546         masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, sourceIdentifiers,
547                 mockDOMRpcService), testKit.getRef());
548
549         testKit.expectMsgClass(MasterActorDataInitialized.class);
550     }
551
552     private void resetMountPointMocks() {
553         reset(mockMountPointReg, mockMountPointBuilder);
554
555         doNothing().when(mockMountPointReg).close();
556
557         doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addInitialSchemaContext(any());
558         doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
559         doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
560     }
561
562     private static PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
563         return argThat(argument -> identifier.equals(argument.getSourceIdentifier()));
564     }
565
566     private static String convertStreamToString(final InputStream is) {
567         try (Scanner scanner = new Scanner(is)) {
568             return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : "";
569         }
570     }
571 }