Remove netconf-config
[netconf.git] / apps / netconf-topology-singleton / src / test / java / org / opendaylight / netconf / topology / singleton / impl / MountPointEndToEndTest.java
1 /*
2  * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.ArgumentMatchers.any;
18 import static org.mockito.Mockito.doAnswer;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.spy;
22 import static org.mockito.Mockito.timeout;
23 import static org.mockito.Mockito.verify;
24
25 import akka.actor.ActorSystem;
26 import akka.testkit.javadsl.TestKit;
27 import akka.util.Timeout;
28 import com.google.common.collect.ImmutableList;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.util.concurrent.FluentFuture;
31 import com.google.common.util.concurrent.Futures;
32 import com.google.common.util.concurrent.ListenableFuture;
33 import com.google.common.util.concurrent.SettableFuture;
34 import com.typesafe.config.ConfigFactory;
35 import io.netty.util.Timer;
36 import java.io.File;
37 import java.util.Iterator;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Optional;
42 import java.util.Set;
43 import java.util.concurrent.ExecutionException;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.TimeoutException;
46 import org.apache.commons.io.FileUtils;
47 import org.eclipse.jdt.annotation.NonNull;
48 import org.junit.After;
49 import org.junit.Before;
50 import org.junit.Test;
51 import org.junit.runner.RunWith;
52 import org.mockito.Mock;
53 import org.mockito.junit.MockitoJUnitRunner;
54 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
55 import org.opendaylight.mdsal.binding.api.DataBroker;
56 import org.opendaylight.mdsal.binding.api.DataObjectModification;
57 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
58 import org.opendaylight.mdsal.binding.api.DataTreeModification;
59 import org.opendaylight.mdsal.binding.api.ReadTransaction;
60 import org.opendaylight.mdsal.binding.api.RpcProviderService;
61 import org.opendaylight.mdsal.binding.api.Transaction;
62 import org.opendaylight.mdsal.binding.api.TransactionChain;
63 import org.opendaylight.mdsal.binding.api.TransactionChainListener;
64 import org.opendaylight.mdsal.binding.api.WriteTransaction;
65 import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractConcurrentDataBrokerTest;
66 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
67 import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers;
68 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
69 import org.opendaylight.mdsal.dom.api.DOMActionService;
70 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
71 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadOperations;
72 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
73 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
74 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
75 import org.opendaylight.mdsal.dom.api.DOMMountPointListener;
76 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
77 import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener;
78 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
79 import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
80 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
81 import org.opendaylight.mdsal.dom.api.DOMRpcService;
82 import org.opendaylight.mdsal.dom.api.DOMService;
83 import org.opendaylight.mdsal.dom.broker.DOMMountPointServiceImpl;
84 import org.opendaylight.mdsal.dom.broker.DOMRpcRouter;
85 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
86 import org.opendaylight.mdsal.dom.spi.FixedDOMSchemaService;
87 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
88 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
89 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
90 import org.opendaylight.mdsal.singleton.dom.impl.DOMClusterSingletonServiceProviderImpl;
91 import org.opendaylight.netconf.api.CapabilityURN;
92 import org.opendaylight.netconf.client.NetconfClientFactory;
93 import org.opendaylight.netconf.client.mdsal.NetconfDeviceCapabilities;
94 import org.opendaylight.netconf.client.mdsal.NetconfDeviceSchema;
95 import org.opendaylight.netconf.client.mdsal.api.CredentialProvider;
96 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
97 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
98 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
99 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
100 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
101 import org.opendaylight.netconf.client.mdsal.api.SslHandlerFactoryProvider;
102 import org.opendaylight.netconf.client.mdsal.impl.DefaultSchemaResourceManager;
103 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
104 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
105 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
106 import org.opendaylight.netconf.topology.spi.NetconfClientConfigurationBuilderFactory;
107 import org.opendaylight.netconf.topology.spi.NetconfClientConfigurationBuilderFactoryImpl;
108 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
109 import org.opendaylight.netconf.topology.spi.NetconfTopologySchemaAssembler;
110 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
111 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
112 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
113 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
114 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.ConnectionOper.ConnectionStatus;
115 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.credentials.credentials.LoginPwUnencryptedBuilder;
116 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.credentials.credentials.login.pw.unencrypted.LoginPasswordUnencryptedBuilder;
117 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.keystore.rev171017.Keystore;
118 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev231121.NetconfNode;
119 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev231121.NetconfNodeBuilder;
120 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.GetTopInput;
121 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.GetTopOutputBuilder;
122 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.PutTopInputBuilder;
123 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.Top;
124 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
125 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelListBuilder;
126 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelListKey;
127 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
128 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
129 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
130 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
131 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
132 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
133 import org.opendaylight.yangtools.concepts.ListenerRegistration;
134 import org.opendaylight.yangtools.concepts.Registration;
135 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
136 import org.opendaylight.yangtools.yang.binding.DataObject;
137 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
138 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
139 import org.opendaylight.yangtools.yang.binding.YangModuleInfo;
140 import org.opendaylight.yangtools.yang.common.ErrorTag;
141 import org.opendaylight.yangtools.yang.common.ErrorType;
142 import org.opendaylight.yangtools.yang.common.QName;
143 import org.opendaylight.yangtools.yang.common.Revision;
144 import org.opendaylight.yangtools.yang.common.RpcError;
145 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
146 import org.opendaylight.yangtools.yang.common.Uint16;
147 import org.opendaylight.yangtools.yang.common.Uint32;
148 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
149 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
150 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
151 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
152 import org.opendaylight.yangtools.yang.data.api.schema.MountPointContext;
153 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
154 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
155 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
156 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
157 import org.opendaylight.yangtools.yang.model.api.Module;
158 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
159 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
160 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
161 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
162 import org.opendaylight.yangtools.yang.parser.impl.DefaultYangParserFactory;
163 import org.slf4j.Logger;
164 import org.slf4j.LoggerFactory;
165
166 /**
167  * Tests netconf mount points end-to-end.
168  *
169  * @author Thomas Pantelis
170  */
171 @RunWith(MockitoJUnitRunner.StrictStubs.class)
172 public class MountPointEndToEndTest extends AbstractBaseSchemasTest {
173     private static final Logger LOG = LoggerFactory.getLogger(MountPointEndToEndTest.class);
174
175     private static final String TOP_MODULE_NAME = "opendaylight-mdsal-list-test";
176     private static final String ACTOR_SYSTEM_NAME = "test";
177     private static final String TOPOLOGY_ID = NetconfNodeUtils.DEFAULT_TOPOLOGY_NAME;
178     private static final @NonNull KeyedInstanceIdentifier<Node, NodeKey> NODE_INSTANCE_ID =
179         NetconfTopologyUtils.createTopologyNodeListPath(new NodeKey(new NodeId("node-id")), TOPOLOGY_ID);
180
181     private static final String TEST_ROOT_DIRECTORY = "test-cache-root";
182     private static final String TEST_DEFAULT_SUBDIR = "test-schema";
183
184     @Mock
185     private RpcProviderService mockRpcProviderService;
186     @Mock
187     private Registration mockRpcReg;
188     @Mock
189     private NetconfClientFactory mockClientFactory;
190     @Mock
191     private AAAEncryptionService mockEncryptionService;
192     @Mock
193     private Timer mockTimer;
194     @Mock
195     private DeviceActionFactory deviceActionFactory;
196     @Mock
197     private CredentialProvider credentialProvider;
198     @Mock
199     private SslHandlerFactoryProvider sslHandlerFactoryProvider;
200     @Mock
201     private DOMMountPointListener masterMountPointListener;
202     private final DOMMountPointService masterMountPointService = new DOMMountPointServiceImpl();
203     private Rpcs.Normalized deviceRpcService;
204
205     private DOMClusterSingletonServiceProviderImpl masterClusterSingletonServiceProvider;
206     private DataBroker masterDataBroker;
207     private DOMDataBroker deviceDOMDataBroker;
208     private ActorSystem masterSystem;
209     private NetconfTopologyManager masterNetconfTopologyManager;
210
211     private volatile SettableFuture<MasterSalFacade> masterSalFacadeFuture = SettableFuture.create();
212
213     @Mock
214     private ClusterSingletonServiceProvider mockSlaveClusterSingletonServiceProvider;
215     @Mock
216     private ClusterSingletonServiceRegistration mockSlaveClusterSingletonServiceReg;
217     @Mock
218     private DOMMountPointListener slaveMountPointListener;
219     private final DOMMountPointService slaveMountPointService = new DOMMountPointServiceImpl();
220     private DataBroker slaveDataBroker;
221     private ActorSystem slaveSystem;
222     private NetconfTopologyManager slaveNetconfTopologyManager;
223     private final SettableFuture<NetconfTopologyContext> slaveNetconfTopologyContextFuture = SettableFuture.create();
224     private TransactionChain slaveTxChain;
225
226     private NetconfClientConfigurationBuilderFactory builderFactory;
227     private EffectiveModelContext deviceSchemaContext;
228     private YangModuleInfo topModuleInfo;
229     private QName putTopRpcSchemaPath;
230     private QName getTopRpcSchemaPath;
231     private BindingNormalizedNodeSerializer bindingToNormalized;
232     private YangInstanceIdentifier yangNodeInstanceId;
233     private final TopDOMRpcImplementation topRpcImplementation = new TopDOMRpcImplementation();
234     private final ContainerNode getTopInput = ImmutableNodes.containerNode(GetTopInput.QNAME);
235
236     private SchemaResourceManager resourceManager;
237     private NetconfTopologySchemaAssembler schemaAssembler;
238
239     @Before
240     public void setUp() throws Exception {
241         deleteCacheDir();
242
243         schemaAssembler = new NetconfTopologySchemaAssembler(1, 1, 0, TimeUnit.SECONDS);
244
245         resourceManager = new DefaultSchemaResourceManager(new DefaultYangParserFactory(), TEST_ROOT_DIRECTORY,
246             TEST_DEFAULT_SUBDIR);
247
248         topModuleInfo = BindingRuntimeHelpers.getYangModuleInfo(Top.class);
249
250         deviceSchemaContext = BindingRuntimeHelpers.createEffectiveModel(Top.class);
251
252         final var router = new DOMRpcRouter(FixedDOMSchemaService.of(deviceSchemaContext));
253
254         putTopRpcSchemaPath = findRpcDefinition("put-top").getQName();
255         getTopRpcSchemaPath = findRpcDefinition("get-top").getQName();
256
257         router.getRpcProviderService().registerRpcImplementation(topRpcImplementation,
258                 DOMRpcIdentifier.create(putTopRpcSchemaPath), DOMRpcIdentifier.create(getTopRpcSchemaPath));
259
260         final var rpcService = router.getRpcService();
261         deviceRpcService = new Rpcs.Normalized() {
262             @Override
263             public ListenableFuture<? extends DOMRpcResult> invokeRpc(final QName type, final ContainerNode input) {
264                 return rpcService.invokeRpc(type, input);
265             }
266
267             @Override
268             public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
269                     final T listener) {
270                 return rpcService.registerRpcListener(listener);
271             }
272         };
273
274         builderFactory = new NetconfClientConfigurationBuilderFactoryImpl(mockEncryptionService, credentialProvider,
275             sslHandlerFactoryProvider);
276
277         doReturn(mockRpcReg).when(mockRpcProviderService).registerRpcImplementations(any());
278
279         setupMaster();
280
281         setupSlave();
282
283         yangNodeInstanceId = bindingToNormalized.toYangInstanceIdentifier(NODE_INSTANCE_ID);
284         doReturn(mock(ListenableFuture.class)).when(mockClientFactory).createClient(any());
285
286         LOG.info("****** Setup complete");
287     }
288
289     private static void deleteCacheDir() {
290         FileUtils.deleteQuietly(new File(TEST_ROOT_DIRECTORY));
291     }
292
293     @After
294     public void tearDown() throws Exception {
295         deleteCacheDir();
296         TestKit.shutdownActorSystem(slaveSystem, true);
297         TestKit.shutdownActorSystem(masterSystem, true);
298         schemaAssembler.close();
299     }
300
301     private void setupMaster() throws Exception {
302         final var dataBrokerTest = newDataBrokerTest();
303         masterDataBroker = dataBrokerTest.getDataBroker();
304         deviceDOMDataBroker = dataBrokerTest.getDomBroker();
305         bindingToNormalized = dataBrokerTest.getDataBrokerTestCustomizer().getAdapterContext().currentSerializer();
306
307         masterSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Master"));
308
309         masterClusterSingletonServiceProvider = new DOMClusterSingletonServiceProviderImpl();
310         masterClusterSingletonServiceProvider.initializeProvider();
311
312         final var resources =  resourceManager.getSchemaResources(TEST_DEFAULT_SUBDIR, "test");
313         resources.getSchemaRegistry().registerSchemaSource(
314             id -> Futures.immediateFuture(YangTextSchemaSource.delegateForCharSource(id,
315                     topModuleInfo.getYangTextCharSource())),
316             PotentialSchemaSource.create(new SourceIdentifier(TOP_MODULE_NAME,
317                     topModuleInfo.getName().getRevision().map(Revision::toString).orElse(null)),
318                 YangTextSchemaSource.class, 1));
319
320         masterNetconfTopologyManager = new NetconfTopologyManager(BASE_SCHEMAS, masterDataBroker,
321                 masterClusterSingletonServiceProvider, mockTimer, schemaAssembler, masterSystem,
322                 mockClientFactory, masterMountPointService, mockEncryptionService, mockRpcProviderService,
323                 deviceActionFactory, resourceManager, builderFactory, TOPOLOGY_ID, Uint16.ZERO) {
324             @Override
325             protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
326                     final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
327                     final DeviceActionFactory deviceActionFact) {
328                 final var context = super.newNetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime,
329                     deviceActionFact);
330                 final var spiedContext = spy(context);
331                 final var spiedSingleton = spy(context.getTopologySingleton());
332                 doAnswer(invocation -> {
333                     final var spiedFacade = (MasterSalFacade) spy(invocation.callRealMethod());
334                     doReturn(deviceDOMDataBroker).when(spiedFacade)
335                         .newDeviceDataBroker(any(MountPointContext.class), any(NetconfSessionPreferences.class));
336                     masterSalFacadeFuture.set(spiedFacade);
337                     return spiedFacade;
338                 }).when(spiedSingleton).createSalFacade(any(boolean.class));
339                 doReturn(spiedSingleton).when(spiedContext).getTopologySingleton();
340                 return spiedContext;
341             }
342         };
343
344         verifyTopologyNodesCreated(masterDataBroker);
345     }
346
347     private void setupSlave() throws Exception {
348         AbstractConcurrentDataBrokerTest dataBrokerTest = newDataBrokerTest();
349         slaveDataBroker = dataBrokerTest.getDataBroker();
350
351         slaveSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Slave"));
352
353         doReturn(mockSlaveClusterSingletonServiceReg).when(mockSlaveClusterSingletonServiceProvider)
354                 .registerClusterSingletonService(any());
355
356         slaveNetconfTopologyManager = new NetconfTopologyManager(BASE_SCHEMAS, slaveDataBroker,
357                 mockSlaveClusterSingletonServiceProvider, mockTimer, schemaAssembler, slaveSystem,
358                 mockClientFactory, slaveMountPointService, mockEncryptionService, mockRpcProviderService,
359                 deviceActionFactory, resourceManager, builderFactory, TOPOLOGY_ID, Uint16.ZERO) {
360             @Override
361             protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
362                 final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
363                 final DeviceActionFactory actionFactory) {
364                 NetconfTopologyContext spiedContext = spy(super.newNetconfTopologyContext(setup, serviceGroupIdent,
365                     actorResponseWaitTime, actionFactory));
366
367                 slaveNetconfTopologyContextFuture.set(spiedContext);
368                 return spiedContext;
369             }
370         };
371
372         verifyTopologyNodesCreated(slaveDataBroker);
373
374         slaveTxChain = slaveDataBroker.createTransactionChain(new TransactionChainListener() {
375             @Override
376             public void onTransactionChainSuccessful(final TransactionChain chain) {
377                 // No-op
378             }
379
380             @Override
381             public void onTransactionChainFailed(final TransactionChain chain, final Transaction transaction,
382                     final Throwable cause) {
383                 LOG.error("Slave transaction chain failed", cause);
384             }
385         });
386     }
387
388     @Test
389     public void test() throws Exception {
390         testMaster();
391
392         testSlave();
393
394         final MasterSalFacade masterSalFacade = testMasterNodeUpdated();
395
396         testMasterDisconnected(masterSalFacade);
397
398         testCleanup();
399     }
400
401     private MasterSalFacade testMaster() throws Exception {
402         LOG.info("****** Testing master");
403
404         writeNetconfNode(TEST_DEFAULT_SUBDIR, masterDataBroker);
405
406         final var masterSalFacade = masterSalFacadeFuture.get(5, TimeUnit.SECONDS);
407         masterSalFacade.onDeviceConnected(new NetconfDeviceSchema(NetconfDeviceCapabilities.empty(),
408             MountPointContext.of(deviceSchemaContext)),
409             NetconfSessionPreferences.fromStrings(List.of(CapabilityURN.CANDIDATE)),
410             new RemoteDeviceServices(deviceRpcService, null));
411
412         final var masterMountPoint = awaitMountPoint(masterMountPointService);
413
414         LOG.info("****** Testing master DOMDataBroker operations");
415
416         testDOMDataBrokerOperations(getDOMDataBroker(masterMountPoint));
417
418         LOG.info("****** Testing master DOMRpcService");
419
420         testDOMRpcService(getDOMRpcService(masterMountPoint));
421         return masterSalFacade;
422     }
423
424     private void testSlave() throws Exception {
425         LOG.info("****** Testing slave");
426
427         writeNetconfNode("slave", slaveDataBroker);
428
429         verify(mockSlaveClusterSingletonServiceProvider, timeout(5000)).registerClusterSingletonService(any());
430
431         // Since the master and slave use separate DataBrokers we need to copy the master's oper node to the slave.
432         // This is essentially what happens in a clustered environment but we'll use a DTCL here.
433
434         masterDataBroker.registerDataTreeChangeListener(
435             DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, NODE_INSTANCE_ID), changes -> {
436                 final WriteTransaction slaveTx = slaveTxChain.newWriteOnlyTransaction();
437                 for (DataTreeModification<Node> dataTreeModification : changes) {
438                     DataObjectModification<Node> rootNode = dataTreeModification.getRootNode();
439                     InstanceIdentifier<Node> path = dataTreeModification.getRootPath().getRootIdentifier();
440                     switch (rootNode.getModificationType()) {
441                         case WRITE:
442                         case SUBTREE_MODIFIED:
443                             slaveTx.merge(LogicalDatastoreType.OPERATIONAL, path, rootNode.getDataAfter());
444                             break;
445                         case DELETE:
446                             slaveTx.delete(LogicalDatastoreType.OPERATIONAL, path);
447                             break;
448                         default:
449                             break;
450                     }
451                 }
452
453                 slaveTx.commit();
454             });
455
456         DOMMountPoint slaveMountPoint = awaitMountPoint(slaveMountPointService);
457
458         LOG.info("****** Testing slave DOMDataBroker operations");
459
460         testDOMDataBrokerOperations(getDOMDataBroker(slaveMountPoint));
461
462         LOG.info("****** Testing slave DOMRpcService");
463
464         testDOMRpcService(getDOMRpcService(slaveMountPoint));
465     }
466
467     private MasterSalFacade testMasterNodeUpdated() throws Exception {
468         LOG.info("****** Testing update master node");
469
470         masterMountPointService.registerProvisionListener(masterMountPointListener);
471         slaveMountPointService.registerProvisionListener(slaveMountPointListener);
472
473         masterSalFacadeFuture = SettableFuture.create();
474         writeNetconfNode(TEST_DEFAULT_SUBDIR, masterDataBroker);
475
476         verify(masterMountPointListener, timeout(5000)).onMountPointRemoved(yangNodeInstanceId);
477
478         final var masterSalFacade = masterSalFacadeFuture.get(5, TimeUnit.SECONDS);
479         masterSalFacade.onDeviceConnected(
480             new NetconfDeviceSchema(NetconfDeviceCapabilities.empty(), MountPointContext.of(deviceSchemaContext)),
481             NetconfSessionPreferences.fromStrings(List.of(CapabilityURN.CANDIDATE)),
482             new RemoteDeviceServices(deviceRpcService, null));
483
484         verify(masterMountPointListener, timeout(5000)).onMountPointCreated(yangNodeInstanceId);
485
486         verify(slaveMountPointListener, timeout(5000)).onMountPointRemoved(yangNodeInstanceId);
487         verify(slaveMountPointListener, timeout(5000)).onMountPointCreated(yangNodeInstanceId);
488
489         return masterSalFacade;
490     }
491
492     private void testMasterDisconnected(final MasterSalFacade masterSalFacade) throws Exception {
493         LOG.info("****** Testing master disconnected");
494
495         masterSalFacade.onDeviceDisconnected();
496
497         awaitMountPointNotPresent(masterMountPointService);
498
499         await().atMost(5, TimeUnit.SECONDS).until(() -> {
500             try (ReadTransaction readTx = masterDataBroker.newReadOnlyTransaction()) {
501                 Optional<Node> node = readTx.read(LogicalDatastoreType.OPERATIONAL,
502                         NODE_INSTANCE_ID).get(5, TimeUnit.SECONDS);
503                 assertTrue(node.isPresent());
504                 final NetconfNode netconfNode = node.orElseThrow().augmentation(NetconfNode.class);
505                 return netconfNode.getConnectionStatus() != ConnectionStatus.Connected;
506             }
507         });
508
509         awaitMountPointNotPresent(slaveMountPointService);
510     }
511
512     private void testCleanup() throws Exception {
513         LOG.info("****** Testing cleanup");
514
515         slaveNetconfTopologyManager.close();
516         verify(mockSlaveClusterSingletonServiceReg).close();
517     }
518
519     private void testDOMRpcService(final DOMRpcService domRpcService)
520             throws InterruptedException, ExecutionException, TimeoutException {
521         testPutTopRpc(domRpcService, new DefaultDOMRpcResult((ContainerNode)null));
522         testPutTopRpc(domRpcService, null);
523         testPutTopRpc(domRpcService, new DefaultDOMRpcResult(ImmutableList.of(
524                 RpcResultBuilder.newError(ErrorType.APPLICATION, new ErrorTag("tag1"), "error1"),
525                 RpcResultBuilder.newError(ErrorType.APPLICATION, new ErrorTag("tag2"), "error2"))));
526
527         testGetTopRpc(domRpcService, new DefaultDOMRpcResult(bindingToNormalized.toNormalizedNodeRpcData(
528                 new GetTopOutputBuilder().setTopLevelList(oneTopLevelList()).build())));
529
530         testFailedRpc(domRpcService, getTopRpcSchemaPath, getTopInput);
531     }
532
533     private void testPutTopRpc(final DOMRpcService domRpcService, final DOMRpcResult result)
534             throws InterruptedException, ExecutionException, TimeoutException {
535         ContainerNode putTopInput = bindingToNormalized.toNormalizedNodeRpcData(
536                 new PutTopInputBuilder().setTopLevelList(oneTopLevelList()).build());
537         testRpc(domRpcService, putTopRpcSchemaPath, putTopInput, result);
538     }
539
540     private static Map<TopLevelListKey, TopLevelList> oneTopLevelList() {
541         final TopLevelListKey key = new TopLevelListKey("one");
542         return ImmutableMap.of(key, new TopLevelListBuilder().withKey(key).build());
543     }
544
545     private void testGetTopRpc(final DOMRpcService domRpcService, final DOMRpcResult result)
546             throws InterruptedException, ExecutionException, TimeoutException {
547         testRpc(domRpcService, getTopRpcSchemaPath, getTopInput, result);
548     }
549
550     private void testRpc(final DOMRpcService domRpcService, final QName qname, final ContainerNode input,
551             final DOMRpcResult result) throws InterruptedException, ExecutionException, TimeoutException {
552         final FluentFuture<DOMRpcResult> future = result == null ? FluentFutures.immediateNullFluentFuture()
553                 : FluentFutures.immediateFluentFuture(result);
554         final DOMRpcResult actual = invokeRpc(domRpcService, qname, input, future);
555         if (result == null) {
556             assertNull(actual);
557             return;
558         }
559
560         assertNotNull(actual);
561         assertEquals(result.value(), actual.value());
562
563         assertEquals(result.errors().size(), actual.errors().size());
564         Iterator<? extends RpcError> iter1 = result.errors().iterator();
565         Iterator<? extends RpcError> iter2 = actual.errors().iterator();
566         while (iter1.hasNext() && iter2.hasNext()) {
567             RpcError err1 = iter1.next();
568             RpcError err2 = iter2.next();
569             assertEquals(err1.getErrorType(), err2.getErrorType());
570             assertEquals(err1.getTag(), err2.getTag());
571             assertEquals(err1.getMessage(), err2.getMessage());
572             assertEquals(err1.getSeverity(), err2.getSeverity());
573             assertEquals(err1.getApplicationTag(), err2.getApplicationTag());
574             assertEquals(err1.getInfo(), err2.getInfo());
575         }
576     }
577
578     private void testFailedRpc(final DOMRpcService domRpcService, final QName qname, final ContainerNode input)
579             throws InterruptedException, TimeoutException {
580         try {
581             invokeRpc(domRpcService, qname, input, Futures.immediateFailedFuture(new ClusteringRpcException("mock")));
582             fail("Expected exception");
583         } catch (ExecutionException e) {
584             assertTrue(e.getCause() instanceof ClusteringRpcException);
585             assertEquals("mock", e.getCause().getMessage());
586         }
587     }
588
589     private DOMRpcResult invokeRpc(final DOMRpcService domRpcService, final QName qname, final ContainerNode input,
590             final ListenableFuture<DOMRpcResult> returnFuture)
591                 throws InterruptedException, ExecutionException, TimeoutException {
592         topRpcImplementation.init(returnFuture);
593         final ListenableFuture<? extends DOMRpcResult> resultFuture = domRpcService.invokeRpc(qname, input);
594
595         topRpcImplementation.verify(DOMRpcIdentifier.create(qname), input);
596
597         return resultFuture.get(5, TimeUnit.SECONDS);
598     }
599
600     private static void testDOMDataBrokerOperations(final DOMDataBroker dataBroker)
601             throws InterruptedException, ExecutionException, TimeoutException {
602
603         DOMDataTreeWriteTransaction writeTx = dataBroker.newWriteOnlyTransaction();
604
605         final ContainerNode topNode = Builders.containerBuilder()
606                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(Top.QNAME)).build();
607         final YangInstanceIdentifier topPath = YangInstanceIdentifier.of(Top.QNAME);
608         writeTx.put(LogicalDatastoreType.CONFIGURATION, topPath, topNode);
609
610         final QName name = QName.create(TopLevelList.QNAME, "name");
611         final YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(topPath)
612                 .node(TopLevelList.QNAME).build();
613         final MapEntryNode listEntryNode = ImmutableNodes.mapEntry(TopLevelList.QNAME, name, "one");
614         final MapNode listNode = ImmutableNodes.mapNodeBuilder(TopLevelList.QNAME).addChild(listEntryNode).build();
615         writeTx.merge(LogicalDatastoreType.CONFIGURATION, listPath, listNode);
616         writeTx.commit().get(5, TimeUnit.SECONDS);
617
618         verifyDataInStore(dataBroker.newReadWriteTransaction(), YangInstanceIdentifier.builder(listPath)
619                 .nodeWithKey(TopLevelList.QNAME, name, "one").build(), listEntryNode);
620
621         writeTx = dataBroker.newWriteOnlyTransaction();
622         writeTx.delete(LogicalDatastoreType.CONFIGURATION, topPath);
623         writeTx.commit().get(5, TimeUnit.SECONDS);
624
625         DOMDataTreeReadWriteTransaction readTx = dataBroker.newReadWriteTransaction();
626         assertFalse(readTx.exists(LogicalDatastoreType.CONFIGURATION, topPath).get(5, TimeUnit.SECONDS));
627         assertTrue(readTx.cancel());
628     }
629
630     private static void writeNetconfNode(final String cacheDir, final DataBroker dataBroker) throws Exception {
631         putData(dataBroker, NODE_INSTANCE_ID, new NodeBuilder()
632             .withKey(NODE_INSTANCE_ID.getKey())
633             .addAugmentation(new NetconfNodeBuilder()
634                 .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
635                 .setPort(new PortNumber(Uint16.valueOf(1234)))
636                 .setActorResponseWaitTime(Uint16.valueOf(10))
637                 .setTcpOnly(Boolean.TRUE)
638                 .setSchemaless(Boolean.FALSE)
639                 .setKeepaliveDelay(Uint32.ZERO)
640                 .setConnectionTimeoutMillis(Uint32.valueOf(5000))
641                 .setDefaultRequestTimeoutMillis(Uint32.valueOf(5000))
642                 .setMaxConnectionAttempts(Uint32.ONE)
643                 .setCredentials(new LoginPwUnencryptedBuilder()
644                     .setLoginPasswordUnencrypted(new LoginPasswordUnencryptedBuilder()
645                         .setUsername("user")
646                         .setPassword("pass")
647                         .build())
648                     .build())
649                 .setSchemaCacheDirectory(cacheDir)
650                 .build())
651             .build());
652     }
653
654     private static <T extends DataObject> void putData(final DataBroker databroker, final InstanceIdentifier<T> path,
655             final T data) throws Exception {
656         final var writeTx = databroker.newWriteOnlyTransaction();
657         writeTx.put(LogicalDatastoreType.CONFIGURATION, path, data);
658         writeTx.commit().get(5, TimeUnit.SECONDS);
659     }
660
661     private static void verifyDataInStore(final DOMDataTreeReadOperations readTx, final YangInstanceIdentifier path,
662             final NormalizedNode expNode) throws InterruptedException, ExecutionException, TimeoutException {
663         assertEquals(Optional.of(expNode), readTx.read(LogicalDatastoreType.CONFIGURATION, path)
664             .get(5, TimeUnit.SECONDS));
665         assertTrue(readTx.exists(LogicalDatastoreType.CONFIGURATION, path).get(5, TimeUnit.SECONDS));
666     }
667
668     private static void verifyTopologyNodesCreated(final DataBroker dataBroker) {
669         await().atMost(5, TimeUnit.SECONDS).until(() -> {
670             try (ReadTransaction readTx = dataBroker.newReadOnlyTransaction()) {
671                 return readTx.exists(LogicalDatastoreType.OPERATIONAL,
672                     NetconfTopologyUtils.createTopologyListPath(TOPOLOGY_ID)).get(3, TimeUnit.SECONDS);
673             }
674         });
675     }
676
677     private AbstractConcurrentDataBrokerTest newDataBrokerTest() throws Exception {
678         final var dataBrokerTest = new AbstractConcurrentDataBrokerTest(true) {
679             @Override
680             protected Set<YangModuleInfo> getModuleInfos() {
681                 return Set.of(
682                     BindingRuntimeHelpers.getYangModuleInfo(NetconfNode.class),
683                     BindingRuntimeHelpers.getYangModuleInfo(NetworkTopology.class),
684                     BindingRuntimeHelpers.getYangModuleInfo(Keystore.class),
685                     topModuleInfo);
686             }
687         };
688
689         dataBrokerTest.setup();
690
691         final var path = NetconfTopologyUtils.createTopologyListPath(TOPOLOGY_ID);
692         putData(dataBrokerTest.getDataBroker(), path, new TopologyBuilder().withKey(path.getKey()).build());
693         return dataBrokerTest;
694     }
695
696     private void awaitMountPointNotPresent(final DOMMountPointService mountPointService) {
697         await().atMost(5, TimeUnit.SECONDS).until(
698             () -> mountPointService.getMountPoint(yangNodeInstanceId).isEmpty());
699     }
700
701     private static DOMDataBroker getDOMDataBroker(final DOMMountPoint mountPoint) {
702         return getMountPointService(mountPoint, DOMDataBroker.class);
703     }
704
705     private static DOMRpcService getDOMRpcService(final DOMMountPoint mountPoint) {
706         return getMountPointService(mountPoint, DOMRpcService.class);
707     }
708
709     private static DOMActionService getDomActionService(final DOMMountPoint mountPoint) {
710         return getMountPointService(mountPoint, DOMActionService.class);
711     }
712
713     private static <T extends DOMService> T getMountPointService(final DOMMountPoint mountPoint,
714             final Class<T> serviceClass) {
715         return mountPoint.getService(serviceClass).orElseThrow();
716     }
717
718     private DOMMountPoint awaitMountPoint(final DOMMountPointService mountPointService) {
719         await().atMost(5, TimeUnit.SECONDS).until(() ->
720                 mountPointService.getMountPoint(yangNodeInstanceId).isPresent());
721
722         return mountPointService.getMountPoint(yangNodeInstanceId).orElseThrow();
723     }
724
725     private RpcDefinition findRpcDefinition(final String rpc) {
726         Module topModule = deviceSchemaContext.findModule(TOP_MODULE_NAME, topModuleInfo.getName().getRevision())
727             .orElseThrow();
728         RpcDefinition rpcDefinition = null;
729         for (RpcDefinition def: topModule.getRpcs()) {
730             if (def.getQName().getLocalName().equals(rpc)) {
731                 rpcDefinition = def;
732                 break;
733             }
734         }
735
736         assertNotNull(rpc + " rpc not found in " + topModule.getRpcs(), rpcDefinition);
737         return rpcDefinition;
738     }
739
740     private static final class TopDOMRpcImplementation implements DOMRpcImplementation {
741         private volatile SettableFuture<Entry<DOMRpcIdentifier, NormalizedNode>> rpcInvokedFuture;
742         private volatile ListenableFuture<DOMRpcResult> returnFuture;
743
744         @Override
745         public ListenableFuture<DOMRpcResult> invokeRpc(final DOMRpcIdentifier rpc, final ContainerNode input) {
746             rpcInvokedFuture.set(Map.entry(rpc, input));
747             return returnFuture;
748         }
749
750         void init(final ListenableFuture<DOMRpcResult> retFuture) {
751             returnFuture = retFuture;
752             rpcInvokedFuture = SettableFuture.create();
753         }
754
755         void verify(final DOMRpcIdentifier expRpc, final NormalizedNode expInput)
756                 throws InterruptedException, ExecutionException, TimeoutException {
757             final Entry<DOMRpcIdentifier, NormalizedNode> actual = rpcInvokedFuture.get(5, TimeUnit.SECONDS);
758             assertEquals(expRpc, actual.getKey());
759             assertEquals(expInput, actual.getValue());
760         }
761     }
762 }