2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static java.nio.charset.StandardCharsets.UTF_8;
11 import static org.hamcrest.CoreMatchers.containsString;
12 import static org.hamcrest.CoreMatchers.instanceOf;
13 import static org.hamcrest.CoreMatchers.startsWith;
14 import static org.hamcrest.MatcherAssert.assertThat;
15 import static org.junit.Assert.assertEquals;
16 import static org.junit.Assert.assertNull;
17 import static org.junit.Assert.assertThrows;
18 import static org.junit.Assert.assertTrue;
19 import static org.mockito.ArgumentMatchers.any;
20 import static org.mockito.ArgumentMatchers.anyCollection;
21 import static org.mockito.ArgumentMatchers.argThat;
22 import static org.mockito.ArgumentMatchers.eq;
23 import static org.mockito.Mockito.after;
24 import static org.mockito.Mockito.doAnswer;
25 import static org.mockito.Mockito.doNothing;
26 import static org.mockito.Mockito.doReturn;
27 import static org.mockito.Mockito.mock;
28 import static org.mockito.Mockito.reset;
29 import static org.mockito.Mockito.timeout;
30 import static org.mockito.Mockito.times;
31 import static org.mockito.Mockito.verify;
32 import static org.mockito.Mockito.verifyNoMoreInteractions;
33 import static org.opendaylight.mdsal.common.api.CommitInfo.emptyFluentFuture;
34 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateFluentFuture;
36 import akka.actor.ActorRef;
37 import akka.actor.ActorSystem;
38 import akka.actor.Props;
39 import akka.actor.Status.Failure;
40 import akka.actor.Status.Success;
41 import akka.pattern.AskTimeoutException;
42 import akka.pattern.Patterns;
43 import akka.testkit.TestActorRef;
44 import akka.testkit.javadsl.TestKit;
45 import akka.util.Timeout;
46 import com.google.common.collect.ImmutableList;
47 import com.google.common.collect.Lists;
48 import com.google.common.io.ByteSource;
49 import com.google.common.net.InetAddresses;
50 import com.google.common.util.concurrent.FluentFuture;
51 import com.google.common.util.concurrent.Futures;
52 import com.google.common.util.concurrent.ListenableFuture;
53 import com.google.common.util.concurrent.SettableFuture;
54 import java.io.InputStream;
55 import java.net.InetSocketAddress;
56 import java.util.ArrayList;
57 import java.util.Collections;
58 import java.util.List;
59 import java.util.Optional;
60 import java.util.Scanner;
61 import java.util.concurrent.ExecutionException;
62 import java.util.concurrent.TimeUnit;
63 import org.junit.After;
64 import org.junit.Before;
65 import org.junit.Test;
66 import org.junit.runner.RunWith;
67 import org.mockito.ArgumentCaptor;
68 import org.mockito.Mock;
69 import org.mockito.junit.MockitoJUnitRunner;
70 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
71 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
72 import org.opendaylight.mdsal.dom.api.DOMActionException;
73 import org.opendaylight.mdsal.dom.api.DOMActionResult;
74 import org.opendaylight.mdsal.dom.api.DOMActionService;
75 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
76 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
77 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
78 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
79 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
80 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
81 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
82 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
83 import org.opendaylight.mdsal.dom.api.DOMRpcException;
84 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
85 import org.opendaylight.mdsal.dom.api.DOMRpcService;
86 import org.opendaylight.mdsal.dom.api.DOMSchemaService;
87 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
88 import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult;
89 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
90 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice.SchemaResourcesDTO;
91 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
92 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
93 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringActionException;
94 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
95 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
96 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
97 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
98 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
99 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
100 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
101 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
102 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
103 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
104 import org.opendaylight.yangtools.concepts.ObjectRegistration;
105 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
106 import org.opendaylight.yangtools.yang.common.QName;
107 import org.opendaylight.yangtools.yang.common.RpcError;
108 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
109 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
110 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
111 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
112 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
113 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
114 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
115 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
116 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
117 import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
118 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
119 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
120 import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier;
121 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
122 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
123 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
124 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
125 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
126 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
127 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
128 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
129 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToIRTransformer;
130 import scala.concurrent.Await;
131 import scala.concurrent.Future;
132 import scala.concurrent.duration.Duration;
134 @RunWith(MockitoJUnitRunner.StrictStubs.class)
135 public class NetconfNodeActorTest extends AbstractBaseSchemasTest {
// Ask timeout handed to NetconfNodeActor.props(...) and ProxyYangTextSourceProvider; its duration is also used
// for Await.result(...) and Patterns.gracefulStop(...) in the tests below.
137 private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
// Source identifiers sent in RegisterMountPoint messages and matched by withSourceId(...) when stubbing mockRegistry.
138 private static final RevisionSourceIdentifier SOURCE_IDENTIFIER1 = RevisionSourceIdentifier.create("yang1");
139 private static final RevisionSourceIdentifier SOURCE_IDENTIFIER2 = RevisionSourceIdentifier.create("yang2");
// Actor system created together with the test instance and shut down in teardown().
141 private ActorSystem system = ActorSystem.create();
// Probe whose ref is used as the sender of most messages so replies can be asserted with expectMsgClass(...).
142 private final TestKit testKit = new TestKit(system);
// Master-side actor and the device id it is created with; both are assigned in setup().
144 private ActorRef masterRef;
145 private RemoteDeviceId remoteDeviceId;
// Real (non-mock) repository returned by schemaResourceDTO.getSchemaRepository() for the master actor.
146 private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
// NOTE(review): none of the fields below is assigned in the visible code, yet all are stubbed or verified with
// Mockito. Presumably each carries an @Mock annotation on a line missing from this listing (org.mockito.Mock is
// imported and the class runs with MockitoJUnitRunner.StrictStubs) - confirm against the full file.
149 private DOMRpcService mockDOMRpcService;
152 private DOMActionService mockDOMActionService;
155 private DOMMountPointService mockMountPointService;
158 private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
161 private ObjectRegistration<DOMMountPoint> mockMountPointReg;
164 private DOMDataBroker mockDOMDataBroker;
167 private NetconfDataTreeService netconfService;
// One registration per source identifier, so close() can be verified for each source separately.
170 private SchemaSourceRegistration<?> mockSchemaSourceReg1;
173 private SchemaSourceRegistration<?> mockSchemaSourceReg2;
176 private SchemaSourceRegistry mockRegistry;
179 private EffectiveModelContextFactory mockSchemaContextFactory;
// Repository used by slave actors (see registerSlaveMountPoint()); the master uses masterSchemaRepository instead.
182 private SchemaRepository mockSchemaRepository;
185 private EffectiveModelContext mockSchemaContext;
// Schema resources handed to the master actor's NetconfTopologySetup in setup().
188 private SchemaResourcesDTO schemaResourceDTO;
/**
 * Builds the fixture shared by all tests: the remote device id, the master schema repository wiring, the master
 * actor ("master_messages"), and the default stubbing of the mount point, schema registry and schema repository
 * mocks.
 *
 * NOTE(review): org.junit.Before is imported; presumably this method is annotated with @Before on a line not
 * visible in this listing - confirm.
 */
191 public void setup() {
192 remoteDeviceId = new RemoteDeviceId("netconf-topology",
193 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
// The text-to-IR transformer is registered as a schema source listener on the real master repository.
195 masterSchemaRepository.registerSchemaSourceListener(
196 TextToIRTransformer.create(masterSchemaRepository, masterSchemaRepository));
// The master actor sees the real repository but the mocked registry.
198 doReturn(masterSchemaRepository).when(schemaResourceDTO).getSchemaRepository();
199 doReturn(mockRegistry).when(schemaResourceDTO).getSchemaRegistry();
200 final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
201 .setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS)).setSchemaResourceDTO(schemaResourceDTO)
202 .setBaseSchemas(BASE_SCHEMAS).build();
204 final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
206 masterRef = TestActorRef.create(system, props, "master_messages");
// Default stubbing for the mount point builder and registration (close/addService/register).
208 resetMountPointMocks();
210 doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
// Each source identifier is mapped to its own registration mock.
212 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
213 doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
215 doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
216 .createEffectiveModelContextFactory();
/**
 * Shuts down the actor system created for this test instance.
 *
 * NOTE(review): the second argument (true) presumably asks TestKit to verify that the system actually shut
 * down - confirm against the Akka TestKit documentation. org.junit.After is imported, so this method is
 * presumably annotated with @After on a line not visible here.
 */
220 public void teardown() {
221 TestKit.shutdownActorSystem(system, true);
/**
 * Verifies that the master actor replies with MasterActorDataInitialized both to CreateInitialMasterActorData
 * (sent by initializeMaster) and to a subsequent RefreshSetupMasterActorData carrying a new setup and device id.
 */
226 public void testInitializeAndRefreshMasterData() {
228 // Test CreateInitialMasterActorData.
230 initializeMaster(new ArrayList<>());
232 // Test RefreshSetupMasterActorData.
// A different topology id and address than the ones used in setup().
234 final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
235 new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
237 final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create().setBaseSchemas(BASE_SCHEMAS)
238 .setSchemaResourceDTO(schemaResourceDTO).setActorSystem(system).build();
240 masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
242 testKit.expectMsgClass(MasterActorDataInitialized.class);
/**
 * Verifies the reply to AskForMasterMountPoint in two states: before the master is initialized the sender gets
 * a Failure whose cause is a NotMasterException; after initialization it gets a RegisterMountPoint message that
 * carries the source identifiers the master was initialized with.
 */
246 public void testAskForMasterMountPoint() {
248 // Test with master not setup yet.
// A dedicated probe is used here instead of the shared testKit field.
250 final TestKit kit = new TestKit(system);
252 masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
254 final Failure failure = kit.expectMsgClass(Failure.class);
255 assertTrue(failure.cause() instanceof NotMasterException);
257 // Now initialize - master should send the RegisterMountPoint message.
259 List<SourceIdentifier> sourceIdentifiers = Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
260 initializeMaster(sourceIdentifiers);
262 masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
264 final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
// getSourceIndentifiers (sic) is the accessor name as spelled by the RegisterMountPoint message class.
266 assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
/**
 * Exercises the slave mount point life cycle: initial registration (via registerSlaveMountPoint), explicit
 * unregistration, a second registration that is interleaved with another RegisterMountPoint while the schema
 * context is being resolved, and finally cleanup when the slave actor is stopped.
 */
270 public void testRegisterAndUnregisterMountPoint() throws Exception {
272 ActorRef slaveRef = registerSlaveMountPoint();
276 slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
// Unregistering must close the mount point registration and both schema source registrations.
278 verify(mockMountPointReg, timeout(5000)).close();
279 verify(mockSchemaSourceReg1, timeout(1000)).close();
280 verify(mockSchemaSourceReg2, timeout(1000)).close();
282 // Test registration with another interleaved registration that completes while the first registration
283 // is resolving the schema context.
285 reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
286 resetMountPointMocks();
288 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
290 final SchemaSourceRegistration<?> newMockSchemaSourceReg = mock(SchemaSourceRegistration.class);
// Factory used by the interleaved registration; it resolves the schema context immediately.
292 final EffectiveModelContextFactory newMockSchemaContextFactory = mock(EffectiveModelContextFactory.class);
293 doReturn(Futures.immediateFuture(mockSchemaContext))
294 .when(newMockSchemaContextFactory).createEffectiveModelContext(anyCollection());
// NOTE(review): the statements from here to "}).when(mockSchemaContextFactory)" form the body of a stubbing
// callback whose opening line(s), the end of the slaveRef.tell(...) call, and the statement handing "future"
// back are not visible in this listing - consult the full file before editing this section.
297 SettableFuture<SchemaContext> future = SettableFuture.create();
// Re-stub the registry and repository so the interleaved registration gets the new mocks.
299 doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
300 withSourceId(SOURCE_IDENTIFIER1));
302 doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
303 .createEffectiveModelContextFactory();
// The interleaved RegisterMountPoint is sent before the first resolution's future is completed.
305 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
308 future.set(mockSchemaContext);
311 }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
313 doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
314 .createEffectiveModelContextFactory();
// First RegisterMountPoint of the interleaved scenario; it triggers the stubbed resolution above.
316 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
318 verify(mockMountPointBuilder, timeout(5000)).register();
// after(500): per Mockito's documentation this waits the full period before verifying, unlike timeout(...);
// presumably used here so that a stray second mount point registration would be detected - confirm intent.
319 verify(mockMountPointBuilder, after(500)).addService(eq(DOMDataBroker.class), any());
320 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
321 verify(mockMountPointBuilder).addService(eq(DOMActionService.class), any());
322 verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
323 verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
// The registration from the superseded attempt is closed; registry and repository were each used twice.
324 verify(mockSchemaSourceReg1).close();
325 verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
326 verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
// The surviving registration must still be untouched here; it is closed only after the actor stops (below).
327 verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
329 // Stop the slave actor and verify schema source registrations are closed.
331 final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
332 Await.result(stopFuture, TIMEOUT.duration());
334 verify(mockMountPointReg).close();
335 verify(newMockSchemaSourceReg).close();
/**
 * Covers schema context resolution failures on the slave side in three scenarios: a plain
 * SchemaResolutionException (no mount point is registered and the schema source registrations are closed), a
 * SchemaResolutionException caused by AskTimeoutException followed by a successful attempt (the mount point is
 * registered), and an AskTimeoutException failure that arrives after an interleaved successful registration.
 */
338 @SuppressWarnings("unchecked")
340 public void testRegisterMountPointWithSchemaFailures() throws Exception {
// Slave-side schema resources: mocked registry and mocked repository.
341 SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
342 doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
343 doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
// NOTE(review): the end of this builder chain (presumably a build() call) is not visible in this listing.
344 final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create()
345 .setSchemaResourceDTO(schemaResourceDTO2)
346 .setBaseSchemas(BASE_SCHEMAS)
347 .setActorSystem(system)
350 final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT,
351 mockMountPointService));
353 // Test unrecoverable failure.
355 doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock")))
356 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
358 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
359 masterRef), testKit.getRef());
// The test expects a Success reply to RegisterMountPoint even though resolution is stubbed to fail.
361 testKit.expectMsgClass(Success.class);
363 verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
364 verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
// No mount point may be registered, and both schema source registrations must be closed.
366 verify(mockMountPointBuilder, after(1000).never()).register();
367 verify(mockSchemaSourceReg1, timeout(1000)).close();
368 verify(mockSchemaSourceReg2, timeout(1000)).close();
370 // Test recoverable AskTimeoutException - schema context resolution should be retried.
372 reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
// First call fails with an AskTimeoutException cause, the second call succeeds.
374 doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock",
375 new AskTimeoutException("timeout"))))
376 .doReturn(Futures.immediateFuture(mockSchemaContext))
377 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
379 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
380 masterRef), testKit.getRef());
382 testKit.expectMsgClass(Success.class);
// The mount point is registered and, since the reset above, the registrations were not touched (not closed).
384 verify(mockMountPointBuilder, timeout(5000)).register();
385 verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
387 // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
388 // attempt should not be retried.
390 reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
391 resetMountPointMocks();
// Factory used by the interleaved registration; it resolves successfully and immediately.
393 final EffectiveModelContextFactory mockSchemaContextFactorySuccess = mock(EffectiveModelContextFactory.class);
394 doReturn(Futures.immediateFuture(mockSchemaContext))
395 .when(mockSchemaContextFactorySuccess).createEffectiveModelContext(anyCollection());
// NOTE(review): the statements from here to "}).when(mockSchemaContextFactory)" form the body of a stubbing
// callback whose opening line(s) and the statement handing "future" back are not visible in this listing.
398 SettableFuture<SchemaContext> future = SettableFuture.create();
400 doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
401 .createEffectiveModelContextFactory();
403 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
404 masterRef), testKit.getRef());
// The first attempt is failed only after the interleaved RegisterMountPoint has been sent.
406 future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout")));
409 }).when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
411 doReturn(mockSchemaContextFactory).when(mockSchemaRepository).createEffectiveModelContextFactory();
413 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
414 masterRef), testKit.getRef());
// Exactly two factory lookups are expected, i.e. no additional retry for the failed first attempt.
416 verify(mockMountPointBuilder, timeout(5000)).register();
417 verify(mockSchemaRepository, times(2)).createEffectiveModelContextFactory();
/**
 * Expects a MissingSchemaSourceException (declared through the expected attribute of the Test annotation) when
 * a YANG text source is requested through ProxyYangTextSourceProvider from an actor whose SharedSchemaRepository
 * has no schema source registered in this test.
 */
