2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
17 from common import test_utils
20 class TransportPCEFulltesting(unittest.TestCase):
22 cr_serv_sample_data = {"input": {
23 "sdnc-request-header": {
24 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
25 "rpc-action": "service-create",
26 "request-system-id": "appname",
28 "http://localhost:8585/NotificationServer/notify"
30 "service-name": "service1",
31 "common-id": "ASATT1234567",
32 "connection-type": "service",
34 "service-rate": "100",
36 "service-format": "Ethernet",
37 "clli": "SNJSCAMCJP8",
41 "ROUTER_SNJSCAMCJP8_000000.00_00",
42 "port-type": "router",
43 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
44 "port-rack": "000000.00",
49 "LGX Panel_SNJSCAMCJP8_000000.00_00",
50 "lgx-port-name": "LGX Back.3",
51 "lgx-port-rack": "000000.00",
52 "lgx-port-shelf": "00"
58 "ROUTER_SNJSCAMCJP8_000000.00_00",
59 "port-type": "router",
60 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
61 "port-rack": "000000.00",
66 "LGX Panel_SNJSCAMCJP8_000000.00_00",
67 "lgx-port-name": "LGX Back.4",
68 "lgx-port-rack": "000000.00",
69 "lgx-port-shelf": "00"
75 "service-rate": "100",
77 "service-format": "Ethernet",
78 "clli": "SNJSCAMCJT4",
82 "ROUTER_SNJSCAMCJT4_000000.00_00",
83 "port-type": "router",
84 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
85 "port-rack": "000000.00",
90 "LGX Panel_SNJSCAMCJT4_000000.00_00",
91 "lgx-port-name": "LGX Back.29",
92 "lgx-port-rack": "000000.00",
93 "lgx-port-shelf": "00"
99 "ROUTER_SNJSCAMCJT4_000000.00_00",
100 "port-type": "router",
101 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
102 "port-rack": "000000.00",
107 "LGX Panel_SNJSCAMCJT4_000000.00_00",
108 "lgx-port-name": "LGX Back.30",
109 "lgx-port-rack": "000000.00",
110 "lgx-port-shelf": "00"
115 "due-date": "2016-11-28T00:00:01Z",
116 "operator-contact": "pw1234"
124 cls.processes = test_utils.start_tpce()
125 cls.processes = test_utils.start_sims(['xpdra', 'roadma-full', 'roadmc-full', 'xpdrc'])
128 def tearDownClass(cls):
129 for process in cls.processes:
130 test_utils.shutdown_process(process)
131 print("all processes killed")
133 def setUp(self): # instruction executed before each test method
134 print("execution of {}".format(self.id().split(".")[-1]))
136 # connect netconf devices
137 def test_01_connect_xpdrA(self):
138 response = test_utils.mount_device("XPDRA01", 'xpdra')
139 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
141 def test_02_connect_xpdrC(self):
142 response = test_utils.mount_device("XPDRC01", 'xpdrc')
143 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
145 def test_03_connect_rdmA(self):
146 response = test_utils.mount_device("ROADMA01", 'roadma-full')
147 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
149 def test_04_connect_rdmC(self):
150 response = test_utils.mount_device("ROADMC01", 'roadmc-full')
151 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
153 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
154 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
155 "ROADMA01", "1", "SRG1-PP1-TXRX")
156 self.assertEqual(response.status_code, requests.codes.ok)
157 res = response.json()
158 self.assertIn('Xponder Roadm Link created successfully',
159 res["output"]["result"])
162 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
163 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
164 "ROADMA01", "1", "SRG1-PP1-TXRX")
165 self.assertEqual(response.status_code, requests.codes.ok)
166 res = response.json()
167 self.assertIn('Roadm Xponder links created successfully',
168 res["output"]["result"])
171 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
172 response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "1",
173 "ROADMC01", "1", "SRG1-PP1-TXRX")
174 self.assertEqual(response.status_code, requests.codes.ok)
175 res = response.json()
176 self.assertIn('Xponder Roadm Link created successfully',
177 res["output"]["result"])
180 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
181 response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "1",
182 "ROADMC01", "1", "SRG1-PP1-TXRX")
183 self.assertEqual(response.status_code, requests.codes.ok)
184 res = response.json()
185 self.assertIn('Roadm Xponder links created successfully',
186 res["output"]["result"])
189 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
190 # Config ROADMA-ROADMC oms-attributes
193 "auto-spanloss": "true",
194 "spanloss-base": 11.4,
195 "spanloss-current": 12,
196 "engineered-spanloss": 12.2,
197 "link-concatenation": [{
200 "SRLG-length": 100000,
202 response = test_utils.add_oms_attr_request("ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX", data)
203 self.assertEqual(response.status_code, requests.codes.created)
205 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
206 # Config ROADMC-ROADMA oms-attributes
209 "auto-spanloss": "true",
210 "spanloss-base": 11.4,
211 "spanloss-current": 12,
212 "engineered-spanloss": 12.2,
213 "link-concatenation": [{
216 "SRLG-length": 100000,
218 response = test_utils.add_oms_attr_request("ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX", data)
219 self.assertEqual(response.status_code, requests.codes.created)
221 # test service-create for Eth service from xpdr to xpdr
222 def test_11_create_eth_service1(self):
223 self.cr_serv_sample_data["input"]["service-name"] = "service1"
224 response = test_utils.service_create_request(self.cr_serv_sample_data)
225 self.assertEqual(response.status_code, requests.codes.ok)
226 res = response.json()
227 self.assertIn('PCE calculation in progress',
228 res['output']['configuration-response-common'][
230 time.sleep(self.WAITING)
232 def test_12_get_eth_service1(self):
233 response = test_utils.get_service_list_request("services/service1")
234 self.assertEqual(response.status_code, requests.codes.ok)
235 res = response.json()
237 res['services'][0]['administrative-state'],
240 res['services'][0]['service-name'], 'service1')
242 res['services'][0]['connection-type'], 'service')
244 res['services'][0]['lifecycle-state'], 'planned')
247 def test_13_check_xc1_ROADMA(self):
248 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
249 self.assertEqual(response.status_code, requests.codes.ok)
250 res = response.json()
251 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
252 self.assertDictEqual(
254 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
255 'wavelength-number': 1,
256 'opticalControlMode': 'gainLoss',
257 'target-output-power': -3.0
258 }, **res['roadm-connections'][0]),
259 res['roadm-connections'][0]
261 self.assertDictEqual(
262 {'src-if': 'SRG1-PP1-TXRX-1'},
263 res['roadm-connections'][0]['source'])
264 self.assertDictEqual(
265 {'dst-if': 'DEG1-TTP-TXRX-1'},
266 res['roadm-connections'][0]['destination'])
269 def test_14_check_xc1_ROADMC(self):
270 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
271 self.assertEqual(response.status_code, requests.codes.ok)
272 res = response.json()
273 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
274 self.assertDictEqual(
276 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
277 'wavelength-number': 1,
278 'opticalControlMode': 'gainLoss',
279 'target-output-power': 2.0
280 }, **res['roadm-connections'][0]),
281 res['roadm-connections'][0]
283 self.assertDictEqual(
284 {'src-if': 'SRG1-PP1-TXRX-1'},
285 res['roadm-connections'][0]['source'])
286 self.assertDictEqual(
287 {'dst-if': 'DEG2-TTP-TXRX-1'},
288 res['roadm-connections'][0]['destination'])
291 def test_15_check_topo_XPDRA(self):
292 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
293 self.assertEqual(response.status_code, requests.codes.ok)
294 res = response.json()
295 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
297 if ele['tp-id'] == 'XPDR1-NETWORK1':
298 self.assertEqual({u'frequency': 196.1, u'width': 40},
299 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
300 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT3':
302 'org-openroadm-network-topology:xpdr-client-attributes',
304 if ele['tp-id'] == 'XPDR1-NETWORK2':
306 'org-openroadm-network-topology:xpdr-network-attributes',
310 def test_16_check_topo_ROADMA_SRG1(self):
311 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
312 self.assertEqual(response.status_code, requests.codes.ok)
313 res = response.json()
314 self.assertNotIn({u'index': 1},
316 u'org-openroadm-network-topology:srg-attributes'][
317 'available-wavelengths'])
318 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
320 if ele['tp-id'] == 'SRG1-PP1-TXRX':
321 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
322 ele['org-openroadm-network-topology:'
323 'pp-attributes']['used-wavelength']
325 if ele['tp-id'] == 'SRG1-PP2-TXRX':
326 self.assertNotIn('used-wavelength', dict.keys(ele))
329 def test_17_check_topo_ROADMA_DEG1(self):
330 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
331 self.assertEqual(response.status_code, requests.codes.ok)
332 res = response.json()
333 self.assertNotIn({u'index': 1},
335 u'org-openroadm-network-topology:'
336 u'degree-attributes'][
337 'available-wavelengths'])
338 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
340 if ele['tp-id'] == 'DEG1-CTP-TXRX':
341 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
342 ele['org-openroadm-network-topology:'
345 if ele['tp-id'] == 'DEG1-TTP-TXRX':
346 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
347 ele['org-openroadm-network-topology:'
348 'tx-ttp-attributes'][
352 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
353 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "2",
354 "ROADMA01", "1", "SRG1-PP2-TXRX")
355 self.assertEqual(response.status_code, requests.codes.ok)
356 res = response.json()
357 self.assertIn('Xponder Roadm Link created successfully',
358 res["output"]["result"])
361 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
362 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "2",
363 "ROADMA01", "1", "SRG1-PP2-TXRX")
364 self.assertEqual(response.status_code, requests.codes.ok)
365 res = response.json()
366 self.assertIn('Roadm Xponder links created successfully',
367 res["output"]["result"])
370 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
371 response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "2",
372 "ROADMC01", "1", "SRG1-PP2-TXRX")
373 self.assertEqual(response.status_code, requests.codes.ok)
374 res = response.json()
375 self.assertIn('Xponder Roadm Link created successfully',
376 res["output"]["result"])
379 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
380 response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "2",
381 "ROADMC01", "1", "SRG1-PP2-TXRX")
382 self.assertEqual(response.status_code, requests.codes.ok)
383 res = response.json()
384 self.assertIn('Roadm Xponder links created successfully',
385 res["output"]["result"])
388 def test_22_create_eth_service2(self):
389 self.cr_serv_sample_data["input"]["service-name"] = "service2"
390 response = test_utils.service_create_request(self.cr_serv_sample_data)
391 self.assertEqual(response.status_code, requests.codes.ok)
392 res = response.json()
393 self.assertIn('PCE calculation in progress',
394 res['output']['configuration-response-common'][
396 time.sleep(self.WAITING)
398 def test_23_get_eth_service2(self):
399 response = test_utils.get_service_list_request("services/service2")
400 self.assertEqual(response.status_code, requests.codes.ok)
401 res = response.json()
403 res['services'][0]['administrative-state'],
406 res['services'][0]['service-name'], 'service2')
408 res['services'][0]['connection-type'], 'service')
410 res['services'][0]['lifecycle-state'], 'planned')
413 def test_24_check_xc2_ROADMA(self):
414 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-2")
415 self.assertEqual(response.status_code, requests.codes.ok)
416 res = response.json()
417 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
418 self.assertDictEqual(
420 'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-2',
421 'wavelength-number': 2,
422 'opticalControlMode': 'power'
423 }, **res['roadm-connections'][0]),
424 res['roadm-connections'][0]
426 self.assertDictEqual(
427 {'src-if': 'DEG1-TTP-TXRX-2'},
428 res['roadm-connections'][0]['source'])
429 self.assertDictEqual(
430 {'dst-if': 'SRG1-PP2-TXRX-2'},
431 res['roadm-connections'][0]['destination'])
433 def test_25_check_topo_XPDRA(self):
434 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
435 self.assertEqual(response.status_code, requests.codes.ok)
436 res = response.json()
437 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
439 if ele['tp-id'] == 'XPDR1-NETWORK1':
440 self.assertEqual({u'frequency': 196.1, u'width': 40},
441 ele['org-openroadm-network-topology:'
442 'xpdr-network-attributes'][
444 if ele['tp-id'] == 'XPDR1-NETWORK2':
445 self.assertEqual({u'frequency': 196.05, u'width': 40},
446 ele['org-openroadm-network-topology:'
447 'xpdr-network-attributes'][
449 if ele['tp-id'] == 'XPDR1-CLIENT1' or \
450 ele['tp-id'] == 'XPDR1-CLIENT3':
452 'org-openroadm-network-topology:xpdr-client-attributes',
456 def test_26_check_topo_ROADMA_SRG1(self):
457 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
458 self.assertEqual(response.status_code, requests.codes.ok)
459 res = response.json()
460 self.assertNotIn({u'index': 1}, res['node'][0][
461 u'org-openroadm-network-topology:srg-attributes'][
462 'available-wavelengths'])
463 self.assertNotIn({u'index': 2}, res['node'][0][
464 u'org-openroadm-network-topology:srg-attributes'][
465 'available-wavelengths'])
466 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
468 if ele['tp-id'] == 'SRG1-PP1-TXRX':
469 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
470 ele['org-openroadm-network-topology:'
471 'pp-attributes']['used-wavelength'])
472 self.assertNotIn({u'index': 2, u'frequency': 196.05,
474 ele['org-openroadm-network-topology:'
475 'pp-attributes']['used-wavelength'])
476 if ele['tp-id'] == 'SRG1-PP2-TXRX':
477 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
478 ele['org-openroadm-network-topology:'
479 'pp-attributes']['used-wavelength'])
480 self.assertNotIn({u'index': 1, u'frequency': 196.1,
482 ele['org-openroadm-network-topology:'
483 'pp-attributes']['used-wavelength'])
484 if ele['tp-id'] == 'SRG1-PP3-TXRX':
485 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
489 def test_27_check_topo_ROADMA_DEG1(self):
490 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
491 self.assertEqual(response.status_code, requests.codes.ok)
492 res = response.json()
493 self.assertNotIn({u'index': 1}, res['node'][0][
494 u'org-openroadm-network-topology:degree-attributes'][
495 'available-wavelengths'])
496 self.assertNotIn({u'index': 2}, res['node'][0][
497 u'org-openroadm-network-topology:degree-attributes'][
498 'available-wavelengths'])
499 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
501 if ele['tp-id'] == 'DEG1-CTP-TXRX':
502 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
503 ele['org-openroadm-network-topology:'
504 'ctp-attributes']['used-wavelengths'])
505 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
506 ele['org-openroadm-network-topology:'
507 'ctp-attributes']['used-wavelengths'])
508 if ele['tp-id'] == 'DEG1-TTP-TXRX':
509 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
510 ele['org-openroadm-network-topology:'
511 'tx-ttp-attributes']['used-wavelengths'])
512 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
513 ele['org-openroadm-network-topology:'
514 'tx-ttp-attributes']['used-wavelengths'])
517 # creation service test on a non-available resource
518 def test_28_create_eth_service3(self):
519 self.cr_serv_sample_data["input"]["service-name"] = "service3"
520 response = test_utils.service_create_request(self.cr_serv_sample_data)
521 self.assertEqual(response.status_code, requests.codes.ok)
522 res = response.json()
523 self.assertIn('PCE calculation in progress',
524 res['output']['configuration-response-common'][
526 self.assertIn('200', res['output']['configuration-response-common'][
528 time.sleep(self.WAITING)
530 # add a test that check the openroadm-service-list still only
531 # contains 2 elements
533 def test_29_delete_eth_service3(self):
534 response = test_utils.service_delete_request("service3")
535 self.assertEqual(response.status_code, requests.codes.ok)
536 res = response.json()
537 self.assertIn('Service \'service3\' does not exist in datastore',
538 res['output']['configuration-response-common'][
540 self.assertIn('500', res['output']['configuration-response-common'][
544 def test_30_delete_eth_service1(self):
545 response = test_utils.service_delete_request("service1")
546 self.assertEqual(response.status_code, requests.codes.ok)
547 res = response.json()
548 self.assertIn('Renderer service delete in progress',
549 res['output']['configuration-response-common'][
553 def test_31_delete_eth_service2(self):
554 response = test_utils.service_delete_request("service2")
555 self.assertEqual(response.status_code, requests.codes.ok)
556 res = response.json()
557 self.assertIn('Renderer service delete in progress',
558 res['output']['configuration-response-common'][
562 def test_32_check_no_xc_ROADMA(self):
563 response = test_utils.check_netconf_node_request("ROADMA01", "")
564 res = response.json()
565 self.assertEqual(response.status_code, requests.codes.ok)
566 self.assertNotIn('roadm-connections',
567 dict.keys(res['org-openroadm-device']))
570 def test_33_check_topo_XPDRA(self):
571 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
572 self.assertEqual(response.status_code, requests.codes.ok)
573 res = response.json()
574 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
576 if ((ele[u'org-openroadm-common-network:tp-type'] ==
578 and (ele['tp-id'] == 'XPDR1-CLIENT1' or ele[
579 'tp-id'] == 'XPDR1-CLIENT3')):
581 'org-openroadm-network-topology:xpdr-client-attributes',
583 elif (ele[u'org-openroadm-common-network:tp-type'] ==
585 self.assertIn(u'tail-equipment-id', dict.keys(
586 ele[u'org-openroadm-network-topology:'
587 u'xpdr-network-attributes']))
588 self.assertNotIn('wavelength', dict.keys(
589 ele[u'org-openroadm-network-topology:'
590 u'xpdr-network-attributes']))
593 def test_34_check_topo_ROADMA_SRG1(self):
594 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
595 self.assertEqual(response.status_code, requests.codes.ok)
596 res = response.json()
597 self.assertIn({u'index': 1}, res['node'][0][
598 u'org-openroadm-network-topology:srg-attributes'][
599 'available-wavelengths'])
600 self.assertIn({u'index': 2}, res['node'][0][
601 u'org-openroadm-network-topology:srg-attributes'][
602 'available-wavelengths'])
603 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
605 if ele['tp-id'] == 'SRG1-PP1-TXRX' or \
606 ele['tp-id'] == 'SRG1-PP1-TXRX':
607 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
610 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
614 def test_35_check_topo_ROADMA_DEG1(self):
615 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
616 self.assertEqual(response.status_code, requests.codes.ok)
617 res = response.json()
618 self.assertIn({u'index': 1}, res['node'][0][
619 u'org-openroadm-network-topology:degree-attributes'][
620 'available-wavelengths'])
621 self.assertIn({u'index': 2}, res['node'][0][
622 u'org-openroadm-network-topology:degree-attributes'][
623 'available-wavelengths'])
624 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
626 if ele['tp-id'] == 'DEG1-CTP-TXRX':
627 self.assertNotIn('org-openroadm-network-topology:'
628 'ctp-attributes', dict.keys(ele))
629 if ele['tp-id'] == 'DEG1-TTP-TXRX':
630 self.assertNotIn('org-openroadm-network-topology:'
631 'tx-ttp-attributes', dict.keys(ele))
634 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
635 def test_36_create_oc_service1(self):
636 self.cr_serv_sample_data["input"]["service-name"] = "service1"
637 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
638 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
639 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
640 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
641 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
642 response = test_utils.service_create_request(self.cr_serv_sample_data)
643 self.assertEqual(response.status_code, requests.codes.ok)
644 res = response.json()
645 self.assertIn('PCE calculation in progress',
646 res['output']['configuration-response-common'][
648 time.sleep(self.WAITING)
650 def test_37_get_oc_service1(self):
651 response = test_utils.get_service_list_request("services/service1")
652 self.assertEqual(response.status_code, requests.codes.ok)
653 res = response.json()
655 res['services'][0]['administrative-state'],
658 res['services'][0]['service-name'], 'service1')
660 res['services'][0]['connection-type'], 'roadm-line')
662 res['services'][0]['lifecycle-state'], 'planned')
665 def test_38_check_xc1_ROADMA(self):
666 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
667 self.assertEqual(response.status_code, requests.codes.ok)
668 res = response.json()
669 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
670 self.assertDictEqual(
672 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
673 'wavelength-number': 1,
674 'opticalControlMode': 'gainLoss',
675 'target-output-power': -3.0
676 }, **res['roadm-connections'][0]),
677 res['roadm-connections'][0]
679 self.assertDictEqual(
680 {'src-if': 'SRG1-PP1-TXRX-1'},
681 res['roadm-connections'][0]['source'])
682 self.assertDictEqual(
683 {'dst-if': 'DEG1-TTP-TXRX-1'},
684 res['roadm-connections'][0]['destination'])
687 def test_39_check_xc1_ROADMC(self):
688 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
689 self.assertEqual(response.status_code, requests.codes.ok)
690 res = response.json()
691 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
692 self.assertDictEqual(
694 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
695 'wavelength-number': 1,
696 'opticalControlMode': 'gainLoss',
697 'target-output-power': 2.0
698 }, **res['roadm-connections'][0]),
699 res['roadm-connections'][0]
701 self.assertDictEqual(
702 {'src-if': 'SRG1-PP1-TXRX-1'},
703 res['roadm-connections'][0]['source'])
704 self.assertDictEqual(
705 {'dst-if': 'DEG2-TTP-TXRX-1'},
706 res['roadm-connections'][0]['destination'])
709 def test_40_create_oc_service2(self):
710 self.cr_serv_sample_data["input"]["service-name"] = "service2"
711 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
712 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
713 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
714 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
715 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
716 response = test_utils.service_create_request(self.cr_serv_sample_data)
717 self.assertEqual(response.status_code, requests.codes.ok)
718 res = response.json()
719 self.assertIn('PCE calculation in progress',
720 res['output']['configuration-response-common'][
722 time.sleep(self.WAITING)
724 def test_41_get_oc_service2(self):
725 response = test_utils.get_service_list_request("services/service2")
726 self.assertEqual(response.status_code, requests.codes.ok)
727 res = response.json()
729 res['services'][0]['administrative-state'],
732 res['services'][0]['service-name'], 'service2')
734 res['services'][0]['connection-type'], 'roadm-line')
736 res['services'][0]['lifecycle-state'], 'planned')
739 def test_42_check_xc2_ROADMA(self):
740 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-2")
741 self.assertEqual(response.status_code, requests.codes.ok)
742 res = response.json()
743 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
744 self.assertDictEqual(
746 'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-2',
747 'wavelength-number': 2,
748 'opticalControlMode': 'gainLoss',
749 'target-output-power': -3.0
750 }, **res['roadm-connections'][0]),
751 res['roadm-connections'][0]
753 self.assertDictEqual(
754 {'src-if': 'SRG1-PP2-TXRX-2'},
755 res['roadm-connections'][0]['source'])
756 self.assertDictEqual(
757 {'dst-if': 'DEG1-TTP-TXRX-2'},
758 res['roadm-connections'][0]['destination'])
761 def test_43_check_topo_ROADMA(self):
762 self.test_26_check_topo_ROADMA_SRG1()
763 self.test_27_check_topo_ROADMA_DEG1()
766 def test_44_delete_oc_service1(self):
767 response = test_utils.service_delete_request("service1")
768 self.assertEqual(response.status_code, requests.codes.ok)
769 res = response.json()
770 self.assertIn('Renderer service delete in progress',
771 res['output']['configuration-response-common'][
775 def test_45_delete_oc_service2(self):
776 response = test_utils.service_delete_request("service2")
777 self.assertEqual(response.status_code, requests.codes.ok)
778 res = response.json()
779 self.assertIn('Renderer service delete in progress',
780 res['output']['configuration-response-common'][
784 def test_46_get_no_oc_services(self):
786 response = test_utils.get_service_list_request("")
787 self.assertEqual(response.status_code, requests.codes.not_found)
788 res = response.json()
791 "error-type": "application",
792 "error-tag": "data-missing",
794 "Request could not be completed because the relevant data "
795 "model content does not exist"
797 res['errors']['error'])
800 def test_47_get_no_xc_ROADMA(self):
801 response = test_utils.check_netconf_node_request("ROADMA01", "")
802 self.assertEqual(response.status_code, requests.codes.ok)
803 res = response.json()
804 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
807 def test_48_check_topo_ROADMA(self):
808 self.test_34_check_topo_ROADMA_SRG1()
809 self.test_35_check_topo_ROADMA_DEG1()
811 def test_49_loop_create_eth_service(self):
812 for i in range(1, 6):
813 print("iteration number {}".format(i))
814 print("eth service creation")
815 self.test_11_create_eth_service1()
816 print("check xc in ROADMA01")
817 self.test_13_check_xc1_ROADMA()
818 print("check xc in ROADMC01")
819 self.test_14_check_xc1_ROADMC()
820 print("eth service deletion\n")
821 self.test_30_delete_eth_service1()
823 def test_50_loop_create_oc_service(self):
824 response = test_utils.get_service_list_request("services/service1")
825 if response.status_code != 404:
826 response = test_utils.service_delete_request("service1")
829 for i in range(1, 6):
830 print("iteration number {}".format(i))
831 print("oc service creation")
832 self.test_36_create_oc_service1()
833 print("check xc in ROADMA01")
834 self.test_38_check_xc1_ROADMA()
835 print("check xc in ROADMC01")
836 self.test_39_check_xc1_ROADMC()
837 print("oc service deletion\n")
838 self.test_44_delete_oc_service1()
840 def test_51_disconnect_XPDRA(self):
841 response = test_utils.unmount_device("XPDRA01")
842 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
844 def test_52_disconnect_XPDRC(self):
845 response = test_utils.unmount_device("XPDRC01")
846 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
848 def test_53_disconnect_ROADMA(self):
849 response = test_utils.unmount_device("ROADMA01")
850 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
852 def test_54_disconnect_ROADMC(self):
853 response = test_utils.unmount_device("ROADMC01")
854 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
857 if __name__ == "__main__":
858 unittest.main(verbosity=2)