Custom scheme-cache-directory yang models are not replicated among
[netconf.git] / netconf / netconf-topology-singleton / src / test / java / org / opendaylight / netconf / topology / singleton / impl / NetconfNodeActorTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.nio.charset.StandardCharsets.UTF_8;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.ArgumentMatchers.argThat;
16 import static org.mockito.ArgumentMatchers.eq;
17 import static org.mockito.Mockito.after;
18 import static org.mockito.Mockito.doAnswer;
19 import static org.mockito.Mockito.doNothing;
20 import static org.mockito.Mockito.doReturn;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.reset;
23 import static org.mockito.Mockito.timeout;
24 import static org.mockito.Mockito.times;
25 import static org.mockito.Mockito.verify;
26 import static org.mockito.Mockito.verifyNoMoreInteractions;
27 import static org.mockito.MockitoAnnotations.initMocks;
28 import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
29
30 import akka.actor.ActorRef;
31 import akka.actor.ActorSystem;
32 import akka.actor.Props;
33 import akka.actor.Status.Failure;
34 import akka.actor.Status.Success;
35 import akka.pattern.AskTimeoutException;
36 import akka.pattern.Patterns;
37 import akka.testkit.TestActorRef;
38 import akka.testkit.javadsl.TestKit;
39 import akka.util.Timeout;
40 import com.google.common.collect.ImmutableList;
41 import com.google.common.collect.Lists;
42 import com.google.common.io.ByteSource;
43 import com.google.common.net.InetAddresses;
44 import com.google.common.util.concurrent.Futures;
45 import com.google.common.util.concurrent.SettableFuture;
46 import java.io.InputStream;
47 import java.net.InetSocketAddress;
48 import java.util.ArrayList;
49 import java.util.Collections;
50 import java.util.List;
51 import java.util.Scanner;
52 import java.util.concurrent.ExecutionException;
53 import java.util.concurrent.TimeUnit;
54 import org.junit.After;
55 import org.junit.Before;
56 import org.junit.Rule;
57 import org.junit.Test;
58 import org.junit.rules.ExpectedException;
59 import org.mockito.ArgumentCaptor;
60 import org.mockito.Mock;
61 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
62 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
66 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
67 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
68 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
69 import org.opendaylight.mdsal.dom.api.DOMRpcException;
70 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
71 import org.opendaylight.mdsal.dom.api.DOMRpcService;
72 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
73 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice.SchemaResourcesDTO;
74 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
75 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
76 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
77 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
78 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
79 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
80 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
81 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
82 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
83 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
84 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
85 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
86 import org.opendaylight.yangtools.concepts.ObjectRegistration;
87 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
88 import org.opendaylight.yangtools.yang.common.QName;
89 import org.opendaylight.yangtools.yang.common.RpcError;
90 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
91 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
92 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
93 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
94 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
95 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
96 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
97 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
98 import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier;
99 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
100 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
101 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
102 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
103 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
104 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
105 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
106 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
107 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
108 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
109 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToASTTransformer;
110 import scala.concurrent.Await;
111 import scala.concurrent.Future;
112 import scala.concurrent.duration.Duration;
113
114 public class NetconfNodeActorTest {
115
116     private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
117     private static final RevisionSourceIdentifier SOURCE_IDENTIFIER1 = RevisionSourceIdentifier.create("yang1");
118     private static final RevisionSourceIdentifier SOURCE_IDENTIFIER2 = RevisionSourceIdentifier.create("yang2");
119
120     private ActorSystem system = ActorSystem.create();
121     private final TestKit testKit = new TestKit(system);
122
123     @Rule
124     public final ExpectedException exception = ExpectedException.none();
125
126     private ActorRef masterRef;
127     private RemoteDeviceId remoteDeviceId;
128     private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
129
130     @Mock
131     private DOMRpcService mockDOMRpcService;
132
133     @Mock
134     private DOMMountPointService mockMountPointService;
135
136     @Mock
137     private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
138
139     @Mock
140     private ObjectRegistration<DOMMountPoint> mockMountPointReg;
141
142     @Mock
143     private DOMDataBroker mockDOMDataBroker;
144
145     @Mock
146     private SchemaSourceRegistration<?> mockSchemaSourceReg1;
147
148     @Mock
149     private SchemaSourceRegistration<?> mockSchemaSourceReg2;
150
151     @Mock
152     private SchemaSourceRegistry mockRegistry;
153
154     @Mock
155     private SchemaContextFactory mockSchemaContextFactory;
156
157     @Mock
158     private SchemaRepository mockSchemaRepository;
159
160     @Mock
161     private SchemaContext mockSchemaContext;
162
163     @Mock
164     private SchemaResourcesDTO schemaResourceDTO;
165
166     @Before
167     public void setup() {
168         initMocks(this);
169
170         remoteDeviceId = new RemoteDeviceId("netconf-topology",
171                 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
172
173         masterSchemaRepository.registerSchemaSourceListener(
174                 TextToASTTransformer.create(masterSchemaRepository, masterSchemaRepository));
175
176         doReturn(masterSchemaRepository).when(schemaResourceDTO).getSchemaRepository();
177         doReturn(mockRegistry).when(schemaResourceDTO).getSchemaRegistry();
178         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
179                 .setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS)).setSchemaResourceDTO(schemaResourceDTO).build();
180
181         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
182
183         masterRef = TestActorRef.create(system, props, "master_messages");
184
185         resetMountPointMocks();
186
187         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
188
189         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
190         doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
191
192         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
193                 .createSchemaContextFactory(any(SchemaSourceFilter.class));
194     }
195
196     @After
197     public void teardown() {
198         TestKit.shutdownActorSystem(system, true);
199         system = null;
200     }
201
202     @Test
203     public void testInitializeAndRefreshMasterData() {
204
205         // Test CreateInitialMasterActorData.
206
207         initializeMaster(new ArrayList<>());
208
209         // Test RefreshSetupMasterActorData.
210
211         final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
212                 new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
213
214         final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create()
215                 .setSchemaResourceDTO(schemaResourceDTO).setActorSystem(system).build();
216
217         masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
218
219         testKit.expectMsgClass(MasterActorDataInitialized.class);
220     }
221
222     @Test
223     public void tesAskForMasterMountPoint() {
224
225         // Test with master not setup yet.
226
227         final TestKit kit = new TestKit(system);
228
229         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
230
231         final Failure failure = kit.expectMsgClass(Failure.class);
232         assertTrue(failure.cause() instanceof NotMasterException);
233
234         // Now initialize - master should send the RegisterMountPoint message.
235
236         List<SourceIdentifier> sourceIdentifiers = Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
237         initializeMaster(sourceIdentifiers);
238
239         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
240
241         final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
242
243         assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
244     }
245
246     @Test
247     public void testRegisterAndUnregisterMountPoint() throws Exception {
248
249         ActorRef slaveRef = registerSlaveMountPoint();
250
251         // Unregister
252
253         slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
254
255         verify(mockMountPointReg, timeout(5000)).close();
256         verify(mockSchemaSourceReg1, timeout(1000)).close();
257         verify(mockSchemaSourceReg2, timeout(1000)).close();
258
259         // Test registration with another interleaved registration that completes while the first registration
260         // is resolving the schema context.
261
262         reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
263         resetMountPointMocks();
264
265         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
266
267         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
268                 .createSchemaContextFactory(any(SchemaSourceFilter.class));
269
270         final SchemaSourceRegistration<?> newMockSchemaSourceReg = mock(SchemaSourceRegistration.class);
271
272         final SchemaContextFactory newMockSchemaContextFactory = mock(SchemaContextFactory.class);
273         doReturn(Futures.immediateFuture(mockSchemaContext))
274                 .when(newMockSchemaContextFactory).createSchemaContext(any());
275
276         doAnswer(unused -> {
277             SettableFuture<SchemaContext> future = SettableFuture.create();
278             new Thread(() -> {
279                 doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
280                         withSourceId(SOURCE_IDENTIFIER1));
281
282                 doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
283                         .createSchemaContextFactory(any(SchemaSourceFilter.class));
284
285                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
286                         testKit.getRef());
287
288                 future.set(mockSchemaContext);
289             }).start();
290             return future;
291         }).when(mockSchemaContextFactory).createSchemaContext(any());
292
293         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
294                 .createSchemaContextFactory(any(SchemaSourceFilter.class));
295
296         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
297
298         verify(mockMountPointBuilder, timeout(5000)).register();
299         verify(mockMountPointBuilder, after(500)).addInitialSchemaContext(mockSchemaContext);
300         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
301         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
302         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
303         verify(mockSchemaSourceReg1).close();
304         verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
305         verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class));
306         verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
307
308         // Stop the slave actor and verify schema source registrations are closed.
309
310         final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
311         Await.result(stopFuture, TIMEOUT.duration());
312
313         verify(mockMountPointReg).close();
314         verify(newMockSchemaSourceReg).close();
315     }
316
317     @SuppressWarnings("unchecked")
318     @Test
319     public void testRegisterMountPointWithSchemaFailures() throws Exception {
320         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
321         doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
322         doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
323         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setSchemaResourceDTO(schemaResourceDTO2)
324                 .setActorSystem(system).build();
325
326         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT,
327                 mockMountPointService));
328
329         // Test unrecoverable failure.
330
331         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock")))
332                 .when(mockSchemaContextFactory).createSchemaContext(any());
333
334         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
335                 masterRef), testKit.getRef());
336
337         testKit.expectMsgClass(Success.class);
338
339         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
340         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
341
342         verify(mockMountPointBuilder, after(1000).never()).register();
343         verify(mockSchemaSourceReg1, timeout(1000)).close();
344         verify(mockSchemaSourceReg2, timeout(1000)).close();
345
346         // Test recoverable AskTimeoutException - schema context resolution should be retried.
347
348         reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
349
350         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock",
351                 new AskTimeoutException("timeout"))))
352             .doReturn(Futures.immediateFuture(mockSchemaContext))
353             .when(mockSchemaContextFactory).createSchemaContext(any());
354
355         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
356                 masterRef), testKit.getRef());
357
358         testKit.expectMsgClass(Success.class);
359
360         verify(mockMountPointBuilder, timeout(5000)).register();
361         verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
362
363         // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
364         // attempt should not be retried.
365
366         reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
367         resetMountPointMocks();
368
369         final SchemaContextFactory mockSchemaContextFactorySuccess = mock(SchemaContextFactory.class);
370         doReturn(Futures.immediateFuture(mockSchemaContext))
371                 .when(mockSchemaContextFactorySuccess).createSchemaContext(any());
372
373         doAnswer(unused -> {
374             SettableFuture<SchemaContext> future = SettableFuture.create();
375             new Thread(() -> {
376                 doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
377                         .createSchemaContextFactory(any(SchemaSourceFilter.class));
378
379                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
380                         masterRef), testKit.getRef());
381
382                 future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout")));
383             }).start();
384             return future;
385         }).when(mockSchemaContextFactory).createSchemaContext(any());
386
387         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
388                 .createSchemaContextFactory(any(SchemaSourceFilter.class));
389
390         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
391                 masterRef), testKit.getRef());
392
393         verify(mockMountPointBuilder, timeout(5000)).register();
394         verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class));
395     }
396
397     @Test(expected = MissingSchemaSourceException.class)
398     public void testMissingSchemaSourceOnMissingProvider() throws Exception {
399         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
400         doReturn(DEFAULT_SCHEMA_REPOSITORY).when(schemaResourceDTO2).getSchemaRegistry();
401         doReturn(DEFAULT_SCHEMA_REPOSITORY).when(schemaResourceDTO2).getSchemaRepository();
402         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
403                 .setSchemaResourceDTO(schemaResourceDTO2).setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS)).build();
404         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
405         ActorRef actor = TestActorRef.create(system, props, "master_messages_2");
406
407         final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID");
408
409         final ProxyYangTextSourceProvider proxyYangProvider =
410                 new ProxyYangTextSourceProvider(actor, system.dispatcher(), TIMEOUT);
411
412         final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
413                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
414         Await.result(resolvedSchemaFuture, TIMEOUT.duration());
415     }
416
417     @Test
418     public void testYangTextSchemaSourceRequest() throws Exception {
419         final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID");
420
421         final ProxyYangTextSourceProvider proxyYangProvider =
422                 new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
423
424         final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForByteSource(sourceIdentifier,
425                 ByteSource.wrap("YANG".getBytes(UTF_8)));
426
427         // Test success.
428
429         final SchemaSourceRegistration<YangTextSchemaSource> schemaSourceReg = masterSchemaRepository
430                 .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource),
431                      PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1));
432
433         final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
434                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
435
436         final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
437
438         assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
439         assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
440
441         // Test missing source failure.
442
443         exception.expect(MissingSchemaSourceException.class);
444
445         schemaSourceReg.close();
446
447         final Future<YangTextSchemaSourceSerializationProxy> failedSchemaFuture =
448                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
449
450         Await.result(failedSchemaFuture, TIMEOUT.duration());
451     }
452
453     @Test
454     @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"})
455     public void testSlaveInvokeRpc() throws Throwable {
456
457         final List<SourceIdentifier> sourceIdentifiers =
458                 Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
459
460         initializeMaster(sourceIdentifiers);
461         registerSlaveMountPoint();
462
463         ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
464         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
465
466         final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
467         assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService);
468
469         final QName testQName = QName.create("", "TestQname");
470         final SchemaPath schemaPath = SchemaPath.create(true, testQName);
471         final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
472                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(testQName))
473                 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
474         final RpcError rpcError = RpcResultBuilder.newError(RpcError.ErrorType.RPC, null, "Rpc invocation failed.");
475
476         // RPC with no response output.
477
478         doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMRpcService).invokeRpc(any(), any());
479
480         DOMRpcResult result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
481
482         assertEquals(null, result);
483
484         // RPC with response output.
485
486         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode)))
487                 .when(mockDOMRpcService).invokeRpc(any(), any());
488
489         result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
490
491         assertEquals(outputNode, result.getResult());
492         assertTrue(result.getErrors().isEmpty());
493
494         // RPC with response error.
495
496         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(rpcError)))
497                 .when(mockDOMRpcService).invokeRpc(any(), any());
498
499         result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
500
501         assertNull(result.getResult());
502         assertEquals(rpcError, result.getErrors().iterator().next());
503
504         // RPC with response output and error.
505
506         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
507                 .when(mockDOMRpcService).invokeRpc(any(), any());
508
509         final DOMRpcResult resultOutputError =
510                 slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
511
512         assertEquals(outputNode, resultOutputError.getResult());
513         assertEquals(rpcError, resultOutputError.getErrors().iterator().next());
514
515         // RPC failure.
516
517         exception.expect(DOMRpcException.class);
518
519         doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringRpcException("mock")))
520                 .when(mockDOMRpcService).invokeRpc(any(), any());
521
522         try {
523             slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
524         } catch (ExecutionException e) {
525             throw e.getCause();
526         }
527     }
528
529     @Test
530     public void testSlaveNewTransactionRequests() {
531
532         doReturn(mock(DOMDataTreeReadTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
533         doReturn(mock(DOMDataTreeReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
534         doReturn(mock(DOMDataTreeWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
535
536         initializeMaster(Collections.emptyList());
537         registerSlaveMountPoint();
538
539         ArgumentCaptor<DOMDataBroker> domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
540         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
541
542         final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
543         assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker);
544
545         slaveDOMDataBroker.newReadOnlyTransaction();
546         verify(mockDOMDataBroker).newReadOnlyTransaction();
547
548         slaveDOMDataBroker.newReadWriteTransaction();
549         verify(mockDOMDataBroker).newReadWriteTransaction();
550
551         slaveDOMDataBroker.newWriteOnlyTransaction();
552         verify(mockDOMDataBroker).newWriteOnlyTransaction();
553     }
554
555     private ActorRef registerSlaveMountPoint() {
556         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
557         doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
558         doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
559         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(
560                 NetconfTopologySetupBuilder.create().setSchemaResourceDTO(schemaResourceDTO2).setActorSystem(system)
561                 .build(), remoteDeviceId, TIMEOUT, mockMountPointService));
562
563         doReturn(Futures.immediateFuture(mockSchemaContext))
564                 .when(mockSchemaContextFactory).createSchemaContext(any());
565
566         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
567                 masterRef), testKit.getRef());
568
569         verify(mockMountPointBuilder, timeout(5000)).register();
570         verify(mockMountPointBuilder).addInitialSchemaContext(mockSchemaContext);
571         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
572         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
573         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
574
575         testKit.expectMsgClass(Success.class);
576
577         return slaveRef;
578     }
579
580     private void initializeMaster(final List<SourceIdentifier> sourceIdentifiers) {
581         masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, sourceIdentifiers,
582                 mockDOMRpcService), testKit.getRef());
583
584         testKit.expectMsgClass(MasterActorDataInitialized.class);
585     }
586
587     private void resetMountPointMocks() {
588         reset(mockMountPointReg, mockMountPointBuilder);
589
590         doNothing().when(mockMountPointReg).close();
591
592         doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addInitialSchemaContext(any());
593         doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
594         doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
595     }
596
597     private static PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
598         return argThat(argument -> identifier.equals(argument.getSourceIdentifier()));
599     }
600
601     private static String convertStreamToString(final InputStream is) {
602         try (Scanner scanner = new Scanner(is)) {
603             return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : "";
604         }
605     }
606 }