420 @Test(expected = MissingSchemaSourceException.class)
421 public void testMissingSchemaSourceOnMissingProvider() throws Exception {
// The same fresh repository instance serves as both the schema registry and the schema repository.
422 final SharedSchemaRepository repository = new SharedSchemaRepository("test");
424 SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
425 doReturn(repository).when(schemaResourceDTO2).getSchemaRegistry();
426 doReturn(repository).when(schemaResourceDTO2).getSchemaRepository();
427 final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
428 .setSchemaResourceDTO(schemaResourceDTO2).setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS))
429 .setBaseSchemas(BASE_SCHEMAS).build();
430 final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
// A distinct actor name is used; "master_messages" is already taken by the actor created in setup().
431 ActorRef actor = TestActorRef.create(system, props, "master_messages_2");
433 final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID");
435 final ProxyYangTextSourceProvider proxyYangProvider =
436 new ProxyYangTextSourceProvider(actor, system.dispatcher(), TIMEOUT);
438 final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
439 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
// Awaiting the future is what surfaces the expected exception to the test runner.
440 Await.result(resolvedSchemaFuture, TIMEOUT.duration());
/**
 * Verifies YANG text source retrieval through ProxyYangTextSourceProvider against the master actor: while a
 * provider for the source is registered in masterSchemaRepository the identifier and content ("YANG") come back
 * unchanged; after the registration is closed the request fails with MissingSchemaSourceException.
 */
