a0df29f994d7b315f35a5fe5208f6c5f78f28d1d
[netconf.git] / netconf / netconf-topology-singleton / src / test / java / org / opendaylight / netconf / topology / singleton / impl / NetconfNodeActorTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.nio.charset.StandardCharsets.UTF_8;
11 import static org.hamcrest.CoreMatchers.containsString;
12 import static org.hamcrest.CoreMatchers.instanceOf;
13 import static org.hamcrest.CoreMatchers.startsWith;
14 import static org.hamcrest.MatcherAssert.assertThat;
15 import static org.junit.Assert.assertEquals;
16 import static org.junit.Assert.assertNull;
17 import static org.junit.Assert.assertThrows;
18 import static org.junit.Assert.assertTrue;
19 import static org.mockito.ArgumentMatchers.any;
20 import static org.mockito.ArgumentMatchers.anyCollection;
21 import static org.mockito.ArgumentMatchers.argThat;
22 import static org.mockito.ArgumentMatchers.eq;
23 import static org.mockito.Mockito.after;
24 import static org.mockito.Mockito.doAnswer;
25 import static org.mockito.Mockito.doNothing;
26 import static org.mockito.Mockito.doReturn;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.reset;
29 import static org.mockito.Mockito.timeout;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.verifyNoMoreInteractions;
33 import static org.opendaylight.mdsal.common.api.CommitInfo.emptyFluentFuture;
34 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFluentFuture;
35
36 import akka.actor.ActorRef;
37 import akka.actor.ActorSystem;
38 import akka.actor.Props;
39 import akka.actor.Status.Failure;
40 import akka.actor.Status.Success;
41 import akka.pattern.AskTimeoutException;
42 import akka.pattern.Patterns;
43 import akka.testkit.TestActorRef;
44 import akka.testkit.javadsl.TestKit;
45 import akka.util.Timeout;
46 import com.google.common.collect.ImmutableList;
47 import com.google.common.collect.Lists;
48 import com.google.common.io.ByteSource;
49 import com.google.common.net.InetAddresses;
50 import com.google.common.util.concurrent.FluentFuture;
51 import com.google.common.util.concurrent.Futures;
52 import com.google.common.util.concurrent.ListenableFuture;
53 import com.google.common.util.concurrent.SettableFuture;
54 import java.io.InputStream;
55 import java.net.InetSocketAddress;
56 import java.util.ArrayList;
57 import java.util.Collections;
58 import java.util.List;
59 import java.util.Optional;
60 import java.util.Scanner;
61 import java.util.concurrent.ExecutionException;
62 import java.util.concurrent.TimeUnit;
63 import org.junit.After;
64 import org.junit.Before;
65 import org.junit.Test;
66 import org.junit.runner.RunWith;
67 import org.mockito.ArgumentCaptor;
68 import org.mockito.Mock;
69 import org.mockito.junit.MockitoJUnitRunner;
70 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
71 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
72 import org.opendaylight.mdsal.dom.api.DOMActionException;
73 import org.opendaylight.mdsal.dom.api.DOMActionResult;
74 import org.opendaylight.mdsal.dom.api.DOMActionService;
75 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
76 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
77 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
78 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
79 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
80 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
81 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
82 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
83 import org.opendaylight.mdsal.dom.api.DOMRpcException;
84 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
85 import org.opendaylight.mdsal.dom.api.DOMRpcService;
86 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
87 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
88 import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult;
89 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
90 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice.SchemaResourcesDTO;
91 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
92 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
93 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringActionException;
94 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
95 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
96 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
97 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
98 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
99 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
100 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
101 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
102 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
103 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
104 import org.opendaylight.yangtools.concepts.ObjectRegistration;
105 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
106 import org.opendaylight.yangtools.yang.common.ErrorType;
107 import org.opendaylight.yangtools.yang.common.QName;
108 import org.opendaylight.yangtools.yang.common.RpcError;
109 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
110 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
111 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
112 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
113 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
114 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
115 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
116 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
117 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
118 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
119 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
120 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
121 import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier;
122 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
123 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
124 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
125 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
126 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
127 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
128 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
129 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
130 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToIRTransformer;
131 import scala.concurrent.Await;
132 import scala.concurrent.Future;
133 import scala.concurrent.duration.Duration;
134
135 @RunWith(MockitoJUnitRunner.StrictStubs.class)
136 public class NetconfNodeActorTest extends AbstractBaseSchemasTest {
137
138     private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
139     private static final RevisionSourceIdentifier SOURCE_IDENTIFIER1 = RevisionSourceIdentifier.create("yang1");
140     private static final RevisionSourceIdentifier SOURCE_IDENTIFIER2 = RevisionSourceIdentifier.create("yang2");
141
142     private ActorSystem system = ActorSystem.create();
143     private final TestKit testKit = new TestKit(system);
144
145     private ActorRef masterRef;
146     private RemoteDeviceId remoteDeviceId;
147     private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
148
149     @Mock
150     private DOMRpcService mockDOMRpcService;
151
152     @Mock
153     private DOMActionService mockDOMActionService;
154
155     @Mock
156     private DOMMountPointService mockMountPointService;
157
158     @Mock
159     private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
160
161     @Mock
162     private ObjectRegistration<DOMMountPoint> mockMountPointReg;
163
164     @Mock
165     private DOMDataBroker mockDOMDataBroker;
166
167     @Mock
168     private NetconfDataTreeService netconfService;
169
170     @Mock
171     private SchemaSourceRegistration<?> mockSchemaSourceReg1;
172
173     @Mock
174     private SchemaSourceRegistration<?> mockSchemaSourceReg2;
175
176     @Mock
177     private SchemaSourceRegistry mockRegistry;
178
179     @Mock
180     private EffectiveModelContextFactory mockSchemaContextFactory;
181
182     @Mock
183     private SchemaRepository mockSchemaRepository;
184
185     @Mock
186     private EffectiveModelContext mockSchemaContext;
187
188     @Mock
189     private SchemaResourcesDTO schemaResourceDTO;
190
191     @Before
192     public void setup() {
193         remoteDeviceId = new RemoteDeviceId("netconf-topology",
194                 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
195
196         masterSchemaRepository.registerSchemaSourceListener(
197                 TextToIRTransformer.create(masterSchemaRepository, masterSchemaRepository));
198
199         doReturn(masterSchemaRepository).when(schemaResourceDTO).getSchemaRepository();
200         doReturn(mockRegistry).when(schemaResourceDTO).getSchemaRegistry();
201         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
202                 .setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS)).setSchemaResourceDTO(schemaResourceDTO)
203                 .setBaseSchemas(BASE_SCHEMAS).build();
204
205         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
206
207         masterRef = TestActorRef.create(system, props, "master_messages");
208
209         resetMountPointMocks();
210
211         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
212
213         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
214         doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
215
216         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
217                 .createEffectiveModelContextFactory();
218     }
219
220     @After
221     public void teardown() {
222         TestKit.shutdownActorSystem(system, true);
223         system = null;
224     }
225
226     @Test
227     public void testInitializeAndRefreshMasterData() {
228
229         // Test CreateInitialMasterActorData.
230
231         initializeMaster(new ArrayList<>());
232
233         // Test RefreshSetupMasterActorData.
234
235         final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
236                 new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
237
238         final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create().setBaseSchemas(BASE_SCHEMAS)
239                 .setSchemaResourceDTO(schemaResourceDTO).setActorSystem(system).build();
240
241         masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
242
243         testKit.expectMsgClass(MasterActorDataInitialized.class);
244     }
245
246     @Test
247     public void testAskForMasterMountPoint() {
248
249         // Test with master not setup yet.
250
251         final TestKit kit = new TestKit(system);
252
253         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
254
255         final Failure failure = kit.expectMsgClass(Failure.class);
256         assertTrue(failure.cause() instanceof NotMasterException);
257
258         // Now initialize - master should send the RegisterMountPoint message.
259
260         List<SourceIdentifier> sourceIdentifiers = Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
261         initializeMaster(sourceIdentifiers);
262
263         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
264
265         final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
266
267         assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
268     }
269
270     @Test
271     public void testRegisterAndUnregisterMountPoint() throws Exception {
272
273         ActorRef slaveRef = registerSlaveMountPoint();
274
275         // Unregister
276
277         slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
278
279         verify(mockMountPointReg, timeout(5000)).close();
280         verify(mockSchemaSourceReg1, timeout(1000)).close();
281         verify(mockSchemaSourceReg2, timeout(1000)).close();
282
283         // Test registration with another interleaved registration that completes while the first registration
284         // is resolving the schema context.
285
286         reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
287         resetMountPointMocks();
288
289         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
290
291         final SchemaSourceRegistration<?> newMockSchemaSourceReg = mock(SchemaSourceRegistration.class);
292
293         final EffectiveModelContextFactory newMockSchemaContextFactory = mock(EffectiveModelContextFactory.class);
294         doReturn(Futures.immediateFuture(mockSchemaContext))
295                 .when(newMockSchemaContextFactory).createEffectiveModelContext(anyCollection());
296
297         doAnswer(unused -> {
298             SettableFuture<SchemaContext> future = SettableFuture.create();
299             new Thread(() -> {
300                 doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
301                         withSourceId(SOURCE_IDENTIFIER1));
302
303                 doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
304                         .createEffectiveModelContextFactory();
305
306                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
307                         testKit.getRef());
308
309                 future.set(mockSchemaContext);
310             }).start();
311             return future;
312         }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
313
314         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
315                 .createEffectiveModelContextFactory();
316
317         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
318
319         verify(mockMountPointBuilder, timeout(5000)).register();
320         verify(mockMountPointBuilder, after(500)).addService(eq(DOMDataBroker.class), any());
321         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
322         verify(mockMountPointBuilder).addService(eq(DOMActionService.class), any());
323         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
324         verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
325         verify(mockSchemaSourceReg1).close();
326         verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
327         verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
328         verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
329
330         // Stop the slave actor and verify schema source registrations are closed.
331
332         final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
333         Await.result(stopFuture, TIMEOUT.duration());
334
335         verify(mockMountPointReg).close();
336         verify(newMockSchemaSourceReg).close();
337     }
338
339     @SuppressWarnings("unchecked")
340     @Test
341     public void testRegisterMountPointWithSchemaFailures() throws Exception {
342         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
343         doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
344         doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
345         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create()
346                 .setSchemaResourceDTO(schemaResourceDTO2)
347                 .setBaseSchemas(BASE_SCHEMAS)
348                 .setActorSystem(system)
349                 .build();
350
351         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT,
352                 mockMountPointService));
353
354         // Test unrecoverable failure.
355
356         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock")))
357                 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
358
359         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
360                 masterRef), testKit.getRef());
361
362         testKit.expectMsgClass(Success.class);
363
364         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
365         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
366
367         verify(mockMountPointBuilder, after(1000).never()).register();
368         verify(mockSchemaSourceReg1, timeout(1000)).close();
369         verify(mockSchemaSourceReg2, timeout(1000)).close();
370
371         // Test recoverable AskTimeoutException - schema context resolution should be retried.
372
373         reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
374
375         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock",
376                 new AskTimeoutException("timeout"))))
377             .doReturn(Futures.immediateFuture(mockSchemaContext))
378             .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
379
380         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
381                 masterRef), testKit.getRef());
382
383         testKit.expectMsgClass(Success.class);
384
385         verify(mockMountPointBuilder, timeout(5000)).register();
386         verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
387
388         // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
389         // attempt should not be retried.
390
391         reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
392         resetMountPointMocks();
393
394         final EffectiveModelContextFactory mockSchemaContextFactorySuccess = mock(EffectiveModelContextFactory.class);
395         doReturn(Futures.immediateFuture(mockSchemaContext))
396                 .when(mockSchemaContextFactorySuccess).createEffectiveModelContext(anyCollection());
397
398         doAnswer(unused -> {
399             SettableFuture<SchemaContext> future = SettableFuture.create();
400             new Thread(() -> {
401                 doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
402                     .createEffectiveModelContextFactory();
403
404                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
405                         masterRef), testKit.getRef());
406
407                 future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout")));
408             }).start();
409             return future;
410         }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
411
412         doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
413
414         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
415                 masterRef), testKit.getRef());
416
417         verify(mockMountPointBuilder, timeout(5000)).register();
418         verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
419     }
420
421     @Test(expected = MissingSchemaSourceException.class)
422     public void testMissingSchemaSourceOnMissingProvider() throws Exception {
423         final SharedSchemaRepository repository = new SharedSchemaRepository("test");
424
425         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
426         doReturn(repository).when(schemaResourceDTO2).getSchemaRegistry();
427         doReturn(repository).when(schemaResourceDTO2).getSchemaRepository();
428         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
429                 .setSchemaResourceDTO(schemaResourceDTO2).setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS))
430                 .setBaseSchemas(BASE_SCHEMAS).build();
431         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
432         ActorRef actor = TestActorRef.create(system, props, "master_messages_2");
433
434         final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID");
435
436         final ProxyYangTextSourceProvider proxyYangProvider =
437                 new ProxyYangTextSourceProvider(actor, system.dispatcher(), TIMEOUT);
438
439         final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
440                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
441         Await.result(resolvedSchemaFuture, TIMEOUT.duration());
442     }
443
444     @Test
445     public void testYangTextSchemaSourceRequest() throws Exception {
446         final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID");
447
448         final ProxyYangTextSourceProvider proxyYangProvider =
449                 new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
450
451         final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForByteSource(sourceIdentifier,
452                 ByteSource.wrap("YANG".getBytes(UTF_8)));
453
454         // Test success.
455
456         final SchemaSourceRegistration<YangTextSchemaSource> schemaSourceReg = masterSchemaRepository
457                 .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource),
458                      PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1));
459
460         final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
461                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
462
463         final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
464
465         assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
466         assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
467
468         // Test missing source failure.
469
470         schemaSourceReg.close();
471
472         final MissingSchemaSourceException ex = assertThrows(MissingSchemaSourceException.class,
473             () -> {
474                 final Future<YangTextSchemaSourceSerializationProxy> failedSchemaFuture =
475                         proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
476                 Await.result(failedSchemaFuture, TIMEOUT.duration());
477             });
478         assertThat(ex.getMessage(), startsWith("No providers registered for source"));
479         assertThat(ex.getMessage(), containsString(sourceIdentifier.toString()));
480     }
481
482     @Test
483     public void testSlaveInvokeRpc() throws Exception {
484
485         final List<SourceIdentifier> sourceIdentifiers =
486                 Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
487
488         initializeMaster(sourceIdentifiers);
489         registerSlaveMountPoint();
490
491         ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
492         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
493
494         final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
495         assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService);
496
497         final QName testQName = QName.create("", "TestQname");
498         final NormalizedNode outputNode = ImmutableContainerNodeBuilder.create()
499                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(testQName))
500                 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
501         final RpcError rpcError = RpcResultBuilder.newError(ErrorType.RPC, null, "Rpc invocation failed.");
502
503         // RPC with no response output.
504
505         doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMRpcService).invokeRpc(any(), any());
506
507         DOMRpcResult result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
508
509         assertEquals(null, result);
510
511         // RPC with response output.
512
513         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode)))
514                 .when(mockDOMRpcService).invokeRpc(any(), any());
515
516         result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
517
518         assertEquals(outputNode, result.getResult());
519         assertTrue(result.getErrors().isEmpty());
520
521         // RPC with response error.
522
523         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(rpcError)))
524                 .when(mockDOMRpcService).invokeRpc(any(), any());
525
526         result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
527
528         assertNull(result.getResult());
529         assertEquals(rpcError, result.getErrors().iterator().next());
530
531         // RPC with response output and error.
532
533         doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
534                 .when(mockDOMRpcService).invokeRpc(any(), any());
535
536         final DOMRpcResult resultOutputError =
537                 slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
538
539         assertEquals(outputNode, resultOutputError.getResult());
540         assertEquals(rpcError, resultOutputError.getErrors().iterator().next());
541
542         // RPC failure.
543         doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringRpcException("mock")))
544             .when(mockDOMRpcService).invokeRpc(any(), any());
545         final ListenableFuture<? extends DOMRpcResult> future = slaveDomRPCService.invokeRpc(testQName, outputNode);
546
547         final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
548         final Throwable cause = e.getCause();
549         assertThat(cause, instanceOf(DOMRpcException.class));
550         assertEquals("mock", cause.getMessage());
551     }
552
553     @Test
554     public void testSlaveInvokeAction() throws Exception {
555         final List<SourceIdentifier> sourceIdentifiers = Lists
556             .newArrayList(RevisionSourceIdentifier.create("testActionID"));
557         initializeMaster(sourceIdentifiers);
558         registerSlaveMountPoint();
559
560         ArgumentCaptor<DOMActionService> domActionServiceCaptor = ArgumentCaptor.forClass(DOMActionService.class);
561         verify(mockMountPointBuilder).addService(eq(DOMActionService.class), domActionServiceCaptor.capture());
562
563         final DOMActionService slaveDomActionService = domActionServiceCaptor.getValue();
564         assertTrue(slaveDomActionService instanceof ProxyDOMActionService);
565
566         final QName testQName = QName.create("test", "2019-08-16", "TestActionQname");
567         final Absolute schemaPath = Absolute.of(testQName);
568
569         final YangInstanceIdentifier yangIIdPath = YangInstanceIdentifier
570             .create(new YangInstanceIdentifier.NodeIdentifier(testQName));
571
572         final DOMDataTreeIdentifier domDataTreeIdentifier = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
573             yangIIdPath);
574
575         final ContainerNode outputNode = ImmutableContainerNodeBuilder.create()
576             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(testQName))
577             .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
578
579         // Action with no response output.
580         doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMActionService)
581             .invokeAction(any(), any(), any());
582         DOMActionResult result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
583             .get(2, TimeUnit.SECONDS);
584         assertEquals(null, result);
585
586         // Action with response output.
587         doReturn(FluentFutures.immediateFluentFuture(new SimpleDOMActionResult(outputNode))).when(mockDOMActionService)
588             .invokeAction(any(), any(), any());
589         result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
590             .get(2, TimeUnit.SECONDS);
591
592         assertEquals(outputNode, result.getOutput().get());
593         assertTrue(result.getErrors().isEmpty());
594
595         // Action failure.
596         doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringActionException("mock")))
597             .when(mockDOMActionService).invokeAction(any(), any(), any());
598         final ListenableFuture<? extends DOMActionResult> future = slaveDomActionService.invokeAction(schemaPath,
599             domDataTreeIdentifier, outputNode);
600
601         final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
602         final Throwable cause = e.getCause();
603         assertThat(cause, instanceOf(DOMActionException.class));
604         assertEquals("mock", cause.getMessage());
605     }
606
607     @Test
608     public void testSlaveNewTransactionRequests() {
609         doReturn(mock(DOMDataTreeReadTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
610         doReturn(mock(DOMDataTreeReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
611         doReturn(mock(DOMDataTreeWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
612
613         initializeMaster(Collections.emptyList());
614         registerSlaveMountPoint();
615
616         ArgumentCaptor<DOMDataBroker> domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
617         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
618
619         final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
620         assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker);
621
622         slaveDOMDataBroker.newReadOnlyTransaction();
623         verify(mockDOMDataBroker).newReadOnlyTransaction();
624
625         slaveDOMDataBroker.newReadWriteTransaction();
626         verify(mockDOMDataBroker).newReadWriteTransaction();
627
628         slaveDOMDataBroker.newWriteOnlyTransaction();
629         verify(mockDOMDataBroker).newWriteOnlyTransaction();
630     }
631
632     @Test
633     public void testSlaveNewNetconfDataTreeServiceRequest() {
634         initializeMaster(Collections.emptyList());
635         registerSlaveMountPoint();
636
637         ArgumentCaptor<NetconfDataTreeService> netconfCaptor = ArgumentCaptor.forClass(NetconfDataTreeService.class);
638         verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), netconfCaptor.capture());
639
640         final NetconfDataTreeService slaveNetconfService = netconfCaptor.getValue();
641         assertTrue(slaveNetconfService instanceof ProxyNetconfDataTreeService);
642
643         final YangInstanceIdentifier PATH = YangInstanceIdentifier.empty();
644         final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
645         final ContainerNode NODE = Builders.containerBuilder()
646             .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont")))
647             .build();
648
649         final FluentFuture<Optional<Object>> result = immediateFluentFuture(Optional.of(NODE));
650         doReturn(result).when(netconfService).get(PATH);
651         doReturn(result).when(netconfService).getConfig(PATH);
652         doReturn(emptyFluentFuture()).when(netconfService).commit();
653
654         slaveNetconfService.get(PATH);
655         slaveNetconfService.getConfig(PATH);
656         slaveNetconfService.lock();
657         slaveNetconfService.merge(STORE, PATH, NODE, Optional.empty());
658         slaveNetconfService.replace(STORE, PATH, NODE, Optional.empty());
659         slaveNetconfService.create(STORE, PATH, NODE, Optional.empty());
660         slaveNetconfService.delete(STORE, PATH);
661         slaveNetconfService.remove(STORE, PATH);
662         slaveNetconfService.discardChanges();
663         slaveNetconfService.commit();
664
665         verify(netconfService, timeout(1000)).get(PATH);
666         verify(netconfService, timeout(1000)).getConfig(PATH);
667         verify(netconfService, timeout(1000)).lock();
668         verify(netconfService, timeout(1000)).merge(STORE, PATH, NODE, Optional.empty());
669         verify(netconfService, timeout(1000)).replace(STORE, PATH, NODE, Optional.empty());
670         verify(netconfService, timeout(1000)).create(STORE, PATH, NODE, Optional.empty());
671         verify(netconfService, timeout(1000)).delete(STORE, PATH);
672         verify(netconfService, timeout(1000)).remove(STORE, PATH);
673         verify(netconfService, timeout(1000)).discardChanges();
674         verify(netconfService, timeout(1000)).commit();
675     }
676
677     private ActorRef registerSlaveMountPoint() {
678         SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
679         doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
680         doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
681         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(
682                 NetconfTopologySetupBuilder.create().setSchemaResourceDTO(schemaResourceDTO2).setActorSystem(system)
683                 .setBaseSchemas(BASE_SCHEMAS).build(), remoteDeviceId, TIMEOUT, mockMountPointService));
684
685         doReturn(Futures.immediateFuture(mockSchemaContext))
686                 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
687
688         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
689                 masterRef), testKit.getRef());
690
691         verify(mockMountPointBuilder, timeout(5000)).register();
692         verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
693         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
694         verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), any());
695         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
696         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
697
698         testKit.expectMsgClass(Success.class);
699         return slaveRef;
700     }
701
702     private void initializeMaster(final List<SourceIdentifier> sourceIdentifiers) {
703         masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, netconfService, sourceIdentifiers,
704                 mockDOMRpcService, mockDOMActionService), testKit.getRef());
705         testKit.expectMsgClass(MasterActorDataInitialized.class);
706     }
707
708     private void resetMountPointMocks() {
709         reset(mockMountPointReg, mockMountPointBuilder);
710
711         doNothing().when(mockMountPointReg).close();
712
713         doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
714         doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
715     }
716
717     private static PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
718         return argThat(argument -> identifier.equals(argument.getSourceIdentifier()));
719     }
720
721     private static String convertStreamToString(final InputStream is) {
722         try (Scanner scanner = new Scanner(is)) {
723             return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : "";
724         }
725     }
726 }