2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static java.nio.charset.StandardCharsets.UTF_8;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.ArgumentMatchers.any;
15 import static org.mockito.ArgumentMatchers.argThat;
16 import static org.mockito.ArgumentMatchers.eq;
17 import static org.mockito.Mockito.after;
18 import static org.mockito.Mockito.doAnswer;
19 import static org.mockito.Mockito.doNothing;
20 import static org.mockito.Mockito.doReturn;
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.reset;
23 import static org.mockito.Mockito.timeout;
24 import static org.mockito.Mockito.times;
25 import static org.mockito.Mockito.verify;
26 import static org.mockito.Mockito.verifyNoMoreInteractions;
27 import static org.mockito.MockitoAnnotations.initMocks;
28 import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
30 import akka.actor.ActorRef;
31 import akka.actor.ActorSystem;
32 import akka.actor.Props;
33 import akka.actor.Status.Failure;
34 import akka.actor.Status.Success;
35 import akka.pattern.AskTimeoutException;
36 import akka.pattern.Patterns;
37 import akka.testkit.TestActorRef;
38 import akka.testkit.javadsl.TestKit;
39 import akka.util.Timeout;
40 import com.google.common.collect.ImmutableList;
41 import com.google.common.collect.Lists;
42 import com.google.common.io.ByteSource;
43 import com.google.common.net.InetAddresses;
44 import com.google.common.util.concurrent.Futures;
45 import com.google.common.util.concurrent.SettableFuture;
46 import java.io.InputStream;
47 import java.net.InetSocketAddress;
48 import java.util.ArrayList;
49 import java.util.Collections;
50 import java.util.List;
51 import java.util.Scanner;
52 import java.util.concurrent.ExecutionException;
53 import java.util.concurrent.TimeUnit;
54 import org.junit.After;
55 import org.junit.Before;
56 import org.junit.Rule;
57 import org.junit.Test;
58 import org.junit.rules.ExpectedException;
59 import org.mockito.ArgumentCaptor;
60 import org.mockito.Mock;
61 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
62 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
65 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
66 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
67 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
68 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
69 import org.opendaylight.mdsal.dom.api.DOMRpcException;
70 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
71 import org.opendaylight.mdsal.dom.api.DOMRpcService;
72 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
73 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice.SchemaResourcesDTO;
74 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
75 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
76 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
77 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
78 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
79 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
80 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
81 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
82 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
83 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
84 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
85 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
86 import org.opendaylight.yangtools.concepts.ObjectRegistration;
87 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
88 import org.opendaylight.yangtools.yang.common.QName;
89 import org.opendaylight.yangtools.yang.common.RpcError;
90 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
91 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
92 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
93 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
94 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
95 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
96 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
97 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
98 import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier;
99 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
100 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
101 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
102 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
103 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
104 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
105 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
106 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
107 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
108 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
109 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToASTTransformer;
110 import scala.concurrent.Await;
111 import scala.concurrent.Future;
112 import scala.concurrent.duration.Duration;
114 public class NetconfNodeActorTest {
// Timeout (5 seconds) passed to NetconfNodeActor.props, ProxyYangTextSourceProvider and the Await/gracefulStop
// calls in this class.
116 private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
// Schema source identifiers ("yang1", "yang2") used in RegisterMountPoint messages and in withSourceId matchers.
117 private static final RevisionSourceIdentifier SOURCE_IDENTIFIER1 = RevisionSourceIdentifier.create("yang1");
118 private static final RevisionSourceIdentifier SOURCE_IDENTIFIER2 = RevisionSourceIdentifier.create("yang2");
// Actor system created together with the test instance; teardown() shuts it down.
// NOTE(review): not reassigned anywhere in the visible code, so this could presumably be final - confirm.
120 private ActorSystem system = ActorSystem.create();
// Probe whose ref is used as the sender of messages and to receive the actors' replies.
121 private final TestKit testKit = new TestKit(system);
// Lets a test declare the exception type it expects (see exception.expect(...) calls).
// NOTE(review): the rule annotation is presumably on a line not shown here - confirm.
124 public final ExpectedException exception = ExpectedException.none();
// Actor under test acting as master; assigned in setup().
126 private ActorRef masterRef;
// Device id handed to every NetconfNodeActor created by this class; assigned in setup().
127 private RemoteDeviceId remoteDeviceId;
// Real (non-mock) schema repository named "master", returned by schemaResourceDTO for the master actor.
128 private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
// The collaborators below are stubbed/reset with Mockito (doReturn(..).when(..), reset(..)) throughout this class,
// so they are presumably Mockito mocks whose annotations and initialization sit on lines not shown - confirm.
131 private DOMRpcService mockDOMRpcService;
134 private DOMMountPointService mockMountPointService;
137 private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
140 private ObjectRegistration<DOMMountPoint> mockMountPointReg;
143 private DOMDataBroker mockDOMDataBroker;
// Registrations returned by mockRegistry for SOURCE_IDENTIFIER1 and SOURCE_IDENTIFIER2 respectively (see setup()).
146 private SchemaSourceRegistration<?> mockSchemaSourceReg1;
149 private SchemaSourceRegistration<?> mockSchemaSourceReg2;
152 private SchemaSourceRegistry mockRegistry;
155 private SchemaContextFactory mockSchemaContextFactory;
158 private SchemaRepository mockSchemaRepository;
161 private SchemaContext mockSchemaContext;
// Schema resources given to the master actor; stubbed in setup() to expose masterSchemaRepository and mockRegistry.
164 private SchemaResourcesDTO schemaResourceDTO;
/**
 * Prepares the fixture shared by all tests: builds the remote device id, wires the master schema repository,
 * creates the master actor and installs the default stubbing on the mount point, registry and repository mocks.
 *
 * NOTE(review): at least one statement between the signature and the first visible line is not shown here
 * (presumably mock initialization) - confirm against the full file.
 */
167 public void setup() {
170 remoteDeviceId = new RemoteDeviceId("netconf-topology",
171 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
// The transformer is registered as a schema source listener on the master repository; it is created with that
// same repository passed for both of its arguments.
173 masterSchemaRepository.registerSchemaSourceListener(
174 TextToASTTransformer.create(masterSchemaRepository, masterSchemaRepository));
// The master actor gets the real repository but the mocked registry.
176 doReturn(masterSchemaRepository).when(schemaResourceDTO).getSchemaRepository();
177 doReturn(mockRegistry).when(schemaResourceDTO).getSchemaRegistry();
178 final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
179 .setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS)).setSchemaResourceDTO(schemaResourceDTO).build();
181 final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
183 masterRef = TestActorRef.create(system, props, "master_messages");
185 resetMountPointMocks();
187 doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
// Each source identifier maps to its own registration mock so tests can verify them independently.
189 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
190 doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
192 doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
193 .createSchemaContextFactory(any(SchemaSourceFilter.class));
/**
 * Shuts down the actor system created for this test instance.
 */
197 public void teardown() {
// NOTE(review): the boolean argument is presumably TestKit's verifySystemShutdown flag - confirm.
198 TestKit.shutdownActorSystem(system, true);
/**
 * Sends CreateInitialMasterActorData (via initializeMaster) and then RefreshSetupMasterActorData to the master
 * actor, expecting a MasterActorDataInitialized reply on the test kit after each.
 */
203 public void testInitializeAndRefreshMasterData() {
205 // Test CreateInitialMasterActorData.
207 initializeMaster(new ArrayList<>());
209 // Test RefreshSetupMasterActorData.
// A different topology id and address than the ones used in setup().
211 final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
212 new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
214 final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create()
215 .setSchemaResourceDTO(schemaResourceDTO).setActorSystem(system).build();
217 masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
219 testKit.expectMsgClass(MasterActorDataInitialized.class);
/**
 * Checks the reply to AskForMasterMountPoint: a Failure caused by NotMasterException before the master is
 * initialized, and a RegisterMountPoint carrying the master's source identifiers afterwards.
 *
 * NOTE(review): the method name looks like a typo of "testAskForMasterMountPoint".
 */
223 public void tesAskForMasterMountPoint() {
225 // Test with master not setup yet.
// A dedicated probe is used here instead of the shared testKit field.
227 final TestKit kit = new TestKit(system);
229 masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
231 final Failure failure = kit.expectMsgClass(Failure.class);
232 assertTrue(failure.cause() instanceof NotMasterException);
234 // Now initialize - master should send the RegisterMountPoint message.
236 List<SourceIdentifier> sourceIdentifiers = Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
237 initializeMaster(sourceIdentifiers);
239 masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
241 final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
243 assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
/**
 * Exercises slave mount point registration and unregistration in three phases:
 * 1. register via registerSlaveMountPoint(), then send UnregisterSlaveMountPoint and verify the mount point
 *    registration and both schema source registrations are closed;
 * 2. register again while a second RegisterMountPoint is sent during the first schema context resolution,
 *    and verify the mount point is registered once and the first schema source registration is closed;
 * 3. stop the slave actor and verify the remaining registrations are closed.
 *
 * NOTE(review): several lines of this method are not shown here (among them the opening of the stubbed answer
 * that ends with "}).when(mockSchemaContextFactory)..."), so comments on phase 2 describe the visible
 * statements only.
 *
 * @throws Exception declared by the signature; the visible body awaits a future with Await.result
 */
247 public void testRegisterAndUnregisterMountPoint() throws Exception {
249 ActorRef slaveRef = registerSlaveMountPoint();
253 slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
// timeout(..) polls until the verification passes or the period elapses, since the actor works asynchronously.
255 verify(mockMountPointReg, timeout(5000)).close();
256 verify(mockSchemaSourceReg1, timeout(1000)).close();
257 verify(mockSchemaSourceReg2, timeout(1000)).close();
259 // Test registration with another interleaved registration that completes while the first registration
260 // is resolving the schema context.
// Clear recorded interactions and stubbing from phase 1, then restore the stubs needed for the first registration.
262 reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
263 resetMountPointMocks();
265 doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
267 doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
268 .createSchemaContextFactory(any(SchemaSourceFilter.class));
// Separate registration and factory mocks for the interleaved (second) registration; its factory resolves
// the schema context immediately.
270 final SchemaSourceRegistration<?> newMockSchemaSourceReg = mock(SchemaSourceRegistration.class);
272 final SchemaContextFactory newMockSchemaContextFactory = mock(SchemaContextFactory.class);
273 doReturn(Futures.immediateFuture(mockSchemaContext))
274 .when(newMockSchemaContextFactory).createSchemaContext(any());
// The statements from here to "}).when(...)" re-point the registry/repository stubs at the new mocks, send the
// second RegisterMountPoint and then complete the future. The line opening this stubbed answer is not visible -
// presumably it runs when the first registration calls createSchemaContext; confirm.
277 SettableFuture<SchemaContext> future = SettableFuture.create();
279 doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
280 withSourceId(SOURCE_IDENTIFIER1));
282 doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
283 .createSchemaContextFactory(any(SchemaSourceFilter.class));
285 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
288 future.set(mockSchemaContext);
291 }).when(mockSchemaContextFactory).createSchemaContext(any());
293 doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
294 .createSchemaContextFactory(any(SchemaSourceFilter.class));
// First registration of phase 2.
296 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
298 verify(mockMountPointBuilder, timeout(5000)).register();
// after(500) waits the full period before verifying, unlike timeout(..) which returns as soon as it passes.
299 verify(mockMountPointBuilder, after(500)).addInitialSchemaContext(mockSchemaContext);
300 verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
301 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
302 verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
303 verify(mockSchemaSourceReg1).close();
// Both registrations went through the registry and the repository exactly once each.
304 verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
305 verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class));
// The builder saw nothing beyond the calls verified above, and the new registration has not been touched yet.
306 verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
308 // Stop the slave actor and verify schema source registrations are closed.
310 final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
311 Await.result(stopFuture, TIMEOUT.duration());
313 verify(mockMountPointReg).close();
314 verify(newMockSchemaSourceReg).close();
/**
 * Exercises slave mount point registration when schema context resolution fails:
 * 1. a SchemaResolutionException without a cause - no mount point is registered and both schema source
 *    registrations are closed;
 * 2. a SchemaResolutionException caused by AskTimeoutException followed by a success - the mount point is
 *    registered;
 * 3. an AskTimeoutException failure delivered after an interleaved registration has been sent - the mount point
 *    is registered and the repository is asked for a factory exactly twice.
 *
 * NOTE(review): several lines of this method are not shown here (among them the opening of the stubbed answer
 * that ends with "}).when(mockSchemaContextFactory)..."), so comments on phase 3 describe the visible
 * statements only.
 *
 * @throws Exception declared by the signature
 */