444 public void testYangTextSchemaSourceRequest() throws Exception {
445 final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID");
447 final ProxyYangTextSourceProvider proxyYangProvider =
448 new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
// The source content is the four bytes of "YANG" encoded as UTF-8.
450 final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForByteSource(sourceIdentifier,
451 ByteSource.wrap("YANG".getBytes(UTF_8)));
// Register a provider that always answers with the source built above.
455 final SchemaSourceRegistration<YangTextSchemaSource> schemaSourceReg = masterSchemaRepository
456 .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource),
457 PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1));
459 final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
460 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
462 final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
464 assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
465 assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
467 // Test missing source failure.
// Closing the registration removes the only provider registered by this test.
469 schemaSourceReg.close();
// NOTE(review): the delimiters of the executable passed to assertThrows (its opening and the closing of the
// call) are not visible in this listing; the three statements below are its body.
471 final MissingSchemaSourceException ex = assertThrows(MissingSchemaSourceException.class,
473 final Future<YangTextSchemaSourceSerializationProxy> failedSchemaFuture =
474 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
475 Await.result(failedSchemaFuture, TIMEOUT.duration());
477 assertThat(ex.getMessage(), startsWith("No providers registered for source"));
478 assertThat(ex.getMessage(), containsString(sourceIdentifier.toString()));
/**
 * Verifies that the DOMRpcService added to the slave mount point is a ProxyDOMRpcService and that whatever the
 * master-side mockDOMRpcService is stubbed to return is what the slave-side invokeRpc call yields: a null
 * result, output only, error only, output together with an error, and a failed future.
 */
482 public void testSlaveInvokeRpc() throws Exception {
484 final List<SourceIdentifier> sourceIdentifiers =
485 Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
487 initializeMaster(sourceIdentifiers);
488 registerSlaveMountPoint();
// Capture the RPC service instance that the slave registered on the mount point builder.
490 ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
491 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
493 final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
494 assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService);
// The same container node is used both as the RPC input and as the stubbed RPC output below.
496 final QName testQName = QName.create("", "TestQname");
497 final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
498 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(testQName))
499 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
500 final RpcError rpcError = RpcResultBuilder.newError(RpcError.ErrorType.RPC, null, "Rpc invocation failed.");
502 // RPC with no response output.
504 doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMRpcService).invokeRpc(any(), any());
506 DOMRpcResult result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
508 assertEquals(null, result);
510 // RPC with response output.
512 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode)))
513 .when(mockDOMRpcService).invokeRpc(any(), any());
515 result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
517 assertEquals(outputNode, result.getResult());
518 assertTrue(result.getErrors().isEmpty());
520 // RPC with response error.
522 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(rpcError)))
523 .when(mockDOMRpcService).invokeRpc(any(), any());
525 result = slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
527 assertNull(result.getResult());
528 assertEquals(rpcError, result.getErrors().iterator().next());
530 // RPC with response output and error.
532 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
533 .when(mockDOMRpcService).invokeRpc(any(), any());
535 final DOMRpcResult resultOutputError =
536 slaveDomRPCService.invokeRpc(testQName, outputNode).get(2, TimeUnit.SECONDS);
538 assertEquals(outputNode, resultOutputError.getResult());
539 assertEquals(rpcError, resultOutputError.getErrors().iterator().next());
// Failure case: the master-side future fails with a ClusteringRpcException carrying the message "mock".
542 doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringRpcException("mock")))
543 .when(mockDOMRpcService).invokeRpc(any(), any());
544 final ListenableFuture<? extends DOMRpcResult> future = slaveDomRPCService.invokeRpc(testQName, outputNode);
// The slave-side future must fail with a DOMRpcException cause that preserves the message.
546 final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
547 final Throwable cause = e.getCause();
548 assertThat(cause, instanceOf(DOMRpcException.class));
549 assertEquals("mock", cause.getMessage());
/**
 * Verifies that the DOMActionService added to the slave mount point is a ProxyDOMActionService and that the
 * results stubbed on the master-side mockDOMActionService are what the slave-side invokeAction call yields: a
 * null result, a result with output and no errors, and a failed future.
 */
