2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static org.hamcrest.CoreMatchers.containsString;
11 import static org.hamcrest.CoreMatchers.instanceOf;
12 import static org.hamcrest.CoreMatchers.startsWith;
13 import static org.hamcrest.MatcherAssert.assertThat;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertNull;
16 import static org.junit.Assert.assertThrows;
17 import static org.junit.Assert.assertTrue;
18 import static org.mockito.ArgumentMatchers.any;
19 import static org.mockito.ArgumentMatchers.anyCollection;
20 import static org.mockito.ArgumentMatchers.argThat;
21 import static org.mockito.ArgumentMatchers.eq;
22 import static org.mockito.Mockito.after;
23 import static org.mockito.Mockito.doAnswer;
24 import static org.mockito.Mockito.doNothing;
25 import static org.mockito.Mockito.doReturn;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.reset;
28 import static org.mockito.Mockito.timeout;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.verifyNoMoreInteractions;
32 import static org.opendaylight.mdsal.common.api.CommitInfo.emptyFluentFuture;
33 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFluentFuture;
35 import akka.actor.ActorRef;
36 import akka.actor.ActorSystem;
37 import akka.actor.Props;
38 import akka.actor.Status.Failure;
39 import akka.actor.Status.Success;
40 import akka.pattern.AskTimeoutException;
41 import akka.pattern.Patterns;
42 import akka.testkit.TestActorRef;
43 import akka.testkit.javadsl.TestKit;
44 import akka.util.Timeout;
45 import com.google.common.collect.ImmutableList;
46 import com.google.common.io.CharSource;
47 import com.google.common.net.InetAddresses;
48 import com.google.common.util.concurrent.FluentFuture;
49 import com.google.common.util.concurrent.Futures;
50 import com.google.common.util.concurrent.ListenableFuture;
51 import com.google.common.util.concurrent.SettableFuture;
52 import java.net.InetSocketAddress;
53 import java.time.Duration;
54 import java.util.ArrayList;
55 import java.util.List;
56 import java.util.Optional;
57 import java.util.concurrent.ExecutionException;
58 import java.util.concurrent.TimeUnit;
59 import org.junit.After;
60 import org.junit.Before;
61 import org.junit.Test;
62 import org.junit.runner.RunWith;
63 import org.mockito.ArgumentCaptor;
64 import org.mockito.Mock;
65 import org.mockito.junit.MockitoJUnitRunner;
66 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
67 import org.opendaylight.mdsal.dom.api.DOMActionException;
68 import org.opendaylight.mdsal.dom.api.DOMActionResult;
69 import org.opendaylight.mdsal.dom.api.DOMActionService;
70 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
71 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
72 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
73 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
74 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
75 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
76 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
77 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
78 import org.opendaylight.mdsal.dom.api.DOMRpcException;
79 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
80 import org.opendaylight.mdsal.dom.api.DOMRpcService;
81 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
82 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
83 import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult;
84 import org.opendaylight.netconf.client.mdsal.api.DeviceNetconfSchemaProvider;
85 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
86 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
87 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Actions;
88 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
89 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
90 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
91 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringActionException;
92 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
93 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
94 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
95 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
96 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
97 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
98 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
99 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
100 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
101 import org.opendaylight.yangtools.concepts.ObjectRegistration;
102 import org.opendaylight.yangtools.concepts.Registration;
103 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
104 import org.opendaylight.yangtools.yang.common.ErrorType;
105 import org.opendaylight.yangtools.yang.common.QName;
106 import org.opendaylight.yangtools.yang.common.RpcError;
107 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
111 import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
112 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
113 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
114 import org.opendaylight.yangtools.yang.model.api.source.SourceIdentifier;
115 import org.opendaylight.yangtools.yang.model.api.source.YangTextSource;
116 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
117 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
118 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
119 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
120 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
121 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
122 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
123 import org.opendaylight.yangtools.yang.model.spi.source.DelegatedYangTextSource;
124 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
125 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToIRTransformer;
126 import scala.concurrent.Await;
127 import scala.concurrent.Future;
// Tests for NetconfNodeActor, run with Mockito's strict-stubs JUnit runner.
129 @RunWith(MockitoJUnitRunner.StrictStubs.class)
130 public class NetconfNodeActorTest extends AbstractBaseSchemasTest {
// Five-second timeout passed to the actors under test and used when awaiting their replies.
132 private static final Timeout TIMEOUT = Timeout.create(Duration.ofSeconds(5));
// Source identifiers sent in RegisterMountPoint messages and matched by the registry stubs.
133 private static final SourceIdentifier SOURCE_IDENTIFIER1 = new SourceIdentifier("yang1");
134 private static final SourceIdentifier SOURCE_IDENTIFIER2 = new SourceIdentifier("yang2");
// Actor system hosting every actor created by a test; shut down in teardown().
136 private ActorSystem system = ActorSystem.create();
// Probe whose ref is used as the sender of most messages, so replies can be asserted on it.
137 private final TestKit testKit = new TestKit(system);
// Master NetconfNodeActor reference and the device id it is created with; both assigned in setup().
139 private ActorRef masterRef;
140 private RemoteDeviceId remoteDeviceId;
// Real (non-mock) schema repository used for the master side in testYangTextSchemaSourceRequest.
141 private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
// NOTE(review): the collaborators below are never assigned in the visible code — presumably they are
// injected as Mockito mocks by the runner (the annotations are not visible here); confirm.
144 private Rpcs.Normalized mockRpc;
146 private DOMRpcService mockDOMRpcService;
148 private Actions.Normalized mockDOMActionService;
150 private DOMMountPointService mockMountPointService;
152 private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
154 private ObjectRegistration<DOMMountPoint> mockMountPointReg;
156 private DOMDataBroker mockDOMDataBroker;
158 private NetconfDataTreeService netconfService;
// One registration mock per source identifier, so each one's close() can be verified separately.
160 private Registration mockSchemaSourceReg1;
162 private Registration mockSchemaSourceReg2;
164 private SchemaSourceRegistry mockRegistry;
166 private EffectiveModelContextFactory mockSchemaContextFactory;
168 private SchemaRepository mockSchemaRepository;
170 private EffectiveModelContext mockSchemaContext;
172 private DeviceNetconfSchemaProvider deviceSchemaProvider;
// Test fixture: creates the master actor and installs the stubs shared by the tests.
175 public void setup() {
// Device id built from the string "netconf-topology" and the address 127.0.0.1:9999.
176 remoteDeviceId = new RemoteDeviceId("netconf-topology",
177 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
// Register a TextToIRTransformer as a schema source listener on the master repository.
179 masterSchemaRepository.registerSchemaSourceListener(
180 TextToIRTransformer.create(masterSchemaRepository, masterSchemaRepository));
// Topology setup for the master actor: actor system, 1 s idle timeout, schema providers.
182 final NetconfTopologySetup setup = NetconfTopologySetup.builder()
183 .setActorSystem(system)
184 .setIdleTimeout(Duration.ofSeconds(1))
185 .setDeviceSchemaProvider(deviceSchemaProvider)
186 .setBaseSchemaProvider(BASE_SCHEMAS)
189 final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
// The master actor is created as a TestActorRef under the name "master_messages".
191 masterRef = TestActorRef.create(system, props, "master_messages");
// Reset the mount point mocks and re-install their default stubs.
193 resetMountPointMocks();
// Any createMountPoint() call hands back the shared builder mock.
195 doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
// Each source identifier gets its own registration mock from the registry.
197 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
198 doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
200 doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
// The Rpcs.Normalized mock exposes the DOMRpcService mock as its domRpcService().
202 doReturn(mockDOMRpcService).when(mockRpc).domRpcService();
// Shuts down the actor system created for this test instance.
207 public void teardown() {
// The second argument is TestKit's verifySystemShutdown flag (per the Akka javadsl TestKit API).
208 TestKit.shutdownActorSystem(system, true);
// Verifies that the master actor answers both CreateInitialMasterActorData and
// RefreshSetupMasterActorData with a MasterActorDataInitialized message.
213 public void testInitializeAndRefreshMasterData() {
215 // Test CreateInitialMasterActorData.
// initializeMaster() sends the message and already expects the MasterActorDataInitialized reply.
217 initializeMaster(new ArrayList<>());
219 // Test RefreshSetupMasterActorData.
// A different device id ("netconf-topology2", 127.0.0.2:9999) and a fresh setup are used for the refresh.
221 final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
222 new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
224 final NetconfTopologySetup newSetup = NetconfTopologySetup.builder()
225 .setBaseSchemaProvider(BASE_SCHEMAS)
226 .setDeviceSchemaProvider(deviceSchemaProvider)
227 .setActorSystem(system)
230 masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
// The refresh is acknowledged with the same reply type as the initial setup.
232 testKit.expectMsgClass(MasterActorDataInitialized.class);
// Verifies the reply to AskForMasterMountPoint: a Failure carrying NotMasterException before the
// master is initialized, and a RegisterMountPoint message with the source identifiers afterwards.
236 public void testAskForMasterMountPoint() {
238 // Test with master not setup yet.
// A dedicated probe is used so these replies are kept apart from those sent to the shared testKit.
240 final TestKit kit = new TestKit(system);
242 masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
244 final Failure failure = kit.expectMsgClass(Failure.class);
245 assertTrue(failure.cause() instanceof NotMasterException);
247 // Now initialize - master should send the RegisterMountPoint message.
249 List<SourceIdentifier> sourceIdentifiers = List.of(new SourceIdentifier("testID"));
250 initializeMaster(sourceIdentifiers);
252 masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
254 final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
// The identifiers given at initialization must be echoed back unchanged.
256 assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
// Covers the slave mount point life cycle: register, unregister, a registration interleaved with
// another one that is still resolving its schema context, and finally stopping the slave actor.
260 public void testRegisterAndUnregisterMountPoint() throws Exception {
// registerSlaveMountPoint() creates the slave actor and waits for its first successful registration.
262 ActorRef slaveRef = registerSlaveMountPoint();
266 slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
// Unregistering must close the mount point registration and both schema source registrations;
// timeout(...) polls for up to the given number of milliseconds.
268 verify(mockMountPointReg, timeout(5000)).close();
269 verify(mockSchemaSourceReg1, timeout(1000)).close();
270 verify(mockSchemaSourceReg2, timeout(1000)).close();
272 // Test registration with another interleaved registration that completes while the first registration
273 // is resolving the schema context.
// Clear recorded interactions and stubs so the counts verified below only reflect this scenario.
275 reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
276 resetMountPointMocks();
278 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
280 final var newMockSchemaSourceReg = mock(Registration.class);
// Second factory that resolves the schema context immediately; used by the interleaved registration.
282 final EffectiveModelContextFactory newMockSchemaContextFactory = mock(EffectiveModelContextFactory.class);
283 doReturn(Futures.immediateFuture(mockSchemaContext))
284 .when(newMockSchemaContextFactory).createEffectiveModelContext(anyCollection());
// NOTE(review): the statements from here down to the "}).when(...)" line look like the body of a
// stubbed answer for mockSchemaContextFactory.createEffectiveModelContext (its opening line is not
// visible here): it re-stubs the registry and repository, sends a second RegisterMountPoint, and only
// then completes the first resolution's future.
287 SettableFuture<SchemaContext> future = SettableFuture.create();
289 doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
290 withSourceId(SOURCE_IDENTIFIER1));
292 doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
293 .createEffectiveModelContextFactory();
295 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
298 future.set(mockSchemaContext);
301 }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
303 doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
304 .createEffectiveModelContextFactory();
// First registration; it triggers the stubbed createEffectiveModelContext above.
306 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
// register() must be called exactly once. after(500) waits the full 500 ms before verifying, so a
// late duplicate addService call would be caught.
308 verify(mockMountPointBuilder, timeout(5000)).register();
309 verify(mockMountPointBuilder, after(500)).addService(eq(DOMDataBroker.class), any());
310 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
311 verify(mockMountPointBuilder).addService(eq(DOMActionService.class), any());
312 verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
313 verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
// The first registration's schema source registration is closed; the registry and the repository
// were each consulted twice (once per RegisterMountPoint message).
314 verify(mockSchemaSourceReg1).close();
315 verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
316 verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
// No further builder calls, and the newer schema source registration has not been touched yet.
317 verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
319 // Stop the slave actor and verify schema source registrations are closed.
321 final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
// Block until the actor has stopped so the verifications below can run without a timeout.
322 Await.result(stopFuture, TIMEOUT.duration());
324 verify(mockMountPointReg).close();
325 verify(newMockSchemaSourceReg).close();
328 // @SuppressWarnings("unchecked")
// Exercises mount point registration on a slave actor when schema context resolution fails:
// an unrecoverable failure, a failure caused by AskTimeoutException followed by a success, and an
// AskTimeoutException failure interleaved with a second, successful registration.
330 public void testRegisterMountPointWithSchemaFailures() throws Exception {
// Local provider mock that routes the slave to the mocked registry and repository.
331 var deviceSchemaProvider2 = mock(DeviceNetconfSchemaProvider.class);
332 doReturn(mockRegistry).when(deviceSchemaProvider2).registry();
333 doReturn(mockSchemaRepository).when(deviceSchemaProvider2).repository();
334 final NetconfTopologySetup setup = NetconfTopologySetup.builder()
335 .setDeviceSchemaProvider(deviceSchemaProvider2)
336 .setBaseSchemaProvider(BASE_SCHEMAS)
337 .setActorSystem(system)
340 final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT,
341 mockMountPointService));
343 // Test unrecoverable failure.
// SchemaResolutionException with a null cause.
345 doReturn(Futures.immediateFailedFuture(
346 new SchemaResolutionException("mock", new SourceIdentifier("foo"), null)))
347 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
349 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
350 masterRef), testKit.getRef());
// The test expects a Success reply even though schema resolution is stubbed to fail.
352 testKit.expectMsgClass(Success.class);
354 verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
355 verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
// after(1000).never() waits the full second to confirm register() is not called; both schema
// source registrations must be closed.
357 verify(mockMountPointBuilder, after(1000).never()).register();
358 verify(mockSchemaSourceReg1, timeout(1000)).close();
359 verify(mockSchemaSourceReg2, timeout(1000)).close();
361 // Test recoverable AskTimeoutException - schema context resolution should be retried.
// Reset so the close() calls recorded above do not affect verifyNoMoreInteractions below.
363 reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
// First call fails with a SchemaResolutionException caused by AskTimeoutException; the second succeeds.
365 doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock", new SourceIdentifier("foo"),
366 new AskTimeoutException("timeout"))))
367 .doReturn(Futures.immediateFuture(mockSchemaContext))
368 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
370 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
371 masterRef), testKit.getRef());
373 testKit.expectMsgClass(Success.class);
// The mount point gets registered and the schema source registrations are left untouched (not closed).
375 verify(mockMountPointBuilder, timeout(5000)).register();
376 verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
378 // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
379 // attempt should not be retried.
381 reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
382 resetMountPointMocks();
// Factory that always succeeds; used by the interleaved registration.
384 final EffectiveModelContextFactory mockSchemaContextFactorySuccess = mock(EffectiveModelContextFactory.class);
385 doReturn(Futures.immediateFuture(mockSchemaContext))
386 .when(mockSchemaContextFactorySuccess).createEffectiveModelContext(anyCollection());
// NOTE(review): the statements from here down to the "}).when(...)" line look like the body of a
// stubbed answer for mockSchemaContextFactory.createEffectiveModelContext (its opening line is not
// visible here): it switches the repository to the succeeding factory, sends a second
// RegisterMountPoint, and then fails the first resolution's future with an AskTimeoutException cause.
389 SettableFuture<SchemaContext> future = SettableFuture.create();
391 doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
392 .createEffectiveModelContextFactory();
394 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
395 masterRef), testKit.getRef());
397 future.setException(new SchemaResolutionException("mock", new SourceIdentifier("foo"),
398 new AskTimeoutException("timeout")));
401 }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
403 doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
405 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
406 masterRef), testKit.getRef());
// Exactly one register() and exactly two factory lookups: a retry of the first attempt would show
// up as a third createEffectiveModelContextFactory() call.
408 verify(mockMountPointBuilder, timeout(5000)).register();
409 verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
// Verifies that requesting a YANG source from an actor whose repository has no source registered in
// this test yields a MissingSchemaSourceException with the expected message.
413 public void testMissingSchemaSourceOnMissingProvider() throws Exception {
// Freshly created repository; nothing is registered with it in this method.
414 final var repository = new SharedSchemaRepository("test");
416 final var deviceSchemaProvider2 = mock(DeviceNetconfSchemaProvider.class);
417 doReturn(repository).when(deviceSchemaProvider2).repository();
418 final var setup = NetconfTopologySetup.builder()
419 .setActorSystem(system)
420 .setDeviceSchemaProvider(deviceSchemaProvider2)
421 .setIdleTimeout(Duration.ofSeconds(1))
422 .setBaseSchemaProvider(BASE_SCHEMAS)
424 final var props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
// A distinct actor name is used because setup() already created "master_messages" in this system.
425 final var actor = TestActorRef.create(system, props, "master_messages_2");
427 final var sourceIdentifier = new SourceIdentifier("testID");
// The proxy provider sends the source request to the actor and returns a Scala future.
429 final var proxyYangProvider = new ProxyYangTextSourceProvider(actor, system.dispatcher(), TIMEOUT);
431 final var resolvedSchemaFuture = proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
// Awaiting the future must throw MissingSchemaSourceException itself.
432 final var ex = assertThrows(MissingSchemaSourceException.class,
433 () -> Await.result(resolvedSchemaFuture, TIMEOUT.duration()));
434 assertEquals("No providers registered for source SourceIdentifier [testID]", ex.getMessage());
// Verifies that ProxyYangTextSourceProvider resolves a YANG text source through the master actor
// while the source is registered, and fails with MissingSchemaSourceException when it is requested again.
438 public void testYangTextSchemaSourceRequest() throws Exception {
// Point the master's schema provider at the real master repository.
439 doReturn(masterSchemaRepository).when(deviceSchemaProvider).repository();
441 final var sourceIdentifier = new SourceIdentifier("testID");
443 final var proxyYangProvider = new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
// In-memory YANG text source whose entire content is the string "YANG".
445 final var yangTextSchemaSource = new DelegatedYangTextSource(sourceIdentifier, CharSource.wrap("YANG"));
// The source registration is held in a try-with-resources statement.
449 try (var schemaSourceReg = masterSchemaRepository.registerSchemaSource(
450 id -> Futures.immediateFuture(yangTextSchemaSource),
451 PotentialSchemaSource.create(sourceIdentifier, YangTextSource.class, 1))) {
452 final var resolvedSchemaFuture = proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
453 final var success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
// Both the identifier and the text content must match what was registered.
455 assertEquals(sourceIdentifier, success.getRepresentation().sourceId());
456 assertEquals("YANG", success.getRepresentation().read());
459 // Test missing source failure.
461 final var failedSchemaFuture = proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
462 final var ex = assertThrows(MissingSchemaSourceException.class,
463 () -> Await.result(failedSchemaFuture, TIMEOUT.duration()));
// Only the prefix and the identifier are checked here, not the full message.
464 assertThat(ex.getMessage(), startsWith("No providers registered for source"));
465 assertThat(ex.getMessage(), containsString(sourceIdentifier.toString()));
// Verifies that the DOMRpcService registered on the slave mount point is a ProxyDOMRpcService and
// that invoking RPCs through it returns whatever the stubbed master-side DOMRpcService produces:
// null, output, error, output plus error, and a failed future.
469 public void testSlaveInvokeRpc() throws Exception {
471 initializeMaster(List.of(new SourceIdentifier("testID")));
472 registerSlaveMountPoint();
// Capture the service instance the slave added to the mount point builder.
474 ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
475 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
477 final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
478 assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService);
// Shared fixtures: the same container node is used both as RPC input and as the stubbed output.
480 final QName testQName = QName.create("", "TestQname");
481 final ContainerNode outputNode = ImmutableNodes.newContainerBuilder()
482 .withNodeIdentifier(new NodeIdentifier(testQName))
483 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
484 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.RPC, null, "Rpc invocation failed.");
486 // RPC with no response output.
488 doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMRpcService).invokeRpc(any(), any());
490 DOMRpcResult result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
// NOTE(review): assertNull is already statically imported and would express this check more directly.
492 assertEquals(null, result);
494 // RPC with response output.
496 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode)))
497 .when(mockDOMRpcService).invokeRpc(any(), any());
499 result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
501 assertEquals(outputNode, result.value());
502 assertTrue(result.errors().isEmpty());
504 // RPC with response error.
506 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(rpcError)))
507 .when(mockDOMRpcService).invokeRpc(any(), any());
509 result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
511 assertNull(result.value());
512 assertEquals(rpcError, result.errors().iterator().next());
514 // RPC with response output and error.
516 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
517 .when(mockDOMRpcService).invokeRpc(any(), any());
519 final DOMRpcResult resultOutputError =
520 slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
522 assertEquals(outputNode, resultOutputError.value());
523 assertEquals(rpcError, resultOutputError.errors().iterator().next());
// RPC whose master-side future fails: the slave-side future must fail with a DOMRpcException
// carrying the same message.
526 doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringRpcException("mock")))
527 .when(mockDOMRpcService).invokeRpc(any(), any());
528 final ListenableFuture<? extends DOMRpcResult> future = slaveDomRPCService.invokeRpc(testQName, outputNode);
530 final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
531 final Throwable cause = e.getCause();
532 assertThat(cause, instanceOf(DOMRpcException.class));
533 assertEquals("mock", cause.getMessage());
// Verifies that the DOMActionService registered on the slave mount point is a ProxyDOMActionService
// and that invoking actions through it returns whatever the stubbed master-side action service
// produces: null, output, and a failed future.
537 public void testSlaveInvokeAction() throws Exception {
538 initializeMaster(List.of(new SourceIdentifier("testActionID")));
539 registerSlaveMountPoint();
// Capture the service instance the slave added to the mount point builder.
541 ArgumentCaptor<DOMActionService> domActionServiceCaptor = ArgumentCaptor.forClass(DOMActionService.class);
542 verify(mockMountPointBuilder).addService(eq(DOMActionService.class), domActionServiceCaptor.capture());
544 final DOMActionService slaveDomActionService = domActionServiceCaptor.getValue();
545 assertTrue(slaveDomActionService instanceof ProxyDOMActionService);
// Shared fixtures: schema path, instance identifier and data tree identifier all derive from testQName.
547 final QName testQName = QName.create("test", "2019-08-16", "TestActionQname");
548 final Absolute schemaPath = Absolute.of(testQName);
550 final YangInstanceIdentifier yangIIdPath = YangInstanceIdentifier.of(testQName);
552 final DOMDataTreeIdentifier domDataTreeIdentifier = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
555 final ContainerNode outputNode = ImmutableNodes.newContainerBuilder()
556 .withNodeIdentifier(new NodeIdentifier(testQName))
557 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
559 // Action with no response output.
560 doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMActionService)
561 .invokeAction(any(), any(), any());
562 DOMActionResult result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
563 .get(2, TimeUnit.SECONDS);
// NOTE(review): assertNull is already statically imported and would express this check more directly.
564 assertEquals(null, result);
566 // Action with response output.
567 doReturn(FluentFutures.immediateFluentFuture(new SimpleDOMActionResult(outputNode))).when(mockDOMActionService)
568 .invokeAction(any(), any(), any());
569 result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
570 .get(2, TimeUnit.SECONDS);
572 assertEquals(Optional.of(outputNode), result.getOutput());
573 assertTrue(result.getErrors().isEmpty());
// Action whose master-side future fails: the slave-side future must fail with a DOMActionException
// carrying the same message.
576 doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringActionException("mock")))
577 .when(mockDOMActionService).invokeAction(any(), any(), any());
578 final ListenableFuture<? extends DOMActionResult> future = slaveDomActionService.invokeAction(schemaPath,
579 domDataTreeIdentifier, outputNode);
581 final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
582 final Throwable cause = e.getCause();
583 assertThat(cause, instanceOf(DOMActionException.class));
584 assertEquals("mock", cause.getMessage());
// Verifies that the DOMDataBroker registered on the slave mount point is a ProxyDOMDataBroker and
// that each new*Transaction() call on it reaches the master-side DOMDataBroker mock.
588 public void testSlaveNewTransactionRequests() {
// The master-side broker hands out a mock transaction of each kind.
589 doReturn(mock(DOMDataTreeReadTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
590 doReturn(mock(DOMDataTreeReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
591 doReturn(mock(DOMDataTreeWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
593 initializeMaster(List.of());
594 registerSlaveMountPoint();
// Capture the broker instance the slave added to the mount point builder.
596 final var domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
597 verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
599 final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
600 assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker);
// Each verify below uses no timeout, so the master-side call must already have happened by the
// time the slave-side call returns.
602 slaveDOMDataBroker.newReadOnlyTransaction();
603 verify(mockDOMDataBroker).newReadOnlyTransaction();
605 slaveDOMDataBroker.newReadWriteTransaction();
606 verify(mockDOMDataBroker).newReadWriteTransaction();
608 slaveDOMDataBroker.newWriteOnlyTransaction();
609 verify(mockDOMDataBroker).newWriteOnlyTransaction();
// Verifies that the NetconfDataTreeService registered on the slave mount point is a
// ProxyNetconfDataTreeService and that every operation invoked on it reaches the master-side
// netconfService mock with the same arguments.
613 public void testSlaveNewNetconfDataTreeServiceRequest() {
614 initializeMaster(List.of());
615 registerSlaveMountPoint();
// Capture the service instance the slave added to the mount point builder.
617 ArgumentCaptor<NetconfDataTreeService> netconfCaptor = ArgumentCaptor.forClass(NetconfDataTreeService.class);
618 verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), netconfCaptor.capture());
620 final NetconfDataTreeService slaveNetconfService = netconfCaptor.getValue();
621 assertTrue(slaveNetconfService instanceof ProxyNetconfDataTreeService);
// NOTE(review): PATH, STORE and NODE are local variables despite their constant-style names.
623 final YangInstanceIdentifier PATH = YangInstanceIdentifier.of();
624 final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
625 final ContainerNode NODE = ImmutableNodes.newContainerBuilder()
626 .withNodeIdentifier(new NodeIdentifier(QName.create("", "cont")))
// Only get, getConfig and commit are stubbed with return values; the other operations are left unstubbed.
629 final FluentFuture<Optional<Object>> result = immediateFluentFuture(Optional.of(NODE));
630 doReturn(result).when(netconfService).get(PATH);
631 doReturn(result).when(netconfService).getConfig(PATH);
632 doReturn(emptyFluentFuture()).when(netconfService).commit();
// Invoke every operation once on the slave-side proxy.
634 slaveNetconfService.get(PATH);
635 slaveNetconfService.getConfig(PATH);
636 slaveNetconfService.lock();
637 slaveNetconfService.merge(STORE, PATH, NODE, Optional.empty());
638 slaveNetconfService.replace(STORE, PATH, NODE, Optional.empty());
639 slaveNetconfService.create(STORE, PATH, NODE, Optional.empty());
640 slaveNetconfService.delete(STORE, PATH);
641 slaveNetconfService.remove(STORE, PATH);
642 slaveNetconfService.discardChanges();
643 slaveNetconfService.commit();
// timeout(1000) polls for up to one second per call, allowing the master-side invocation to arrive
// after the slave-side call has returned.
645 verify(netconfService, timeout(1000)).get(PATH);
646 verify(netconfService, timeout(1000)).getConfig(PATH);
647 verify(netconfService, timeout(1000)).lock();
648 verify(netconfService, timeout(1000)).merge(STORE, PATH, NODE, Optional.empty());
649 verify(netconfService, timeout(1000)).replace(STORE, PATH, NODE, Optional.empty());
650 verify(netconfService, timeout(1000)).create(STORE, PATH, NODE, Optional.empty());
651 verify(netconfService, timeout(1000)).delete(STORE, PATH);
652 verify(netconfService, timeout(1000)).remove(STORE, PATH);
653 verify(netconfService, timeout(1000)).discardChanges();
654 verify(netconfService, timeout(1000)).commit();
// Helper: creates a slave NetconfNodeActor backed by the mocked registry and repository, sends it a
// RegisterMountPoint for both source identifiers, and waits until the mount point is registered and
// a Success reply has arrived on testKit.
657 private ActorRef registerSlaveMountPoint() {
658 var deviceSchemaProvider2 = mock(DeviceNetconfSchemaProvider.class);
659 doReturn(mockRegistry).when(deviceSchemaProvider2).registry();
660 doReturn(mockSchemaRepository).when(deviceSchemaProvider2).repository();
// Unlike the master, the slave is created via system.actorOf with no explicit name.
661 final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(NetconfTopologySetup.builder()
662 .setDeviceSchemaProvider(deviceSchemaProvider2)
663 .setActorSystem(system)
664 .setBaseSchemaProvider(BASE_SCHEMAS)
665 .build(), remoteDeviceId, TIMEOUT, mockMountPointService));
// Schema context resolution succeeds immediately.
667 doReturn(Futures.immediateFuture(mockSchemaContext))
668 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
670 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
671 masterRef), testKit.getRef());
// Wait up to five seconds for register(), then check the services added to the builder.
// NOTE(review): DOMActionService is not verified here.
673 verify(mockMountPointBuilder, timeout(5000)).register();
674 verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
675 verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
676 verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), any());
677 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
678 verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
// Consume the Success reply so it does not interfere with later expectations on testKit.
680 testKit.expectMsgClass(Success.class);
// Helper: sends CreateInitialMasterActorData (mock data broker, netconf service, the given source
// identifiers and the mocked RPC/action services) to the master actor and waits for its
// MasterActorDataInitialized reply on testKit.
684 private void initializeMaster(final List<SourceIdentifier> sourceIdentifiers) {
685 masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, netconfService, sourceIdentifiers,
686 new RemoteDeviceServices(mockRpc, mockDOMActionService)), testKit.getRef());
687 testKit.expectMsgClass(MasterActorDataInitialized.class);
// Helper: resets the mount point registration and builder mocks (dropping recorded interactions
// and stubs) and re-installs their default stubs.
690 private void resetMountPointMocks() {
691 reset(mockMountPointReg, mockMountPointBuilder);
693 doNothing().when(mockMountPointReg).close();
// addService() returns the builder itself so calls can be chained; register() yields the
// registration mock.
695 doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
696 doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
// Mockito argument matcher that accepts a PotentialSchemaSource whose source identifier equals the
// given one. Must only be called while stubbing or verifying, as it registers the matcher via argThat.
699 private static PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
700 return argThat(argument -> identifier.equals(argument.getSourceIdentifier()));