48eba6c85fb87a0b4b7ba9779016b38bffd5735e
[netconf.git] / apps / netconf-topology-singleton / src / test / java / org / opendaylight / netconf / topology / singleton / impl / MountPointEndToEndTest.java
1 /*
2  * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl;
9
10 import static org.awaitility.Awaitility.await;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertTrue;
16 import static org.junit.Assert.fail;
17 import static org.mockito.ArgumentMatchers.any;
18 import static org.mockito.Mockito.doAnswer;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.mock;
21 import static org.mockito.Mockito.spy;
22 import static org.mockito.Mockito.timeout;
23 import static org.mockito.Mockito.verify;
24
25 import akka.actor.ActorSystem;
26 import akka.testkit.javadsl.TestKit;
27 import akka.util.Timeout;
28 import com.google.common.collect.ImmutableList;
29 import com.google.common.collect.ImmutableMap;
30 import com.google.common.util.concurrent.FluentFuture;
31 import com.google.common.util.concurrent.FutureCallback;
32 import com.google.common.util.concurrent.Futures;
33 import com.google.common.util.concurrent.ListenableFuture;
34 import com.google.common.util.concurrent.SettableFuture;
35 import com.typesafe.config.ConfigFactory;
36 import java.io.File;
37 import java.util.Iterator;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Map.Entry;
41 import java.util.Optional;
42 import java.util.Set;
43 import java.util.concurrent.ExecutionException;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.TimeoutException;
46 import org.apache.commons.io.FileUtils;
47 import org.eclipse.jdt.annotation.NonNull;
48 import org.junit.After;
49 import org.junit.Before;
50 import org.junit.Test;
51 import org.junit.runner.RunWith;
52 import org.mockito.Mock;
53 import org.mockito.junit.MockitoJUnitRunner;
54 import org.opendaylight.aaa.encrypt.AAAEncryptionService;
55 import org.opendaylight.mdsal.binding.api.DataBroker;
56 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
57 import org.opendaylight.mdsal.binding.api.ReadTransaction;
58 import org.opendaylight.mdsal.binding.api.RpcProviderService;
59 import org.opendaylight.mdsal.binding.api.TransactionChain;
60 import org.opendaylight.mdsal.binding.api.WriteTransaction;
61 import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractConcurrentDataBrokerTest;
62 import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
63 import org.opendaylight.mdsal.binding.runtime.spi.BindingRuntimeHelpers;
64 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
65 import org.opendaylight.mdsal.dom.api.DOMActionService;
66 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
67 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadOperations;
68 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
69 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
70 import org.opendaylight.mdsal.dom.api.DOMMountPoint;
71 import org.opendaylight.mdsal.dom.api.DOMMountPointListener;
72 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
73 import org.opendaylight.mdsal.dom.api.DOMRpcIdentifier;
74 import org.opendaylight.mdsal.dom.api.DOMRpcImplementation;
75 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
76 import org.opendaylight.mdsal.dom.api.DOMRpcService;
77 import org.opendaylight.mdsal.dom.api.DOMService;
78 import org.opendaylight.mdsal.dom.broker.DOMMountPointServiceImpl;
79 import org.opendaylight.mdsal.dom.broker.DOMRpcRouter;
80 import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult;
81 import org.opendaylight.mdsal.dom.spi.FixedDOMSchemaService;
82 import org.opendaylight.mdsal.eos.dom.simple.SimpleDOMEntityOwnershipService;
83 import org.opendaylight.mdsal.singleton.api.ClusterSingletonServiceProvider;
84 import org.opendaylight.mdsal.singleton.api.ServiceGroupIdentifier;
85 import org.opendaylight.mdsal.singleton.impl.EOSClusterSingletonServiceProvider;
86 import org.opendaylight.netconf.api.CapabilityURN;
87 import org.opendaylight.netconf.client.NetconfClientFactory;
88 import org.opendaylight.netconf.client.mdsal.NetconfDeviceCapabilities;
89 import org.opendaylight.netconf.client.mdsal.NetconfDeviceSchema;
90 import org.opendaylight.netconf.client.mdsal.api.CredentialProvider;
91 import org.opendaylight.netconf.client.mdsal.api.DeviceActionFactory;
92 import org.opendaylight.netconf.client.mdsal.api.NetconfSessionPreferences;
93 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices;
94 import org.opendaylight.netconf.client.mdsal.api.RemoteDeviceServices.Rpcs;
95 import org.opendaylight.netconf.client.mdsal.api.SchemaResourceManager;
96 import org.opendaylight.netconf.client.mdsal.api.SslHandlerFactoryProvider;
97 import org.opendaylight.netconf.client.mdsal.impl.DefaultSchemaResourceManager;
98 import org.opendaylight.netconf.common.NetconfTimer;
99 import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
100 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
101 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
102 import org.opendaylight.netconf.topology.spi.NetconfClientConfigurationBuilderFactory;
103 import org.opendaylight.netconf.topology.spi.NetconfClientConfigurationBuilderFactoryImpl;
104 import org.opendaylight.netconf.topology.spi.NetconfNodeUtils;
105 import org.opendaylight.netconf.topology.spi.NetconfTopologySchemaAssembler;
106 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Host;
107 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.IpAddress;
108 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Ipv4Address;
109 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.PortNumber;
110 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.ConnectionOper.ConnectionStatus;
111 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.credentials.credentials.LoginPwUnencryptedBuilder;
112 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev240120.credentials.credentials.login.pw.unencrypted.LoginPasswordUnencryptedBuilder;
113 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.keystore.rev171017.Keystore;
114 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev231121.NetconfNode;
115 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev231121.NetconfNodeBuilder;
116 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.GetTopInput;
117 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.GetTopOutputBuilder;
118 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.PutTopInputBuilder;
119 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.Top;
120 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelList;
121 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelListBuilder;
122 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.two.level.list.TopLevelListKey;
123 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
124 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
125 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyBuilder;
126 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
127 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeBuilder;
128 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
129 import org.opendaylight.yangtools.concepts.Registration;
130 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
131 import org.opendaylight.yangtools.yang.binding.DataObject;
132 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
133 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
134 import org.opendaylight.yangtools.yang.binding.YangModuleInfo;
135 import org.opendaylight.yangtools.yang.common.Empty;
136 import org.opendaylight.yangtools.yang.common.ErrorTag;
137 import org.opendaylight.yangtools.yang.common.ErrorType;
138 import org.opendaylight.yangtools.yang.common.QName;
139 import org.opendaylight.yangtools.yang.common.Revision;
140 import org.opendaylight.yangtools.yang.common.RpcError;
141 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
142 import org.opendaylight.yangtools.yang.common.Uint16;
143 import org.opendaylight.yangtools.yang.common.Uint32;
144 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
145 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
146 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
147 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
148 import org.opendaylight.yangtools.yang.data.api.schema.MountPointContext;
149 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
150 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
151 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
152 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
153 import org.opendaylight.yangtools.yang.model.api.Module;
154 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
155 import org.opendaylight.yangtools.yang.model.api.source.SourceIdentifier;
156 import org.opendaylight.yangtools.yang.model.api.source.YangTextSource;
157 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
158 import org.opendaylight.yangtools.yang.model.spi.source.DelegatedYangTextSource;
159 import org.opendaylight.yangtools.yang.parser.impl.DefaultYangParserFactory;
160 import org.slf4j.Logger;
161 import org.slf4j.LoggerFactory;
162
163 /**
164  * Tests netconf mount points end-to-end.
165  *
166  * @author Thomas Pantelis
167  */
168 @RunWith(MockitoJUnitRunner.StrictStubs.class)
169 public class MountPointEndToEndTest extends AbstractBaseSchemasTest {
170     private static final Logger LOG = LoggerFactory.getLogger(MountPointEndToEndTest.class);
171
172     private static final String TOP_MODULE_NAME = "opendaylight-mdsal-list-test";
173     private static final String ACTOR_SYSTEM_NAME = "test";
174     private static final String TOPOLOGY_ID = NetconfNodeUtils.DEFAULT_TOPOLOGY_NAME;
175     private static final @NonNull KeyedInstanceIdentifier<Node, NodeKey> NODE_INSTANCE_ID =
176         NetconfTopologyUtils.createTopologyNodeListPath(new NodeKey(new NodeId("node-id")), TOPOLOGY_ID);
177
178     private static final String TEST_ROOT_DIRECTORY = "test-cache-root";
179     private static final String TEST_DEFAULT_SUBDIR = "test-schema";
180
181     @Mock
182     private RpcProviderService mockRpcProviderService;
183     @Mock
184     private Registration mockRpcReg;
185     @Mock
186     private NetconfClientFactory mockClientFactory;
187     @Mock
188     private AAAEncryptionService mockEncryptionService;
189     @Mock
190     private NetconfTimer mockTimer;
191     @Mock
192     private DeviceActionFactory deviceActionFactory;
193     @Mock
194     private CredentialProvider credentialProvider;
195     @Mock
196     private SslHandlerFactoryProvider sslHandlerFactoryProvider;
197     @Mock
198     private DOMMountPointListener masterMountPointListener;
199     private final DOMMountPointService masterMountPointService = new DOMMountPointServiceImpl();
200     private Rpcs.Normalized deviceRpcService;
201
202     private EOSClusterSingletonServiceProvider masterClusterSingletonServiceProvider;
203     private DataBroker masterDataBroker;
204     private DOMDataBroker deviceDOMDataBroker;
205     private ActorSystem masterSystem;
206     private NetconfTopologyManager masterNetconfTopologyManager;
207
208     private volatile SettableFuture<MasterSalFacade> masterSalFacadeFuture = SettableFuture.create();
209
210     @Mock
211     private ClusterSingletonServiceProvider mockSlaveClusterSingletonServiceProvider;
212     @Mock
213     private Registration mockSlaveClusterSingletonServiceReg;
214     @Mock
215     private DOMMountPointListener slaveMountPointListener;
216     private final DOMMountPointService slaveMountPointService = new DOMMountPointServiceImpl();
217     private DataBroker slaveDataBroker;
218     private ActorSystem slaveSystem;
219     private NetconfTopologyManager slaveNetconfTopologyManager;
220     private final SettableFuture<NetconfTopologyContext> slaveNetconfTopologyContextFuture = SettableFuture.create();
221     private TransactionChain slaveTxChain;
222
223     private NetconfClientConfigurationBuilderFactory builderFactory;
224     private EffectiveModelContext deviceSchemaContext;
225     private YangModuleInfo topModuleInfo;
226     private QName putTopRpcSchemaPath;
227     private QName getTopRpcSchemaPath;
228     private BindingNormalizedNodeSerializer bindingToNormalized;
229     private YangInstanceIdentifier yangNodeInstanceId;
230     private final TopDOMRpcImplementation topRpcImplementation = new TopDOMRpcImplementation();
231     private final ContainerNode getTopInput = ImmutableNodes.containerNode(GetTopInput.QNAME);
232
233     private SchemaResourceManager resourceManager;
234     private NetconfTopologySchemaAssembler schemaAssembler;
235
236     @Before
237     public void setUp() throws Exception {
238         deleteCacheDir();
239
240         schemaAssembler = new NetconfTopologySchemaAssembler(1, 1, 0, TimeUnit.SECONDS);
241
242         resourceManager = new DefaultSchemaResourceManager(new DefaultYangParserFactory(), TEST_ROOT_DIRECTORY,
243             TEST_DEFAULT_SUBDIR);
244
245         topModuleInfo = BindingRuntimeHelpers.getYangModuleInfo(Top.class);
246
247         deviceSchemaContext = BindingRuntimeHelpers.createEffectiveModel(Top.class);
248
249         final var router = new DOMRpcRouter(new FixedDOMSchemaService(deviceSchemaContext));
250
251         putTopRpcSchemaPath = findRpcDefinition("put-top").getQName();
252         getTopRpcSchemaPath = findRpcDefinition("get-top").getQName();
253
254         router.rpcProviderService().registerRpcImplementation(topRpcImplementation,
255                 DOMRpcIdentifier.create(putTopRpcSchemaPath), DOMRpcIdentifier.create(getTopRpcSchemaPath));
256
257         final var rpcService = router.rpcService();
258         deviceRpcService = () -> rpcService;
259
260         builderFactory = new NetconfClientConfigurationBuilderFactoryImpl(mockEncryptionService, credentialProvider,
261             sslHandlerFactoryProvider);
262
263         doReturn(mockRpcReg).when(mockRpcProviderService).registerRpcImplementations(any());
264
265         setupMaster();
266
267         setupSlave();
268
269         yangNodeInstanceId = bindingToNormalized.toYangInstanceIdentifier(NODE_INSTANCE_ID);
270         doReturn(mock(ListenableFuture.class)).when(mockClientFactory).createClient(any());
271
272         LOG.info("****** Setup complete");
273     }
274
275     private static void deleteCacheDir() {
276         FileUtils.deleteQuietly(new File(TEST_ROOT_DIRECTORY));
277     }
278
279     @After
280     public void tearDown() throws Exception {
281         deleteCacheDir();
282         TestKit.shutdownActorSystem(slaveSystem, true);
283         TestKit.shutdownActorSystem(masterSystem, true);
284         schemaAssembler.close();
285     }
286
287     private void setupMaster() throws Exception {
288         final var dataBrokerTest = newDataBrokerTest();
289         masterDataBroker = dataBrokerTest.getDataBroker();
290         deviceDOMDataBroker = dataBrokerTest.getDomBroker();
291         bindingToNormalized = dataBrokerTest.getDataBrokerTestCustomizer().getAdapterContext().currentSerializer();
292
293         masterSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Master"));
294
295         masterClusterSingletonServiceProvider = new EOSClusterSingletonServiceProvider(
296             new SimpleDOMEntityOwnershipService());
297
298         final var resources =  resourceManager.getSchemaResources(TEST_DEFAULT_SUBDIR, "test");
299         resources.getSchemaRegistry().registerSchemaSource(
300             id -> Futures.immediateFuture(new DelegatedYangTextSource(id, topModuleInfo.getYangTextCharSource())),
301             PotentialSchemaSource.create(new SourceIdentifier(TOP_MODULE_NAME,
302                     topModuleInfo.getName().getRevision().map(Revision::toString).orElse(null)),
303                 YangTextSource.class, 1));
304
305         masterNetconfTopologyManager = new NetconfTopologyManager(BASE_SCHEMAS, masterDataBroker,
306                 masterClusterSingletonServiceProvider, mockTimer, schemaAssembler, masterSystem,
307                 mockClientFactory, masterMountPointService, mockEncryptionService, mockRpcProviderService,
308                 deviceActionFactory, resourceManager, builderFactory, TOPOLOGY_ID, Uint16.ZERO) {
309             @Override
310             protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
311                     final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
312                     final DeviceActionFactory deviceActionFact) {
313                 final var context = super.newNetconfTopologyContext(setup, serviceGroupIdent, actorResponseWaitTime,
314                     deviceActionFact);
315                 final var spiedContext = spy(context);
316                 final var spiedSingleton = spy(context.getTopologySingleton());
317                 doAnswer(invocation -> {
318                     final var spiedFacade = (MasterSalFacade) spy(invocation.callRealMethod());
319                     doReturn(deviceDOMDataBroker).when(spiedFacade)
320                         .newDeviceDataBroker(any(MountPointContext.class), any(NetconfSessionPreferences.class));
321                     masterSalFacadeFuture.set(spiedFacade);
322                     return spiedFacade;
323                 }).when(spiedSingleton).createSalFacade(any(boolean.class));
324                 doReturn(spiedSingleton).when(spiedContext).getTopologySingleton();
325                 return spiedContext;
326             }
327         };
328
329         verifyTopologyNodesCreated(masterDataBroker);
330     }
331
332     private void setupSlave() throws Exception {
333         AbstractConcurrentDataBrokerTest dataBrokerTest = newDataBrokerTest();
334         slaveDataBroker = dataBrokerTest.getDataBroker();
335
336         slaveSystem = ActorSystem.create(ACTOR_SYSTEM_NAME, ConfigFactory.load().getConfig("Slave"));
337
338         doReturn(mockSlaveClusterSingletonServiceReg).when(mockSlaveClusterSingletonServiceProvider)
339                 .registerClusterSingletonService(any());
340
341         slaveNetconfTopologyManager = new NetconfTopologyManager(BASE_SCHEMAS, slaveDataBroker,
342                 mockSlaveClusterSingletonServiceProvider, mockTimer, schemaAssembler, slaveSystem,
343                 mockClientFactory, slaveMountPointService, mockEncryptionService, mockRpcProviderService,
344                 deviceActionFactory, resourceManager, builderFactory, TOPOLOGY_ID, Uint16.ZERO) {
345             @Override
346             protected NetconfTopologyContext newNetconfTopologyContext(final NetconfTopologySetup setup,
347                 final ServiceGroupIdentifier serviceGroupIdent, final Timeout actorResponseWaitTime,
348                 final DeviceActionFactory actionFactory) {
349                 NetconfTopologyContext spiedContext = spy(super.newNetconfTopologyContext(setup, serviceGroupIdent,
350                     actorResponseWaitTime, actionFactory));
351
352                 slaveNetconfTopologyContextFuture.set(spiedContext);
353                 return spiedContext;
354             }
355         };
356
357         verifyTopologyNodesCreated(slaveDataBroker);
358
359         slaveTxChain = slaveDataBroker.createTransactionChain();
360         slaveTxChain.addCallback(new FutureCallback<Empty>() {
361             @Override
362             public void onSuccess(final Empty result) {
363                 // No-op
364             }
365
366             @Override
367             public void onFailure(final Throwable cause) {
368                 LOG.error("Slave transaction chain failed", cause);
369             }
370         });
371     }
372
373     @Test
374     public void test() throws Exception {
375         testMaster();
376
377         testSlave();
378
379         final MasterSalFacade masterSalFacade = testMasterNodeUpdated();
380
381         testMasterDisconnected(masterSalFacade);
382
383         testCleanup();
384     }
385
386     private MasterSalFacade testMaster() throws Exception {
387         LOG.info("****** Testing master");
388
389         writeNetconfNode(TEST_DEFAULT_SUBDIR, masterDataBroker);
390
391         final var masterSalFacade = masterSalFacadeFuture.get(5, TimeUnit.SECONDS);
392         masterSalFacade.onDeviceConnected(new NetconfDeviceSchema(NetconfDeviceCapabilities.empty(),
393             MountPointContext.of(deviceSchemaContext)),
394             NetconfSessionPreferences.fromStrings(List.of(CapabilityURN.CANDIDATE)),
395             new RemoteDeviceServices(deviceRpcService, null));
396
397         final var masterMountPoint = awaitMountPoint(masterMountPointService);
398
399         LOG.info("****** Testing master DOMDataBroker operations");
400
401         testDOMDataBrokerOperations(getDOMDataBroker(masterMountPoint));
402
403         LOG.info("****** Testing master DOMRpcService");
404
405         testDOMRpcService(getDOMRpcService(masterMountPoint));
406         return masterSalFacade;
407     }
408
409     private void testSlave() throws Exception {
410         LOG.info("****** Testing slave");
411
412         writeNetconfNode("slave", slaveDataBroker);
413
414         verify(mockSlaveClusterSingletonServiceProvider, timeout(5000)).registerClusterSingletonService(any());
415
416         // Since the master and slave use separate DataBrokers we need to copy the master's oper node to the slave.
417         // This is essentially what happens in a clustered environment but we'll use a DTCL here.
418
419         masterDataBroker.registerTreeChangeListener(
420             DataTreeIdentifier.of(LogicalDatastoreType.OPERATIONAL, NODE_INSTANCE_ID), changes -> {
421                 final WriteTransaction slaveTx = slaveTxChain.newWriteOnlyTransaction();
422                 for (var dataTreeModification : changes) {
423                     var rootNode = dataTreeModification.getRootNode();
424                     var path = dataTreeModification.getRootPath().path();
425                     switch (rootNode.modificationType()) {
426                         case WRITE:
427                         case SUBTREE_MODIFIED:
428                             slaveTx.merge(LogicalDatastoreType.OPERATIONAL, path, rootNode.dataAfter());
429                             break;
430                         case DELETE:
431                             slaveTx.delete(LogicalDatastoreType.OPERATIONAL, path);
432                             break;
433                         default:
434                             break;
435                     }
436                 }
437
438                 slaveTx.commit();
439             });
440
441         DOMMountPoint slaveMountPoint = awaitMountPoint(slaveMountPointService);
442
443         LOG.info("****** Testing slave DOMDataBroker operations");
444
445         testDOMDataBrokerOperations(getDOMDataBroker(slaveMountPoint));
446
447         LOG.info("****** Testing slave DOMRpcService");
448
449         testDOMRpcService(getDOMRpcService(slaveMountPoint));
450     }
451
452     private MasterSalFacade testMasterNodeUpdated() throws Exception {
453         LOG.info("****** Testing update master node");
454
455         masterMountPointService.registerProvisionListener(masterMountPointListener);
456         slaveMountPointService.registerProvisionListener(slaveMountPointListener);
457
458         masterSalFacadeFuture = SettableFuture.create();
459         writeNetconfNode(TEST_DEFAULT_SUBDIR, masterDataBroker);
460
461         verify(masterMountPointListener, timeout(5000)).onMountPointRemoved(yangNodeInstanceId);
462
463         final var masterSalFacade = masterSalFacadeFuture.get(5, TimeUnit.SECONDS);
464         masterSalFacade.onDeviceConnected(
465             new NetconfDeviceSchema(NetconfDeviceCapabilities.empty(), MountPointContext.of(deviceSchemaContext)),
466             NetconfSessionPreferences.fromStrings(List.of(CapabilityURN.CANDIDATE)),
467             new RemoteDeviceServices(deviceRpcService, null));
468
469         verify(masterMountPointListener, timeout(5000)).onMountPointCreated(yangNodeInstanceId);
470
471         verify(slaveMountPointListener, timeout(5000)).onMountPointRemoved(yangNodeInstanceId);
472         verify(slaveMountPointListener, timeout(5000)).onMountPointCreated(yangNodeInstanceId);
473
474         return masterSalFacade;
475     }
476
477     private void testMasterDisconnected(final MasterSalFacade masterSalFacade) throws Exception {
478         LOG.info("****** Testing master disconnected");
479
480         masterSalFacade.onDeviceDisconnected();
481
482         awaitMountPointNotPresent(masterMountPointService);
483
484         await().atMost(5, TimeUnit.SECONDS).until(() -> {
485             try (ReadTransaction readTx = masterDataBroker.newReadOnlyTransaction()) {
486                 Optional<Node> node = readTx.read(LogicalDatastoreType.OPERATIONAL,
487                         NODE_INSTANCE_ID).get(5, TimeUnit.SECONDS);
488                 assertTrue(node.isPresent());
489                 final NetconfNode netconfNode = node.orElseThrow().augmentation(NetconfNode.class);
490                 return netconfNode.getConnectionStatus() != ConnectionStatus.Connected;
491             }
492         });
493
494         awaitMountPointNotPresent(slaveMountPointService);
495     }
496
497     private void testCleanup() throws Exception {
498         LOG.info("****** Testing cleanup");
499
500         slaveNetconfTopologyManager.close();
501         verify(mockSlaveClusterSingletonServiceReg).close();
502     }
503
504     private void testDOMRpcService(final DOMRpcService domRpcService)
505             throws InterruptedException, ExecutionException, TimeoutException {
506         testPutTopRpc(domRpcService, new DefaultDOMRpcResult((ContainerNode)null));
507         testPutTopRpc(domRpcService, null);
508         testPutTopRpc(domRpcService, new DefaultDOMRpcResult(ImmutableList.of(
509                 RpcResultBuilder.newError(ErrorType.APPLICATION, new ErrorTag("tag1"), "error1"),
510                 RpcResultBuilder.newError(ErrorType.APPLICATION, new ErrorTag("tag2"), "error2"))));
511
512         testGetTopRpc(domRpcService, new DefaultDOMRpcResult(bindingToNormalized.toNormalizedNodeRpcData(
513                 new GetTopOutputBuilder().setTopLevelList(oneTopLevelList()).build())));
514
515         testFailedRpc(domRpcService, getTopRpcSchemaPath, getTopInput);
516     }
517
518     private void testPutTopRpc(final DOMRpcService domRpcService, final DOMRpcResult result)
519             throws InterruptedException, ExecutionException, TimeoutException {
520         ContainerNode putTopInput = bindingToNormalized.toNormalizedNodeRpcData(
521                 new PutTopInputBuilder().setTopLevelList(oneTopLevelList()).build());
522         testRpc(domRpcService, putTopRpcSchemaPath, putTopInput, result);
523     }
524
525     private static Map<TopLevelListKey, TopLevelList> oneTopLevelList() {
526         final TopLevelListKey key = new TopLevelListKey("one");
527         return ImmutableMap.of(key, new TopLevelListBuilder().withKey(key).build());
528     }
529
530     private void testGetTopRpc(final DOMRpcService domRpcService, final DOMRpcResult result)
531             throws InterruptedException, ExecutionException, TimeoutException {
532         testRpc(domRpcService, getTopRpcSchemaPath, getTopInput, result);
533     }
534
535     private void testRpc(final DOMRpcService domRpcService, final QName qname, final ContainerNode input,
536             final DOMRpcResult result) throws InterruptedException, ExecutionException, TimeoutException {
537         final FluentFuture<DOMRpcResult> future = result == null ? FluentFutures.immediateNullFluentFuture()
538                 : FluentFutures.immediateFluentFuture(result);
539         final DOMRpcResult actual = invokeRpc(domRpcService, qname, input, future);
540         if (result == null) {
541             assertNull(actual);
542             return;
543         }
544
545         assertNotNull(actual);
546         assertEquals(result.value(), actual.value());
547
548         assertEquals(result.errors().size(), actual.errors().size());
549         Iterator<? extends RpcError> iter1 = result.errors().iterator();
550         Iterator<? extends RpcError> iter2 = actual.errors().iterator();
551         while (iter1.hasNext() && iter2.hasNext()) {
552             RpcError err1 = iter1.next();
553             RpcError err2 = iter2.next();
554             assertEquals(err1.getErrorType(), err2.getErrorType());
555             assertEquals(err1.getTag(), err2.getTag());
556             assertEquals(err1.getMessage(), err2.getMessage());
557             assertEquals(err1.getSeverity(), err2.getSeverity());
558             assertEquals(err1.getApplicationTag(), err2.getApplicationTag());
559             assertEquals(err1.getInfo(), err2.getInfo());
560         }
561     }
562
563     private void testFailedRpc(final DOMRpcService domRpcService, final QName qname, final ContainerNode input)
564             throws InterruptedException, TimeoutException {
565         try {
566             invokeRpc(domRpcService, qname, input, Futures.immediateFailedFuture(new ClusteringRpcException("mock")));
567             fail("Expected exception");
568         } catch (ExecutionException e) {
569             assertTrue(e.getCause() instanceof ClusteringRpcException);
570             assertEquals("mock", e.getCause().getMessage());
571         }
572     }
573
574     private DOMRpcResult invokeRpc(final DOMRpcService domRpcService, final QName qname, final ContainerNode input,
575             final ListenableFuture<DOMRpcResult> returnFuture)
576                 throws InterruptedException, ExecutionException, TimeoutException {
577         topRpcImplementation.init(returnFuture);
578         final ListenableFuture<? extends DOMRpcResult> resultFuture = domRpcService.invokeRpc(qname, input);
579
580         topRpcImplementation.verify(DOMRpcIdentifier.create(qname), input);
581
582         return resultFuture.get(5, TimeUnit.SECONDS);
583     }
584
585     private static void testDOMDataBrokerOperations(final DOMDataBroker dataBroker)
586             throws InterruptedException, ExecutionException, TimeoutException {
587
588         DOMDataTreeWriteTransaction writeTx = dataBroker.newWriteOnlyTransaction();
589
590         final ContainerNode topNode = Builders.containerBuilder()
591                 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(Top.QNAME)).build();
592         final YangInstanceIdentifier topPath = YangInstanceIdentifier.of(Top.QNAME);
593         writeTx.put(LogicalDatastoreType.CONFIGURATION, topPath, topNode);
594
595         final QName name = QName.create(TopLevelList.QNAME, "name");
596         final YangInstanceIdentifier listPath = YangInstanceIdentifier.builder(topPath)
597                 .node(TopLevelList.QNAME).build();
598         final MapEntryNode listEntryNode = ImmutableNodes.mapEntry(TopLevelList.QNAME, name, "one");
599         final MapNode listNode = ImmutableNodes.mapNodeBuilder(TopLevelList.QNAME).addChild(listEntryNode).build();
600         writeTx.merge(LogicalDatastoreType.CONFIGURATION, listPath, listNode);
601         writeTx.commit().get(5, TimeUnit.SECONDS);
602
603         verifyDataInStore(dataBroker.newReadWriteTransaction(), YangInstanceIdentifier.builder(listPath)
604                 .nodeWithKey(TopLevelList.QNAME, name, "one").build(), listEntryNode);
605
606         writeTx = dataBroker.newWriteOnlyTransaction();
607         writeTx.delete(LogicalDatastoreType.CONFIGURATION, topPath);
608         writeTx.commit().get(5, TimeUnit.SECONDS);
609
610         DOMDataTreeReadWriteTransaction readTx = dataBroker.newReadWriteTransaction();
611         assertFalse(readTx.exists(LogicalDatastoreType.CONFIGURATION, topPath).get(5, TimeUnit.SECONDS));
612         assertTrue(readTx.cancel());
613     }
614
615     private static void writeNetconfNode(final String cacheDir, final DataBroker dataBroker) throws Exception {
616         putData(dataBroker, NODE_INSTANCE_ID, new NodeBuilder()
617             .withKey(NODE_INSTANCE_ID.getKey())
618             .addAugmentation(new NetconfNodeBuilder()
619                 .setHost(new Host(new IpAddress(new Ipv4Address("127.0.0.1"))))
620                 .setPort(new PortNumber(Uint16.valueOf(1234)))
621                 .setActorResponseWaitTime(Uint16.valueOf(10))
622                 .setTcpOnly(Boolean.TRUE)
623                 .setSchemaless(Boolean.FALSE)
624                 .setKeepaliveDelay(Uint32.ZERO)
625                 .setConnectionTimeoutMillis(Uint32.valueOf(5000))
626                 .setDefaultRequestTimeoutMillis(Uint32.valueOf(5000))
627                 .setMaxConnectionAttempts(Uint32.ONE)
628                 .setCredentials(new LoginPwUnencryptedBuilder()
629                     .setLoginPasswordUnencrypted(new LoginPasswordUnencryptedBuilder()
630                         .setUsername("user")
631                         .setPassword("pass")
632                         .build())
633                     .build())
634                 .setSchemaCacheDirectory(cacheDir)
635                 .build())
636             .build());
637     }
638
639     private static <T extends DataObject> void putData(final DataBroker databroker, final InstanceIdentifier<T> path,
640             final T data) throws Exception {
641         final var writeTx = databroker.newWriteOnlyTransaction();
642         writeTx.put(LogicalDatastoreType.CONFIGURATION, path, data);
643         writeTx.commit().get(5, TimeUnit.SECONDS);
644     }
645
646     private static void verifyDataInStore(final DOMDataTreeReadOperations readTx, final YangInstanceIdentifier path,
647             final NormalizedNode expNode) throws InterruptedException, ExecutionException, TimeoutException {
648         assertEquals(Optional.of(expNode), readTx.read(LogicalDatastoreType.CONFIGURATION, path)
649             .get(5, TimeUnit.SECONDS));
650         assertTrue(readTx.exists(LogicalDatastoreType.CONFIGURATION, path).get(5, TimeUnit.SECONDS));
651     }
652
653     private static void verifyTopologyNodesCreated(final DataBroker dataBroker) {
654         await().atMost(5, TimeUnit.SECONDS).until(() -> {
655             try (ReadTransaction readTx = dataBroker.newReadOnlyTransaction()) {
656                 return readTx.exists(LogicalDatastoreType.OPERATIONAL,
657                     NetconfTopologyUtils.createTopologyListPath(TOPOLOGY_ID)).get(3, TimeUnit.SECONDS);
658             }
659         });
660     }
661
662     private AbstractConcurrentDataBrokerTest newDataBrokerTest() throws Exception {
663         final var dataBrokerTest = new AbstractConcurrentDataBrokerTest(true) {
664             @Override
665             protected Set<YangModuleInfo> getModuleInfos() {
666                 return Set.of(
667                     BindingRuntimeHelpers.getYangModuleInfo(NetconfNode.class),
668                     BindingRuntimeHelpers.getYangModuleInfo(NetworkTopology.class),
669                     BindingRuntimeHelpers.getYangModuleInfo(Keystore.class),
670                     topModuleInfo);
671             }
672         };
673
674         dataBrokerTest.setup();
675
676         final var path = NetconfTopologyUtils.createTopologyListPath(TOPOLOGY_ID);
677         putData(dataBrokerTest.getDataBroker(), path, new TopologyBuilder().withKey(path.getKey()).build());
678         return dataBrokerTest;
679     }
680
681     private void awaitMountPointNotPresent(final DOMMountPointService mountPointService) {
682         await().atMost(5, TimeUnit.SECONDS).until(
683             () -> mountPointService.getMountPoint(yangNodeInstanceId).isEmpty());
684     }
685
686     private static DOMDataBroker getDOMDataBroker(final DOMMountPoint mountPoint) {
687         return getMountPointService(mountPoint, DOMDataBroker.class);
688     }
689
690     private static DOMRpcService getDOMRpcService(final DOMMountPoint mountPoint) {
691         return getMountPointService(mountPoint, DOMRpcService.class);
692     }
693
694     private static DOMActionService getDomActionService(final DOMMountPoint mountPoint) {
695         return getMountPointService(mountPoint, DOMActionService.class);
696     }
697
698     private static <T extends DOMService<T, E>, E extends DOMService.Extension<T, E>> T getMountPointService(
699             final DOMMountPoint mountPoint, final Class<T> serviceClass) {
700         return mountPoint.getService(serviceClass).orElseThrow();
701     }
702
703     private DOMMountPoint awaitMountPoint(final DOMMountPointService mountPointService) {
704         await().atMost(5, TimeUnit.SECONDS).until(() ->
705                 mountPointService.getMountPoint(yangNodeInstanceId).isPresent());
706
707         return mountPointService.getMountPoint(yangNodeInstanceId).orElseThrow();
708     }
709
710     private RpcDefinition findRpcDefinition(final String rpc) {
711         Module topModule = deviceSchemaContext.findModule(TOP_MODULE_NAME, topModuleInfo.getName().getRevision())
712             .orElseThrow();
713         RpcDefinition rpcDefinition = null;
714         for (RpcDefinition def: topModule.getRpcs()) {
715             if (def.getQName().getLocalName().equals(rpc)) {
716                 rpcDefinition = def;
717                 break;
718             }
719         }
720
721         assertNotNull(rpc + " rpc not found in " + topModule.getRpcs(), rpcDefinition);
722         return rpcDefinition;
723     }
724
725     private static final class TopDOMRpcImplementation implements DOMRpcImplementation {
726         private volatile SettableFuture<Entry<DOMRpcIdentifier, NormalizedNode>> rpcInvokedFuture;
727         private volatile ListenableFuture<DOMRpcResult> returnFuture;
728
729         @Override
730         public ListenableFuture<DOMRpcResult> invokeRpc(final DOMRpcIdentifier rpc, final ContainerNode input) {
731             rpcInvokedFuture.set(Map.entry(rpc, input));
732             return returnFuture;
733         }
734
735         void init(final ListenableFuture<DOMRpcResult> retFuture) {
736             returnFuture = retFuture;
737             rpcInvokedFuture = SettableFuture.create();
738         }
739
740         void verify(final DOMRpcIdentifier expRpc, final NormalizedNode expInput)
741                 throws InterruptedException, ExecutionException, TimeoutException {
742             final Entry<DOMRpcIdentifier, NormalizedNode> actual = rpcInvokedFuture.get(5, TimeUnit.SECONDS);
743             assertEquals(expRpc, actual.getKey());
744             assertEquals(expInput, actual.getValue());
745         }
746     }
747 }