Migrate netconf to MD-SAL APIs
[netconf.git] / netconf / netconf-topology-singleton / src / test / java / org / opendaylight / netconf / topology / singleton / impl / NetconfNodeActorTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.nio.charset.StandardCharsets.UTF_8;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.ArgumentMatchers.argThat;
16 import static org.mockito.ArgumentMatchers.eq;
17 import static org.mockito.Mockito.after;
18 import static org.mockito.Mockito.doAnswer;
19 import static org.mockito.Mockito.doNothing;
20 import static org.mockito.Mockito.doReturn;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.reset;
23 import static org.mockito.Mockito.timeout;
24 import static org.mockito.Mockito.times;
25 import static org.mockito.Mockito.verify;
26 import static org.mockito.Mockito.verifyNoMoreInteractions;
27 import static org.mockito.MockitoAnnotations.initMocks;
28
29 import akka.actor.ActorRef;
30 import akka.actor.ActorSystem;
31 import akka.actor.Props;
32 import akka.actor.Status.Failure;
33 import akka.actor.Status.Success;
34 import akka.pattern.AskTimeoutException;
35 import akka.pattern.Patterns;
36 import akka.testkit.TestActorRef;
37 import akka.testkit.javadsl.TestKit;
38 import akka.util.Timeout;
39 import com.google.common.collect.ImmutableList;
40 import com.google.common.collect.Lists;
41 import com.google.common.io.ByteSource;
42 import com.google.common.net.InetAddresses;
43 import com.google.common.util.concurrent.Futures;
44 import com.google.common.util.concurrent.SettableFuture;
45 import java.io.InputStream;
46 import java.net.InetSocketAddress;
47 import java.util.Collections;
48 import java.util.List;
49 import java.util.Scanner;
50 import java.util.concurrent.ExecutionException;
51 import java.util.concurrent.TimeUnit;
52 import org.junit.After;
53 import org.junit.Before;
54 import org.junit.Rule;
55 import org.junit.Test;
56 import org.junit.rules.ExpectedException;
57 import org.mockito.ArgumentCaptor;
58 import org.mockito.Mock;
59 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
60 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
61 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
62 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
64 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
65 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
66 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
67 import org.opendaylight.mdsal.dom.api.DOMRpcException;
68 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
69 import org.opendaylight.mdsal.dom.api.DOMRpcService;
70 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
71 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
72 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
73 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
74 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
75 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
76 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
77 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
78 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
79 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
80 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
81 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
82 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
83 import org.opendaylight.yangtools.concepts.ObjectRegistration;
84 import org.opendaylight.yangtools.yang.common.QName;
85 import org.opendaylight.yangtools.yang.common.RpcError;
86 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
87 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
88 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
89 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
90 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
91 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
92 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
93 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
94 import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier;
95 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
96 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
97 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
98 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
99 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
100 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
101 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
102 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
103 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
104 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
105 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToASTTransformer;
106 import scala.concurrent.Await;
107 import scala.concurrent.Future;
108 import scala.concurrent.duration.Duration;
109
110 public class NetconfNodeActorTest {
111
112     private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
113     private static final RevisionSourceIdentifier SOURCE_IDENTIFIER1 = RevisionSourceIdentifier.create("yang1");
114     private static final RevisionSourceIdentifier SOURCE_IDENTIFIER2 = RevisionSourceIdentifier.create("yang2");
115
116     private ActorSystem system = ActorSystem.create();
117     private final TestKit testKit = new TestKit(system);
118
119     @Rule
120     public final ExpectedException exception = ExpectedException.none();
121
122     private ActorRef masterRef;
123     private RemoteDeviceId remoteDeviceId;
124     private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
125
126     @Mock
127     private DOMRpcService mockDOMRpcService;
128
129     @Mock
130     private DOMMountPointService mockMountPointService;
131
132     @Mock
133     private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
134
135     @Mock
136     private ObjectRegistration<DOMMountPoint> mockMountPointReg;
137
138     @Mock
139     private DOMDataBroker mockDOMDataBroker;
140
141     @Mock
142     private SchemaSourceRegistration<?> mockSchemaSourceReg1;
143
144     @Mock
145     private SchemaSourceRegistration<?> mockSchemaSourceReg2;
146
147     @Mock
148     private SchemaSourceRegistry mockRegistry;
149
150     @Mock
151     private SchemaContextFactory mockSchemaContextFactory;
152
153     @Mock
154     private SchemaRepository mockSchemaRepository;
155
156     @Mock
157     private SchemaContext mockSchemaContext;
158
159     @Before
160     public void setup() {
161         initMocks(this);
162
163         remoteDeviceId = new RemoteDeviceId("netconf-topology",
164                 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
165
166         masterSchemaRepository.registerSchemaSourceListener(
167                 TextToASTTransformer.create(masterSchemaRepository, masterSchemaRepository));
168
169         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
170                 .setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS)).build();
171
172         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, masterSchemaRepository,
173                 masterSchemaRepository, TIMEOUT, mockMountPointService);
174
175         masterRef = TestActorRef.create(system, props, "master_messages");
176
177         resetMountPointMocks();
178
179         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
180
181         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
182         doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
183
184         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
185                 .createSchemaContextFactory(any(SchemaSourceFilter.class));
186     }
187
188     @After
189     public void teardown() {
190         TestKit.shutdownActorSystem(system, true);
191         system = null;
192     }
193
194     @Test
195     public void testInitializeAndRefreshMasterData() {
196
197         // Test CreateInitialMasterActorData.
198
199         initializeMaster(Lists.newArrayList());
200
201         // Test RefreshSetupMasterActorData.
202
203         final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
204                 new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
205
206         final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create().setActorSystem(system).build();
207
208         masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
209
210         testKit.expectMsgClass(MasterActorDataInitialized.class);
211     }
212
213     @Test
214     public void tesAskForMasterMountPoint() {
215
216         // Test with master not setup yet.
217
218         final TestKit kit = new TestKit(system);
219
220         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
221
222         final Failure failure = kit.expectMsgClass(Failure.class);
223         assertTrue(failure.cause() instanceof NotMasterException);
224
225         // Now initialize - master should send the RegisterMountPoint message.
226
227         List<SourceIdentifier> sourceIdentifiers = Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
228         initializeMaster(sourceIdentifiers);
229
230         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
231
232         final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
233
234         assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
235     }
236
237     @Test
238     public void testRegisterAndUnregisterMountPoint() throws Exception {
239
240         ActorRef slaveRef = registerSlaveMountPoint();
241
242         // Unregister
243
244         slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
245
246         verify(mockMountPointReg, timeout(5000)).close();
247         verify(mockSchemaSourceReg1, timeout(1000)).close();
248         verify(mockSchemaSourceReg2, timeout(1000)).close();
249
250         // Test registration with another interleaved registration that completes while the first registration
251         // is resolving the schema context.
252
253         reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
254         resetMountPointMocks();
255
256         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
257
258         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
259                 .createSchemaContextFactory(any(SchemaSourceFilter.class));
260
261         final SchemaSourceRegistration<?> newMockSchemaSourceReg = mock(SchemaSourceRegistration.class);
262
263         final SchemaContextFactory newMockSchemaContextFactory = mock(SchemaContextFactory.class);
264         doReturn(Futures.immediateFuture(mockSchemaContext))
265                 .when(newMockSchemaContextFactory).createSchemaContext(any());
266
267         doAnswer(unused -> {
268             SettableFuture<SchemaContext> future = SettableFuture.create();
269             new Thread(() -> {
270                 doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
271                         withSourceId(SOURCE_IDENTIFIER1));
272
273                 doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
274                         .createSchemaContextFactory(any(SchemaSourceFilter.class));
275
276                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
277                         testKit.getRef());
278
279                 future.set(mockSchemaContext);
280             }).start();
281             return future;
282         }).when(mockSchemaContextFactory).createSchemaContext(any());
283
284         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
285                 .createSchemaContextFactory(any(SchemaSourceFilter.class));
286
287         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
288
289         verify(mockMountPointBuilder, timeout(5000)).register();
290         verify(mockMountPointBuilder, after(500)).addInitialSchemaContext(mockSchemaContext);
291         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
292         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
293         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
294         verify(mockSchemaSourceReg1).close();
295         verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
296         verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class));
297         verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
298
299         // Stop the slave actor and verify schema source registrations are closed.
300
301         final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
302         Await.result(stopFuture, TIMEOUT.duration());
303
304         verify(mockMountPointReg).close();
305         verify(newMockSchemaSourceReg).close();
306     }
307
308     @SuppressWarnings("unchecked")
309     @Test
310     public void testRegisterMountPointWithSchemaFailures() throws Exception {
311         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system).build();
312
313         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, mockRegistry,
314                 mockSchemaRepository, TIMEOUT, mockMountPointService));
315
316         // Test unrecoverable failure.
317
318         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock")))
319                 .when(mockSchemaContextFactory).createSchemaContext(any());
320
321         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
322                 masterRef), testKit.getRef());
323
324         testKit.expectMsgClass(Success.class);
325
326         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
327         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
328
329         verify(mockMountPointBuilder, after(1000).never()).register();
330         verify(mockSchemaSourceReg1, timeout(1000)).close();
331         verify(mockSchemaSourceReg2, timeout(1000)).close();
332
333         // Test recoverable AskTimeoutException - schema context resolution should be retried.
334
335         reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
336
337         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock",
338                 new AskTimeoutException("timeout"))))
339             .doReturn(Futures.immediateFuture(mockSchemaContext))
340             .when(mockSchemaContextFactory).createSchemaContext(any());
341
342         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
343                 masterRef), testKit.getRef());
344
345         testKit.expectMsgClass(Success.class);
346
347         verify(mockMountPointBuilder, timeout(5000)).register();
348         verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
349
350         // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
351         // attempt should not be retried.
352
353         reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
354         resetMountPointMocks();
355
356         final SchemaContextFactory mockSchemaContextFactorySuccess = mock(SchemaContextFactory.class);
357         doReturn(Futures.immediateFuture(mockSchemaContext))
358                 .when(mockSchemaContextFactorySuccess).createSchemaContext(any());
359
360         doAnswer(unused -> {
361             SettableFuture<SchemaContext> future = SettableFuture.create();
362             new Thread(() -> {
363                 doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
364                         .createSchemaContextFactory(any(SchemaSourceFilter.class));
365
366                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
367                         masterRef), testKit.getRef());
368
369                 future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout")));
370             }).start();
371             return future;
372         }).when(mockSchemaContextFactory).createSchemaContext(any());
373
374         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
375                 .createSchemaContextFactory(any(SchemaSourceFilter.class));
376
377         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
378                 masterRef), testKit.getRef());
379
380         verify(mockMountPointBuilder, timeout(5000)).register();
381         verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class));
382     }
383
384     @Test
385     public void testYangTextSchemaSourceRequest() throws Exception {
386         final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID");
387
388         final ProxyYangTextSourceProvider proxyYangProvider =
389                 new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
390
391         final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForByteSource(sourceIdentifier,
392                 ByteSource.wrap("YANG".getBytes(UTF_8)));
393
394         // Test success.
395
396         final SchemaSourceRegistration<YangTextSchemaSource> schemaSourceReg = masterSchemaRepository
397                 .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource),
398                      PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1));
399
400         final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
401                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
402
403         final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
404
405         assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
406         assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
407
408         // Test missing source failure.
409
410         exception.expect(MissingSchemaSourceException.class);
411
412         schemaSourceReg.close();
413
414         final Future<YangTextSchemaSourceSerializationProxy> failedSchemaFuture =
415                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
416
417         Await.result(failedSchemaFuture, TIMEOUT.duration());
418     }
419
420     @Test
421     @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"})
422     public void testSlaveInvokeRpc() throws Throwable {
423
424         final List<SourceIdentifier> sourceIdentifiers =
425                 Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
426
427         initializeMaster(sourceIdentifiers);
428         registerSlaveMountPoint();
429
430         ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
431         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
432
433         final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
434         assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService);
435
436         final QName testQName = QName.create("", "TestQname");
437         final SchemaPath schemaPath = SchemaPath.create(true, testQName);
438         final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
439                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(testQName))
440                 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
441         final RpcError rpcError = RpcResultBuilder.newError(RpcError.ErrorType.RPC, null, "Rpc invocation failed.");
442
443         // RPC with no response output.
444
445         doReturn(Futures.immediateCheckedFuture(null)).when(mockDOMRpcService).invokeRpc(any(), any());
446
447         DOMRpcResult result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
448
449         assertEquals(null, result);
450
451         // RPC with response output.
452
453         doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode)))
454                 .when(mockDOMRpcService).invokeRpc(any(), any());
455
456         result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
457
458         assertEquals(outputNode, result.getResult());
459         assertTrue(result.getErrors().isEmpty());
460
461         // RPC with response error.
462
463         doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(rpcError)))
464                 .when(mockDOMRpcService).invokeRpc(any(), any());
465
466         result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
467
468         assertNull(result.getResult());
469         assertEquals(rpcError, result.getErrors().iterator().next());
470
471         // RPC with response output and error.
472
473         doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
474                 .when(mockDOMRpcService).invokeRpc(any(), any());
475
476         final DOMRpcResult resultOutputError =
477                 slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
478
479         assertEquals(outputNode, resultOutputError.getResult());
480         assertEquals(rpcError, resultOutputError.getErrors().iterator().next());
481
482         // RPC failure.
483
484         exception.expect(DOMRpcException.class);
485
486         doReturn(Futures.immediateFailedCheckedFuture(new ClusteringRpcException("mock")))
487                 .when(mockDOMRpcService).invokeRpc(any(), any());
488
489         try {
490             slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
491         } catch (ExecutionException e) {
492             throw e.getCause();
493         }
494     }
495
496     @Test
497     public void testSlaveNewTransactionRequests() {
498
499         doReturn(mock(DOMDataTreeReadTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
500         doReturn(mock(DOMDataTreeReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
501         doReturn(mock(DOMDataTreeWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
502
503         initializeMaster(Collections.emptyList());
504         registerSlaveMountPoint();
505
506         ArgumentCaptor<DOMDataBroker> domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
507         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
508
509         final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
510         assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker);
511
512         slaveDOMDataBroker.newReadOnlyTransaction();
513         verify(mockDOMDataBroker).newReadOnlyTransaction();
514
515         slaveDOMDataBroker.newReadWriteTransaction();
516         verify(mockDOMDataBroker).newReadWriteTransaction();
517
518         slaveDOMDataBroker.newWriteOnlyTransaction();
519         verify(mockDOMDataBroker).newWriteOnlyTransaction();
520     }
521
522     private ActorRef registerSlaveMountPoint() {
523         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(
524                 NetconfTopologySetupBuilder.create().setActorSystem(system).build(), remoteDeviceId, mockRegistry,
525                 mockSchemaRepository, TIMEOUT, mockMountPointService));
526
527         doReturn(Futures.immediateFuture(mockSchemaContext))
528                 .when(mockSchemaContextFactory).createSchemaContext(any());
529
530         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
531                 masterRef), testKit.getRef());
532
533         verify(mockMountPointBuilder, timeout(5000)).register();
534         verify(mockMountPointBuilder).addInitialSchemaContext(mockSchemaContext);
535         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
536         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
537         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
538
539         testKit.expectMsgClass(Success.class);
540
541         return slaveRef;
542     }
543
544     private void initializeMaster(final List<SourceIdentifier> sourceIdentifiers) {
545         masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, sourceIdentifiers,
546                 mockDOMRpcService), testKit.getRef());
547
548         testKit.expectMsgClass(MasterActorDataInitialized.class);
549     }
550
551     private void resetMountPointMocks() {
552         reset(mockMountPointReg, mockMountPointBuilder);
553
554         doNothing().when(mockMountPointReg).close();
555
556         doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addInitialSchemaContext(any());
557         doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
558         doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
559     }
560
561     private static PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
562         return argThat(argument -> identifier.equals(argument.getSourceIdentifier()));
563     }
564
565     private static String convertStreamToString(final InputStream is) {
566         try (Scanner scanner = new Scanner(is)) {
567             return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : "";
568         }
569     }
570 }