2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static java.nio.charset.StandardCharsets.UTF_8;
11 import static org.hamcrest.CoreMatchers.containsString;
12 import static org.hamcrest.CoreMatchers.instanceOf;
13 import static org.hamcrest.CoreMatchers.startsWith;
14 import static org.hamcrest.MatcherAssert.assertThat;
15 import static org.junit.Assert.assertEquals;
16 import static org.junit.Assert.assertNull;
17 import static org.junit.Assert.assertThrows;
18 import static org.junit.Assert.assertTrue;
19 import static org.mockito.ArgumentMatchers.any;
20 import static org.mockito.ArgumentMatchers.anyCollection;
21 import static org.mockito.ArgumentMatchers.argThat;
22 import static org.mockito.ArgumentMatchers.eq;
23 import static org.mockito.Mockito.after;
24 import static org.mockito.Mockito.doAnswer;
25 import static org.mockito.Mockito.doNothing;
26 import static org.mockito.Mockito.doReturn;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.reset;
29 import static org.mockito.Mockito.timeout;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.verifyNoMoreInteractions;
33 import static org.opendaylight.mdsal.common.api.CommitInfo.emptyFluentFuture;
34 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFluentFuture;
36 import akka.actor.ActorRef;
37 import akka.actor.ActorSystem;
38 import akka.actor.Props;
39 import akka.actor.Status.Failure;
40 import akka.actor.Status.Success;
41 import akka.pattern.AskTimeoutException;
42 import akka.pattern.Patterns;
43 import akka.testkit.TestActorRef;
44 import akka.testkit.javadsl.TestKit;
45 import akka.util.Timeout;
46 import com.google.common.collect.ImmutableList;
47 import com.google.common.io.ByteSource;
48 import com.google.common.net.InetAddresses;
49 import com.google.common.util.concurrent.FluentFuture;
50 import com.google.common.util.concurrent.Futures;
51 import com.google.common.util.concurrent.ListenableFuture;
52 import com.google.common.util.concurrent.SettableFuture;
53 import java.io.InputStream;
54 import java.net.InetSocketAddress;
55 import java.time.Duration;
56 import java.util.ArrayList;
57 import java.util.List;
58 import java.util.Optional;
59 import java.util.Scanner;
60 import java.util.concurrent.ExecutionException;
61 import java.util.concurrent.TimeUnit;
62 import org.junit.After;
63 import org.junit.Before;
64 import org.junit.Test;
65 import org.junit.runner.RunWith;
66 import org.mockito.ArgumentCaptor;
67 import org.mockito.Mock;
68 import org.mockito.junit.MockitoJUnitRunner;
69 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
70 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
71 import org.opendaylight.mdsal.dom.api.DOMActionException;
72 import org.opendaylight.mdsal.dom.api.DOMActionResult;
73 import org.opendaylight.mdsal.dom.api.DOMActionService;
74 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
75 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
76 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
77 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
78 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
79 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
80 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
81 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
82 import org.opendaylight.mdsal.dom.api.DOMRpcException;
83 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
84 import org.opendaylight.mdsal.dom.api.DOMRpcService;
85 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
86 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
87 import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult;
88 import org.opendaylight.netconf.client.mdsal.NetconfDevice.SchemaResourcesDTO;
89 import org.opendaylight.netconf.client.mdsal.api.CredentialProvider;
90 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
91 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
92 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Actions;
93 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
94 import org.opendaylight.netconf.client.mdsal.api.SslHandlerFactoryProvider;
95 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
96 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
97 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringActionException;
98 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
99 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
100 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
101 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
102 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
103 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
104 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
105 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
106 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
107 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
108 import org.opendaylight.yangtools.concepts.ObjectRegistration;
109 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
110 import org.opendaylight.yangtools.yang.common.ErrorType;
111 import org.opendaylight.yangtools.yang.common.QName;
112 import org.opendaylight.yangtools.yang.common.RpcError;
113 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
114 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
115 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
116 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
117 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
118 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
119 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
120 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
121 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
122 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
123 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
124 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
125 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
126 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
127 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
128 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
129 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
130 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
131 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
132 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToIRTransformer;
133 import scala.concurrent.Await;
134 import scala.concurrent.Future;
// Tests for NetconfNodeActor: a master actor and slave actors are created on one ActorSystem and driven with
// topology-singleton messages (CreateInitialMasterActorData, AskForMasterMountPoint, RegisterMountPoint, ...).
136 @RunWith(MockitoJUnitRunner.StrictStubs.class)
137 public class NetconfNodeActorTest extends AbstractBaseSchemasTest {
// Timeout passed to NetconfNodeActor.props(...) and ProxyYangTextSourceProvider, and used as the wait limit for
// Await.result(...) and Patterns.gracefulStop(...).
139 private static final Timeout TIMEOUT = Timeout.create(Duration.ofSeconds(5));
// Source identifiers sent in RegisterMountPoint messages and matched by withSourceId(...).
140 private static final SourceIdentifier SOURCE_IDENTIFIER1 = new SourceIdentifier("yang1");
141 private static final SourceIdentifier SOURCE_IDENTIFIER2 = new SourceIdentifier("yang2");
// Actor system hosting every actor created by the tests; shut down in teardown().
143 private ActorSystem system = ActorSystem.create();
// Probe whose ref is used as the sender of most messages and on which replies are expected.
144 private final TestKit testKit = new TestKit(system);
// Master actor under test and the device id it is created with; both are assigned in setup().
146 private ActorRef masterRef;
147 private RemoteDeviceId remoteDeviceId;
// Real schema repository returned by schemaResourceDTO.getSchemaRepository() for the master actor.
148 private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
// NOTE(review): the fields below are stubbed or verified with Mockito (doReturn(..).when(..), verify(..)) in
// the tests, so they are presumably runner-injected mocks; their annotations are not visible here - confirm.
151 private Rpcs.Normalized mockDOMRpcService;
153 private Actions.Normalized mockDOMActionService;
155 private DOMMountPointService mockMountPointService;
157 private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
159 private ObjectRegistration<DOMMountPoint> mockMountPointReg;
161 private DOMDataBroker mockDOMDataBroker;
163 private NetconfDataTreeService netconfService;
165 private SchemaSourceRegistration<?> mockSchemaSourceReg1;
167 private SchemaSourceRegistration<?> mockSchemaSourceReg2;
169 private SchemaSourceRegistry mockRegistry;
171 private EffectiveModelContextFactory mockSchemaContextFactory;
173 private SchemaRepository mockSchemaRepository;
175 private EffectiveModelContext mockSchemaContext;
177 private SchemaResourcesDTO schemaResourceDTO;
179 private CredentialProvider credentialProvider;
181 private SslHandlerFactoryProvider sslHandlerFactoryProvider;
// Test fixture: creates the master NetconfNodeActor and installs the stubbing shared by the tests.
// NOTE(review): presumably run by JUnit before each test; the annotation is not visible in this view.
184 public void setup() {
185 remoteDeviceId = new RemoteDeviceId("netconf-topology",
186 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
// Register a TextToIRTransformer as a schema source listener on the master repository.
188 masterSchemaRepository.registerSchemaSourceListener(
189 TextToIRTransformer.create(masterSchemaRepository, masterSchemaRepository));
// The master's setup hands out the real master repository and the registry test double.
191 doReturn(masterSchemaRepository).when(schemaResourceDTO).getSchemaRepository();
192 doReturn(mockRegistry).when(schemaResourceDTO).getSchemaRegistry();
193 final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create()
194 .setActorSystem(system)
195 .setIdleTimeout(Duration.ofSeconds(1))
196 .setSchemaResourceDTO(schemaResourceDTO)
197 .setBaseSchemas(BASE_SCHEMAS)
198 .setCredentialProvider(credentialProvider)
199 .setSslHandlerFactoryProvider(sslHandlerFactoryProvider)
// The master actor is created as a TestActorRef named "master_messages".
202 final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
204 masterRef = TestActorRef.create(system, props, "master_messages");
// Re-arm the mount point builder/registration stubbing (see resetMountPointMocks()).
206 resetMountPointMocks();
208 doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
// Each source identifier gets its own registration object so their close() calls can be verified separately.
210 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
211 doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
213 doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
214 .createEffectiveModelContextFactory();
// Shuts down the actor system created for this test instance.
// NOTE(review): the boolean is presumably TestKit's "verify system shutdown" flag - confirm against the Akka API.
218 public void teardown() {
219 TestKit.shutdownActorSystem(system, true);
// Verifies that the master answers with MasterActorDataInitialized both to the initial
// CreateInitialMasterActorData (sent by initializeMaster) and to a later RefreshSetupMasterActorData.
224 public void testInitializeAndRefreshMasterData() {
226 // Test CreateInitialMasterActorData.
228 initializeMaster(new ArrayList<>());
230 // Test RefreshSetupMasterActorData.
// The refresh message carries a different device id (other topology name and address) and a newly built setup.
232 final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
233 new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
235 final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create()
236 .setBaseSchemas(BASE_SCHEMAS)
237 .setSchemaResourceDTO(schemaResourceDTO)
238 .setActorSystem(system)
239 .setCredentialProvider(credentialProvider)
240 .setSslHandlerFactoryProvider(sslHandlerFactoryProvider)
243 masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
245 testKit.expectMsgClass(MasterActorDataInitialized.class);
// Verifies the master's reply to AskForMasterMountPoint: a Failure caused by NotMasterException before the
// master is initialized, and a RegisterMountPoint carrying the configured source identifiers afterwards.
249 public void testAskForMasterMountPoint() {
251 // Test with master not setup yet.
// A dedicated probe is used here so replies are not mixed with messages arriving on the shared testKit.
253 final TestKit kit = new TestKit(system);
255 masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
257 final Failure failure = kit.expectMsgClass(Failure.class);
258 assertTrue(failure.cause() instanceof NotMasterException);
260 // Now initialize - master should send the RegisterMountPoint message.
262 List<SourceIdentifier> sourceIdentifiers = List.of(new SourceIdentifier("testID"));
263 initializeMaster(sourceIdentifiers);
265 masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
267 final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
// The identifiers in the reply must equal the ones the master was initialized with.
269 assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
// Registers a slave mount point, unregisters it, then registers again while a second (interleaved)
// registration completes first, and finally stops the slave actor. Throughout, it verifies which mount point
// and schema source registrations get closed.
273 public void testRegisterAndUnregisterMountPoint() throws Exception {
275 ActorRef slaveRef = registerSlaveMountPoint();
// Unregistering must close the mount point registration and both schema source registrations.
279 slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
281 verify(mockMountPointReg, timeout(5000)).close();
282 verify(mockSchemaSourceReg1, timeout(1000)).close();
283 verify(mockSchemaSourceReg2, timeout(1000)).close();
285 // Test registration with another interleaved registration that completes while the first registration
286 // is resolving the schema context.
288 reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
289 resetMountPointMocks();
291 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
293 final SchemaSourceRegistration<?> newMockSchemaSourceReg = mock(SchemaSourceRegistration.class);
// Factory used by the interleaved registration: it resolves the schema context immediately.
295 final EffectiveModelContextFactory newMockSchemaContextFactory = mock(EffectiveModelContextFactory.class);
296 doReturn(Futures.immediateFuture(mockSchemaContext))
297 .when(newMockSchemaContextFactory).createEffectiveModelContext(anyCollection());
// Future for the first registration's schema context; it is completed (future.set below) only after the
// interleaved RegisterMountPoint has been sent.
// NOTE(review): the statements that open and close this asynchronous answer are not visible in this view.
300 SettableFuture<SchemaContext> future = SettableFuture.create();
302 doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
303 withSourceId(SOURCE_IDENTIFIER1));
305 doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
306 .createEffectiveModelContextFactory();
308 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
311 future.set(mockSchemaContext);
314 }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
316 doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
317 .createEffectiveModelContextFactory();
// First registration: triggers the stubbed answer above.
319 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
// Two registrations were attempted (two registerSchemaSource and two createEffectiveModelContextFactory
// calls); the first source registration is closed and the builder sees no calls beyond those verified here.
321 verify(mockMountPointBuilder, timeout(5000)).register();
322 verify(mockMountPointBuilder, after(500)).addService(eq(DOMDataBroker.class), any());
323 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
324 verify(mockMountPointBuilder).addService(eq(DOMActionService.class), any());
325 verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
326 verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
327 verify(mockSchemaSourceReg1).close();
328 verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
329 verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
330 verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
332 // Stop the slave actor and verify schema source registrations are closed.
334 final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
335 Await.result(stopFuture, TIMEOUT.duration());
337 verify(mockMountPointReg).close();
338 verify(newMockSchemaSourceReg).close();
// Exercises RegisterMountPoint on a slave actor when schema context resolution fails in three ways:
// a plain SchemaResolutionException, one caused by AskTimeoutException followed by a success, and an
// AskTimeoutException-caused failure that arrives after an interleaved successful registration.
341 @SuppressWarnings("unchecked")
343 public void testRegisterMountPointWithSchemaFailures() throws Exception {
// The slave gets its own SchemaResourcesDTO backed by the registry and repository test doubles.
344 SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
345 doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
346 doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
347 final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create()
348 .setSchemaResourceDTO(schemaResourceDTO2)
349 .setBaseSchemas(BASE_SCHEMAS)
350 .setActorSystem(system)
351 .setCredentialProvider(credentialProvider)
352 .setSslHandlerFactoryProvider(sslHandlerFactoryProvider)
355 final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT,
356 mockMountPointService));
358 // Test unrecoverable failure.
360 doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock")))
361 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
363 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
364 masterRef), testKit.getRef());
// The actor acknowledges the message with Success even though schema resolution fails afterwards.
366 testKit.expectMsgClass(Success.class);
368 verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
369 verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
// No mount point may be registered, and both schema source registrations must be closed.
371 verify(mockMountPointBuilder, after(1000).never()).register();
372 verify(mockSchemaSourceReg1, timeout(1000)).close();
373 verify(mockSchemaSourceReg2, timeout(1000)).close();
375 // Test recoverable AskTimeoutException - schema context resolution should be retried.
377 reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
// First call fails with an AskTimeoutException cause, the second call succeeds.
379 doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock",
380 new AskTimeoutException("timeout"))))
381 .doReturn(Futures.immediateFuture(mockSchemaContext))
382 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
384 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
385 masterRef), testKit.getRef());
387 testKit.expectMsgClass(Success.class);
// The mount point gets registered and the (reset) schema source registrations are left untouched.
389 verify(mockMountPointBuilder, timeout(5000)).register();
390 verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
392 // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
393 // attempt should not be retried.
395 reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
396 resetMountPointMocks();
// Factory used by the interleaved registration: it resolves the schema context immediately.
398 final EffectiveModelContextFactory mockSchemaContextFactorySuccess = mock(EffectiveModelContextFactory.class);
399 doReturn(Futures.immediateFuture(mockSchemaContext))
400 .when(mockSchemaContextFactorySuccess).createEffectiveModelContext(anyCollection());
// Future for the first attempt; it is failed (future.setException below) only after the interleaved
// RegisterMountPoint has been sent.
// NOTE(review): the statements that open and close this asynchronous answer are not visible in this view.
403 SettableFuture<SchemaContext> future = SettableFuture.create();
405 doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
406 .createEffectiveModelContextFactory();
408 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
409 masterRef), testKit.getRef());
411 future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout")));
414 }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
416 doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
418 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
419 masterRef), testKit.getRef());
// Exactly two factories were requested: one per registration, i.e. no retry of the failed first attempt.
421 verify(mockMountPointBuilder, timeout(5000)).register();
422 verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
// Requests a YANG text source through ProxyYangTextSourceProvider from an actor whose schema repository has
// no source registered in this test; awaiting the result is expected to throw MissingSchemaSourceException.
425 @Test(expected = MissingSchemaSourceException.class)
426 public void testMissingSchemaSourceOnMissingProvider() throws Exception {
// A fresh repository serves as both registry and repository of this actor's setup.
427 final SharedSchemaRepository repository = new SharedSchemaRepository("test");
429 SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
430 doReturn(repository).when(schemaResourceDTO2).getSchemaRegistry();
431 doReturn(repository).when(schemaResourceDTO2).getSchemaRepository();
432 final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create()
433 .setActorSystem(system)
434 .setSchemaResourceDTO(schemaResourceDTO2)
435 .setIdleTimeout(Duration.ofSeconds(1))
436 .setBaseSchemas(BASE_SCHEMAS)
437 .setCredentialProvider(credentialProvider)
438 .setSslHandlerFactoryProvider(sslHandlerFactoryProvider)
// A distinct actor name is used because setup() already created "master_messages" on the same system.
440 final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
441 ActorRef actor = TestActorRef.create(system, props, "master_messages_2");
443 final SourceIdentifier sourceIdentifier = new SourceIdentifier("testID");
445 final ProxyYangTextSourceProvider proxyYangProvider =
446 new ProxyYangTextSourceProvider(actor, system.dispatcher(), TIMEOUT);
448 final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
449 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
// Await.result rethrows the failure of the future, which is what the expected-exception check relies on.
450 Await.result(resolvedSchemaFuture, TIMEOUT.duration());
// Verifies that a YANG text source registered in the master repository can be fetched through
// ProxyYangTextSourceProvider, and that the request fails with MissingSchemaSourceException once the
// registration has been closed.
454 public void testYangTextSchemaSourceRequest() throws Exception {
455 final SourceIdentifier sourceIdentifier = new SourceIdentifier("testID");
457 final ProxyYangTextSourceProvider proxyYangProvider =
458 new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
// Source whose content is the literal text "YANG", encoded as UTF-8.
460 final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForByteSource(sourceIdentifier,
461 ByteSource.wrap("YANG".getBytes(UTF_8)));
// Register a provider in the master repository that always returns that source.
465 final SchemaSourceRegistration<YangTextSchemaSource> schemaSourceReg = masterSchemaRepository
466 .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource),
467 PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1));
469 final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
470 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
472 final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
// Both the identifier and the content must match what was registered.
474 assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
475 assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
477 // Test missing source failure.
479 schemaSourceReg.close();
481 final MissingSchemaSourceException ex = assertThrows(MissingSchemaSourceException.class,
483 final Future<YangTextSchemaSourceSerializationProxy> failedSchemaFuture =
484 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
485 Await.result(failedSchemaFuture, TIMEOUT.duration());
// The failure message must name the missing source.
487 assertThat(ex.getMessage(), startsWith("No providers registered for source"));
488 assertThat(ex.getMessage(), containsString(sourceIdentifier.toString()));
// Captures the DOMRpcService added to the slave mount point and checks that its invokeRpc results mirror what
// the master-side mockDOMRpcService is stubbed to return: null, output only, error only, output plus error,
// and a failed future.
492 public void testSlaveInvokeRpc() throws Exception {
494 initializeMaster(List.of(new SourceIdentifier("testID")));
495 registerSlaveMountPoint();
// Grab the service instance the slave registered on the mount point builder.
497 ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
498 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
500 final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
501 assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService);
// Shared fixtures: a container with a single leaf, used as RPC input and as expected output, and an RPC error.
503 final QName testQName = QName.create("", "TestQname");
504 final ContainerNode outputNode = Builders.containerBuilder()
505 .withNodeIdentifier(new NodeIdentifier(testQName))
506 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
507 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.RPC, null, "Rpc invocation failed.");
509 // RPC with no response output.
511 doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMRpcService).invokeRpc(any(), any());
513 DOMRpcResult result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
// NOTE(review): equivalent to assertNull(result), which is already statically imported in this file.
515 assertEquals(null, result);
517 // RPC with response output.
519 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode)))
520 .when(mockDOMRpcService).invokeRpc(any(), any());
522 result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
524 assertEquals(outputNode, result.value());
525 assertTrue(result.errors().isEmpty());
527 // RPC with response error.
529 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(rpcError)))
530 .when(mockDOMRpcService).invokeRpc(any(), any());
532 result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
534 assertNull(result.value());
535 assertEquals(rpcError, result.errors().iterator().next());
537 // RPC with response output and error.
539 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
540 .when(mockDOMRpcService).invokeRpc(any(), any());
542 final DOMRpcResult resultOutputError =
543 slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
545 assertEquals(outputNode, resultOutputError.value());
546 assertEquals(rpcError, resultOutputError.errors().iterator().next());
// RPC whose future fails: the slave-side future must fail with a DOMRpcException carrying the same message.
549 doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringRpcException("mock")))
550 .when(mockDOMRpcService).invokeRpc(any(), any());
551 final ListenableFuture<? extends DOMRpcResult> future = slaveDomRPCService.invokeRpc(testQName, outputNode);
553 final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
554 final Throwable cause = e.getCause();
555 assertThat(cause, instanceOf(DOMRpcException.class));
556 assertEquals("mock", cause.getMessage());
// Captures the DOMActionService added to the slave mount point and checks that its invokeAction results mirror
// what the master-side mockDOMActionService is stubbed to return: null, output only, and a failed future.
560 public void testSlaveInvokeAction() throws Exception {
561 initializeMaster(List.of(new SourceIdentifier("testActionID")));
562 registerSlaveMountPoint();
// Grab the service instance the slave registered on the mount point builder.
564 ArgumentCaptor<DOMActionService> domActionServiceCaptor = ArgumentCaptor.forClass(DOMActionService.class);
565 verify(mockMountPointBuilder).addService(eq(DOMActionService.class), domActionServiceCaptor.capture());
567 final DOMActionService slaveDomActionService = domActionServiceCaptor.getValue();
568 assertTrue(slaveDomActionService instanceof ProxyDOMActionService);
// Invocation arguments: action schema path, data tree identifier and an input container with one leaf.
570 final QName testQName = QName.create("test", "2019-08-16", "TestActionQname");
571 final Absolute schemaPath = Absolute.of(testQName);
573 final YangInstanceIdentifier yangIIdPath = YangInstanceIdentifier.create(new NodeIdentifier(testQName));
575 final DOMDataTreeIdentifier domDataTreeIdentifier = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
578 final ContainerNode outputNode = Builders.containerBuilder()
579 .withNodeIdentifier(new NodeIdentifier(testQName))
580 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
582 // Action with no response output.
583 doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMActionService)
584 .invokeAction(any(), any(), any());
585 DOMActionResult result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
586 .get(2, TimeUnit.SECONDS);
// NOTE(review): equivalent to assertNull(result), which is already statically imported in this file.
587 assertEquals(null, result);
589 // Action with response output.
590 doReturn(FluentFutures.immediateFluentFuture(new SimpleDOMActionResult(outputNode))).when(mockDOMActionService)
591 .invokeAction(any(), any(), any());
592 result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
593 .get(2, TimeUnit.SECONDS);
595 assertEquals(Optional.of(outputNode), result.getOutput());
596 assertTrue(result.getErrors().isEmpty());
// Action whose future fails: the slave-side future must fail with a DOMActionException with the same message.
599 doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringActionException("mock")))
600 .when(mockDOMActionService).invokeAction(any(), any(), any());
601 final ListenableFuture<? extends DOMActionResult> future = slaveDomActionService.invokeAction(schemaPath,
602 domDataTreeIdentifier, outputNode);
604 final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
605 final Throwable cause = e.getCause();
606 assertThat(cause, instanceOf(DOMActionException.class));
607 assertEquals("mock", cause.getMessage());
// Verifies that creating read-only, read-write and write-only transactions on the slave's DOMDataBroker
// results in the matching transaction factory call on the master-side mockDOMDataBroker.
611 public void testSlaveNewTransactionRequests() {
// The master-side broker hands out throwaway transaction mocks.
612 doReturn(mock(DOMDataTreeReadTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
613 doReturn(mock(DOMDataTreeReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
614 doReturn(mock(DOMDataTreeWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
616 initializeMaster(List.of());
617 registerSlaveMountPoint();
// Grab the broker instance the slave registered on the mount point builder.
619 ArgumentCaptor<DOMDataBroker> domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
620 verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
622 final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
623 assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker);
// Each slave-side call is followed by a verification of the corresponding master-side call.
625 slaveDOMDataBroker.newReadOnlyTransaction();
626 verify(mockDOMDataBroker).newReadOnlyTransaction();
628 slaveDOMDataBroker.newReadWriteTransaction();
629 verify(mockDOMDataBroker).newReadWriteTransaction();
631 slaveDOMDataBroker.newWriteOnlyTransaction();
632 verify(mockDOMDataBroker).newWriteOnlyTransaction();
// Verifies that every operation invoked on the slave's NetconfDataTreeService (get, getConfig, lock, merge,
// replace, create, delete, remove, discardChanges, commit) reaches the master-side netconfService with the
// same arguments.
636 public void testSlaveNewNetconfDataTreeServiceRequest() {
637 initializeMaster(List.of());
638 registerSlaveMountPoint();
// Grab the service instance the slave registered on the mount point builder.
640 ArgumentCaptor<NetconfDataTreeService> netconfCaptor = ArgumentCaptor.forClass(NetconfDataTreeService.class);
641 verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), netconfCaptor.capture());
643 final NetconfDataTreeService slaveNetconfService = netconfCaptor.getValue();
644 assertTrue(slaveNetconfService instanceof ProxyNetconfDataTreeService);
// Arguments shared by all invocations below.
646 final YangInstanceIdentifier PATH = YangInstanceIdentifier.empty();
647 final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
648 final ContainerNode NODE = Builders.containerBuilder()
649 .withNodeIdentifier(new NodeIdentifier(QName.create("", "cont")))
// Only get, getConfig and commit are stubbed with return values on the master side.
652 final FluentFuture<Optional<Object>> result = immediateFluentFuture(Optional.of(NODE));
653 doReturn(result).when(netconfService).get(PATH);
654 doReturn(result).when(netconfService).getConfig(PATH);
655 doReturn(emptyFluentFuture()).when(netconfService).commit();
657 slaveNetconfService.get(PATH);
658 slaveNetconfService.getConfig(PATH);
659 slaveNetconfService.lock();
660 slaveNetconfService.merge(STORE, PATH, NODE, Optional.empty());
661 slaveNetconfService.replace(STORE, PATH, NODE, Optional.empty());
662 slaveNetconfService.create(STORE, PATH, NODE, Optional.empty());
663 slaveNetconfService.delete(STORE, PATH);
664 slaveNetconfService.remove(STORE, PATH);
665 slaveNetconfService.discardChanges();
666 slaveNetconfService.commit();
// Each verification waits up to one second for the call to show up on the master-side service.
668 verify(netconfService, timeout(1000)).get(PATH);
669 verify(netconfService, timeout(1000)).getConfig(PATH);
670 verify(netconfService, timeout(1000)).lock();
671 verify(netconfService, timeout(1000)).merge(STORE, PATH, NODE, Optional.empty());
672 verify(netconfService, timeout(1000)).replace(STORE, PATH, NODE, Optional.empty());
673 verify(netconfService, timeout(1000)).create(STORE, PATH, NODE, Optional.empty());
674 verify(netconfService, timeout(1000)).delete(STORE, PATH);
675 verify(netconfService, timeout(1000)).remove(STORE, PATH);
676 verify(netconfService, timeout(1000)).discardChanges();
677 verify(netconfService, timeout(1000)).commit();
// Helper: creates a slave NetconfNodeActor, sends it RegisterMountPoint for both source identifiers (with
// masterRef as the master), waits for the mount point to be registered with its services and for the Success
// reply on testKit.
// NOTE(review): declared to return ActorRef, presumably the created slave; the return statement is not visible.
680 private ActorRef registerSlaveMountPoint() {
// The slave gets its own SchemaResourcesDTO backed by the registry and repository test doubles.
681 SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
682 doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
683 doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
684 final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(NetconfTopologySetupBuilder.create()
685 .setSchemaResourceDTO(schemaResourceDTO2)
686 .setActorSystem(system)
687 .setBaseSchemas(BASE_SCHEMAS)
688 .setCredentialProvider(credentialProvider)
689 .setSslHandlerFactoryProvider(sslHandlerFactoryProvider)
690 .build(), remoteDeviceId, TIMEOUT, mockMountPointService));
// Schema context resolution succeeds immediately.
692 doReturn(Futures.immediateFuture(mockSchemaContext))
693 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
695 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
696 masterRef), testKit.getRef());
// Registration happens asynchronously, so wait up to five seconds for register(); the services must have
// been added by then.
698 verify(mockMountPointBuilder, timeout(5000)).register();
699 verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
700 verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
701 verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), any());
702 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
703 verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
705 testKit.expectMsgClass(Success.class);
// Helper: sends CreateInitialMasterActorData (data broker, netconf service, the given source identifiers and
// the RPC/action services) to the master and waits for its MasterActorDataInitialized reply on testKit.
709 private void initializeMaster(final List<SourceIdentifier> sourceIdentifiers) {
710 masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, netconfService, sourceIdentifiers,
711 new RemoteDeviceServices(mockDOMRpcService, mockDOMActionService)), testKit.getRef());
712 testKit.expectMsgClass(MasterActorDataInitialized.class);
// Helper: resets the mount point registration and builder test doubles (clearing recorded interactions and
// stubbing) and re-installs the default stubbing used by the tests.
715 private void resetMountPointMocks() {
716 reset(mockMountPointReg, mockMountPointBuilder);
718 doNothing().when(mockMountPointReg).close();
// addService returns the builder itself so calls can be chained; register() yields the registration.
720 doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
721 doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
// Mockito argument matcher accepting any PotentialSchemaSource whose source identifier equals the given one.
// Intended for use inside when(...)/verify(...) argument lists only, since it goes through argThat(...).
724 private static PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
725 return argThat(argument -> identifier.equals(argument.getSourceIdentifier()));
// Reads the whole stream into a String: the "\\A" delimiter matches only at the beginning of input, so the
// first token is the entire content; an empty stream yields "". The try-with-resources closes the Scanner.
// NOTE(review): Scanner(InputStream) decodes with the platform default charset, while the test data is
// written as UTF-8 - consider passing the charset explicitly.
728 private static String convertStreamToString(final InputStream is) {
729 try (Scanner scanner = new Scanner(is)) {
730 return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : "";