2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.singleton.impl;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.Matchers.any;
15 import static org.mockito.Matchers.argThat;
16 import static org.mockito.Mockito.doReturn;
17 import static org.mockito.Mockito.mock;
18 import static org.mockito.Mockito.timeout;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.MockitoAnnotations.initMocks;
21 import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
23 import akka.actor.ActorContext;
24 import akka.actor.ActorRef;
25 import akka.actor.ActorSystem;
26 import akka.actor.Props;
27 import akka.pattern.Patterns;
28 import akka.testkit.JavaTestKit;
29 import akka.testkit.TestActorRef;
30 import akka.util.Timeout;
31 import com.google.common.base.MoreObjects;
32 import com.google.common.base.Optional;
33 import com.google.common.collect.ImmutableList;
34 import com.google.common.collect.Lists;
35 import com.google.common.util.concurrent.CheckedFuture;
36 import com.google.common.util.concurrent.Futures;
37 import com.google.common.util.concurrent.SettableFuture;
38 import java.io.ByteArrayInputStream;
39 import java.io.IOException;
40 import java.io.InputStream;
41 import java.net.InetAddress;
42 import java.net.InetSocketAddress;
43 import java.net.UnknownHostException;
44 import java.util.List;
45 import java.util.concurrent.TimeUnit;
46 import org.junit.After;
47 import org.junit.Before;
48 import org.junit.Rule;
49 import org.junit.Test;
50 import org.junit.rules.ExpectedException;
51 import org.mockito.ArgumentMatcher;
52 import org.mockito.Mock;
53 import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
54 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
55 import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
56 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
57 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
58 import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
59 import org.opendaylight.controller.sal.core.api.Broker;
60 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
61 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
62 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
63 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
64 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
65 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
66 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
67 import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
68 import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
69 import org.opendaylight.yangtools.yang.common.QName;
70 import org.opendaylight.yangtools.yang.common.RpcError;
71 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
72 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
73 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
74 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
75 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
76 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
77 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
78 import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
79 import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier;
80 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
81 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
82 import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
83 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
84 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
85 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
86 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
87 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
88 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
89 import scala.concurrent.Await;
90 import scala.concurrent.Future;
91 import scala.concurrent.duration.Duration;
93 public class NetconfNodeActorTest {
95 private static final Timeout TIMEOUT = new Timeout(Duration.create(5, "seconds"));
96 private static ActorSystem system;
99 public final ExpectedException exception = ExpectedException.none();
101 private ActorRef masterRef;
102 private RemoteDeviceId remoteDeviceId;
105 private DOMRpcService domRpcService;
108 public void setup() throws UnknownHostException {
111 remoteDeviceId = new RemoteDeviceId("netconf-topology",
112 new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
113 final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
115 final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
116 DEFAULT_SCHEMA_REPOSITORY, TIMEOUT);
118 system = ActorSystem.create();
120 masterRef = TestActorRef.create(system, props, "master_messages");
124 public void teardown() {
125 JavaTestKit.shutdownActorSystem(system);
130 public void testInitDataMessages() throws Exception {
132 final DOMDataBroker domDataBroker = mock(DOMDataBroker.class);
133 final List<SourceIdentifier> sourceIdentifiers = Lists.newArrayList();
135 /* Test init master data */
137 final Future<Object> initialDataToActor =
138 Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers,
139 domRpcService), TIMEOUT);
141 final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
142 assertTrue(success instanceof MasterActorDataInitialized);
145 /* Test refresh master data */
147 final RemoteDeviceId remoteDeviceId2 = new RemoteDeviceId("netconf-topology2",
148 new InetSocketAddress(InetAddress.getByName("127.0.0.2"), 9999));
150 final NetconfTopologySetup setup2 = mock(NetconfTopologySetup.class);
152 final Future<Object> refreshDataToActor =
153 Patterns.ask(masterRef, new RefreshSetupMasterActorData(setup2, remoteDeviceId2),
156 final Object success2 = Await.result(refreshDataToActor, TIMEOUT.duration());
157 assertTrue(success2 instanceof MasterActorDataInitialized);
162 public void testRegisterMountPointMessage() throws Exception {
164 final DOMDataBroker domDataBroker = mock(DOMDataBroker.class);
165 final List<SourceIdentifier> sourceIdentifiers =
166 Lists.newArrayList(RevisionSourceIdentifier.create("testID", Optional.absent()));
170 final Future<Object> initialDataToActor =
171 Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers,
172 domRpcService), TIMEOUT);
174 final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration());
176 assertTrue(successInit instanceof MasterActorDataInitialized);
178 // test if slave get right identifiers from master
180 final Future<Object> registerMountPointFuture =
181 Patterns.ask(masterRef, new AskForMasterMountPoint(),
184 final RegisterMountPoint success =
185 (RegisterMountPoint) Await.result(registerMountPointFuture, TIMEOUT.duration());
187 assertEquals(sourceIdentifiers, success.getSourceIndentifiers());
192 public void testReceiveRegisterMountpoint() throws Exception {
193 final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
194 doReturn(mock(Broker.class)).when(setup).getDomBroker();
195 final RevisionSourceIdentifier yang1 = RevisionSourceIdentifier.create("yang1");
196 final RevisionSourceIdentifier yang2 = RevisionSourceIdentifier.create("yang2");
197 final SchemaSourceRegistry registry = mock(SchemaSourceRegistry.class);
198 final SchemaRepository schemaRepository = mock(SchemaRepository.class);
199 final SchemaSourceRegistration regYang1 = mock(SchemaSourceRegistration.class);
200 final SchemaSourceRegistration regYang2 = mock(SchemaSourceRegistration.class);
201 doReturn(regYang1).when(registry).registerSchemaSource(any(), withSourceId(yang1));
202 doReturn(regYang2).when(registry).registerSchemaSource(any(), withSourceId(yang2));
203 final SchemaContextFactory schemaContextFactory = mock(SchemaContextFactory.class);
204 doReturn(schemaContextFactory).when(schemaRepository).createSchemaContextFactory(any());
205 final SettableFuture<SchemaContext> schemaContextFuture = SettableFuture.create();
206 final CheckedFuture<SchemaContext, SchemaResolutionException> checkedFuture =
207 Futures.makeChecked(schemaContextFuture, e -> new SchemaResolutionException("fail", e));
208 doReturn(checkedFuture).when(schemaContextFactory).createSchemaContext(any());
209 final ActorRef slaveRef =
210 system.actorOf(NetconfNodeActor.props(setup, remoteDeviceId, registry, schemaRepository, TIMEOUT));
211 final List<SourceIdentifier> sources = ImmutableList.of(yang1, yang2);
212 slaveRef.tell(new RegisterMountPoint(sources), masterRef);
214 verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang1));
215 verify(registry, timeout(1000)).registerSchemaSource(any(), withSourceId(yang2));
217 final Future<Boolean> stopFuture = Patterns.gracefulStop(slaveRef, TIMEOUT.duration());
218 Await.result(stopFuture, TIMEOUT.duration());
219 //provider should be deregistered
220 verify(regYang1).close();
221 verify(regYang2).close();
225 public void testYangTextSchemaSourceRequestMessage() throws Exception {
226 final SchemaRepository schemaRepository = mock(SchemaRepository.class);
227 final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID", Optional.absent());
228 final Props props = NetconfNodeActor.props(mock(NetconfTopologySetup.class), remoteDeviceId,
229 DEFAULT_SCHEMA_REPOSITORY, schemaRepository, TIMEOUT);
231 final ActorRef actorRefSchemaRepo = TestActorRef.create(system, props, "master_mocked_schema_repository");
232 final ActorContext actorContext = mock(ActorContext.class);
233 doReturn(system.dispatcher()).when(actorContext).dispatcher();
235 final ProxyYangTextSourceProvider proxyYang =
236 new ProxyYangTextSourceProvider(actorRefSchemaRepo, actorContext, TIMEOUT);
237 // test if asking for source is resolved and sended back
239 final YangTextSchemaSource yangTextSchemaSource = new YangTextSchemaSource(sourceIdentifier) {
241 protected MoreObjects.ToStringHelper addToStringAttributes(final MoreObjects.ToStringHelper toStringHelper) {
246 public InputStream openStream() throws IOException {
247 return new ByteArrayInputStream("YANG".getBytes());
252 final CheckedFuture<YangTextSchemaSource, SchemaSourceException> result =
253 Futures.immediateCheckedFuture(yangTextSchemaSource);
255 doReturn(result).when(schemaRepository).getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
257 final Future<YangTextSchemaSourceSerializationProxy> resolvedSchema =
258 proxyYang.getYangTextSchemaSource(sourceIdentifier);
260 final YangTextSchemaSourceSerializationProxy success = Await.result(resolvedSchema, TIMEOUT.duration());
262 assertEquals(sourceIdentifier, success.getRepresentation().getIdentifier());
263 assertEquals("YANG", convertStreamToString(success.getRepresentation().openStream()));
266 // test if asking for source is missing
267 exception.expect(MissingSchemaSourceException.class);
269 final SchemaSourceException schemaSourceException =
270 new MissingSchemaSourceException("Fail", sourceIdentifier);
272 final CheckedFuture<YangTextSchemaSource, SchemaSourceException> resultFail =
273 Futures.immediateFailedCheckedFuture(schemaSourceException);
275 doReturn(resultFail).when(schemaRepository).getSchemaSource(sourceIdentifier, YangTextSchemaSource.class);
277 final Future<YangTextSchemaSourceSerializationProxy> failedSchema =
278 proxyYang.getYangTextSchemaSource(sourceIdentifier);
280 Await.result(failedSchema, TIMEOUT.duration());
285 public void testProxyDOMRpcService() throws Exception {
287 final DOMDataBroker domDataBroker = mock(DOMDataBroker.class);
288 final List<SourceIdentifier> sourceIdentifiers =
289 Lists.newArrayList(RevisionSourceIdentifier.create("testID", Optional.absent()));
293 final Future<Object> initialDataToActor =
294 Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers,
295 domRpcService), TIMEOUT);
297 final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration());
299 assertTrue(successInit instanceof MasterActorDataInitialized);
301 // test if slave get right identifiers from master
303 final ProxyDOMRpcService slaveDomRPCService = new ProxyDOMRpcService(system, masterRef, remoteDeviceId, TIMEOUT);
305 final SchemaPath schemaPath = SchemaPath.create(true, QName.create("TestQname"));
306 final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
307 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname")))
308 .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build();
309 final RpcError rpcError = RpcResultBuilder.newError(RpcError.ErrorType.RPC, null, "Rpc invocation failed.");
310 // EmptyResultResponse message
312 doReturn(Futures.immediateCheckedFuture(null)).when(domRpcService).invokeRpc(any(), any());
314 final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureEmpty =
315 slaveDomRPCService.invokeRpc(schemaPath, outputNode);
317 final Object resultNull = resultFutureEmpty.checkedGet(2, TimeUnit.SECONDS);
319 assertEquals(null, resultNull);
321 // InvokeRpcMessageReply message
323 doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode))).
324 when(domRpcService).invokeRpc(any(), any());
326 final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureNn =
327 slaveDomRPCService.invokeRpc(schemaPath, outputNode);
329 final DOMRpcResult resultNn = resultFutureNn.checkedGet(2, TimeUnit.SECONDS);
331 assertEquals(outputNode, resultNn.getResult());
332 assertTrue(resultNn.getErrors().isEmpty());
334 // InvokeRpcMessageReply message only error
336 doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(rpcError)))
337 .when(domRpcService).invokeRpc(any(), any());
339 final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureError =
340 slaveDomRPCService.invokeRpc(schemaPath, outputNode);
342 final DOMRpcResult resultError = resultFutureError.checkedGet(2, TimeUnit.SECONDS);
344 assertNull(resultError.getResult());
345 assertEquals(rpcError, resultError.getErrors().iterator().next());
347 // InvokeRpcMessageReply message error + result
349 doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
350 .when(domRpcService).invokeRpc(any(), any());
352 final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureOutputError =
353 slaveDomRPCService.invokeRpc(schemaPath, outputNode);
355 final DOMRpcResult resultOutputError = resultFutureOutputError.checkedGet(2, TimeUnit.SECONDS);
357 assertEquals(outputNode, resultOutputError.getResult());
358 assertEquals(rpcError, resultOutputError.getErrors().iterator().next());
362 exception.expect(DOMRpcException.class);
364 doReturn(Futures.immediateFailedCheckedFuture(new ClusteringRpcException("")))
365 .when(domRpcService).invokeRpc(any(), any());
367 final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureThrowable =
368 slaveDomRPCService.invokeRpc(schemaPath, outputNode);
370 resultFutureThrowable.checkedGet(2, TimeUnit.SECONDS);
374 private PotentialSchemaSource<?> withSourceId(final SourceIdentifier identifier) {
375 return argThat(new ArgumentMatcher<PotentialSchemaSource>() {
377 public boolean matches(final Object argument) {
378 final PotentialSchemaSource potentialSchemaSource = (PotentialSchemaSource) argument;
379 return identifier.equals(potentialSchemaSource.getSourceIdentifier());
384 private String convertStreamToString(final java.io.InputStream is) {
385 final java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
386 return s.hasNext() ? s.next() : "";