improve functional tests pylint score
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_topology.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2017 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 import json
13 import os
14 import psutil
15 import requests
16 import signal
17 import shutil
18 import subprocess
19 import time
20 import unittest
21 import logging
22 import test_utils
23
24
25 class TransportPCEtesting(unittest.TestCase):
26
27     honeynode_process1 = None
28     honeynode_process2 = None
29     honeynode_process3 = None
30     honeynode_process4 = None
31     odl_process = None
32     restconf_baseurl = "http://localhost:8181/restconf"
33
34 # START_IGNORE_XTESTING
35
36     @classmethod
37     def setUpClass(cls):
38         print("starting honeynode1...")
39         cls.honeynode_process1 = test_utils.start_xpdra_honeynode()
40         time.sleep(20)
41
42         print("starting honeynode2...")
43         cls.honeynode_process2 = test_utils.start_roadma_honeynode()
44         time.sleep(20)
45
46         print("starting honeynode3...")
47         cls.honeynode_process3 = test_utils.start_roadmb_honeynode()
48         time.sleep(20)
49
50         print("starting honeynode4...")
51         cls.honeynode_process4 = test_utils.start_roadmc_honeynode()
52         time.sleep(20)
53         print("all honeynodes started")
54
55         print("starting opendaylight...")
56         cls.odl_process = test_utils.start_tpce()
57         time.sleep(60)
58         print("opendaylight started")
59
60     @classmethod
61     def tearDownClass(cls):
62         for child in psutil.Process(cls.odl_process.pid).children():
63             child.send_signal(signal.SIGINT)
64             child.wait()
65         cls.odl_process.send_signal(signal.SIGINT)
66         cls.odl_process.wait()
67         for child in psutil.Process(cls.honeynode_process1.pid).children():
68             child.send_signal(signal.SIGINT)
69             child.wait()
70         cls.honeynode_process1.send_signal(signal.SIGINT)
71         cls.honeynode_process1.wait()
72         for child in psutil.Process(cls.honeynode_process2.pid).children():
73             child.send_signal(signal.SIGINT)
74             child.wait()
75         cls.honeynode_process2.send_signal(signal.SIGINT)
76         cls.honeynode_process2.wait()
77         for child in psutil.Process(cls.honeynode_process3.pid).children():
78             child.send_signal(signal.SIGINT)
79             child.wait()
80         cls.honeynode_process3.send_signal(signal.SIGINT)
81         cls.honeynode_process3.wait()
82         for child in psutil.Process(cls.honeynode_process4.pid).children():
83             child.send_signal(signal.SIGINT)
84             child.wait()
85         cls.honeynode_process4.send_signal(signal.SIGINT)
86         cls.honeynode_process4.wait()
87
88     def setUp(self):
89         time.sleep(5)
90
91 # END_IGNORE_XTESTING
92
93     def test_01_connect_ROADM_A1(self):
94         # Config ROADMA
95         url = ("{}/config/network-topology:"
96                "network-topology/topology/topology-netconf/node/ROADM-A1"
97                .format(self.restconf_baseurl))
98         data = {"node": [{
99             "node-id": "ROADM-A1",
100             "netconf-node-topology:username": "admin",
101             "netconf-node-topology:password": "admin",
102             "netconf-node-topology:host": "127.0.0.1",
103             "netconf-node-topology:port": "17841",
104             "netconf-node-topology:tcp-only": "false",
105             "netconf-node-topology:pass-through": {}}]}
106         headers = {'content-type': 'application/json'}
107         response = requests.request(
108             "PUT", url, data=json.dumps(data), headers=headers,
109             auth=('admin', 'admin'))
110         self.assertEqual(response.status_code, requests.codes.created)
111         time.sleep(20)
112
113     def test_02_getClliNetwork(self):
114         url = ("{}/config/ietf-network:networks/network/clli-network"
115                .format(self.restconf_baseurl))
116         headers = {'content-type': 'application/json'}
117         response = requests.request(
118             "GET", url, headers=headers, auth=('admin', 'admin'))
119         self.assertEqual(response.status_code, requests.codes.ok)
120         res = response.json()
121         logging.info(res)
122         self.assertEqual(res['network'][0]['node'][0]['node-id'], 'NodeA')
123         self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
124
125     def test_03_getOpenRoadmNetwork(self):
126         url = ("{}/config/ietf-network:networks/network/openroadm-network"
127                .format(self.restconf_baseurl))
128         headers = {'content-type': 'application/json'}
129         response = requests.request(
130             "GET", url, headers=headers, auth=('admin', 'admin'))
131         self.assertEqual(response.status_code, requests.codes.ok)
132         res = response.json()
133         self.assertEqual(res['network'][0]['node'][0]['node-id'], 'ROADM-A1')
134         self.assertEqual(res['network'][0]['node'][0]['supporting-node'][0]['network-ref'], 'clli-network')
135         self.assertEqual(res['network'][0]['node'][0]['supporting-node'][0]['node-ref'], 'NodeA')
136         self.assertEqual(res['network'][0]['node'][0]['org-openroadm-common-network:node-type'], 'ROADM')
137         self.assertEqual(res['network'][0]['node'][0]['org-openroadm-network:model'], 'model2')
138
139     def test_04_getLinks_OpenroadmTopology(self):
140         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
141                .format(self.restconf_baseurl))
142         headers = {'content-type': 'application/json'}
143         response = requests.request(
144             "GET", url, headers=headers, auth=('admin', 'admin'))
145         self.assertEqual(response.status_code, requests.codes.ok)
146         res = response.json()
147         # Tests related to links
148         nbLink = len(res['network'][0]['ietf-network-topology:link'])
149         self.assertEqual(nbLink, 10)
150         expressLink = ['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
151                        'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX']
152         addLink = ['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
153                    'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
154                    'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
155                    'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX']
156         dropLink = ['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
157                     'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
158                     'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
159                     'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX']
160         for i in range(0, nbLink):
161             linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
162             if (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'EXPRESS-LINK'):
163                 find = linkId in expressLink
164                 self.assertEqual(find, True)
165                 expressLink.remove(linkId)
166             elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'ADD-LINK'):
167                 find = linkId in addLink
168                 self.assertEqual(find, True)
169                 addLink.remove(linkId)
170             elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'DROP-LINK'):
171                 find = linkId in dropLink
172                 self.assertEqual(find, True)
173                 dropLink.remove(linkId)
174             else:
175                 self.assertFalse(True)
176         self.assertEqual(len(expressLink), 0)
177         self.assertEqual(len(addLink), 0)
178         self.assertEqual(len(dropLink), 0)
179
180     def test_05_getNodes_OpenRoadmTopology(self):
181         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
182                .format(self.restconf_baseurl))
183         headers = {'content-type': 'application/json'}
184         response = requests.request(
185             "GET", url, headers=headers, auth=('admin', 'admin'))
186         res = response.json()
187         # Tests related to nodes
188         self.assertEqual(response.status_code, requests.codes.ok)
189         nbNode = len(res['network'][0]['node'])
190         self.assertEqual(nbNode, 4)
191         listNode = ['ROADM-A1-SRG1', 'ROADM-A1-SRG3', 'ROADM-A1-DEG1', 'ROADM-A1-DEG2']
192         for i in range(0, nbNode):
193             self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
194                           res['network'][0]['node'][i]['supporting-node'])
195             nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
196             nodeId = res['network'][0]['node'][i]['node-id']
197             if(nodeId == 'ROADM-A1-SRG1'):
198                 # Test related to SRG1
199                 self.assertEqual(nodeType, 'SRG')
200                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 5)
201                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
202                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
203                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
204                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
205                 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
206                               res['network'][0]['node'][i]['supporting-node'])
207                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
208                               res['network'][0]['node'][i]['supporting-node'])
209                 listNode.remove(nodeId)
210             elif(nodeId == 'ROADM-A1-SRG3'):
211                 # Test related to SRG1
212                 self.assertEqual(nodeType, 'SRG')
213                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 5)
214                 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
215                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
216                 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
217                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
218                 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
219                               res['network'][0]['node'][i]['supporting-node'])
220                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
221                               res['network'][0]['node'][i]['supporting-node'])
222                 listNode.remove(nodeId)
223             elif(nodeId == 'ROADM-A1-DEG1'):
224                 # Test related to DEG1
225                 self.assertEqual(nodeType, 'DEGREE')
226                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
227                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
228                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
229                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
230                 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
231                               res['network'][0]['node'][i]['supporting-node'])
232                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
233                               res['network'][0]['node'][i]['supporting-node'])
234                 listNode.remove(nodeId)
235             elif(nodeId == 'ROADM-A1-DEG2'):
236                 # Test related to DEG2
237                 self.assertEqual(nodeType, 'DEGREE')
238                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
239                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
240                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
241                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
242                 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
243                               res['network'][0]['node'][i]['supporting-node'])
244                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
245                               res['network'][0]['node'][i]['supporting-node'])
246                 listNode.remove(nodeId)
247             else:
248                 self.assertFalse(True)
249         self.assertEqual(len(listNode), 0)
250
251     def test_06_connect_XPDRA(self):
252         url = ("{}/config/network-topology:"
253                "network-topology/topology/topology-netconf/node/XPDR-A1"
254                .format(self.restconf_baseurl))
255         data = {"node": [{
256             "node-id": "XPDR-A1",
257             "netconf-node-topology:username": "admin",
258             "netconf-node-topology:password": "admin",
259             "netconf-node-topology:host": "127.0.0.1",
260             "netconf-node-topology:port": "17840",
261             "netconf-node-topology:tcp-only": "false",
262             "netconf-node-topology:pass-through": {}}]}
263         headers = {'content-type': 'application/json'}
264         response = requests.request(
265             "PUT", url, data=json.dumps(data), headers=headers,
266             auth=('admin', 'admin'))
267         self.assertEqual(response.status_code, requests.codes.created)
268         time.sleep(30)
269
270     def test_07_getClliNetwork(self):
271         url = ("{}/config/ietf-network:networks/network/clli-network"
272                .format(self.restconf_baseurl))
273         headers = {'content-type': 'application/json'}
274         response = requests.request(
275             "GET", url, headers=headers, auth=('admin', 'admin'))
276         self.assertEqual(response.status_code, requests.codes.ok)
277         res = response.json()
278         self.assertEqual(res['network'][0]['node'][0]['node-id'], 'NodeA')
279         self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
280
281     def test_08_getOpenRoadmNetwork(self):
282         url = ("{}/config/ietf-network:networks/network/openroadm-network"
283                .format(self.restconf_baseurl))
284         headers = {'content-type': 'application/json'}
285         response = requests.request(
286             "GET", url, headers=headers, auth=('admin', 'admin'))
287         self.assertEqual(response.status_code, requests.codes.ok)
288         res = response.json()
289         nbNode = len(res['network'][0]['node'])
290         self.assertEqual(nbNode, 2)
291         for i in range(0, nbNode):
292             self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'], 'clli-network')
293             self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'], 'NodeA')
294             nodeId = res['network'][0]['node'][i]['node-id']
295             if(nodeId == 'XPDR-A1'):
296                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'XPONDER')
297                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], 'model2')
298             elif(nodeId == 'ROADM-A1'):
299                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'ROADM')
300                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], 'model2')
301             else:
302                 self.assertFalse(True)
303
304     def test_09_getNodes_OpenRoadmTopology(self):
305         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
306                .format(self.restconf_baseurl))
307         headers = {'content-type': 'application/json'}
308         response = requests.request(
309             "GET", url, headers=headers, auth=('admin', 'admin'))
310         res = response.json()
311         # Tests related to nodes
312         self.assertEqual(response.status_code, requests.codes.ok)
313         nbNode = len(res['network'][0]['node'])
314         self.assertEqual(nbNode, 5)
315         listNode = ['XPDR-A1-XPDR1', 'ROADM-A1-SRG1', 'ROADM-A1-SRG3', 'ROADM-A1-DEG1', 'ROADM-A1-DEG2']
316         for i in range(0, nbNode):
317             nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
318             nodeId = res['network'][0]['node'][i]['node-id']
319             # Tests related to XPDRA nodes
320             if(nodeId == 'XPDR-A1-XPDR1'):
321                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
322                               res['network'][0]['node'][i]['supporting-node'])
323                 self.assertIn({'network-ref': 'clli-network', 'node-ref': 'NodeA'},
324                               res['network'][0]['node'][i]['supporting-node'])
325                 self.assertEqual(nodeType, 'XPONDER')
326                 nbTps = len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
327                 client = 0
328                 network = 0
329                 for j in range(0, nbTps):
330                     tpType = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-common-network:tp-type']
331                     tpId = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
332                     if (tpType == 'XPONDER-CLIENT'):
333                         client += 1
334                     elif (tpType == 'XPONDER-NETWORK'):
335                         network += 1
336                     if (tpId == 'XPDR1-NETWORK2'):
337                         self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
338                                          ['transportpce-topology:associated-connection-map-port'], 'XPDR1-CLIENT2')
339                     if (tpId == 'XPDR1-CLIENT2'):
340                         self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
341                                          ['transportpce-topology:associated-connection-map-port'], 'XPDR1-NETWORK2')
342
343                 self.assertTrue(client == 2)
344                 self.assertTrue(network == 2)
345                 listNode.remove(nodeId)
346             elif(nodeId == 'ROADM-A1-SRG1'):
347                 # Test related to SRG1
348                 self.assertEqual(nodeType, 'SRG')
349                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 5)
350                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
351                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
352                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
353                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
354                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
355                               res['network'][0]['node'][i]['supporting-node'])
356                 listNode.remove(nodeId)
357             elif(nodeId == 'ROADM-A1-SRG3'):
358                 # Test related to SRG1
359                 self.assertEqual(nodeType, 'SRG')
360                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 5)
361                 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
362                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
363                 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
364                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
365                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
366                               res['network'][0]['node'][i]['supporting-node'])
367                 listNode.remove(nodeId)
368             elif(nodeId == 'ROADM-A1-DEG1'):
369                 # Test related to DEG1
370                 self.assertEqual(nodeType, 'DEGREE')
371                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
372                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
373                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
374                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
375                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
376                               res['network'][0]['node'][i]['supporting-node'])
377                 listNode.remove(nodeId)
378             elif(nodeId == 'ROADM-A1-DEG2'):
379                 # Test related to DEG2
380                 self.assertEqual(nodeType, 'DEGREE')
381                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
382                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
383                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
384                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
385                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
386                               res['network'][0]['node'][i]['supporting-node'])
387                 listNode.remove(nodeId)
388             else:
389                 self.assertFalse(True)
390         self.assertEqual(len(listNode), 0)
391
392     # Connect the tail XPDRA to ROADMA and vice versa
393     def test_10_connect_tail_xpdr_rdm(self):
394         # Connect the tail: XPDRA to ROADMA
395         url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
396                .format(self.restconf_baseurl))
397         data = {"networkutils:input": {
398             "networkutils:links-input": {
399                 "networkutils:xpdr-node": "XPDR-A1",
400                 "networkutils:xpdr-num": "1",
401                 "networkutils:network-num": "1",
402                 "networkutils:rdm-node": "ROADM-A1",
403                 "networkutils:srg-num": "1",
404                 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
405             }
406         }
407         }
408         headers = {'content-type': 'application/json'}
409         response = requests.request(
410             "POST", url, data=json.dumps(data), headers=headers,
411             auth=('admin', 'admin'))
412         self.assertEqual(response.status_code, requests.codes.ok)
413
414     def test_11_connect_tail_rdm_xpdr(self):
415         # Connect the tail: ROADMA to XPDRA
416         url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
417                .format(self.restconf_baseurl))
418         data = {"networkutils:input": {
419             "networkutils:links-input": {
420                 "networkutils:xpdr-node": "XPDR-A1",
421                 "networkutils:xpdr-num": "1",
422                 "networkutils:network-num": "1",
423                 "networkutils:rdm-node": "ROADM-A1",
424                 "networkutils:srg-num": "1",
425                 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
426             }
427         }
428         }
429         headers = {'content-type': 'application/json'}
430         response = requests.request(
431             "POST", url, data=json.dumps(data), headers=headers,
432             auth=('admin', 'admin'))
433         self.assertEqual(response.status_code, requests.codes.ok)
434
435     def test_12_getLinks_OpenRoadmTopology(self):
436         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
437                .format(self.restconf_baseurl))
438         headers = {'content-type': 'application/json'}
439         response = requests.request(
440             "GET", url, headers=headers, auth=('admin', 'admin'))
441         self.assertEqual(response.status_code, requests.codes.ok)
442         res = response.json()
443         # Tests related to links
444         nbLink = len(res['network'][0]['ietf-network-topology:link'])
445         self.assertEqual(nbLink, 12)
446         expressLink = ['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
447                        'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX']
448         addLink = ['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX', 'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
449                    'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX', 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX']
450         dropLink = ['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX', 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
451                     'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX', 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX']
452         XPDR_IN = ['ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1']
453         XPDR_OUT = ['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX']
454         for i in range(0, nbLink):
455             nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
456             linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
457             if(nodeType == 'EXPRESS-LINK'):
458                 find = linkId in expressLink
459                 self.assertEqual(find, True)
460                 expressLink.remove(linkId)
461             elif(nodeType == 'ADD-LINK'):
462                 find = linkId in addLink
463                 self.assertEqual(find, True)
464                 addLink.remove(linkId)
465             elif(nodeType == 'DROP-LINK'):
466                 find = linkId in dropLink
467                 self.assertEqual(find, True)
468                 dropLink.remove(linkId)
469             elif(nodeType == 'XPONDER-INPUT'):
470                 find = linkId in XPDR_IN
471                 self.assertEqual(find, True)
472                 XPDR_IN.remove(linkId)
473             elif(nodeType == 'XPONDER-OUTPUT'):
474                 find = linkId in XPDR_OUT
475                 self.assertEqual(find, True)
476                 XPDR_OUT.remove(linkId)
477             else:
478                 self.assertFalse(True)
479         self.assertEqual(len(expressLink), 0)
480         self.assertEqual(len(addLink), 0)
481         self.assertEqual(len(dropLink), 0)
482         self.assertEqual(len(XPDR_IN), 0)
483         self.assertEqual(len(XPDR_OUT), 0)
484
485     def test_13_connect_ROADMC(self):
486         # Config ROADMC
487         url = ("{}/config/network-topology:"
488                "network-topology/topology/topology-netconf/node/ROADM-C1"
489                .format(self.restconf_baseurl))
490         data = {"node": [{
491             "node-id": "ROADM-C1",
492             "netconf-node-topology:username": "admin",
493             "netconf-node-topology:password": "admin",
494             "netconf-node-topology:host": "127.0.0.1",
495             "netconf-node-topology:port": "17843",
496             "netconf-node-topology:tcp-only": "false",
497             "netconf-node-topology:pass-through": {}}]}
498         headers = {'content-type': 'application/json'}
499         response = requests.request(
500             "PUT", url, data=json.dumps(data), headers=headers,
501             auth=('admin', 'admin'))
502         self.assertEqual(response.status_code, requests.codes.created)
503         time.sleep(20)
504
505     def test_14_omsAttributes_ROADMA_ROADMC(self):
506         # Config ROADMA-ROADMC oms-attributes
507         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
508                "link/ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
509                "OMS-attributes/span"
510                .format(self.restconf_baseurl))
511         data = {"span": {
512             "auto-spanloss": "true",
513             "engineered-spanloss": 12.2,
514             "link-concatenation": [{
515                 "SRLG-Id": 0,
516                 "fiber-type": "smf",
517                 "SRLG-length": 100000,
518                 "pmd": 0.5}]}}
519         headers = {'content-type': 'application/json'}
520         response = requests.request(
521             "PUT", url, data=json.dumps(data), headers=headers,
522             auth=('admin', 'admin'))
523         self.assertEqual(response.status_code, requests.codes.created)
524
525     def test_15_omsAttributes_ROADMC_ROADMA(self):
526         # Config ROADM-C1-ROADM-A1 oms-attributes
527         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
528                "link/ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
529                "OMS-attributes/span"
530                .format(self.restconf_baseurl))
531         data = {"span": {
532             "auto-spanloss": "true",
533             "engineered-spanloss": 12.2,
534             "link-concatenation": [{
535                 "SRLG-Id": 0,
536                 "fiber-type": "smf",
537                 "SRLG-length": 100000,
538                 "pmd": 0.5}]}}
539
540         headers = {'content-type': 'application/json'}
541         response = requests.request(
542             "PUT", url, data=json.dumps(data), headers=headers,
543             auth=('admin', 'admin'))
544         self.assertEqual(response.status_code, requests.codes.created)
545
546     def test_16_getClliNetwork(self):
547         url = ("{}/config/ietf-network:networks/network/clli-network"
548                .format(self.restconf_baseurl))
549         headers = {'content-type': 'application/json'}
550         response = requests.request(
551             "GET", url, headers=headers, auth=('admin', 'admin'))
552         self.assertEqual(response.status_code, requests.codes.ok)
553         res = response.json()
554         nbNode = len(res['network'][0]['node'])
555         listNode = ['NodeA', 'NodeC']
556         for i in range(0, nbNode):
557             nodeId = res['network'][0]['node'][i]['node-id']
558             find = nodeId in listNode
559             self.assertEqual(find, True)
560             if(nodeId == 'NodeA'):
561                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeA')
562             else:
563                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeC')
564             listNode.remove(nodeId)
565         self.assertEqual(len(listNode), 0)
566
567     def test_17_getOpenRoadmNetwork(self):
568         url = ("{}/config/ietf-network:networks/network/openroadm-network"
569                .format(self.restconf_baseurl))
570         headers = {'content-type': 'application/json'}
571         response = requests.request(
572             "GET", url, headers=headers, auth=('admin', 'admin'))
573         self.assertEqual(response.status_code, requests.codes.ok)
574         res = response.json()
575         nbNode = len(res['network'][0]['node'])
576         self.assertEqual(nbNode, 3)
577         listNode = ['XPDR-A1', 'ROADM-A1', 'ROADM-C1']
578         for i in range(0, nbNode):
579             self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['network-ref'], 'clli-network')
580             nodeId = res['network'][0]['node'][i]['node-id']
581             if(nodeId == 'XPDR-A1'):
582                 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'], 'NodeA')
583                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'XPONDER')
584                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], 'model2')
585                 listNode.remove(nodeId)
586             elif(nodeId == 'ROADM-A1'):
587                 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'], 'NodeA')
588                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'ROADM')
589                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], 'model2')
590                 listNode.remove(nodeId)
591             elif(nodeId == 'ROADM-C1'):
592                 self.assertEqual(res['network'][0]['node'][i]['supporting-node'][0]['node-ref'], 'NodeC')
593                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'ROADM')
594                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-network:model'], 'model2')
595                 listNode.remove(nodeId)
596             else:
597                 self.assertFalse(True)
598         self.assertEqual(len(listNode), 0)
599
600     def test_18_getROADMLinkOpenRoadmTopology(self):
601         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
602                .format(self.restconf_baseurl))
603         headers = {'content-type': 'application/json'}
604         response = requests.request(
605             "GET", url, headers=headers, auth=('admin', 'admin'))
606         self.assertEqual(response.status_code, requests.codes.ok)
607         res = response.json()
608         # Tests related to links
609         nbLink = len(res['network'][0]['ietf-network-topology:link'])
610         self.assertEqual(nbLink, 20)
611         expressLink = ['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX', 'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX',
612                        'ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX', 'ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX']
613         addLink = ['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX', 'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
614                    'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX', 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
615                    'ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG2-DEG2-CTP-TXRX', 'ROADM-C1-SRG1-SRG1-CP-TXRXtoROADM-C1-DEG1-DEG1-CTP-TXRX']
616         dropLink = ['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX', 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
617                     'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX', 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX',
618                     'ROADM-C1-DEG1-DEG1-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX', 'ROADM-C1-DEG2-DEG2-CTP-TXRXtoROADM-C1-SRG1-SRG1-CP-TXRX']
619         R2RLink = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
620                    'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX']
621         XPDR_IN = ['ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1']
622         XPDR_OUT = ['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX']
623         for i in range(0, nbLink):
624             nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
625             linkId = res['network'][0]['ietf-network-topology:link'][i]['link-id']
626             if(nodeType == 'EXPRESS-LINK'):
627                 find = linkId in expressLink
628                 self.assertEqual(find, True)
629                 expressLink.remove(linkId)
630             elif(nodeType == 'ADD-LINK'):
631                 find = linkId in addLink
632                 self.assertEqual(find, True)
633                 addLink.remove(linkId)
634             elif(nodeType == 'DROP-LINK'):
635                 find = linkId in dropLink
636                 self.assertEqual(find, True)
637                 dropLink.remove(linkId)
638             elif(nodeType == 'ROADM-TO-ROADM'):
639                 find = linkId in R2RLink
640                 self.assertEqual(find, True)
641                 R2RLink.remove(linkId)
642             elif(nodeType == 'XPONDER-INPUT'):
643                 find = linkId in XPDR_IN
644                 self.assertEqual(find, True)
645                 XPDR_IN.remove(linkId)
646             elif(nodeType == 'XPONDER-OUTPUT'):
647                 find = linkId in XPDR_OUT
648                 self.assertEqual(find, True)
649                 XPDR_OUT.remove(linkId)
650             else:
651                 self.assertFalse(True)
652         self.assertEqual(len(expressLink), 0)
653         self.assertEqual(len(addLink), 0)
654         self.assertEqual(len(dropLink), 0)
655         self.assertEqual(len(R2RLink), 0)
656         self.assertEqual(len(XPDR_IN), 0)
657         self.assertEqual(len(XPDR_OUT), 0)
658
659     def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
660         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
661                .format(self.restconf_baseurl))
662         headers = {'content-type': 'application/json'}
663         response = requests.request(
664             "GET", url, headers=headers, auth=('admin', 'admin'))
665         self.assertEqual(response.status_code, requests.codes.ok)
666         res = response.json()
667         # Tests related to links
668         nbLink = len(res['network'][0]['ietf-network-topology:link'])
669         self.assertEqual(nbLink, 20)
670         R2RLink = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
671                    'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX']
672         for i in range(0, nbLink):
673             nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
674             link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
675             if(link_id in R2RLink):
676                 find = False
677                 spanLoss = res['network'][0]['ietf-network-topology:link'][i][
678                     'org-openroadm-network-topology:OMS-attributes']['span']["engineered-spanloss"]
679                 length = res['network'][0]['ietf-network-topology:link'][i][
680                     'org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
681                 if((spanLoss != None) & (length != None)):
682                     find = True
683                 self.assertTrue(find)
684                 R2RLink.remove(link_id)
685         self.assertEqual(len(R2RLink), 0)
686
687     def test_20_getNodes_OpenRoadmTopology(self):
688         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
689                .format(self.restconf_baseurl))
690         headers = {'content-type': 'application/json'}
691         response = requests.request(
692             "GET", url, headers=headers, auth=('admin', 'admin'))
693         res = response.json()
694         # Tests related to nodes
695         self.assertEqual(response.status_code, requests.codes.ok)
696         nbNode = len(res['network'][0]['node'])
697         self.assertEqual(nbNode, 8)
698         listNode = ['XPDR-A1-XPDR1',
699                     'ROADM-A1-SRG1', 'ROADM-A1-SRG3', 'ROADM-A1-DEG1', 'ROADM-A1-DEG2',
700                     'ROADM-C1-SRG1', 'ROADM-C1-DEG1', 'ROADM-C1-DEG2']
701         # ************************Tests related to XPDRA nodes
702         for i in range(0, nbNode):
703             nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
704             nodeId = res['network'][0]['node'][i]['node-id']
705             if(nodeId == 'XPDR-A1-XPDR1'):
706                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
707                               res['network'][0]['node'][i]['supporting-node'])
708                 self.assertEqual(nodeType, 'XPONDER')
709                 nbTps = len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
710                 self.assertTrue(nbTps >= 4)
711                 client = 0
712                 network = 0
713                 for j in range(0, nbTps):
714                     tpType = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['org-openroadm-common-network:tp-type']
715                     if (tpType == 'XPONDER-CLIENT'):
716                         client += 1
717                     elif (tpType == 'XPONDER-NETWORK'):
718                         network += 1
719                 self.assertTrue(client == 2)
720                 self.assertTrue(network == 2)
721                 listNode.remove(nodeId)
722             elif(nodeId == 'ROADM-A1-SRG1'):
723                 # Test related to SRG1
724                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 5)
725                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
726                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
727                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
728                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
729                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
730                               res['network'][0]['node'][i]['supporting-node'])
731                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'SRG')
732                 listNode.remove(nodeId)
733             elif(nodeId == 'ROADM-A1-SRG3'):
734                 # Test related to SRG1
735                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 5)
736                 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
737                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
738                 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
739                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
740                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
741                               res['network'][0]['node'][i]['supporting-node'])
742                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'SRG')
743                 listNode.remove(nodeId)
744             elif(nodeId == 'ROADM-A1-DEG1'):
745                 # Test related to DEG1
746                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
747                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
748                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
749                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
750                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
751                               res['network'][0]['node'][i]['supporting-node'])
752                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
753                 listNode.remove(nodeId)
754             elif(nodeId == 'ROADM-A1-DEG2'):
755                 # Test related to DEG2
756                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
757                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
758                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
759                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
760                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
761                               res['network'][0]['node'][i]['supporting-node'])
762                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
763                 listNode.remove(nodeId)
764             elif(nodeId == 'ROADM-C1-SRG1'):
765                 # Test related to SRG1
766                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 5)
767                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
768                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
769                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
770                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
771                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-C1'},
772                               res['network'][0]['node'][i]['supporting-node'])
773                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'SRG')
774                 listNode.remove(nodeId)
775             elif(nodeId == 'ROADM-C1-DEG1'):
776                 # Test related to DEG1
777                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
778                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
779                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
780                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
781                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-C1'},
782                               res['network'][0]['node'][i]['supporting-node'])
783                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
784                 listNode.remove(nodeId)
785             elif(nodeId == 'ROADM-C1-DEG2'):
786                 # Test related to DEG1
787                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
788                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
789                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
790                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
791                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-C1'},
792                               res['network'][0]['node'][i]['supporting-node'])
793                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-common-network:node-type'], 'DEGREE')
794                 listNode.remove(nodeId)
795             else:
796                 self.assertFalse(True)
797         self.assertEqual(len(listNode), 0)
798
799     def test_21_connect_ROADMB(self):
800         url = ("{}/config/network-topology:"
801                "network-topology/topology/topology-netconf/node/ROADM-B1"
802                .format(self.restconf_baseurl))
803         data = {"node": [{
804             "node-id": "ROADM-B1",
805             "netconf-node-topology:username": "admin",
806             "netconf-node-topology:password": "admin",
807             "netconf-node-topology:host": "127.0.0.1",
808             "netconf-node-topology:port": "17842",
809             "netconf-node-topology:tcp-only": "false",
810             "netconf-node-topology:pass-through": {}}]}
811         headers = {'content-type': 'application/json'}
812         response = requests.request(
813             "PUT", url, data=json.dumps(data), headers=headers,
814             auth=('admin', 'admin'))
815         self.assertEqual(response.status_code, requests.codes.created)
816         time.sleep(20)
817
818     def test_22_omsAttributes_ROADMA_ROADMB(self):
819         # Config ROADM-A1-ROADM-B1 oms-attributes
820         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
821                "link/ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
822                "OMS-attributes/span"
823                .format(self.restconf_baseurl))
824         data = {"span": {
825                 "auto-spanloss": "true",
826                 "engineered-spanloss": 12.2,
827                 "spanloss-current": 12,
828                 "spanloss-base": 11.4,
829                 "link-concatenation": [{
830                     "SRLG-Id": 0,
831                     "fiber-type": "smf",
832                     "SRLG-length": 100000,
833                     "pmd": 0.5}]}}
834         headers = {'content-type': 'application/json'}
835         response = requests.request(
836             "PUT", url, data=json.dumps(data), headers=headers,
837             auth=('admin', 'admin'))
838         self.assertEqual(response.status_code, requests.codes.created)
839
840     def test_23_omsAttributes_ROADMB_ROADMA(self):
841         # Config ROADM-B1-ROADM-A1 oms-attributes
842         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
843                "link/ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
844                "OMS-attributes/span"
845                .format(self.restconf_baseurl))
846         data = {"span": {
847                 "auto-spanloss": "true",
848                 "engineered-spanloss": 12.2,
849                 "spanloss-current": 12,
850                 "spanloss-base": 11.4,
851                 "link-concatenation": [{
852                     "SRLG-Id": 0,
853                     "fiber-type": "smf",
854                     "SRLG-length": 100000,
855                     "pmd": 0.5}]}}
856         headers = {'content-type': 'application/json'}
857         response = requests.request(
858             "PUT", url, data=json.dumps(data), headers=headers,
859             auth=('admin', 'admin'))
860         self.assertEqual(response.status_code, requests.codes.created)
861
862     def test_24_omsAttributes_ROADMB_ROADMC(self):
863         # Config ROADM-B1-ROADM-C1 oms-attributes
864         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
865                "link/ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
866                "OMS-attributes/span"
867                .format(self.restconf_baseurl))
868         data = {"span": {
869                 "auto-spanloss": "true",
870                 "engineered-spanloss": 12.2,
871                 "spanloss-current": 12,
872                 "spanloss-base": 11.4,
873                 "link-concatenation": [{
874                     "SRLG-Id": 0,
875                     "fiber-type": "smf",
876                     "SRLG-length": 100000,
877                     "pmd": 0.5}]}}
878         headers = {'content-type': 'application/json'}
879         response = requests.request(
880             "PUT", url, data=json.dumps(data), headers=headers,
881             auth=('admin', 'admin'))
882         self.assertEqual(response.status_code, requests.codes.created)
883
884     def test_25_omsAttributes_ROADMC_ROADMB(self):
885         # Config ROADM-C1-ROADM-B1 oms-attributes
886         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
887                "link/ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
888                "OMS-attributes/span"
889                .format(self.restconf_baseurl))
890         data = {"span": {
891                 "auto-spanloss": "true",
892                 "engineered-spanloss": 12.2,
893                 "link-concatenation": [{
894                     "SRLG-Id": 0,
895                     "fiber-type": "smf",
896                     "SRLG-length": 100000,
897                     "pmd": 0.5}]}}
898         headers = {'content-type': 'application/json'}
899         response = requests.request(
900             "PUT", url, data=json.dumps(data), headers=headers,
901             auth=('admin', 'admin'))
902         self.assertEqual(response.status_code, requests.codes.created)
903
904     def test_26_getClliNetwork(self):
905         url = ("{}/config/ietf-network:networks/network/clli-network"
906                .format(self.restconf_baseurl))
907         headers = {'content-type': 'application/json'}
908         response = requests.request(
909             "GET", url, headers=headers, auth=('admin', 'admin'))
910         self.assertEqual(response.status_code, requests.codes.ok)
911         res = response.json()
912         nbNode = len(res['network'][0]['node'])
913         listNode = ['NodeA', 'NodeB', 'NodeC']
914         for i in range(0, nbNode):
915             nodeId = res['network'][0]['node'][i]['node-id']
916             find = nodeId in listNode
917             self.assertEqual(find, True)
918             if(nodeId == 'NodeA'):
919                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeA')
920             elif(nodeId == 'NodeB'):
921                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeB')
922             else:
923                 self.assertEqual(res['network'][0]['node'][i]['org-openroadm-clli-network:clli'], 'NodeC')
924             listNode.remove(nodeId)
925         self.assertEqual(len(listNode), 0)
926
927     def test_27_verifyDegree(self):
928         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
929                .format(self.restconf_baseurl))
930         headers = {'content-type': 'application/json'}
931         response = requests.request(
932             "GET", url, headers=headers, auth=('admin', 'admin'))
933         self.assertEqual(response.status_code, requests.codes.ok)
934         res = response.json()
935         # Tests related to links
936         nbLink = len(res['network'][0]['ietf-network-topology:link'])
937         listR2RLink = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX', 'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
938                        'ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX', 'ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX',
939                        'ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX', 'ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX']
940         for i in range(0, nbLink):
941             if res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'ROADM-TO-ROADM':
942                 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
943                 find = link_id in listR2RLink
944                 self.assertEqual(find, True)
945                 listR2RLink.remove(link_id)
946         self.assertEqual(len(listR2RLink), 0)
947
948     def test_28_verifyOppositeLinkTopology(self):
949         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
950                .format(self.restconf_baseurl))
951         headers = {'content-type': 'application/json'}
952         response = requests.request(
953             "GET", url, headers=headers, auth=('admin', 'admin'))
954         self.assertEqual(response.status_code, requests.codes.ok)
955         res = response.json()
956         # Tests related to links
957         nbLink = len(res['network'][0]['ietf-network-topology:link'])
958         self.assertEqual(nbLink, 26)
959         for i in range(0, nbLink):
960             link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
961             link_type = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
962             link_src = res['network'][0]['ietf-network-topology:link'][i]['source']['source-node']
963             link_dest = res['network'][0]['ietf-network-topology:link'][i]['destination']['dest-node']
964             oppLink_id = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:opposite-link']
965             # Find the opposite link
966             url_oppLink = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"+oppLink_id
967             url = (url_oppLink.format(self.restconf_baseurl))
968             headers = {'content-type': 'application/json'}
969             response_oppLink = requests.request("GET", url, headers=headers, auth=('admin', 'admin'))
970             self.assertEqual(response_oppLink.status_code, requests.codes.ok)
971             res_oppLink = response_oppLink.json()
972             self.assertEqual(res_oppLink['ietf-network-topology:link'][0]
973                              ['org-openroadm-common-network:opposite-link'], link_id)
974             self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['source']['source-node'], link_dest)
975             self.assertEqual(res_oppLink['ietf-network-topology:link'][0]['destination']['dest-node'], link_src)
976             oppLink_type = res_oppLink['ietf-network-topology:link'][0]['org-openroadm-common-network:link-type']
977             if link_type == 'ADD-LINK':
978                 self.assertEqual(oppLink_type, 'DROP-LINK')
979             elif link_type == 'DROP-LINK':
980                 self.assertEqual(oppLink_type, 'ADD-LINK')
981             elif link_type == 'EXPRESS-LINK':
982                 self.assertEqual(oppLink_type, 'EXPRESS-LINK')
983             elif link_type == 'ROADM-TO-ROADM':
984                 self.assertEqual(oppLink_type, 'ROADM-TO-ROADM')
985             elif link_type == 'XPONDER-INPUT':
986                 self.assertEqual(oppLink_type, 'XPONDER-OUTPUT')
987             elif link_type == 'XPONDER-OUTPUT':
988                 self.assertEqual(oppLink_type, 'XPONDER-INPUT')
989
990     def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
991         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
992                .format(self.restconf_baseurl))
993         headers = {'content-type': 'application/json'}
994         response = requests.request(
995             "GET", url, headers=headers, auth=('admin', 'admin'))
996         self.assertEqual(response.status_code, requests.codes.ok)
997         res = response.json()
998         nbLink = len(res['network'][0]['ietf-network-topology:link'])
999         R2RLink = ['ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX',
1000                    'ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
1001                    'ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX',
1002                    'ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX',
1003                    'ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX',
1004                    'ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX']
1005         for i in range(0, nbLink):
1006             nodeType = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
1007             link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1008             if(link_id in R2RLink):
1009                 find = False
1010                 spanLoss = res['network'][0]['ietf-network-topology:link'][i][
1011                     'org-openroadm-network-topology:OMS-attributes']['span']["engineered-spanloss"]
1012                 length = res['network'][0]['ietf-network-topology:link'][i][
1013                     'org-openroadm-network-topology:OMS-attributes']['span']['link-concatenation'][0]['SRLG-length']
1014                 if((spanLoss != None) & (length != None)):
1015                     find = True
1016                 self.assertTrue(find)
1017                 R2RLink.remove(link_id)
1018         self.assertEqual(len(R2RLink), 0)
1019
1020     def test_30_disconnect_ROADMB(self):
1021         # Delete in the topology-netconf
1022         url = ("{}/config/network-topology:"
1023                "network-topology/topology/topology-netconf/node/ROADM-B1"
1024                .format(self.restconf_baseurl))
1025         data = {}
1026         headers = {'content-type': 'application/json'}
1027         response = requests.request(
1028             "DELETE", url, data=json.dumps(data), headers=headers,
1029             auth=('admin', 'admin'))
1030         self.assertEqual(response.status_code, requests.codes.ok)
1031         # Delete in the clli-network
1032         url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeB"
1033                .format(self.restconf_baseurl))
1034         data = {}
1035         headers = {'content-type': 'application/json'}
1036         response = requests.request(
1037             "DELETE", url, data=json.dumps(data), headers=headers,
1038             auth=('admin', 'admin'))
1039         self.assertEqual(response.status_code, requests.codes.ok)
1040
1041     def test_31_disconnect_ROADMC(self):
1042         # Delete in the topology-netconf
1043         url = ("{}/config/network-topology:"
1044                "network-topology/topology/topology-netconf/node/ROADM-C1"
1045                .format(self.restconf_baseurl))
1046         data = {}
1047         headers = {'content-type': 'application/json'}
1048         response = requests.request(
1049             "DELETE", url, data=json.dumps(data), headers=headers,
1050             auth=('admin', 'admin'))
1051         self.assertEqual(response.status_code, requests.codes.ok)
1052         # Delete in the clli-network
1053         url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeC"
1054                .format(self.restconf_baseurl))
1055         data = {}
1056         headers = {'content-type': 'application/json'}
1057         response = requests.request(
1058             "DELETE", url, data=json.dumps(data), headers=headers,
1059             auth=('admin', 'admin'))
1060         self.assertEqual(response.status_code, requests.codes.ok)
1061
1062 #    def test_24_check_roadm2roadm_links_deletion(self):
1063 #        url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1064 #               .format(self.restconf_baseurl))
1065 #        headers = {'content-type': 'application/json'}
1066 #        response = requests.request(
1067 #             "GET", url, headers=headers, auth=('admin', 'admin'))
1068 #        self.assertEqual(response.status_code, requests.codes.ok)
1069 #        res = response.json()
1070 #        #Write the response in the log
1071 #        with open('./transportpce_tests/log/response.log', 'a') as outfile1:
1072 #            outfile1.write(str(res))
1073 #        #Tests related to links
1074 #        nbLink=len(res['network'][0]['ietf-network-topology:link'])
1075 #        self.assertEqual(nbLink,8)
1076 #        expressLink=['ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX','ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX']
1077 #        addLink=['ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG2-DEG2-CTP-TXRX','ROADMA-SRG1-SRG1-CP-TXRXtoROADMA-DEG1-DEG1-CTP-TXRX',]
1078 #        dropLink=['ROADMA-DEG1-DEG1-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX','ROADMA-DEG2-DEG2-CTP-TXRXtoROADMA-SRG1-SRG1-CP-TXRX']
1079 #        XPDR_IN=['ROADMA-SRG1-SRG1-PP1-TXRXtoXPDRA-XPDR1-XPDR1-NETWORK1']
1080 #        XPDR_OUT=['XPDRA-XPDR1-XPDR1-NETWORK1toROADMA-SRG1-SRG1-PP1-TXRX']
1081 #        for i in range(0,nbLink):
1082 #            nodeType=res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type']
1083 #            linkId=res['network'][0]['ietf-network-topology:link'][i]['link-id']
1084 #            if(nodeType=='EXPRESS-LINK'):
1085 #                find= linkId in expressLink
1086 #                self.assertEqual(find, True)
1087 #                expressLink.remove(linkId)
1088 #            elif(nodeType=='ADD-LINK'):
1089 #                find= linkId in addLink
1090 #                self.assertEqual(find, True)
1091 #                addLink.remove(linkId)
1092 #            elif(nodeType=='DROP-LINK'):
1093 #                find= linkId in dropLink
1094 #                self.assertEqual(find, True)
1095 #                dropLink.remove(linkId)
1096 #            elif(nodeType=='XPONDER-INPUT'):
1097 #                find= linkId in XPDR_IN
1098 #                self.assertEqual(find, True)
1099 #                XPDR_IN.remove(linkId)
1100 #            elif(nodeType=='XPONDER-OUTPUT'):
1101 #                find= linkId in XPDR_OUT
1102 #                self.assertEqual(find, True)
1103 #                XPDR_OUT.remove(linkId)
1104 #            else:
1105 #                self.assertFalse(True)
1106 #        self.assertEqual(len(expressLink),0)
1107 #        self.assertEqual(len(addLink),0)
1108 #        self.assertEqual(len(dropLink),0)
1109 #        self.assertEqual(len(XPDR_IN),0)
1110 #        self.assertEqual(len(XPDR_OUT),0)
1111 #
1112 #        for i in range(0,nbLink):
1113 #            self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'],'ROADM-TO-ROADM')
1114 #            self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
1115 #            self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
1116 #            self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-SRG1-SRG1-CP-TXRXtoROADMC-DEG2-DEG1-CTP-TXRX')
1117 #            self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG2-CTP-TXRXtoROADMC-SRG1-SRG1-CP-TXRX')
1118 #            self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG1-DEG1-CTP-TXRXtoROADMC-DEG2-DEG2-CTP-TXRX')
1119 #            self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
1120
1121     def test_32_getNodes_OpenRoadmTopology(self):
1122         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1123                .format(self.restconf_baseurl))
1124         headers = {'content-type': 'application/json'}
1125         response = requests.request(
1126             "GET", url, headers=headers, auth=('admin', 'admin'))
1127         res = response.json()
1128         # Tests related to nodes
1129         self.assertEqual(response.status_code, requests.codes.ok)
1130         nbNode = len(res['network'][0]['node'])
1131         self.assertEqual(nbNode, 5)
1132         listNode = ['XPDR-A1-XPDR1', 'ROADM-A1-SRG1', 'ROADM-A1-SRG3', 'ROADM-A1-DEG1', 'ROADM-A1-DEG2']
1133         for i in range(0, nbNode):
1134             nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
1135             nodeId = res['network'][0]['node'][i]['node-id']
1136             # Tests related to XPDRA nodes
1137             if(nodeId == 'XPDR-A1-XPDR1'):
1138                 nbTp = len(res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1139                 for j in range(0, nbTp):
1140                     tpid = res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
1141                     if (tpid == 'XPDR1-CLIENT1'):
1142                         self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
1143                                          ['org-openroadm-common-network:tp-type'], 'XPONDER-CLIENT')
1144                     if (tpid == 'XPDR1-NETWORK1'):
1145                         self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
1146                                          ['org-openroadm-common-network:tp-type'], 'XPONDER-NETWORK')
1147                         self.assertEqual(res['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]
1148                                          ['org-openroadm-network-topology:xpdr-network-attributes']['tail-equipment-id'],
1149                                          'ROADM-A1-SRG1--SRG1-PP1-TXRX')
1150                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'XPDR-A1'},
1151                               res['network'][0]['node'][i]['supporting-node'])
1152                 listNode.remove(nodeId)
1153             elif(nodeId == 'ROADM-A1-SRG1'):
1154                 # Test related to SRG1
1155                 self.assertEqual(nodeType, 'SRG')
1156                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 5)
1157                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1158                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1159                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1160                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1161                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
1162                               res['network'][0]['node'][i]['supporting-node'])
1163                 listNode.remove(nodeId)
1164             elif(nodeId == 'ROADM-A1-SRG3'):
1165                 # Test related to SRG1
1166                 self.assertEqual(nodeType, 'SRG')
1167                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 5)
1168                 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1169                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1170                 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1171                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1172                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
1173                               res['network'][0]['node'][i]['supporting-node'])
1174                 listNode.remove(nodeId)
1175             elif(nodeId == 'ROADM-A1-DEG1'):
1176                 # Test related to DEG1
1177                 self.assertEqual(nodeType, 'DEGREE')
1178                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1179                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1180                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1181                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1182                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
1183                               res['network'][0]['node'][i]['supporting-node'])
1184                 listNode.remove(nodeId)
1185             elif(nodeId == 'ROADM-A1-DEG2'):
1186                 # Test related to DEG2
1187                 self.assertEqual(nodeType, 'DEGREE')
1188                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1189                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1190                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1191                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1192                 self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
1193                               res['network'][0]['node'][i]['supporting-node'])
1194                 listNode.remove(nodeId)
1195             else:
1196                 self.assertFalse(True)
1197         self.assertEqual(len(listNode), 0)
1198         # Test related to SRG1 of ROADMC
1199         for i in range(0, nbNode):
1200             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADM-C1-SRG1')
1201             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADM-C1-DEG1')
1202             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADM-C1-DEG2')
1203
1204     def test_33_getOpenRoadmNetwork(self):
1205         url = ("{}/config/ietf-network:networks/network/openroadm-network"
1206                .format(self.restconf_baseurl))
1207         headers = {'content-type': 'application/json'}
1208         response = requests.request(
1209             "GET", url, headers=headers, auth=('admin', 'admin'))
1210         self.assertEqual(response.status_code, requests.codes.ok)
1211         res = response.json()
1212         nbNode = len(res['network'][0]['node'])
1213         self.assertEqual(nbNode, 2)
1214         for i in range(0, nbNode-1):
1215             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADM-C1')
1216             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADM-B1')
1217
1218     def test_34_getClliNetwork(self):
1219         url = ("{}/config/ietf-network:networks/network/clli-network"
1220                .format(self.restconf_baseurl))
1221         headers = {'content-type': 'application/json'}
1222         response = requests.request(
1223             "GET", url, headers=headers, auth=('admin', 'admin'))
1224         self.assertEqual(response.status_code, requests.codes.ok)
1225         res = response.json()
1226         nbNode = len(res['network'][0]['node'])
1227         self.assertEqual(nbNode, 1)
1228         for i in range(0, nbNode-1):
1229             self.assertNotEqual(res['network'][0]['node'][1]['org-openroadm-clli-network:clli'], 'NodeC')
1230
1231     def test_35_disconnect_XPDRA(self):
1232         url = ("{}/config/network-topology:"
1233                "network-topology/topology/topology-netconf/node/XPDR-A1"
1234                .format(self.restconf_baseurl))
1235         data = {}
1236         headers = {'content-type': 'application/json'}
1237         response = requests.request(
1238             "DELETE", url, data=json.dumps(data), headers=headers,
1239             auth=('admin', 'admin'))
1240         self.assertEqual(response.status_code, requests.codes.ok)
1241
1242     def test_36_getClliNetwork(self):
1243         url = ("{}/config/ietf-network:networks/network/clli-network"
1244                .format(self.restconf_baseurl))
1245         headers = {'content-type': 'application/json'}
1246         response = requests.request(
1247             "GET", url, headers=headers, auth=('admin', 'admin'))
1248         self.assertEqual(response.status_code, requests.codes.ok)
1249         res = response.json()
1250         nbNode = len(res['network'][0]['node'])
1251         self.assertEqual(nbNode, 1)
1252         self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
1253
1254     def test_37_getOpenRoadmNetwork(self):
1255         url = ("{}/config/ietf-network:networks/network/openroadm-network"
1256                .format(self.restconf_baseurl))
1257         headers = {'content-type': 'application/json'}
1258         response = requests.request(
1259             "GET", url, headers=headers, auth=('admin', 'admin'))
1260         self.assertEqual(response.status_code, requests.codes.ok)
1261         res = response.json()
1262         nbNode = len(res['network'][0]['node'])
1263         self.assertEqual(nbNode, 1)
1264         for i in range(0, nbNode):
1265             self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'XPDR-A1')
1266
1267     def test_38_getNodes_OpenRoadmTopology(self):
1268         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1269                .format(self.restconf_baseurl))
1270         headers = {'content-type': 'application/json'}
1271         response = requests.request(
1272             "GET", url, headers=headers, auth=('admin', 'admin'))
1273         res = response.json()
1274         # Tests related to nodes
1275         self.assertEqual(response.status_code, requests.codes.ok)
1276         nbNode = len(res['network'][0]['node'])
1277         self.assertEqual(nbNode, 4)
1278         listNode = ['ROADM-A1-SRG1', 'ROADM-A1-SRG3', 'ROADM-A1-DEG1', 'ROADM-A1-DEG2']
1279         for i in range(0, nbNode):
1280             self.assertIn({'network-ref': 'openroadm-network', 'node-ref': 'ROADM-A1'},
1281                           res['network'][0]['node'][i]['supporting-node'])
1282             nodeType = res['network'][0]['node'][i]['org-openroadm-common-network:node-type']
1283             nodeId = res['network'][0]['node'][i]['node-id']
1284             if(nodeId == 'ROADM-A1-SRG1'):
1285                 # Test related to SRG1
1286                 self.assertEqual(nodeType, 'SRG')
1287                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 5)
1288                 self.assertIn({'tp-id': 'SRG1-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1289                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1290                 self.assertIn({'tp-id': 'SRG1-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1291                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1292                 listNode.remove(nodeId)
1293             elif(nodeId == 'ROADM-A1-SRG3'):
1294                 # Test related to SRG1
1295                 self.assertEqual(nodeType, 'SRG')
1296                 self.assertEqual(len(res['network'][0]['node'][i]['ietf-network-topology:termination-point']), 5)
1297                 self.assertIn({'tp-id': 'SRG3-CP-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-CP'},
1298                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1299                 self.assertIn({'tp-id': 'SRG3-PP1-TXRX', 'org-openroadm-common-network:tp-type': 'SRG-TXRX-PP'},
1300                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1301                 listNode.remove(nodeId)
1302             elif(nodeId == 'ROADM-A1-DEG1'):
1303                 # Test related to DEG1
1304                 self.assertEqual(nodeType, 'DEGREE')
1305                 self.assertIn({'tp-id': 'DEG1-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1306                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1307                 self.assertIn({'tp-id': 'DEG1-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1308                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1309                 listNode.remove(nodeId)
1310             elif(nodeId == 'ROADM-A1-DEG2'):
1311                 # Test related to DEG2
1312                 self.assertEqual(nodeType, 'DEGREE')
1313                 self.assertIn({'tp-id': 'DEG2-TTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-TTP'},
1314                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1315                 self.assertIn({'tp-id': 'DEG2-CTP-TXRX', 'org-openroadm-common-network:tp-type': 'DEGREE-TXRX-CTP'},
1316                               res['network'][0]['node'][i]['ietf-network-topology:termination-point'])
1317                 listNode.remove(nodeId)
1318             else:
1319                 self.assertFalse(True)
1320         self.assertEqual(len(listNode), 0)
1321
1322     def test_39_disconnect_ROADM_XPDRA_link(self):
1323         # Link-1
1324         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1325                "link/XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX"
1326                .format(self.restconf_baseurl))
1327         data = {}
1328         headers = {'content-type': 'application/json'}
1329         response = requests.request(
1330             "DELETE", url, data=json.dumps(data), headers=headers,
1331             auth=('admin', 'admin'))
1332         self.assertEqual(response.status_code, requests.codes.ok)
1333         # Link-2
1334         url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
1335                "link/ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1"
1336                .format(self.restconf_baseurl))
1337         data = {}
1338         headers = {'content-type': 'application/json'}
1339         response = requests.request(
1340             "DELETE", url, data=json.dumps(data), headers=headers,
1341             auth=('admin', 'admin'))
1342         self.assertEqual(response.status_code, requests.codes.ok)
1343
1344     def test_40_getLinks_OpenRoadmTopology(self):
1345         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1346                .format(self.restconf_baseurl))
1347         headers = {'content-type': 'application/json'}
1348         response = requests.request(
1349             "GET", url, headers=headers, auth=('admin', 'admin'))
1350         self.assertEqual(response.status_code, requests.codes.ok)
1351         res = response.json()
1352         nbLink = len(res['network'][0]['ietf-network-topology:link'])
1353         self.assertEqual(nbLink, 16)
1354         expressLink = ['ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
1355                        'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX']
1356         addLink = ['ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX', 'ROADM-A1-SRG1-SRG1-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX',
1357                    'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG2-DEG2-CTP-TXRX', 'ROADM-A1-SRG3-SRG3-CP-TXRXtoROADM-A1-DEG1-DEG1-CTP-TXRX']
1358         dropLink = ['ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX', 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG1-SRG1-CP-TXRX',
1359                     'ROADM-A1-DEG1-DEG1-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX', 'ROADM-A1-DEG2-DEG2-CTP-TXRXtoROADM-A1-SRG3-SRG3-CP-TXRX']
1360         roadmtoroadmLink = 0
1361         for i in range(0, nbLink):
1362             if (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'EXPRESS-LINK'):
1363                 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1364                 find = link_id in expressLink
1365                 self.assertEqual(find, True)
1366                 expressLink.remove(link_id)
1367             elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'ADD-LINK'):
1368                 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1369                 find = link_id in addLink
1370                 self.assertEqual(find, True)
1371                 addLink.remove(link_id)
1372             elif (res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:link-type'] == 'DROP-LINK'):
1373                 link_id = res['network'][0]['ietf-network-topology:link'][i]['link-id']
1374                 find = link_id in dropLink
1375                 self.assertEqual(find, True)
1376                 dropLink.remove(link_id)
1377             else:
1378                 roadmtoroadmLink += 1
1379         self.assertEqual(len(expressLink), 0)
1380         self.assertEqual(len(addLink), 0)
1381         self.assertEqual(len(dropLink), 0)
1382         self.assertEqual(roadmtoroadmLink, 6)
1383         for i in range(0, nbLink):
1384             self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]
1385                                 ['org-openroadm-common-network:link-type'], 'XPONDER-OUTPUT')
1386             self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]
1387                                 ['org-openroadm-common-network:link-type'], 'XPONDER-INPUT')
1388
1389     def test_41_disconnect_ROADMA(self):
1390         url = ("{}/config/network-topology:"
1391                "network-topology/topology/topology-netconf/node/ROADM-A1"
1392                .format(self.restconf_baseurl))
1393         data = {}
1394         headers = {'content-type': 'application/json'}
1395         response = requests.request(
1396             "DELETE", url, data=json.dumps(data), headers=headers,
1397             auth=('admin', 'admin'))
1398         self.assertEqual(response.status_code, requests.codes.ok)
1399         # Delete in the clli-network
1400         url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeA"
1401                .format(self.restconf_baseurl))
1402         data = {}
1403         headers = {'content-type': 'application/json'}
1404         response = requests.request(
1405             "DELETE", url, data=json.dumps(data), headers=headers,
1406             auth=('admin', 'admin'))
1407         self.assertEqual(response.status_code, requests.codes.ok)
1408
1409     def test_42_getClliNetwork(self):
1410         url = ("{}/config/ietf-network:networks/network/clli-network"
1411                .format(self.restconf_baseurl))
1412         headers = {'content-type': 'application/json'}
1413         response = requests.request(
1414             "GET", url, headers=headers, auth=('admin', 'admin'))
1415         self.assertEqual(response.status_code, requests.codes.ok)
1416         res = response.json()
1417         self.assertNotIn('node', res['network'][0])
1418
1419     def test_43_getOpenRoadmNetwork(self):
1420         url = ("{}/config/ietf-network:networks/network/openroadm-network"
1421                .format(self.restconf_baseurl))
1422         headers = {'content-type': 'application/json'}
1423         response = requests.request(
1424             "GET", url, headers=headers, auth=('admin', 'admin'))
1425         self.assertEqual(response.status_code, requests.codes.ok)
1426         res = response.json()
1427         self.assertNotIn('node', res['network'][0])
1428
1429     def test_44_check_roadm2roadm_link_persistence(self):
1430         url = ("{}/config/ietf-network:networks/network/openroadm-topology"
1431                .format(self.restconf_baseurl))
1432         headers = {'content-type': 'application/json'}
1433         response = requests.request(
1434             "GET", url, headers=headers, auth=('admin', 'admin'))
1435         self.assertEqual(response.status_code, requests.codes.ok)
1436         res = response.json()
1437         nbLink = len(res['network'][0]['ietf-network-topology:link'])
1438         self.assertNotIn('node', res['network'][0])
1439         self.assertEqual(nbLink, 6)
1440
1441
if __name__ == "__main__":
    # Run the tests of this module, printing one result line per test.
    unittest.main(verbosity=2)