Annotate all serialVersionUID with @Serial
[netconf.git] / apps / netconf-topology-singleton / src / test / java / org / opendaylight / netconf / topology / singleton / impl / NetconfNodeActorTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.nio.charset.StandardCharsets.UTF_8;
11 import static org.hamcrest.CoreMatchers.containsString;
12 import static org.hamcrest.CoreMatchers.instanceOf;
13 import static org.hamcrest.CoreMatchers.startsWith;
14 import static org.hamcrest.MatcherAssert.assertThat;
15 import static org.junit.Assert.assertEquals;
16 import static org.junit.Assert.assertNull;
17 import static org.junit.Assert.assertThrows;
18 import static org.junit.Assert.assertTrue;
19 import static org.mockito.ArgumentMatchers.any;
20 import static org.mockito.ArgumentMatchers.anyCollection;
21 import static org.mockito.ArgumentMatchers.argThat;
22 import static org.mockito.ArgumentMatchers.eq;
23 import static org.mockito.Mockito.after;
24 import static org.mockito.Mockito.doAnswer;
25 import static org.mockito.Mockito.doNothing;
26 import static org.mockito.Mockito.doReturn;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.reset;
29 import static org.mockito.Mockito.timeout;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.verifyNoMoreInteractions;
33 import static org.opendaylight.mdsal.common.api.CommitInfo.emptyFluentFuture;
34 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFluentFuture;
35
36 import akka.actor.ActorRef;
37 import akka.actor.ActorSystem;
38 import akka.actor.Props;
39 import akka.actor.Status.Failure;
40 import akka.actor.Status.Success;
41 import akka.pattern.AskTimeoutException;
42 import akka.pattern.Patterns;
43 import akka.testkit.TestActorRef;
44 import akka.testkit.javadsl.TestKit;
45 import akka.util.Timeout;
46 import com.google.common.collect.ImmutableList;
47 import com.google.common.io.ByteSource;
48 import com.google.common.net.InetAddresses;
49 import com.google.common.util.concurrent.FluentFuture;
50 import com.google.common.util.concurrent.Futures;
51 import com.google.common.util.concurrent.ListenableFuture;
52 import com.google.common.util.concurrent.SettableFuture;
53 import java.io.InputStream;
54 import java.net.InetSocketAddress;
55 import java.time.Duration;
56 import java.util.ArrayList;
57 import java.util.List;
58 import java.util.Optional;
59 import java.util.Scanner;
60 import java.util.concurrent.ExecutionException;
61 import java.util.concurrent.TimeUnit;
62 import org.junit.After;
63 import org.junit.Before;
64 import org.junit.Test;
65 import org.junit.runner.RunWith;
66 import org.mockito.ArgumentCaptor;
67 import org.mockito.Mock;
68 import org.mockito.junit.MockitoJUnitRunner;
69 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
70 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
71 import org.opendaylight.mdsal.dom.api.DOMActionException;
72 import org.opendaylight.mdsal.dom.api.DOMActionResult;
73 import org.opendaylight.mdsal.dom.api.DOMActionService;
74 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
75 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
76 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
77 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
78 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
79 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
80 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
81 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
82 import org.opendaylight.mdsal.dom.api.DOMRpcException;
83 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
84 import org.opendaylight.mdsal.dom.api.DOMRpcService;
85 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
86 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
87 import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult;
88 import org.opendaylight.netconf.client.mdsal.NetconfDevice.SchemaResourcesDTO;
89 import org.opendaylight.netconf.client.mdsal.api.CredentialProvider;
90 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
91 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
92 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Actions;
93 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
94 import org.opendaylight.netconf.client.mdsal.api.SslHandlerFactoryProvider;
95 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
96 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
97 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringActionException;
98 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
99 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
100 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
101 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
102 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
103 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
104 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
105 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
106 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
107 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
108 import org.opendaylight.yangtools.concepts.ObjectRegistration;
109 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
110 import org.opendaylight.yangtools.yang.common.ErrorType;
111 import org.opendaylight.yangtools.yang.common.QName;
112 import org.opendaylight.yangtools.yang.common.RpcError;
113 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
114 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
115 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
116 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
117 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
118 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
119 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
120 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
121 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
122 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
123 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
124 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
125 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
126 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
127 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
128 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
129 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
130 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
131 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
132 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToIRTransformer;
133 import scala.concurrent.Await;
134 import scala.concurrent.Future;
135
136 @RunWith(MockitoJUnitRunner.StrictStubs.class)
137 public class NetconfNodeActorTest extends AbstractBaseSchemasTest {
138
        // Timeout handed to NetconfNodeActor.props(...), ProxyYangTextSourceProvider and the
        // gracefulStop/Await calls used by the tests in this class.
139     private static final Timeout TIMEOUT = Timeout.create(Duration.ofSeconds(5));
        // Source identifiers used with withSourceId(...) when stubbing and verifying mockRegistry, and as
        // the payload of RegisterMountPoint messages.
140     private static final SourceIdentifier SOURCE_IDENTIFIER1 = new SourceIdentifier("yang1");
141     private static final SourceIdentifier SOURCE_IDENTIFIER2 = new SourceIdentifier("yang2");
142
        // Not final: teardown() shuts the actor system down and then clears this reference.
143     private ActorSystem system = ActorSystem.create();
        // Probe whose ref is used as the sender of test messages and which receives the replies.
144     private final TestKit testKit = new TestKit(system);
145
        // Both assigned in setup(); masterRef is the NetconfNodeActor created under the name "master_messages".
146     private ActorRef masterRef;
147     private RemoteDeviceId remoteDeviceId;
        // Real (non-mock) repository that schemaResourceDTO.getSchemaRepository() is stubbed to return.
148     private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
149
        // Mockito mocks, created per test by MockitoJUnitRunner.StrictStubs (see @RunWith on the class).
        // NOTE(review): the device-service mocks (RPC, action, data broker, NetconfDataTreeService) are
        // presumably wired in by initializeMaster(...), which is outside this excerpt - confirm.
150     @Mock
151     private Rpcs.Normalized mockDOMRpcService;
152     @Mock
153     private Actions.Normalized mockDOMActionService;
154     @Mock
155     private DOMMountPointService mockMountPointService;
156     @Mock
157     private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
158     @Mock
159     private ObjectRegistration<DOMMountPoint> mockMountPointReg;
160     @Mock
161     private DOMDataBroker mockDOMDataBroker;
162     @Mock
163     private NetconfDataTreeService netconfService;
        // Registrations returned by mockRegistry for SOURCE_IDENTIFIER1 and SOURCE_IDENTIFIER2 respectively.
164     @Mock
165     private SchemaSourceRegistration<?> mockSchemaSourceReg1;
166     @Mock
167     private SchemaSourceRegistration<?> mockSchemaSourceReg2;
168     @Mock
169     private SchemaSourceRegistry mockRegistry;
        // Factory that setup() stubs mockSchemaRepository.createEffectiveModelContextFactory() to return.
170     @Mock
171     private EffectiveModelContextFactory mockSchemaContextFactory;
172     @Mock
173     private SchemaRepository mockSchemaRepository;
174     @Mock
175     private EffectiveModelContext mockSchemaContext;
        // Stubbed in setup() to expose masterSchemaRepository and mockRegistry to the master actor's setup.
176     @Mock
177     private SchemaResourcesDTO schemaResourceDTO;
178     @Mock
179     private CredentialProvider credentialProvider;
180     @Mock
181     private SslHandlerFactoryProvider sslHandlerFactoryProvider;
182
183     @Before
184     public void setup() {
185         remoteDeviceId = new RemoteDeviceId("netconf-topology",
186                 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
187
188         masterSchemaRepository.registerSchemaSourceListener(
189                 TextToIRTransformer.create(masterSchemaRepository, masterSchemaRepository));
190
191         doReturn(masterSchemaRepository).when(schemaResourceDTO).getSchemaRepository();
192         doReturn(mockRegistry).when(schemaResourceDTO).getSchemaRegistry();
193         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create()
194             .setActorSystem(system)
195             .setIdleTimeout(Duration.ofSeconds(1))
196             .setSchemaResourceDTO(schemaResourceDTO)
197             .setBaseSchemas(BASE_SCHEMAS)
198             .setCredentialProvider(credentialProvider)
199             .setSslHandlerFactoryProvider(sslHandlerFactoryProvider)
200             .build();
201
202         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
203
204         masterRef = TestActorRef.create(system, props, "master_messages");
205
206         resetMountPointMocks();
207
208         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
209
210         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
211         doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
212
213         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
214                 .createEffectiveModelContextFactory();
215     }
216
217     @After
218     public void teardown() {
219         TestKit.shutdownActorSystem(system, true);
220         system = null;
221     }
222
223     @Test
224     public void testInitializeAndRefreshMasterData() {
225
226         // Test CreateInitialMasterActorData.
227
228         initializeMaster(new ArrayList<>());
229
230         // Test RefreshSetupMasterActorData.
231
232         final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
233                 new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
234
235         final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create()
236             .setBaseSchemas(BASE_SCHEMAS)
237             .setSchemaResourceDTO(schemaResourceDTO)
238             .setActorSystem(system)
239             .setCredentialProvider(credentialProvider)
240             .setSslHandlerFactoryProvider(sslHandlerFactoryProvider)
241             .build();
242
243         masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
244
245         testKit.expectMsgClass(MasterActorDataInitialized.class);
246     }
247
248     @Test
249     public void testAskForMasterMountPoint() {
250
251         // Test with master not setup yet.
252
253         final TestKit kit = new TestKit(system);
254
255         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
256
257         final Failure failure = kit.expectMsgClass(Failure.class);
258         assertTrue(failure.cause() instanceof NotMasterException);
259
260         // Now initialize - master should send the RegisterMountPoint message.
261
262         List<SourceIdentifier> sourceIdentifiers = List.of(new SourceIdentifier("testID"));
263         initializeMaster(sourceIdentifiers);
264
265         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
266
267         final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
268
269         assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
270     }
271
272     @Test
273     public void testRegisterAndUnregisterMountPoint() throws Exception {
274
275         ActorRef slaveRef = registerSlaveMountPoint();
276
277         // Unregister
278
279         slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
280
281         verify(mockMountPointReg, timeout(5000)).close();
282         verify(mockSchemaSourceReg1, timeout(1000)).close();
283         verify(mockSchemaSourceReg2, timeout(1000)).close();
284
285         // Test registration with another interleaved registration that completes while the first registration
286         // is resolving the schema context.
287
288         reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
289         resetMountPointMocks();
290
291         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
292
293         final SchemaSourceRegistration<?> newMockSchemaSourceReg = mock(SchemaSourceRegistration.class);
294
295         final EffectiveModelContextFactory newMockSchemaContextFactory = mock(EffectiveModelContextFactory.class);
296         doReturn(Futures.immediateFuture(mockSchemaContext))
297                 .when(newMockSchemaContextFactory).createEffectiveModelContext(anyCollection());
298
299         doAnswer(unused -> {
300             SettableFuture<SchemaContext> future = SettableFuture.create();
301             new Thread(() -> {
302                 doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
303                         withSourceId(SOURCE_IDENTIFIER1));
304
305                 doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
306                         .createEffectiveModelContextFactory();
307
308                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
309                         testKit.getRef());
310
311                 future.set(mockSchemaContext);
312             }).start();
313             return future;
314         }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
315
316         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
317                 .createEffectiveModelContextFactory();
318
319         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
320
321         verify(mockMountPointBuilder, timeout(5000)).register();
322         verify(mockMountPointBuilder, after(500)).addService(eq(DOMDataBroker.class), any());
323         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
324         verify(mockMountPointBuilder).addService(eq(DOMActionService.class), any());
325         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
326         verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
327         verify(mockSchemaSourceReg1).close();
328         verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
329         verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
330         verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
331
332         // Stop the slave actor and verify schema source registrations are closed.
333
334         final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
335         Await.result(stopFuture, TIMEOUT.duration());
336
337         verify(mockMountPointReg).close();
338         verify(newMockSchemaSourceReg).close();
339     }
340
341     @SuppressWarnings("unchecked")
342     @Test
343     public void testRegisterMountPointWithSchemaFailures() throws Exception {
344         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
345         doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
346         doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
347         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create()
348                 .setSchemaResourceDTO(schemaResourceDTO2)
349                 .setBaseSchemas(BASE_SCHEMAS)
350                 .setActorSystem(system)
351                 .setCredentialProvider(credentialProvider)
352                 .setSslHandlerFactoryProvider(sslHandlerFactoryProvider)
353                 .build();
354
355         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT,
356                 mockMountPointService));
357
358         // Test unrecoverable failure.
359
360         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock")))
361                 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
362
363         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
364                 masterRef), testKit.getRef());
365
366         testKit.expectMsgClass(Success.class);
367
368         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
369         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
370
371         verify(mockMountPointBuilder, after(1000).never()).register();
372         verify(mockSchemaSourceReg1, timeout(1000)).close();
373         verify(mockSchemaSourceReg2, timeout(1000)).close();
374
375         // Test recoverable AskTimeoutException - schema context resolution should be retried.
376
377         reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
378
379         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock",
380                 new AskTimeoutException("timeout"))))
381             .doReturn(Futures.immediateFuture(mockSchemaContext))
382             .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
383
384         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
385                 masterRef), testKit.getRef());
386
387         testKit.expectMsgClass(Success.class);
388
389         verify(mockMountPointBuilder, timeout(5000)).register();
390         verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
391
392         // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
393         // attempt should not be retried.
394
395         reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
396         resetMountPointMocks();
397
398         final EffectiveModelContextFactory mockSchemaContextFactorySuccess = mock(EffectiveModelContextFactory.class);
399         doReturn(Futures.immediateFuture(mockSchemaContext))
400                 .when(mockSchemaContextFactorySuccess).createEffectiveModelContext(anyCollection());
401
402         doAnswer(unused -> {
403             SettableFuture<SchemaContext> future = SettableFuture.create();
404             new Thread(() -> {
405                 doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
406                     .createEffectiveModelContextFactory();
407
408                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
409                         masterRef), testKit.getRef());
410
411                 future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout")));
412             }).start();
413             return future;
414         }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
415
416         doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
417
418         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
419                 masterRef), testKit.getRef());
420
421         verify(mockMountPointBuilder, timeout(5000)).register();
422         verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
423     }
424
425     @Test(expected = MissingSchemaSourceException.class)
426     public void testMissingSchemaSourceOnMissingProvider() throws Exception {
427         final SharedSchemaRepository repository = new SharedSchemaRepository("test");
428
429         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
430         doReturn(repository).when(schemaResourceDTO2).getSchemaRegistry();
431         doReturn(repository).when(schemaResourceDTO2).getSchemaRepository();
432         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create()
433             .setActorSystem(system)
434             .setSchemaResourceDTO(schemaResourceDTO2)
435             .setIdleTimeout(Duration.ofSeconds(1))
436             .setBaseSchemas(BASE_SCHEMAS)
437             .setCredentialProvider(credentialProvider)
438             .setSslHandlerFactoryProvider(sslHandlerFactoryProvider)
439             .build();
440         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
441         ActorRef actor = TestActorRef.create(system, props, "master_messages_2");
442
443         final SourceIdentifier sourceIdentifier = new SourceIdentifier("testID");
444
445         final ProxyYangTextSourceProvider proxyYangProvider =
446                 new ProxyYangTextSourceProvider(actor, system.dispatcher(), TIMEOUT);
447
448         final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
449                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
450         Await.result(resolvedSchemaFuture, TIMEOUT.duration());
451     }
452
453     @Test
454     public void testYangTextSchemaSourceRequest() throws Exception {
455         final SourceIdentifier sourceIdentifier = new SourceIdentifier("testID");
456
457         final ProxyYangTextSourceProvider proxyYangProvider =
458                 new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
459
460         final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForByteSource(sourceIdentifier,
461                 ByteSource.wrap("YANG".getBytes(UTF_8)));
462
463         // Test success.
464
465         final SchemaSourceRegistration<YangTextSchemaSource> schemaSourceReg = masterSchemaRepository
466                 .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource),
467                      PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1));
468
469         final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
470                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
471
472         final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
473
474         assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
475         assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
476
477         // Test missing source failure.
478
479         schemaSourceReg.close();
480
481         final MissingSchemaSourceException ex = assertThrows(MissingSchemaSourceException.class,
482             () -> {
483                 final Future<YangTextSchemaSourceSerializationProxy> failedSchemaFuture =
484                         proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
485                 Await.result(failedSchemaFuture, TIMEOUT.duration());
486             });
487         assertThat(ex.getMessage(), startsWith("No providers registered for source"));
488         assertThat(ex.getMessage(), containsString(sourceIdentifier.toString()));
489     }
490
491     @Test
492     public void testSlaveInvokeRpc() throws Exception {
493
494         initializeMaster(List.of(new SourceIdentifier("testID")));
495         registerSlaveMountPoint();
496
497         ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
498         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
499
500         final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
501         assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService);
502
503         final QName testQName = QName.create("", "TestQname");
504         final ContainerNode outputNode = Builders.containerBuilder()
505                 .withNodeIdentifier(new NodeIdentifier(testQName))
506                 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
507         final RpcError rpcError = RpcResultBuilder.newError(ErrorType.RPC, null, "Rpc invocation failed.");
508
509         // RPC with no response output.
510
511         doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMRpcService).invokeRpc(any(), any());
512
513         DOMRpcResult result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
514
515         assertEquals(null, result);
516
517         // RPC with response output.
518
519         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode)))
520                 .when(mockDOMRpcService).invokeRpc(any(), any());
521
522         result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
523
524         assertEquals(outputNode, result.value());
525         assertTrue(result.errors().isEmpty());
526
527         // RPC with response error.
528
529         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(rpcError)))
530                 .when(mockDOMRpcService).invokeRpc(any(), any());
531
532         result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
533
534         assertNull(result.value());
535         assertEquals(rpcError, result.errors().iterator().next());
536
537         // RPC with response output and error.
538
539         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
540                 .when(mockDOMRpcService).invokeRpc(any(), any());
541
542         final DOMRpcResult resultOutputError =
543                 slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
544
545         assertEquals(outputNode, resultOutputError.value());
546         assertEquals(rpcError, resultOutputError.errors().iterator().next());
547
548         // RPC failure.
549         doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringRpcException("mock")))
550             .when(mockDOMRpcService).invokeRpc(any(), any());
551         final ListenableFuture<? extends DOMRpcResult> future = slaveDomRPCService.invokeRpc(testQName, outputNode);
552
553         final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
554         final Throwable cause = e.getCause();
555         assertThat(cause, instanceOf(DOMRpcException.class));
556         assertEquals("mock", cause.getMessage());
557     }
558
559     @Test
560     public void testSlaveInvokeAction() throws Exception {
561         initializeMaster(List.of(new SourceIdentifier("testActionID")));
562         registerSlaveMountPoint();
563
564         ArgumentCaptor<DOMActionService> domActionServiceCaptor = ArgumentCaptor.forClass(DOMActionService.class);
565         verify(mockMountPointBuilder).addService(eq(DOMActionService.class), domActionServiceCaptor.capture());
566
567         final DOMActionService slaveDomActionService = domActionServiceCaptor.getValue();
568         assertTrue(slaveDomActionService instanceof ProxyDOMActionService);
569
570         final QName testQName = QName.create("test", "2019-08-16", "TestActionQname");
571         final Absolute schemaPath = Absolute.of(testQName);
572
573         final YangInstanceIdentifier yangIIdPath = YangInstanceIdentifier.create(new NodeIdentifier(testQName));
574
575         final DOMDataTreeIdentifier domDataTreeIdentifier = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
576             yangIIdPath);
577
578         final ContainerNode outputNode = Builders.containerBuilder()
579             .withNodeIdentifier(new NodeIdentifier(testQName))
580             .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
581
582         // Action with no response output.
583         doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMActionService)
584             .invokeAction(any(), any(), any());
585         DOMActionResult result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
586             .get(2, TimeUnit.SECONDS);
587         assertEquals(null, result);
588
589         // Action with response output.
590         doReturn(FluentFutures.immediateFluentFuture(new SimpleDOMActionResult(outputNode))).when(mockDOMActionService)
591             .invokeAction(any(), any(), any());
592         result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
593             .get(2, TimeUnit.SECONDS);
594
595         assertEquals(Optional.of(outputNode), result.getOutput());
596         assertTrue(result.getErrors().isEmpty());
597
598         // Action failure.
599         doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringActionException("mock")))
600             .when(mockDOMActionService).invokeAction(any(), any(), any());
601         final ListenableFuture<? extends DOMActionResult> future = slaveDomActionService.invokeAction(schemaPath,
602             domDataTreeIdentifier, outputNode);
603
604         final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
605         final Throwable cause = e.getCause();
606         assertThat(cause, instanceOf(DOMActionException.class));
607         assertEquals("mock", cause.getMessage());
608     }
609
610     @Test
611     public void testSlaveNewTransactionRequests() {
612         doReturn(mock(DOMDataTreeReadTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
613         doReturn(mock(DOMDataTreeReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
614         doReturn(mock(DOMDataTreeWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
615
616         initializeMaster(List.of());
617         registerSlaveMountPoint();
618
619         ArgumentCaptor<DOMDataBroker> domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
620         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
621
622         final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
623         assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker);
624
625         slaveDOMDataBroker.newReadOnlyTransaction();
626         verify(mockDOMDataBroker).newReadOnlyTransaction();
627
628         slaveDOMDataBroker.newReadWriteTransaction();
629         verify(mockDOMDataBroker).newReadWriteTransaction();
630
631         slaveDOMDataBroker.newWriteOnlyTransaction();
632         verify(mockDOMDataBroker).newWriteOnlyTransaction();
633     }
634
635     @Test
636     public void testSlaveNewNetconfDataTreeServiceRequest() {
637         initializeMaster(List.of());
638         registerSlaveMountPoint();
639
640         ArgumentCaptor<NetconfDataTreeService> netconfCaptor = ArgumentCaptor.forClass(NetconfDataTreeService.class);
641         verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), netconfCaptor.capture());
642
643         final NetconfDataTreeService slaveNetconfService = netconfCaptor.getValue();
644         assertTrue(slaveNetconfService instanceof ProxyNetconfDataTreeService);
645
646         final YangInstanceIdentifier PATH = YangInstanceIdentifier.empty();
647         final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
648         final ContainerNode NODE = Builders.containerBuilder()
649             .withNodeIdentifier(new NodeIdentifier(QName.create("", "cont")))
650             .build();
651
652         final FluentFuture<Optional<Object>> result = immediateFluentFuture(Optional.of(NODE));
653         doReturn(result).when(netconfService).get(PATH);
654         doReturn(result).when(netconfService).getConfig(PATH);
655         doReturn(emptyFluentFuture()).when(netconfService).commit();
656
657         slaveNetconfService.get(PATH);
658         slaveNetconfService.getConfig(PATH);
659         slaveNetconfService.lock();
660         slaveNetconfService.merge(STORE, PATH, NODE, Optional.empty());
661         slaveNetconfService.replace(STORE, PATH, NODE, Optional.empty());
662         slaveNetconfService.create(STORE, PATH, NODE, Optional.empty());
663         slaveNetconfService.delete(STORE, PATH);
664         slaveNetconfService.remove(STORE, PATH);
665         slaveNetconfService.discardChanges();
666         slaveNetconfService.commit();
667
668         verify(netconfService, timeout(1000)).get(PATH);
669         verify(netconfService, timeout(1000)).getConfig(PATH);
670         verify(netconfService, timeout(1000)).lock();
671         verify(netconfService, timeout(1000)).merge(STORE, PATH, NODE, Optional.empty());
672         verify(netconfService, timeout(1000)).replace(STORE, PATH, NODE, Optional.empty());
673         verify(netconfService, timeout(1000)).create(STORE, PATH, NODE, Optional.empty());
674         verify(netconfService, timeout(1000)).delete(STORE, PATH);
675         verify(netconfService, timeout(1000)).remove(STORE, PATH);
676         verify(netconfService, timeout(1000)).discardChanges();
677         verify(netconfService, timeout(1000)).commit();
678     }
679
680     private ActorRef registerSlaveMountPoint() {
681         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
682         doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
683         doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
684         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(NetconfTopologySetupBuilder.create()
685                 .setSchemaResourceDTO(schemaResourceDTO2)
686                 .setActorSystem(system)
687                 .setBaseSchemas(BASE_SCHEMAS)
688                 .setCredentialProvider(credentialProvider)
689                 .setSslHandlerFactoryProvider(sslHandlerFactoryProvider)
690                 .build(), remoteDeviceId, TIMEOUT, mockMountPointService));
691
692         doReturn(Futures.immediateFuture(mockSchemaContext))
693                 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
694
695         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
696                 masterRef), testKit.getRef());
697
698         verify(mockMountPointBuilder, timeout(5000)).register();
699         verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
700         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
701         verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), any());
702         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
703         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
704
705         testKit.expectMsgClass(Success.class);
706         return slaveRef;
707     }
708
709     private void initializeMaster(final List<SourceIdentifier> sourceIdentifiers) {
710         masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, netconfService, sourceIdentifiers,
711                 new RemoteDeviceServices(mockDOMRpcService, mockDOMActionService)), testKit.getRef());
712         testKit.expectMsgClass(MasterActorDataInitialized.class);
713     }
714
715     private void resetMountPointMocks() {
716         reset(mockMountPointReg, mockMountPointBuilder);
717
718         doNothing().when(mockMountPointReg).close();
719
720         doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
721         doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
722     }
723
724     private static PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
725         return argThat(argument -> identifier.equals(argument.getSourceIdentifier()));
726     }
727
728     private static String convertStreamToString(final InputStream is) {
729         try (Scanner scanner = new Scanner(is)) {
730             return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : "";
731         }
732     }
733 }