improve functional tests pylint score
[transportpce.git] / tests / transportpce_tests / 1.2.1 / test_topology.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import json
13 import signal
14 import time
15 import unittest
16 import requests
17 import psutil
18 import test_utils
19
20
21 class TransportPCETopologyTesting(unittest.TestCase):
22
23     honeynode_process1 = None
24     honeynode_process2 = None
25     honeynode_process3 = None
26     honeynode_process4 = None
27     odl_process = None
28     restconf_baseurl = "http://localhost:8181/restconf"
29
30 # START_IGNORE_XTESTING
31
32     @classmethod
33     def setUpClass(cls):
34         print("starting honeynode1...")
35         cls.honeynode_process1 = test_utils.start_xpdra_honeynode()
36         time.sleep(20)
37
38         print("starting honeynode2...")
39         cls.honeynode_process2 = test_utils.start_roadma_honeynode()
40         time.sleep(20)
41
42         print("starting honeynode3...")
43         cls.honeynode_process3 = test_utils.start_roadmb_honeynode()
44         time.sleep(20)
45
46         print("starting honeynode4...")
47         cls.honeynode_process4 = test_utils.start_roadmc_honeynode()
48         time.sleep(20)
49         print("all honeynodes started")
50
51         print("starting opendaylight...")
52         cls.odl_process = test_utils.start_tpce()
53         time.sleep(60)
54         print("opendaylight started")
55
56     @classmethod
57     def tearDownClass(cls):
58         for child in psutil.Process(cls.odl_process.pid).children():
59             child.send_signal(signal.SIGINT)
60             child.wait()
61         cls.odl_process.send_signal(signal.SIGINT)
62         cls.odl_process.wait()
63         for child in psutil.Process(cls.honeynode_process1.pid).children():
64             child.send_signal(signal.SIGINT)
65             child.wait()
66         cls.honeynode_process1.send_signal(signal.SIGINT)
67         cls.honeynode_process1.wait()
68         for child in psutil.Process(cls.honeynode_process2.pid).children():
69             child.send_signal(signal.SIGINT)
70             child.wait()
71         cls.honeynode_process2.send_signal(signal.SIGINT)
72         cls.honeynode_process2.wait()
73
74         for child in psutil.Process(cls.honeynode_process3.pid).children():
75             child.send_signal(signal.SIGINT)
76             child.wait()
77         cls.honeynode_process3.send_signal(signal.SIGINT)
78         cls.honeynode_process3.wait()
79
80         for child in psutil.Process(cls.honeynode_process4.pid).children():
81             child.send_signal(signal.SIGINT)
82             child.wait()
83         cls.honeynode_process4.send_signal(signal.SIGINT)
84         cls.honeynode_process4.wait()
85
86     def setUp(self):
87         time.sleep(5)
88
89 # END_IGNORE_XTESTING
90
91     def test_01_connect_ROADMA(self):
92         # Config ROADMA
93         url = ("{}/config/network-topology:"
94                "network-topology/topology/topology-netconf/node/ROADMA01"
95                .format(self.restconf_baseurl))
96         data = {"node": [{
97             "node-id": "ROADMA01",
98             "netconf-node-topology:username": "admin",
99             "netconf-node-topology:password": "admin",
100             "netconf-node-topology:host": "127.0.0.1",
101             "netconf-node-topology:port": "17831",
102             "netconf-node-topology:tcp-only": "false",
103             "netconf-node-topology:pass-through": {}}]}
104         headers = {'content-type': 'application/json'}
105         response = requests.request(
106             "PUT", url, data=json.dumps(data), headers=headers,
107             auth=('admin', 'admin'))
108         self.assertEqual(response.status_code, requests.codes.created)
109         time.sleep(20)
110
111     def test_02_getClliNetwork(self):
112         url = ("{}/config/ietf-network:networks/network/clli-network"
113                .format(self.restconf_baseurl))
114         headers = {'content-type': 'application/json'}
115         response = requests.request(
116             "GET", url, headers=headers, auth=('admin', 'admin'))
117         self.assertEqual(response.status_code, requests.codes.ok)
118         res = response.json()
119         self.assertEqual(res['network'][0]['node'][0]['node-id'], 'NodeA')
120         self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
121
122     def test_03_getOpenRoadmNetwork(self):
123         url = ("{}/config/ietf-network:networks/network/openroadm-network"
124                .format(self.restconf_baseurl))
125         headers = {'content-type': 'application/json'}
126         response = requests.request(
127             "GET", url, headers=headers, auth=('admin', 'admin'))
128         self.assertEqual(response.status_code, requests.codes.ok)
129         res = response.json()
130         self.assertEqual(res['network'][0]['node'][0]['node-id'], 'ROADMA01')
131         self.assertEqual(res['network'][0]['node'][0]['supporting-node'][0]['network-ref'], 'clli-network')
132         self.assertEqual(res['network'][0]['node'][0]['supporting-node'][0]['node-ref'], 'NodeA')
133         self.assertEqual(res['network'][0]['node'][0]['org-openroadm-common-network:node-type'], 'ROADM')
134         self.assertEqual(res['network'][0]['node'][0]['org-openroadm-network:model'], '2')
135
136     def test_04_getLinks_OpenroadmTopology(self):
137         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
138                .format(self.restconf_baseurl))
139         headers = {'content-type': 'application/json'}
140         response = requests.request(
141             "GET", url, headers=headers, auth=('admin', 'admin'))
142         self.assertEqual(response.status_code, requests.codes.ok)
143         res = response.json()
144         # Tests related to links
145         nbLink = len(res['network'][0]['ietf-network-topology:link'])
146         self.assertEqual(nbLink, 10)
147         expressLink = ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
148                        'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX']
149         addLink = ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
150                    'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX']
151         dropLink = ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
152                     'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX']
153         for i in range(0, nbLink):
154             linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
155             if res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'EXPRESS-LINK':
156                 find = linkId in expressLink
157                 self.assertEqual(find, True)
158                 expressLink.remove(linkId)
159             elif res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'ADD-LINK':
160                 find = linkId in addLink
161                 self.assertEqual(find, True)
162                 addLink.remove(linkId)
163             elif res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'DROP-LINK':
164                 find = linkId in dropLink
165                 self.assertEqual(find, True)
166                 dropLink.remove(linkId)
167             else:
168                 self.assertFalse(True)
169         self.assertEqual(len(expressLink), 0)
170         self.assertEqual(len(addLink), 0)
171         self.assertEqual(len(dropLink), 0)
172
173     def test_05_getNodes_OpenRoadmTopology(self):
174         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
175                .format(self.restconf_baseurl))
176         headers = {'content-type': 'application/json'}
177         response = requests.request("GET", url, headers=headers, auth=('admin', 'admin'))
178         res = response.json()
179         # Tests related to nodes
180         self.assertEqual(response.status_code, requests.codes.ok)
181         nbNode = len(res['network'][0]['node'])
182         self.assertEqual(nbNode, 4)
183         listNode = ['ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
184         for i in range(0, nbNode):
185             self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
186                           res['network'][0]['node'][i]['supporting-node'])
187             nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
188             nodeId = res['network'][0]['node'][i]['node-id']
189             if nodeId == 'ROADMA01-SRG1':
190                 # Test related to SRG1
191                 self.assertEqual(nodeType, 'SRG')
192                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
193                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
194                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
195                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
196                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
197                 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
198                               res['network'][0]['node'][i]['supporting-node'])
199                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
200                               res['network'][0]['node'][i]['supporting-node'])
201                 listNode.remove(nodeId)
202             elif nodeId == 'ROADMA01-SRG3':
203                 # Test related to SRG1
204                 self.assertEqual(nodeType, 'SRG')
205                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
206                 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
207                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
208                 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
209                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
210                 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
211                               res['network'][0]['node'][i]['supporting-node'])
212                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
213                               res['network'][0]['node'][i]['supporting-node'])
214                 listNode.remove(nodeId)
215             elif nodeId == 'ROADMA01-DEG1':
216                 # Test related to DEG1
217                 self.assertEqual(nodeType, 'DEGREE')
218                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
219                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
220                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
221                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
222                 listNode.remove(nodeId)
223                 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
224                               res['network'][0]['node'][i]['supporting-node'])
225                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
226                               res['network'][0]['node'][i]['supporting-node'])
227             elif nodeId == 'ROADMA01-DEG2':
228                 # Test related to DEG2
229                 self.assertEqual(nodeType, 'DEGREE')
230                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
231                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
232                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
233                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
234                 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
235                               res['network'][0]['node'][i]['supporting-node'])
236                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
237                               res['network'][0]['node'][i]['supporting-node'])
238                 listNode.remove(nodeId)
239             else:
240                 self.assertFalse(True)
241         self.assertEqual(len(listNode), 0)
242
243     def test_06_connect_XPDRA(self):
244         url = ("{}/config/network-topology:"
245                "network-topology/topology/topology-netconf/node/XPDRA01"
246                .format(self.restconf_baseurl))
247         data = {"node": [{
248             "node-id": "XPDRA01",
249             "netconf-node-topology:username": "admin",
250             "netconf-node-topology:password": "admin",
251             "netconf-node-topology:host": "127.0.0.1",
252             "netconf-node-topology:port": "17830",
253             "netconf-node-topology:tcp-only": "false",
254             "netconf-node-topology:pass-through": {}}]}
255         headers = {'content-type': 'application/json'}
256         response = requests.request(
257             "PUT", url, data=json.dumps(data), headers=headers,
258             auth=('admin', 'admin'))
259         self.assertEqual(response.status_code, requests.codes.created)
260         time.sleep(30)
261
262     def test_07_getClliNetwork(self):
263         url = ("{}/config/ietf-network:networks/network/clli-network"
264                .format(self.restconf_baseurl))
265         headers = {'content-type': 'application/json'}
266         response = requests.request("GET", url, headers=headers, auth=('admin', 'admin'))
267         self.assertEqual(response.status_code, requests.codes.ok)
268         res = response.json()
269         self.assertEqual(res['network'][0]['node'][0]['node-id'], 'NodeA')
270         self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
271
272     def test_08_getOpenRoadmNetwork(self):
273         url = ("{}/config/ietf-network:networks/network/openroadm-network"
274                .format(self.restconf_baseurl))
275         headers = {'content-type': 'application/json'}
276         response = requests.request(
277             "GET", url, headers=headers, auth=('admin', 'admin'))
278         self.assertEqual(response.status_code, requests.codes.ok)
279         res = response.json()
280         nbNode = len(res['network'][0]['node'])
281         self.assertEqual(nbNode, 2)
282         for i in range(0, nbNode):
283             self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'], 'clli-network')
284             self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'], 'NodeA')
285             nodeId = res['network'][0]['node'][i]['node-id']
286             if(nodeId == 'XPDRA01'):
287                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'XPONDER')
288                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], '1')
289             elif(nodeId == 'ROADMA01'):
290                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'ROADM')
291                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], '2')
292             else:
293                 self.assertFalse(True)
294
295     def test_09_getNodes_OpenRoadmTopology(self):
296         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
297                .format(self.restconf_baseurl))
298         headers = {'content-type': 'application/json'}
299         response = requests.request(
300             "GET", url, headers=headers, auth=('admin', 'admin'))
301         res = response.json()
302         # Tests related to nodes
303         self.assertEqual(response.status_code, requests.codes.ok)
304         nbNode = len(res['network'][0]['node'])
305         self.assertEqual(nbNode, 5)
306         listNode = ['XPDRA01-XPDR1', 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
307         for i in range(0, nbNode):
308             nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
309             nodeId = res['network'][0]['node'][i]['node-id']
310             # Tests related to XPDRA nodes
311             if(nodeId == 'XPDRA01-XPDR1'):
312                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'},
313                               res['network'][0]['node'][i]['supporting-node'])
314                 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
315                               res['network'][0]['node'][i]['supporting-node'])
316                 self.assertEqual(nodeType, 'XPONDER')
317                 nbTps = len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
318                 client = 0
319                 network = 0
320                 for j in range(0, nbTps):
321                     tpType = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-common-network:tp-type']
322                     tpId = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
323                     if (tpType == 'XPONDER-CLIENT'):
324                         client += 1
325                     elif (tpType == 'XPONDER-NETWORK'):
326                         network += 1
327                     if (tpId == 'XPDR1-NETWORK2'):
328                         self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point']
329                                          [j]['transportpce-topology:associated-connection-map-port'], 'XPDR1-CLIENT3')
330                     if (tpId == 'XPDR1-CLIENT3'):
331                         self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point']
332                                          [j]['transportpce-topology:associated-connection-map-port'], 'XPDR1-NETWORK2')
333                 self.assertTrue(client == 4)
334                 self.assertTrue(network == 2)
335                 listNode.remove(nodeId)
336             elif(nodeId == 'ROADMA01-SRG1'):
337                 # Test related to SRG1
338                 self.assertEqual(nodeType, 'SRG')
339                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
340                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
341                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
342                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
343                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
344                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
345                               res['network'][0]['node'][i]['supporting-node'])
346                 listNode.remove(nodeId)
347             elif(nodeId == 'ROADMA01-SRG3'):
348                 # Test related to SRG1
349                 self.assertEqual(nodeType, 'SRG')
350                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
351                 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
352                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
353                 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
354                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
355                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
356                               res['network'][0]['node'][i]['supporting-node'])
357                 listNode.remove(nodeId)
358             elif(nodeId == 'ROADMA01-DEG1'):
359                 # Test related to DEG1
360                 self.assertEqual(nodeType, 'DEGREE')
361                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
362                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
363                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
364                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
365                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
366                               res['network'][0]['node'][i]['supporting-node'])
367                 listNode.remove(nodeId)
368             elif(nodeId == 'ROADMA01-DEG2'):
369                 # Test related to DEG2
370                 self.assertEqual(nodeType, 'DEGREE')
371                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
372                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
373                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
374                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
375                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
376                               res['network'][0]['node'][i]['supporting-node'])
377                 listNode.remove(nodeId)
378             else:
379                 self.assertFalse(True)
380         self.assertEqual(len(listNode), 0)
381
382     # Connect the tail XPDRA to ROADMA and vice versa
383     def test_10_connect_tail_xpdr_rdm(self):
384         # Connect the tail: XPDRA to ROADMA
385         url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
386                .format(self.restconf_baseurl))
387         data = {"networkutils:input": {
388             "networkutils:links-input": {
389                 "networkutils:xpdr-node": "XPDRA01",
390                 "networkutils:xpdr-num": "1",
391                 "networkutils:network-num": "1",
392                 "networkutils:rdm-node": "ROADMA01",
393                 "networkutils:srg-num": "1",
394                 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
395             }
396         }
397         }
398         headers = {'content-type': 'application/json'}
399         response = requests.request(
400             "POST", url, data=json.dumps(data), headers=headers,
401             auth=('admin', 'admin'))
402         self.assertEqual(response.status_code, requests.codes.ok)
403
404     def test_11_connect_tail_rdm_xpdr(self):
405         # Connect the tail: ROADMA to XPDRA
406         url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
407                .format(self.restconf_baseurl))
408         data = {"networkutils:input": {
409             "networkutils:links-input": {
410                 "networkutils:xpdr-node": "XPDRA01",
411                 "networkutils:xpdr-num": "1",
412                 "networkutils:network-num": "1",
413                 "networkutils:rdm-node": "ROADMA01",
414                 "networkutils:srg-num": "1",
415                 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
416             }
417         }
418         }
419         headers = {'content-type': 'application/json'}
420         response = requests.request(
421             "POST", url, data=json.dumps(data), headers=headers,
422             auth=('admin', 'admin'))
423         self.assertEqual(response.status_code, requests.codes.ok)
424
425     def test_12_getLinks_OpenRoadmTopology(self):
426         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
427                .format(self.restconf_baseurl))
428         headers = {'content-type': 'application/json'}
429         response = requests.request(
430             "GET", url, headers=headers, auth=('admin', 'admin'))
431         self.assertEqual(response.status_code, requests.codes.ok)
432         res = response.json()
433         # Tests related to links
434         nbLink = len(res['network'][0]['ietf-network-topology:link'])
435         self.assertEqual(nbLink, 12)
436         expressLink = ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
437                        'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX']
438         addLink = ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
439                    'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX']
440         dropLink = ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
441                     'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX']
442         XPDR_IN = ['ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
443         XPDR_OUT = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX']
444         for i in range(0, nbLink):
445             nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
446             linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
447             if(nodeType == 'EXPRESS-LINK'):
448                 find = linkId in expressLink
449                 self.assertEqual(find, True)
450                 expressLink.remove(linkId)
451             elif(nodeType == 'ADD-LINK'):
452                 find = linkId in addLink
453                 self.assertEqual(find, True)
454                 addLink.remove(linkId)
455             elif(nodeType == 'DROP-LINK'):
456                 find = linkId in dropLink
457                 self.assertEqual(find, True)
458                 dropLink.remove(linkId)
459             elif(nodeType == 'XPONDER-INPUT'):
460                 find = linkId in XPDR_IN
461                 self.assertEqual(find, True)
462                 XPDR_IN.remove(linkId)
463             elif(nodeType == 'XPONDER-OUTPUT'):
464                 find = linkId in XPDR_OUT
465                 self.assertEqual(find, True)
466                 XPDR_OUT.remove(linkId)
467             else:
468                 self.assertFalse(True)
469         self.assertEqual(len(expressLink), 0)
470         self.assertEqual(len(addLink), 0)
471         self.assertEqual(len(dropLink), 0)
472         self.assertEqual(len(XPDR_IN), 0)
473         self.assertEqual(len(XPDR_OUT), 0)
474
475     def test_13_connect_ROADMC(self):
476         # Config ROADMC
477         url = ("{}/config/network-topology:"
478                "network-topology/topology/topology-netconf/node/ROADMC01"
479                .format(self.restconf_baseurl))
480         data = {"node": [{
481             "node-id": "ROADMC01",
482             "netconf-node-topology:username": "admin",
483             "netconf-node-topology:password": "admin",
484             "netconf-node-topology:host": "127.0.0.1",
485             "netconf-node-topology:port": "17833",
486             "netconf-node-topology:tcp-only": "false",
487             "netconf-node-topology:pass-through": {}}]}
488         headers = {'content-type': 'application/json'}
489         response = requests.request(
490             "PUT", url, data=json.dumps(data), headers=headers,
491             auth=('admin', 'admin'))
492         self.assertEqual(response.status_code, requests.codes.created)
493         time.sleep(20)
494
495     def test_14_omsAttributes_ROADMA_ROADMC(self):
496         # Config ROADMA01-ROADMC01 oms-attributes
497         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
498                "link/ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
499                "OMS-attributes/span"
500                .format(self.restconf_baseurl))
501         data = {"span": {
502             "auto-spanloss": "true",
503             "engineered-spanloss": 12.2,
504             "link-concatenation": [{
505                 "SRLG-Id": 0,
506                 "fiber-type": "smf",
507                 "SRLG-length": 100000,
508                 "pmd": 0.5}]}}
509         headers = {'content-type': 'application/json'}
510         response = requests.request(
511             "PUT", url, data=json.dumps(data), headers=headers,
512             auth=('admin', 'admin'))
513         self.assertEqual(response.status_code, requests.codes.created)
514
515     def test_15_omsAttributes_ROADMC_ROADMA(self):
516         # Config ROADMC01-ROADMA oms-attributes
517         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
518                "link/ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
519                "OMS-attributes/span"
520                .format(self.restconf_baseurl))
521         data = {"span": {
522             "auto-spanloss": "true",
523             "engineered-spanloss": 12.2,
524             "link-concatenation": [{
525                 "SRLG-Id": 0,
526                 "fiber-type": "smf",
527                 "SRLG-length": 100000,
528                 "pmd": 0.5}]}}
529         headers = {'content-type': 'application/json'}
530         response = requests.request(
531             "PUT", url, data=json.dumps(data), headers=headers,
532             auth=('admin', 'admin'))
533         self.assertEqual(response.status_code, requests.codes.created)
534
535     def test_16_getClliNetwork(self):
536         url = ("{}/config/ietf-network:networks/network/clli-network"
537                .format(self.restconf_baseurl))
538         headers = {'content-type': 'application/json'}
539         response = requests.request(
540             "GET", url, headers=headers, auth=('admin', 'admin'))
541         self.assertEqual(response.status_code, requests.codes.ok)
542         res = response.json()
543         nbNode = len(res['network'][0]['node'])
544         listNode = ['NodeA', 'NodeC']
545         for i in range(0, nbNode):
546             nodeId = res['network'][0]['node'][i]['node-id']
547             find = nodeId in listNode
548             self.assertEqual(find, True)
549             if(nodeId == 'NodeA'):
550                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeA')
551             else:
552                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeC')
553             listNode.remove(nodeId)
554
555         self.assertEqual(len(listNode), 0)
556
557     def test_17_getOpenRoadmNetwork(self):
558         url = ("{}/config/ietf-network:networks/network/openroadm-network"
559                .format(self.restconf_baseurl))
560         headers = {'content-type': 'application/json'}
561         response = requests.request(
562             "GET", url, headers=headers, auth=('admin', 'admin'))
563         self.assertEqual(response.status_code, requests.codes.ok)
564         res = response.json()
565         nbNode = len(res['network'][0]['node'])
566         self.assertEqual(nbNode, 3)
567         listNode = ['XPDRA01', 'ROADMA01', 'ROADMC01']
568         for i in range(0, nbNode):
569             self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'], 'clli-network')
570             nodeId = res['network'][0]['node'][i]['node-id']
571             if(nodeId == 'XPDRA01'):
572                 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'], 'NodeA')
573                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'XPONDER')
574                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], '1')
575                 listNode.remove(nodeId)
576             elif(nodeId == 'ROADMA01'):
577                 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'], 'NodeA')
578                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'ROADM')
579                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], '2')
580                 listNode.remove(nodeId)
581             elif(nodeId == 'ROADMC01'):
582                 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'], 'NodeC')
583                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'ROADM')
584                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], '2')
585                 listNode.remove(nodeId)
586             else:
587                 self.assertFalse(True)
588         self.assertEqual(len(listNode), 0)
589
590     def test_18_getROADMLinkOpenRoadmTopology(self):
591         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
592                .format(self.restconf_baseurl))
593         headers = {'content-type': 'application/json'}
594         response = requests.request(
595             "GET", url, headers=headers, auth=('admin', 'admin'))
596         self.assertEqual(response.status_code, requests.codes.ok)
597         res = response.json()
598         # Tests related to links
599         nbLink = len(res['network'][0]['ietf-network-topology:link'])
600         self.assertEqual(nbLink, 20)
601         expressLink = ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX', 'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX',
602                        'ROADMC01-DEG2-DEG2-CTP-TXRXtoROADMC01-DEG1-DEG1-CTP-TXRX', 'ROADMC01-DEG1-DEG1-CTP-TXRXtoROADMC01-DEG2-DEG2-CTP-TXRX']
603         addLink = ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
604                    'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
605                    'ROADMC01-SRG1-SRG1-CP-TXRXtoROADMC01-DEG2-DEG2-CTP-TXRX', 'ROADMC01-SRG1-SRG1-CP-TXRXtoROADMC01-DEG1-DEG1-CTP-TXRX']
606         dropLink = ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
607                     'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX',
608                     'ROADMC01-DEG1-DEG1-CTP-TXRXtoROADMC01-SRG1-SRG1-CP-TXRX', 'ROADMC01-DEG2-DEG2-CTP-TXRXtoROADMC01-SRG1-SRG1-CP-TXRX']
609         R2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
610                    'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX']
611         XPDR_IN = ['ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1']
612         XPDR_OUT = ['XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX']
613         for i in range(0, nbLink):
614             nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
615             linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
616             if(nodeType == 'EXPRESS-LINK'):
617                 find = linkId in expressLink
618                 self.assertEqual(find, True)
619                 expressLink.remove(linkId)
620             elif(nodeType == 'ADD-LINK'):
621                 find = linkId in addLink
622                 self.assertEqual(find, True)
623                 addLink.remove(linkId)
624             elif(nodeType == 'DROP-LINK'):
625                 find = linkId in dropLink
626                 self.assertEqual(find, True)
627                 dropLink.remove(linkId)
628             elif(nodeType == 'ROADM-TO-ROADM'):
629                 find = linkId in R2RLink
630                 self.assertEqual(find, True)
631                 R2RLink.remove(linkId)
632             elif(nodeType == 'XPONDER-INPUT'):
633                 find = linkId in XPDR_IN
634                 self.assertEqual(find, True)
635                 XPDR_IN.remove(linkId)
636             elif(nodeType == 'XPONDER-OUTPUT'):
637                 find = linkId in XPDR_OUT
638                 self.assertEqual(find, True)
639                 XPDR_OUT.remove(linkId)
640             else:
641                 self.assertFalse(True)
642         self.assertEqual(len(expressLink), 0)
643         self.assertEqual(len(addLink), 0)
644         self.assertEqual(len(dropLink), 0)
645         self.assertEqual(len(R2RLink), 0)
646         self.assertEqual(len(XPDR_IN), 0)
647         self.assertEqual(len(XPDR_OUT), 0)
648
649     def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
650         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
651                .format(self.restconf_baseurl))
652         headers = {'content-type': 'application/json'}
653         response = requests.request(
654             "GET", url, headers=headers, auth=('admin', 'admin'))
655         self.assertEqual(response.status_code, requests.codes.ok)
656         res = response.json()
657         # Tests related to links
658         nbLink = len(res['network'][0]['ietf-network-topology:link'])
659         self.assertEqual(nbLink, 20)
660         R2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
661                    'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX']
662         for i in range(0, nbLink):
663             nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
664             link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
665             if(link_id in R2RLink):
666                 find = False
667                 spanLoss = res['network'][0]['ietf-network-topology:link'][i][
668                     'org-openroadm-network-topology:OMS-attributes']['span']["engineered-spanloss"]
669                 length = res['network'][0]['ietf-network-topology:link'][i][
670                     'org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
671                 if((spanLoss != None) & (length != None)):
672                     find = True
673                 self.assertTrue(find)
674                 R2RLink.remove(link_id)
675         self.assertEqual(len(R2RLink), 0)
676
677     def test_20_getNodes_OpenRoadmTopology(self):
678         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
679                .format(self.restconf_baseurl))
680         headers = {'content-type': 'application/json'}
681         response = requests.request(
682             "GET", url, headers=headers, auth=('admin', 'admin'))
683         res = response.json()
684         # Tests related to nodes
685         self.assertEqual(response.status_code, requests.codes.ok)
686         nbNode = len(res['network'][0]['node'])
687         self.assertEqual(nbNode, 8)
688         listNode = ['XPDRA01-XPDR1',
689                     'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2',
690                     'ROADMC01-SRG1', 'ROADMC01-DEG1', 'ROADMC01-DEG2']
691         # ************************Tests related to XPDRA nodes
692         for i in range(0, nbNode):
693             nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
694             nodeId = res['network'][0]['node'][i]['node-id']
695             if(nodeId == 'XPDRA01-XPDR1'):
696                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'},
697                               res['network'][0]['node'][i]['supporting-node'])
698                 self.assertEqual(nodeType, 'XPONDER')
699                 nbTps = len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
700                 self.assertTrue(nbTps == 6)
701                 client = 0
702                 network = 0
703                 for j in range(0, nbTps):
704                     tpType = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-common-network:tp-type']
705                     if (tpType == 'XPONDER-CLIENT'):
706                         client += 1
707                     elif (tpType == 'XPONDER-NETWORK'):
708                         network += 1
709                 self.assertTrue(client == 4)
710                 self.assertTrue(network == 2)
711                 listNode.remove(nodeId)
712             elif(nodeId == 'ROADMA01-SRG1'):
713                 # Test related to SRG1
714                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
715                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
716                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
717                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
718                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
719                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
720                               res['network'][0]['node'][i]['supporting-node'])
721                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'SRG')
722                 listNode.remove(nodeId)
723             elif(nodeId == 'ROADMA01-SRG3'):
724                 # Test related to SRG1
725                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
726                 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
727                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
728                 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
729                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
730                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
731                               res['network'][0]['node'][i]['supporting-node'])
732                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'SRG')
733                 listNode.remove(nodeId)
734             elif(nodeId == 'ROADMA01-DEG1'):
735                 # Test related to DEG1
736                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
737                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
738                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
739                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
740                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
741                               res['network'][0]['node'][i]['supporting-node'])
742                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
743                 listNode.remove(nodeId)
744             elif(nodeId == 'ROADMA01-DEG2'):
745                 # Test related to DEG2
746                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
747                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
748                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
749                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
750                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
751                               res['network'][0]['node'][i]['supporting-node'])
752                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
753                 listNode.remove(nodeId)
754             elif(nodeId == 'ROADMC01-SRG1'):
755                 # Test related to SRG1
756                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
757                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
758                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
759                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
760                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
761                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC01'},
762                               res['network'][0]['node'][i]['supporting-node'])
763                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'SRG')
764                 listNode.remove(nodeId)
765             elif(nodeId == 'ROADMC01-DEG1'):
766                 # Test related to DEG1
767                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
768                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
769                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
770                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
771                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC01'},
772                               res['network'][0]['node'][i]['supporting-node'])
773                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
774                 listNode.remove(nodeId)
775             elif(nodeId == 'ROADMC01-DEG2'):
776                 # Test related to DEG2
777                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
778                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
779                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
780                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
781                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMC01'},
782                               res['network'][0]['node'][i]['supporting-node'])
783                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
784                 listNode.remove(nodeId)
785             else:
786                 self.assertFalse(True)
787         self.assertEqual(len(listNode), 0)
788
789     def test_21_connect_ROADMB(self):
790         url = ("{}/config/network-topology:"
791                "network-topology/topology/topology-netconf/node/ROADMB01"
792                .format(self.restconf_baseurl))
793         data = {"node": [{
794             "node-id": "ROADMB01",
795             "netconf-node-topology:username": "admin",
796             "netconf-node-topology:password": "admin",
797             "netconf-node-topology:host": "127.0.0.1",
798             "netconf-node-topology:port": "17832",
799             "netconf-node-topology:tcp-only": "false",
800             "netconf-node-topology:pass-through": {}}]}
801         headers = {'content-type': 'application/json'}
802         response = requests.request(
803             "PUT", url, data=json.dumps(data), headers=headers,
804             auth=('admin', 'admin'))
805         self.assertEqual(response.status_code, requests.codes.created)
806         time.sleep(20)
807
808     def test_22_omsAttributes_ROADMA_ROADMB(self):
809         # Config ROADMA01-ROADMB01 oms-attributes
810         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
811                "link/ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
812                "OMS-attributes/span"
813                .format(self.restconf_baseurl))
814         data = {"span": {
815                 "auto-spanloss": "true",
816                 "engineered-spanloss": 12.2,
817                 "link-concatenation": [{
818                     "SRLG-Id": 0,
819                     "fiber-type": "smf",
820                     "SRLG-length": 100000,
821                     "pmd": 0.5}]}}
822         headers = {'content-type': 'application/json'}
823         response = requests.request(
824             "PUT", url, data=json.dumps(data), headers=headers,
825             auth=('admin', 'admin'))
826         self.assertEqual(response.status_code, requests.codes.created)
827
828     def test_23_omsAttributes_ROADMB_ROADMA(self):
829         # Config ROADMB01-ROADMA01 oms-attributes
830         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
831                "link/ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
832                "OMS-attributes/span"
833                .format(self.restconf_baseurl))
834         data = {"span": {
835                 "auto-spanloss": "true",
836                 "engineered-spanloss": 12.2,
837                 "link-concatenation": [{
838                     "SRLG-Id": 0,
839                     "fiber-type": "smf",
840                     "SRLG-length": 100000,
841                     "pmd": 0.5}]}}
842         headers = {'content-type': 'application/json'}
843         response = requests.request(
844             "PUT", url, data=json.dumps(data), headers=headers,
845             auth=('admin', 'admin'))
846         self.assertEqual(response.status_code, requests.codes.created)
847
848     def test_24_omsAttributes_ROADMB_ROADMC(self):
849         # Config ROADMB01-ROADMC01 oms-attributes
850         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
851                "link/ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
852                "OMS-attributes/span"
853                .format(self.restconf_baseurl))
854         data = {"span": {
855                 "auto-spanloss": "true",
856                 "engineered-spanloss": 12.2,
857                 "link-concatenation": [{
858                     "SRLG-Id": 0,
859                     "fiber-type": "smf",
860                     "SRLG-length": 100000,
861                     "pmd": 0.5}]}}
862         headers = {'content-type': 'application/json'}
863         response = requests.request(
864             "PUT", url, data=json.dumps(data), headers=headers,
865             auth=('admin', 'admin'))
866         self.assertEqual(response.status_code, requests.codes.created)
867
868     def test_25_omsAttributes_ROADMC_ROADMB(self):
869         # Config ROADMC01-ROADMB01 oms-attributes
870         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
871                "link/ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
872                "OMS-attributes/span"
873                .format(self.restconf_baseurl))
874         data = {"span": {
875                 "auto-spanloss": "true",
876                 "engineered-spanloss": 12.2,
877                 "link-concatenation": [{
878                     "SRLG-Id": 0,
879                     "fiber-type": "smf",
880                     "SRLG-length": 100000,
881                     "pmd": 0.5}]}}
882         headers = {'content-type': 'application/json'}
883         response = requests.request(
884             "PUT", url, data=json.dumps(data), headers=headers,
885             auth=('admin', 'admin'))
886         self.assertEqual(response.status_code, requests.codes.created)
887
888     def test_26_getClliNetwork(self):
889         url = ("{}/config/ietf-network:networks/network/clli-network"
890                .format(self.restconf_baseurl))
891         headers = {'content-type': 'application/json'}
892         response = requests.request(
893             "GET", url, headers=headers, auth=('admin', 'admin'))
894         self.assertEqual(response.status_code, requests.codes.ok)
895         res = response.json()
896         nbNode = len(res['network'][0]['node'])
897         listNode = ['NodeA', 'NodeB', 'NodeC']
898         for i in range(0, nbNode):
899             nodeId = res['network'][0]['node'][i]['node-id']
900             find = nodeId in listNode
901             self.assertEqual(find, True)
902             if(nodeId == 'NodeA'):
903                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeA')
904             elif(nodeId == 'NodeB'):
905                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeB')
906             else:
907                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeC')
908             listNode.remove(nodeId)
909         self.assertEqual(len(listNode), 0)
910
911     def test_27_verifyDegree(self):
912         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
913                .format(self.restconf_baseurl))
914         headers = {'content-type': 'application/json'}
915         response = requests.request(
916             "GET", url, headers=headers, auth=('admin', 'admin'))
917         self.assertEqual(response.status_code, requests.codes.ok)
918         res = response.json()
919         # Tests related to links
920         nbLink = len(res['network'][0]['ietf-network-topology:link'])
921         listR2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX', 'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
922                        'ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX', 'ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX',
923                        'ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX', 'ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX']
924         for i in range(0, nbLink):
925             if res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'ROADM-TO-ROADM':
926                 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
927                 find = link_id in listR2RLink
928                 self.assertEqual(find, True)
929                 listR2RLink.remove(link_id)
930         self.assertEqual(len(listR2RLink), 0)
931
932     def test_28_verifyOppositeLinkTopology(self):
933         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
934                .format(self.restconf_baseurl))
935         headers = {'content-type': 'application/json'}
936         response = requests.request(
937             "GET", url, headers=headers, auth=('admin', 'admin'))
938         self.assertEqual(response.status_code, requests.codes.ok)
939         res = response.json()
940         # Tests related to links
941         nbLink = len(res['network'][0]['ietf-network-topology:link'])
942         self.assertEqual(nbLink, 34)
943         for i in range(0, nbLink):
944             link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
945             link_type = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
946             link_src = res['network'][0]['ietf-network-topology:link'][i]['source']['source-node']
947             link_dest = res['network'][0]['ietf-network-topology:link'][i]['destination']['dest-node']
948             oppLink_id = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:opposite-link']
949             # Find the opposite link
950             url_oppLink = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"+oppLink_id
951             url = (url_oppLink.format(self.restconf_baseurl))
952             headers = {'content-type': 'application/json'}
953             response_oppLink = requests.request("GET", url, headers=headers, auth=('admin', 'admin'))
954             self.assertEqual(response_oppLink.status_code, requests.codes.ok)
955             res_oppLink = response_oppLink.json()
956             self.assertEqual(res_oppLink['ietf-network-topology:link'][0]
957                              ['org-openroadm-common-network:opposite-link'], link_id)
958             self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['source']['source-node'], link_dest)
959             self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['destination']['dest-node'], link_src)
960             oppLink_type = res_oppLink['ietf-network-topology:link'][0]['org-openroadm-common-network:link-type']
961             if link_type == 'ADD-LINK':
962                 self.assertEqual(oppLink_type, 'DROP-LINK')
963             elif link_type == 'DROP-LINK':
964                 self.assertEqual(oppLink_type, 'ADD-LINK')
965             elif link_type == 'EXPRESS-LINK':
966                 self.assertEqual(oppLink_type, 'EXPRESS-LINK')
967             elif link_type == 'ROADM-TO-ROADM':
968                 self.assertEqual(oppLink_type, 'ROADM-TO-ROADM')
969             elif link_type == 'XPONDER-INPUT':
970                 self.assertEqual(oppLink_type, 'XPONDER-OUTPUT')
971             elif link_type == 'XPONDER-OUTPUT':
972                 self.assertEqual(oppLink_type, 'XPONDER-INPUT')
973
974     def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
975         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
976                .format(self.restconf_baseurl))
977         headers = {'content-type': 'application/json'}
978         response = requests.request(
979             "GET", url, headers=headers, auth=('admin', 'admin'))
980         self.assertEqual(response.status_code, requests.codes.ok)
981         res = response.json()
982         nbLink = len(res['network'][0]['ietf-network-topology:link'])
983         R2RLink = ['ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX',
984                    'ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX',
985                    'ROADMA01-DEG2-DEG2-TTP-TXRXtoROADMB01-DEG1-DEG1-TTP-TXRX',
986                    'ROADMC01-DEG1-DEG1-TTP-TXRXtoROADMB01-DEG2-DEG2-TTP-TXRX',
987                    'ROADMB01-DEG1-DEG1-TTP-TXRXtoROADMA01-DEG2-DEG2-TTP-TXRX',
988                    'ROADMB01-DEG2-DEG2-TTP-TXRXtoROADMC01-DEG1-DEG1-TTP-TXRX']
989         for i in range(0, nbLink):
990             nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
991             link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
992             if(link_id in R2RLink):
993                 find = False
994                 spanLoss = res['network'][0]['ietf-network-topology:link'][i][
995                     'org-openroadm-network-topology:OMS-attributes']['span']["engineered-spanloss"]
996                 length = res['network'][0]['ietf-network-topology:link'][i][
997                     'org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
998                 if((spanLoss != None) & (length != None)):
999                     find = True
1000                 self.assertTrue(find)
1001                 R2RLink.remove(link_id)
1002         self.assertEqual(len(R2RLink), 0)
1003
1004     def test_30_disconnect_ROADMB(self):
1005         # Delete in the topology-netconf
1006         url = ("{}/config/network-topology:"
1007                "network-topology/topology/topology-netconf/node/ROADMB01"
1008                .format(self.restconf_baseurl))
1009         data = {}
1010         headers = {'content-type': 'application/json'}
1011         response = requests.request(
1012             "DELETE", url, data=json.dumps(data), headers=headers,
1013             auth=('admin', 'admin'))
1014         self.assertEqual(response.status_code, requests.codes.ok)
1015         # Delete in the clli-network
1016         url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeB"
1017                .format(self.restconf_baseurl))
1018         data = {}
1019         headers = {'content-type': 'application/json'}
1020         response = requests.request(
1021             "DELETE", url, data=json.dumps(data), headers=headers,
1022             auth=('admin', 'admin'))
1023         self.assertEqual(response.status_code, requests.codes.ok)
1024
1025     def test_31_disconnect_ROADMC(self):
1026         # Delete in the topology-netconf
1027         url = ("{}/config/network-topology:"
1028                "network-topology/topology/topology-netconf/node/ROADMC01"
1029                .format(self.restconf_baseurl))
1030         data = {}
1031         headers = {'content-type': 'application/json'}
1032         response = requests.request(
1033             "DELETE", url, data=json.dumps(data), headers=headers,
1034             auth=('admin', 'admin'))
1035         self.assertEqual(response.status_code, requests.codes.ok)
1036         # Delete in the clli-network
1037         url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeC"
1038                .format(self.restconf_baseurl))
1039         data = {}
1040         headers = {'content-type': 'application/json'}
1041         response = requests.request(
1042             "DELETE", url, data=json.dumps(data), headers=headers,
1043             auth=('admin', 'admin'))
1044         self.assertEqual(response.status_code, requests.codes.ok)
1045
1046     def test_32_getNodes_OpenRoadmTopology(self):
1047         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1048                .format(self.restconf_baseurl))
1049         headers = {'content-type': 'application/json'}
1050         response = requests.request(
1051             "GET", url, headers=headers, auth=('admin', 'admin'))
1052         res = response.json()
1053         # Tests related to nodes
1054         self.assertEqual(response.status_code, requests.codes.ok)
1055         nbNode = len(res['network'][0]['node'])
1056         self.assertEqual(nbNode, 5)
1057         listNode = ['XPDRA01-XPDR1', 'ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
1058         for i in range(0, nbNode):
1059             nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
1060             nodeId = res['network'][0]['node'][i]['node-id']
1061             # Tests related to XPDRA nodes
1062             if(nodeId == 'XPDRA01-XPDR1'):
1063                 nbTp = len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1064                 for j in range(0, nbTp):
1065                     tpid = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
1066                     if (tpid == 'XPDR1-CLIENT1'):
1067                         self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
1068                                          ['org-openroadm-common-network:tp-type'], 'XPONDER-CLIENT')
1069                     if (tpid == 'XPDR1-NETWORK1'):
1070                         self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
1071                                          ['org-openroadm-common-network:tp-type'], 'XPONDER-NETWORK')
1072                         self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
1073                                          ['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'],
1074                                          'ROADMA01-SRG1--SRG1-PP1-TXRX')
1075                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDRA01'},
1076                               res['network'][0]['node'][i]['supporting-node'])
1077                 listNode.remove(nodeId)
1078             elif(nodeId == 'ROADMA01-SRG1'):
1079                 # Test related to SRG1
1080                 self.assertEqual(nodeType, 'SRG')
1081                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
1082                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1083                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1084                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1085                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1086                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
1087                               res['network'][0]['node'][i]['supporting-node'])
1088                 listNode.remove(nodeId)
1089             elif(nodeId == 'ROADMA01-SRG3'):
1090                 # Test related to SRG1
1091                 self.assertEqual(nodeType, 'SRG')
1092                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
1093                 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1094                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1095                 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1096                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1097                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
1098                               res['network'][0]['node'][i]['supporting-node'])
1099                 listNode.remove(nodeId)
1100             elif(nodeId == 'ROADMA01-DEG1'):
1101                 # Test related to DEG1
1102                 self.assertEqual(nodeType, 'DEGREE')
1103                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1104                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1105                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1106                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1107                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
1108                               res['network'][0]['node'][i]['supporting-node'])
1109                 listNode.remove(nodeId)
1110             elif(nodeId == 'ROADMA01-DEG2'):
1111                 # Test related to DEG2
1112                 self.assertEqual(nodeType, 'DEGREE')
1113                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1114                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1115                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1116                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1117                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
1118                               res['network'][0]['node'][i]['supporting-node'])
1119                 listNode.remove(nodeId)
1120             else:
1121                 self.assertFalse(True)
1122         self.assertEqual(len(listNode), 0)
1123         # Test related to SRG1 of ROADMC
1124         for i in range(0, nbNode):
1125             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADMC01-SRG1')
1126             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADMC01-DEG1')
1127             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADMC01-DEG2')
1128
1129     def test_33_getOpenRoadmNetwork(self):
1130         url = ("{}/config/ietf-network:networks/network/openroadm-network"
1131                .format(self.restconf_baseurl))
1132         headers = {'content-type': 'application/json'}
1133         response = requests.request(
1134             "GET", url, headers=headers, auth=('admin', 'admin'))
1135         self.assertEqual(response.status_code, requests.codes.ok)
1136         res = response.json()
1137         nbNode = len(res['network'][0]['node'])
1138         self.assertEqual(nbNode, 2)
1139         for i in range(0, nbNode-1):
1140             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADMC01')
1141             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADMB01')
1142
1143     def test_34_getClliNetwork(self):
1144         url = ("{}/config/ietf-network:networks/network/clli-network"
1145                .format(self.restconf_baseurl))
1146         headers = {'content-type': 'application/json'}
1147         response = requests.request(
1148             "GET", url, headers=headers, auth=('admin', 'admin'))
1149         self.assertEqual(response.status_code, requests.codes.ok)
1150         res = response.json()
1151         nbNode = len(res['network'][0]['node'])
1152         self.assertEqual(nbNode, 1)
1153         for i in range(0, nbNode-1):
1154             self.assertNotEqual(res['network'][0]['node'][1]['org-openroadm-clli-network:clli'], 'NodeC')
1155
1156     def test_35_disconnect_XPDRA(self):
1157         url = ("{}/config/network-topology:"
1158                "network-topology/topology/topology-netconf/node/XPDRA01"
1159                .format(self.restconf_baseurl))
1160         data = {}
1161         headers = {'content-type': 'application/json'}
1162         response = requests.request(
1163             "DELETE", url, data=json.dumps(data), headers=headers,
1164             auth=('admin', 'admin'))
1165         self.assertEqual(response.status_code, requests.codes.ok)
1166
1167     def test_36_getClliNetwork(self):
1168         url = ("{}/config/ietf-network:networks/network/clli-network"
1169                .format(self.restconf_baseurl))
1170         headers = {'content-type': 'application/json'}
1171         response = requests.request(
1172             "GET", url, headers=headers, auth=('admin', 'admin'))
1173         self.assertEqual(response.status_code, requests.codes.ok)
1174         res = response.json()
1175         nbNode = len(res['network'][0]['node'])
1176         self.assertEqual(nbNode, 1)
1177         self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
1178
1179     def test_37_getOpenRoadmNetwork(self):
1180         url = ("{}/config/ietf-network:networks/network/openroadm-network"
1181                .format(self.restconf_baseurl))
1182         headers = {'content-type': 'application/json'}
1183         response = requests.request(
1184             "GET", url, headers=headers, auth=('admin', 'admin'))
1185         self.assertEqual(response.status_code, requests.codes.ok)
1186         res = response.json()
1187         nbNode = len(res['network'][0]['node'])
1188         self.assertEqual(nbNode, 1)
1189         for i in range(0, nbNode):
1190             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'XPDRA01')
1191
1192     def test_38_getNodes_OpenRoadmTopology(self):
1193         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1194                .format(self.restconf_baseurl))
1195         headers = {'content-type': 'application/json'}
1196         response = requests.request(
1197             "GET", url, headers=headers, auth=('admin', 'admin'))
1198         res = response.json()
1199         # Tests related to nodes
1200         self.assertEqual(response.status_code, requests.codes.ok)
1201         nbNode = len(res['network'][0]['node'])
1202         self.assertEqual(nbNode, 4)
1203         listNode = ['ROADMA01-SRG1', 'ROADMA01-SRG3', 'ROADMA01-DEG1', 'ROADMA01-DEG2']
1204         for i in range(0, nbNode):
1205             self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADMA01'},
1206                           res['network'][0]['node'][i]['supporting-node'])
1207             nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
1208             nodeId = res['network'][0]['node'][i]['node-id']
1209             if(nodeId == 'ROADMA01-SRG1'):
1210                 # Test related to SRG1
1211                 self.assertEqual(nodeType, 'SRG')
1212                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
1213                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1214                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1215                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1216                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1217                 listNode.remove(nodeId)
1218             elif(nodeId == 'ROADMA01-SRG3'):
1219                 # Test related to SRG1
1220                 self.assertEqual(nodeType, 'SRG')
1221                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 17)
1222                 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1223                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1224                 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1225                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1226                 listNode.remove(nodeId)
1227             elif(nodeId == 'ROADMA01-DEG1'):
1228                 # Test related to DEG1
1229                 self.assertEqual(nodeType, 'DEGREE')
1230                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1231                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1232                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1233                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1234                 listNode.remove(nodeId)
1235             elif(nodeId == 'ROADMA01-DEG2'):
1236                 # Test related to DEG2
1237                 self.assertEqual(nodeType, 'DEGREE')
1238                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1239                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1240                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1241                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1242                 listNode.remove(nodeId)
1243             else:
1244                 self.assertFalse(True)
1245         self.assertEqual(len(listNode), 0)
1246
1247     def test_39_disconnect_ROADM_XPDRA_link(self):
1248         # Link-1
1249         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1250                "link/XPDRA01-XPDR1-XPDR1-NETWORK1toROADMA01-SRG1-SRG1-PP1-TXRX"
1251                .format(self.restconf_baseurl))
1252         data = {}
1253         headers = {'content-type': 'application/json'}
1254         response = requests.request(
1255             "DELETE", url, data=json.dumps(data), headers=headers,
1256             auth=('admin', 'admin'))
1257         self.assertEqual(response.status_code, requests.codes.ok)
1258         # Link-2
1259         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1260                "link/ROADMA01-SRG1-SRG1-PP1-TXRXtoXPDRA01-XPDR1-XPDR1-NETWORK1"
1261                .format(self.restconf_baseurl))
1262         data = {}
1263         headers = {'content-type': 'application/json'}
1264         response = requests.request(
1265             "DELETE", url, data=json.dumps(data), headers=headers,
1266             auth=('admin', 'admin'))
1267         self.assertEqual(response.status_code, requests.codes.ok)
1268
1269     def test_40_getLinks_OpenRoadmTopology(self):
1270         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1271                .format(self.restconf_baseurl))
1272         headers = {'content-type': 'application/json'}
1273         response = requests.request(
1274             "GET", url, headers=headers, auth=('admin', 'admin'))
1275         self.assertEqual(response.status_code, requests.codes.ok)
1276         res = response.json()
1277         nbLink = len(res['network'][0]['ietf-network-topology:link'])
1278         self.assertEqual(nbLink, 16)
1279         expressLink = ['ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
1280                        'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX']
1281         addLink = ['ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG1-SRG1-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX',
1282                    'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG2-DEG2-CTP-TXRX', 'ROADMA01-SRG3-SRG3-CP-TXRXtoROADMA01-DEG1-DEG1-CTP-TXRX']
1283         dropLink = ['ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG1-SRG1-CP-TXRX',
1284                     'ROADMA01-DEG1-DEG1-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX', 'ROADMA01-DEG2-DEG2-CTP-TXRXtoROADMA01-SRG3-SRG3-CP-TXRX']
1285         roadmtoroadmLink = 0
1286         for i in range(0, nbLink):
1287             if (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'EXPRESS-LINK'):
1288                 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1289                 find = link_id in expressLink
1290                 self.assertEqual(find, True)
1291                 expressLink.remove(link_id)
1292             elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'ADD-LINK'):
1293                 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1294                 find = link_id in addLink
1295                 self.assertEqual(find, True)
1296                 addLink.remove(link_id)
1297             elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'DROP-LINK'):
1298                 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1299                 find = link_id in dropLink
1300                 self.assertEqual(find, True)
1301                 dropLink.remove(link_id)
1302             else:
1303                 roadmtoroadmLink += 1
1304         self.assertEqual(len(expressLink), 0)
1305         self.assertEqual(len(addLink), 0)
1306         self.assertEqual(len(dropLink), 0)
1307         self.assertEqual(roadmtoroadmLink, 6)
1308         for i in range(0, nbLink):
1309             self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]
1310                                 ['org-openroadm-common-network:link-type'], 'XPONDER-OUTPUT')
1311             self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]
1312                                 ['org-openroadm-common-network:link-type'], 'XPONDER-INPUT')
1313
1314     def test_41_disconnect_ROADMA(self):
1315         url = ("{}/config/network-topology:"
1316                "network-topology/topology/topology-netconf/node/ROADMA01"
1317                .format(self.restconf_baseurl))
1318         data = {}
1319         headers = {'content-type': 'application/json'}
1320         response = requests.request(
1321             "DELETE", url, data=json.dumps(data), headers=headers,
1322             auth=('admin', 'admin'))
1323         self.assertEqual(response.status_code, requests.codes.ok)
1324         # Delete in the clli-network
1325         url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeA"
1326                .format(self.restconf_baseurl))
1327         data = {}
1328         headers = {'content-type': 'application/json'}
1329         response = requests.request(
1330             "DELETE", url, data=json.dumps(data), headers=headers,
1331             auth=('admin', 'admin'))
1332         self.assertEqual(response.status_code, requests.codes.ok)
1333
1334     def test_42_getClliNetwork(self):
1335         url = ("{}/config/ietf-network:networks/network/clli-network"
1336                .format(self.restconf_baseurl))
1337         headers = {'content-type': 'application/json'}
1338         response = requests.request(
1339             "GET", url, headers=headers, auth=('admin', 'admin'))
1340         self.assertEqual(response.status_code, requests.codes.ok)
1341         res = response.json()
1342         self.assertNotIn('node', res['network'][0])
1343
1344     def test_43_getOpenRoadmNetwork(self):
1345         url = ("{}/config/ietf-network:networks/network/openroadm-network"
1346                .format(self.restconf_baseurl))
1347         headers = {'content-type': 'application/json'}
1348         response = requests.request(
1349             "GET", url, headers=headers, auth=('admin', 'admin'))
1350         self.assertEqual(response.status_code, requests.codes.ok)
1351         res = response.json()
1352         self.assertNotIn('node', res['network'][0])
1353
1354     def test_44_check_roadm2roadm_link_persistence(self):
1355         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1356                .format(self.restconf_baseurl))
1357         headers = {'content-type': 'application/json'}
1358         response = requests.request(
1359             "GET", url, headers=headers, auth=('admin', 'admin'))
1360         self.assertEqual(response.status_code, requests.codes.ok)
1361         res = response.json()
1362         nbLink = len(res['network'][0]['ietf-network-topology:link'])
1363         self.assertNotIn('node', res['network'][0])
1364         self.assertEqual(nbLink, 6)
1365
1366
1367 if __name__ == "__main__":
1368     unittest.main(verbosity=2)