Bump upstreams
[netconf.git] / apps / netconf-topology-singleton / src / test / java / org / opendaylight / netconf / topology / singleton / impl / NetconfNodeManagerTest.java
1 /*
2  * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static org.mockito.ArgumentMatchers.any;
11 import static org.mockito.ArgumentMatchers.eq;
12 import static org.mockito.Mockito.after;
13 import static org.mockito.Mockito.doNothing;
14 import static org.mockito.Mockito.doReturn;
15 import static org.mockito.Mockito.mock;
16 import static org.mockito.Mockito.reset;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20 import static org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType.DELETE;
21 import static org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType.SUBTREE_MODIFIED;
22 import static org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType.WRITE;
23
24 import akka.actor.ActorSystem;
25 import akka.actor.Props;
26 import akka.cluster.Cluster;
27 import akka.dispatch.Dispatchers;
28 import akka.testkit.TestActorRef;
29 import akka.testkit.javadsl.TestKit;
30 import akka.util.Timeout;
31 import com.google.common.collect.Iterables;
32 import com.google.common.io.CharSource;
33 import com.google.common.util.concurrent.Futures;
34 import com.typesafe.config.ConfigFactory;
35 import java.net.InetSocketAddress;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.concurrent.CompletableFuture;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.TimeUnit;
42 import java.util.concurrent.TimeoutException;
43 import java.util.stream.Collectors;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.junit.runner.RunWith;
48 import org.mockito.Mock;
49 import org.mockito.junit.MockitoJUnitRunner;
50 import org.opendaylight.mdsal.binding.api.DataBroker;
51 import org.opendaylight.mdsal.binding.api.DataObjectModification;
52 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
53 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
54 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
55 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
56 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
57 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
58 import org.opendaylight.mdsal.dom.api.DOMRpcService;
59 import org.opendaylight.netconf.client.mdsal.NetconfDevice;
60 import org.opendaylight.netconf.client.mdsal.api.CredentialProvider;
61 import org.opendaylight.netconf.client.mdsal.api.NetconfDeviceSchemasResolver;
62 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceId;
63 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
64 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Actions;
65 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
66 import org.opendaylight.netconf.client.mdsal.api.SslHandlerFactoryProvider;
67 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
68 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
69 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
70 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
71 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
72 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
73 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
74 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
75 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
76 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
77 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
78 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
79 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.ConnectionOper.ConnectionStatus;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev230430.connection.oper.ClusteredConnectionStatusBuilder;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
83 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeBuilder;
84 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
85 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
86 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
87 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
88 import org.opendaylight.yangtools.concepts.ListenerRegistration;
89 import org.opendaylight.yangtools.concepts.ObjectRegistration;
90 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
91 import org.opendaylight.yangtools.yang.common.Uint16;
92 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
93 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
94 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
95 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
96 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
97 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToIRTransformer;
98
99 /**
100  * Unit tests for NetconfNodeManager.
101  *
102  * @author Thomas Pantelis
103  */
104 @RunWith(MockitoJUnitRunner.StrictStubs.class)
105 public class NetconfNodeManagerTest extends AbstractBaseSchemasTest {
106     private static final String ACTOR_SYSTEM_NAME = "test";
107     private static final RemoteDeviceId DEVICE_ID = new RemoteDeviceId("device", new InetSocketAddress(65535));
108     private static final List<SourceIdentifier> SOURCE_IDENTIFIERS = List.of(new SourceIdentifier("testID"));
109
110     @Mock
111     private DOMMountPointService mockMountPointService;
112     @Mock
113     private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
114     @Mock
115     private ObjectRegistration<DOMMountPoint> mockMountPointReg;
116     @Mock
117     private DataBroker mockDataBroker;
118     @Mock
119     private NetconfDataTreeService netconfService;
120     @Mock
121     private DOMDataBroker mockDeviceDataBroker;
122     @Mock
123     private Rpcs.Normalized mockRpcService;
124     @Mock
125     private Actions.Normalized mockActionService;
126     @Mock
127     private NetconfDeviceSchemasResolver mockSchemasResolver;
128     @Mock
129     private EffectiveModelContextFactory mockSchemaContextFactory;
130     @Mock
131     private CredentialProvider credentialProvider;
132     @Mock
133     private SslHandlerFactoryProvider sslHandlerFactoryProvider;
134
135     private ActorSystem slaveSystem;
136     private ActorSystem masterSystem;
137     private TestActorRef<TestMasterActor> testMasterActorRef;
138     private NetconfNodeManager netconfNodeManager;
139     private String masterAddress;
140
141     @Before
142     public void setup() {
143         final Timeout responseTimeout = Timeout.apply(1, TimeUnit.SECONDS);
144
145         slaveSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Slave"));
146         masterSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Master"));
147
148         masterAddress = Cluster.get(masterSystem).selfAddress().toString();
149
150         SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
151         masterSchemaRepository.registerSchemaSourceListener(
152                 TextToIRTransformer.create(masterSchemaRepository, masterSchemaRepository));
153
154         final String yangTemplate = """
155             module ID {\
156               namespace "ID";\
157               prefix ID;\
158             }""";
159
160         SOURCE_IDENTIFIERS.stream().map(
161             sourceId -> masterSchemaRepository.registerSchemaSource(
162                 id -> Futures.immediateFuture(YangTextSchemaSource.delegateForCharSource(id,
163                         CharSource.wrap(yangTemplate.replaceAll("ID", id.name().getLocalName())))),
164                 PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, 1)))
165         .collect(Collectors.toList());
166
167         NetconfTopologySetup masterSetup = new NetconfTopologySetup.NetconfTopologySetupBuilder()
168                 .setActorSystem(masterSystem)
169                 .setDataBroker(mockDataBroker)
170                 .setSchemaResourceDTO(new NetconfDevice.SchemaResourcesDTO(
171                     masterSchemaRepository, masterSchemaRepository, mockSchemaContextFactory, mockSchemasResolver))
172                 .setBaseSchemas(BASE_SCHEMAS)
173                 .setCredentialProvider(credentialProvider)
174                 .setSslHandlerFactoryProvider(sslHandlerFactoryProvider)
175                 .build();
176
177         testMasterActorRef = TestActorRef.create(masterSystem, Props.create(TestMasterActor.class, masterSetup,
178                 DEVICE_ID, responseTimeout, mockMountPointService).withDispatcher(Dispatchers.DefaultDispatcherId()),
179                 NetconfTopologyUtils.createMasterActorName(DEVICE_ID.name(), masterAddress));
180
181         SharedSchemaRepository slaveSchemaRepository = new SharedSchemaRepository("slave");
182         slaveSchemaRepository.registerSchemaSourceListener(
183                 TextToIRTransformer.create(slaveSchemaRepository, slaveSchemaRepository));
184
185         NetconfTopologySetup slaveSetup = new NetconfTopologySetup.NetconfTopologySetupBuilder()
186                 .setActorSystem(slaveSystem)
187                 .setDataBroker(mockDataBroker)
188                 .setSchemaResourceDTO(new NetconfDevice.SchemaResourcesDTO(
189                     slaveSchemaRepository, slaveSchemaRepository, mockSchemaContextFactory, mockSchemasResolver))
190                 .setBaseSchemas(BASE_SCHEMAS)
191                 .setCredentialProvider(credentialProvider)
192                 .setSslHandlerFactoryProvider(sslHandlerFactoryProvider)
193                 .build();
194
195         netconfNodeManager = new NetconfNodeManager(slaveSetup, DEVICE_ID, responseTimeout,
196                 mockMountPointService);
197
198         setupMountPointMocks();
199     }
200
201     @After
202     public void teardown() {
203         TestKit.shutdownActorSystem(slaveSystem, true);
204         TestKit.shutdownActorSystem(masterSystem, true);
205     }
206
207     @SuppressWarnings("unchecked")
208     @Test
209     public void testSlaveMountPointRegistration() throws InterruptedException, ExecutionException, TimeoutException {
210         initializeMaster();
211
212         ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
213         doReturn(mockListenerReg).when(mockDataBroker).registerDataTreeChangeListener(any(), any());
214
215         final NodeId nodeId = new NodeId("device");
216         final NodeKey nodeKey = new NodeKey(nodeId);
217         final String topologyId = "topology-netconf";
218         final InstanceIdentifier<Node> nodeListPath = NetconfTopologyUtils.createTopologyNodeListPath(
219                 nodeKey, topologyId);
220
221         netconfNodeManager.registerDataTreeChangeListener(topologyId, nodeKey);
222         verify(mockDataBroker).registerDataTreeChangeListener(any(), eq(netconfNodeManager));
223
224         // Invoke onDataTreeChanged with a NetconfNode WRITE to simulate the master writing the operational state to
225         // Connected. Expect the slave mount point created and registered.
226
227         final NetconfNode netconfNode = newNetconfNode();
228         final Node node = new NodeBuilder().setNodeId(nodeId).addAugmentation(netconfNode).build();
229
230         DataObjectModification<Node> mockDataObjModification = mock(DataObjectModification.class);
231         doReturn(Iterables.getLast(nodeListPath.getPathArguments())).when(mockDataObjModification).getIdentifier();
232         doReturn(WRITE).when(mockDataObjModification).getModificationType();
233         doReturn(node).when(mockDataObjModification).getDataAfter();
234
235         netconfNodeManager.onDataTreeChanged(List.of(
236                 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
237                         LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
238
239         verify(mockMountPointBuilder, timeout(5000)).register();
240         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
241         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
242         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
243         verify(mockMountPointService).createMountPoint(NetconfNodeUtils.defaultTopologyMountPath(DEVICE_ID));
244
245         // Notify that the NetconfNode operational state was deleted. Expect the slave mount point closed.
246
247         doReturn(DELETE).when(mockDataObjModification).getModificationType();
248
249         netconfNodeManager.onDataTreeChanged(List.of(
250                 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
251                         LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
252
253         verify(mockMountPointReg, timeout(5000)).close();
254
255         // Notify with a NetconfNode operational state WRITE. Expect the slave mount point re-created.
256
257         setupMountPointMocks();
258
259         doReturn(WRITE).when(mockDataObjModification).getModificationType();
260         doReturn(null).when(mockDataObjModification).getDataBefore();
261         doReturn(node).when(mockDataObjModification).getDataAfter();
262
263         netconfNodeManager.onDataTreeChanged(List.of(
264                 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
265                         LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
266
267         verify(mockMountPointBuilder, timeout(5000)).register();
268
269         // Notify again with a NetconfNode operational state WRITE. Expect the prior slave mount point closed and
270         // and a new one registered.
271
272         setupMountPointMocks();
273
274         doReturn(node).when(mockDataObjModification).getDataBefore();
275
276         netconfNodeManager.onDataTreeChanged(List.of(
277                 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
278                         LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
279
280         verify(mockMountPointReg, timeout(5000)).close();
281         verify(mockMountPointBuilder, timeout(5000)).register();
282
283         // Notify that the NetconfNode operational state was changed to UnableToConnect. Expect the slave mount point
284         // closed.
285
286         reset(mockMountPointService, mockMountPointBuilder, mockMountPointReg);
287         doNothing().when(mockMountPointReg).close();
288
289         final Node updatedNode = new NodeBuilder().setNodeId(nodeId)
290                 .addAugmentation(new NetconfNodeBuilder(netconfNode)
291                     .setConnectionStatus(ConnectionStatus.UnableToConnect)
292                     .build())
293                 .build();
294
295         doReturn(SUBTREE_MODIFIED).when(mockDataObjModification).getModificationType();
296         doReturn(node).when(mockDataObjModification).getDataBefore();
297         doReturn(updatedNode).when(mockDataObjModification).getDataAfter();
298
299         netconfNodeManager.onDataTreeChanged(List.of(
300                 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
301                         LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
302
303         verify(mockMountPointReg, timeout(5000)).close();
304
305         netconfNodeManager.close();
306         verifyNoMoreInteractions(mockMountPointReg);
307     }
308
309     @SuppressWarnings("unchecked")
310     @Test
311     public void testSlaveMountPointRegistrationFailuresAndRetries()
312             throws InterruptedException, ExecutionException, TimeoutException {
313         final NodeId nodeId = new NodeId("device");
314         final NodeKey nodeKey = new NodeKey(nodeId);
315         final String topologyId = "topology-netconf";
316         final InstanceIdentifier<Node> nodeListPath = NetconfTopologyUtils.createTopologyNodeListPath(
317                 nodeKey, topologyId);
318
319         final NetconfNode netconfNode = newNetconfNode();
320         final Node node = new NodeBuilder().setNodeId(nodeId).addAugmentation(netconfNode).build();
321
322         DataObjectModification<Node> mockDataObjModification = mock(DataObjectModification.class);
323         doReturn(Iterables.getLast(nodeListPath.getPathArguments())).when(mockDataObjModification).getIdentifier();
324         doReturn(WRITE).when(mockDataObjModification).getModificationType();
325         doReturn(node).when(mockDataObjModification).getDataAfter();
326
327         // First try the registration where the perceived master hasn't been initialized as the master.
328
329         netconfNodeManager.onDataTreeChanged(List.of(
330                 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
331                         LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
332
333         verify(mockMountPointBuilder, after(1000).never()).register();
334
335         // Initialize the master but drop the initial YangTextSchemaSourceRequest message sent to the master so
336         // it retries.
337
338         initializeMaster();
339
340         CompletableFuture<AskForMasterMountPoint> yangTextSchemaSourceRequestFuture = new CompletableFuture<>();
341         testMasterActorRef.underlyingActor().messagesToDrop.put(YangTextSchemaSourceRequest.class,
342                 yangTextSchemaSourceRequestFuture);
343
344         netconfNodeManager.onDataTreeChanged(List.of(
345                 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
346                         LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
347
348         yangTextSchemaSourceRequestFuture.get(5, TimeUnit.SECONDS);
349         verify(mockMountPointBuilder, timeout(5000)).register();
350
351         // Initiate another registration but drop the initial AskForMasterMountPoint message sent to the master so
352         // it retries.
353
354         setupMountPointMocks();
355
356         CompletableFuture<AskForMasterMountPoint> askForMasterMountPointFuture = new CompletableFuture<>();
357         testMasterActorRef.underlyingActor().messagesToDrop.put(AskForMasterMountPoint.class,
358                 askForMasterMountPointFuture);
359
360         netconfNodeManager.onDataTreeChanged(List.of(
361                 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
362                         LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
363
364         askForMasterMountPointFuture.get(5, TimeUnit.SECONDS);
365         verify(mockMountPointReg, timeout(5000)).close();
366         verify(mockMountPointBuilder, timeout(5000)).register();
367
368         reset(mockMountPointService, mockMountPointBuilder, mockMountPointReg);
369         doNothing().when(mockMountPointReg).close();
370         netconfNodeManager.close();
371         verify(mockMountPointReg, timeout(5000)).close();
372     }
373
374     private NetconfNode newNetconfNode() {
375         return new NetconfNodeBuilder()
376                 .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
377                 .setPort(new PortNumber(Uint16.valueOf(9999)))
378                 .setConnectionStatus(ConnectionStatus.Connected)
379                 .setClusteredConnectionStatus(new ClusteredConnectionStatusBuilder()
380                         .setNetconfMasterNode(masterAddress).build())
381                 .build();
382     }
383
384     private void setupMountPointMocks() {
385         reset(mockMountPointService, mockMountPointBuilder, mockMountPointReg);
386         doNothing().when(mockMountPointReg).close();
387         doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
388         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
389     }
390
391     private void initializeMaster() {
392         TestKit kit = new TestKit(masterSystem);
393         testMasterActorRef.tell(new CreateInitialMasterActorData(mockDeviceDataBroker, netconfService,
394             SOURCE_IDENTIFIERS, new RemoteDeviceServices(mockRpcService, mockActionService)), kit.getRef());
395
396         kit.expectMsgClass(MasterActorDataInitialized.class);
397     }
398
399     private static class TestMasterActor extends NetconfNodeActor {
400         final Map<Class<?>, CompletableFuture<? extends Object>> messagesToDrop = new ConcurrentHashMap<>();
401
402         TestMasterActor(final NetconfTopologySetup setup, final RemoteDeviceId deviceId,
403                 final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) {
404             super(setup, deviceId, actorResponseWaitTime, mountPointService);
405         }
406
407         @SuppressWarnings({ "rawtypes", "unchecked" })
408         @Override
409         public void handleReceive(final Object message) {
410             CompletableFuture dropFuture = messagesToDrop.remove(message.getClass());
411             if (dropFuture != null) {
412                 dropFuture.complete(message);
413             } else {
414                 super.handleReceive(message);
415             }
416         }
417     }
418 }