317 @SuppressWarnings("unchecked")
319 public void testRegisterMountPointWithSchemaFailures() throws Exception {
// The slave actor uses the fully mocked registry and repository rather than the master's real repository.
320 SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
321 doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
322 doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
323 final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setSchemaResourceDTO(schemaResourceDTO2)
324 .setActorSystem(system).build();
326 final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT,
327 mockMountPointService));
329 // Test unrecoverable failure.
331 doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock")))
332 .when(mockSchemaContextFactory).createSchemaContext(any());
334 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
335 masterRef), testKit.getRef());
// The test expects a Success reply to the message even though the stubbed resolution fails.
337 testKit.expectMsgClass(Success.class);
339 verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
340 verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
// after(1000).never() waits the full second and then asserts register() was never called.
342 verify(mockMountPointBuilder, after(1000).never()).register();
343 verify(mockSchemaSourceReg1, timeout(1000)).close();
344 verify(mockSchemaSourceReg2, timeout(1000)).close();
346 // Test recoverable AskTimeoutException - schema context resolution should be retried.
348 reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
// Consecutive stubbing: the first createSchemaContext call fails, the next one succeeds.
350 doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock",
351 new AskTimeoutException("timeout"))))
352 .doReturn(Futures.immediateFuture(mockSchemaContext))
353 .when(mockSchemaContextFactory).createSchemaContext(any());
355 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
356 masterRef), testKit.getRef());
358 testKit.expectMsgClass(Success.class);
360 verify(mockMountPointBuilder, timeout(5000)).register();
// Neither schema source registration was touched (for example closed) since the reset above.
361 verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
363 // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
364 // attempt should not be retried.
366 reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
367 resetMountPointMocks();
// Factory used by the interleaved registration; it resolves the schema context immediately.
369 final SchemaContextFactory mockSchemaContextFactorySuccess = mock(SchemaContextFactory.class);
370 doReturn(Futures.immediateFuture(mockSchemaContext))
371 .when(mockSchemaContextFactorySuccess).createSchemaContext(any());
// The statements from here to "}).when(...)" switch the repository to the succeeding factory, send the
// interleaved RegisterMountPoint and then fail the future. The line opening this stubbed answer is not visible -
// presumably it runs when the first registration calls createSchemaContext; confirm.
374 SettableFuture<SchemaContext> future = SettableFuture.create();
376 doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
377 .createSchemaContextFactory(any(SchemaSourceFilter.class));
379 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
380 masterRef), testKit.getRef());
382 future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout")));
385 }).when(mockSchemaContextFactory).createSchemaContext(any());
387 doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
388 .createSchemaContextFactory(any(SchemaSourceFilter.class));
// First registration of phase 3.
390 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
391 masterRef), testKit.getRef());
393 verify(mockMountPointBuilder, timeout(5000)).register();
// Exactly two factory requests: one per registration, i.e. no additional request from a retry.
394 verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class));
/**
 * Expects MissingSchemaSourceException when a YANG text source ("testID") is requested, through
 * ProxyYangTextSourceProvider, from an actor whose schema registry and repository are both
 * DEFAULT_SCHEMA_REPOSITORY; no source is registered for that identifier in this test.
 *
 * @throws Exception the exception rethrown by Await.result; the test passes only for
 *         MissingSchemaSourceException
 */
