rationalize functional tests sims starters
[transportpce.git] / tests / transportpce_tests / 1.2.1 / test_topology.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import json
13 import signal
14 import time
15 import unittest
16 import requests
17 import psutil
18 import test_utils
19
20
21 class TransportPCETopologyTesting(unittest.TestCase):
22
23     sim_process1 = None
24     sim_process2 = None
25     sim_process3 = None
26     sim_process4 = None
27     odl_process = None
28     restconf_baseurl = "http://localhost:8181/restconf"
29
30 # START_IGNORE_XTESTING
31
32     @classmethod
33     def setUpClass(cls):
34         cls.sim_process1 = test_utils.start_sim('xpdra')
35         time.sleep(20)
36
37         cls.sim_process2 = test_utils.start_sim('roadma')
38         time.sleep(20)
39
40         cls.sim_process3 = test_utils.start_sim('roadmb')
41         time.sleep(20)
42
43         cls.sim_process4 = test_utils.start_sim('roadmc')
44         time.sleep(20)
45         print("all sims started")
46
47         cls.odl_process = test_utils.start_tpce()
48         time.sleep(60)
49         print("opendaylight started")
50
51     @classmethod
52     def tearDownClass(cls):
53         for child in psutil.Process(cls.odl_process.pid).children():
54             child.send_signal(signal.SIGINT)
55             child.wait()
56         cls.odl_process.send_signal(signal.SIGINT)
57         cls.odl_process.wait()
58         for child in psutil.Process(cls.sim_process1.pid).children():
59             child.send_signal(signal.SIGINT)
60             child.wait()
61         cls.sim_process1.send_signal(signal.SIGINT)
62         cls.sim_process1.wait()
63         for child in psutil.Process(cls.sim_process2.pid).children():
64             child.send_signal(signal.SIGINT)
65             child.wait()
66         cls.sim_process2.send_signal(signal.SIGINT)
67         cls.sim_process2.wait()
68
69         for child in psutil.Process(cls.sim_process3.pid).children():
70             child.send_signal(signal.SIGINT)
71             child.wait()
72         cls.sim_process3.send_signal(signal.SIGINT)
73         cls.sim_process3.wait()
74
75         for child in psutil.Process(cls.sim_process4.pid).children():
76             child.send_signal(signal.SIGINT)
77             child.wait()
78         cls.sim_process4.send_signal(signal.SIGINT)
79         cls.sim_process4.wait()
80
81     def setUp(self):
82         time.sleep(5)
83
84 # END_IGNORE_XTESTING
85
86     def test_01_connect_ROADMA(self):
87         # Config ROADMA
88         url = ("{}/config/network-topology:"
89                "network-topology/topology/topology-netconf/node/ROADMA01"
90                .format(self.restconf_baseurl))
91         data = {"node": [{
92             "node-id": "ROADMA01",
93             "netconf-node-topology:username": "admin",
94             "netconf-node-topology:password": "admin",
95             "netconf-node-topology:host": "127.0.0.1",
96             "netconf-node-topology:port": test_utils.sims['roadma']['port'],
97             "netconf-node-topology:tcp-only": "false",
98             "netconf-node-topology:pass-through": {}}]}
99         headers = {'content-type': 'application/json'}
100         response = requests.request(
101             "PUT", url, data=json.dumps(data), headers=headers,
102             auth=('admin', 'admin'))
103         self.assertEqual(response.status_code, requests.codes.created)
104         time.sleep(20)
105
106     def test_02_getClliNetwork(self):
107         url = ("{}/config/ietf-network:networks/network/clli-network"
108                .format(self.restconf_baseurl))
109         headers = {'content-type': 'application/json'}
110         response = requests.request(
111             "GET", url, headers=headers, auth=('admin', 'admin'))
112         self.assertEqual(response.status_code, requests.codes.ok)
113         res = response.json()
114         self.assertEqual(res['network'][0]['node'][0]['node-id'], 'NodeA')
115         self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
116
117     def test_03_getOpenRoadmNetwork(self):
118         url = ("{}/config/ietf-network:networks/network/openroadm-network"
119                .format(self.restconf_baseurl))
120         headers = {'content-type': 'application/json'}
121         response = requests.request(
122             "GET", url, headers=headers, auth=('admin', 'admin'))
123         self.assertEqual(response.status_code, requests.codes.ok)
124         res = response.json()
125         self.assertEqual(res['network'][0]['node'][0]['node-id'], 'ROADMA01')
126         self.assertEqual(res['network'][0]['node'][0]['supporting-node'][0]['network-ref'], 'clli-network')
127         self.assertEqual(res['network'][0]['node'][0]['supporting-node'][0]['node-ref'], 'NodeA')
128         self.assertEqual(res['network'][0]['node'][0]['org-openroadm-common-network:node-type'], 'ROADM')
129         self.assertEqual(res['network'][0]['node'][0]['org-openroadm-network:model'], '2')
130
131     def test_04_getLinks_OpenroadmTopology(self):
132         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
133                .format(self.restconf_baseurl))
134         headers = {'content-type': 'application/json'}
135         response = requests.request(
136             "GET", url, headers=headers, auth=('admin', 'admin'))
137         self.assertEqual(response.status_code, requests.codes.ok)
138         res = response.json()
139         # Tests related to links
140         nbLink = len(res['network'][0]['ietf-network-topology:link'])
141         self.assertEqual(nbLink, 10)
142         expressLink = ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
143                        'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX']
144         addLink = ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
145                    'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX']
146         dropLink = ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
147                     'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX']
148         for i in range(0, nbLink):
149             linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
150             if res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'EXPRESS-LINK':
151                 find = linkId in expressLink
152                 self.assertEqual(find, True)
153                 expressLink.remove(linkId)
154             elif res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'ADD-LINK':
155                 find = linkId in addLink
156                 self.assertEqual(find, True)
157                 addLink.remove(linkId)
158             elif res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'DROP-LINK':
159                 find = linkId in dropLink
160                 self.assertEqual(find, True)
161                 dropLink.remove(linkId)
162             else:
163                 self.assertFalse(True)
164         self.assertEqual(len(expressLink), 0)
165         self.assertEqual(len(addLink), 0)
166         self.assertEqual(len(dropLink), 0)
167
168     def test_05_getNodes_OpenRoadmTopology(self):
169         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
170                .format(self.restconf_baseurl))
171         headers = {'content-type': 'application/json'}
172         response = requests.request("GET", url, headers=headers, auth=('admin', 'admin'))
173         res = response.json()
174         # Tests related to nodes
175         self.assertEqual(response.status_code, requests.codes.ok)
176         nbNode = len(res['network'][0]['node'])
177         self.assertEqual(nbNode, 4)
178         listNode = ['ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
179         for i in range(0, nbNode):
180             self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
181                           res['network'][0]['node'][i]['supporting-node'])
182             nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
183             nodeId = res['network'][0]['node'][i]['node-id']
184             if nodeId == 'ROADMA01-SRG1':
185                 # Test related to SRG1
186                 self.assertEqual(nodeType, 'SRG')
187                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
188                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
189                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
190                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
191                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
192                 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
193                               res['network'][0]['node'][i]['supporting-node'])
194                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
195                               res['network'][0]['node'][i]['supporting-node'])
196                 listNode.remove(nodeId)
197             elif nodeId == 'ROADMA01-SRG3':
198                 # Test related to SRG1
199                 self.assertEqual(nodeType, 'SRG')
200                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
201                 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
202                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
203                 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
204                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
205                 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
206                               res['network'][0]['node'][i]['supporting-node'])
207                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
208                               res['network'][0]['node'][i]['supporting-node'])
209                 listNode.remove(nodeId)
210             elif nodeId == 'ROADMA01-DEG1':
211                 # Test related to DEG1
212                 self.assertEqual(nodeType, 'DEGREE')
213                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
214                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
215                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
216                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
217                 listNode.remove(nodeId)
218                 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
219                               res['network'][0]['node'][i]['supporting-node'])
220                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
221                               res['network'][0]['node'][i]['supporting-node'])
222             elif nodeId == 'ROADMA01-DEG2':
223                 # Test related to DEG2
224                 self.assertEqual(nodeType, 'DEGREE')
225                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
226                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
227                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
228                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
229                 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
230                               res['network'][0]['node'][i]['supporting-node'])
231                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
232                               res['network'][0]['node'][i]['supporting-node'])
233                 listNode.remove(nodeId)
234             else:
235                 self.assertFalse(True)
236         self.assertEqual(len(listNode), 0)
237
238     def test_06_connect_XPDRA(self):
239         url = ("{}/config/network-topology:"
240                "network-topology/topology/topology-netconf/node/XPDRA01"
241                .format(self.restconf_baseurl))
242         data = {"node": [{
243             "node-id": "XPDRA01",
244             "netconf-node-topology:username": "admin",
245             "netconf-node-topology:password": "admin",
246             "netconf-node-topology:host": "127.0.0.1",
247             "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
248             "netconf-node-topology:tcp-only": "false",
249             "netconf-node-topology:pass-through": {}}]}
250         headers = {'content-type': 'application/json'}
251         response = requests.request(
252             "PUT", url, data=json.dumps(data), headers=headers,
253             auth=('admin', 'admin'))
254         self.assertEqual(response.status_code, requests.codes.created)
255         time.sleep(30)
256
257     def test_07_getClliNetwork(self):
258         url = ("{}/config/ietf-network:networks/network/clli-network"
259                .format(self.restconf_baseurl))
260         headers = {'content-type': 'application/json'}
261         response = requests.request("GET", url, headers=headers, auth=('admin', 'admin'))
262         self.assertEqual(response.status_code, requests.codes.ok)
263         res = response.json()
264         self.assertEqual(res['network'][0]['node'][0]['node-id'], 'NodeA')
265         self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
266
267     def test_08_getOpenRoadmNetwork(self):
268         url = ("{}/config/ietf-network:networks/network/openroadm-network"
269                .format(self.restconf_baseurl))
270         headers = {'content-type': 'application/json'}
271         response = requests.request(
272             "GET", url, headers=headers, auth=('admin', 'admin'))
273         self.assertEqual(response.status_code, requests.codes.ok)
274         res = response.json()
275         nbNode = len(res['network'][0]['node'])
276         self.assertEqual(nbNode, 2)
277         for i in range(0, nbNode):
278             self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'], 'clli-network')
279             self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'], 'NodeA')
280             nodeId = res['network'][0]['node'][i]['node-id']
281             if(nodeId == 'XPDRA01'):
282                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'XPONDER')
283                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], '1')
284             elif(nodeId == 'ROADMA01'):
285                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'ROADM')
286                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], '2')
287             else:
288                 self.assertFalse(True)
289
290     def test_09_getNodes_OpenRoadmTopology(self):
291         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
292                .format(self.restconf_baseurl))
293         headers = {'content-type': 'application/json'}
294         response = requests.request(
295             "GET", url, headers=headers, auth=('admin', 'admin'))
296         res = response.json()
297         # Tests related to nodes
298         self.assertEqual(response.status_code, requests.codes.ok)
299         nbNode = len(res['network'][0]['node'])
300         self.assertEqual(nbNode, 5)
301         listNode = ['XPDRA01-XPDR1', 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
302         for i in range(0, nbNode):
303             nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
304             nodeId = res['network'][0]['node'][i]['node-id']
305             # Tests related to XPDRA nodes
306             if(nodeId == 'XPDRA01-XPDR1'):
307                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'},
308                               res['network'][0]['node'][i]['supporting-node'])
309                 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
310                               res['network'][0]['node'][i]['supporting-node'])
311                 self.assertEqual(nodeType, 'XPONDER')
312                 nbTps = len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
313                 client = 0
314                 network = 0
315                 for j in range(0, nbTps):
316                     tpType = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-common-network:tp-type']
317                     tpId = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
318                     if (tpType == 'XPONDER-CLIENT'):
319                         client += 1
320                     elif (tpType == 'XPONDER-NETWORK'):
321                         network += 1
322                     if (tpId == 'XPDR1-NETWORK2'):
323                         self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point']
324                                          [j]['transportpce-topology:associated-connection-map-port'], 'XPDR1-CLIENT3')
325                     if (tpId == 'XPDR1-CLIENT3'):
326                         self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point']
327                                          [j]['transportpce-topology:associated-connection-map-port'], 'XPDR1-NETWORK2')
328                 self.assertTrue(client == 4)
329                 self.assertTrue(network == 2)
330                 listNode.remove(nodeId)
331             elif(nodeId == 'ROADMA01-SRG1'):
332                 # Test related to SRG1
333                 self.assertEqual(nodeType, 'SRG')
334                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
335                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
336                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
337                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
338                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
339                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
340                               res['network'][0]['node'][i]['supporting-node'])
341                 listNode.remove(nodeId)
342             elif(nodeId == 'ROADMA01-SRG3'):
343                 # Test related to SRG1
344                 self.assertEqual(nodeType, 'SRG')
345                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
346                 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
347                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
348                 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
349                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
350                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
351                               res['network'][0]['node'][i]['supporting-node'])
352                 listNode.remove(nodeId)
353             elif(nodeId == 'ROADMA01-DEG1'):
354                 # Test related to DEG1
355                 self.assertEqual(nodeType, 'DEGREE')
356                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
357                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
358                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
359                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
360                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
361                               res['network'][0]['node'][i]['supporting-node'])
362                 listNode.remove(nodeId)
363             elif(nodeId == 'ROADMA01-DEG2'):
364                 # Test related to DEG2
365                 self.assertEqual(nodeType, 'DEGREE')
366                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
367                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
368                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
369                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
370                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
371                               res['network'][0]['node'][i]['supporting-node'])
372                 listNode.remove(nodeId)
373             else:
374                 self.assertFalse(True)
375         self.assertEqual(len(listNode), 0)
376
377     # Connect the tail XPDRA to ROADMA and vice versa
378     def test_10_connect_tail_xpdr_rdm(self):
379         # Connect the tail: XPDRA to ROADMA
380         url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
381                .format(self.restconf_baseurl))
382         data = {"networkutils:input": {
383             "networkutils:links-input": {
384                 "networkutils:xpdr-node": "XPDRA01",
385                 "networkutils:xpdr-num": "1",
386                 "networkutils:network-num": "1",
387                 "networkutils:rdm-node": "ROADMA01",
388                 "networkutils:srg-num": "1",
389                 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
390             }
391         }
392         }
393         headers = {'content-type': 'application/json'}
394         response = requests.request(
395             "POST", url, data=json.dumps(data), headers=headers,
396             auth=('admin', 'admin'))
397         self.assertEqual(response.status_code, requests.codes.ok)
398
399     def test_11_connect_tail_rdm_xpdr(self):
400         # Connect the tail: ROADMA to XPDRA
401         url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
402                .format(self.restconf_baseurl))
403         data = {"networkutils:input": {
404             "networkutils:links-input": {
405                 "networkutils:xpdr-node": "XPDRA01",
406                 "networkutils:xpdr-num": "1",
407                 "networkutils:network-num": "1",
408                 "networkutils:rdm-node": "ROADMA01",
409                 "networkutils:srg-num": "1",
410                 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
411             }
412         }
413         }
414         headers = {'content-type': 'application/json'}
415         response = requests.request(
416             "POST", url, data=json.dumps(data), headers=headers,
417             auth=('admin', 'admin'))
418         self.assertEqual(response.status_code, requests.codes.ok)
419
420     def test_12_getLinks_OpenRoadmTopology(self):
421         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
422                .format(self.restconf_baseurl))
423         headers = {'content-type': 'application/json'}
424         response = requests.request(
425             "GET", url, headers=headers, auth=('admin', 'admin'))
426         self.assertEqual(response.status_code, requests.codes.ok)
427         res = response.json()
428         # Tests related to links
429         nbLink = len(res['network'][0]['ietf-network-topology:link'])
430         self.assertEqual(nbLink, 12)
431         expressLink = ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
432                        'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX']
433         addLink = ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
434                    'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX']
435         dropLink = ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
436                     'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX']
437         XPDR_IN = ['ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
438         XPDR_OUT = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX']
439         for i in range(0, nbLink):
440             nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
441             linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
442             if(nodeType == 'EXPRESS-LINK'):
443                 find = linkId in expressLink
444                 self.assertEqual(find, True)
445                 expressLink.remove(linkId)
446             elif(nodeType == 'ADD-LINK'):
447                 find = linkId in addLink
448                 self.assertEqual(find, True)
449                 addLink.remove(linkId)
450             elif(nodeType == 'DROP-LINK'):
451                 find = linkId in dropLink
452                 self.assertEqual(find, True)
453                 dropLink.remove(linkId)
454             elif(nodeType == 'XPONDER-INPUT'):
455                 find = linkId in XPDR_IN
456                 self.assertEqual(find, True)
457                 XPDR_IN.remove(linkId)
458             elif(nodeType == 'XPONDER-OUTPUT'):
459                 find = linkId in XPDR_OUT
460                 self.assertEqual(find, True)
461                 XPDR_OUT.remove(linkId)
462             else:
463                 self.assertFalse(True)
464         self.assertEqual(len(expressLink), 0)
465         self.assertEqual(len(addLink), 0)
466         self.assertEqual(len(dropLink), 0)
467         self.assertEqual(len(XPDR_IN), 0)
468         self.assertEqual(len(XPDR_OUT), 0)
469
470     def test_13_connect_ROADMC(self):
471         # Config ROADMC
472         url = ("{}/config/network-topology:"
473                "network-topology/topology/topology-netconf/node/ROADMC01"
474                .format(self.restconf_baseurl))
475         data = {"node": [{
476             "node-id": "ROADMC01",
477             "netconf-node-topology:username": "admin",
478             "netconf-node-topology:password": "admin",
479             "netconf-node-topology:host": "127.0.0.1",
480             "netconf-node-topology:port": test_utils.sims['roadmc']['port'],
481             "netconf-node-topology:tcp-only": "false",
482             "netconf-node-topology:pass-through": {}}]}
483         headers = {'content-type': 'application/json'}
484         response = requests.request(
485             "PUT", url, data=json.dumps(data), headers=headers,
486             auth=('admin', 'admin'))
487         self.assertEqual(response.status_code, requests.codes.created)
488         time.sleep(20)
489
490     def test_14_omsAttributes_ROADMA_ROADMC(self):
491         # Config ROADMA01-ROADMC01 oms-attributes
492         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
493                "link/ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
494                "OMS-attributes/span"
495                .format(self.restconf_baseurl))
496         data = {"span": {
497             "auto-spanloss": "true",
498             "engineered-spanloss": 12.2,
499             "link-concatenation": [{
500                 "SRLG-Id": 0,
501                 "fiber-type": "smf",
502                 "SRLG-length": 100000,
503                 "pmd": 0.5}]}}
504         headers = {'content-type': 'application/json'}
505         response = requests.request(
506             "PUT", url, data=json.dumps(data), headers=headers,
507             auth=('admin', 'admin'))
508         self.assertEqual(response.status_code, requests.codes.created)
509
510     def test_15_omsAttributes_ROADMC_ROADMA(self):
511         # Config ROADMC01-ROADMA oms-attributes
512         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
513                "link/ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
514                "OMS-attributes/span"
515                .format(self.restconf_baseurl))
516         data = {"span": {
517             "auto-spanloss": "true",
518             "engineered-spanloss": 12.2,
519             "link-concatenation": [{
520                 "SRLG-Id": 0,
521                 "fiber-type": "smf",
522                 "SRLG-length": 100000,
523                 "pmd": 0.5}]}}
524         headers = {'content-type': 'application/json'}
525         response = requests.request(
526             "PUT", url, data=json.dumps(data), headers=headers,
527             auth=('admin', 'admin'))
528         self.assertEqual(response.status_code, requests.codes.created)
529
530     def test_16_getClliNetwork(self):
531         url = ("{}/config/ietf-network:networks/network/clli-network"
532                .format(self.restconf_baseurl))
533         headers = {'content-type': 'application/json'}
534         response = requests.request(
535             "GET", url, headers=headers, auth=('admin', 'admin'))
536         self.assertEqual(response.status_code, requests.codes.ok)
537         res = response.json()
538         nbNode = len(res['network'][0]['node'])
539         listNode = ['NodeA', 'NodeC']
540         for i in range(0, nbNode):
541             nodeId = res['network'][0]['node'][i]['node-id']
542             find = nodeId in listNode
543             self.assertEqual(find, True)
544             if(nodeId == 'NodeA'):
545                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeA')
546             else:
547                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeC')
548             listNode.remove(nodeId)
549
550         self.assertEqual(len(listNode), 0)
551
552     def test_17_getOpenRoadmNetwork(self):
553         url = ("{}/config/ietf-network:networks/network/openroadm-network"
554                .format(self.restconf_baseurl))
555         headers = {'content-type': 'application/json'}
556         response = requests.request(
557             "GET", url, headers=headers, auth=('admin', 'admin'))
558         self.assertEqual(response.status_code, requests.codes.ok)
559         res = response.json()
560         nbNode = len(res['network'][0]['node'])
561         self.assertEqual(nbNode, 3)
562         listNode = ['XPDRA01', 'ROADMA01', 'ROADMC01']
563         for i in range(0, nbNode):
564             self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'], 'clli-network')
565             nodeId = res['network'][0]['node'][i]['node-id']
566             if(nodeId == 'XPDRA01'):
567                 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'], 'NodeA')
568                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'XPONDER')
569                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], '1')
570                 listNode.remove(nodeId)
571             elif(nodeId == 'ROADMA01'):
572                 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'], 'NodeA')
573                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'ROADM')
574                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], '2')
575                 listNode.remove(nodeId)
576             elif(nodeId == 'ROADMC01'):
577                 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'], 'NodeC')
578                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'ROADM')
579                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], '2')
580                 listNode.remove(nodeId)
581             else:
582                 self.assertFalse(True)
583         self.assertEqual(len(listNode), 0)
584
585     def test_18_getROADMLinkOpenRoadmTopology(self):
586         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
587                .format(self.restconf_baseurl))
588         headers = {'content-type': 'application/json'}
589         response = requests.request(
590             "GET", url, headers=headers, auth=('admin', 'admin'))
591         self.assertEqual(response.status_code, requests.codes.ok)
592         res = response.json()
593         # Tests related to links
594         nbLink = len(res['network'][0]['ietf-network-topology:link'])
595         self.assertEqual(nbLink, 20)
596         expressLink = ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX', 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
597                        'ROADMC01-DEG2-DEG2-CTP-TXRXtoROADMC01-DEG1-DEG1-CTP-TXRX', 'ROADMC01-DEG1-DEG1-CTP-TXRXtoROADMC01-DEG2-DEG2-CTP-TXRX']
598         addLink = ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
599                    'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
600                    'ROADMC01-SRG1-SRG1-CP-TXRXtoROADMC01-DEG2-DEG2-CTP-TXRX', 'ROADMC01-SRG1-SRG1-CP-TXRXtoROADMC01-DEG1-DEG1-CTP-TXRX']
601         dropLink = ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
602                     'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
603                     'ROADMC01-DEG1-DEG1-CTP-TXRXtoROADMC01-SRG1-SRG1-CP-TXRX', 'ROADMC01-DEG2-DEG2-CTP-TXRXtoROADMC01-SRG1-SRG1-CP-TXRX']
604         R2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
605                    'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX']
606         XPDR_IN = ['ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
607         XPDR_OUT = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX']
608         for i in range(0, nbLink):
609             nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
610             linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
611             if(nodeType == 'EXPRESS-LINK'):
612                 find = linkId in expressLink
613                 self.assertEqual(find, True)
614                 expressLink.remove(linkId)
615             elif(nodeType == 'ADD-LINK'):
616                 find = linkId in addLink
617                 self.assertEqual(find, True)
618                 addLink.remove(linkId)
619             elif(nodeType == 'DROP-LINK'):
620                 find = linkId in dropLink
621                 self.assertEqual(find, True)
622                 dropLink.remove(linkId)
623             elif(nodeType == 'ROADM-TO-ROADM'):
624                 find = linkId in R2RLink
625                 self.assertEqual(find, True)
626                 R2RLink.remove(linkId)
627             elif(nodeType == 'XPONDER-INPUT'):
628                 find = linkId in XPDR_IN
629                 self.assertEqual(find, True)
630                 XPDR_IN.remove(linkId)
631             elif(nodeType == 'XPONDER-OUTPUT'):
632                 find = linkId in XPDR_OUT
633                 self.assertEqual(find, True)
634                 XPDR_OUT.remove(linkId)
635             else:
636                 self.assertFalse(True)
637         self.assertEqual(len(expressLink), 0)
638         self.assertEqual(len(addLink), 0)
639         self.assertEqual(len(dropLink), 0)
640         self.assertEqual(len(R2RLink), 0)
641         self.assertEqual(len(XPDR_IN), 0)
642         self.assertEqual(len(XPDR_OUT), 0)
643
644     def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
645         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
646                .format(self.restconf_baseurl))
647         headers = {'content-type': 'application/json'}
648         response = requests.request(
649             "GET", url, headers=headers, auth=('admin', 'admin'))
650         self.assertEqual(response.status_code, requests.codes.ok)
651         res = response.json()
652         # Tests related to links
653         nbLink = len(res['network'][0]['ietf-network-topology:link'])
654         self.assertEqual(nbLink, 20)
655         R2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
656                    'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX']
657         for i in range(0, nbLink):
658             nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
659             link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
660             if(link_id in R2RLink):
661                 find = False
662                 spanLoss = res['network'][0]['ietf-network-topology:link'][i][
663                     'org-openroadm-network-topology:OMS-attributes']['span']["engineered-spanloss"]
664                 length = res['network'][0]['ietf-network-topology:link'][i][
665                     'org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
666                 if((spanLoss != None) & (length != None)):
667                     find = True
668                 self.assertTrue(find)
669                 R2RLink.remove(link_id)
670         self.assertEqual(len(R2RLink), 0)
671
672     def test_20_getNodes_OpenRoadmTopology(self):
673         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
674                .format(self.restconf_baseurl))
675         headers = {'content-type': 'application/json'}
676         response = requests.request(
677             "GET", url, headers=headers, auth=('admin', 'admin'))
678         res = response.json()
679         # Tests related to nodes
680         self.assertEqual(response.status_code, requests.codes.ok)
681         nbNode = len(res['network'][0]['node'])
682         self.assertEqual(nbNode, 8)
683         listNode = ['XPDRA01-XPDR1',
684                     'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2',
685                     'ROADMC01-SRG1', 'ROADMC01-DEG1', 'ROADMC01-DEG2']
686         # ************************Tests related to XPDRA nodes
687         for i in range(0, nbNode):
688             nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
689             nodeId = res['network'][0]['node'][i]['node-id']
690             if(nodeId == 'XPDRA01-XPDR1'):
691                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'},
692                               res['network'][0]['node'][i]['supporting-node'])
693                 self.assertEqual(nodeType, 'XPONDER')
694                 nbTps = len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
695                 self.assertTrue(nbTps == 6)
696                 client = 0
697                 network = 0
698                 for j in range(0, nbTps):
699                     tpType = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-common-network:tp-type']
700                     if (tpType == 'XPONDER-CLIENT'):
701                         client += 1
702                     elif (tpType == 'XPONDER-NETWORK'):
703                         network += 1
704                 self.assertTrue(client == 4)
705                 self.assertTrue(network == 2)
706                 listNode.remove(nodeId)
707             elif(nodeId == 'ROADMA01-SRG1'):
708                 # Test related to SRG1
709                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
710                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
711                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
712                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
713                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
714                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
715                               res['network'][0]['node'][i]['supporting-node'])
716                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'SRG')
717                 listNode.remove(nodeId)
718             elif(nodeId == 'ROADMA01-SRG3'):
719                 # Test related to SRG1
720                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
721                 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
722                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
723                 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
724                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
725                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
726                               res['network'][0]['node'][i]['supporting-node'])
727                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'SRG')
728                 listNode.remove(nodeId)
729             elif(nodeId == 'ROADMA01-DEG1'):
730                 # Test related to DEG1
731                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
732                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
733                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
734                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
735                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
736                               res['network'][0]['node'][i]['supporting-node'])
737                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
738                 listNode.remove(nodeId)
739             elif(nodeId == 'ROADMA01-DEG2'):
740                 # Test related to DEG2
741                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
742                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
743                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
744                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
745                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
746                               res['network'][0]['node'][i]['supporting-node'])
747                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
748                 listNode.remove(nodeId)
749             elif(nodeId == 'ROADMC01-SRG1'):
750                 # Test related to SRG1
751                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
752                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
753                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
754                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
755                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
756                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC01'},
757                               res['network'][0]['node'][i]['supporting-node'])
758                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'SRG')
759                 listNode.remove(nodeId)
760             elif(nodeId == 'ROADMC01-DEG1'):
761                 # Test related to DEG1
762                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
763                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
764                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
765                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
766                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC01'},
767                               res['network'][0]['node'][i]['supporting-node'])
768                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
769                 listNode.remove(nodeId)
770             elif(nodeId == 'ROADMC01-DEG2'):
771                 # Test related to DEG2
772                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
773                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
774                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
775                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
776                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC01'},
777                               res['network'][0]['node'][i]['supporting-node'])
778                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
779                 listNode.remove(nodeId)
780             else:
781                 self.assertFalse(True)
782         self.assertEqual(len(listNode), 0)
783
784     def test_21_connect_ROADMB(self):
785         url = ("{}/config/network-topology:"
786                "network-topology/topology/topology-netconf/node/ROADMB01"
787                .format(self.restconf_baseurl))
788         data = {"node": [{
789             "node-id": "ROADMB01",
790             "netconf-node-topology:username": "admin",
791             "netconf-node-topology:password": "admin",
792             "netconf-node-topology:host": "127.0.0.1",
793             "netconf-node-topology:port": test_utils.sims['roadmb']['port'],
794             "netconf-node-topology:tcp-only": "false",
795             "netconf-node-topology:pass-through": {}}]}
796         headers = {'content-type': 'application/json'}
797         response = requests.request(
798             "PUT", url, data=json.dumps(data), headers=headers,
799             auth=('admin', 'admin'))
800         self.assertEqual(response.status_code, requests.codes.created)
801         time.sleep(20)
802
803     def test_22_omsAttributes_ROADMA_ROADMB(self):
804         # Config ROADMA01-ROADMB01 oms-attributes
805         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
806                "link/ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
807                "OMS-attributes/span"
808                .format(self.restconf_baseurl))
809         data = {"span": {
810                 "auto-spanloss": "true",
811                 "engineered-spanloss": 12.2,
812                 "link-concatenation": [{
813                     "SRLG-Id": 0,
814                     "fiber-type": "smf",
815                     "SRLG-length": 100000,
816                     "pmd": 0.5}]}}
817         headers = {'content-type': 'application/json'}
818         response = requests.request(
819             "PUT", url, data=json.dumps(data), headers=headers,
820             auth=('admin', 'admin'))
821         self.assertEqual(response.status_code, requests.codes.created)
822
823     def test_23_omsAttributes_ROADMB_ROADMA(self):
824         # Config ROADMB01-ROADMA01 oms-attributes
825         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
826                "link/ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
827                "OMS-attributes/span"
828                .format(self.restconf_baseurl))
829         data = {"span": {
830                 "auto-spanloss": "true",
831                 "engineered-spanloss": 12.2,
832                 "link-concatenation": [{
833                     "SRLG-Id": 0,
834                     "fiber-type": "smf",
835                     "SRLG-length": 100000,
836                     "pmd": 0.5}]}}
837         headers = {'content-type': 'application/json'}
838         response = requests.request(
839             "PUT", url, data=json.dumps(data), headers=headers,
840             auth=('admin', 'admin'))
841         self.assertEqual(response.status_code, requests.codes.created)
842
843     def test_24_omsAttributes_ROADMB_ROADMC(self):
844         # Config ROADMB01-ROADMC01 oms-attributes
845         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
846                "link/ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
847                "OMS-attributes/span"
848                .format(self.restconf_baseurl))
849         data = {"span": {
850                 "auto-spanloss": "true",
851                 "engineered-spanloss": 12.2,
852                 "link-concatenation": [{
853                     "SRLG-Id": 0,
854                     "fiber-type": "smf",
855                     "SRLG-length": 100000,
856                     "pmd": 0.5}]}}
857         headers = {'content-type': 'application/json'}
858         response = requests.request(
859             "PUT", url, data=json.dumps(data), headers=headers,
860             auth=('admin', 'admin'))
861         self.assertEqual(response.status_code, requests.codes.created)
862
863     def test_25_omsAttributes_ROADMC_ROADMB(self):
864         # Config ROADMC01-ROADMB01 oms-attributes
865         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
866                "link/ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
867                "OMS-attributes/span"
868                .format(self.restconf_baseurl))
869         data = {"span": {
870                 "auto-spanloss": "true",
871                 "engineered-spanloss": 12.2,
872                 "link-concatenation": [{
873                     "SRLG-Id": 0,
874                     "fiber-type": "smf",
875                     "SRLG-length": 100000,
876                     "pmd": 0.5}]}}
877         headers = {'content-type': 'application/json'}
878         response = requests.request(
879             "PUT", url, data=json.dumps(data), headers=headers,
880             auth=('admin', 'admin'))
881         self.assertEqual(response.status_code, requests.codes.created)
882
883     def test_26_getClliNetwork(self):
884         url = ("{}/config/ietf-network:networks/network/clli-network"
885                .format(self.restconf_baseurl))
886         headers = {'content-type': 'application/json'}
887         response = requests.request(
888             "GET", url, headers=headers, auth=('admin', 'admin'))
889         self.assertEqual(response.status_code, requests.codes.ok)
890         res = response.json()
891         nbNode = len(res['network'][0]['node'])
892         listNode = ['NodeA', 'NodeB', 'NodeC']
893         for i in range(0, nbNode):
894             nodeId = res['network'][0]['node'][i]['node-id']
895             find = nodeId in listNode
896             self.assertEqual(find, True)
897             if(nodeId == 'NodeA'):
898                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeA')
899             elif(nodeId == 'NodeB'):
900                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeB')
901             else:
902                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeC')
903             listNode.remove(nodeId)
904         self.assertEqual(len(listNode), 0)
905
906     def test_27_verifyDegree(self):
907         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
908                .format(self.restconf_baseurl))
909         headers = {'content-type': 'application/json'}
910         response = requests.request(
911             "GET", url, headers=headers, auth=('admin', 'admin'))
912         self.assertEqual(response.status_code, requests.codes.ok)
913         res = response.json()
914         # Tests related to links
915         nbLink = len(res['network'][0]['ietf-network-topology:link'])
916         listR2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX', 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
917                        'ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX', 'ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX',
918                        'ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX', 'ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX']
919         for i in range(0, nbLink):
920             if res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'ROADM-TO-ROADM':
921                 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
922                 find = link_id in listR2RLink
923                 self.assertEqual(find, True)
924                 listR2RLink.remove(link_id)
925         self.assertEqual(len(listR2RLink), 0)
926
927     def test_28_verifyOppositeLinkTopology(self):
928         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
929                .format(self.restconf_baseurl))
930         headers = {'content-type': 'application/json'}
931         response = requests.request(
932             "GET", url, headers=headers, auth=('admin', 'admin'))
933         self.assertEqual(response.status_code, requests.codes.ok)
934         res = response.json()
935         # Tests related to links
936         nbLink = len(res['network'][0]['ietf-network-topology:link'])
937         self.assertEqual(nbLink, 34)
938         for i in range(0, nbLink):
939             link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
940             link_type = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
941             link_src = res['network'][0]['ietf-network-topology:link'][i]['source']['source-node']
942             link_dest = res['network'][0]['ietf-network-topology:link'][i]['destination']['dest-node']
943             oppLink_id = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:opposite-link']
944             # Find the opposite link
945             url_oppLink = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"+oppLink_id
946             url = (url_oppLink.format(self.restconf_baseurl))
947             headers = {'content-type': 'application/json'}
948             response_oppLink = requests.request("GET", url, headers=headers, auth=('admin', 'admin'))
949             self.assertEqual(response_oppLink.status_code, requests.codes.ok)
950             res_oppLink = response_oppLink.json()
951             self.assertEqual(res_oppLink['ietf-network-topology:link'][0]
952                              ['org-openroadm-common-network:opposite-link'], link_id)
953             self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['source']['source-node'], link_dest)
954             self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['destination']['dest-node'], link_src)
955             oppLink_type = res_oppLink['ietf-network-topology:link'][0]['org-openroadm-common-network:link-type']
956             if link_type == 'ADD-LINK':
957                 self.assertEqual(oppLink_type, 'DROP-LINK')
958             elif link_type == 'DROP-LINK':
959                 self.assertEqual(oppLink_type, 'ADD-LINK')
960             elif link_type == 'EXPRESS-LINK':
961                 self.assertEqual(oppLink_type, 'EXPRESS-LINK')
962             elif link_type == 'ROADM-TO-ROADM':
963                 self.assertEqual(oppLink_type, 'ROADM-TO-ROADM')
964             elif link_type == 'XPONDER-INPUT':
965                 self.assertEqual(oppLink_type, 'XPONDER-OUTPUT')
966             elif link_type == 'XPONDER-OUTPUT':
967                 self.assertEqual(oppLink_type, 'XPONDER-INPUT')
968
969     def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
970         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
971                .format(self.restconf_baseurl))
972         headers = {'content-type': 'application/json'}
973         response = requests.request(
974             "GET", url, headers=headers, auth=('admin', 'admin'))
975         self.assertEqual(response.status_code, requests.codes.ok)
976         res = response.json()
977         nbLink = len(res['network'][0]['ietf-network-topology:link'])
978         R2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
979                    'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
980                    'ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX',
981                    'ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX',
982                    'ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX',
983                    'ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX']
984         for i in range(0, nbLink):
985             nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
986             link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
987             if(link_id in R2RLink):
988                 find = False
989                 spanLoss = res['network'][0]['ietf-network-topology:link'][i][
990                     'org-openroadm-network-topology:OMS-attributes']['span']["engineered-spanloss"]
991                 length = res['network'][0]['ietf-network-topology:link'][i][
992                     'org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
993                 if((spanLoss != None) & (length != None)):
994                     find = True
995                 self.assertTrue(find)
996                 R2RLink.remove(link_id)
997         self.assertEqual(len(R2RLink), 0)
998
999     def test_30_disconnect_ROADMB(self):
1000         # Delete in the topology-netconf
1001         url = ("{}/config/network-topology:"
1002                "network-topology/topology/topology-netconf/node/ROADMB01"
1003                .format(self.restconf_baseurl))
1004         data = {}
1005         headers = {'content-type': 'application/json'}
1006         response = requests.request(
1007             "DELETE", url, data=json.dumps(data), headers=headers,
1008             auth=('admin', 'admin'))
1009         self.assertEqual(response.status_code, requests.codes.ok)
1010         # Delete in the clli-network
1011         url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeB"
1012                .format(self.restconf_baseurl))
1013         data = {}
1014         headers = {'content-type': 'application/json'}
1015         response = requests.request(
1016             "DELETE", url, data=json.dumps(data), headers=headers,
1017             auth=('admin', 'admin'))
1018         self.assertEqual(response.status_code, requests.codes.ok)
1019
1020     def test_31_disconnect_ROADMC(self):
1021         # Delete in the topology-netconf
1022         url = ("{}/config/network-topology:"
1023                "network-topology/topology/topology-netconf/node/ROADMC01"
1024                .format(self.restconf_baseurl))
1025         data = {}
1026         headers = {'content-type': 'application/json'}
1027         response = requests.request(
1028             "DELETE", url, data=json.dumps(data), headers=headers,
1029             auth=('admin', 'admin'))
1030         self.assertEqual(response.status_code, requests.codes.ok)
1031         # Delete in the clli-network
1032         url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeC"
1033                .format(self.restconf_baseurl))
1034         data = {}
1035         headers = {'content-type': 'application/json'}
1036         response = requests.request(
1037             "DELETE", url, data=json.dumps(data), headers=headers,
1038             auth=('admin', 'admin'))
1039         self.assertEqual(response.status_code, requests.codes.ok)
1040
1041     def test_32_getNodes_OpenRoadmTopology(self):
1042         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1043                .format(self.restconf_baseurl))
1044         headers = {'content-type': 'application/json'}
1045         response = requests.request(
1046             "GET", url, headers=headers, auth=('admin', 'admin'))
1047         res = response.json()
1048         # Tests related to nodes
1049         self.assertEqual(response.status_code, requests.codes.ok)
1050         nbNode = len(res['network'][0]['node'])
1051         self.assertEqual(nbNode, 5)
1052         listNode = ['XPDRA01-XPDR1', 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
1053         for i in range(0, nbNode):
1054             nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
1055             nodeId = res['network'][0]['node'][i]['node-id']
1056             # Tests related to XPDRA nodes
1057             if(nodeId == 'XPDRA01-XPDR1'):
1058                 nbTp = len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1059                 for j in range(0, nbTp):
1060                     tpid = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
1061                     if (tpid == 'XPDR1-CLIENT1'):
1062                         self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
1063                                          ['org-openroadm-common-network:tp-type'], 'XPONDER-CLIENT')
1064                     if (tpid == 'XPDR1-NETWORK1'):
1065                         self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
1066                                          ['org-openroadm-common-network:tp-type'], 'XPONDER-NETWORK')
1067                         self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
1068                                          ['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'],
1069                                          'ROADMA01-SRG1--SRG1-PP1-TXRX')
1070                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'},
1071                               res['network'][0]['node'][i]['supporting-node'])
1072                 listNode.remove(nodeId)
1073             elif(nodeId == 'ROADMA01-SRG1'):
1074                 # Test related to SRG1
1075                 self.assertEqual(nodeType, 'SRG')
1076                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
1077                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1078                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1079                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1080                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1081                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
1082                               res['network'][0]['node'][i]['supporting-node'])
1083                 listNode.remove(nodeId)
1084             elif(nodeId == 'ROADMA01-SRG3'):
1085                 # Test related to SRG1
1086                 self.assertEqual(nodeType, 'SRG')
1087                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
1088                 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1089                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1090                 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1091                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1092                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
1093                               res['network'][0]['node'][i]['supporting-node'])
1094                 listNode.remove(nodeId)
1095             elif(nodeId == 'ROADMA01-DEG1'):
1096                 # Test related to DEG1
1097                 self.assertEqual(nodeType, 'DEGREE')
1098                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1099                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1100                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1101                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1102                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
1103                               res['network'][0]['node'][i]['supporting-node'])
1104                 listNode.remove(nodeId)
1105             elif(nodeId == 'ROADMA01-DEG2'):
1106                 # Test related to DEG2
1107                 self.assertEqual(nodeType, 'DEGREE')
1108                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1109                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1110                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1111                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1112                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
1113                               res['network'][0]['node'][i]['supporting-node'])
1114                 listNode.remove(nodeId)
1115             else:
1116                 self.assertFalse(True)
1117         self.assertEqual(len(listNode), 0)
1118         # Test related to SRG1 of ROADMC
1119         for i in range(0, nbNode):
1120             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADMC01-SRG1')
1121             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADMC01-DEG1')
1122             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADMC01-DEG2')
1123
1124     def test_33_getOpenRoadmNetwork(self):
1125         url = ("{}/config/ietf-network:networks/network/openroadm-network"
1126                .format(self.restconf_baseurl))
1127         headers = {'content-type': 'application/json'}
1128         response = requests.request(
1129             "GET", url, headers=headers, auth=('admin', 'admin'))
1130         self.assertEqual(response.status_code, requests.codes.ok)
1131         res = response.json()
1132         nbNode = len(res['network'][0]['node'])
1133         self.assertEqual(nbNode, 2)
1134         for i in range(0, nbNode-1):
1135             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADMC01')
1136             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADMB01')
1137
1138     def test_34_getClliNetwork(self):
1139         url = ("{}/config/ietf-network:networks/network/clli-network"
1140                .format(self.restconf_baseurl))
1141         headers = {'content-type': 'application/json'}
1142         response = requests.request(
1143             "GET", url, headers=headers, auth=('admin', 'admin'))
1144         self.assertEqual(response.status_code, requests.codes.ok)
1145         res = response.json()
1146         nbNode = len(res['network'][0]['node'])
1147         self.assertEqual(nbNode, 1)
1148         for i in range(0, nbNode-1):
1149             self.assertNotEqual(res['network'][0]['node'][1]['org-openroadm-clli-network:clli'], 'NodeC')
1150
1151     def test_35_disconnect_XPDRA(self):
1152         url = ("{}/config/network-topology:"
1153                "network-topology/topology/topology-netconf/node/XPDRA01"
1154                .format(self.restconf_baseurl))
1155         data = {}
1156         headers = {'content-type': 'application/json'}
1157         response = requests.request(
1158             "DELETE", url, data=json.dumps(data), headers=headers,
1159             auth=('admin', 'admin'))
1160         self.assertEqual(response.status_code, requests.codes.ok)
1161
1162     def test_36_getClliNetwork(self):
1163         url = ("{}/config/ietf-network:networks/network/clli-network"
1164                .format(self.restconf_baseurl))
1165         headers = {'content-type': 'application/json'}
1166         response = requests.request(
1167             "GET", url, headers=headers, auth=('admin', 'admin'))
1168         self.assertEqual(response.status_code, requests.codes.ok)
1169         res = response.json()
1170         nbNode = len(res['network'][0]['node'])
1171         self.assertEqual(nbNode, 1)
1172         self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
1173
1174     def test_37_getOpenRoadmNetwork(self):
1175         url = ("{}/config/ietf-network:networks/network/openroadm-network"
1176                .format(self.restconf_baseurl))
1177         headers = {'content-type': 'application/json'}
1178         response = requests.request(
1179             "GET", url, headers=headers, auth=('admin', 'admin'))
1180         self.assertEqual(response.status_code, requests.codes.ok)
1181         res = response.json()
1182         nbNode = len(res['network'][0]['node'])
1183         self.assertEqual(nbNode, 1)
1184         for i in range(0, nbNode):
1185             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'XPDRA01')
1186
1187     def test_38_getNodes_OpenRoadmTopology(self):
1188         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1189                .format(self.restconf_baseurl))
1190         headers = {'content-type': 'application/json'}
1191         response = requests.request(
1192             "GET", url, headers=headers, auth=('admin', 'admin'))
1193         res = response.json()
1194         # Tests related to nodes
1195         self.assertEqual(response.status_code, requests.codes.ok)
1196         nbNode = len(res['network'][0]['node'])
1197         self.assertEqual(nbNode, 4)
1198         listNode = ['ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
1199         for i in range(0, nbNode):
1200             self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
1201                           res['network'][0]['node'][i]['supporting-node'])
1202             nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
1203             nodeId = res['network'][0]['node'][i]['node-id']
1204             if(nodeId == 'ROADMA01-SRG1'):
1205                 # Test related to SRG1
1206                 self.assertEqual(nodeType, 'SRG')
1207                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
1208                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1209                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1210                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1211                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1212                 listNode.remove(nodeId)
1213             elif(nodeId == 'ROADMA01-SRG3'):
1214                 # Test related to SRG1
1215                 self.assertEqual(nodeType, 'SRG')
1216                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
1217                 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1218                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1219                 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1220                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1221                 listNode.remove(nodeId)
1222             elif(nodeId == 'ROADMA01-DEG1'):
1223                 # Test related to DEG1
1224                 self.assertEqual(nodeType, 'DEGREE')
1225                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1226                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1227                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1228                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1229                 listNode.remove(nodeId)
1230             elif(nodeId == 'ROADMA01-DEG2'):
1231                 # Test related to DEG2
1232                 self.assertEqual(nodeType, 'DEGREE')
1233                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1234                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1235                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1236                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1237                 listNode.remove(nodeId)
1238             else:
1239                 self.assertFalse(True)
1240         self.assertEqual(len(listNode), 0)
1241
1242     def test_39_disconnect_ROADM_XPDRA_link(self):
1243         # Link-1
1244         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1245                "link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX"
1246                .format(self.restconf_baseurl))
1247         data = {}
1248         headers = {'content-type': 'application/json'}
1249         response = requests.request(
1250             "DELETE", url, data=json.dumps(data), headers=headers,
1251             auth=('admin', 'admin'))
1252         self.assertEqual(response.status_code, requests.codes.ok)
1253         # Link-2
1254         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1255                "link/ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1"
1256                .format(self.restconf_baseurl))
1257         data = {}
1258         headers = {'content-type': 'application/json'}
1259         response = requests.request(
1260             "DELETE", url, data=json.dumps(data), headers=headers,
1261             auth=('admin', 'admin'))
1262         self.assertEqual(response.status_code, requests.codes.ok)
1263
1264     def test_40_getLinks_OpenRoadmTopology(self):
1265         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1266                .format(self.restconf_baseurl))
1267         headers = {'content-type': 'application/json'}
1268         response = requests.request(
1269             "GET", url, headers=headers, auth=('admin', 'admin'))
1270         self.assertEqual(response.status_code, requests.codes.ok)
1271         res = response.json()
1272         nbLink = len(res['network'][0]['ietf-network-topology:link'])
1273         self.assertEqual(nbLink, 16)
1274         expressLink = ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
1275                        'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX']
1276         addLink = ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
1277                    'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX']
1278         dropLink = ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
1279                     'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX']
1280         roadmtoroadmLink = 0
1281         for i in range(0, nbLink):
1282             if (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'EXPRESS-LINK'):
1283                 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1284                 find = link_id in expressLink
1285                 self.assertEqual(find, True)
1286                 expressLink.remove(link_id)
1287             elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'ADD-LINK'):
1288                 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1289                 find = link_id in addLink
1290                 self.assertEqual(find, True)
1291                 addLink.remove(link_id)
1292             elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'DROP-LINK'):
1293                 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1294                 find = link_id in dropLink
1295                 self.assertEqual(find, True)
1296                 dropLink.remove(link_id)
1297             else:
1298                 roadmtoroadmLink += 1
1299         self.assertEqual(len(expressLink), 0)
1300         self.assertEqual(len(addLink), 0)
1301         self.assertEqual(len(dropLink), 0)
1302         self.assertEqual(roadmtoroadmLink, 6)
1303         for i in range(0, nbLink):
1304             self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]
1305                                 ['org-openroadm-common-network:link-type'], 'XPONDER-OUTPUT')
1306             self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]
1307                                 ['org-openroadm-common-network:link-type'], 'XPONDER-INPUT')
1308
1309     def test_41_disconnect_ROADMA(self):
1310         url = ("{}/config/network-topology:"
1311                "network-topology/topology/topology-netconf/node/ROADMA01"
1312                .format(self.restconf_baseurl))
1313         data = {}
1314         headers = {'content-type': 'application/json'}
1315         response = requests.request(
1316             "DELETE", url, data=json.dumps(data), headers=headers,
1317             auth=('admin', 'admin'))
1318         self.assertEqual(response.status_code, requests.codes.ok)
1319         # Delete in the clli-network
1320         url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeA"
1321                .format(self.restconf_baseurl))
1322         data = {}
1323         headers = {'content-type': 'application/json'}
1324         response = requests.request(
1325             "DELETE", url, data=json.dumps(data), headers=headers,
1326             auth=('admin', 'admin'))
1327         self.assertEqual(response.status_code, requests.codes.ok)
1328
1329     def test_42_getClliNetwork(self):
1330         url = ("{}/config/ietf-network:networks/network/clli-network"
1331                .format(self.restconf_baseurl))
1332         headers = {'content-type': 'application/json'}
1333         response = requests.request(
1334             "GET", url, headers=headers, auth=('admin', 'admin'))
1335         self.assertEqual(response.status_code, requests.codes.ok)
1336         res = response.json()
1337         self.assertNotIn('node', res['network'][0])
1338
1339     def test_43_getOpenRoadmNetwork(self):
1340         url = ("{}/config/ietf-network:networks/network/openroadm-network"
1341                .format(self.restconf_baseurl))
1342         headers = {'content-type': 'application/json'}
1343         response = requests.request(
1344             "GET", url, headers=headers, auth=('admin', 'admin'))
1345         self.assertEqual(response.status_code, requests.codes.ok)
1346         res = response.json()
1347         self.assertNotIn('node', res['network'][0])
1348
1349     def test_44_check_roadm2roadm_link_persistence(self):
1350         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1351                .format(self.restconf_baseurl))
1352         headers = {'content-type': 'application/json'}
1353         response = requests.request(
1354             "GET", url, headers=headers, auth=('admin', 'admin'))
1355         self.assertEqual(response.status_code, requests.codes.ok)
1356         res = response.json()
1357         nbLink = len(res['network'][0]['ietf-network-topology:link'])
1358         self.assertNotIn('node', res['network'][0])
1359         self.assertEqual(nbLink, 6)
1360
1361
1362 if __name__ == "__main__":
1363     unittest.main(verbosity=2)