Merge changes I0587299e,I81a92c4e
[netconf.git] / netconf / netconf-topology-singleton / src / test / java / org / opendaylight / netconf / topology / singleton / impl / NetconfNodeActorTest.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl;
10
11 import static java.nio.charset.StandardCharsets.UTF_8;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Matchers.argThat;
17 import static org.mockito.Matchers.eq;
18 import static org.mockito.Mockito.after;
19 import static org.mockito.Mockito.doAnswer;
20 import static org.mockito.Mockito.doNothing;
21 import static org.mockito.Mockito.doReturn;
22 import static org.mockito.Mockito.mock;
23 import static org.mockito.Mockito.reset;
24 import static org.mockito.Mockito.timeout;
25 import static org.mockito.Mockito.times;
26 import static org.mockito.Mockito.verify;
27 import static org.mockito.Mockito.verifyNoMoreInteractions;
28 import static org.mockito.MockitoAnnotations.initMocks;
29
30 import akka.actor.ActorRef;
31 import akka.actor.ActorSystem;
32 import akka.actor.Props;
33 import akka.actor.Status.Failure;
34 import akka.actor.Status.Success;
35 import akka.pattern.AskTimeoutException;
36 import akka.pattern.Patterns;
37 import akka.testkit.TestActorRef;
38 import akka.testkit.javadsl.TestKit;
39 import akka.util.Timeout;
40 import com.google.common.collect.ImmutableList;
41 import com.google.common.collect.Lists;
42 import com.google.common.io.ByteSource;
43 import com.google.common.net.InetAddresses;
44 import com.google.common.util.concurrent.Futures;
45 import com.google.common.util.concurrent.SettableFuture;
46 import java.io.InputStream;
47 import java.net.InetSocketAddress;
48 import java.util.Collections;
49 import java.util.List;
50 import java.util.Scanner;
51 import java.util.concurrent.ExecutionException;
52 import java.util.concurrent.TimeUnit;
53 import org.junit.After;
54 import org.junit.Before;
55 import org.junit.Rule;
56 import org.junit.Test;
57 import org.junit.rules.ExpectedException;
58 import org.mockito.ArgumentCaptor;
59 import org.mockito.ArgumentMatcher;
60 import org.mockito.Mock;
61 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
62 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
63 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
64 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
65 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
66 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
67 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
68 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
69 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
70 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
71 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
72 import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
73 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
74 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
75 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
76 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
77 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup.NetconfTopologySetupBuilder;
78 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
79 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
80 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
81 import org.opendaylight.netconf.topology.singleton.messages.NotMasterException;
82 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
83 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
84 import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
85 import org.opendaylight.yangtools.concepts.ObjectRegistration;
86 import org.opendaylight.yangtools.yang.common.QName;
87 import org.opendaylight.yangtools.yang.common.RpcError;
88 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
89 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
90 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
91 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
92 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
93 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
94 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
95 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
96 import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier;
97 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
98 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
99 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
100 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
101 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
102 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
103 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
104 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
105 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
106 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
107 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToASTTransformer;
108 import scala.concurrent.Await;
109 import scala.concurrent.Future;
110 import scala.concurrent.duration.Duration;
111
112 public class NetconfNodeActorTest {
113
114     private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
115     private static final RevisionSourceIdentifier SOURCE_IDENTIFIER1 = RevisionSourceIdentifier.create("yang1");
116     private static final RevisionSourceIdentifier SOURCE_IDENTIFIER2 = RevisionSourceIdentifier.create("yang2");
117
118     private ActorSystem system = ActorSystem.create();
119     private final TestKit testKit = new TestKit(system);
120
121     @Rule
122     public final ExpectedException exception = ExpectedException.none();
123
124     private ActorRef masterRef;
125     private RemoteDeviceId remoteDeviceId;
126     private final SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
127
128     @Mock
129     private DOMRpcService mockDOMRpcService;
130
131     @Mock
132     private DOMMountPointService mockMountPointService;
133
134     @Mock
135     private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
136
137     @Mock
138     private ObjectRegistration<DOMMountPoint> mockMountPointReg;
139
140     @Mock
141     private DOMDataBroker mockDOMDataBroker;
142
143     @Mock
144     private SchemaSourceRegistration<?> mockSchemaSourceReg1;
145
146     @Mock
147     private SchemaSourceRegistration<?> mockSchemaSourceReg2;
148
149     @Mock
150     private SchemaSourceRegistry mockRegistry;
151
152     @Mock
153     private SchemaContextFactory mockSchemaContextFactory;
154
155     @Mock
156     private SchemaRepository mockSchemaRepository;
157
158     @Mock
159     private SchemaContext mockSchemaContext;
160
161     @Before
162     public void setup() {
163         initMocks(this);
164
165         remoteDeviceId = new RemoteDeviceId("netconf-topology",
166                 new InetSocketAddress(InetAddresses.forString("127.0.0.1"), 9999));
167
168         masterSchemaRepository.registerSchemaSourceListener(
169                 TextToASTTransformer.create(masterSchemaRepository, masterSchemaRepository));
170
171         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system)
172                 .setIdleTimeout(Duration.apply(1, TimeUnit.SECONDS)).build();
173
174         final Props props = NetconfNodeActor.props(setup, remoteDeviceId, masterSchemaRepository,
175                 masterSchemaRepository, TIMEOUT, mockMountPointService);
176
177         masterRef = TestActorRef.create(system, props, "master_messages");
178
179         resetMountPointMocks();
180
181         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
182
183         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
184         doReturn(mockSchemaSourceReg2).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
185
186         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
187                 .createSchemaContextFactory(any(SchemaSourceFilter.class));
188     }
189
190     @After
191     public void teardown() {
192         TestKit.shutdownActorSystem(system, Boolean.TRUE);
193         system = null;
194     }
195
196     @Test
197     public void testInitializeAndRefreshMasterData() {
198
199         // Test CreateInitialMasterActorData.
200
201         initializeMaster(Lists.newArrayList());
202
203         // Test RefreshSetupMasterActorData.
204
205         final RemoteDeviceId newRemoteDeviceId = new RemoteDeviceId("netconf-topology2",
206                 new InetSocketAddress(InetAddresses.forString("127.0.0.2"), 9999));
207
208         final NetconfTopologySetup newSetup = NetconfTopologySetupBuilder.create().setActorSystem(system).build();
209
210         masterRef.tell(new RefreshSetupMasterActorData(newSetup, newRemoteDeviceId), testKit.getRef());
211
212         testKit.expectMsgClass(MasterActorDataInitialized.class);
213     }
214
215     @Test
216     public void tesAskForMasterMountPoint() {
217
218         // Test with master not setup yet.
219
220         final TestKit kit = new TestKit(system);
221
222         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
223
224         final Failure failure = kit.expectMsgClass(Failure.class);
225         assertTrue(failure.cause() instanceof NotMasterException);
226
227         // Now initialize - master should send the RegisterMountPoint message.
228
229         List<SourceIdentifier> sourceIdentifiers = Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
230         initializeMaster(sourceIdentifiers);
231
232         masterRef.tell(new AskForMasterMountPoint(kit.getRef()), kit.getRef());
233
234         final RegisterMountPoint registerMountPoint = kit.expectMsgClass(RegisterMountPoint.class);
235
236         assertEquals(sourceIdentifiers, registerMountPoint.getSourceIndentifiers());
237     }
238
239     @Test
240     public void testRegisterAndUnregisterMountPoint() throws Exception {
241
242         ActorRef slaveRef = registerSlaveMountPoint();
243
244         // Unregister
245
246         slaveRef.tell(new UnregisterSlaveMountPoint(), testKit.getRef());
247
248         verify(mockMountPointReg, timeout(5000)).close();
249         verify(mockSchemaSourceReg1, timeout(1000)).close();
250         verify(mockSchemaSourceReg2, timeout(1000)).close();
251
252         // Test registration with another interleaved registration that completes while the first registration
253         // is resolving the schema context.
254
255         reset(mockSchemaSourceReg1, mockRegistry, mockSchemaRepository);
256         resetMountPointMocks();
257
258         doReturn(mockSchemaSourceReg1).when(mockRegistry).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
259
260         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
261                 .createSchemaContextFactory(any(SchemaSourceFilter.class));
262
263         final SchemaSourceRegistration<?> newMockSchemaSourceReg = mock(SchemaSourceRegistration.class);
264
265         final SchemaContextFactory newMockSchemaContextFactory = mock(SchemaContextFactory.class);
266         doReturn(Futures.immediateFuture(mockSchemaContext))
267                 .when(newMockSchemaContextFactory).createSchemaContext(any());
268
269         doAnswer(unused -> {
270             SettableFuture<SchemaContext> future = SettableFuture.create();
271             new Thread(() -> {
272                 doReturn(newMockSchemaSourceReg).when(mockRegistry).registerSchemaSource(any(),
273                         withSourceId(SOURCE_IDENTIFIER1));
274
275                 doReturn(newMockSchemaContextFactory).when(mockSchemaRepository)
276                         .createSchemaContextFactory(any(SchemaSourceFilter.class));
277
278                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef),
279                         testKit.getRef());
280
281                 future.set(mockSchemaContext);
282             }).start();
283             return future;
284         }).when(mockSchemaContextFactory).createSchemaContext(any());
285
286         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
287                 .createSchemaContextFactory(any(SchemaSourceFilter.class));
288
289         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1), masterRef), testKit.getRef());
290
291         verify(mockMountPointBuilder, timeout(5000)).register();
292         verify(mockMountPointBuilder, after(500)).addInitialSchemaContext(mockSchemaContext);
293         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
294         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
295         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
296         verify(mockSchemaSourceReg1).close();
297         verify(mockRegistry, times(2)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
298         verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class));
299         verifyNoMoreInteractions(mockMountPointBuilder, newMockSchemaSourceReg);
300
301         // Stop the slave actor and verify schema source registrations are closed.
302
303         final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
304         Await.result(stopFuture, TIMEOUT.duration());
305
306         verify(mockMountPointReg).close();
307         verify(newMockSchemaSourceReg).close();
308     }
309
310     @SuppressWarnings("unchecked")
311     @Test
312     public void testRegisterMountPointWithSchemaFailures() throws Exception {
313         final NetconfTopologySetup setup = NetconfTopologySetupBuilder.create().setActorSystem(system).build();
314
315         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, mockRegistry,
316                 mockSchemaRepository, TIMEOUT, mockMountPointService));
317
318         // Test unrecoverable failure.
319
320         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock")))
321                 .when(mockSchemaContextFactory).createSchemaContext(any());
322
323         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
324                 masterRef), testKit.getRef());
325
326         testKit.expectMsgClass(Success.class);
327
328         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER1));
329         verify(mockRegistry, timeout(5000)).registerSchemaSource(any(), withSourceId(SOURCE_IDENTIFIER2));
330
331         verify(mockMountPointBuilder, after(1000).never()).register();
332         verify(mockSchemaSourceReg1, timeout(1000)).close();
333         verify(mockSchemaSourceReg2, timeout(1000)).close();
334
335         // Test recoverable AskTimeoutException - schema context resolution should be retried.
336
337         reset(mockSchemaSourceReg1, mockSchemaSourceReg2);
338
339         doReturn(Futures.immediateFailedFuture(new SchemaResolutionException("mock",
340                 new AskTimeoutException("timeout"))))
341             .doReturn(Futures.immediateFuture(mockSchemaContext))
342             .when(mockSchemaContextFactory).createSchemaContext(any());
343
344         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
345                 masterRef), testKit.getRef());
346
347         testKit.expectMsgClass(Success.class);
348
349         verify(mockMountPointBuilder, timeout(5000)).register();
350         verifyNoMoreInteractions(mockSchemaSourceReg1, mockSchemaSourceReg2);
351
352         // Test AskTimeoutException with an interleaved successful registration. The first schema context resolution
353         // attempt should not be retried.
354
355         reset(mockSchemaSourceReg1, mockSchemaSourceReg2, mockSchemaRepository, mockSchemaContextFactory);
356         resetMountPointMocks();
357
358         final SchemaContextFactory mockSchemaContextFactorySuccess = mock(SchemaContextFactory.class);
359         doReturn(Futures.immediateFuture(mockSchemaContext))
360                 .when(mockSchemaContextFactorySuccess).createSchemaContext(any());
361
362         doAnswer(unused -> {
363             SettableFuture<SchemaContext> future = SettableFuture.create();
364             new Thread(() -> {
365                 doReturn(mockSchemaContextFactorySuccess).when(mockSchemaRepository)
366                         .createSchemaContextFactory(any(SchemaSourceFilter.class));
367
368                 slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
369                         masterRef), testKit.getRef());
370
371                 future.setException(new SchemaResolutionException("mock", new AskTimeoutException("timeout")));
372             }).start();
373             return future;
374         }).when(mockSchemaContextFactory).createSchemaContext(any());
375
376         doReturn(mockSchemaContextFactory).when(mockSchemaRepository)
377                 .createSchemaContextFactory(any(SchemaSourceFilter.class));
378
379         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
380                 masterRef), testKit.getRef());
381
382         verify(mockMountPointBuilder, timeout(5000)).register();
383         verify(mockSchemaRepository, times(2)).createSchemaContextFactory(any(SchemaSourceFilter.class));
384     }
385
386     @Test
387     public void testYangTextSchemaSourceRequest() throws Exception {
388         final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID");
389
390         final ProxyYangTextSourceProvider proxyYangProvider =
391                 new ProxyYangTextSourceProvider(masterRef, system.dispatcher(), TIMEOUT);
392
393         final YangTextSchemaSource yangTextSchemaSource = YangTextSchemaSource.delegateForByteSource(sourceIdentifier,
394                 ByteSource.wrap("YANG".getBytes(UTF_8)));
395
396         // Test success.
397
398         final SchemaSourceRegistration<YangTextSchemaSource> schemaSourceReg = masterSchemaRepository
399                 .registerSchemaSource(id -> Futures.immediateFuture(yangTextSchemaSource),
400                      PotentialSchemaSource.create(sourceIdentifier, YangTextSchemaSource.class, 1));
401
402         final Future<YangTextSchemaSourceSerializationProxy> resolvedSchemaFuture =
403                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
404
405         final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchemaFuture, TIMEOUT.duration());
406
407         assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
408         assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
409
410         // Test missing source failure.
411
412         exception.expect(MissingSchemaSourceException.class);
413
414         schemaSourceReg.close();
415
416         final Future<YangTextSchemaSourceSerializationProxy> failedSchemaFuture =
417                 proxyYangProvider.getYangTextSchemaSource(sourceIdentifier);
418
419         Await.result(failedSchemaFuture, TIMEOUT.duration());
420     }
421
422     @Test
423     @SuppressWarnings({"checkstyle:AvoidHidingCauseException", "checkstyle:IllegalThrows"})
424     public void testSlaveInvokeRpc() throws Throwable {
425
426         final List<SourceIdentifier> sourceIdentifiers =
427                 Lists.newArrayList(RevisionSourceIdentifier.create("testID"));
428
429         initializeMaster(sourceIdentifiers);
430         registerSlaveMountPoint();
431
432         ArgumentCaptor<DOMRpcService> domRPCServiceCaptor = ArgumentCaptor.forClass(DOMRpcService.class);
433         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), domRPCServiceCaptor.capture());
434
435         final DOMRpcService slaveDomRPCService = domRPCServiceCaptor.getValue();
436         assertTrue(slaveDomRPCService instanceof ProxyDOMRpcService);
437
438         final QName testQName = QName.create("", "TestQname");
439         final SchemaPath schemaPath = SchemaPath.create(true, testQName);
440         final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
441                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(testQName))
442                 .withChild(ImmutableNodes.leafNode(testQName, "foo")).build();
443         final RpcError rpcError = RpcResultBuilder.newError(RpcError.ErrorType.RPC, null, "Rpc invocation failed.");
444
445         // RPC with no response output.
446
447         doReturn(Futures.immediateCheckedFuture(null)).when(mockDOMRpcService).invokeRpc(any(), any());
448
449         DOMRpcResult result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
450
451         assertEquals(null, result);
452
453         // RPC with response output.
454
455         doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode)))
456                 .when(mockDOMRpcService).invokeRpc(any(), any());
457
458         result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
459
460         assertEquals(outputNode, result.getResult());
461         assertTrue(result.getErrors().isEmpty());
462
463         // RPC with response error.
464
465         doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(rpcError)))
466                 .when(mockDOMRpcService).invokeRpc(any(), any());
467
468         result = slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
469
470         assertNull(result.getResult());
471         assertEquals(rpcError, result.getErrors().iterator().next());
472
473         // RPC with response output and error.
474
475         doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
476                 .when(mockDOMRpcService).invokeRpc(any(), any());
477
478         final DOMRpcResult resultOutputError =
479                 slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
480
481         assertEquals(outputNode, resultOutputError.getResult());
482         assertEquals(rpcError, resultOutputError.getErrors().iterator().next());
483
484         // RPC failure.
485
486         exception.expect(DOMRpcException.class);
487
488         doReturn(Futures.immediateFailedCheckedFuture(new ClusteringRpcException("mock")))
489                 .when(mockDOMRpcService).invokeRpc(any(), any());
490
491         try {
492             slaveDomRPCService.invokeRpc(schemaPath, outputNode).get(2, TimeUnit.SECONDS);
493         } catch (ExecutionException e) {
494             throw e.getCause();
495         }
496     }
497
498     @Test
499     public void testSlaveNewTransactionRequests() {
500
501         doReturn(mock(DOMDataReadOnlyTransaction.class)).when(mockDOMDataBroker).newReadOnlyTransaction();
502         doReturn(mock(DOMDataReadWriteTransaction.class)).when(mockDOMDataBroker).newReadWriteTransaction();
503         doReturn(mock(DOMDataWriteTransaction.class)).when(mockDOMDataBroker).newWriteOnlyTransaction();
504
505         initializeMaster(Collections.emptyList());
506         registerSlaveMountPoint();
507
508         ArgumentCaptor<DOMDataBroker> domDataBrokerCaptor = ArgumentCaptor.forClass(DOMDataBroker.class);
509         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), domDataBrokerCaptor.capture());
510
511         final DOMDataBroker slaveDOMDataBroker = domDataBrokerCaptor.getValue();
512         assertTrue(slaveDOMDataBroker instanceof ProxyDOMDataBroker);
513
514         slaveDOMDataBroker.newReadOnlyTransaction();
515         verify(mockDOMDataBroker).newReadOnlyTransaction();
516
517         slaveDOMDataBroker.newReadWriteTransaction();
518         verify(mockDOMDataBroker).newReadWriteTransaction();
519
520         slaveDOMDataBroker.newWriteOnlyTransaction();
521         verify(mockDOMDataBroker).newWriteOnlyTransaction();
522     }
523
524     private ActorRef registerSlaveMountPoint() {
525         final ActorRef slaveRef = system.actorOf(NetconfNodeActor.props(
526                 NetconfTopologySetupBuilder.create().setActorSystem(system).build(), remoteDeviceId, mockRegistry,
527                 mockSchemaRepository, TIMEOUT, mockMountPointService));
528
529         doReturn(Futures.immediateFuture(mockSchemaContext))
530                 .when(mockSchemaContextFactory).createSchemaContext(any());
531
532         slaveRef.tell(new RegisterMountPoint(ImmutableList.of(SOURCE_IDENTIFIER1, SOURCE_IDENTIFIER2),
533                 masterRef), testKit.getRef());
534
535         verify(mockMountPointBuilder, timeout(5000)).register();
536         verify(mockMountPointBuilder).addInitialSchemaContext(mockSchemaContext);
537         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
538         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
539         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
540
541         testKit.expectMsgClass(Success.class);
542
543         return slaveRef;
544     }
545
546     private void initializeMaster(List<SourceIdentifier> sourceIdentifiers) {
547         masterRef.tell(new CreateInitialMasterActorData(mockDOMDataBroker, sourceIdentifiers,
548                 mockDOMRpcService), testKit.getRef());
549
550         testKit.expectMsgClass(MasterActorDataInitialized.class);
551     }
552
553     private void resetMountPointMocks() {
554         reset(mockMountPointReg, mockMountPointBuilder);
555
556         doNothing().when(mockMountPointReg).close();
557
558         doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addInitialSchemaContext(any());
559         doReturn(mockMountPointBuilder).when(mockMountPointBuilder).addService(any(), any());
560         doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
561     }
562
563     private PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
564         return argThat(new ArgumentMatcher<PotentialSchemaSource<?>>() {
565             @Override
566             public boolean matches(final Object argument) {
567                 final PotentialSchemaSource<?> potentialSchemaSource = (PotentialSchemaSource<?>) argument;
568                 return identifier.equals(potentialSchemaSource.getSourceIdentifier());
569             }
570         });
571     }
572
573     private String convertStreamToString(final InputStream is) {
574         try (Scanner scanner = new Scanner(is)) {
575             return scanner.useDelimiter("\\A").hasNext() ? scanner.next() : "";
576         }
577     }
578 }