Migrate netconf-topology-singleton tests to JUnit5
[netconf.git] / apps / netconf-topology-singleton / src / test / java / org / opendaylight / netconf / topology / singleton / impl / NetconfNodeActorTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static org.hamcrest.CoreMatchers.containsString;
11 import static org.hamcrest.CoreMatchers.startsWith;
12 import static org.hamcrest.MatcherAssert.assertThat;
13 import static org.junit.jupiter.api.Assertions.assertEquals;
14 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
15 import static org.junit.jupiter.api.Assertions.assertNull;
16 import static org.junit.jupiter.api.Assertions.assertThrows;
17 import static org.junit.jupiter.api.Assertions.assertTrue;
18 import static org.mockito.ArgumentMatchers.any;
19 import static org.mockito.ArgumentMatchers.anyCollection;
20 import static org.mockito.ArgumentMatchers.argThat;
21 import static org.mockito.ArgumentMatchers.eq;
22 import static org.mockito.Mockito.after;
23 import static org.mockito.Mockito.doAnswer;
24 import static org.mockito.Mockito.doNothing;
25 import static org.mockito.Mockito.doReturn;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.reset;
28 import static org.mockito.Mockito.timeout;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.verifyNoMoreInteractions;
32 import static org.opendaylight.mdsal.common.api.CommitInfo.emptyFluentFuture;
33 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFluentFuture;
34
35 import akka.actor.ActorRef;
36 import akka.actor.ActorSystem;
37 import akka.actor.Props;
38 import akka.actor.Status.Failure;
39 import akka.actor.Status.Success;
40 import akka.pattern.AskTimeoutException;
41 import akka.pattern.Patterns;
42 import akka.testkit.TestActorRef;
43 import akka.testkit.javadsl.TestKit;
44 import akka.util.Timeout;
45 import com.google.common.collect.ImmutableList;
46 import com.google.common.io.CharSource;
47 import com.google.common.net.InetAddresses;
48 import com.google.common.util.concurrent.FluentFuture;
49 import com.google.common.util.concurrent.Futures;
50 import com.google.common.util.concurrent.ListenableFuture;
51 import com.google.common.util.concurrent.SettableFuture;
52 import java.net.InetSocketAddress;
53 import java.time.Duration;
54 import java.util.ArrayList;
55 import java.util.List;
56 import java.util.Optional;
57 import java.util.concurrent.ExecutionException;
58 import java.util.concurrent.TimeUnit;
59 import org.junit.jupiter.api.AfterEach;
60 import org.junit.jupiter.api.BeforeEach;
61 import org.junit.jupiter.api.Test;
62 import org.junit.jupiter.api.extension.ExtendWith;
63 import org.mockito.ArgumentCaptor;
64 import org.mockito.Mock;
65 import org.mockito.junit.jupiter.MockitoExtension;
66 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
67 import org.opendaylight.mdsal.dom.api.DOMActionException;
68 import org.opendaylight.mdsal.dom.api.DOMActionService;
69 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
70 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
71 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
72 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
73 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
74 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
75 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
76 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
77 import org.opendaylight.mdsal.dom.api.DOMRpcException;
78 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
79 import org.opendaylight.mdsal.dom.api.DOMRpcService;
80 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
81 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
82 import org.opendaylight.netconf.client.mdsal.api.DeviceNetconfSchemaProvider;
83 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
84 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
85 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Actions;
86 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
87 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
88 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
89 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringActionException;
90 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
91 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
92 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
93 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
94 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
95 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
96 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
97 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
98 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
99 import org.opendaylight.yangtools.concepts.ObjectRegistration;
100 import org.opendaylight.yangtools.concepts.Registration;
101 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
102 import org.opendaylight.yangtools.yang.common.ErrorType;
103 import org.opendaylight.yangtools.yang.common.QName;
104 import org.opendaylight.yangtools.yang.common.RpcError;
105 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
106 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
107 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
108 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
109 import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
110 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
111 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
112 import org.opendaylight.yangtools.yang.model.api.source.SourceIdentifier;
113 import org.opendaylight.yangtools.yang.model.api.source.YangTextSource;
114 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
115 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
116 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
117 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
118 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
119 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
120 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
121 import org.opendaylight.yangtools.yang.model.spi.source.DelegatedYangTextSource;
122 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
123 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToIRTransformer;
124 import scala.concurrent.Await;
125 import scala.concurrent.Future;
126
127 @ExtendWith(MockitoExtension.class)
128 class NetconfNodeActorTest extends AbstractBaseSchemasTest {
129
130     private static final Timeout TIMEOUT = Timeout.create(Duration.ofSeconds(5));
131     private static final SourceIdentifier SOURCE_IDENTIFIER1 = new SourceIdentifier("yang1");
132     private static final SourceIdentifier SOURCE_IDENTIFIER2 = new SourceIdentifier("yang2");
133
134     private ActorSystem system = ActorSystem.create();
135     private final TestKit testKit = new TestKit(system);
136
137     private ActorRef masterRef;
138     private RemoteDeviceId remoteDeviceId;
139     private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
140
141     @Mock
142     private Rpcs.Normalized mockRpc;
143     @Mock
144     private DOMRpcService mockDOMRpcService;
145     @Mock
146     private Actions.Normalized mockDOMActionService;
147     @Mock
148     private DOMMountPointService mockMountPointService;
149     @Mock
150     private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
151     @Mock
152     private ObjectRegistration<DOMMountPoint> mockMountPointReg;
153     @Mock
154     private DOMDataBroker mockDOMDataBroker;
155     @Mock
156     private NetconfDataTreeService netconfService;
157     @Mock
158     private Registration mockSchemaSourceReg1;
159     @Mock
160     private Registration mockSchemaSourceReg2;
161     @Mock
162     private SchemaSourceRegistry mockRegistry;
163     @Mock
164     private EffectiveModelContextFactory mockSchemaContextFactory;
165     @Mock
166     private SchemaRepository mockSchemaRepository;
167     @Mock
168     private EffectiveModelContext mockSchemaContext;
169     @Mock
170     private DeviceNetconfSchemaProvider deviceSchemaProvider;
171
172     @BeforeEach
173     void setup() {
174         remoteDeviceId = new RemoteDeviceId("netconf-topology",
175                 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
176
177         masterSchemaRepository.registerSchemaSourceListener(
178                 TextToIRTransformer.create(masterSchemaRepository, masterSchemaRepository));
179
180         final NetconfTopologySetup setup = NetconfTopologySetup.builder()
181             .setActorSystem(system)
182             .setIdleTimeout(Duration.ofSeconds(1))
183             .setDeviceSchemaProvider(deviceSchemaProvider)
184             .setBaseSchemaProvider(BASE_SCHEMAS)
185             .build();
186
187         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
188
189         masterRef = TestActorRef.create(system, props, "master_messages");
190     }
191
192     @AfterEach
193     void teardown() {
194         TestKit.shutdownActorSystem(system, true);
195         system = null;
196     }
197
198     @Test
199     void testInitializeAndRefreshMasterData() {
200
201         // Test CreateInitialMasterActorData.
202
203         initializeMaster(new ArrayList<>());
204
205         // Test RefreshSetupMasterActorData.
206
207         final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
208                 new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
209
210         final NetconfTopologySetup newSetup = NetconfTopologySetup.builder()
211             .setBaseSchemaProvider(BASE_SCHEMAS)
212             .setDeviceSchemaProvider(deviceSchemaProvider)
213             .setActorSystem(system)
214             .build();
215
216         masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
217
218         testKit.expectMsgClass(MasterActorDataInitialized.class);
219     }
220
221     @Test
222     void testAskForMasterMountPoint() {
223
224         // Test with master not setup yet.
225
226         final TestKit kit = new TestKit(system);
227
228         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
229
230         final Failure failure = kit.expectMsgClass(Failure.class);
231         assertInstanceOf(NotMasterException.class, failure.cause());
232
233         // Now initialize - master should send the RegisterMountPoint message.
234
235         List<SourceIdentifier> sourceIdentifiers = List.of(new SourceIdentifier("testID"));
236         initializeMaster(sourceIdentifiers);
237
238         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
239
240         final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
241
242         assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
243     }
244
245     @Test
246     void testRegisterAndUnregisterMountPoint() throws Exception {
247         resetMountPointMocks();
248         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
249         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
250         doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
251         doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
252
253         ActorRef slaveRef = registerSlaveMountPoint();
254
255         // Unregister
256
257         slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
258
259         verify(mockMountPointReg, timeout(5000)).close();
260         verify(mockSchemaSourceReg1, timeout(1000)).close();
261         verify(mockSchemaSourceReg2, timeout(1000)).close();
262
263         // Test registration with another interleaved registration that completes while the first registration
264         // is resolving the schema context.
265
266         reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
267         resetMountPointMocks();
268
269         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
270
271         final var newMockSchemaSourceReg = mock(Registration.class);
272
273         final EffectiveModelContextFactory newMockSchemaContextFactory = mock(EffectiveModelContextFactory.class);
274         doReturn(Futures.immediateFuture(mockSchemaContext))
275                 .when(newMockSchemaContextFactory).createEffectiveModelContext(anyCollection());
276
277         doAnswer(unused -> {
278             SettableFuture<SchemaContext> future = SettableFuture.create();
279             new Thread(() -> {
280                 doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
281                         withSourceId(SOURCE_IDENTIFIER1));
282
283                 doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
284                         .createEffectiveModelContextFactory();
285
286                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
287                         testKit.getRef());
288
289                 future.set(mockSchemaContext);
290             }).start();
291             return future;
292         }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
293
294         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
295                 .createEffectiveModelContextFactory();
296
297         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
298
299         verify(mockMountPointBuilder, timeout(5000)).register();
300         verify(mockMountPointBuilder, after(500)).addService(eq(DOMDataBroker.class), any());
301         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
302         verify(mockMountPointBuilder).addService(eq(DOMActionService.class), any());
303         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
304         verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
305         verify(mockSchemaSourceReg1).close();
306         verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
307         verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
308         verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
309
310         // Stop the slave actor and verify schema source registrations are closed.
311
312         final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
313         Await.result(stopFuture, TIMEOUT.duration());
314
315         verify(mockMountPointReg).close();
316         verify(newMockSchemaSourceReg).close();
317     }
318
319     @Test
320     void testRegisterMountPointWithSchemaFailures() {
321         resetMountPointMocks();
322         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
323         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
324         doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
325         doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
326
327         var deviceSchemaProvider2 = mock(DeviceNetconfSchemaProvider.class);
328         doReturn(mockRegistry).when(deviceSchemaProvider2).registry();
329         doReturn(mockSchemaRepository).when(deviceSchemaProvider2).repository();
330         final NetconfTopologySetup setup = NetconfTopologySetup.builder()
331                 .setDeviceSchemaProvider(deviceSchemaProvider2)
332                 .setBaseSchemaProvider(BASE_SCHEMAS)
333                 .setActorSystem(system)
334                 .build();
335
336         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT,
337                 mockMountPointService));
338
339         // Test unrecoverable failure.
340
341         doReturn(Futures.immediateFailedFuture(
342             new SchemaResolutionException("mock", new SourceIdentifier("foo"), null)))
343                 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
344
345         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
346                 masterRef), testKit.getRef());
347
348         testKit.expectMsgClass(Success.class);
349
350         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
351         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
352
353         verify(mockMountPointBuilder, after(1000).never()).register();
354         verify(mockSchemaSourceReg1, timeout(1000)).close();
355         verify(mockSchemaSourceReg2, timeout(1000)).close();
356
357         // Test recoverable AskTimeoutException - schema context resolution should be retried.
358
359         reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
360
361         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock", new SourceIdentifier("foo"),
362                 new AskTimeoutException("timeout"))))
363             .doReturn(Futures.immediateFuture(mockSchemaContext))
364             .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
365
366         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
367                 masterRef), testKit.getRef());
368
369         testKit.expectMsgClass(Success.class);
370
371         verify(mockMountPointBuilder, timeout(5000)).register();
372         verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
373
374         // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
375         // attempt should not be retried.
376
377         reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
378         resetMountPointMocks();
379
380         final EffectiveModelContextFactory mockSchemaContextFactorySuccess = mock(EffectiveModelContextFactory.class);
381         doReturn(Futures.immediateFuture(mockSchemaContext))
382                 .when(mockSchemaContextFactorySuccess).createEffectiveModelContext(anyCollection());
383
384         doAnswer(unused -> {
385             SettableFuture<SchemaContext> future = SettableFuture.create();
386             new Thread(() -> {
387                 doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
388                     .createEffectiveModelContextFactory();
389
390                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
391                         masterRef), testKit.getRef());
392
393                 future.setException(new SchemaResolutionException("mock", new SourceIdentifier("foo"),
394                     new AskTimeoutException("timeout")));
395             }).start();
396             return future;
397         }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
398
399         doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
400
401         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
402                 masterRef), testKit.getRef());
403
404         verify(mockMountPointBuilder, timeout(5000)).register();
405         verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
406     }
407
408     @Test
409     void testMissingSchemaSourceOnMissingProvider() {
410         final var repository = new SharedSchemaRepository("test");
411
412         final var deviceSchemaProvider2 = mock(DeviceNetconfSchemaProvider.class);
413         doReturn(repository).when(deviceSchemaProvider2).repository();
414         final var setup = NetconfTopologySetup.builder()
415             .setActorSystem(system)
416             .setDeviceSchemaProvider(deviceSchemaProvider2)
417             .setIdleTimeout(Duration.ofSeconds(1))
418             .setBaseSchemaProvider(BASE_SCHEMAS)
419             .build();
420         final var props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
421         final var actor = TestActorRef.create(system, props, "master_messages_2");
422
423         final var sourceIdentifier = new SourceIdentifier("testID");
424
425         final var proxyYangProvider = new ProxyYangTextSourceProvider(actor, system.dispatcher(), TIMEOUT);
426
427         final var resolvedSchemaFuture = proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
428         final var ex = assertThrows(MissingSchemaSourceException.class,
429             () -> Await.result(resolvedSchemaFuture, TIMEOUT.duration()));
430         assertEquals("No providers registered for source SourceIdentifier [testID]", ex.getMessage());
431     }
432
433     @Test
434     void testYangTextSchemaSourceRequest() throws Exception {
435         doReturn(masterSchemaRepository).when(deviceSchemaProvider).repository();
436
437         final var sourceIdentifier = new SourceIdentifier("testID");
438
439         final var proxyYangProvider = new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
440
441         final var yangTextSchemaSource = new DelegatedYangTextSource(sourceIdentifier, CharSource.wrap("YANG"));
442
443         // Test success.
444
445         try (var schemaSourceReg = masterSchemaRepository.registerSchemaSource(
446                 id -> Futures.immediateFuture(yangTextSchemaSource),
447                 PotentialSchemaSource.create(sourceIdentifier, YangTextSource.class, 1))) {
448             final var resolvedSchemaFuture = proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
449             final var success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
450
451             assertEquals(sourceIdentifier, success.getRepresentation().sourceId());
452             assertEquals("YANG", success.getRepresentation().read());
453         }
454
455         // Test missing source failure.
456
457         final var failedSchemaFuture = proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
458         final var ex = assertThrows(MissingSchemaSourceException.class,
459             () -> Await.result(failedSchemaFuture, TIMEOUT.duration()));
460         assertThat(ex.getMessage(), startsWith("No providers registered for source"));
461         assertThat(ex.getMessage(), containsString(sourceIdentifier.toString()));
462     }
463
464     @Test
465     void testSlaveInvokeRpc() throws Exception {
466         resetMountPointMocks();
467         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
468         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
469         doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
470         doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
471         doReturn(mockDOMRpcService).when(mockRpc).domRpcService();
472
473         initializeMaster(List.of(new SourceIdentifier("testID")));
474         registerSlaveMountPoint();
475
476         ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
477         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
478
479         final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
480         assertInstanceOf(ProxyDOMRpcService.class, slaveDomRPCService);
481
482         final QName testQName = QName.create("", "TestQname");
483         final ContainerNode outputNode = ImmutableNodes.newContainerBuilder()
484                 .withNodeIdentifier(new NodeIdentifier(testQName))
485                 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
486         final RpcError rpcError = RpcResultBuilder.newError(ErrorType.RPC, null, "Rpc invocation failed.");
487
488         // RPC with no response output.
489
490         doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMRpcService).invokeRpc(any(), any());
491
492         DOMRpcResult result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
493
494         assertEquals(null, result);
495
496         // RPC with response output.
497
498         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode)))
499                 .when(mockDOMRpcService).invokeRpc(any(), any());
500
501         result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
502
503         assertEquals(outputNode, result.value());
504         assertTrue(result.errors().isEmpty());
505
506         // RPC with response error.
507
508         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(rpcError)))
509                 .when(mockDOMRpcService).invokeRpc(any(), any());
510
511         result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
512
513         assertNull(result.value());
514         assertEquals(rpcError, result.errors().iterator().next());
515
516         // RPC with response output and error.
517
518         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
519                 .when(mockDOMRpcService).invokeRpc(any(), any());
520
521         final DOMRpcResult resultOutputError =
522                 slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
523
524         assertEquals(outputNode, resultOutputError.value());
525         assertEquals(rpcError, resultOutputError.errors().iterator().next());
526
527         // RPC failure.
528         doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringRpcException("mock")))
529             .when(mockDOMRpcService).invokeRpc(any(), any());
530         final ListenableFuture<? extends DOMRpcResult> future = slaveDomRPCService.invokeRpc(testQName, outputNode);
531
532         final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
533         final Throwable cause = e.getCause();
534         assertInstanceOf(DOMRpcException.class, cause);
535         assertEquals("mock", cause.getMessage());
536     }
537
538     @Test
539     void testSlaveInvokeAction() throws Exception {
540         resetMountPointMocks();
541         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
542         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
543         doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
544         doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
545         doReturn(mockDOMRpcService).when(mockRpc).domRpcService();
546
547         initializeMaster(List.of(new SourceIdentifier("testActionID")));
548         registerSlaveMountPoint();
549
550         final var domActionServiceCaptor = ArgumentCaptor.forClass(DOMActionService.class);
551         verify(mockMountPointBuilder).addService(eq(DOMActionService.class), domActionServiceCaptor.capture());
552
553         final DOMActionService slaveDomActionService = domActionServiceCaptor.getValue();
554         assertInstanceOf(ProxyDOMActionService.class, slaveDomActionService);
555
556         final QName testQName = QName.create("test", "2019-08-16", "TestActionQname");
557         final Absolute schemaPath = Absolute.of(testQName);
558
559         final YangInstanceIdentifier yangIIdPath = YangInstanceIdentifier.of(testQName);
560
561         final DOMDataTreeIdentifier domDataTreeIdentifier = DOMDataTreeIdentifier.of(LogicalDatastoreType.OPERATIONAL,
562             yangIIdPath);
563
564         final ContainerNode outputNode = ImmutableNodes.newContainerBuilder()
565             .withNodeIdentifier(new NodeIdentifier(testQName))
566             .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
567
568         // Action with no response output.
569         doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMActionService)
570             .invokeAction(any(), any(), any());
571         var result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
572             .get(2, TimeUnit.SECONDS);
573         assertEquals(null, result);
574
575         // Action with response output.
576         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode))).when(mockDOMActionService)
577             .invokeAction(any(), any(), any());
578         result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
579             .get(2, TimeUnit.SECONDS);
580
581         assertEquals(outputNode, result.value());
582         assertTrue(result.errors().isEmpty());
583
584         // Action failure.
585         doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringActionException("mock")))
586             .when(mockDOMActionService).invokeAction(any(), any(), any());
587         final var future = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode);
588
589         final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
590         final Throwable cause = e.getCause();
591         assertInstanceOf(DOMActionException.class, cause);
592         assertEquals("mock", cause.getMessage());
593     }
594
595     @Test
596     void testSlaveNewTransactionRequests() {
597         resetMountPointMocks();
598         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
599         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
600         doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
601         doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
602         doReturn(mockDOMRpcService).when(mockRpc).domRpcService();
603
604         doReturn(mock(DOMDataTreeReadTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
605         doReturn(mock(DOMDataTreeReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
606         doReturn(mock(DOMDataTreeWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
607
608         initializeMaster(List.of());
609         registerSlaveMountPoint();
610
611         final var domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
612         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
613
614         final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
615         assertInstanceOf(ProxyDOMDataBroker.class, slaveDOMDataBroker);
616
617         slaveDOMDataBroker.newReadOnlyTransaction();
618         verify(mockDOMDataBroker).newReadOnlyTransaction();
619
620         slaveDOMDataBroker.newReadWriteTransaction();
621         verify(mockDOMDataBroker).newReadWriteTransaction();
622
623         slaveDOMDataBroker.newWriteOnlyTransaction();
624         verify(mockDOMDataBroker).newWriteOnlyTransaction();
625     }
626
627     @Test
628     void testSlaveNewNetconfDataTreeServiceRequest() {
629         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
630         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
631         doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
632         doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
633         doReturn(mockDOMRpcService).when(mockRpc).domRpcService();
634
635         initializeMaster(List.of());
636         registerSlaveMountPoint();
637
638         ArgumentCaptor<NetconfDataTreeService> netconfCaptor = ArgumentCaptor.forClass(NetconfDataTreeService.class);
639         verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), netconfCaptor.capture());
640
641         final NetconfDataTreeService slaveNetconfService = netconfCaptor.getValue();
642         assertInstanceOf(ProxyNetconfDataTreeService.class, slaveNetconfService);
643
644         final YangInstanceIdentifier PATH = YangInstanceIdentifier.of();
645         final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
646         final ContainerNode NODE = ImmutableNodes.newContainerBuilder()
647             .withNodeIdentifier(new NodeIdentifier(QName.create("", "cont")))
648             .build();
649
650         final FluentFuture<Optional<Object>> result = immediateFluentFuture(Optional.of(NODE));
651         doReturn(result).when(netconfService).get(PATH);
652         doReturn(result).when(netconfService).getConfig(PATH);
653         doReturn(emptyFluentFuture()).when(netconfService).commit();
654
655         slaveNetconfService.get(PATH);
656         slaveNetconfService.getConfig(PATH);
657         slaveNetconfService.lock();
658         slaveNetconfService.merge(STORE, PATH, NODE, Optional.empty());
659         slaveNetconfService.replace(STORE, PATH, NODE, Optional.empty());
660         slaveNetconfService.create(STORE, PATH, NODE, Optional.empty());
661         slaveNetconfService.delete(STORE, PATH);
662         slaveNetconfService.remove(STORE, PATH);
663         slaveNetconfService.discardChanges();
664         slaveNetconfService.commit();
665
666         verify(netconfService, timeout(1000)).get(PATH);
667         verify(netconfService, timeout(1000)).getConfig(PATH);
668         verify(netconfService, timeout(1000)).lock();
669         verify(netconfService, timeout(1000)).merge(STORE, PATH, NODE, Optional.empty());
670         verify(netconfService, timeout(1000)).replace(STORE, PATH, NODE, Optional.empty());
671         verify(netconfService, timeout(1000)).create(STORE, PATH, NODE, Optional.empty());
672         verify(netconfService, timeout(1000)).delete(STORE, PATH);
673         verify(netconfService, timeout(1000)).remove(STORE, PATH);
674         verify(netconfService, timeout(1000)).discardChanges();
675         verify(netconfService, timeout(1000)).commit();
676     }
677
678     private ActorRef registerSlaveMountPoint() {
679         var deviceSchemaProvider2 = mock(DeviceNetconfSchemaProvider.class);
680         doReturn(mockRegistry).when(deviceSchemaProvider2).registry();
681         doReturn(mockSchemaRepository).when(deviceSchemaProvider2).repository();
682         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(NetconfTopologySetup.builder()
683                 .setDeviceSchemaProvider(deviceSchemaProvider2)
684                 .setActorSystem(system)
685                 .setBaseSchemaProvider(BASE_SCHEMAS)
686                 .build(), remoteDeviceId, TIMEOUT, mockMountPointService));
687
688         doReturn(Futures.immediateFuture(mockSchemaContext))
689                 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
690
691         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
692                 masterRef), testKit.getRef());
693
694         verify(mockMountPointBuilder, timeout(5000)).register();
695         verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
696         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
697         verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), any());
698         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
699         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
700
701         testKit.expectMsgClass(Success.class);
702         return slaveRef;
703     }
704
705     private void initializeMaster(final List<SourceIdentifier> sourceIdentifiers) {
706         masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, netconfService, sourceIdentifiers,
707                 new RemoteDeviceServices(mockRpc, mockDOMActionService)), testKit.getRef());
708         testKit.expectMsgClass(MasterActorDataInitialized.class);
709     }
710
711     private void resetMountPointMocks() {
712         reset(mockMountPointReg, mockMountPointBuilder);
713
714         doNothing().when(mockMountPointReg).close();
715
716         doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
717         doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
718     }
719
720     private static PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
721         return argThat(argument -> identifier.equals(argument.getSourceIdentifier()));
722     }
723 }