397 @Test(expected = MissingSchemaSourceException.class)
398 public void testMissingSchemaSourceOnMissingProvider() throws Exception {
399 SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
400 doReturn(DEFAULT_SCHEMA_REPOSITORY).when(schemaResourceDTO2).getSchemaRegistry();
401 doReturn(DEFAULT_SCHEMA_REPOSITORY).when(schemaResourceDTO2).getSchemaRepository();
402 final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
403 .setSchemaResourceDTO(schemaResourceDTO2).setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS)).build();
404 final Props props = NetconfNodeActor.props(setup, remoteDeviceId, TIMEOUT, mockMountPointService);
// Uses a name different from the "master_messages" actor created in setup().
405 ActorRef actor = TestActorRef.create(system, props, "master_messages_2");
407 final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID");
409 final ProxyYangTextSourceProvider proxyYangProvider =
410 new ProxyYangTextSourceProvider(actor, system.dispatcher(), TIMEOUT);
412 final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
413 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
// Blocks until the future completes; a failed future surfaces its exception here.
414 Await.result(resolvedSchemaFuture, TIMEOUT.duration());
/**
 * Requests a YANG text source from the master actor through ProxyYangTextSourceProvider. First a source with
 * content "YANG" is registered in the master schema repository and the resolved proxy is checked for identifier
 * and content; then the registration is closed and the same request is expected to fail with
 * MissingSchemaSourceException.
 *
 * @throws Exception the exception rethrown by Await.result
 */
