/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static org.hamcrest.CoreMatchers.containsString;
11 import static org.hamcrest.CoreMatchers.startsWith;
12 import static org.hamcrest.MatcherAssert.assertThat;
13 import static org.junit.jupiter.api.Assertions.assertEquals;
14 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
15 import static org.junit.jupiter.api.Assertions.assertNull;
16 import static org.junit.jupiter.api.Assertions.assertThrows;
17 import static org.junit.jupiter.api.Assertions.assertTrue;
18 import static org.mockito.ArgumentMatchers.any;
19 import static org.mockito.ArgumentMatchers.anyCollection;
20 import static org.mockito.ArgumentMatchers.argThat;
21 import static org.mockito.ArgumentMatchers.eq;
22 import static org.mockito.Mockito.after;
23 import static org.mockito.Mockito.doAnswer;
24 import static org.mockito.Mockito.doNothing;
25 import static org.mockito.Mockito.doReturn;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.reset;
28 import static org.mockito.Mockito.timeout;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.verifyNoMoreInteractions;
32 import static org.opendaylight.mdsal.common.api.CommitInfo.emptyFluentFuture;
33 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFluentFuture;
35 import akka.actor.ActorRef;
36 import akka.actor.ActorSystem;
37 import akka.actor.Props;
38 import akka.actor.Status.Failure;
39 import akka.actor.Status.Success;
40 import akka.pattern.AskTimeoutException;
41 import akka.pattern.Patterns;
42 import akka.testkit.TestActorRef;
43 import akka.testkit.javadsl.TestKit;
44 import akka.util.Timeout;
45 import com.google.common.collect.ImmutableList;
46 import com.google.common.io.CharSource;
47 import com.google.common.net.InetAddresses;
48 import com.google.common.util.concurrent.FluentFuture;
49 import com.google.common.util.concurrent.Futures;
50 import com.google.common.util.concurrent.ListenableFuture;
51 import com.google.common.util.concurrent.SettableFuture;
52 import java.net.InetSocketAddress;
53 import java.time.Duration;
54 import java.util.ArrayList;
55 import java.util.List;
56 import java.util.Optional;
57 import java.util.concurrent.ExecutionException;
58 import java.util.concurrent.TimeUnit;
59 import org.junit.jupiter.api.AfterEach;
60 import org.junit.jupiter.api.BeforeEach;
61 import org.junit.jupiter.api.Test;
62 import org.junit.jupiter.api.extension.ExtendWith;
63 import org.mockito.ArgumentCaptor;
64 import org.mockito.Mock;
65 import org.mockito.junit.jupiter.MockitoExtension;
66 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
67 import org.opendaylight.mdsal.dom.api.DOMActionException;
68 import org.opendaylight.mdsal.dom.api.DOMActionService;
69 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
70 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
71 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
72 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
73 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
74 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
75 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
76 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
77 import org.opendaylight.mdsal.dom.api.DOMRpcException;
78 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
79 import org.opendaylight.mdsal.dom.api.DOMRpcService;
80 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
81 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
82 import org.opendaylight.netconf.client.mdsal.api.DeviceNetconfSchemaProvider;
83 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
84 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
85 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Actions;
86 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
87 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
88 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
89 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringActionException;
90 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
91 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
92 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
93 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
94 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
95 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
96 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
97 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
98 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
99 import org.opendaylight.yangtools.concepts.ObjectRegistration;
100 import org.opendaylight.yangtools.concepts.Registration;
101 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
102 import org.opendaylight.yangtools.yang.common.ErrorType;
103 import org.opendaylight.yangtools.yang.common.QName;
104 import org.opendaylight.yangtools.yang.common.RpcError;
105 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
106 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
107 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
108 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
109 import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
110 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
111 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
112 import org.opendaylight.yangtools.yang.model.api.source.SourceIdentifier;
113 import org.opendaylight.yangtools.yang.model.api.source.YangTextSource;
114 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
115 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
116 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
117 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
118 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
119 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
120 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
121 import org.opendaylight.yangtools.yang.model.spi.source.DelegatedYangTextSource;
122 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
123 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToIRTransformer;
124 import scala.concurrent.Await;
125 import scala.concurrent.Future;
127 @ExtendWith(MockitoExtension.class)
128 class NetconfNodeActorTest extends AbstractBaseSchemasTest {
// Timeout handed to NetconfNodeActor.props(...) and used for the blocking Await/gracefulStop calls in the tests.
130 private static final Timeout TIMEOUT = Timeout.create(Duration.ofSeconds(5));
// Source identifiers sent to the slave actor inside RegisterMountPoint messages.
131 private static final SourceIdentifier SOURCE_IDENTIFIER1 = new SourceIdentifier("yang1");
132 private static final SourceIdentifier SOURCE_IDENTIFIER2 = new SourceIdentifier("yang2");
// Actor system hosting every actor created by the tests; the TestKit's ref is used as the sender of test messages.
134 private ActorSystem system = ActorSystem.create();
135 private final TestKit testKit = new TestKit(system);
// Reference to the master-side NetconfNodeActor and the device identity it is created with.
137 private ActorRef masterRef;
138 private RemoteDeviceId remoteDeviceId;
// Real (non-mock) repository returned as the device schema repository in testYangTextSchemaSourceRequest.
139 private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
// NOTE(review): the collaborators below are stubbed and verified as Mockito mocks throughout the tests, yet no
// @Mock annotation is visible in this listing — confirm against the complete file.
142 private Rpcs.Normalized mockRpc;
144 private DOMRpcService mockDOMRpcService;
146 private Actions.Normalized mockDOMActionService;
148 private DOMMountPointService mockMountPointService;
150 private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
152 private ObjectRegistration<DOMMountPoint> mockMountPointReg;
154 private DOMDataBroker mockDOMDataBroker;
156 private NetconfDataTreeService netconfService;
158 private Registration mockSchemaSourceReg1;
160 private Registration mockSchemaSourceReg2;
162 private SchemaSourceRegistry mockRegistry;
164 private EffectiveModelContextFactory mockSchemaContextFactory;
166 private SchemaRepository mockSchemaRepository;
168 private EffectiveModelContext mockSchemaContext;
170 private DeviceNetconfSchemaProvider deviceSchemaProvider;
// Fixture initialisation: builds the device id and the topology setup, then creates the master actor.
// NOTE(review): the enclosing method header and the call terminating the builder chain are not visible in this
// listing — confirm against the complete file.
174 remoteDeviceId = new RemoteDeviceId("netconf-topology",
175 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
// Register a TextToIRTransformer as a schema source listener on the master repository.
177 masterSchemaRepository.registerSchemaSourceListener(
178 TextToIRTransformer.create(masterSchemaRepository, masterSchemaRepository));
180 final NetconfTopologySetup setup = NetconfTopologySetup.builder()
181 .setActorSystem(system)
182 .setIdleTimeout(Duration.ofSeconds(1))
183 .setDeviceSchemaProvider(deviceSchemaProvider)
184 .setBaseSchemaProvider(BASE_SCHEMAS)
187 final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
// The master actor is created as a TestActorRef under the fixed name "master_messages".
189 masterRef = TestActorRef.create(system, props, "master_messages");
// Shuts the actor system down once a test has finished. NOTE(review): the boolean argument presumably asks the
// TestKit to verify that the system really terminated — confirm against the Akka TestKit documentation.
194 TestKit.shutdownActorSystem(system, true);
199 void testInitializeAndRefreshMasterData() {
201 // Test CreateInitialMasterActorData.
203 initializeMaster(new ArrayList<>());
205 // Test RefreshSetupMasterActorData.
207 final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
208 new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
210 final NetconfTopologySetup newSetup = NetconfTopologySetup.builder()
211 .setBaseSchemaProvider(BASE_SCHEMAS)
212 .setDeviceSchemaProvider(deviceSchemaProvider)
213 .setActorSystem(system)
216 masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
218 testKit.expectMsgClass(MasterActorDataInitialized.class);
222 void testAskForMasterMountPoint() {
224 // Test with master not setup yet.
226 final TestKit kit = new TestKit(system);
228 masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
230 final Failure failure = kit.expectMsgClass(Failure.class);
231 assertInstanceOf(NotMasterException.class, failure.cause());
233 // Now initialize - master should send the RegisterMountPoint message.
235 List<SourceIdentifier> sourceIdentifiers = List.of(new SourceIdentifier("testID"));
236 initializeMaster(sourceIdentifiers);
238 masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
240 final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
242 assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
// Exercises slave mount point registration and unregistration, then a registration that is overtaken by a
// second, interleaved registration, and finally checks that stopping the slave actor closes its registrations.
246 void testRegisterAndUnregisterMountPoint() throws Exception {
247 resetMountPointMocks();
248 doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
249 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
250 doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
251 doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
253 ActorRef slaveRef = registerSlaveMountPoint();
// Unregister: the mount point registration and both schema source registrations must be closed.
257 slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
259 verify(mockMountPointReg, timeout(5000)).close();
260 verify(mockSchemaSourceReg1, timeout(1000)).close();
261 verify(mockSchemaSourceReg2, timeout(1000)).close();
263 // Test registration with another interleaved registration that completes while the first registration
264 // is resolving the schema context.
266 reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
267 resetMountPointMocks();
269 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
271 final var newMockSchemaSourceReg = mock(Registration.class);
// Factory used by the interleaved (second) registration; it resolves the schema context immediately.
273 final EffectiveModelContextFactory newMockSchemaContextFactory = mock(EffectiveModelContextFactory.class);
274 doReturn(Futures.immediateFuture(mockSchemaContext))
275 .when(newMockSchemaContextFactory).createEffectiveModelContext(anyCollection());
// NOTE(review): the statement opening this stubbing and the one handing `future` back are not visible in this
// listing; the closing `}).when(...)` below suggests a callback-style answer — confirm against the full file.
// The visible part re-stubs the registry and repository for the second registration, sends that registration
// to the slave, and only then completes the first registration's schema-context future.
278 SettableFuture<SchemaContext> future = SettableFuture.create();
280 doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
281 withSourceId(SOURCE_IDENTIFIER1));
283 doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
284 .createEffectiveModelContextFactory();
286 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
289 future.set(mockSchemaContext);
292 }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
294 doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
295 .createEffectiveModelContextFactory();
// First registration: its schema-context resolution triggers the stubbing above.
297 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
// Exactly one mount point is registered even though two registrations were requested; the schema source
// registration of the first attempt is closed while the newer one stays untouched.
299 verify(mockMountPointBuilder, timeout(5000)).register();
300 verify(mockMountPointBuilder, after(500)).addService(eq(DOMDataBroker.class), any());
301 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
302 verify(mockMountPointBuilder).addService(eq(DOMActionService.class), any());
303 verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
304 verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
305 verify(mockSchemaSourceReg1).close();
306 verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
307 verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
308 verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
310 // Stop the slave actor and verify schema source registrations are closed.
312 final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
313 Await.result(stopFuture, TIMEOUT.duration());
315 verify(mockMountPointReg).close();
316 verify(newMockSchemaSourceReg).close();
// Exercises slave registration when schema context resolution fails: an unrecoverable failure, a failure caused
// by AskTimeoutException followed by a success, and an AskTimeoutException failure overtaken by an interleaved
// successful registration.
320 void testRegisterMountPointWithSchemaFailures() {
321 resetMountPointMocks();
322 doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
323 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
324 doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
325 doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
// Slave-side schema provider backed by the mocked registry and repository.
327 var deviceSchemaProvider2 = mock(DeviceNetconfSchemaProvider.class);
328 doReturn(mockRegistry).when(deviceSchemaProvider2).registry();
329 doReturn(mockSchemaRepository).when(deviceSchemaProvider2).repository();
// NOTE(review): the call terminating this builder chain is not visible in this listing.
330 final NetconfTopologySetup setup = NetconfTopologySetup.builder()
331 .setDeviceSchemaProvider(deviceSchemaProvider2)
332 .setBaseSchemaProvider(BASE_SCHEMAS)
333 .setActorSystem(system)
336 final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT,
337 mockMountPointService));
339 // Test unrecoverable failure.
341 doReturn(Futures.immediateFailedFuture(
342 new SchemaResolutionException("mock", new SourceIdentifier("foo"), null)))
343 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
345 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
346 masterRef), testKit.getRef());
// The slave still replies with Success to the sender, but no mount point is registered and the schema source
// registrations are closed again.
348 testKit.expectMsgClass(Success.class);
350 verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
351 verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
353 verify(mockMountPointBuilder, after(1000).never()).register();
354 verify(mockSchemaSourceReg1, timeout(1000)).close();
355 verify(mockSchemaSourceReg2, timeout(1000)).close();
357 // Test recoverable AskTimeoutException - schema context resolution should be retried.
359 reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
// First resolution attempt fails with an AskTimeoutException cause, the second one succeeds.
361 doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock", new SourceIdentifier("foo"),
362 new AskTimeoutException("timeout"))))
363 .doReturn(Futures.immediateFuture(mockSchemaContext))
364 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
366 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
367 masterRef), testKit.getRef());
369 testKit.expectMsgClass(Success.class);
// The mount point gets registered and the schema source registrations are left alone.
371 verify(mockMountPointBuilder, timeout(5000)).register();
372 verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
374 // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
375 // attempt should not be retried.
377 reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
378 resetMountPointMocks();
380 final EffectiveModelContextFactory mockSchemaContextFactorySuccess = mock(EffectiveModelContextFactory.class);
381 doReturn(Futures.immediateFuture(mockSchemaContext))
382 .when(mockSchemaContextFactorySuccess).createEffectiveModelContext(anyCollection());
// NOTE(review): the statement opening this stubbing and the one handing `future` back are not visible in this
// listing; the closing `}).when(...)` below suggests a callback-style answer — confirm against the full file.
// The visible part switches the repository to the succeeding factory, sends the interleaved registration, and
// then fails the first attempt's future with an AskTimeoutException cause.
385 SettableFuture<SchemaContext> future = SettableFuture.create();
387 doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
388 .createEffectiveModelContextFactory();
390 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
391 masterRef), testKit.getRef());
393 future.setException(new SchemaResolutionException("mock", new SourceIdentifier("foo"),
394 new AskTimeoutException("timeout")));
397 }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
399 doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
401 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
402 masterRef), testKit.getRef());
// Two factories were requested in total (one per registration), i.e. the failed first attempt was not retried.
404 verify(mockMountPointBuilder, timeout(5000)).register();
405 verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
409 void testMissingSchemaSourceOnMissingProvider() {
410 final var repository = new SharedSchemaRepository("test");
412 final var deviceSchemaProvider2 = mock(DeviceNetconfSchemaProvider.class);
413 doReturn(repository).when(deviceSchemaProvider2).repository();
414 final var setup = NetconfTopologySetup.builder()
415 .setActorSystem(system)
416 .setDeviceSchemaProvider(deviceSchemaProvider2)
417 .setIdleTimeout(Duration.ofSeconds(1))
418 .setBaseSchemaProvider(BASE_SCHEMAS)
420 final var props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
421 final var actor = TestActorRef.create(system, props, "master_messages_2");
423 final var sourceIdentifier = new SourceIdentifier("testID");
425 final var proxyYangProvider = new ProxyYangTextSourceProvider(actor, system.dispatcher(), TIMEOUT);
427 final var resolvedSchemaFuture = proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
428 final var ex = assertThrows(MissingSchemaSourceException.class,
429 () -> Await.result(resolvedSchemaFuture, TIMEOUT.duration()));
430 assertEquals("No providers registered for source SourceIdentifier [testID]", ex.getMessage());
434 void testYangTextSchemaSourceRequest() throws Exception {
435 doReturn(masterSchemaRepository).when(deviceSchemaProvider).repository();
437 final var sourceIdentifier = new SourceIdentifier("testID");
439 final var proxyYangProvider = new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
441 final var yangTextSchemaSource = new DelegatedYangTextSource(sourceIdentifier, CharSource.wrap("YANG"));
445 try (var schemaSourceReg = masterSchemaRepository.registerSchemaSource(
446 id -> Futures.immediateFuture(yangTextSchemaSource),
447 PotentialSchemaSource.create(sourceIdentifier, YangTextSource.class, 1))) {
448 final var resolvedSchemaFuture = proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
449 final var success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
451 assertEquals(sourceIdentifier, success.getRepresentation().sourceId());
452 assertEquals("YANG", success.getRepresentation().read());
455 // Test missing source failure.
457 final var failedSchemaFuture = proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
458 final var ex = assertThrows(MissingSchemaSourceException.class,
459 () -> Await.result(failedSchemaFuture, TIMEOUT.duration()));
460 assertThat(ex.getMessage(), startsWith("No providers registered for source"));
461 assertThat(ex.getMessage(), containsString(sourceIdentifier.toString()));
465 void testSlaveInvokeRpc() throws Exception {
466 resetMountPointMocks();
467 doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
468 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
469 doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
470 doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
471 doReturn(mockDOMRpcService).when(mockRpc).domRpcService();
473 initializeMaster(List.of(new SourceIdentifier("testID")));
474 registerSlaveMountPoint();
476 ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
477 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
479 final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
480 assertInstanceOf(ProxyDOMRpcService.class, slaveDomRPCService);
482 final QName testQName = QName.create("", "TestQname");
483 final ContainerNode outputNode = ImmutableNodes.newContainerBuilder()
484 .withNodeIdentifier(new NodeIdentifier(testQName))
485 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
486 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.RPC, null, "Rpc invocation failed.");
488 // RPC with no response output.
490 doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMRpcService).invokeRpc(any(), any());
492 DOMRpcResult result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
494 assertEquals(null, result);
496 // RPC with response output.
498 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode)))
499 .when(mockDOMRpcService).invokeRpc(any(), any());
501 result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
503 assertEquals(outputNode, result.value());
504 assertTrue(result.errors().isEmpty());
506 // RPC with response error.
508 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(rpcError)))
509 .when(mockDOMRpcService).invokeRpc(any(), any());
511 result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
513 assertNull(result.value());
514 assertEquals(rpcError, result.errors().iterator().next());
516 // RPC with response output and error.
518 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
519 .when(mockDOMRpcService).invokeRpc(any(), any());
521 final DOMRpcResult resultOutputError =
522 slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
524 assertEquals(outputNode, resultOutputError.value());
525 assertEquals(rpcError, resultOutputError.errors().iterator().next());
528 doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringRpcException("mock")))
529 .when(mockDOMRpcService).invokeRpc(any(), any());
530 final ListenableFuture<? extends DOMRpcResult> future = slaveDomRPCService.invokeRpc(testQName, outputNode);
532 final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
533 final Throwable cause = e.getCause();
534 assertInstanceOf(DOMRpcException.class, cause);
535 assertEquals("mock", cause.getMessage());
539 void testSlaveInvokeAction() throws Exception {
540 resetMountPointMocks();
541 doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
542 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
543 doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
544 doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
545 doReturn(mockDOMRpcService).when(mockRpc).domRpcService();
547 initializeMaster(List.of(new SourceIdentifier("testActionID")));
548 registerSlaveMountPoint();
550 final var domActionServiceCaptor = ArgumentCaptor.forClass(DOMActionService.class);
551 verify(mockMountPointBuilder).addService(eq(DOMActionService.class), domActionServiceCaptor.capture());
553 final DOMActionService slaveDomActionService = domActionServiceCaptor.getValue();
554 assertInstanceOf(ProxyDOMActionService.class, slaveDomActionService);
556 final QName testQName = QName.create("test", "2019-08-16", "TestActionQname");
557 final Absolute schemaPath = Absolute.of(testQName);
559 final YangInstanceIdentifier yangIIdPath = YangInstanceIdentifier.of(testQName);
561 final DOMDataTreeIdentifier domDataTreeIdentifier = DOMDataTreeIdentifier.of(LogicalDatastoreType.OPERATIONAL,
564 final ContainerNode outputNode = ImmutableNodes.newContainerBuilder()
565 .withNodeIdentifier(new NodeIdentifier(testQName))
566 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
568 // Action with no response output.
569 doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMActionService)
570 .invokeAction(any(), any(), any());
571 var result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
572 .get(2, TimeUnit.SECONDS);
573 assertEquals(null, result);
575 // Action with response output.
576 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode))).when(mockDOMActionService)
577 .invokeAction(any(), any(), any());
578 result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
579 .get(2, TimeUnit.SECONDS);
581 assertEquals(outputNode, result.value());
582 assertTrue(result.errors().isEmpty());
585 doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringActionException("mock")))
586 .when(mockDOMActionService).invokeAction(any(), any(), any());
587 final var future = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode);
589 final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
590 final Throwable cause = e.getCause();
591 assertInstanceOf(DOMActionException.class, cause);
592 assertEquals("mock", cause.getMessage());
596 void testSlaveNewTransactionRequests() {
597 resetMountPointMocks();
598 doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
599 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
600 doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
601 doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
602 doReturn(mockDOMRpcService).when(mockRpc).domRpcService();
604 doReturn(mock(DOMDataTreeReadTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
605 doReturn(mock(DOMDataTreeReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
606 doReturn(mock(DOMDataTreeWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
608 initializeMaster(List.of());
609 registerSlaveMountPoint();
611 final var domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
612 verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
614 final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
615 assertInstanceOf(ProxyDOMDataBroker.class, slaveDOMDataBroker);
617 slaveDOMDataBroker.newReadOnlyTransaction();
618 verify(mockDOMDataBroker).newReadOnlyTransaction();
620 slaveDOMDataBroker.newReadWriteTransaction();
621 verify(mockDOMDataBroker).newReadWriteTransaction();
623 slaveDOMDataBroker.newWriteOnlyTransaction();
624 verify(mockDOMDataBroker).newWriteOnlyTransaction();
628 void testSlaveNewNetconfDataTreeServiceRequest() {
629 doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
630 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
631 doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
632 doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
633 doReturn(mockDOMRpcService).when(mockRpc).domRpcService();
635 initializeMaster(List.of());
636 registerSlaveMountPoint();
638 ArgumentCaptor<NetconfDataTreeService> netconfCaptor = ArgumentCaptor.forClass(NetconfDataTreeService.class);
639 verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), netconfCaptor.capture());
641 final NetconfDataTreeService slaveNetconfService = netconfCaptor.getValue();
642 assertInstanceOf(ProxyNetconfDataTreeService.class, slaveNetconfService);
644 final YangInstanceIdentifier PATH = YangInstanceIdentifier.of();
645 final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
646 final ContainerNode NODE = ImmutableNodes.newContainerBuilder()
647 .withNodeIdentifier(new NodeIdentifier(QName.create("", "cont")))
650 final FluentFuture<Optional<Object>> result = immediateFluentFuture(Optional.of(NODE));
651 doReturn(result).when(netconfService).get(PATH);
652 doReturn(result).when(netconfService).getConfig(PATH);
653 doReturn(emptyFluentFuture()).when(netconfService).commit();
655 slaveNetconfService.get(PATH);
656 slaveNetconfService.getConfig(PATH);
657 slaveNetconfService.lock();
658 slaveNetconfService.merge(STORE, PATH, NODE, Optional.empty());
659 slaveNetconfService.replace(STORE, PATH, NODE, Optional.empty());
660 slaveNetconfService.create(STORE, PATH, NODE, Optional.empty());
661 slaveNetconfService.delete(STORE, PATH);
662 slaveNetconfService.remove(STORE, PATH);
663 slaveNetconfService.discardChanges();
664 slaveNetconfService.commit();
666 verify(netconfService, timeout(1000)).get(PATH);
667 verify(netconfService, timeout(1000)).getConfig(PATH);
668 verify(netconfService, timeout(1000)).lock();
669 verify(netconfService, timeout(1000)).merge(STORE, PATH, NODE, Optional.empty());
670 verify(netconfService, timeout(1000)).replace(STORE, PATH, NODE, Optional.empty());
671 verify(netconfService, timeout(1000)).create(STORE, PATH, NODE, Optional.empty());
672 verify(netconfService, timeout(1000)).delete(STORE, PATH);
673 verify(netconfService, timeout(1000)).remove(STORE, PATH);
674 verify(netconfService, timeout(1000)).discardChanges();
675 verify(netconfService, timeout(1000)).commit();
678 private ActorRef registerSlaveMountPoint() {
679 var deviceSchemaProvider2 = mock(DeviceNetconfSchemaProvider.class);
680 doReturn(mockRegistry).when(deviceSchemaProvider2).registry();
681 doReturn(mockSchemaRepository).when(deviceSchemaProvider2).repository();
682 final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(NetconfTopologySetup.builder()
683 .setDeviceSchemaProvider(deviceSchemaProvider2)
684 .setActorSystem(system)
685 .setBaseSchemaProvider(BASE_SCHEMAS)
686 .build(), remoteDeviceId, TIMEOUT, mockMountPointService));
688 doReturn(Futures.immediateFuture(mockSchemaContext))
689 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
691 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
692 masterRef), testKit.getRef());
694 verify(mockMountPointBuilder, timeout(5000)).register();
695 verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
696 verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
697 verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), any());
698 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
699 verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
701 testKit.expectMsgClass(Success.class);
705 private void initializeMaster(final List<SourceIdentifier> sourceIdentifiers) {
706 masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, netconfService, sourceIdentifiers,
707 new RemoteDeviceServices(mockRpc, mockDOMActionService)), testKit.getRef());
708 testKit.expectMsgClass(MasterActorDataInitialized.class);
711 private void resetMountPointMocks() {
712 reset(mockMountPointReg, mockMountPointBuilder);
714 doNothing().when(mockMountPointReg).close();
716 doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
717 doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
720 private static PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
721 return argThat(argument -> identifier.equals(argument.getSourceIdentifier()));