553 public void testSlaveInvokeAction() throws Exception {
554 final List<SourceIdentifier> sourceIdentifiers = Lists
555 .newArrayList(RevisionSourceIdentifier.create("testActionID"));
556 initializeMaster(sourceIdentifiers);
557 registerSlaveMountPoint();
// Capture the action service instance that the slave registered on the mount point builder.
559 ArgumentCaptor<DOMActionService> domActionServiceCaptor = ArgumentCaptor.forClass(DOMActionService.class);
560 verify(mockMountPointBuilder).addService(eq(DOMActionService.class), domActionServiceCaptor.capture());
562 final DOMActionService slaveDomActionService = domActionServiceCaptor.getValue();
563 assertTrue(slaveDomActionService instanceof ProxyDOMActionService);
565 final QName testQName = QName.create("test", "2019-08-16", "TestActionQname");
566 final Absolute schemaPath = Absolute.of(testQName);
568 final YangInstanceIdentifier yangIIdPath = YangInstanceIdentifier
569 .create(new YangInstanceIdentifier.NodeIdentifier(testQName));
// NOTE(review): the remaining constructor argument(s) and the end of this statement are not visible in this
// listing; presumably yangIIdPath is passed as the path - confirm against the full file.
571 final DOMDataTreeIdentifier domDataTreeIdentifier = new DOMDataTreeIdentifier(LogicalDatastoreType.OPERATIONAL,
// The same container node is used both as the action input and as the stubbed action output below.
574 final ContainerNode outputNode = ImmutableContainerNodeBuilder.create()
575 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(testQName))
576 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
578 // Action with no response output.
579 doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMActionService)
580 .invokeAction(any(), any(), any());
581 DOMActionResult result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
582 .get(2, TimeUnit.SECONDS);
583 assertEquals(null, result);
585 // Action with response output.
586 doReturn(FluentFutures.immediateFluentFuture(new SimpleDOMActionResult(outputNode))).when(mockDOMActionService)
587 .invokeAction(any(), any(), any());
588 result = slaveDomActionService.invokeAction(schemaPath, domDataTreeIdentifier, outputNode)
589 .get(2, TimeUnit.SECONDS);
591 assertEquals(outputNode, result.getOutput().get());
592 assertTrue(result.getErrors().isEmpty());
// Failure case: the master-side future fails with a ClusteringActionException carrying the message "mock".
595 doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringActionException("mock")))
596 .when(mockDOMActionService).invokeAction(any(), any(), any());
597 final ListenableFuture<? extends DOMActionResult> future = slaveDomActionService.invokeAction(schemaPath,
598 domDataTreeIdentifier, outputNode);
// The slave-side future must fail with a DOMActionException cause that preserves the message.
600 final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get(2, TimeUnit.SECONDS));
601 final Throwable cause = e.getCause();
602 assertThat(cause, instanceOf(DOMActionException.class));
603 assertEquals("mock", cause.getMessage());
/**
 * Verifies that the DOMDataBroker added to the slave mount point is a ProxyDOMDataBroker and that each of its
 * new*Transaction() calls results in the corresponding call on the master-side mockDOMDataBroker.
 */