418 public void testYangTextSchemaSourceRequest() throws Exception {
419 final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID");
421 final ProxyYangTextSourceProvider proxyYangProvider =
422 new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
424 final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForByteSource(sourceIdentifier,
425 ByteSource.wrap("YANG".getBytes(UTF_8)));
// Register a provider that always answers with the in-memory source above.
429 final SchemaSourceRegistration<YangTextSchemaSource> schemaSourceReg = masterSchemaRepository
430 .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource),
431 PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1));
433 final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
434 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
436 final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
438 assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
439 assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
441 // Test missing source failure.
// The expectation is armed before the failing call; everything after this line may throw the expected type.
443 exception.expect(MissingSchemaSourceException.class);
// Closing the registration leaves the repository without a provider for the identifier.
445 schemaSourceReg.close();
447 final Future<YangTextSchemaSourceSerializationProxy> failedSchemaFuture =
448 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
450 Await.result(failedSchemaFuture, TIMEOUT.duration());
/**
 * Invokes RPCs through the DOMRpcService that the slave actor adds to its mount point and checks how results of
 * the master's mocked DOMRpcService come back: a null result, output only, error only, output plus error, and
 * finally a failed future, for which DOMRpcException is expected.
 *
 * NOTE(review): the try block opening and the body of the catch clause at the end are not shown here;
 * presumably the catch rethrows the cause of the ExecutionException - confirm.
 *
 * @throws Throwable declared by the signature
 */
