2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static org.hamcrest.CoreMatchers.containsString;
11 import static org.hamcrest.CoreMatchers.instanceOf;
12 import static org.hamcrest.CoreMatchers.startsWith;
13 import static org.hamcrest.MatcherAssert.assertThat;
14 import static org.junit.Assert.assertEquals;
15 import static org.junit.Assert.assertNull;
16 import static org.junit.Assert.assertThrows;
17 import static org.junit.Assert.assertTrue;
18 import static org.mockito.ArgumentMatchers.any;
19 import static org.mockito.ArgumentMatchers.anyCollection;
20 import static org.mockito.ArgumentMatchers.argThat;
21 import static org.mockito.ArgumentMatchers.eq;
22 import static org.mockito.Mockito.after;
23 import static org.mockito.Mockito.doAnswer;
24 import static org.mockito.Mockito.doNothing;
25 import static org.mockito.Mockito.doReturn;
26 import static org.mockito.Mockito.mock;
27 import static org.mockito.Mockito.reset;
28 import static org.mockito.Mockito.timeout;
29 import static org.mockito.Mockito.times;
30 import static org.mockito.Mockito.verify;
31 import static org.mockito.Mockito.verifyNoMoreInteractions;
32 import static org.opendaylight.mdsal.common.api.CommitInfo.emptyFluentFuture;
33 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFluentFuture;
35 import akka.actor.ActorRef;
36 import akka.actor.ActorSystem;
37 import akka.actor.Props;
38 import akka.actor.Status.Failure;
39 import akka.actor.Status.Success;
40 import akka.pattern.AskTimeoutException;
41 import akka.pattern.Patterns;
42 import akka.testkit.TestActorRef;
43 import akka.testkit.javadsl.TestKit;
44 import akka.util.Timeout;
45 import com.google.common.collect.ImmutableList;
46 import com.google.common.io.CharSource;
47 import com.google.common.net.InetAddresses;
48 import com.google.common.util.concurrent.FluentFuture;
49 import com.google.common.util.concurrent.Futures;
50 import com.google.common.util.concurrent.ListenableFuture;
51 import com.google.common.util.concurrent.SettableFuture;
52 import java.net.InetSocketAddress;
53 import java.time.Duration;
54 import java.util.ArrayList;
55 import java.util.List;
56 import java.util.Optional;
57 import java.util.concurrent.ExecutionException;
58 import java.util.concurrent.TimeUnit;
59 import org.junit.After;
60 import org.junit.Before;
61 import org.junit.Test;
62 import org.junit.runner.RunWith;
63 import org.mockito.ArgumentCaptor;
64 import org.mockito.Mock;
65 import org.mockito.junit.MockitoJUnitRunner;
66 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
67 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
68 import org.opendaylight.mdsal.dom.api.DOMActionException;
69 import org.opendaylight.mdsal.dom.api.DOMActionResult;
70 import org.opendaylight.mdsal.dom.api.DOMActionService;
71 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
72 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
73 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
74 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
75 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
76 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
77 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
78 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
79 import org.opendaylight.mdsal.dom.api.DOMRpcException;
80 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
81 import org.opendaylight.mdsal.dom.api.DOMRpcService;
82 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
83 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
84 import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult;
85 import org.opendaylight.netconf.client.mdsal.NetconfDevice.SchemaResourcesDTO;
86 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
87 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
88 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Actions;
89 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
90 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
91 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
92 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringActionException;
93 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
94 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
95 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
96 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
97 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
98 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
99 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
100 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
101 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
102 import org.opendaylight.yangtools.concepts.ObjectRegistration;
103 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
104 import org.opendaylight.yangtools.yang.common.ErrorType;
105 import org.opendaylight.yangtools.yang.common.QName;
106 import org.opendaylight.yangtools.yang.common.RpcError;
107 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
108 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
111 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
112 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
113 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
114 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
115 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
116 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
117 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
118 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
119 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
120 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
121 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
122 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
123 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
124 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
125 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
126 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToIRTransformer;
127 import scala.concurrent.Await;
128 import scala.concurrent.Future;
/**
 * Tests of {@link NetconfNodeActor} message handling, run with Mockito's strict-stubs JUnit runner.
 *
 * <p>A "master" actor is created in {@code setup()}; individual tests additionally spawn "slave" actors and
 * verify the mount point, schema source and proxy-service interactions through the mocks declared below.
 */