607 public void testSlaveNewTransactionRequests() {
// The master-side broker hands out mock transactions of each kind.
608 doReturn(mock(DOMDataTreeReadTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
609 doReturn(mock(DOMDataTreeReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
610 doReturn(mock(DOMDataTreeWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
612 initializeMaster(Collections.emptyList());
613 registerSlaveMountPoint();
// Capture the data broker instance that the slave registered on the mount point builder.
615 ArgumentCaptor<DOMDataBroker> domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
616 verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
618 final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
619 assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker);
// Each verification below has no timeout, so the master-side call must already have happened when the
// slave-side call returns.
621 slaveDOMDataBroker.newReadOnlyTransaction();
622 verify(mockDOMDataBroker).newReadOnlyTransaction();
624 slaveDOMDataBroker.newReadWriteTransaction();
625 verify(mockDOMDataBroker).newReadWriteTransaction();
627 slaveDOMDataBroker.newWriteOnlyTransaction();
628 verify(mockDOMDataBroker).newWriteOnlyTransaction();
/**
 * Verifies that the NetconfDataTreeService added to the slave mount point is a ProxyNetconfDataTreeService and
 * that every operation invoked on it (get, getConfig, lock, merge, replace, create, delete, remove,
 * discardChanges, commit) reaches the master-side netconfService mock within one second.
 */
632 public void testSlaveNewNetconfDataTreeServiceRequest() {
633 initializeMaster(Collections.emptyList());
634 registerSlaveMountPoint();
// Capture the service instance that the slave registered on the mount point builder.
636 ArgumentCaptor<NetconfDataTreeService> netconfCaptor = ArgumentCaptor.forClass(NetconfDataTreeService.class);
637 verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), netconfCaptor.capture());
639 final NetconfDataTreeService slaveNetconfService = netconfCaptor.getValue();
640 assertTrue(slaveNetconfService instanceof ProxyNetconfDataTreeService);
// NOTE(review): PATH, STORE and NODE are method-local variables despite their constant-style names.
642 final YangInstanceIdentifier PATH = YangInstanceIdentifier.empty();
643 final LogicalDatastoreType STORE = LogicalDatastoreType.CONFIGURATION;
644 final NormalizedNode<?, ?> NODE = Builders.containerBuilder()
645 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("", "cont"))).build();
// Only get, getConfig and commit are stubbed with return values on the master-side mock.
647 final FluentFuture<Optional<Object>> result = immediateFluentFuture(Optional.of(NODE));
648 doReturn(result).when(netconfService).get(PATH);
649 doReturn(result).when(netconfService).getConfig(PATH);
650 doReturn(emptyFluentFuture()).when(netconfService).commit(any());
// Invoke every operation on the slave-side proxy; the return values are ignored.
652 slaveNetconfService.get(PATH);
653 slaveNetconfService.getConfig(PATH);
654 slaveNetconfService.lock();
655 slaveNetconfService.merge(STORE, PATH, NODE, Optional.empty());
656 slaveNetconfService.replace(STORE, PATH, NODE, Optional.empty());
657 slaveNetconfService.create(STORE, PATH, NODE, Optional.empty());
658 slaveNetconfService.delete(STORE, PATH);
659 slaveNetconfService.remove(STORE, PATH);
660 slaveNetconfService.discardChanges();
661 slaveNetconfService.commit(Collections.emptyList());
// timeout(1000) lets each verification wait up to a second; presumably the calls are relayed to the master
// asynchronously - confirm against ProxyNetconfDataTreeService.
663 verify(netconfService, timeout(1000)).get(PATH);
664 verify(netconfService, timeout(1000)).getConfig(PATH);
665 verify(netconfService, timeout(1000)).lock();
666 verify(netconfService, timeout(1000)).merge(STORE, PATH, NODE, Optional.empty());
667 verify(netconfService, timeout(1000)).replace(STORE, PATH, NODE, Optional.empty());
668 verify(netconfService, timeout(1000)).create(STORE, PATH, NODE, Optional.empty());
669 verify(netconfService, timeout(1000)).delete(STORE, PATH);
670 verify(netconfService, timeout(1000)).remove(STORE, PATH);
671 verify(netconfService, timeout(1000)).discardChanges();
672 verify(netconfService, timeout(1000)).commit(any());
/**
 * Creates a slave NetconfNodeActor backed by the mocked schema registry and repository, sends it a
 * RegisterMountPoint for both source identifiers with masterRef as the master, waits up to five seconds for the
 * mount point to be registered, checks which services were added, and expects a Success reply on testKit.
 *
 * NOTE(review): the statement returning the created actor reference is not visible in this listing; callers
 * use the return value as the slave actor (see testRegisterAndUnregisterMountPoint).
 */
675 private ActorRef registerSlaveMountPoint() {
676 SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
677 doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
678 doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
679 final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(
680 NetconfTopologySetupBuilder.create().setSchemaResourceDTO(schemaResourceDTO2).setActorSystem(system)
681 .setBaseSchemas(BASE_SCHEMAS).build(), remoteDeviceId, TIMEOUT, mockMountPointService));
// Schema context resolution succeeds immediately for this registration.
683 doReturn(Futures.immediateFuture(mockSchemaContext))
684 .when(mockSchemaContextFactory).createEffectiveModelContext(anyCollection());
686 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
687 masterRef), testKit.getRef());
// Only register() is awaited; the addService verifications that follow are checked without a timeout.
689 verify(mockMountPointBuilder, timeout(5000)).register();
// DOMActionService is not verified here; testSlaveInvokeAction verifies it separately.
690 verify(mockMountPointBuilder).addService(eq(DOMSchemaService.class), any());
691 verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
692 verify(mockMountPointBuilder).addService(eq(NetconfDataTreeService.class), any());
693 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
694 verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
696 testKit.expectMsgClass(Success.class);
/**
 * Sends CreateInitialMasterActorData, built from the mocked data broker, netconf service, RPC service and
 * action service, to the master actor and waits for its MasterActorDataInitialized reply on testKit.
 *
 * @param sourceIdentifiers source identifiers to include in the initial master data
 */
700 private void initializeMaster(final List<SourceIdentifier> sourceIdentifiers) {
701 masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, netconfService, sourceIdentifiers,
702 mockDOMRpcService, mockDOMActionService), testKit.getRef());
703 testKit.expectMsgClass(MasterActorDataInitialized.class);
/**
 * Resets the mount point registration and builder mocks, clearing recorded interactions and stubbing, and then
 * re-applies the default stubbing used by the tests.
 */
