2 * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl;
10 import static java.nio.charset.StandardCharsets.UTF_8;
11 import static org.mockito.ArgumentMatchers.any;
12 import static org.mockito.ArgumentMatchers.eq;
13 import static org.mockito.Mockito.after;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.reset;
18 import static org.mockito.Mockito.timeout;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
21 import static org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType.DELETE;
22 import static org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType.SUBTREE_MODIFIED;
23 import static org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType.WRITE;
25 import akka.actor.ActorSystem;
26 import akka.actor.Props;
27 import akka.cluster.Cluster;
28 import akka.dispatch.Dispatchers;
29 import akka.testkit.TestActorRef;
30 import akka.testkit.javadsl.TestKit;
31 import akka.util.Timeout;
32 import com.google.common.collect.Iterables;
33 import com.google.common.io.ByteSource;
34 import com.google.common.util.concurrent.Futures;
35 import com.typesafe.config.ConfigFactory;
36 import java.net.InetSocketAddress;
37 import java.util.List;
39 import java.util.concurrent.CompletableFuture;
40 import java.util.concurrent.ConcurrentHashMap;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.TimeoutException;
44 import java.util.stream.Collectors;
45 import org.junit.After;
46 import org.junit.Before;
47 import org.junit.Test;
48 import org.junit.runner.RunWith;
49 import org.mockito.Mock;
50 import org.mockito.junit.MockitoJUnitRunner;
51 import org.opendaylight.mdsal.binding.api.DataBroker;
52 import org.opendaylight.mdsal.binding.api.DataObjectModification;
53 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
54 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
55 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
56 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
57 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
58 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
59 import org.opendaylight.mdsal.dom.api.DOMRpcService;
60 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
61 import org.opendaylight.netconf.sal.connect.api.NetconfDeviceSchemasResolver;
62 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices;
63 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices.Actions;
64 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices.Rpcs;
65 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
66 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
67 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
68 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
69 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
70 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
71 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
72 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
73 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
74 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
75 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
76 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
77 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
78 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev221225.ConnectionOper.ConnectionStatus;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev221225.connection.oper.ClusteredConnectionStatusBuilder;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeBuilder;
82 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
83 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
84 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
85 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
86 import org.opendaylight.yangtools.concepts.ListenerRegistration;
87 import org.opendaylight.yangtools.concepts.ObjectRegistration;
88 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
89 import org.opendaylight.yangtools.yang.common.Uint16;
90 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
91 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
92 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
93 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
94 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
95 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToIRTransformer;
98 * Unit tests for NetconfNodeManager.
100 * @author Thomas Pantelis
102 @RunWith(MockitoJUnitRunner.StrictStubs.class)
103 public class NetconfNodeManagerTest extends AbstractBaseSchemasTest {
104 private static final String ACTOR_SYSTEM_NAME = "test";
105 private static final RemoteDeviceId DEVICE_ID = new RemoteDeviceId("device", new InetSocketAddress(65535));
106 private static final List<SourceIdentifier> SOURCE_IDENTIFIERS = List.of(new SourceIdentifier("testID"));
109 private DOMMountPointService mockMountPointService;
112 private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
115 private ObjectRegistration<DOMMountPoint> mockMountPointReg;
118 private DataBroker mockDataBroker;
121 private NetconfDataTreeService netconfService;
124 private DOMDataBroker mockDeviceDataBroker;
127 private Rpcs.Normalized mockRpcService;
130 private Actions.Normalized mockActionService;
133 private NetconfDeviceSchemasResolver mockSchemasResolver;
136 private EffectiveModelContextFactory mockSchemaContextFactory;
138 private ActorSystem slaveSystem;
139 private ActorSystem masterSystem;
140 private TestActorRef<TestMasterActor> testMasterActorRef;
141 private NetconfNodeManager netconfNodeManager;
142 private String masterAddress;
145 public void setup() {
146 final Timeout responseTimeout = Timeout.apply(1, TimeUnit.SECONDS);
148 slaveSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Slave"));
149 masterSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Master"));
151 masterAddress = Cluster.get(masterSystem).selfAddress().toString();
153 SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
154 masterSchemaRepository.registerSchemaSourceListener(
155 TextToIRTransformer.create(masterSchemaRepository, masterSchemaRepository));
157 final String yangTemplate = """
163 SOURCE_IDENTIFIERS.stream().map(
164 sourceId -> masterSchemaRepository.registerSchemaSource(
165 id -> Futures.immediateFuture(YangTextSchemaSource.delegateForByteSource(id,
166 ByteSource.wrap(yangTemplate.replaceAll("ID", id.name().getLocalName()).getBytes(UTF_8)))),
167 PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, 1)))
168 .collect(Collectors.toList());
170 NetconfTopologySetup masterSetup = new NetconfTopologySetup.NetconfTopologySetupBuilder()
171 .setActorSystem(masterSystem).setDataBroker(mockDataBroker).setSchemaResourceDTO(
172 new NetconfDevice.SchemaResourcesDTO(masterSchemaRepository, masterSchemaRepository,
173 mockSchemaContextFactory, mockSchemasResolver)).setBaseSchemas(BASE_SCHEMAS).build();
175 testMasterActorRef = TestActorRef.create(masterSystem, Props.create(TestMasterActor.class, masterSetup,
176 DEVICE_ID, responseTimeout, mockMountPointService).withDispatcher(Dispatchers.DefaultDispatcherId()),
177 NetconfTopologyUtils.createMasterActorName(DEVICE_ID.getName(), masterAddress));
179 SharedSchemaRepository slaveSchemaRepository = new SharedSchemaRepository("slave");
180 slaveSchemaRepository.registerSchemaSourceListener(
181 TextToIRTransformer.create(slaveSchemaRepository, slaveSchemaRepository));
183 NetconfTopologySetup slaveSetup = new NetconfTopologySetup.NetconfTopologySetupBuilder()
184 .setActorSystem(slaveSystem).setDataBroker(mockDataBroker).setSchemaResourceDTO(
185 new NetconfDevice.SchemaResourcesDTO(slaveSchemaRepository, slaveSchemaRepository,
186 mockSchemaContextFactory, mockSchemasResolver)).setBaseSchemas(BASE_SCHEMAS).build();
188 netconfNodeManager = new NetconfNodeManager(slaveSetup, DEVICE_ID, responseTimeout,
189 mockMountPointService);
191 setupMountPointMocks();
195 public void teardown() {
196 TestKit.shutdownActorSystem(slaveSystem, true);
197 TestKit.shutdownActorSystem(masterSystem, true);
200 @SuppressWarnings("unchecked")
202 public void testSlaveMountPointRegistration() throws InterruptedException, ExecutionException, TimeoutException {
205 ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
206 doReturn(mockListenerReg).when(mockDataBroker).registerDataTreeChangeListener(any(), any());
208 final NodeId nodeId = new NodeId("device");
209 final NodeKey nodeKey = new NodeKey(nodeId);
210 final String topologyId = "topology-netconf";
211 final InstanceIdentifier<Node> nodeListPath = NetconfTopologyUtils.createTopologyNodeListPath(
212 nodeKey, topologyId);
214 netconfNodeManager.registerDataTreeChangeListener(topologyId, nodeKey);
215 verify(mockDataBroker).registerDataTreeChangeListener(any(), eq(netconfNodeManager));
217 // Invoke onDataTreeChanged with a NetconfNode WRITE to simulate the master writing the operational state to
218 // Connected. Expect the slave mount point created and registered.
220 final NetconfNode netconfNode = newNetconfNode();
221 final Node node = new NodeBuilder().setNodeId(nodeId).addAugmentation(netconfNode).build();
223 DataObjectModification<Node> mockDataObjModification = mock(DataObjectModification.class);
224 doReturn(Iterables.getLast(nodeListPath.getPathArguments())).when(mockDataObjModification).getIdentifier();
225 doReturn(WRITE).when(mockDataObjModification).getModificationType();
226 doReturn(node).when(mockDataObjModification).getDataAfter();
228 netconfNodeManager.onDataTreeChanged(List.of(
229 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
230 LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
232 verify(mockMountPointBuilder, timeout(5000)).register();
233 verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
234 verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
235 verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
236 verify(mockMountPointService).createMountPoint(DEVICE_ID.getTopologyPath());
238 // Notify that the NetconfNode operational state was deleted. Expect the slave mount point closed.
240 doReturn(DELETE).when(mockDataObjModification).getModificationType();
242 netconfNodeManager.onDataTreeChanged(List.of(
243 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
244 LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
246 verify(mockMountPointReg, timeout(5000)).close();
248 // Notify with a NetconfNode operational state WRITE. Expect the slave mount point re-created.
250 setupMountPointMocks();
252 doReturn(WRITE).when(mockDataObjModification).getModificationType();
253 doReturn(null).when(mockDataObjModification).getDataBefore();
254 doReturn(node).when(mockDataObjModification).getDataAfter();
256 netconfNodeManager.onDataTreeChanged(List.of(
257 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
258 LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
260 verify(mockMountPointBuilder, timeout(5000)).register();
262 // Notify again with a NetconfNode operational state WRITE. Expect the prior slave mount point closed and
263 // and a new one registered.
265 setupMountPointMocks();
267 doReturn(node).when(mockDataObjModification).getDataBefore();
269 netconfNodeManager.onDataTreeChanged(List.of(
270 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
271 LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
273 verify(mockMountPointReg, timeout(5000)).close();
274 verify(mockMountPointBuilder, timeout(5000)).register();
276 // Notify that the NetconfNode operational state was changed to UnableToConnect. Expect the slave mount point
279 reset(mockMountPointService, mockMountPointBuilder, mockMountPointReg);
280 doNothing().when(mockMountPointReg).close();
282 final Node updatedNode = new NodeBuilder().setNodeId(nodeId)
283 .addAugmentation(new NetconfNodeBuilder(netconfNode)
284 .setConnectionStatus(ConnectionStatus.UnableToConnect)
288 doReturn(SUBTREE_MODIFIED).when(mockDataObjModification).getModificationType();
289 doReturn(node).when(mockDataObjModification).getDataBefore();
290 doReturn(updatedNode).when(mockDataObjModification).getDataAfter();
292 netconfNodeManager.onDataTreeChanged(List.of(
293 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
294 LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
296 verify(mockMountPointReg, timeout(5000)).close();
298 netconfNodeManager.close();
299 verifyNoMoreInteractions(mockMountPointReg);
302 @SuppressWarnings("unchecked")
304 public void testSlaveMountPointRegistrationFailuresAndRetries()
305 throws InterruptedException, ExecutionException, TimeoutException {
306 final NodeId nodeId = new NodeId("device");
307 final NodeKey nodeKey = new NodeKey(nodeId);
308 final String topologyId = "topology-netconf";
309 final InstanceIdentifier<Node> nodeListPath = NetconfTopologyUtils.createTopologyNodeListPath(
310 nodeKey, topologyId);
312 final NetconfNode netconfNode = newNetconfNode();
313 final Node node = new NodeBuilder().setNodeId(nodeId).addAugmentation(netconfNode).build();
315 DataObjectModification<Node> mockDataObjModification = mock(DataObjectModification.class);
316 doReturn(Iterables.getLast(nodeListPath.getPathArguments())).when(mockDataObjModification).getIdentifier();
317 doReturn(WRITE).when(mockDataObjModification).getModificationType();
318 doReturn(node).when(mockDataObjModification).getDataAfter();
320 // First try the registration where the perceived master hasn't been initialized as the master.
322 netconfNodeManager.onDataTreeChanged(List.of(
323 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
324 LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
326 verify(mockMountPointBuilder, after(1000).never()).register();
328 // Initialize the master but drop the initial YangTextSchemaSourceRequest message sent to the master so
333 CompletableFuture<AskForMasterMountPoint> yangTextSchemaSourceRequestFuture = new CompletableFuture<>();
334 testMasterActorRef.underlyingActor().messagesToDrop.put(YangTextSchemaSourceRequest.class,
335 yangTextSchemaSourceRequestFuture);
337 netconfNodeManager.onDataTreeChanged(List.of(
338 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
339 LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
341 yangTextSchemaSourceRequestFuture.get(5, TimeUnit.SECONDS);
342 verify(mockMountPointBuilder, timeout(5000)).register();
344 // Initiate another registration but drop the initial AskForMasterMountPoint message sent to the master so
347 setupMountPointMocks();
349 CompletableFuture<AskForMasterMountPoint> askForMasterMountPointFuture = new CompletableFuture<>();
350 testMasterActorRef.underlyingActor().messagesToDrop.put(AskForMasterMountPoint.class,
351 askForMasterMountPointFuture);
353 netconfNodeManager.onDataTreeChanged(List.of(
354 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
355 LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
357 askForMasterMountPointFuture.get(5, TimeUnit.SECONDS);
358 verify(mockMountPointReg, timeout(5000)).close();
359 verify(mockMountPointBuilder, timeout(5000)).register();
361 reset(mockMountPointService, mockMountPointBuilder, mockMountPointReg);
362 doNothing().when(mockMountPointReg).close();
363 netconfNodeManager.close();
364 verify(mockMountPointReg, timeout(5000)).close();
367 private NetconfNode newNetconfNode() {
368 return new NetconfNodeBuilder()
369 .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
370 .setPort(new PortNumber(Uint16.valueOf(9999)))
371 .setConnectionStatus(ConnectionStatus.Connected)
372 .setClusteredConnectionStatus(new ClusteredConnectionStatusBuilder()
373 .setNetconfMasterNode(masterAddress).build())
377 private void setupMountPointMocks() {
378 reset(mockMountPointService, mockMountPointBuilder, mockMountPointReg);
379 doNothing().when(mockMountPointReg).close();
380 doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
381 doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
384 private void initializeMaster() {
385 TestKit kit = new TestKit(masterSystem);
386 testMasterActorRef.tell(new CreateInitialMasterActorData(mockDeviceDataBroker, netconfService,
387 SOURCE_IDENTIFIERS, new RemoteDeviceServices(mockRpcService, mockActionService)), kit.getRef());
389 kit.expectMsgClass(MasterActorDataInitialized.class);
392 private static class TestMasterActor extends NetconfNodeActor {
393 final Map<Class<?>, CompletableFuture<? extends Object>> messagesToDrop = new ConcurrentHashMap<>();
395 TestMasterActor(final NetconfTopologySetup setup, final RemoteDeviceId deviceId,
396 final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) {
397 super(setup, deviceId, actorResponseWaitTime, mountPointService);
400 @SuppressWarnings({ "rawtypes", "unchecked" })
402 public void handleReceive(final Object message) {
403 CompletableFuture dropFuture = messagesToDrop.remove(message.getClass());
404 if (dropFuture != null) {
405 dropFuture.complete(message);
407 super.handleReceive(message);