Do not reference individual builders
[netconf.git] / netconf / netconf-topology-singleton / src / test / java / org / opendaylight / netconf / topology / singleton / impl / NetconfNodeActorTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.nio.charset.StandardCharsets.UTF_8;
11 import static org.hamcrest.CoreMatchers.containsString;
12 import static org.hamcrest.CoreMatchers.instanceOf;
13 import static org.hamcrest.CoreMatchers.startsWith;
14 import static org.hamcrest.MatcherAssert.assertThat;
15 import static org.junit.Assert.assertEquals;
16 import static org.junit.Assert.assertNull;
17 import static org.junit.Assert.assertThrows;
18 import static org.junit.Assert.assertTrue;
19 import static org.mockito.ArgumentMatchers.any;
20 import static org.mockito.ArgumentMatchers.anyCollection;
21 import static org.mockito.ArgumentMatchers.argThat;
22 import static org.mockito.ArgumentMatchers.eq;
23 import static org.mockito.Mockito.after;
24 import static org.mockito.Mockito.doAnswer;
25 import static org.mockito.Mockito.doNothing;
26 import static org.mockito.Mockito.doReturn;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.reset;
29 import static org.mockito.Mockito.timeout;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.verifyNoMoreInteractions;
33 import static org.opendaylight.mdsal.common.api.CommitInfo.emptyFluentFuture;
34 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFluentFuture;
35
36 import akka.actor.ActorRef;
37 import akka.actor.ActorSystem;
38 import akka.actor.Props;
39 import akka.actor.Status.Failure;
40 import akka.actor.Status.Success;
41 import akka.pattern.AskTimeoutException;
42 import akka.pattern.Patterns;
43 import akka.testkit.TestActorRef;
44 import akka.testkit.javadsl.TestKit;
45 import akka.util.Timeout;
46 import com.google.common.collect.ImmutableList;
47 import com.google.common.io.ByteSource;
48 import com.google.common.net.InetAddresses;
49 import com.google.common.util.concurrent.FluentFuture;
50 import com.google.common.util.concurrent.Futures;
51 import com.google.common.util.concurrent.ListenableFuture;
52 import com.google.common.util.concurrent.SettableFuture;
53 import java.io.InputStream;
54 import java.net.InetSocketAddress;
55 import java.time.Duration;
56 import java.util.ArrayList;
57 import java.util.List;
58 import java.util.Optional;
59 import java.util.Scanner;
60 import java.util.concurrent.ExecutionException;
61 import java.util.concurrent.TimeUnit;
62 import org.junit.After;
63 import org.junit.Before;
64 import org.junit.Test;
65 import org.junit.runner.RunWith;
66 import org.mockito.ArgumentCaptor;
67 import org.mockito.Mock;
68 import org.mockito.junit.MockitoJUnitRunner;
69 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
70 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
71 import org.opendaylight.mdsal.dom.api.DOMActionException;
72 import org.opendaylight.mdsal.dom.api.DOMActionResult;
73 import org.opendaylight.mdsal.dom.api.DOMActionService;
74 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
75 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
76 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
77 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
78 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
79 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
80 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
81 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
82 import org.opendaylight.mdsal.dom.api.DOMRpcException;
83 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
84 import org.opendaylight.mdsal.dom.api.DOMRpcService;
85 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
86 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
87 import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult;
88 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
89 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice.SchemaResourcesDTO;
90 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
91 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
92 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringActionException;
93 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
94 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
95 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
96 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
97 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
98 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
99 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
100 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
101 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
102 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
103 import org.opendaylight.yangtools.concepts.ObjectRegistration;
104 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
105 import org.opendaylight.yangtools.yang.common.ErrorType;
106 import org.opendaylight.yangtools.yang.common.QName;
107 import org.opendaylight.yangtools.yang.common.RpcError;
108 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
111 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
112 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
113 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
114 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
115 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
116 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
117 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
118 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
119 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
120 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
121 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
122 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
123 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
124 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
125 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
126 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
127 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToIRTransformer;
128 import scala.concurrent.Await;
129 import scala.concurrent.Future;
130
131 @RunWith(MockitoJUnitRunner.StrictStubs.class)
132 public class NetconfNodeActorTest extends AbstractBaseSchemasTest {
133
134     private static final Timeout TIMEOUT = Timeout.create(Duration.ofSeconds(5));
135     private static final SourceIdentifier SOURCE_IDENTIFIER1 = new SourceIdentifier("yang1");
136     private static final SourceIdentifier SOURCE_IDENTIFIER2 = new SourceIdentifier("yang2");
137
138     private ActorSystem system = ActorSystem.create();
139     private final TestKit testKit = new TestKit(system);
140
141     private ActorRef masterRef;
142     private RemoteDeviceId remoteDeviceId;
143     private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
144
145     @Mock
146     private DOMRpcService mockDOMRpcService;
147
148     @Mock
149     private DOMActionService mockDOMActionService;
150
151     @Mock
152     private DOMMountPointService mockMountPointService;
153
154     @Mock
155     private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
156
157     @Mock
158     private ObjectRegistration<DOMMountPoint> mockMountPointReg;
159
160     @Mock
161     private DOMDataBroker mockDOMDataBroker;
162
163     @Mock
164     private NetconfDataTreeService netconfService;
165
166     @Mock
167     private SchemaSourceRegistration<?> mockSchemaSourceReg1;
168
169     @Mock
170     private SchemaSourceRegistration<?> mockSchemaSourceReg2;
171
172     @Mock
173     private SchemaSourceRegistry mockRegistry;
174
175     @Mock
176     private EffectiveModelContextFactory mockSchemaContextFactory;
177
178     @Mock
179     private SchemaRepository mockSchemaRepository;
180
181     @Mock
182     private EffectiveModelContext mockSchemaContext;
183
184     @Mock
185     private SchemaResourcesDTO schemaResourceDTO;
186
187     @Before
188     public void setup() {
189         remoteDeviceId = new RemoteDeviceId("netconf-topology",
190                 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
191
192         masterSchemaRepository.registerSchemaSourceListener(
193                 TextToIRTransformer.create(masterSchemaRepository, masterSchemaRepository));
194
195         doReturn(masterSchemaRepository).when(schemaResourceDTO).getSchemaRepository();
196         doReturn(mockRegistry).when(schemaResourceDTO).getSchemaRegistry();
197         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
198                 .setIdleTimeout(Duration.ofSeconds(1)).setSchemaResourceDTO(schemaResourceDTO)
199                 .setBaseSchemas(BASE_SCHEMAS).build();
200
201         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
202
203         masterRef = TestActorRef.create(system, props, "master_messages");
204
205         resetMountPointMocks();
206
207         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
208
209         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
210         doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
211
212         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
213                 .createEffectiveModelContextFactory();
214     }
215
216     @After
217     public void teardown() {
218         TestKit.shutdownActorSystem(system, true);
219         system = null;
220     }
221
222     @Test
223     public void testInitializeAndRefreshMasterData() {
224
225         // Test CreateInitialMasterActorData.
226
227         initializeMaster(new ArrayList<>());
228
229         // Test RefreshSetupMasterActorData.
230
231         final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
232                 new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
233
234         final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create().setBaseSchemas(BASE_SCHEMAS)
235                 .setSchemaResourceDTO(schemaResourceDTO).setActorSystem(system).build();
236
237         masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
238
239         testKit.expectMsgClass(MasterActorDataInitialized.class);
240     }
241
242     @Test
243     public void testAskForMasterMountPoint() {
244
245         // Test with master not setup yet.
246
247         final TestKit kit = new TestKit(system);
248
249         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
250
251         final Failure failure = kit.expectMsgClass(Failure.class);
252         assertTrue(failure.cause() instanceof NotMasterException);
253
254         // Now initialize - master should send the RegisterMountPoint message.
255
256         List<SourceIdentifier> sourceIdentifiers = List.of(new SourceIdentifier("testID"));
257         initializeMaster(sourceIdentifiers);
258
259         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
260
261         final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
262
263         assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
264     }
265
266     @Test
267     public void testRegisterAndUnregisterMountPoint() throws Exception {
268
269         ActorRef slaveRef = registerSlaveMountPoint();
270
271         // Unregister
272
273         slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
274
275         verify(mockMountPointReg, timeout(5000)).close();
276         verify(mockSchemaSourceReg1, timeout(1000)).close();
277         verify(mockSchemaSourceReg2, timeout(1000)).close();
278
279         // Test registration with another interleaved registration that completes while the first registration
280         // is resolving the schema context.
281
282         reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
283         resetMountPointMocks();
284
285         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
286
287         final SchemaSourceRegistration<?> newMockSchemaSourceReg = mock(SchemaSourceRegistration.class);
288
289         final EffectiveModelContextFactory newMockSchemaContextFactory = mock(EffectiveModelContextFactory.class);
290         doReturn(Futures.immediateFuture(mockSchemaContext))
291                 .when(newMockSchemaContextFactory).createEffectiveModelContext(anyCollection());
292
293         doAnswer(unused -> {
294             SettableFuture<SchemaContext> future = SettableFuture.create();
295             new Thread(() -> {
296                 doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
297                         withSourceId(SOURCE_IDENTIFIER1));
298
299                 doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
300                         .createEffectiveModelContextFactory();
301
302                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
303                         testKit.getRef());
304
305                 future.set(mockSchemaContext);
306             }).start();
307             return future;
308         }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
309
310         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
311                 .createEffectiveModelContextFactory();
312
313         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
314
315         verify(mockMountPointBuilder, timeout(5000)).register();
316         verify(mockMountPointBuilder, after(500)).addService(eq(DOMDataBroker.class), any());
317         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
318         verify(mockMountPointBuilder).addService(eq(DOMActionService.class), any());
319         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
320         verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
321         verify(mockSchemaSourceReg1).close();
322         verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
323         verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
324         verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
325
326         // Stop the slave actor and verify schema source registrations are closed.
327
328         final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
329         Await.result(stopFuture, TIMEOUT.duration());
330
331         verify(mockMountPointReg).close();
332         verify(newMockSchemaSourceReg).close();
333     }
334
335     @SuppressWarnings("unchecked")
336     @Test
337     public void testRegisterMountPointWithSchemaFailures() throws Exception {
338         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
339         doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
340         doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
341         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create()
342                 .setSchemaResourceDTO(schemaResourceDTO2)
343                 .setBaseSchemas(BASE_SCHEMAS)
344                 .setActorSystem(system)
345                 .build();
346
347         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT,
348                 mockMountPointService));
349
350         // Test unrecoverable failure.
351
352         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock")))
353                 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
354
355         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
356                 masterRef), testKit.getRef());
357
358         testKit.expectMsgClass(Success.class);
359
360         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
361         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
362
363         verify(mockMountPointBuilder, after(1000).never()).register();
364         verify(mockSchemaSourceReg1, timeout(1000)).close();
365         verify(mockSchemaSourceReg2, timeout(1000)).close();
366
367         // Test recoverable AskTimeoutException - schema context resolution should be retried.
368
369         reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
370
371         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock",
372                 new AskTimeoutException("timeout"))))
373             .doReturn(Futures.immediateFuture(mockSchemaContext))
374             .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
375
376         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
377                 masterRef), testKit.getRef());
378
379         testKit.expectMsgClass(Success.class);
380
381         verify(mockMountPointBuilder, timeout(5000)).register();
382         verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
383
384         // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
385         // attempt should not be retried.
386
387         reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
388         resetMountPointMocks();
389
390         final EffectiveModelContextFactory mockSchemaContextFactorySuccess = mock(EffectiveModelContextFactory.class);
391         doReturn(Futures.immediateFuture(mockSchemaContext))
392                 .when(mockSchemaContextFactorySuccess).createEffectiveModelContext(anyCollection());
393
394         doAnswer(unused -> {
395             SettableFuture<SchemaContext> future = SettableFuture.create();
396             new Thread(() -> {
397                 doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
398                     .createEffectiveModelContextFactory();
399
400                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
401                         masterRef), testKit.getRef());
402
403                 future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout")));
404             }).start();
405             return future;
406         }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
407
408         doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
409
410         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
411                 masterRef), testKit.getRef());
412
413         verify(mockMountPointBuilder, timeout(5000)).register();
414         verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
415     }
416
417     @Test(expected = MissingSchemaSourceException.class)
418     public void testMissingSchemaSourceOnMissingProvider() throws Exception {
419         final SharedSchemaRepository repository = new SharedSchemaRepository("test");
420
421         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
422         doReturn(repository).when(schemaResourceDTO2).getSchemaRegistry();
423         doReturn(repository).when(schemaResourceDTO2).getSchemaRepository();
424         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
425                 .setSchemaResourceDTO(schemaResourceDTO2).setIdleTimeout(Duration.ofSeconds(1))
426                 .setBaseSchemas(BASE_SCHEMAS).build();
427         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
428         ActorRef actor = TestActorRef.create(system, props, "master_messages_2");
429
430         final SourceIdentifier sourceIdentifier = new SourceIdentifier("testID");
431
432         final ProxyYangTextSourceProvider proxyYangProvider =
433                 new ProxyYangTextSourceProvider(actor, system.dispatcher(), TIMEOUT);
434
435         final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
436                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
437         Await.result(resolvedSchemaFuture, TIMEOUT.duration());
438     }
439
440     @Test
441     public void testYangTextSchemaSourceRequest() throws Exception {
442         final SourceIdentifier sourceIdentifier = new SourceIdentifier("testID");
443
444         final ProxyYangTextSourceProvider proxyYangProvider =
445                 new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
446
447         final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForByteSource(sourceIdentifier,
448                 ByteSource.wrap("YANG".getBytes(UTF_8)));
449
450         // Test success.
451
452         final SchemaSourceRegistration<YangTextSchemaSource> schemaSourceReg = masterSchemaRepository
453                 .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource),
454                      PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1));
455
456         final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
457                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
458
459         final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
460
461         assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
462         assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
463
464         // Test missing source failure.
465
466         schemaSourceReg.close();
467
468         final MissingSchemaSourceException ex = assertThrows(MissingSchemaSourceException.class,
469             () -> {
470                 final Future<YangTextSchemaSourceSerializationProxy> failedSchemaFuture =
471                         proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
472                 Await.result(failedSchemaFuture, TIMEOUT.duration());
473             });
474         assertThat(ex.getMessage(), startsWith("No providers registered for source"));
475         assertThat(ex.getMessage(), containsString(sourceIdentifier.toString()));
476     }
477
478     @Test
479     public void testSlaveInvokeRpc() throws Exception {
480
481         initializeMaster(List.of(new SourceIdentifier("testID")));
482         registerSlaveMountPoint();
483
484         ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
485         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
486
487         final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
488         assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService);
489
490         final QName testQName = QName.create("", "TestQname");
491         final ContainerNode outputNode = Builders.containerBuilder()
492                 .withNodeIdentifier(new NodeIdentifier(testQName))
493                 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
494         final RpcError rpcError = RpcResultBuilder.newError(ErrorType.RPC, null, "Rpc invocation failed.");
495
496         // RPC with no response output.
497
498         doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMRpcService).invokeRpc(any(), any());
499
500         DOMRpcResult result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
501
502         assertEquals(null, result);
503
504         // RPC with response output.
505
506         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode)))
507                 .when(mockDOMRpcService).invokeRpc(any(), any());
508
509         result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
510
511         assertEquals(outputNode, result.getResult());
512         assertTrue(result.getErrors().isEmpty());
513
514         // RPC with response error.
515
516         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(rpcError)))
517                 .when(mockDOMRpcService).invokeRpc(any(), any());
518
519         result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
520
521         assertNull(result.getResult());
522         assertEquals(rpcError, result.getErrors().iterator().next());
523
524         // RPC with response output and error.
525
526         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
527                 .when(mockDOMRpcService).invokeRpc(any(), any());
528
529         final DOMRpcResult resultOutputError =
530                 slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
531
532         assertEquals(outputNode, resultOutputError.getResult());
533         assertEquals(rpcError, resultOutputError.getErrors().iterator().next());
534
535         // RPC failure.
536         doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringRpcException("mock")))
537             .when(mockDOMRpcService).invokeRpc(any(), any());
538         final ListenableFuture<? extends DOMRpcResult> future = slaveDomRPCService.invokeRpc(testQName, outputNode);
539
540         final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
541         final Throwable cause = e.getCause();
542         assertThat(cause, instanceOf(DOMRpcException.class));
543         assertEquals("mock", cause.getMessage());
544     }
545
546     @Test
547     public void testSlaveInvokeAction() throws Exception {
548         initializeMaster(List.of(new SourceIdentifier("testActionID")));
549         registerSlaveMountPoint();
550
551         ArgumentCaptor<DOMActionService> domActionServiceCaptor = ArgumentCaptor.forClass(DOMActionService.class);
552         verify(mockMountPointBuilder).addService(eq(DOMActionService.class), domActionServiceCaptor.capture());
553
554         final DOMActionService slaveDomActionService = domActionServiceCaptor.getValue();
555         assertTrue(slaveDomActionService instanceof ProxyDOMActionService);
556
557         final QName testQName = QName.create("test", "2019-08-16", "TestActionQname");
558         final Absolute schemaPath = Absolute.of(testQName);
559
560         final YangInstanceIdentifier yangIIdPath = YangInstanceIdentifier.create(new NodeIdentifier(testQName));
561
562         final DOMDataTreeIdentifier domDataTreeIdentifier = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
563             yangIIdPath);
564
565         final ContainerNode outputNode = Builders.containerBuilder()
566             .withNodeIdentifier(new NodeIdentifier(testQName))
567             .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
568
569         // Action with no response output.
570         doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMActionService)
571             .invokeAction(any(), any(), any());
572         DOMActionResult result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
573             .get(2, TimeUnit.SECONDS);
574         assertEquals(null, result);
575
576         // Action with response output.
577         doReturn(FluentFutures.immediateFluentFuture(new SimpleDOMActionResult(outputNode))).when(mockDOMActionService)
578             .invokeAction(any(), any(), any());
579         result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
580             .get(2, TimeUnit.SECONDS);
581
582         assertEquals(outputNode, result.getOutput().get());
583         assertTrue(result.getErrors().isEmpty());
584
585         // Action failure.
586         doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringActionException("mock")))
587             .when(mockDOMActionService).invokeAction(any(), any(), any());
588         final ListenableFuture<? extends DOMActionResult> future = slaveDomActionService.invokeAction(schemaPath,
589             domDataTreeIdentifier, outputNode);
590
591         final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
592         final Throwable cause = e.getCause();
593         assertThat(cause, instanceOf(DOMActionException.class));
594         assertEquals("mock", cause.getMessage());
595     }
596
597     @Test
598     public void testSlaveNewTransactionRequests() {
599         doReturn(mock(DOMDataTreeReadTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
600         doReturn(mock(DOMDataTreeReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
601         doReturn(mock(DOMDataTreeWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
602
603         initializeMaster(List.of());
604         registerSlaveMountPoint();
605
606         ArgumentCaptor<DOMDataBroker> domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
607         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
608
609         final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
610         assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker);
611
612         slaveDOMDataBroker.newReadOnlyTransaction();
613         verify(mockDOMDataBroker).newReadOnlyTransaction();
614
615         slaveDOMDataBroker.newReadWriteTransaction();
616         verify(mockDOMDataBroker).newReadWriteTransaction();
617
618         slaveDOMDataBroker.newWriteOnlyTransaction();
619         verify(mockDOMDataBroker).newWriteOnlyTransaction();
620     }
621
622     @Test
623     public void testSlaveNewNetconfDataTreeServiceRequest() {
624         initializeMaster(List.of());
625         registerSlaveMountPoint();
626
627         ArgumentCaptor<NetconfDataTreeService> netconfCaptor = ArgumentCaptor.forClass(NetconfDataTreeService.class);
628         verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), netconfCaptor.capture());
629
630         final NetconfDataTreeService slaveNetconfService = netconfCaptor.getValue();
631         assertTrue(slaveNetconfService instanceof ProxyNetconfDataTreeService);
632
633         final YangInstanceIdentifier PATH = YangInstanceIdentifier.empty();
634         final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
635         final ContainerNode NODE = Builders.containerBuilder()
636             .withNodeIdentifier(new NodeIdentifier(QName.create("", "cont")))
637             .build();
638
639         final FluentFuture<Optional<Object>> result = immediateFluentFuture(Optional.of(NODE));
640         doReturn(result).when(netconfService).get(PATH);
641         doReturn(result).when(netconfService).getConfig(PATH);
642         doReturn(emptyFluentFuture()).when(netconfService).commit();
643
644         slaveNetconfService.get(PATH);
645         slaveNetconfService.getConfig(PATH);
646         slaveNetconfService.lock();
647         slaveNetconfService.merge(STORE, PATH, NODE, Optional.empty());
648         slaveNetconfService.replace(STORE, PATH, NODE, Optional.empty());
649         slaveNetconfService.create(STORE, PATH, NODE, Optional.empty());
650         slaveNetconfService.delete(STORE, PATH);
651         slaveNetconfService.remove(STORE, PATH);
652         slaveNetconfService.discardChanges();
653         slaveNetconfService.commit();
654
655         verify(netconfService, timeout(1000)).get(PATH);
656         verify(netconfService, timeout(1000)).getConfig(PATH);
657         verify(netconfService, timeout(1000)).lock();
658         verify(netconfService, timeout(1000)).merge(STORE, PATH, NODE, Optional.empty());
659         verify(netconfService, timeout(1000)).replace(STORE, PATH, NODE, Optional.empty());
660         verify(netconfService, timeout(1000)).create(STORE, PATH, NODE, Optional.empty());
661         verify(netconfService, timeout(1000)).delete(STORE, PATH);
662         verify(netconfService, timeout(1000)).remove(STORE, PATH);
663         verify(netconfService, timeout(1000)).discardChanges();
664         verify(netconfService, timeout(1000)).commit();
665     }
666
667     private ActorRef registerSlaveMountPoint() {
668         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
669         doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
670         doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
671         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(
672                 NetconfTopologySetupBuilder.create().setSchemaResourceDTO(schemaResourceDTO2).setActorSystem(system)
673                 .setBaseSchemas(BASE_SCHEMAS).build(), remoteDeviceId, TIMEOUT, mockMountPointService));
674
675         doReturn(Futures.immediateFuture(mockSchemaContext))
676                 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
677
678         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
679                 masterRef), testKit.getRef());
680
681         verify(mockMountPointBuilder, timeout(5000)).register();
682         verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
683         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
684         verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), any());
685         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
686         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
687
688         testKit.expectMsgClass(Success.class);
689         return slaveRef;
690     }
691
692     private void initializeMaster(final List<SourceIdentifier> sourceIdentifiers) {
693         masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, netconfService, sourceIdentifiers,
694                 mockDOMRpcService, mockDOMActionService), testKit.getRef());
695         testKit.expectMsgClass(MasterActorDataInitialized.class);
696     }
697
698     private void resetMountPointMocks() {
699         reset(mockMountPointReg, mockMountPointBuilder);
700
701         doNothing().when(mockMountPointReg).close();
702
703         doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
704         doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
705     }
706
707     private static PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
708         return argThat(argument -> identifier.equals(argument.getSourceIdentifier()));
709     }
710
711     private static String convertStreamToString(final InputStream is) {
712         try (Scanner scanner = new Scanner(is)) {
713             return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : "";
714         }
715     }
716 }