Move DEFAULT_TOPOLOGY_NAME
[netconf.git] / netconf / netconf-topology-singleton / src / test / java / org / opendaylight / netconf / topology / singleton / impl / NetconfNodeManagerTest.java
1 /*
2  * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static java.nio.charset.StandardCharsets.UTF_8;
11 import static org.mockito.ArgumentMatchers.any;
12 import static org.mockito.ArgumentMatchers.eq;
13 import static org.mockito.Mockito.after;
14 import static org.mockito.Mockito.doNothing;
15 import static org.mockito.Mockito.doReturn;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.reset;
18 import static org.mockito.Mockito.timeout;
19 import static org.mockito.Mockito.verify;
20 import static org.mockito.Mockito.verifyNoMoreInteractions;
21 import static org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType.DELETE;
22 import static org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType.SUBTREE_MODIFIED;
23 import static org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType.WRITE;
24
25 import akka.actor.ActorSystem;
26 import akka.actor.Props;
27 import akka.cluster.Cluster;
28 import akka.dispatch.Dispatchers;
29 import akka.testkit.TestActorRef;
30 import akka.testkit.javadsl.TestKit;
31 import akka.util.Timeout;
32 import com.google.common.collect.Iterables;
33 import com.google.common.io.ByteSource;
34 import com.google.common.util.concurrent.Futures;
35 import com.typesafe.config.ConfigFactory;
36 import java.net.InetSocketAddress;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.concurrent.CompletableFuture;
40 import java.util.concurrent.ConcurrentHashMap;
41 import java.util.concurrent.ExecutionException;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.TimeoutException;
44 import java.util.stream.Collectors;
45 import org.junit.After;
46 import org.junit.Before;
47 import org.junit.Test;
48 import org.junit.runner.RunWith;
49 import org.mockito.Mock;
50 import org.mockito.junit.MockitoJUnitRunner;
51 import org.opendaylight.mdsal.binding.api.DataBroker;
52 import org.opendaylight.mdsal.binding.api.DataObjectModification;
53 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
54 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
55 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
56 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
57 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
58 import org.opendaylight.mdsal.dom.api.DOMNotificationService;
59 import org.opendaylight.mdsal.dom.api.DOMRpcService;
60 import org.opendaylight.netconf.dom.api.NetconfDataTreeService;
61 import org.opendaylight.netconf.sal.connect.api.NetconfDeviceSchemasResolver;
62 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceId;
63 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices;
64 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices.Actions;
65 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceServices.Rpcs;
66 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
67 import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
68 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
69 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
70 import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
71 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
72 import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
73 import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
74 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
75 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
76 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
77 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
78 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
79 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev221225.ConnectionOper.ConnectionStatus;
80 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev221225.connection.oper.ClusteredConnectionStatusBuilder;
81 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNode;
82 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev221225.NetconfNodeBuilder;
83 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
84 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
85 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
86 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
87 import org.opendaylight.yangtools.concepts.ListenerRegistration;
88 import org.opendaylight.yangtools.concepts.ObjectRegistration;
89 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
90 import org.opendaylight.yangtools.yang.common.Uint16;
91 import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
92 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
93 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
94 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
95 import org.opendaylight.yangtools.yang.parser.repo.SharedSchemaRepository;
96 import org.opendaylight.yangtools.yang.parser.rfc7950.repo.TextToIRTransformer;
97
98 /**
99  * Unit tests for NetconfNodeManager.
100  *
101  * @author Thomas Pantelis
102  */
103 @RunWith(MockitoJUnitRunner.StrictStubs.class)
104 public class NetconfNodeManagerTest extends AbstractBaseSchemasTest {
105     private static final String ACTOR_SYSTEM_NAME = "test";
106     private static final RemoteDeviceId DEVICE_ID = new RemoteDeviceId("device", new InetSocketAddress(65535));
107     private static final List<SourceIdentifier> SOURCE_IDENTIFIERS = List.of(new SourceIdentifier("testID"));
108
109     @Mock
110     private DOMMountPointService mockMountPointService;
111
112     @Mock
113     private DOMMountPointService.DOMMountPointBuilder mockMountPointBuilder;
114
115     @Mock
116     private ObjectRegistration<DOMMountPoint> mockMountPointReg;
117
118     @Mock
119     private DataBroker mockDataBroker;
120
121     @Mock
122     private NetconfDataTreeService netconfService;
123
124     @Mock
125     private DOMDataBroker mockDeviceDataBroker;
126
127     @Mock
128     private Rpcs.Normalized mockRpcService;
129
130     @Mock
131     private Actions.Normalized mockActionService;
132
133     @Mock
134     private NetconfDeviceSchemasResolver mockSchemasResolver;
135
136     @Mock
137     private EffectiveModelContextFactory mockSchemaContextFactory;
138
139     private ActorSystem slaveSystem;
140     private ActorSystem masterSystem;
141     private TestActorRef<TestMasterActor> testMasterActorRef;
142     private NetconfNodeManager netconfNodeManager;
143     private String masterAddress;
144
145     @Before
146     public void setup() {
147         final Timeout responseTimeout = Timeout.apply(1, TimeUnit.SECONDS);
148
149         slaveSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Slave"));
150         masterSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Master"));
151
152         masterAddress = Cluster.get(masterSystem).selfAddress().toString();
153
154         SharedSchemaRepository masterSchemaRepository = new SharedSchemaRepository("master");
155         masterSchemaRepository.registerSchemaSourceListener(
156                 TextToIRTransformer.create(masterSchemaRepository, masterSchemaRepository));
157
158         final String yangTemplate = """
159             module ID {\
160               namespace "ID";\
161               prefix ID;\
162             }""";
163
164         SOURCE_IDENTIFIERS.stream().map(
165             sourceId -> masterSchemaRepository.registerSchemaSource(
166                 id -> Futures.immediateFuture(YangTextSchemaSource.delegateForByteSource(id,
167                         ByteSource.wrap(yangTemplate.replaceAll("ID", id.name().getLocalName()).getBytes(UTF_8)))),
168                 PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, 1)))
169         .collect(Collectors.toList());
170
171         NetconfTopologySetup masterSetup = new NetconfTopologySetup.NetconfTopologySetupBuilder()
172                 .setActorSystem(masterSystem).setDataBroker(mockDataBroker).setSchemaResourceDTO(
173                         new NetconfDevice.SchemaResourcesDTO(masterSchemaRepository, masterSchemaRepository,
174                                 mockSchemaContextFactory, mockSchemasResolver)).setBaseSchemas(BASE_SCHEMAS).build();
175
176         testMasterActorRef = TestActorRef.create(masterSystem, Props.create(TestMasterActor.class, masterSetup,
177                 DEVICE_ID, responseTimeout, mockMountPointService).withDispatcher(Dispatchers.DefaultDispatcherId()),
178                 NetconfTopologyUtils.createMasterActorName(DEVICE_ID.name(), masterAddress));
179
180         SharedSchemaRepository slaveSchemaRepository = new SharedSchemaRepository("slave");
181         slaveSchemaRepository.registerSchemaSourceListener(
182                 TextToIRTransformer.create(slaveSchemaRepository, slaveSchemaRepository));
183
184         NetconfTopologySetup slaveSetup = new NetconfTopologySetup.NetconfTopologySetupBuilder()
185                 .setActorSystem(slaveSystem).setDataBroker(mockDataBroker).setSchemaResourceDTO(
186                         new NetconfDevice.SchemaResourcesDTO(slaveSchemaRepository, slaveSchemaRepository,
187                                 mockSchemaContextFactory, mockSchemasResolver)).setBaseSchemas(BASE_SCHEMAS).build();
188
189         netconfNodeManager = new NetconfNodeManager(slaveSetup, DEVICE_ID, responseTimeout,
190                 mockMountPointService);
191
192         setupMountPointMocks();
193     }
194
195     @After
196     public void teardown() {
197         TestKit.shutdownActorSystem(slaveSystem, true);
198         TestKit.shutdownActorSystem(masterSystem, true);
199     }
200
201     @SuppressWarnings("unchecked")
202     @Test
203     public void testSlaveMountPointRegistration() throws InterruptedException, ExecutionException, TimeoutException {
204         initializeMaster();
205
206         ListenerRegistration<?> mockListenerReg = mock(ListenerRegistration.class);
207         doReturn(mockListenerReg).when(mockDataBroker).registerDataTreeChangeListener(any(), any());
208
209         final NodeId nodeId = new NodeId("device");
210         final NodeKey nodeKey = new NodeKey(nodeId);
211         final String topologyId = "topology-netconf";
212         final InstanceIdentifier<Node> nodeListPath = NetconfTopologyUtils.createTopologyNodeListPath(
213                 nodeKey, topologyId);
214
215         netconfNodeManager.registerDataTreeChangeListener(topologyId, nodeKey);
216         verify(mockDataBroker).registerDataTreeChangeListener(any(), eq(netconfNodeManager));
217
218         // Invoke onDataTreeChanged with a NetconfNode WRITE to simulate the master writing the operational state to
219         // Connected. Expect the slave mount point created and registered.
220
221         final NetconfNode netconfNode = newNetconfNode();
222         final Node node = new NodeBuilder().setNodeId(nodeId).addAugmentation(netconfNode).build();
223
224         DataObjectModification<Node> mockDataObjModification = mock(DataObjectModification.class);
225         doReturn(Iterables.getLast(nodeListPath.getPathArguments())).when(mockDataObjModification).getIdentifier();
226         doReturn(WRITE).when(mockDataObjModification).getModificationType();
227         doReturn(node).when(mockDataObjModification).getDataAfter();
228
229         netconfNodeManager.onDataTreeChanged(List.of(
230                 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
231                         LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
232
233         verify(mockMountPointBuilder, timeout(5000)).register();
234         verify(mockMountPointBuilder).addService(eq(DOMDataBroker.class), any());
235         verify(mockMountPointBuilder).addService(eq(DOMRpcService.class), any());
236         verify(mockMountPointBuilder).addService(eq(DOMNotificationService.class), any());
237         verify(mockMountPointService).createMountPoint(NetconfNodeUtils.defaultTopologyMountPath(DEVICE_ID));
238
239         // Notify that the NetconfNode operational state was deleted. Expect the slave mount point closed.
240
241         doReturn(DELETE).when(mockDataObjModification).getModificationType();
242
243         netconfNodeManager.onDataTreeChanged(List.of(
244                 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
245                         LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
246
247         verify(mockMountPointReg, timeout(5000)).close();
248
249         // Notify with a NetconfNode operational state WRITE. Expect the slave mount point re-created.
250
251         setupMountPointMocks();
252
253         doReturn(WRITE).when(mockDataObjModification).getModificationType();
254         doReturn(null).when(mockDataObjModification).getDataBefore();
255         doReturn(node).when(mockDataObjModification).getDataAfter();
256
257         netconfNodeManager.onDataTreeChanged(List.of(
258                 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
259                         LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
260
261         verify(mockMountPointBuilder, timeout(5000)).register();
262
263         // Notify again with a NetconfNode operational state WRITE. Expect the prior slave mount point closed and
264         // and a new one registered.
265
266         setupMountPointMocks();
267
268         doReturn(node).when(mockDataObjModification).getDataBefore();
269
270         netconfNodeManager.onDataTreeChanged(List.of(
271                 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
272                         LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
273
274         verify(mockMountPointReg, timeout(5000)).close();
275         verify(mockMountPointBuilder, timeout(5000)).register();
276
277         // Notify that the NetconfNode operational state was changed to UnableToConnect. Expect the slave mount point
278         // closed.
279
280         reset(mockMountPointService, mockMountPointBuilder, mockMountPointReg);
281         doNothing().when(mockMountPointReg).close();
282
283         final Node updatedNode = new NodeBuilder().setNodeId(nodeId)
284                 .addAugmentation(new NetconfNodeBuilder(netconfNode)
285                     .setConnectionStatus(ConnectionStatus.UnableToConnect)
286                     .build())
287                 .build();
288
289         doReturn(SUBTREE_MODIFIED).when(mockDataObjModification).getModificationType();
290         doReturn(node).when(mockDataObjModification).getDataBefore();
291         doReturn(updatedNode).when(mockDataObjModification).getDataAfter();
292
293         netconfNodeManager.onDataTreeChanged(List.of(
294                 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
295                         LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
296
297         verify(mockMountPointReg, timeout(5000)).close();
298
299         netconfNodeManager.close();
300         verifyNoMoreInteractions(mockMountPointReg);
301     }
302
303     @SuppressWarnings("unchecked")
304     @Test
305     public void testSlaveMountPointRegistrationFailuresAndRetries()
306             throws InterruptedException, ExecutionException, TimeoutException {
307         final NodeId nodeId = new NodeId("device");
308         final NodeKey nodeKey = new NodeKey(nodeId);
309         final String topologyId = "topology-netconf";
310         final InstanceIdentifier<Node> nodeListPath = NetconfTopologyUtils.createTopologyNodeListPath(
311                 nodeKey, topologyId);
312
313         final NetconfNode netconfNode = newNetconfNode();
314         final Node node = new NodeBuilder().setNodeId(nodeId).addAugmentation(netconfNode).build();
315
316         DataObjectModification<Node> mockDataObjModification = mock(DataObjectModification.class);
317         doReturn(Iterables.getLast(nodeListPath.getPathArguments())).when(mockDataObjModification).getIdentifier();
318         doReturn(WRITE).when(mockDataObjModification).getModificationType();
319         doReturn(node).when(mockDataObjModification).getDataAfter();
320
321         // First try the registration where the perceived master hasn't been initialized as the master.
322
323         netconfNodeManager.onDataTreeChanged(List.of(
324                 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
325                         LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
326
327         verify(mockMountPointBuilder, after(1000).never()).register();
328
329         // Initialize the master but drop the initial YangTextSchemaSourceRequest message sent to the master so
330         // it retries.
331
332         initializeMaster();
333
334         CompletableFuture<AskForMasterMountPoint> yangTextSchemaSourceRequestFuture = new CompletableFuture<>();
335         testMasterActorRef.underlyingActor().messagesToDrop.put(YangTextSchemaSourceRequest.class,
336                 yangTextSchemaSourceRequestFuture);
337
338         netconfNodeManager.onDataTreeChanged(List.of(
339                 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
340                         LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
341
342         yangTextSchemaSourceRequestFuture.get(5, TimeUnit.SECONDS);
343         verify(mockMountPointBuilder, timeout(5000)).register();
344
345         // Initiate another registration but drop the initial AskForMasterMountPoint message sent to the master so
346         // it retries.
347
348         setupMountPointMocks();
349
350         CompletableFuture<AskForMasterMountPoint> askForMasterMountPointFuture = new CompletableFuture<>();
351         testMasterActorRef.underlyingActor().messagesToDrop.put(AskForMasterMountPoint.class,
352                 askForMasterMountPointFuture);
353
354         netconfNodeManager.onDataTreeChanged(List.of(
355                 new NetconfTopologyManagerTest.CustomTreeModification(DataTreeIdentifier.create(
356                         LogicalDatastoreType.OPERATIONAL, nodeListPath), mockDataObjModification)));
357
358         askForMasterMountPointFuture.get(5, TimeUnit.SECONDS);
359         verify(mockMountPointReg, timeout(5000)).close();
360         verify(mockMountPointBuilder, timeout(5000)).register();
361
362         reset(mockMountPointService, mockMountPointBuilder, mockMountPointReg);
363         doNothing().when(mockMountPointReg).close();
364         netconfNodeManager.close();
365         verify(mockMountPointReg, timeout(5000)).close();
366     }
367
368     private NetconfNode newNetconfNode() {
369         return new NetconfNodeBuilder()
370                 .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
371                 .setPort(new PortNumber(Uint16.valueOf(9999)))
372                 .setConnectionStatus(ConnectionStatus.Connected)
373                 .setClusteredConnectionStatus(new ClusteredConnectionStatusBuilder()
374                         .setNetconfMasterNode(masterAddress).build())
375                 .build();
376     }
377
378     private void setupMountPointMocks() {
379         reset(mockMountPointService, mockMountPointBuilder, mockMountPointReg);
380         doNothing().when(mockMountPointReg).close();
381         doReturn(mockMountPointReg).when(mockMountPointBuilder).register();
382         doReturn(mockMountPointBuilder).when(mockMountPointService).createMountPoint(any());
383     }
384
385     private void initializeMaster() {
386         TestKit kit = new TestKit(masterSystem);
387         testMasterActorRef.tell(new CreateInitialMasterActorData(mockDeviceDataBroker, netconfService,
388             SOURCE_IDENTIFIERS, new RemoteDeviceServices(mockRpcService, mockActionService)), kit.getRef());
389
390         kit.expectMsgClass(MasterActorDataInitialized.class);
391     }
392
393     private static class TestMasterActor extends NetconfNodeActor {
394         final Map<Class<?>, CompletableFuture<? extends Object>> messagesToDrop = new ConcurrentHashMap<>();
395
396         TestMasterActor(final NetconfTopologySetup setup, final RemoteDeviceId deviceId,
397                 final Timeout actorResponseWaitTime, final DOMMountPointService mountPointService) {
398             super(setup, deviceId, actorResponseWaitTime, mountPointService);
399         }
400
401         @SuppressWarnings({ "rawtypes", "unchecked" })
402         @Override
403         public void handleReceive(final Object message) {
404             CompletableFuture dropFuture = messagesToDrop.remove(message.getClass());
405             if (dropFuture != null) {
406                 dropFuture.complete(message);
407             } else {
408                 super.handleReceive(message);
409             }
410         }
411     }
412 }