52dac16d32583257e96c7703f503798512896950
[netconf.git] / apps / netconf-topology-singleton / src / test / java / org / opendaylight / netconf / topology / singleton / impl / NetconfNodeActorTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static org.hamcrest.CoreMatchers.containsString;
11 import static org.hamcrest.CoreMatchers.instanceOf;
12 import static org.hamcrest.CoreMatchers.startsWith;
13 import static org.hamcrest.MatcherAssert.assertThat;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertNull;
16 import static org.junit.Assert.assertThrows;
17 import static org.junit.Assert.assertTrue;
18 import static org.mockito.ArgumentMatchers.any;
19 import static org.mockito.ArgumentMatchers.anyCollection;
20 import static org.mockito.ArgumentMatchers.argThat;
21 import static org.mockito.ArgumentMatchers.eq;
22 import static org.mockito.Mockito.after;
23 import static org.mockito.Mockito.doAnswer;
24 import static org.mockito.Mockito.doNothing;
25 import static org.mockito.Mockito.doReturn;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.reset;
28 import static org.mockito.Mockito.timeout;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.verifyNoMoreInteractions;
32 import static org.opendaylight.mdsal.common.api.CommitInfo.emptyFluentFuture;
33 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFluentFuture;
34
35 import akka.actor.ActorRef;
36 import akka.actor.ActorSystem;
37 import akka.actor.Props;
38 import akka.actor.Status.Failure;
39 import akka.actor.Status.Success;
40 import akka.pattern.AskTimeoutException;
41 import akka.pattern.Patterns;
42 import akka.testkit.TestActorRef;
43 import akka.testkit.javadsl.TestKit;
44 import akka.util.Timeout;
45 import com.google.common.collect.ImmutableList;
46 import com.google.common.io.CharSource;
47 import com.google.common.net.InetAddresses;
48 import com.google.common.util.concurrent.FluentFuture;
49 import com.google.common.util.concurrent.Futures;
50 import com.google.common.util.concurrent.ListenableFuture;
51 import com.google.common.util.concurrent.SettableFuture;
52 import java.net.InetSocketAddress;
53 import java.time.Duration;
54 import java.util.ArrayList;
55 import java.util.List;
56 import java.util.Optional;
57 import java.util.concurrent.ExecutionException;
58 import java.util.concurrent.TimeUnit;
59 import org.junit.After;
60 import org.junit.Before;
61 import org.junit.Test;
62 import org.junit.runner.RunWith;
63 import org.mockito.ArgumentCaptor;
64 import org.mockito.Mock;
65 import org.mockito.junit.MockitoJUnitRunner;
66 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
67 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
68 import org.opendaylight.mdsal.dom.api.DOMActionException;
69 import org.opendaylight.mdsal.dom.api.DOMActionResult;
70 import org.opendaylight.mdsal.dom.api.DOMActionService;
71 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
72 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
73 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
74 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
75 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
76 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
77 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
78 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
79 import org.opendaylight.mdsal.dom.api.DOMRpcException;
80 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
81 import org.opendaylight.mdsal.dom.api.DOMRpcService;
82 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
83 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
84 import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult;
85 import org.opendaylight.netconf.client.mdsal.NetconfDevice.SchemaResourcesDTO;
86 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
87 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
88 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Actions;
89 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
90 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
91 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
92 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringActionException;
93 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
94 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
95 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
96 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
97 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
98 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
99 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
100 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
101 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
102 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
103 import org.opendaylight.yangtools.concepts.ObjectRegistration;
104 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
105 import org.opendaylight.yangtools.yang.common.ErrorType;
106 import org.opendaylight.yangtools.yang.common.QName;
107 import org.opendaylight.yangtools.yang.common.RpcError;
108 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
111 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
112 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
113 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
114 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
115 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
116 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
117 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
118 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
119 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
120 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
121 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
122 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
123 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
124 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
125 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
126 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
127 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToIRTransformer;
128 import scala.concurrent.Await;
129 import scala.concurrent.Future;
130
131 @RunWith(MockitoJUnitRunner.StrictStubs.class)
132 public class NetconfNodeActorTest extends AbstractBaseSchemasTest {
133
134     private static final Timeout TIMEOUT = Timeout.create(Duration.ofSeconds(5));
135     private static final SourceIdentifier SOURCE_IDENTIFIER1 = new SourceIdentifier("yang1");
136     private static final SourceIdentifier SOURCE_IDENTIFIER2 = new SourceIdentifier("yang2");
137
138     private ActorSystem system = ActorSystem.create();
139     private final TestKit testKit = new TestKit(system);
140
141     private ActorRef masterRef;
142     private RemoteDeviceId remoteDeviceId;
143     private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
144
145     @Mock
146     private Rpcs.Normalized mockDOMRpcService;
147     @Mock
148     private Actions.Normalized mockDOMActionService;
149     @Mock
150     private DOMMountPointService mockMountPointService;
151     @Mock
152     private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
153     @Mock
154     private ObjectRegistration<DOMMountPoint> mockMountPointReg;
155     @Mock
156     private DOMDataBroker mockDOMDataBroker;
157     @Mock
158     private NetconfDataTreeService netconfService;
159     @Mock
160     private SchemaSourceRegistration<?> mockSchemaSourceReg1;
161     @Mock
162     private SchemaSourceRegistration<?> mockSchemaSourceReg2;
163     @Mock
164     private SchemaSourceRegistry mockRegistry;
165     @Mock
166     private EffectiveModelContextFactory mockSchemaContextFactory;
167     @Mock
168     private SchemaRepository mockSchemaRepository;
169     @Mock
170     private EffectiveModelContext mockSchemaContext;
171     @Mock
172     private SchemaResourcesDTO schemaResourceDTO;
173
174     @Before
175     public void setup() {
176         remoteDeviceId = new RemoteDeviceId("netconf-topology",
177                 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
178
179         masterSchemaRepository.registerSchemaSourceListener(
180                 TextToIRTransformer.create(masterSchemaRepository, masterSchemaRepository));
181
182         doReturn(masterSchemaRepository).when(schemaResourceDTO).getSchemaRepository();
183         doReturn(mockRegistry).when(schemaResourceDTO).getSchemaRegistry();
184         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create()
185             .setActorSystem(system)
186             .setIdleTimeout(Duration.ofSeconds(1))
187             .setSchemaResourceDTO(schemaResourceDTO)
188             .setBaseSchemas(BASE_SCHEMAS)
189             .build();
190
191         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
192
193         masterRef = TestActorRef.create(system, props, "master_messages");
194
195         resetMountPointMocks();
196
197         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
198
199         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
200         doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
201
202         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
203                 .createEffectiveModelContextFactory();
204     }
205
206     @After
207     public void teardown() {
208         TestKit.shutdownActorSystem(system, true);
209         system = null;
210     }
211
212     @Test
213     public void testInitializeAndRefreshMasterData() {
214
215         // Test CreateInitialMasterActorData.
216
217         initializeMaster(new ArrayList<>());
218
219         // Test RefreshSetupMasterActorData.
220
221         final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
222                 new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
223
224         final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create()
225             .setBaseSchemas(BASE_SCHEMAS)
226             .setSchemaResourceDTO(schemaResourceDTO)
227             .setActorSystem(system)
228             .build();
229
230         masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
231
232         testKit.expectMsgClass(MasterActorDataInitialized.class);
233     }
234
235     @Test
236     public void testAskForMasterMountPoint() {
237
238         // Test with master not setup yet.
239
240         final TestKit kit = new TestKit(system);
241
242         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
243
244         final Failure failure = kit.expectMsgClass(Failure.class);
245         assertTrue(failure.cause() instanceof NotMasterException);
246
247         // Now initialize - master should send the RegisterMountPoint message.
248
249         List<SourceIdentifier> sourceIdentifiers = List.of(new SourceIdentifier("testID"));
250         initializeMaster(sourceIdentifiers);
251
252         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
253
254         final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
255
256         assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
257     }
258
259     @Test
260     public void testRegisterAndUnregisterMountPoint() throws Exception {
261
262         ActorRef slaveRef = registerSlaveMountPoint();
263
264         // Unregister
265
266         slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
267
268         verify(mockMountPointReg, timeout(5000)).close();
269         verify(mockSchemaSourceReg1, timeout(1000)).close();
270         verify(mockSchemaSourceReg2, timeout(1000)).close();
271
272         // Test registration with another interleaved registration that completes while the first registration
273         // is resolving the schema context.
274
275         reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
276         resetMountPointMocks();
277
278         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
279
280         final SchemaSourceRegistration<?> newMockSchemaSourceReg = mock(SchemaSourceRegistration.class);
281
282         final EffectiveModelContextFactory newMockSchemaContextFactory = mock(EffectiveModelContextFactory.class);
283         doReturn(Futures.immediateFuture(mockSchemaContext))
284                 .when(newMockSchemaContextFactory).createEffectiveModelContext(anyCollection());
285
286         doAnswer(unused -> {
287             SettableFuture<SchemaContext> future = SettableFuture.create();
288             new Thread(() -> {
289                 doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
290                         withSourceId(SOURCE_IDENTIFIER1));
291
292                 doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
293                         .createEffectiveModelContextFactory();
294
295                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
296                         testKit.getRef());
297
298                 future.set(mockSchemaContext);
299             }).start();
300             return future;
301         }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
302
303         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
304                 .createEffectiveModelContextFactory();
305
306         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
307
308         verify(mockMountPointBuilder, timeout(5000)).register();
309         verify(mockMountPointBuilder, after(500)).addService(eq(DOMDataBroker.class), any());
310         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
311         verify(mockMountPointBuilder).addService(eq(DOMActionService.class), any());
312         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
313         verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
314         verify(mockSchemaSourceReg1).close();
315         verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
316         verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
317         verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
318
319         // Stop the slave actor and verify schema source registrations are closed.
320
321         final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
322         Await.result(stopFuture, TIMEOUT.duration());
323
324         verify(mockMountPointReg).close();
325         verify(newMockSchemaSourceReg).close();
326     }
327
328     @SuppressWarnings("unchecked")
329     @Test
330     public void testRegisterMountPointWithSchemaFailures() throws Exception {
331         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
332         doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
333         doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
334         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create()
335                 .setSchemaResourceDTO(schemaResourceDTO2)
336                 .setBaseSchemas(BASE_SCHEMAS)
337                 .setActorSystem(system)
338                 .build();
339
340         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT,
341                 mockMountPointService));
342
343         // Test unrecoverable failure.
344
345         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock")))
346                 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
347
348         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
349                 masterRef), testKit.getRef());
350
351         testKit.expectMsgClass(Success.class);
352
353         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
354         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
355
356         verify(mockMountPointBuilder, after(1000).never()).register();
357         verify(mockSchemaSourceReg1, timeout(1000)).close();
358         verify(mockSchemaSourceReg2, timeout(1000)).close();
359
360         // Test recoverable AskTimeoutException - schema context resolution should be retried.
361
362         reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
363
364         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock",
365                 new AskTimeoutException("timeout"))))
366             .doReturn(Futures.immediateFuture(mockSchemaContext))
367             .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
368
369         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
370                 masterRef), testKit.getRef());
371
372         testKit.expectMsgClass(Success.class);
373
374         verify(mockMountPointBuilder, timeout(5000)).register();
375         verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
376
377         // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
378         // attempt should not be retried.
379
380         reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
381         resetMountPointMocks();
382
383         final EffectiveModelContextFactory mockSchemaContextFactorySuccess = mock(EffectiveModelContextFactory.class);
384         doReturn(Futures.immediateFuture(mockSchemaContext))
385                 .when(mockSchemaContextFactorySuccess).createEffectiveModelContext(anyCollection());
386
387         doAnswer(unused -> {
388             SettableFuture<SchemaContext> future = SettableFuture.create();
389             new Thread(() -> {
390                 doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
391                     .createEffectiveModelContextFactory();
392
393                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
394                         masterRef), testKit.getRef());
395
396                 future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout")));
397             }).start();
398             return future;
399         }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
400
401         doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
402
403         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
404                 masterRef), testKit.getRef());
405
406         verify(mockMountPointBuilder, timeout(5000)).register();
407         verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
408     }
409
410     @Test(expected = MissingSchemaSourceException.class)
411     public void testMissingSchemaSourceOnMissingProvider() throws Exception {
412         final SharedSchemaRepository repository = new SharedSchemaRepository("test");
413
414         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
415         doReturn(repository).when(schemaResourceDTO2).getSchemaRegistry();
416         doReturn(repository).when(schemaResourceDTO2).getSchemaRepository();
417         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create()
418             .setActorSystem(system)
419             .setSchemaResourceDTO(schemaResourceDTO2)
420             .setIdleTimeout(Duration.ofSeconds(1))
421             .setBaseSchemas(BASE_SCHEMAS)
422             .build();
423         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
424         ActorRef actor = TestActorRef.create(system, props, "master_messages_2");
425
426         final SourceIdentifier sourceIdentifier = new SourceIdentifier("testID");
427
428         final ProxyYangTextSourceProvider proxyYangProvider =
429                 new ProxyYangTextSourceProvider(actor, system.dispatcher(), TIMEOUT);
430
431         final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
432                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
433         Await.result(resolvedSchemaFuture, TIMEOUT.duration());
434     }
435
436     @Test
437     public void testYangTextSchemaSourceRequest() throws Exception {
438         final SourceIdentifier sourceIdentifier = new SourceIdentifier("testID");
439
440         final ProxyYangTextSourceProvider proxyYangProvider =
441                 new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
442
443         final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForCharSource(sourceIdentifier,
444                 CharSource.wrap("YANG"));
445
446         // Test success.
447
448         final SchemaSourceRegistration<YangTextSchemaSource> schemaSourceReg = masterSchemaRepository
449                 .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource),
450                      PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1));
451
452         final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
453                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
454
455         final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
456
457         assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
458         assertEquals("YANG", success.getRepresentation().read());
459
460         // Test missing source failure.
461
462         schemaSourceReg.close();
463
464         final MissingSchemaSourceException ex = assertThrows(MissingSchemaSourceException.class,
465             () -> {
466                 final Future<YangTextSchemaSourceSerializationProxy> failedSchemaFuture =
467                         proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
468                 Await.result(failedSchemaFuture, TIMEOUT.duration());
469             });
470         assertThat(ex.getMessage(), startsWith("No providers registered for source"));
471         assertThat(ex.getMessage(), containsString(sourceIdentifier.toString()));
472     }
473
474     @Test
475     public void testSlaveInvokeRpc() throws Exception {
476
477         initializeMaster(List.of(new SourceIdentifier("testID")));
478         registerSlaveMountPoint();
479
480         ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
481         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
482
483         final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
484         assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService);
485
486         final QName testQName = QName.create("", "TestQname");
487         final ContainerNode outputNode = Builders.containerBuilder()
488                 .withNodeIdentifier(new NodeIdentifier(testQName))
489                 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
490         final RpcError rpcError = RpcResultBuilder.newError(ErrorType.RPC, null, "Rpc invocation failed.");
491
492         // RPC with no response output.
493
494         doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMRpcService).invokeRpc(any(), any());
495
496         DOMRpcResult result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
497
498         assertEquals(null, result);
499
500         // RPC with response output.
501
502         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode)))
503                 .when(mockDOMRpcService).invokeRpc(any(), any());
504
505         result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
506
507         assertEquals(outputNode, result.value());
508         assertTrue(result.errors().isEmpty());
509
510         // RPC with response error.
511
512         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(rpcError)))
513                 .when(mockDOMRpcService).invokeRpc(any(), any());
514
515         result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
516
517         assertNull(result.value());
518         assertEquals(rpcError, result.errors().iterator().next());
519
520         // RPC with response output and error.
521
522         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
523                 .when(mockDOMRpcService).invokeRpc(any(), any());
524
525         final DOMRpcResult resultOutputError =
526                 slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
527
528         assertEquals(outputNode, resultOutputError.value());
529         assertEquals(rpcError, resultOutputError.errors().iterator().next());
530
531         // RPC failure.
532         doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringRpcException("mock")))
533             .when(mockDOMRpcService).invokeRpc(any(), any());
534         final ListenableFuture<? extends DOMRpcResult> future = slaveDomRPCService.invokeRpc(testQName, outputNode);
535
536         final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
537         final Throwable cause = e.getCause();
538         assertThat(cause, instanceOf(DOMRpcException.class));
539         assertEquals("mock", cause.getMessage());
540     }
541
542     @Test
543     public void testSlaveInvokeAction() throws Exception {
544         initializeMaster(List.of(new SourceIdentifier("testActionID")));
545         registerSlaveMountPoint();
546
547         ArgumentCaptor<DOMActionService> domActionServiceCaptor = ArgumentCaptor.forClass(DOMActionService.class);
548         verify(mockMountPointBuilder).addService(eq(DOMActionService.class), domActionServiceCaptor.capture());
549
550         final DOMActionService slaveDomActionService = domActionServiceCaptor.getValue();
551         assertTrue(slaveDomActionService instanceof ProxyDOMActionService);
552
553         final QName testQName = QName.create("test", "2019-08-16", "TestActionQname");
554         final Absolute schemaPath = Absolute.of(testQName);
555
556         final YangInstanceIdentifier yangIIdPath = YangInstanceIdentifier.create(new NodeIdentifier(testQName));
557
558         final DOMDataTreeIdentifier domDataTreeIdentifier = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
559             yangIIdPath);
560
561         final ContainerNode outputNode = Builders.containerBuilder()
562             .withNodeIdentifier(new NodeIdentifier(testQName))
563             .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
564
565         // Action with no response output.
566         doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMActionService)
567             .invokeAction(any(), any(), any());
568         DOMActionResult result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
569             .get(2, TimeUnit.SECONDS);
570         assertEquals(null, result);
571
572         // Action with response output.
573         doReturn(FluentFutures.immediateFluentFuture(new SimpleDOMActionResult(outputNode))).when(mockDOMActionService)
574             .invokeAction(any(), any(), any());
575         result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
576             .get(2, TimeUnit.SECONDS);
577
578         assertEquals(Optional.of(outputNode), result.getOutput());
579         assertTrue(result.getErrors().isEmpty());
580
581         // Action failure.
582         doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringActionException("mock")))
583             .when(mockDOMActionService).invokeAction(any(), any(), any());
584         final ListenableFuture<? extends DOMActionResult> future = slaveDomActionService.invokeAction(schemaPath,
585             domDataTreeIdentifier, outputNode);
586
587         final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
588         final Throwable cause = e.getCause();
589         assertThat(cause, instanceOf(DOMActionException.class));
590         assertEquals("mock", cause.getMessage());
591     }
592
593     @Test
594     public void testSlaveNewTransactionRequests() {
595         doReturn(mock(DOMDataTreeReadTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
596         doReturn(mock(DOMDataTreeReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
597         doReturn(mock(DOMDataTreeWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
598
599         initializeMaster(List.of());
600         registerSlaveMountPoint();
601
602         ArgumentCaptor<DOMDataBroker> domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
603         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
604
605         final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
606         assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker);
607
608         slaveDOMDataBroker.newReadOnlyTransaction();
609         verify(mockDOMDataBroker).newReadOnlyTransaction();
610
611         slaveDOMDataBroker.newReadWriteTransaction();
612         verify(mockDOMDataBroker).newReadWriteTransaction();
613
614         slaveDOMDataBroker.newWriteOnlyTransaction();
615         verify(mockDOMDataBroker).newWriteOnlyTransaction();
616     }
617
618     @Test
619     public void testSlaveNewNetconfDataTreeServiceRequest() {
620         initializeMaster(List.of());
621         registerSlaveMountPoint();
622
623         ArgumentCaptor<NetconfDataTreeService> netconfCaptor = ArgumentCaptor.forClass(NetconfDataTreeService.class);
624         verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), netconfCaptor.capture());
625
626         final NetconfDataTreeService slaveNetconfService = netconfCaptor.getValue();
627         assertTrue(slaveNetconfService instanceof ProxyNetconfDataTreeService);
628
629         final YangInstanceIdentifier PATH = YangInstanceIdentifier.of();
630         final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
631         final ContainerNode NODE = Builders.containerBuilder()
632             .withNodeIdentifier(new NodeIdentifier(QName.create("", "cont")))
633             .build();
634
635         final FluentFuture<Optional<Object>> result = immediateFluentFuture(Optional.of(NODE));
636         doReturn(result).when(netconfService).get(PATH);
637         doReturn(result).when(netconfService).getConfig(PATH);
638         doReturn(emptyFluentFuture()).when(netconfService).commit();
639
640         slaveNetconfService.get(PATH);
641         slaveNetconfService.getConfig(PATH);
642         slaveNetconfService.lock();
643         slaveNetconfService.merge(STORE, PATH, NODE, Optional.empty());
644         slaveNetconfService.replace(STORE, PATH, NODE, Optional.empty());
645         slaveNetconfService.create(STORE, PATH, NODE, Optional.empty());
646         slaveNetconfService.delete(STORE, PATH);
647         slaveNetconfService.remove(STORE, PATH);
648         slaveNetconfService.discardChanges();
649         slaveNetconfService.commit();
650
651         verify(netconfService, timeout(1000)).get(PATH);
652         verify(netconfService, timeout(1000)).getConfig(PATH);
653         verify(netconfService, timeout(1000)).lock();
654         verify(netconfService, timeout(1000)).merge(STORE, PATH, NODE, Optional.empty());
655         verify(netconfService, timeout(1000)).replace(STORE, PATH, NODE, Optional.empty());
656         verify(netconfService, timeout(1000)).create(STORE, PATH, NODE, Optional.empty());
657         verify(netconfService, timeout(1000)).delete(STORE, PATH);
658         verify(netconfService, timeout(1000)).remove(STORE, PATH);
659         verify(netconfService, timeout(1000)).discardChanges();
660         verify(netconfService, timeout(1000)).commit();
661     }
662
663     private ActorRef registerSlaveMountPoint() {
664         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
665         doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
666         doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
667         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(NetconfTopologySetupBuilder.create()
668                 .setSchemaResourceDTO(schemaResourceDTO2)
669                 .setActorSystem(system)
670                 .setBaseSchemas(BASE_SCHEMAS)
671                 .build(), remoteDeviceId, TIMEOUT, mockMountPointService));
672
673         doReturn(Futures.immediateFuture(mockSchemaContext))
674                 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
675
676         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
677                 masterRef), testKit.getRef());
678
679         verify(mockMountPointBuilder, timeout(5000)).register();
680         verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
681         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
682         verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), any());
683         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
684         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
685
686         testKit.expectMsgClass(Success.class);
687         return slaveRef;
688     }
689
690     private void initializeMaster(final List<SourceIdentifier> sourceIdentifiers) {
691         masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, netconfService, sourceIdentifiers,
692                 new RemoteDeviceServices(mockDOMRpcService, mockDOMActionService)), testKit.getRef());
693         testKit.expectMsgClass(MasterActorDataInitialized.class);
694     }
695
696     private void resetMountPointMocks() {
697         reset(mockMountPointReg, mockMountPointBuilder);
698
699         doNothing().when(mockMountPointReg).close();
700
701         doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
702         doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
703     }
704
705     private static PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
706         return argThat(argument -> identifier.equals(argument.getSourceIdentifier()));
707     }
708 }