130 @RunWith(MockitoJUnitRunner.StrictStubs.class)
131 public class NetconfNodeActorTest extends AbstractBaseSchemasTest {
// Five-second timeout handed to NetconfNodeActor.props(...), ProxyYangTextSourceProvider and the blocking
// Await/gracefulStop calls in the tests.
133 private static final Timeout TIMEOUT = Timeout.create(Duration.ofSeconds(5));
// Source identifiers used in RegisterMountPoint messages and matched by the registerSchemaSource stubs.
134 private static final SourceIdentifier SOURCE_IDENTIFIER1 = new SourceIdentifier("yang1");
135 private static final SourceIdentifier SOURCE_IDENTIFIER2 = new SourceIdentifier("yang2");
// Actor system hosting all actors of a test; shut down in teardown().
// NOTE(review): never reassigned in the visible code, so it could probably be final - confirm.
137 private ActorSystem system = ActorSystem.create();
// Shared probe used as the sender of most messages sent by the tests.
138 private final TestKit testKit = new TestKit(system);
// Master actor reference and device id; both are (re)created in setup().
140 private ActorRef masterRef;
141 private RemoteDeviceId remoteDeviceId;
// Real (non-mock) repository returned by the schemaResourceDTO stub installed in setup().
142 private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
// The remaining fields are never assigned in the visible code; presumably they are injected by the Mockito
// runner via @Mock annotations that are not visible in this view - confirm.
145 private Rpcs.Normalized mockRpc;
147 private DOMRpcService mockDOMRpcService;
149 private Actions.Normalized mockDOMActionService;
151 private DOMMountPointService mockMountPointService;
153 private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
155 private ObjectRegistration<DOMMountPoint> mockMountPointReg;
157 private DOMDataBroker mockDOMDataBroker;
159 private NetconfDataTreeService netconfService;
161 private SchemaSourceRegistration<?> mockSchemaSourceReg1;
163 private SchemaSourceRegistration<?> mockSchemaSourceReg2;
165 private SchemaSourceRegistry mockRegistry;
167 private EffectiveModelContextFactory mockSchemaContextFactory;
169 private SchemaRepository mockSchemaRepository;
171 private EffectiveModelContext mockSchemaContext;
173 private SchemaResourcesDTO schemaResourceDTO;
// Test fixture: builds the device id, creates the master actor named "master_messages" on top of the real
// masterSchemaRepository, and installs the default stubs the tests rely on.
// (Presumably annotated @Before; the annotation is not visible in this view.)
176 public void setup() {
177 remoteDeviceId = new RemoteDeviceId("netconf-topology",
178 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
// The same repository instance is passed as both arguments of TextToIRTransformer.create(...).
180 masterSchemaRepository.registerSchemaSourceListener(
181 TextToIRTransformer.create(masterSchemaRepository, masterSchemaRepository));
// The master actor gets the real repository but the mocked registry.
183 doReturn(masterSchemaRepository).when(schemaResourceDTO).getSchemaRepository();
184 doReturn(mockRegistry).when(schemaResourceDTO).getSchemaRegistry();
// NOTE(review): the terminating call of this builder chain is not visible in this view.
185 final NetconfTopologySetup setup = NetconfTopologySetup.builder()
186 .setActorSystem(system)
187 .setIdleTimeout(Duration.ofSeconds(1))
188 .setSchemaResourceDTO(schemaResourceDTO)
189 .setBaseSchemas(BASE_SCHEMAS)
192 final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
194 masterRef = TestActorRef.create(system, props, "master_messages");
// Re-arm the mount point builder/registration stubs (see resetMountPointMocks()).
196 resetMountPointMocks();
198 doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
// Each source identifier maps to its own registration mock so the tests can verify them separately.
200 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
201 doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
203 doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
205 doReturn(mockDOMRpcService).when(mockRpc).domRpcService();
// Shuts down the actor system created for this test instance.
// (Presumably annotated @After; the annotation is not visible in this view.)
210 public void teardown() {
211 TestKit.shutdownActorSystem(system, true);
// Verifies that the master actor answers both CreateInitialMasterActorData (sent via initializeMaster())
// and RefreshSetupMasterActorData with a MasterActorDataInitialized message.
216 public void testInitializeAndRefreshMasterData() {
218 // Test CreateInitialMasterActorData.
220 initializeMaster(new ArrayList<>());
222 // Test RefreshSetupMasterActorData.
// The refresh carries a different device id (other topology name and address) than the one used in setup().
224 final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
225 new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
// NOTE(review): the terminating call of this builder chain is not visible in this view.
227 final NetconfTopologySetup newSetup = NetconfTopologySetup.builder()
228 .setBaseSchemas(BASE_SCHEMAS)
229 .setSchemaResourceDTO(schemaResourceDTO)
230 .setActorSystem(system)
233 masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
235 testKit.expectMsgClass(MasterActorDataInitialized.class);
// Verifies the reply to AskForMasterMountPoint before and after the master is initialized: first a Failure
// whose cause is a NotMasterException, then a RegisterMountPoint carrying the source identifiers that were
// passed to initializeMaster().
239 public void testAskForMasterMountPoint() {
241 // Test with master not setup yet.
// A dedicated probe is the sender here; initializeMaster() below uses the shared testKit probe.
243 final TestKit kit = new TestKit(system);
245 masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
247 final Failure failure = kit.expectMsgClass(Failure.class);
248 assertTrue(failure.cause() instanceof NotMasterException);
250 // Now initialize - master should send the RegisterMountPoint message.
252 List<SourceIdentifier> sourceIdentifiers = List.of(new SourceIdentifier("testID"));
253 initializeMaster(sourceIdentifiers);
255 masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
257 final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
259 assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
// Exercises the slave mount point life cycle in three steps:
//  1. register through registerSlaveMountPoint(), then unregister and expect the mount point registration
//     and both schema source registrations to be closed;
//  2. register again while a second, interleaved registration completes during schema context resolution;
//  3. stop the slave actor and expect the remaining registrations to be closed.
263 public void testRegisterAndUnregisterMountPoint() throws Exception {
265 ActorRef slaveRef = registerSlaveMountPoint();
269 slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
// The close() calls are verified with timeouts, i.e. they are awaited rather than expected immediately.
271 verify(mockMountPointReg, timeout(5000)).close();
272 verify(mockSchemaSourceReg1, timeout(1000)).close();
273 verify(mockSchemaSourceReg2, timeout(1000)).close();
275 // Test registration with another interleaved registration that completes while the first registration
276 // is resolving the schema context.
// Start from clean mocks so the times(...) verifications below only count this scenario.
278 reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
279 resetMountPointMocks();
281 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
283 final SchemaSourceRegistration<?> newMockSchemaSourceReg = mock(SchemaSourceRegistration.class);
// Factory for the interleaved registration: resolves the schema context immediately.
285 final EffectiveModelContextFactory newMockSchemaContextFactory = mock(EffectiveModelContextFactory.class);
286 doReturn(Futures.immediateFuture(mockSchemaContext))
287 .when(newMockSchemaContextFactory).createEffectiveModelContext(anyCollection());
// NOTE(review): the statements from here down to "}).when(mockSchemaContextFactory)" appear to form the body
// of an answer stubbed on mockSchemaContextFactory.createEffectiveModelContext(...); its opening line and
// its return statement are not visible in this view. Inside it the registry and repository are re-stubbed
// to hand out the new mocks, a second RegisterMountPoint is sent, and the pending future is completed.
290 SettableFuture<SchemaContext> future = SettableFuture.create();
292 doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
293 withSourceId(SOURCE_IDENTIFIER1));
295 doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
296 .createEffectiveModelContextFactory();
298 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
301 future.set(mockSchemaContext);
304 }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
306 doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
307 .createEffectiveModelContextFactory();
// First registration; resolving its schema context runs the answer stubbed above.
309 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
311 verify(mockMountPointBuilder, timeout(5000)).register();
// after(500) waits the full period before verifying, so a late duplicate addService call would be caught.
312 verify(mockMountPointBuilder, after(500)).addService(eq(DOMDataBroker.class), any());
313 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
314 verify(mockMountPointBuilder).addService(eq(DOMActionService.class), any());
315 verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
316 verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
// The registration handed out first must be closed; the new one must not have been touched yet.
317 verify(mockSchemaSourceReg1).close();
318 verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
319 verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
320 verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
322 // Stop the slave actor and verify schema source registrations are closed.
324 final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
325 Await.result(stopFuture, TIMEOUT.duration());
327 verify(mockMountPointReg).close();
328 verify(newMockSchemaSourceReg).close();
// Exercises RegisterMountPoint handling on a slave actor when schema context resolution fails:
//  1. SchemaResolutionException without a cause: no mount point is registered and both schema source
//     registrations are closed;
//  2. SchemaResolutionException caused by AskTimeoutException, followed by a success: a mount point is
//     registered and the schema source registrations are left untouched;
//  3. the AskTimeoutException failure completes only after an interleaved registration has succeeded.
331 @SuppressWarnings("unchecked")
333 public void testRegisterMountPointWithSchemaFailures() throws Exception {
// The slave uses the mocked repository (not masterSchemaRepository) so schema resolution can be scripted.
334 SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
335 doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
336 doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
// NOTE(review): the terminating call of this builder chain is not visible in this view.
337 final NetconfTopologySetup setup = NetconfTopologySetup.builder()
338 .setSchemaResourceDTO(schemaResourceDTO2)
339 .setBaseSchemas(BASE_SCHEMAS)
340 .setActorSystem(system)
343 final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT,
344 mockMountPointService));
346 // Test unrecoverable failure.
348 doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock")))
349 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
351 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
352 masterRef), testKit.getRef());
// The test expects a Success reply even though schema resolution is stubbed to fail.
354 testKit.expectMsgClass(Success.class);
356 verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
357 verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
// after(1000).never() waits the full second to make sure no registration happens late.
359 verify(mockMountPointBuilder, after(1000).never()).register();
360 verify(mockSchemaSourceReg1, timeout(1000)).close();
361 verify(mockSchemaSourceReg2, timeout(1000)).close();
363 // Test recoverable AskTimeoutException - schema context resolution should be retried.
365 reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
// First call fails with an AskTimeoutException cause, the following call succeeds.
367 doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock",
368 new AskTimeoutException("timeout"))))
369 .doReturn(Futures.immediateFuture(mockSchemaContext))
370 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
372 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
373 masterRef), testKit.getRef());
375 testKit.expectMsgClass(Success.class);
377 verify(mockMountPointBuilder, timeout(5000)).register();
378 verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
380 // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
381 // attempt should not be retried.
383 reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
384 resetMountPointMocks();
// Factory used by the interleaved registration: resolves immediately.
386 final EffectiveModelContextFactory mockSchemaContextFactorySuccess = mock(EffectiveModelContextFactory.class);
387 doReturn(Futures.immediateFuture(mockSchemaContext))
388 .when(mockSchemaContextFactorySuccess).createEffectiveModelContext(anyCollection());
// NOTE(review): the statements from here down to "}).when(mockSchemaContextFactory)" appear to form the body
// of an answer stubbed on mockSchemaContextFactory.createEffectiveModelContext(...); its opening line and
// its return statement are not visible in this view. Inside it the repository is switched to the succeeding
// factory, a second RegisterMountPoint is sent, and the pending future is then failed.
391 SettableFuture<SchemaContext> future = SettableFuture.create();
393 doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
394 .createEffectiveModelContextFactory();
396 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
397 masterRef), testKit.getRef());
399 future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout")));
402 }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
404 doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
406 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
407 masterRef), testKit.getRef());
// Exactly two factory lookups are expected: one per registration and none for a retry.
409 verify(mockMountPointBuilder, timeout(5000)).register();
410 verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
// Expects a MissingSchemaSourceException when a YANG text source is requested from an actor backed by a
// freshly created SharedSchemaRepository on which this test registers no source.
// NOTE(review): testYangTextSchemaSourceRequest checks the same exception type with assertThrows, which also
// allows asserting on the message; consider using that style here for consistency.
413 @Test(expected = MissingSchemaSourceException.class)
414 public void testMissingSchemaSourceOnMissingProvider() throws Exception {
415 final SharedSchemaRepository repository = new SharedSchemaRepository("test");
// The same repository instance is returned for both the registry and the repository lookups.
417 SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
418 doReturn(repository).when(schemaResourceDTO2).getSchemaRegistry();
419 doReturn(repository).when(schemaResourceDTO2).getSchemaRepository();
// NOTE(review): the terminating call of this builder chain is not visible in this view.
420 final NetconfTopologySetup setup = NetconfTopologySetup.builder()
421 .setActorSystem(system)
422 .setSchemaResourceDTO(schemaResourceDTO2)
423 .setIdleTimeout(Duration.ofSeconds(1))
424 .setBaseSchemas(BASE_SCHEMAS)
426 final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
// A name different from the "master_messages" actor created in setup() is used here.
427 ActorRef actor = TestActorRef.create(system, props, "master_messages_2");
429 final SourceIdentifier sourceIdentifier = new SourceIdentifier("testID");
431 final ProxyYangTextSourceProvider proxyYangProvider =
432 new ProxyYangTextSourceProvider(actor, system.dispatcher(), TIMEOUT);
434 final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
435 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
// Blocking on the future is what surfaces the expected exception to the test method.
436 Await.result(resolvedSchemaFuture, TIMEOUT.duration());
// Requests a YANG text source from the master actor through ProxyYangTextSourceProvider: first with a
// provider registered in masterSchemaRepository (expects identifier and content to round-trip), then after
// the registration is closed (expects MissingSchemaSourceException with a descriptive message).
440 public void testYangTextSchemaSourceRequest() throws Exception {
441 final SourceIdentifier sourceIdentifier = new SourceIdentifier("testID");
443 final ProxyYangTextSourceProvider proxyYangProvider =
444 new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
// In-memory source whose whole content is the literal text "YANG".
446 final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForCharSource(sourceIdentifier,
447 CharSource.wrap("YANG"));
// Register a provider that always answers with the in-memory source above.
451 final SchemaSourceRegistration<YangTextSchemaSource> schemaSourceReg = masterSchemaRepository
452 .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource),
453 PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1));
455 final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
456 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
458 final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
460 assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
461 assertEquals("YANG", success.getRepresentation().read());
463 // Test missing source failure.
// Closing the registration removes the only provider registered by this test.
465 schemaSourceReg.close();
// NOTE(review): the opening line of the lambda passed to assertThrows is not visible in this view.
467 final MissingSchemaSourceException ex = assertThrows(MissingSchemaSourceException.class,
469 final Future<YangTextSchemaSourceSerializationProxy> failedSchemaFuture =
470 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
471 Await.result(failedSchemaFuture, TIMEOUT.duration());
473 assertThat(ex.getMessage(), startsWith("No providers registered for source"));
474 assertThat(ex.getMessage(), containsString(sourceIdentifier.toString()));
// Captures the DOMRpcService registered on the slave mount point, checks that it is a ProxyDOMRpcService, and
// invokes RPCs through it while mockDOMRpcService is stubbed to return: a null result, output only, an error
// only, output plus error, and finally a failed future.
478 public void testSlaveInvokeRpc() throws Exception {
480 initializeMaster(List.of(new SourceIdentifier("testID")));
481 registerSlaveMountPoint();
// Grab the service instance that the slave actor handed to the mount point builder.
483 ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
484 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
486 final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
487 assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService);
// The same container node serves as the RPC input and as the stubbed output.
489 final QName testQName = QName.create("", "TestQname");
490 final ContainerNode outputNode = Builders.containerBuilder()
491 .withNodeIdentifier(new NodeIdentifier(testQName))
492 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
493 final RpcError rpcError = RpcResultBuilder.newError(ErrorType.RPC, null, "Rpc invocation failed.");
495 // RPC with no response output.
497 doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMRpcService).invokeRpc(any(), any());
499 DOMRpcResult result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
// NOTE(review): assertNull(result) is the idiomatic form; assertNull is already statically imported and
// used further down in this method.
501 assertEquals(null, result);
503 // RPC with response output.
505 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode)))
506 .when(mockDOMRpcService).invokeRpc(any(), any());
508 result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
510 assertEquals(outputNode, result.value());
511 assertTrue(result.errors().isEmpty());
513 // RPC with response error.
515 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(rpcError)))
516 .when(mockDOMRpcService).invokeRpc(any(), any());
518 result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
520 assertNull(result.value());
521 assertEquals(rpcError, result.errors().iterator().next());
523 // RPC with response output and error.
525 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
526 .when(mockDOMRpcService).invokeRpc(any(), any());
528 final DOMRpcResult resultOutputError =
529 slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
531 assertEquals(outputNode, resultOutputError.value());
532 assertEquals(rpcError, resultOutputError.errors().iterator().next());
// Failed invocation: the failure must surface as an ExecutionException whose cause is a DOMRpcException
// carrying the original message "mock".
535 doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringRpcException("mock")))
536 .when(mockDOMRpcService).invokeRpc(any(), any());
537 final ListenableFuture<? extends DOMRpcResult> future = slaveDomRPCService.invokeRpc(testQName, outputNode);
539 final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
540 final Throwable cause = e.getCause();
541 assertThat(cause, instanceOf(DOMRpcException.class));
542 assertEquals("mock", cause.getMessage());
// Captures the DOMActionService registered on the slave mount point, checks that it is a
// ProxyDOMActionService, and invokes actions through it while mockDOMActionService is stubbed to return:
// a null result, a result with output, and finally a failed future.
546 public void testSlaveInvokeAction() throws Exception {
547 initializeMaster(List.of(new SourceIdentifier("testActionID")));
548 registerSlaveMountPoint();
// Grab the service instance that the slave actor handed to the mount point builder.
550 ArgumentCaptor<DOMActionService> domActionServiceCaptor = ArgumentCaptor.forClass(DOMActionService.class);
551 verify(mockMountPointBuilder).addService(eq(DOMActionService.class), domActionServiceCaptor.capture());
553 final DOMActionService slaveDomActionService = domActionServiceCaptor.getValue();
554 assertTrue(slaveDomActionService instanceof ProxyDOMActionService);
// Schema path and instance identifier are both derived from the same QName.
556 final QName testQName = QName.create("test", "2019-08-16", "TestActionQname");
557 final Absolute schemaPath = Absolute.of(testQName);
559 final YangInstanceIdentifier yangIIdPath = YangInstanceIdentifier.of(testQName);
// NOTE(review): the remaining constructor argument(s) are on a line not visible in this view
// (presumably yangIIdPath - confirm).
561 final DOMDataTreeIdentifier domDataTreeIdentifier = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
564 final ContainerNode outputNode = Builders.containerBuilder()
565 .withNodeIdentifier(new NodeIdentifier(testQName))
566 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
568 // Action with no response output.
569 doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMActionService)
570 .invokeAction(any(), any(), any());
571 DOMActionResult result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
572 .get(2, TimeUnit.SECONDS);
// NOTE(review): assertNull(result) is the idiomatic form; assertNull is already statically imported.
573 assertEquals(null, result);
575 // Action with response output.
576 doReturn(FluentFutures.immediateFluentFuture(new SimpleDOMActionResult(outputNode))).when(mockDOMActionService)
577 .invokeAction(any(), any(), any());
578 result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
579 .get(2, TimeUnit.SECONDS);
581 assertEquals(Optional.of(outputNode), result.getOutput());
582 assertTrue(result.getErrors().isEmpty());
// Failed invocation: the failure must surface as an ExecutionException whose cause is a DOMActionException
// carrying the original message "mock".
585 doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringActionException("mock")))
586 .when(mockDOMActionService).invokeAction(any(), any(), any());
587 final ListenableFuture<? extends DOMActionResult> future = slaveDomActionService.invokeAction(schemaPath,
588 domDataTreeIdentifier, outputNode);
590 final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
591 final Throwable cause = e.getCause();
592 assertThat(cause, instanceOf(DOMActionException.class));
593 assertEquals("mock", cause.getMessage());
// Captures the DOMDataBroker registered on the slave mount point, checks that it is a ProxyDOMDataBroker, and
// verifies that each new*Transaction() call on it results in the matching call on mockDOMDataBroker.
597 public void testSlaveNewTransactionRequests() {
// The master-side broker hands out mock transactions for every transaction type.
598 doReturn(mock(DOMDataTreeReadTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
599 doReturn(mock(DOMDataTreeReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
600 doReturn(mock(DOMDataTreeWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
602 initializeMaster(List.of());
603 registerSlaveMountPoint();
// Grab the broker instance that the slave actor handed to the mount point builder.
605 final var domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
606 verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
608 final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
609 assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker);
// The verifications use no timeout, so each proxy call is expected to have reached the mock on return.
611 slaveDOMDataBroker.newReadOnlyTransaction();
612 verify(mockDOMDataBroker).newReadOnlyTransaction();
614 slaveDOMDataBroker.newReadWriteTransaction();
615 verify(mockDOMDataBroker).newReadWriteTransaction();
617 slaveDOMDataBroker.newWriteOnlyTransaction();
618 verify(mockDOMDataBroker).newWriteOnlyTransaction();
// Captures the NetconfDataTreeService registered on the slave mount point, checks that it is a
// ProxyNetconfDataTreeService, calls each of its operations once and verifies (with a timeout) that every
// call reaches the master-side netconfService mock with the same arguments.
622 public void testSlaveNewNetconfDataTreeServiceRequest() {
623 initializeMaster(List.of());
624 registerSlaveMountPoint();
// Grab the service instance that the slave actor handed to the mount point builder.
626 ArgumentCaptor<NetconfDataTreeService> netconfCaptor = ArgumentCaptor.forClass(NetconfDataTreeService.class);
627 verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), netconfCaptor.capture());
629 final NetconfDataTreeService slaveNetconfService = netconfCaptor.getValue();
630 assertTrue(slaveNetconfService instanceof ProxyNetconfDataTreeService);
// NOTE(review): PATH, STORE and NODE are local variables named like constants; lowerCamelCase would follow
// the usual Java convention.
// NOTE(review): the end of the container builder chain is not visible in this view.
632 final YangInstanceIdentifier PATH = YangInstanceIdentifier.of();
633 final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
634 final ContainerNode NODE = Builders.containerBuilder()
635 .withNodeIdentifier(new NodeIdentifier(QName.create("", "cont")))
// Only the operations stubbed here get an explicit return value; the others are left unstubbed.
638 final FluentFuture<Optional<Object>> result = immediateFluentFuture(Optional.of(NODE));
639 doReturn(result).when(netconfService).get(PATH);
640 doReturn(result).when(netconfService).getConfig(PATH);
641 doReturn(emptyFluentFuture()).when(netconfService).commit();
// Invoke every operation once through the slave-side proxy.
643 slaveNetconfService.get(PATH);
644 slaveNetconfService.getConfig(PATH);
645 slaveNetconfService.lock();
646 slaveNetconfService.merge(STORE, PATH, NODE, Optional.empty());
647 slaveNetconfService.replace(STORE, PATH, NODE, Optional.empty());
648 slaveNetconfService.create(STORE, PATH, NODE, Optional.empty());
649 slaveNetconfService.delete(STORE, PATH);
650 slaveNetconfService.remove(STORE, PATH);
651 slaveNetconfService.discardChanges();
652 slaveNetconfService.commit();
// Each call is awaited for up to one second on the master-side mock.
654 verify(netconfService, timeout(1000)).get(PATH);
655 verify(netconfService, timeout(1000)).getConfig(PATH);
656 verify(netconfService, timeout(1000)).lock();
657 verify(netconfService, timeout(1000)).merge(STORE, PATH, NODE, Optional.empty());
658 verify(netconfService, timeout(1000)).replace(STORE, PATH, NODE, Optional.empty());
659 verify(netconfService, timeout(1000)).create(STORE, PATH, NODE, Optional.empty());
660 verify(netconfService, timeout(1000)).delete(STORE, PATH);
661 verify(netconfService, timeout(1000)).remove(STORE, PATH);
662 verify(netconfService, timeout(1000)).discardChanges();
663 verify(netconfService, timeout(1000)).commit();
// Helper: creates a slave NetconfNodeActor wired to the mocked registry and repository, sends it a
// RegisterMountPoint for both source identifiers pointing at masterRef, and waits until the mount point
// has been registered and a Success reply has arrived on the shared testKit probe.
// NOTE(review): the method is declared to return ActorRef, but the return statement is not visible in this
// view (presumably it returns slaveRef - confirm).
666 private ActorRef registerSlaveMountPoint() {
667 SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
668 doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
669 doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
670 final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(NetconfTopologySetup.builder()
671 .setSchemaResourceDTO(schemaResourceDTO2)
672 .setActorSystem(system)
673 .setBaseSchemas(BASE_SCHEMAS)
674 .build(), remoteDeviceId, TIMEOUT, mockMountPointService));
// Schema context resolution succeeds immediately for this registration.
676 doReturn(Futures.immediateFuture(mockSchemaContext))
677 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
679 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
680 masterRef), testKit.getRef());
// Only register() is awaited; the addService checks below follow it without a timeout.
// NOTE(review): DOMActionService is not verified here, although testSlaveInvokeAction captures it after
// calling this helper.
682 verify(mockMountPointBuilder, timeout(5000)).register();
683 verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
684 verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
685 verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), any());
686 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
687 verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
689 testKit.expectMsgClass(Success.class);
// Helper: sends CreateInitialMasterActorData (mock data broker, netconf service, the given source
// identifiers and the mocked RPC/action services) to the master actor and waits for the
// MasterActorDataInitialized reply on the shared testKit probe.
693 private void initializeMaster(final List<SourceIdentifier> sourceIdentifiers) {
694 masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, netconfService, sourceIdentifiers,
695 new RemoteDeviceServices(mockRpc, mockDOMActionService)), testKit.getRef());
696 testKit.expectMsgClass(MasterActorDataInitialized.class);
// Helper: clears recorded interactions and stubs on the mount point registration and builder mocks, then
// re-installs the default stubs (reset(...) removes existing stubbing, so it has to be set up again).
699 private void resetMountPointMocks() {
700 reset(mockMountPointReg, mockMountPointBuilder);
702 doNothing().when(mockMountPointReg).close();
// addService returns the builder itself; register() yields the registration mock.
704 doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
705 doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
// Mockito argument matcher accepting any PotentialSchemaSource whose source identifier equals the given one.
// As with any argThat(...) matcher, the returned value is only meaningful inside a stubbing/verification call.
708 private static PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
709 return argThat(argument -> identifier.equals(argument.getSourceIdentifier()));