Bump sshd to 2.9.1
[netconf.git] / netconf / netconf-topology-singleton / src / test / java / org / opendaylight / netconf / topology / singleton / impl / NetconfNodeActorTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.nio.charset.StandardCharsets.UTF_8;
11 import static org.hamcrest.CoreMatchers.containsString;
12 import static org.hamcrest.CoreMatchers.instanceOf;
13 import static org.hamcrest.CoreMatchers.startsWith;
14 import static org.hamcrest.MatcherAssert.assertThat;
15 import static org.junit.Assert.assertEquals;
16 import static org.junit.Assert.assertNull;
17 import static org.junit.Assert.assertThrows;
18 import static org.junit.Assert.assertTrue;
19 import static org.mockito.ArgumentMatchers.any;
20 import static org.mockito.ArgumentMatchers.anyCollection;
21 import static org.mockito.ArgumentMatchers.argThat;
22 import static org.mockito.ArgumentMatchers.eq;
23 import static org.mockito.Mockito.after;
24 import static org.mockito.Mockito.doAnswer;
25 import static org.mockito.Mockito.doNothing;
26 import static org.mockito.Mockito.doReturn;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.reset;
29 import static org.mockito.Mockito.timeout;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.verifyNoMoreInteractions;
33 import static org.opendaylight.mdsal.common.api.CommitInfo.emptyFluentFuture;
34 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFluentFuture;
35
36 import akka.actor.ActorRef;
37 import akka.actor.ActorSystem;
38 import akka.actor.Props;
39 import akka.actor.Status.Failure;
40 import akka.actor.Status.Success;
41 import akka.pattern.AskTimeoutException;
42 import akka.pattern.Patterns;
43 import akka.testkit.TestActorRef;
44 import akka.testkit.javadsl.TestKit;
45 import akka.util.Timeout;
46 import com.google.common.collect.ImmutableList;
47 import com.google.common.io.ByteSource;
48 import com.google.common.net.InetAddresses;
49 import com.google.common.util.concurrent.FluentFuture;
50 import com.google.common.util.concurrent.Futures;
51 import com.google.common.util.concurrent.ListenableFuture;
52 import com.google.common.util.concurrent.SettableFuture;
53 import java.io.InputStream;
54 import java.net.InetSocketAddress;
55 import java.time.Duration;
56 import java.util.ArrayList;
57 import java.util.List;
58 import java.util.Optional;
59 import java.util.Scanner;
60 import java.util.concurrent.ExecutionException;
61 import java.util.concurrent.TimeUnit;
62 import org.junit.After;
63 import org.junit.Before;
64 import org.junit.Test;
65 import org.junit.runner.RunWith;
66 import org.mockito.ArgumentCaptor;
67 import org.mockito.Mock;
68 import org.mockito.junit.MockitoJUnitRunner;
69 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
70 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
71 import org.opendaylight.mdsal.dom.api.DOMActionException;
72 import org.opendaylight.mdsal.dom.api.DOMActionResult;
73 import org.opendaylight.mdsal.dom.api.DOMActionService;
74 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
75 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
76 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
77 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
78 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
79 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
80 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
81 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
82 import org.opendaylight.mdsal.dom.api.DOMRpcException;
83 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
84 import org.opendaylight.mdsal.dom.api.DOMRpcService;
85 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
86 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
87 import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult;
88 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
89 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice.SchemaResourcesDTO;
90 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
91 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
92 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringActionException;
93 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
94 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
95 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
96 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
97 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
98 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
99 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
100 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
101 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
102 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
103 import org.opendaylight.yangtools.concepts.ObjectRegistration;
104 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
105 import org.opendaylight.yangtools.yang.common.ErrorType;
106 import org.opendaylight.yangtools.yang.common.QName;
107 import org.opendaylight.yangtools.yang.common.RpcError;
108 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
111 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
112 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
113 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
114 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
115 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
116 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
117 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
118 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
119 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
120 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
121 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
122 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
123 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
124 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
125 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
126 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
127 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
128 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToIRTransformer;
129 import scala.concurrent.Await;
130 import scala.concurrent.Future;
131
132 @RunWith(MockitoJUnitRunner.StrictStubs.class)
133 public class NetconfNodeActorTest extends AbstractBaseSchemasTest {
134
135     private static final Timeout TIMEOUT = Timeout.create(Duration.ofSeconds(5));
136     private static final SourceIdentifier SOURCE_IDENTIFIER1 = new SourceIdentifier("yang1");
137     private static final SourceIdentifier SOURCE_IDENTIFIER2 = new SourceIdentifier("yang2");
138
139     private ActorSystem system = ActorSystem.create();
140     private final TestKit testKit = new TestKit(system);
141
142     private ActorRef masterRef;
143     private RemoteDeviceId remoteDeviceId;
144     private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
145
146     @Mock
147     private DOMRpcService mockDOMRpcService;
148
149     @Mock
150     private DOMActionService mockDOMActionService;
151
152     @Mock
153     private DOMMountPointService mockMountPointService;
154
155     @Mock
156     private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
157
158     @Mock
159     private ObjectRegistration<DOMMountPoint> mockMountPointReg;
160
161     @Mock
162     private DOMDataBroker mockDOMDataBroker;
163
164     @Mock
165     private NetconfDataTreeService netconfService;
166
167     @Mock
168     private SchemaSourceRegistration<?> mockSchemaSourceReg1;
169
170     @Mock
171     private SchemaSourceRegistration<?> mockSchemaSourceReg2;
172
173     @Mock
174     private SchemaSourceRegistry mockRegistry;
175
176     @Mock
177     private EffectiveModelContextFactory mockSchemaContextFactory;
178
179     @Mock
180     private SchemaRepository mockSchemaRepository;
181
182     @Mock
183     private EffectiveModelContext mockSchemaContext;
184
185     @Mock
186     private SchemaResourcesDTO schemaResourceDTO;
187
188     @Before
189     public void setup() {
190         remoteDeviceId = new RemoteDeviceId("netconf-topology",
191                 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
192
193         masterSchemaRepository.registerSchemaSourceListener(
194                 TextToIRTransformer.create(masterSchemaRepository, masterSchemaRepository));
195
196         doReturn(masterSchemaRepository).when(schemaResourceDTO).getSchemaRepository();
197         doReturn(mockRegistry).when(schemaResourceDTO).getSchemaRegistry();
198         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
199                 .setIdleTimeout(Duration.ofSeconds(1)).setSchemaResourceDTO(schemaResourceDTO)
200                 .setBaseSchemas(BASE_SCHEMAS).build();
201
202         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
203
204         masterRef = TestActorRef.create(system, props, "master_messages");
205
206         resetMountPointMocks();
207
208         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
209
210         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
211         doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
212
213         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
214                 .createEffectiveModelContextFactory();
215     }
216
217     @After
218     public void teardown() {
219         TestKit.shutdownActorSystem(system, true);
220         system = null;
221     }
222
223     @Test
224     public void testInitializeAndRefreshMasterData() {
225
226         // Test CreateInitialMasterActorData.
227
228         initializeMaster(new ArrayList<>());
229
230         // Test RefreshSetupMasterActorData.
231
232         final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
233                 new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
234
235         final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create().setBaseSchemas(BASE_SCHEMAS)
236                 .setSchemaResourceDTO(schemaResourceDTO).setActorSystem(system).build();
237
238         masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
239
240         testKit.expectMsgClass(MasterActorDataInitialized.class);
241     }
242
243     @Test
244     public void testAskForMasterMountPoint() {
245
246         // Test with master not setup yet.
247
248         final TestKit kit = new TestKit(system);
249
250         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
251
252         final Failure failure = kit.expectMsgClass(Failure.class);
253         assertTrue(failure.cause() instanceof NotMasterException);
254
255         // Now initialize - master should send the RegisterMountPoint message.
256
257         List<SourceIdentifier> sourceIdentifiers = List.of(new SourceIdentifier("testID"));
258         initializeMaster(sourceIdentifiers);
259
260         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
261
262         final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
263
264         assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
265     }
266
267     @Test
268     public void testRegisterAndUnregisterMountPoint() throws Exception {
269
270         ActorRef slaveRef = registerSlaveMountPoint();
271
272         // Unregister
273
274         slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
275
276         verify(mockMountPointReg, timeout(5000)).close();
277         verify(mockSchemaSourceReg1, timeout(1000)).close();
278         verify(mockSchemaSourceReg2, timeout(1000)).close();
279
280         // Test registration with another interleaved registration that completes while the first registration
281         // is resolving the schema context.
282
283         reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
284         resetMountPointMocks();
285
286         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
287
288         final SchemaSourceRegistration<?> newMockSchemaSourceReg = mock(SchemaSourceRegistration.class);
289
290         final EffectiveModelContextFactory newMockSchemaContextFactory = mock(EffectiveModelContextFactory.class);
291         doReturn(Futures.immediateFuture(mockSchemaContext))
292                 .when(newMockSchemaContextFactory).createEffectiveModelContext(anyCollection());
293
294         doAnswer(unused -> {
295             SettableFuture<SchemaContext> future = SettableFuture.create();
296             new Thread(() -> {
297                 doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
298                         withSourceId(SOURCE_IDENTIFIER1));
299
300                 doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
301                         .createEffectiveModelContextFactory();
302
303                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
304                         testKit.getRef());
305
306                 future.set(mockSchemaContext);
307             }).start();
308             return future;
309         }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
310
311         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
312                 .createEffectiveModelContextFactory();
313
314         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
315
316         verify(mockMountPointBuilder, timeout(5000)).register();
317         verify(mockMountPointBuilder, after(500)).addService(eq(DOMDataBroker.class), any());
318         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
319         verify(mockMountPointBuilder).addService(eq(DOMActionService.class), any());
320         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
321         verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
322         verify(mockSchemaSourceReg1).close();
323         verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
324         verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
325         verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
326
327         // Stop the slave actor and verify schema source registrations are closed.
328
329         final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
330         Await.result(stopFuture, TIMEOUT.duration());
331
332         verify(mockMountPointReg).close();
333         verify(newMockSchemaSourceReg).close();
334     }
335
336     @SuppressWarnings("unchecked")
337     @Test
338     public void testRegisterMountPointWithSchemaFailures() throws Exception {
339         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
340         doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
341         doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
342         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create()
343                 .setSchemaResourceDTO(schemaResourceDTO2)
344                 .setBaseSchemas(BASE_SCHEMAS)
345                 .setActorSystem(system)
346                 .build();
347
348         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT,
349                 mockMountPointService));
350
351         // Test unrecoverable failure.
352
353         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock")))
354                 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
355
356         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
357                 masterRef), testKit.getRef());
358
359         testKit.expectMsgClass(Success.class);
360
361         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
362         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
363
364         verify(mockMountPointBuilder, after(1000).never()).register();
365         verify(mockSchemaSourceReg1, timeout(1000)).close();
366         verify(mockSchemaSourceReg2, timeout(1000)).close();
367
368         // Test recoverable AskTimeoutException - schema context resolution should be retried.
369
370         reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
371
372         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock",
373                 new AskTimeoutException("timeout"))))
374             .doReturn(Futures.immediateFuture(mockSchemaContext))
375             .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
376
377         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
378                 masterRef), testKit.getRef());
379
380         testKit.expectMsgClass(Success.class);
381
382         verify(mockMountPointBuilder, timeout(5000)).register();
383         verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
384
385         // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
386         // attempt should not be retried.
387
388         reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
389         resetMountPointMocks();
390
391         final EffectiveModelContextFactory mockSchemaContextFactorySuccess = mock(EffectiveModelContextFactory.class);
392         doReturn(Futures.immediateFuture(mockSchemaContext))
393                 .when(mockSchemaContextFactorySuccess).createEffectiveModelContext(anyCollection());
394
395         doAnswer(unused -> {
396             SettableFuture<SchemaContext> future = SettableFuture.create();
397             new Thread(() -> {
398                 doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
399                     .createEffectiveModelContextFactory();
400
401                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
402                         masterRef), testKit.getRef());
403
404                 future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout")));
405             }).start();
406             return future;
407         }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
408
409         doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
410
411         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
412                 masterRef), testKit.getRef());
413
414         verify(mockMountPointBuilder, timeout(5000)).register();
415         verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
416     }
417
418     @Test(expected = MissingSchemaSourceException.class)
419     public void testMissingSchemaSourceOnMissingProvider() throws Exception {
420         final SharedSchemaRepository repository = new SharedSchemaRepository("test");
421
422         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
423         doReturn(repository).when(schemaResourceDTO2).getSchemaRegistry();
424         doReturn(repository).when(schemaResourceDTO2).getSchemaRepository();
425         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
426                 .setSchemaResourceDTO(schemaResourceDTO2).setIdleTimeout(Duration.ofSeconds(1))
427                 .setBaseSchemas(BASE_SCHEMAS).build();
428         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
429         ActorRef actor = TestActorRef.create(system, props, "master_messages_2");
430
431         final SourceIdentifier sourceIdentifier = new SourceIdentifier("testID");
432
433         final ProxyYangTextSourceProvider proxyYangProvider =
434                 new ProxyYangTextSourceProvider(actor, system.dispatcher(), TIMEOUT);
435
436         final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
437                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
438         Await.result(resolvedSchemaFuture, TIMEOUT.duration());
439     }
440
441     @Test
442     public void testYangTextSchemaSourceRequest() throws Exception {
443         final SourceIdentifier sourceIdentifier = new SourceIdentifier("testID");
444
445         final ProxyYangTextSourceProvider proxyYangProvider =
446                 new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
447
448         final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForByteSource(sourceIdentifier,
449                 ByteSource.wrap("YANG".getBytes(UTF_8)));
450
451         // Test success.
452
453         final SchemaSourceRegistration<YangTextSchemaSource> schemaSourceReg = masterSchemaRepository
454                 .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource),
455                      PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1));
456
457         final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
458                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
459
460         final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
461
462         assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
463         assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
464
465         // Test missing source failure.
466
467         schemaSourceReg.close();
468
469         final MissingSchemaSourceException ex = assertThrows(MissingSchemaSourceException.class,
470             () -> {
471                 final Future<YangTextSchemaSourceSerializationProxy> failedSchemaFuture =
472                         proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
473                 Await.result(failedSchemaFuture, TIMEOUT.duration());
474             });
475         assertThat(ex.getMessage(), startsWith("No providers registered for source"));
476         assertThat(ex.getMessage(), containsString(sourceIdentifier.toString()));
477     }
478
479     @Test
480     public void testSlaveInvokeRpc() throws Exception {
481
482         initializeMaster(List.of(new SourceIdentifier("testID")));
483         registerSlaveMountPoint();
484
485         ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
486         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
487
488         final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
489         assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService);
490
491         final QName testQName = QName.create("", "TestQname");
492         final NormalizedNode outputNode = ImmutableContainerNodeBuilder.create()
493                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(testQName))
494                 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
495         final RpcError rpcError = RpcResultBuilder.newError(ErrorType.RPC, null, "Rpc invocation failed.");
496
497         // RPC with no response output.
498
499         doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMRpcService).invokeRpc(any(), any());
500
501         DOMRpcResult result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
502
503         assertEquals(null, result);
504
505         // RPC with response output.
506
507         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode)))
508                 .when(mockDOMRpcService).invokeRpc(any(), any());
509
510         result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
511
512         assertEquals(outputNode, result.getResult());
513         assertTrue(result.getErrors().isEmpty());
514
515         // RPC with response error.
516
517         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(rpcError)))
518                 .when(mockDOMRpcService).invokeRpc(any(), any());
519
520         result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
521
522         assertNull(result.getResult());
523         assertEquals(rpcError, result.getErrors().iterator().next());
524
525         // RPC with response output and error.
526
527         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
528                 .when(mockDOMRpcService).invokeRpc(any(), any());
529
530         final DOMRpcResult resultOutputError =
531                 slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
532
533         assertEquals(outputNode, resultOutputError.getResult());
534         assertEquals(rpcError, resultOutputError.getErrors().iterator().next());
535
536         // RPC failure.
537         doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringRpcException("mock")))
538             .when(mockDOMRpcService).invokeRpc(any(), any());
539         final ListenableFuture<? extends DOMRpcResult> future = slaveDomRPCService.invokeRpc(testQName, outputNode);
540
541         final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
542         final Throwable cause = e.getCause();
543         assertThat(cause, instanceOf(DOMRpcException.class));
544         assertEquals("mock", cause.getMessage());
545     }
546
547     @Test
548     public void testSlaveInvokeAction() throws Exception {
549         initializeMaster(List.of(new SourceIdentifier("testActionID")));
550         registerSlaveMountPoint();
551
552         ArgumentCaptor<DOMActionService> domActionServiceCaptor = ArgumentCaptor.forClass(DOMActionService.class);
553         verify(mockMountPointBuilder).addService(eq(DOMActionService.class), domActionServiceCaptor.capture());
554
555         final DOMActionService slaveDomActionService = domActionServiceCaptor.getValue();
556         assertTrue(slaveDomActionService instanceof ProxyDOMActionService);
557
558         final QName testQName = QName.create("test", "2019-08-16", "TestActionQname");
559         final Absolute schemaPath = Absolute.of(testQName);
560
561         final YangInstanceIdentifier yangIIdPath = YangInstanceIdentifier
562             .create(new YangInstanceIdentifier.NodeIdentifier(testQName));
563
564         final DOMDataTreeIdentifier domDataTreeIdentifier = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
565             yangIIdPath);
566
567         final ContainerNode outputNode = ImmutableContainerNodeBuilder.create()
568             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(testQName))
569             .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
570
571         // Action with no response output.
572         doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMActionService)
573             .invokeAction(any(), any(), any());
574         DOMActionResult result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
575             .get(2, TimeUnit.SECONDS);
576         assertEquals(null, result);
577
578         // Action with response output.
579         doReturn(FluentFutures.immediateFluentFuture(new SimpleDOMActionResult(outputNode))).when(mockDOMActionService)
580             .invokeAction(any(), any(), any());
581         result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
582             .get(2, TimeUnit.SECONDS);
583
584         assertEquals(outputNode, result.getOutput().get());
585         assertTrue(result.getErrors().isEmpty());
586
587         // Action failure.
588         doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringActionException("mock")))
589             .when(mockDOMActionService).invokeAction(any(), any(), any());
590         final ListenableFuture<? extends DOMActionResult> future = slaveDomActionService.invokeAction(schemaPath,
591             domDataTreeIdentifier, outputNode);
592
593         final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
594         final Throwable cause = e.getCause();
595         assertThat(cause, instanceOf(DOMActionException.class));
596         assertEquals("mock", cause.getMessage());
597     }
598
599     @Test
600     public void testSlaveNewTransactionRequests() {
601         doReturn(mock(DOMDataTreeReadTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
602         doReturn(mock(DOMDataTreeReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
603         doReturn(mock(DOMDataTreeWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
604
605         initializeMaster(List.of());
606         registerSlaveMountPoint();
607
608         ArgumentCaptor<DOMDataBroker> domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
609         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
610
611         final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
612         assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker);
613
614         slaveDOMDataBroker.newReadOnlyTransaction();
615         verify(mockDOMDataBroker).newReadOnlyTransaction();
616
617         slaveDOMDataBroker.newReadWriteTransaction();
618         verify(mockDOMDataBroker).newReadWriteTransaction();
619
620         slaveDOMDataBroker.newWriteOnlyTransaction();
621         verify(mockDOMDataBroker).newWriteOnlyTransaction();
622     }
623
624     @Test
625     public void testSlaveNewNetconfDataTreeServiceRequest() {
626         initializeMaster(List.of());
627         registerSlaveMountPoint();
628
629         ArgumentCaptor<NetconfDataTreeService> netconfCaptor = ArgumentCaptor.forClass(NetconfDataTreeService.class);
630         verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), netconfCaptor.capture());
631
632         final NetconfDataTreeService slaveNetconfService = netconfCaptor.getValue();
633         assertTrue(slaveNetconfService instanceof ProxyNetconfDataTreeService);
634
635         final YangInstanceIdentifier PATH = YangInstanceIdentifier.empty();
636         final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
637         final ContainerNode NODE = Builders.containerBuilder()
638             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
639             .build();
640
641         final FluentFuture<Optional<Object>> result = immediateFluentFuture(Optional.of(NODE));
642         doReturn(result).when(netconfService).get(PATH);
643         doReturn(result).when(netconfService).getConfig(PATH);
644         doReturn(emptyFluentFuture()).when(netconfService).commit();
645
646         slaveNetconfService.get(PATH);
647         slaveNetconfService.getConfig(PATH);
648         slaveNetconfService.lock();
649         slaveNetconfService.merge(STORE, PATH, NODE, Optional.empty());
650         slaveNetconfService.replace(STORE, PATH, NODE, Optional.empty());
651         slaveNetconfService.create(STORE, PATH, NODE, Optional.empty());
652         slaveNetconfService.delete(STORE, PATH);
653         slaveNetconfService.remove(STORE, PATH);
654         slaveNetconfService.discardChanges();
655         slaveNetconfService.commit();
656
657         verify(netconfService, timeout(1000)).get(PATH);
658         verify(netconfService, timeout(1000)).getConfig(PATH);
659         verify(netconfService, timeout(1000)).lock();
660         verify(netconfService, timeout(1000)).merge(STORE, PATH, NODE, Optional.empty());
661         verify(netconfService, timeout(1000)).replace(STORE, PATH, NODE, Optional.empty());
662         verify(netconfService, timeout(1000)).create(STORE, PATH, NODE, Optional.empty());
663         verify(netconfService, timeout(1000)).delete(STORE, PATH);
664         verify(netconfService, timeout(1000)).remove(STORE, PATH);
665         verify(netconfService, timeout(1000)).discardChanges();
666         verify(netconfService, timeout(1000)).commit();
667     }
668
669     private ActorRef registerSlaveMountPoint() {
670         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
671         doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
672         doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
673         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(
674                 NetconfTopologySetupBuilder.create().setSchemaResourceDTO(schemaResourceDTO2).setActorSystem(system)
675                 .setBaseSchemas(BASE_SCHEMAS).build(), remoteDeviceId, TIMEOUT, mockMountPointService));
676
677         doReturn(Futures.immediateFuture(mockSchemaContext))
678                 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
679
680         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
681                 masterRef), testKit.getRef());
682
683         verify(mockMountPointBuilder, timeout(5000)).register();
684         verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
685         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
686         verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), any());
687         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
688         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
689
690         testKit.expectMsgClass(Success.class);
691         return slaveRef;
692     }
693
694     private void initializeMaster(final List<SourceIdentifier> sourceIdentifiers) {
695         masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, netconfService, sourceIdentifiers,
696                 mockDOMRpcService, mockDOMActionService), testKit.getRef());
697         testKit.expectMsgClass(MasterActorDataInitialized.class);
698     }
699
700     private void resetMountPointMocks() {
701         reset(mockMountPointReg, mockMountPointBuilder);
702
703         doNothing().when(mockMountPointReg).close();
704
705         doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
706         doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
707     }
708
709     private static PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
710         return argThat(argument -> identifier.equals(argument.getSourceIdentifier()));
711     }
712
713     private static String convertStreamToString(final InputStream is) {
714         try (Scanner scanner = new Scanner(is)) {
715             return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : "";
716         }
717     }
718 }