2 * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static org.mockito.ArgumentMatchers.any;
11 import static org.mockito.ArgumentMatchers.eq;
12 import static org.mockito.Mockito.after;
13 import static org.mockito.Mockito.doNothing;
14 import static org.mockito.Mockito.doReturn;
15 import static org.mockito.Mockito.mock;
16 import static org.mockito.Mockito.reset;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20 import static org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType.DELETE;
21 import static org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType.SUBTREE_MODIFIED;
22 import static org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType.WRITE;
24 import akka.actor.ActorSystem;
25 import akka.actor.Props;
26 import akka.cluster.Cluster;
27 import akka.dispatch.Dispatchers;
28 import akka.testkit.TestActorRef;
29 import akka.testkit.javadsl.TestKit;
30 import akka.util.Timeout;
31 import com.google.common.collect.Iterables;
32 import com.google.common.io.CharSource;
33 import com.google.common.util.concurrent.Futures;
34 import com.typesafe.config.ConfigFactory;
35 import java.net.InetSocketAddress;
36 import java.util.List;
38 import java.util.concurrent.CompletableFuture;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.stream.Collectors;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.junit.runner.RunWith;
48 import org.mockito.Mock;
49 import org.mockito.junit.MockitoJUnitRunner;
50 import org.opendaylight.mdsal.binding.api.DataBroker;
51 import org.opendaylight.mdsal.binding.api.DataObjectModification;
52 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
53 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
54 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
55 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
56 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
57 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
58 import org.opendaylight.mdsal.dom.api.DOMRpcService;
59 import org.opendaylight.netconf.client.mdsal.NetconfDevice;
60 import org.opendaylight.netconf.client.mdsal.api.NetconfDeviceSchemasResolver;
61 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
62 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
63 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Actions;
64 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
65 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
66 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
67 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
68 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
69 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
70 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
71 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
72 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
73 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
74 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
75 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
76 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
77 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.ConnectionOper.ConnectionStatus;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.connection.oper.ClusteredConnectionStatusBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev231121.NetconfNode;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev231121.NetconfNodeBuilder;
82 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
83 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
84 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
85 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
86 import org.opendaylight.yangtools.concepts.ObjectRegistration;
87 import org.opendaylight.yangtools.concepts.Registration;
88 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
89 import org.opendaylight.yangtools.yang.common.Uint16;
90 import org.opendaylight.yangtools.yang.model.api.source.SourceIdentifier;
91 import org.opendaylight.yangtools.yang.model.api.source.YangTextSource;
92 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
93 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
94 import org.opendaylight.yangtools.yang.model.spi.source.DelegatedYangTextSource;
95 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
96 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToIRTransformer;
99 * Unit tests for NetconfNodeManager.
101 * @author Thomas Pantelis
103 @RunWith(MockitoJUnitRunner.StrictStubs.class)
104 public class NetconfNodeManagerTest extends AbstractBaseSchemasTest {
// Name given to both actor systems created in setup().
105 private static final String ACTOR_SYSTEM_NAME = "test";
// Device identity used for the master actor name, the manager under test and the expected mount path.
106 private static final RemoteDeviceId DEVICE_ID = new RemoteDeviceId("device", new InetSocketAddress(65535));
// Schema sources registered with the master repository in setup() and sent to the master actor in
// initializeMaster().
107 private static final List<SourceIdentifier> SOURCE_IDENTIFIERS = List.of(new SourceIdentifier("testID"));
// Collaborators handed to the code under test. NOTE(review): the names and the Mockito runner suggest these
// are injected mocks, but the annotations that would do so are not visible in this extract -- confirm.
110 private DOMMountPointService mockMountPointService;
112 private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
114 private ObjectRegistration<DOMMountPoint> mockMountPointReg;
116 private DataBroker mockDataBroker;
118 private NetconfDataTreeService netconfService;
120 private DOMDataBroker mockDeviceDataBroker;
122 private Rpcs.Normalized mockRpcService;
124 private Actions.Normalized mockActionService;
126 private NetconfDeviceSchemasResolver mockSchemasResolver;
128 private EffectiveModelContextFactory mockSchemaContextFactory;
// Fixture state assigned in setup(): the two actor systems, the master test actor, the slave-side manager
// under test and the master's cluster address.
130 private ActorSystem slaveSystem;
131 private ActorSystem masterSystem;
132 private TestActorRef<TestMasterActor> testMasterActorRef;
133 private NetconfNodeManager netconfNodeManager;
134 private String masterAddress;
/**
 * Builds the test fixture. Creates two actor systems from the "Slave" and "Master" configuration sections,
 * records the master's cluster address, sets up a master schema repository that serves one generated YANG
 * source per entry of SOURCE_IDENTIFIERS, creates the master test actor and the slave-side
 * NetconfNodeManager under test (each with its own schema repository and topology setup), and finally
 * installs the default mount point stubbing via setupMountPointMocks().
 */
137 public void setup() {
// One-second response timeout shared by the master actor and the slave-side manager.
138 final Timeout responseTimeout = Timeout.apply(1, TimeUnit.SECONDS);
140 slaveSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Slave"));
141 masterSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Master"));
// The master's address is used for the master actor name below and by newNetconfNode().
143 masterAddress = Cluster.get(masterSystem).selfAddress().toString();
145 SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
146 masterSchemaRepository.registerSchemaSourceListener(
147 TextToIRTransformer.create(masterSchemaRepository, masterSchemaRepository));
// Each source's text is the template with every "ID" replaced by the source name. The registrations that
// follow are created for their side effect only: the collected list is not assigned to anything.
149 final String yangTemplate = """
155 SOURCE_IDENTIFIERS.stream().map(
156 sourceId -> masterSchemaRepository.registerSchemaSource(
157 id -> Futures.immediateFuture(new DelegatedYangTextSource(id,
158 CharSource.wrap(yangTemplate.replaceAll("ID", id.name().getLocalName())))),
159 PotentialSchemaSource.create(sourceId, YangTextSource.class, 1)))
160 .collect(Collectors.toList());
162 NetconfTopologySetup masterSetup = NetconfTopologySetup.builder()
163 .setActorSystem(masterSystem)
164 .setDataBroker(mockDataBroker)
165 .setSchemaResourceDTO(new NetconfDevice.SchemaResourcesDTO(
166 masterSchemaRepository, masterSchemaRepository, mockSchemaContextFactory, mockSchemasResolver))
167 .setBaseSchemas(BASE_SCHEMAS)
170 testMasterActorRef = TestActorRef.create(masterSystem, Props.create(TestMasterActor.class, masterSetup,
171 DEVICE_ID, responseTimeout, mockMountPointService).withDispatcher(Dispatchers.DefaultDispatcherId()),
172 NetconfTopologyUtils.createMasterActorName(DEVICE_ID.name(), masterAddress));
174 SharedSchemaRepository slaveSchemaRepository = new SharedSchemaRepository("slave");
175 slaveSchemaRepository.registerSchemaSourceListener(
176 TextToIRTransformer.create(slaveSchemaRepository, slaveSchemaRepository));
178 NetconfTopologySetup slaveSetup = NetconfTopologySetup.builder()
179 .setActorSystem(slaveSystem)
180 .setDataBroker(mockDataBroker)
181 .setSchemaResourceDTO(new NetconfDevice.SchemaResourcesDTO(
182 slaveSchemaRepository, slaveSchemaRepository, mockSchemaContextFactory, mockSchemasResolver))
183 .setBaseSchemas(BASE_SCHEMAS)
186 netconfNodeManager = new NetconfNodeManager(slaveSetup, DEVICE_ID, responseTimeout,
187 mockMountPointService);
189 setupMountPointMocks();
/**
 * Shuts down both actor systems created in setup(), slave first. The boolean argument is TestKit's
 * verifySystemShutdown flag.
 */
193 public void teardown() {
194 TestKit.shutdownActorSystem(slaveSystem, true);
195 TestKit.shutdownActorSystem(masterSystem, true);
/**
 * Feeds the slave-side manager a sequence of operational Node change notifications and checks the mount
 * point interactions after each one: a WRITE (mount point created and registered), a DELETE (registration
 * closed), a WRITE with no dataBefore (registered again), a WRITE with dataBefore set (closed, then
 * registered), and a SUBTREE_MODIFIED whose dataAfter is UnableToConnect (closed). Finally the manager is
 * closed and no further interaction with the mount point registration is expected.
 */
198 @SuppressWarnings("unchecked")
200 public void testSlaveMountPointRegistration() throws InterruptedException, ExecutionException, TimeoutException {
203 Registration mockListenerReg = mock(Registration.class);
204 doReturn(mockListenerReg).when(mockDataBroker).registerTreeChangeListener(any(), any());
206 final NodeId nodeId = new NodeId("device");
207 final NodeKey nodeKey = new NodeKey(nodeId);
208 final String topologyId = "topology-netconf";
209 final InstanceIdentifier<Node> nodeListPath = NetconfTopologyUtils.createTopologyNodeListPath(
210 nodeKey, topologyId);
// The manager is expected to register itself as the tree change listener.
212 netconfNodeManager.registerDataTreeChangeListener(topologyId, nodeKey);
213 verify(mockDataBroker).registerTreeChangeListener(any(), eq(netconfNodeManager));
215 // Invoke onDataTreeChanged with a NetconfNode WRITE to simulate the master writing the operational state to
216 // Connected. Expect the slave mount point created and registered.
218 final NetconfNode netconfNode = newNetconfNode();
219 final Node node = new NodeBuilder().setNodeId(nodeId).addAugmentation(netconfNode).build();
// A single mocked modification is reused for every notification; its stubbing is changed between steps.
221 DataObjectModification<Node> mockDataObjModification = mock(DataObjectModification.class);
222 doReturn(Iterables.getLast(nodeListPath.getPathArguments())).when(mockDataObjModification).step();
223 doReturn(WRITE).when(mockDataObjModification).modificationType();
224 doReturn(node).when(mockDataObjModification).dataAfter();
226 netconfNodeManager.onDataTreeChanged(List.of(
227 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.of(
228 LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
// timeout(5000) lets the verification wait up to five seconds for the interaction to happen.
230 verify(mockMountPointBuilder, timeout(5000)).register();
231 verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
232 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
233 verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
234 verify(mockMountPointService).createMountPoint(NetconfNodeUtils.defaultTopologyMountPath(DEVICE_ID));
236 // Notify that the NetconfNode operational state was deleted. Expect the slave mount point closed.
238 doReturn(DELETE).when(mockDataObjModification).modificationType();
240 netconfNodeManager.onDataTreeChanged(List.of(
241 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.of(
242 LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
244 verify(mockMountPointReg, timeout(5000)).close();
246 // Notify with a NetconfNode operational state WRITE. Expect the slave mount point re-created.
248 setupMountPointMocks();
250 doReturn(WRITE).when(mockDataObjModification).modificationType();
251 doReturn(null).when(mockDataObjModification).dataBefore();
252 doReturn(node).when(mockDataObjModification).dataAfter();
254 netconfNodeManager.onDataTreeChanged(List.of(
255 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.of(
256 LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
258 verify(mockMountPointBuilder, timeout(5000)).register();
260 // Notify again with a NetconfNode operational state WRITE. Expect the prior slave mount point closed and
261 // a new one registered.
263 setupMountPointMocks();
265 doReturn(node).when(mockDataObjModification).dataBefore();
267 netconfNodeManager.onDataTreeChanged(List.of(
268 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.of(
269 LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
271 verify(mockMountPointReg, timeout(5000)).close();
272 verify(mockMountPointBuilder, timeout(5000)).register();
274 // Notify that the NetconfNode operational state was changed to UnableToConnect. Expect the slave mount point
// Plain reset that only re-stubs close(): register() and createMountPoint() are left unstubbed for this step.
277 reset(mockMountPointService, mockMountPointBuilder, mockMountPointReg);
278 doNothing().when(mockMountPointReg).close();
280 final Node updatedNode = new NodeBuilder().setNodeId(nodeId)
281 .addAugmentation(new NetconfNodeBuilder(netconfNode)
282 .setConnectionStatus(ConnectionStatus.UnableToConnect)
286 doReturn(SUBTREE_MODIFIED).when(mockDataObjModification).modificationType();
287 doReturn(node).when(mockDataObjModification).dataBefore();
288 doReturn(updatedNode).when(mockDataObjModification).dataAfter();
290 netconfNodeManager.onDataTreeChanged(List.of(
291 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.of(
292 LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
294 verify(mockMountPointReg, timeout(5000)).close();
// Closing the manager must not cause any interaction with the registration beyond the close() verified above.
296 netconfNodeManager.close();
297 verifyNoMoreInteractions(mockMountPointReg);
/**
 * Exercises slave mount point registration in three situations: the perceived master has not been
 * initialized (no registration expected within one second), the first YangTextSchemaSourceRequest is
 * intercepted by the test master actor, and the first AskForMasterMountPoint is intercepted. In the two
 * interception cases the test waits for the intercepted message and then still expects the mount point
 * to be registered. Finally the manager is closed and the registration is expected to be closed.
 */
300 @SuppressWarnings("unchecked")
302 public void testSlaveMountPointRegistrationFailuresAndRetries()
303 throws InterruptedException, ExecutionException, TimeoutException {
304 final NodeId nodeId = new NodeId("device");
305 final NodeKey nodeKey = new NodeKey(nodeId);
306 final String topologyId = "topology-netconf";
307 final InstanceIdentifier<Node> nodeListPath = NetconfTopologyUtils.createTopologyNodeListPath(
308 nodeKey, topologyId);
310 final NetconfNode netconfNode = newNetconfNode();
311 final Node node = new NodeBuilder().setNodeId(nodeId).addAugmentation(netconfNode).build();
// The same WRITE modification is replayed for every registration attempt in this test.
313 DataObjectModification<Node> mockDataObjModification = mock(DataObjectModification.class);
314 doReturn(Iterables.getLast(nodeListPath.getPathArguments())).when(mockDataObjModification).step();
315 doReturn(WRITE).when(mockDataObjModification).modificationType();
316 doReturn(node).when(mockDataObjModification).dataAfter();
318 // First try the registration where the perceived master hasn't been initialized as the master.
320 netconfNodeManager.onDataTreeChanged(List.of(
321 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.of(
322 LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
// after(1000).never() waits the full second before asserting that register() was not called.
324 verify(mockMountPointBuilder, after(1000).never()).register();
326 // Initialize the master but drop the initial YangTextSchemaSourceRequest message sent to the master so
// NOTE(review): this future is declared as CompletableFuture<AskForMasterMountPoint> although it is keyed by
// YangTextSchemaSourceRequest.class. TestMasterActor completes it through a raw type and the result of get()
// is discarded below, so no cast is attempted; the type argument looks like a copy-paste slip -- confirm.
331 CompletableFuture<AskForMasterMountPoint> yangTextSchemaSourceRequestFuture = new CompletableFuture<>();
332 testMasterActorRef.underlyingActor().messagesToDrop.put(YangTextSchemaSourceRequest.class,
333 yangTextSchemaSourceRequestFuture);
335 netconfNodeManager.onDataTreeChanged(List.of(
336 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.of(
337 LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
// Wait until the master actor has seen the intercepted message, then expect registration to complete.
339 yangTextSchemaSourceRequestFuture.get(5, TimeUnit.SECONDS);
340 verify(mockMountPointBuilder, timeout(5000)).register();
342 // Initiate another registration but drop the initial AskForMasterMountPoint message sent to the master so
345 setupMountPointMocks();
347 CompletableFuture<AskForMasterMountPoint> askForMasterMountPointFuture = new CompletableFuture<>();
348 testMasterActorRef.underlyingActor().messagesToDrop.put(AskForMasterMountPoint.class,
349 askForMasterMountPointFuture);
351 netconfNodeManager.onDataTreeChanged(List.of(
352 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.of(
353 LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
// The previous registration is expected to be closed and a new one registered.
355 askForMasterMountPointFuture.get(5, TimeUnit.SECONDS);
356 verify(mockMountPointReg, timeout(5000)).close();
357 verify(mockMountPointBuilder, timeout(5000)).register();
// Reset so that the close() verified below can only come from closing the manager.
359 reset(mockMountPointService, mockMountPointBuilder, mockMountPointReg);
360 doNothing().when(mockMountPointReg).close();
361 netconfNodeManager.close();
362 verify(mockMountPointReg, timeout(5000)).close();
/**
 * Builds the NetconfNode used by the tests: host 127.0.0.1, port 9999, connection status Connected, and a
 * clustered connection status that names masterAddress as the NETCONF master node.
 */
365 private NetconfNode newNetconfNode() {
366 return new NetconfNodeBuilder()
367 .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
368 .setPort(new PortNumber(Uint16.valueOf(9999)))
369 .setConnectionStatus(ConnectionStatus.Connected)
370 .setClusteredConnectionStatus(new ClusteredConnectionStatusBuilder()
371 .setNetconfMasterNode(masterAddress).build())
/**
 * Resets the three mount point mocks and installs the default stubbing: createMountPoint() returns the
 * builder, register() on the builder returns the registration, and close() on the registration does
 * nothing. Resetting a Mockito mock also discards its recorded invocations, so verifications made after
 * this call only see interactions that happen afterwards.
 */
375 private void setupMountPointMocks() {
376 reset(mockMountPointService, mockMountPointBuilder, mockMountPointReg);
377 doNothing().when(mockMountPointReg).close();
378 doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
379 doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
/**
 * Sends a CreateInitialMasterActorData message (device data broker, NETCONF data tree service, source
 * identifiers and RPC/action services) to the master test actor from a TestKit probe, then blocks until
 * the probe receives a MasterActorDataInitialized reply.
 */
382 private void initializeMaster() {
383 TestKit kit = new TestKit(masterSystem);
384 testMasterActorRef.tell(new CreateInitialMasterActorData(mockDeviceDataBroker, netconfService,
385 SOURCE_IDENTIFIERS, new RemoteDeviceServices(mockRpcService, mockActionService)), kit.getRef());
387 kit.expectMsgClass(MasterActorDataInitialized.class);
/**
 * NetconfNodeActor subclass that lets a test intercept incoming messages by class. A test registers a
 * future under a message class in messagesToDrop; when a message of exactly that class arrives, the entry
 * is removed and the future is completed with the message.
 */
390 private static class TestMasterActor extends NetconfNodeActor {
// Written by the test thread and read by the actor, hence a concurrent map.
391 final Map<Class<?>, CompletableFuture<? extends Object>> messagesToDrop = new ConcurrentHashMap<>();
393 TestMasterActor(final NetconfTopologySetup setup, final RemoteDeviceId deviceId,
394 final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) {
395 super(setup, deviceId, actorResponseWaitTime, mountPointService);
398 @SuppressWarnings({ "rawtypes", "unchecked" })
400 public void handleReceive(final Object message) {
// remove() makes each registration one-shot: only the first matching message completes the future.
401 CompletableFuture dropFuture = messagesToDrop.remove(message.getClass());
402 if (dropFuture != null) {
403 dropFuture.complete(message);
// NOTE(review): the branch structure around the super call is not fully visible in this extract. The field
// name and the test comments indicate an intercepted message is not forwarded to the real actor -- confirm.
405 super.handleReceive(message);