454 @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"})
455 public void testSlaveInvokeRpc() throws Throwable {
457 final List<SourceIdentifier> sourceIdentifiers =
458 Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
460 initializeMaster(sourceIdentifiers);
461 registerSlaveMountPoint();
// Capture the RPC service instance the slave handed to the mount point builder.
463 ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
464 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
466 final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
467 assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService);
// Shared inputs: the same container node is used both as the RPC input and as the stubbed RPC output.
469 final QName testQName = QName.create("", "TestQname");
470 final SchemaPath schemaPath = SchemaPath.create(true, testQName);
471 final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
472 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(testQName))
473 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
474 final RpcError rpcError = RpcResultBuilder.newError(RpcError.ErrorType.RPC, null, "Rpc invocation failed.");
476 // RPC with no response output.
478 doReturn(FluentFutures.immediateNullFluentFuture()).when(mockDOMRpcService).invokeRpc(any(), any());
480 DOMRpcResult result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
// NOTE(review): assertNull(result) would match the assertNull usage further down.
482 assertEquals(null, result);
484 // RPC with response output.
486 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode)))
487 .when(mockDOMRpcService).invokeRpc(any(), any());
489 result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
491 assertEquals(outputNode, result.getResult());
492 assertTrue(result.getErrors().isEmpty());
494 // RPC with response error.
496 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(rpcError)))
497 .when(mockDOMRpcService).invokeRpc(any(), any());
499 result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
501 assertNull(result.getResult());
502 assertEquals(rpcError, result.getErrors().iterator().next());
504 // RPC with response output and error.
506 doReturn(FluentFutures.immediateFluentFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
507 .when(mockDOMRpcService).invokeRpc(any(), any());
509 final DOMRpcResult resultOutputError =
510 slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
512 assertEquals(outputNode, resultOutputError.getResult());
513 assertEquals(rpcError, resultOutputError.getErrors().iterator().next());
// Failed invocation on the master: the expectation is armed before the failing call.
517 exception.expect(DOMRpcException.class);
519 doReturn(FluentFutures.immediateFailedFluentFuture(new ClusteringRpcException("mock")))
520 .when(mockDOMRpcService).invokeRpc(any(), any());
523 slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
524 } catch (ExecutionException e) {
/**
 * Checks that each transaction factory method called on the DOMDataBroker the slave adds to its mount point
 * results in the matching call on the master's mocked DOMDataBroker.
 */
530 public void testSlaveNewTransactionRequests() {
// The master's broker returns fresh transaction mocks for all three transaction kinds.
532 doReturn(mock(DOMDataTreeReadTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
533 doReturn(mock(DOMDataTreeReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
534 doReturn(mock(DOMDataTreeWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
536 initializeMaster(Collections.emptyList());
537 registerSlaveMountPoint();
// Capture the data broker instance the slave handed to the mount point builder.
539 ArgumentCaptor<DOMDataBroker> domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
540 verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
542 final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
543 assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker);
// The verifications use no timeout, so each master call must already have happened when the slave call returns.
545 slaveDOMDataBroker.newReadOnlyTransaction();
546 verify(mockDOMDataBroker).newReadOnlyTransaction();
548 slaveDOMDataBroker.newReadWriteTransaction();
549 verify(mockDOMDataBroker).newReadWriteTransaction();
551 slaveDOMDataBroker.newWriteOnlyTransaction();
552 verify(mockDOMDataBroker).newWriteOnlyTransaction();
/**
 * Creates a slave NetconfNodeActor backed by the mocked registry and repository, sends it a RegisterMountPoint
 * for both source identifiers pointing at masterRef, and verifies the mount point was built and registered.
 *
 * NOTE(review): the return statement is not shown here; callers use the result as the slave actor reference,
 * so presumably slaveRef is returned - confirm.
 *
 * @return the reference to the slave actor (see note above)
 */
555 private ActorRef registerSlaveMountPoint() {
556 SchemaResourcesDTO schemaResourceDTO2 = mock(SchemaResourcesDTO.class);
557 doReturn(mockRegistry).when(schemaResourceDTO2).getSchemaRegistry();
558 doReturn(mockSchemaRepository).when(schemaResourceDTO2).getSchemaRepository();
559 final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(
560 NetconfTopologySetupBuilder.create().setSchemaResourceDTO(schemaResourceDTO2).setActorSystem(system)
561 .build(), remoteDeviceId, TIMEOUT, mockMountPointService));
// Schema context resolution succeeds immediately.
563 doReturn(Futures.immediateFuture(mockSchemaContext))
564 .when(mockSchemaContextFactory).createSchemaContext(any());
566 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
567 masterRef), testKit.getRef());
// Wait up to 5 seconds for register(); the remaining builder calls are then verified without waiting.
569 verify(mockMountPointBuilder, timeout(5000)).register();
570 verify(mockMountPointBuilder).addInitialSchemaContext(mockSchemaContext);
571 verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
572 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
573 verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
575 testKit.expectMsgClass(Success.class);
/**
 * Sends CreateInitialMasterActorData (mocked data broker, the given source identifiers, mocked RPC service) to
 * the master actor and waits for its MasterActorDataInitialized reply on the test kit.
 *
 * @param sourceIdentifiers source identifiers placed in the CreateInitialMasterActorData message
 */
580 private void initializeMaster(final List<SourceIdentifier> sourceIdentifiers) {
581 masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, sourceIdentifiers,
582 mockDOMRpcService), testKit.getRef());
584 testKit.expectMsgClass(MasterActorDataInitialized.class);
/**
 * Resets the mount point registration and builder mocks and re-installs their default stubbing: close() does
 * nothing, the builder's add methods return the builder itself, and register() returns the registration mock.
 */
587 private void resetMountPointMocks() {
// reset(..) drops both recorded interactions and earlier stubbing, hence the re-stubbing below.
588 reset(mockMountPointReg, mockMountPointBuilder);
590 doNothing().when(mockMountPointReg).close();
592 doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addInitialSchemaContext(any());
593 doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
594 doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
/**
 * Mockito argument matcher for use inside stubbing/verification calls: matches a PotentialSchemaSource whose
 * getSourceIdentifier() equals the given identifier.
 *
 * @param identifier the source identifier the matched argument must carry
 * @return the value returned by argThat; only meaningful as an argument within a Mockito when/verify call
 */
597 private static PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
598 return argThat(argument -> identifier.equals(argument.getSourceIdentifier()));
601 private static String convertStreamToString(final InputStream is) {
602 try (Scanner scanner = new Scanner(is)) {
603 return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : "";