2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 from common import test_utils
20 class TransportPCEFulltesting(unittest.TestCase):
23 cr_serv_sample_data = {"input": {
24 "sdnc-request-header": {
25 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
26 "rpc-action": "service-create",
27 "request-system-id": "appname",
28 "notification-url": "http://localhost:8585/NotificationServer/notify"
30 "service-name": "service1",
31 "common-id": "ASATT1234567",
32 "connection-type": "service",
34 "service-rate": "100",
36 "service-format": "Ethernet",
37 "clli": "SNJSCAMCJP8",
40 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
41 "port-type": "router",
42 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
43 "port-rack": "000000.00",
47 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
48 "lgx-port-name": "LGX Back.3",
49 "lgx-port-rack": "000000.00",
50 "lgx-port-shelf": "00"
55 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
56 "port-type": "router",
57 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
58 "port-rack": "000000.00",
62 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
63 "lgx-port-name": "LGX Back.4",
64 "lgx-port-rack": "000000.00",
65 "lgx-port-shelf": "00"
71 "service-rate": "100",
73 "service-format": "Ethernet",
74 "clli": "SNJSCAMCJT4",
77 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
78 "port-type": "router",
79 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
80 "port-rack": "000000.00",
84 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
85 "lgx-port-name": "LGX Back.29",
86 "lgx-port-rack": "000000.00",
87 "lgx-port-shelf": "00"
92 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
93 "port-type": "router",
94 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
95 "port-rack": "000000.00",
99 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
100 "lgx-port-name": "LGX Back.30",
101 "lgx-port-rack": "000000.00",
102 "lgx-port-shelf": "00"
107 "due-date": "2016-11-28T00:00:01Z",
108 "operator-contact": "pw1234"
112 WAITING = 20 # nominal value is 300
116 cls.processes = test_utils.start_tpce()
117 cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
def tearDownClass(cls):
    # Hand every entry of cls.processes to test_utils.shutdown_process,
    # then print a completion message once the loop is over.
    # pylint: disable=not-an-iterable
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
def setUp(self):
    # Executed before each test method: print the last dot-separated
    # component of self.id().
    test_name = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(test_name))
def test_01_connect_xpdrA(self):
    # Mount XPDR-A1 (simulator name 'xpdra') and require the 'created' status.
    mount_reply = test_utils.mount_device("XPDR-A1", "xpdra")
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_02_connect_xpdrC(self):
    # Mount XPDR-C1 (simulator name 'xpdrc') and require the 'created' status.
    mount_reply = test_utils.mount_device("XPDR-C1", "xpdrc")
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_03_connect_rdmA(self):
    # Mount ROADM-A1 (simulator name 'roadma') and require the 'created' status.
    mount_reply = test_utils.mount_device("ROADM-A1", "roadma")
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_04_connect_rdmC(self):
    # Mount ROADM-C1 (simulator name 'roadmc') and require the 'created' status.
    mount_reply = test_utils.mount_device("ROADM-C1", "roadmc")
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    # Ask for the XPDR-A1 -> ROADM-A1 (SRG1-PP1-TXRX) link and check the
    # result text returned in the response body.
    link_args = ("XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn("Xponder Roadm Link created successfully", outcome)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    # Ask for the ROADM-A1 (SRG1-PP1-TXRX) -> XPDR-A1 link and check the
    # result text returned in the response body.
    link_args = ("XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn("Roadm Xponder links created successfully", outcome)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    # Ask for the XPDR-C1 -> ROADM-C1 (SRG1-PP1-TXRX) link and check the
    # result text returned in the response body.
    link_args = ("XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn("Xponder Roadm Link created successfully", outcome)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    # Ask for the ROADM-C1 (SRG1-PP1-TXRX) -> XPDR-C1 link and check the
    # result text returned in the response body.
    link_args = ("XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn("Roadm Xponder links created successfully", outcome)
177 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
178 # Config ROADMA-ROADMC oms-attributes
180 "auto-spanloss": "true",
181 "spanloss-base": 11.4,
182 "spanloss-current": 12,
183 "engineered-spanloss": 12.2,
184 "link-concatenation": [{
187 "SRLG-length": 100000,
189 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
190 self.assertEqual(response.status_code, requests.codes.created)
192 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
193 # Config ROADMC-ROADMA oms-attributes
195 "auto-spanloss": "true",
196 "spanloss-base": 11.4,
197 "spanloss-current": 12,
198 "engineered-spanloss": 12.2,
199 "link-concatenation": [{
202 "SRLG-length": 100000,
204 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
205 self.assertEqual(response.status_code, requests.codes.created)
207 # test service-create for Eth service from xpdr to xpdr
def test_11_create_eth_service1(self):
    # Send the shared sample request under the name "service1", check the
    # acknowledgement text, then sleep for WAITING seconds.
    request_body = self.cr_serv_sample_data
    request_body["input"]["service-name"] = "service1"
    reply = test_utils.service_create_request(request_body)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()["output"]["configuration-response-common"]
    self.assertIn("PCE calculation in progress", common["response-message"])
    time.sleep(self.WAITING)
217 def test_12_get_eth_service1(self):
218 response = test_utils.get_service_list_request("services/service1")
219 self.assertEqual(response.status_code, requests.codes.ok)
220 res = response.json()
222 res['services'][0]['administrative-state'], 'inService')
224 res['services'][0]['service-name'], 'service1')
226 res['services'][0]['connection-type'], 'service')
228 res['services'][0]['lifecycle-state'], 'planned')
231 def test_13_check_xc1_ROADMA(self):
232 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
233 self.assertEqual(response.status_code, requests.codes.ok)
234 res = response.json()
235 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
236 self.assertDictEqual(
238 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
239 'opticalControlMode': 'gainLoss',
240 'target-output-power': -3.0
241 }, **res['roadm-connections'][0]),
242 res['roadm-connections'][0]
244 self.assertDictEqual(
245 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
246 res['roadm-connections'][0]['source'])
247 self.assertDictEqual(
248 {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
249 res['roadm-connections'][0]['destination'])
252 def test_14_check_xc1_ROADMC(self):
253 response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
254 self.assertEqual(response.status_code, requests.codes.ok)
255 res = response.json()
256 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
257 self.assertDictEqual(
259 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
260 'opticalControlMode': 'gainLoss',
261 'target-output-power': -3.0
262 }, **res['roadm-connections'][0]),
263 res['roadm-connections'][0]
265 self.assertDictEqual(
266 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
267 res['roadm-connections'][0]['source'])
268 self.assertDictEqual(
269 {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
270 res['roadm-connections'][0]['destination'])
273 def test_15_check_topo_XPDRA(self):
274 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
275 self.assertEqual(response.status_code, requests.codes.ok)
276 res = response.json()
277 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
279 if ele['tp-id'] == 'XPDR1-NETWORK1':
280 self.assertEqual({u'frequency': 196.1,
282 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
283 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
284 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
285 if ele['tp-id'] == 'XPDR1-NETWORK2':
286 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
289 def test_16_check_topo_ROADMA_SRG1(self):
290 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
291 self.assertEqual(response.status_code, requests.codes.ok)
292 res = response.json()
293 self.assertNotIn({u'index': 1},
294 res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
295 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
297 if ele['tp-id'] == 'SRG1-PP1-TXRX':
298 self.assertIn({u'index': 1, u'frequency': 196.1,
300 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
301 if ele['tp-id'] == 'SRG1-PP2-TXRX':
302 self.assertNotIn('used-wavelength', dict.keys(ele))
305 def test_17_check_topo_ROADMA_DEG1(self):
306 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
307 self.assertEqual(response.status_code, requests.codes.ok)
308 res = response.json()
309 self.assertNotIn({u'index': 1},
310 res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
311 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
313 if ele['tp-id'] == 'DEG2-CTP-TXRX':
314 self.assertIn({u'index': 1, u'frequency': 196.1,
316 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
317 if ele['tp-id'] == 'DEG2-TTP-TXRX':
318 self.assertIn({u'index': 1, u'frequency': 196.1,
320 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
    # Ask for the XPDR-A1 -> ROADM-A1 (SRG1-PP2-TXRX) link and check the
    # result text returned in the response body.
    link_args = ("XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn("Xponder Roadm Link created successfully", outcome)
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
    # Ask for the ROADM-A1 (SRG1-PP2-TXRX) -> XPDR-A1 link and check the
    # result text returned in the response body.
    link_args = ("XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn("Roadm Xponder links created successfully", outcome)
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
    # Ask for the XPDR-C1 -> ROADM-C1 (SRG1-PP2-TXRX) link and check the
    # result text returned in the response body.
    link_args = ("XPDR-C1", "1", "2", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn("Xponder Roadm Link created successfully", outcome)
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
    # Ask for the ROADM-C1 (SRG1-PP2-TXRX) -> XPDR-C1 link and check the
    # result text returned in the response body.
    link_args = ("XPDR-C1", "1", "2", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn("Roadm Xponder links created successfully", outcome)
def test_22_create_eth_service2(self):
    # Send the shared sample request under the name "service2", check the
    # acknowledgement text, then sleep for WAITING seconds.
    request_body = self.cr_serv_sample_data
    request_body["input"]["service-name"] = "service2"
    reply = test_utils.service_create_request(request_body)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()["output"]["configuration-response-common"]
    self.assertIn("PCE calculation in progress", common["response-message"])
    time.sleep(self.WAITING)
364 def test_23_get_eth_service2(self):
365 response = test_utils.get_service_list_request("services/service2")
366 self.assertEqual(response.status_code, requests.codes.ok)
367 res = response.json()
369 res['services'][0]['administrative-state'],
372 res['services'][0]['service-name'], 'service2')
374 res['services'][0]['connection-type'], 'service')
376 res['services'][0]['lifecycle-state'], 'planned')
379 def test_24_check_xc2_ROADMA(self):
380 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-2")
381 self.assertEqual(response.status_code, requests.codes.ok)
382 res = response.json()
383 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
384 self.assertDictEqual(
386 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-2',
387 'opticalControlMode': 'power'
388 }, **res['roadm-connections'][0]),
389 res['roadm-connections'][0]
391 self.assertDictEqual(
392 {'src-if': 'DEG2-TTP-TXRX-nmc-2'},
393 res['roadm-connections'][0]['source'])
394 self.assertDictEqual(
395 {'dst-if': 'SRG1-PP2-TXRX-nmc-2'},
396 res['roadm-connections'][0]['destination'])
398 def test_25_check_topo_XPDRA(self):
399 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
400 self.assertEqual(response.status_code, requests.codes.ok)
401 res = response.json()
402 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
404 if ele['tp-id'] == 'XPDR1-NETWORK1':
405 self.assertEqual({u'frequency': 196.1,
407 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
408 if ele['tp-id'] == 'XPDR1-NETWORK2':
409 self.assertEqual({u'frequency': 196.05,
411 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
412 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
413 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
416 def test_26_check_topo_ROADMA_SRG1(self):
417 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
418 self.assertEqual(response.status_code, requests.codes.ok)
419 res = response.json()
420 self.assertNotIn({u'index': 1}, res['node'][0]
421 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
422 self.assertNotIn({u'index': 2}, res['node'][0]
423 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
424 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
426 if ele['tp-id'] == 'SRG1-PP1-TXRX':
427 self.assertIn({u'index': 1, u'frequency': 196.1,
429 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
430 self.assertNotIn({u'index': 2, u'frequency': 196.05,
432 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
433 if ele['tp-id'] == 'SRG1-PP2-TXRX':
434 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
435 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
436 self.assertNotIn({u'index': 1, u'frequency': 196.1,
438 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
439 if ele['tp-id'] == 'SRG1-PP3-TXRX':
440 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
443 def test_27_check_topo_ROADMA_DEG2(self):
444 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
445 self.assertEqual(response.status_code, requests.codes.ok)
446 res = response.json()
447 self.assertNotIn({u'index': 1}, res['node'][0]
448 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
449 self.assertNotIn({u'index': 2}, res['node'][0]
450 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
451 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
453 if ele['tp-id'] == 'DEG2-CTP-TXRX':
454 self.assertIn({u'index': 1, u'frequency': 196.1,
456 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
457 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
458 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
459 if ele['tp-id'] == 'DEG2-TTP-TXRX':
460 self.assertIn({u'index': 1, u'frequency': 196.1,
462 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
463 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
464 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
467 # creation service test on a non-available resource
def test_28_create_eth_service3(self):
    # Send the shared sample request under the name "service3"; the request
    # itself must be acknowledged ('PCE calculation in progress', code 200).
    # Sleeps for WAITING seconds afterwards.
    request_body = self.cr_serv_sample_data
    request_body["input"]["service-name"] = "service3"
    reply = test_utils.service_create_request(request_body)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()["output"]["configuration-response-common"]
    self.assertIn("PCE calculation in progress", common["response-message"])
    self.assertIn("200", common["response-code"])
    time.sleep(self.WAITING)
# add a test that checks that the openroadm-service-list still contains only 2 elements
def test_29_delete_eth_service3(self):
    # Deleting "service3" must return HTTP ok while the body reports that
    # the service does not exist, with response-code 500.
    reply = test_utils.service_delete_request("service3")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()["output"]["configuration-response-common"]
    self.assertIn("Service 'service3' does not exist in datastore",
                  common["response-message"])
    self.assertIn("500", common["response-code"])
def test_30_delete_eth_service1(self):
    # Request deletion of "service1" and check the acknowledgement text.
    reply = test_utils.service_delete_request("service1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()["output"]["configuration-response-common"]
    self.assertIn("Renderer service delete in progress",
                  common["response-message"])
def test_31_delete_eth_service2(self):
    # Request deletion of "service2" and check the acknowledgement text.
    reply = test_utils.service_delete_request("service2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()["output"]["configuration-response-common"]
    self.assertIn("Renderer service delete in progress",
                  common["response-message"])
def test_32_check_no_xc_ROADMA(self):
    # Fetch the ROADM-A1 device data and verify that it holds no
    # 'roadm-connections' entry.
    response = test_utils.check_netconf_node_request("ROADM-A1", "")
    # Assert the status before decoding the body, so a failed request is
    # reported as a status mismatch rather than as a JSON decoding error.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Membership is tested on the mapping itself; no dict.keys() needed.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
511 def test_33_check_topo_XPDRA(self):
512 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
513 self.assertEqual(response.status_code, requests.codes.ok)
514 res = response.json()
515 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
517 if ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
518 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
519 elif ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK':
520 self.assertIn(u'tail-equipment-id',
521 dict.keys(ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
522 self.assertNotIn('wavelength', dict.keys(
523 ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
526 def test_34_check_topo_ROADMA_SRG1(self):
527 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
528 self.assertEqual(response.status_code, requests.codes.ok)
529 res = response.json()
530 self.assertIn({u'index': 1}, res['node'][0]
531 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
532 self.assertIn({u'index': 2}, res['node'][0]
533 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
534 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
536 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP1-TXRX':
537 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
539 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
542 def test_35_check_topo_ROADMA_DEG2(self):
543 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
544 self.assertEqual(response.status_code, requests.codes.ok)
545 res = response.json()
546 self.assertIn({u'index': 1}, res['node'][0]
547 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
548 self.assertIn({u'index': 2}, res['node'][0]
549 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
550 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
552 if ele['tp-id'] == 'DEG2-CTP-TXRX':
553 self.assertNotIn('org-openroadm-network-topology:ctp-attributes', dict.keys(ele))
554 if ele['tp-id'] == 'DEG2-TTP-TXRX':
555 self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
558 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
def test_36_create_oc_service1(self):
    # Create "service1" as an OC / roadm-line service between ROADM-A1 and
    # ROADM-C1, check the acknowledgement text, then sleep WAITING seconds.
    #
    # The request is built on copies of the shared class-level sample so the
    # OC-specific overrides do not leak into later tests that reuse
    # cr_serv_sample_data (e.g. the Ethernet creation test re-run by
    # test_49, which only resets the service name).
    service = dict(self.cr_serv_sample_data["input"])
    service["service-name"] = "service1"
    service["connection-type"] = "roadm-line"
    for end_key, node_id in (("service-a-end", "ROADM-A1"),
                             ("service-z-end", "ROADM-C1")):
        # Copy each endpoint before overriding; deeper levels are only read.
        endpoint = dict(service[end_key])
        endpoint["node-id"] = node_id
        endpoint["service-format"] = "OC"
        service[end_key] = endpoint
    response = test_utils.service_create_request({"input": service})
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('PCE calculation in progress',
                  res['output']['configuration-response-common']['response-message'])
    time.sleep(self.WAITING)
573 def test_37_get_oc_service1(self):
574 response = test_utils.get_service_list_request("services/service1")
575 self.assertEqual(response.status_code, requests.codes.ok)
576 res = response.json()
578 res['services'][0]['administrative-state'],
581 res['services'][0]['service-name'], 'service1')
583 res['services'][0]['connection-type'], 'roadm-line')
585 res['services'][0]['lifecycle-state'], 'planned')
588 def test_38_check_xc1_ROADMA(self):
589 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
590 self.assertEqual(response.status_code, requests.codes.ok)
591 res = response.json()
592 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
593 self.assertDictEqual(
595 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
596 'opticalControlMode': 'gainLoss',
597 'target-output-power': -3.0
598 }, **res['roadm-connections'][0]),
599 res['roadm-connections'][0]
601 self.assertDictEqual(
602 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
603 res['roadm-connections'][0]['source'])
604 self.assertDictEqual(
605 {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
606 res['roadm-connections'][0]['destination'])
609 def test_39_check_xc1_ROADMC(self):
610 response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
611 self.assertEqual(response.status_code, requests.codes.ok)
612 res = response.json()
613 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
614 self.assertDictEqual(
616 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
617 'opticalControlMode': 'gainLoss',
618 'target-output-power': -3.0
619 }, **res['roadm-connections'][0]),
620 res['roadm-connections'][0]
622 self.assertDictEqual(
623 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
624 res['roadm-connections'][0]['source'])
625 self.assertDictEqual(
626 {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
627 res['roadm-connections'][0]['destination'])
def test_40_create_oc_service2(self):
    # Create "service2" as an OC / roadm-line service between ROADM-A1 and
    # ROADM-C1, check the acknowledgement text, then sleep WAITING seconds.
    #
    # The request is built on copies of the shared class-level sample so the
    # OC-specific overrides do not leak into later tests that reuse
    # cr_serv_sample_data (e.g. the Ethernet creation test re-run by
    # test_49, which only resets the service name).
    service = dict(self.cr_serv_sample_data["input"])
    service["service-name"] = "service2"
    service["connection-type"] = "roadm-line"
    for end_key, node_id in (("service-a-end", "ROADM-A1"),
                             ("service-z-end", "ROADM-C1")):
        # Copy each endpoint before overriding; deeper levels are only read.
        endpoint = dict(service[end_key])
        endpoint["node-id"] = node_id
        endpoint["service-format"] = "OC"
        service[end_key] = endpoint
    response = test_utils.service_create_request({"input": service})
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('PCE calculation in progress',
                  res['output']['configuration-response-common']['response-message'])
    time.sleep(self.WAITING)
644 def test_41_get_oc_service2(self):
645 response = test_utils.get_service_list_request("services/service2")
646 self.assertEqual(response.status_code, requests.codes.ok)
647 res = response.json()
649 res['services'][0]['administrative-state'],
652 res['services'][0]['service-name'], 'service2')
654 res['services'][0]['connection-type'], 'roadm-line')
656 res['services'][0]['lifecycle-state'], 'planned')
659 def test_42_check_xc2_ROADMA(self):
660 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-2")
661 self.assertEqual(response.status_code, requests.codes.ok)
662 res = response.json()
663 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
664 self.assertDictEqual(
666 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-2',
667 'opticalControlMode': 'gainLoss',
668 'target-output-power': -3.0
669 }, **res['roadm-connections'][0]),
670 res['roadm-connections'][0]
672 self.assertDictEqual(
673 {'src-if': 'SRG1-PP2-TXRX-nmc-2'},
674 res['roadm-connections'][0]['source'])
675 self.assertDictEqual(
676 {'dst-if': 'DEG2-TTP-TXRX-nmc-2'},
677 res['roadm-connections'][0]['destination'])
680 def test_43_check_topo_ROADMA(self):
681 self.test_26_check_topo_ROADMA_SRG1()
682 self.test_27_check_topo_ROADMA_DEG2()
def test_44_delete_oc_service1(self):
    # Request deletion of "service1" and check the acknowledgement text.
    reply = test_utils.service_delete_request("service1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()["output"]["configuration-response-common"]
    self.assertIn("Renderer service delete in progress",
                  common["response-message"])
def test_45_delete_oc_service2(self):
    # Request deletion of "service2" and check the acknowledgement text.
    reply = test_utils.service_delete_request("service2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()["output"]["configuration-response-common"]
    self.assertIn("Renderer service delete in progress",
                  common["response-message"])
701 def test_46_get_no_oc_services(self):
703 response = test_utils.get_service_list_request("")
704 self.assertEqual(response.status_code, requests.codes.conflict)
705 res = response.json()
707 {"error-type": "application", "error-tag": "data-missing",
708 "error-message": "Request could not be completed because the relevant data model content does not exist"},
709 res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    # Fetch the ROADM-A1 device data and verify that it holds no
    # 'roadm-connections' entry.
    response = test_utils.check_netconf_node_request("ROADM-A1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The original spelled the key as ['roadm-connections'][0], which is
    # just the plain string; use the string directly.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
719 def test_48_check_topo_ROADMA(self):
720 self.test_34_check_topo_ROADMA_SRG1()
721 self.test_35_check_topo_ROADMA_DEG2()
723 def test_49_loop_create_eth_service(self):
724 for i in range(1, 6):
725 print("iteration number {}".format(i))
726 print("eth service creation")
727 self.test_11_create_eth_service1()
728 print("check xc in ROADM-A1")
729 self.test_13_check_xc1_ROADMA()
730 print("check xc in ROADM-C1")
731 self.test_14_check_xc1_ROADMC()
732 print("eth service deletion\n")
733 self.test_30_delete_eth_service1()
735 def test_50_loop_create_oc_service(self):
736 response = test_utils.get_service_list_request("services/service1")
737 if response.status_code != 404:
738 response = test_utils.service_delete_request("service1")
741 for i in range(1, 6):
742 print("iteration number {}".format(i))
743 print("oc service creation")
744 self.test_36_create_oc_service1()
745 print("check xc in ROADM-A1")
746 self.test_38_check_xc1_ROADMA()
747 print("check xc in ROADM-C1")
748 self.test_39_check_xc1_ROADMC()
749 print("oc service deletion\n")
750 self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
    # Unmount XPDR-A1 and require the 'ok' status.
    unmount_reply = test_utils.unmount_device("XPDR-A1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
def test_52_disconnect_XPDRC(self):
    # Unmount XPDR-C1 and require the 'ok' status.
    unmount_reply = test_utils.unmount_device("XPDR-C1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
def test_53_disconnect_ROADMA(self):
    # Unmount ROADM-A1 and require the 'ok' status.
    unmount_reply = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
def test_54_disconnect_ROADMC(self):
    # Unmount ROADM-C1 and require the 'ok' status.
    unmount_reply = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
# Script entry point: run the tests through unittest with verbosity level 2.
if __name__ == "__main__":
    unittest.main(verbosity=2)