706 private void resetMountPointMocks() {
707 reset(mockMountPointReg, mockMountPointBuilder);
709 doNothing().when(mockMountPointReg).close();
// addService returns the builder itself and register() yields the registration mock.
711 doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
712 doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
/**
 * Mockito argument matcher that accepts a PotentialSchemaSource whose source identifier equals the given one.
 * Intended for use inside stubbing and verification calls such as registerSchemaSource(any(), withSourceId(id)).
 *
 * @param identifier the source identifier the matched argument must report
 * @return whatever argThat(...) returns; the value only serves as a placeholder in the stubbed/verified call
 */
715 private static PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
716 return argThat(argument -> identifier.equals(argument.getSourceIdentifier()));
/**
 * Reads the given stream into a String by taking a single Scanner token with the "\\A" delimiter pattern;
 * yields the empty string when the scanner reports no token. The scanner is opened in try-with-resources.
 *
 * NOTE(review): the Scanner is constructed without an explicit charset, while the test data read through this
 * helper is produced with getBytes(UTF_8); consider passing UTF-8 explicitly. The closing lines of this method
 * are not visible in this listing.
 *
 * @param is the stream to read
 * @return the stream content as a String, or "" if there is nothing to read
 */
719 private static String convertStreamToString(final InputStream is) {
720 try (Scanner scanner = new Scanner(is)) {
721 return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : "";