2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
17 from common import test_utils
20 class TransportPCEFulltesting(unittest.TestCase):
# Class-level template payload passed to test_utils.service_create_request().
# The tests overwrite fields such as "service-name", "connection-type",
# "node-id" and "service-format" in place before sending it; because the dict
# lives on the class, those changes stay visible to every later test.
# NOTE(review): the embedded listing numbers skip entries (27, 29, 33, 35,
# 38-40, 45-48, ...), so several nested keys and every closing brace are absent
# from this extract - restore them from the original file before relying on it.
22 cr_serv_sample_data = {"input": {
23 "sdnc-request-header": {
24 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
25 "rpc-action": "service-create",
26 "request-system-id": "appname",
28 "http://localhost:8585/NotificationServer/notify"
30 "service-name": "service1",
31 "common-id": "ASATT1234567",
32 "connection-type": "service",
34 "service-rate": "100",
36 "service-format": "Ethernet",
37 "clli": "SNJSCAMCJP8",
41 "ROUTER_SNJSCAMCJP8_000000.00_00",
42 "port-type": "router",
43 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
44 "port-rack": "000000.00",
49 "LGX Panel_SNJSCAMCJP8_000000.00_00",
50 "lgx-port-name": "LGX Back.3",
51 "lgx-port-rack": "000000.00",
52 "lgx-port-shelf": "00"
58 "ROUTER_SNJSCAMCJP8_000000.00_00",
59 "port-type": "router",
60 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
61 "port-rack": "000000.00",
66 "LGX Panel_SNJSCAMCJP8_000000.00_00",
67 "lgx-port-name": "LGX Back.4",
68 "lgx-port-rack": "000000.00",
69 "lgx-port-shelf": "00"
75 "service-rate": "100",
77 "service-format": "Ethernet",
78 "clli": "SNJSCAMCJT4",
82 "ROUTER_SNJSCAMCJT4_000000.00_00",
83 "port-type": "router",
84 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
85 "port-rack": "000000.00",
90 "LGX Panel_SNJSCAMCJT4_000000.00_00",
91 "lgx-port-name": "LGX Back.29",
92 "lgx-port-rack": "000000.00",
93 "lgx-port-shelf": "00"
99 "ROUTER_SNJSCAMCJT4_000000.00_00",
100 "port-type": "router",
101 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
102 "port-rack": "000000.00",
107 "LGX Panel_SNJSCAMCJT4_000000.00_00",
108 "lgx-port-name": "LGX Back.30",
109 "lgx-port-rack": "000000.00",
110 "lgx-port-shelf": "00"
115 "due-date": "2016-11-28T00:00:01Z",
116 "operator-contact": "pw1234"
# NOTE(review): the enclosing def line (presumably setUpClass) and any
# decorator are not visible in this extract.
# Stores on the class the value returned by test_utils.start_tpce(), then
# rebinds the same attribute to the value returned by test_utils.start_sims()
# for the four listed simulators.
# NOTE(review): the second assignment replaces the first - confirm that
# start_sims() returns a collection that still contains the process started by
# start_tpce(), otherwise tearDownClass will never shut that one down.
124 cls.processes = test_utils.start_tpce()
125 cls.processes = test_utils.start_sims(['xpdra', 'roadma-full', 'roadmc-full', 'xpdrc'])
def tearDownClass(cls):
    """Shut down every entry of ``cls.processes``, then print a completion line."""
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
def setUp(self):
    """Print the name of the test method that is about to run.

    Executed before each test method; the name is the last dotted component
    of ``self.id()``.
    """
    method_name = self.id().split(".")[-1]
    print("execution of {}".format(method_name))
136 # connect netconf devices
def test_01_connect_xpdrA(self):
    """Mount device XPDRA01 through test_utils.mount_device and expect HTTP 201."""
    mount_reply = test_utils.mount_device("XPDRA01", 'xpdra')
    self.assertEqual(mount_reply.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
    """Mount device XPDRC01 through test_utils.mount_device and expect HTTP 201."""
    mount_reply = test_utils.mount_device("XPDRC01", 'xpdrc')
    self.assertEqual(mount_reply.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    """Mount device ROADMA01 through test_utils.mount_device and expect HTTP 201."""
    mount_reply = test_utils.mount_device("ROADMA01", 'roadma-full')
    self.assertEqual(mount_reply.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    """Mount device ROADMC01 through test_utils.mount_device and expect HTTP 201."""
    mount_reply = test_utils.mount_device("ROADMC01", 'roadmc-full')
    self.assertEqual(mount_reply.status_code,
                     requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    """Request the xpdr-to-roadm link XPDRA01 (1, 1) -> ROADMA01 SRG1-PP1-TXRX.

    Expects HTTP 200 and the success text in the RPC output.
    """
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDRA01", "1", "1", "ROADMA01", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    result_text = link_reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    """Request the roadm-to-xpdr link ROADMA01 SRG1-PP1-TXRX -> XPDRA01 (1, 1).

    Expects HTTP 200 and the success text in the RPC output.
    """
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDRA01", "1", "1", "ROADMA01", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    result_text = link_reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    """Request the xpdr-to-roadm link XPDRC01 (1, 1) -> ROADMC01 SRG1-PP1-TXRX.

    Expects HTTP 200 and the success text in the RPC output.
    """
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDRC01", "1", "1", "ROADMC01", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    result_text = link_reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    """Request the roadm-to-xpdr link ROADMC01 SRG1-PP1-TXRX -> XPDRC01 (1, 1).

    Expects HTTP 200 and the success text in the RPC output.
    """
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDRC01", "1", "1", "ROADMC01", "1", "SRG1-PP1-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    result_text = link_reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
# Sends span attributes for the ROADMA01-DEG1 -> ROADMC01-DEG2 link through
# test_utils.add_oms_attr_request() and expects HTTP 201 (created).
# NOTE(review): listing lines 191-192, 198-199 and 201 are absent, so the
# opening of the `data` literal, part of the link-concatenation entry and the
# closing brackets are not visible here.
189 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
190 # Config ROADMA-ROADMC oms-attributes
193 "auto-spanloss": "true",
194 "spanloss-base": 11.4,
195 "spanloss-current": 12,
196 "engineered-spanloss": 12.2,
197 "link-concatenation": [{
200 "SRLG-length": 100000,
202 response = test_utils.add_oms_attr_request("ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX", data)
203 self.assertEqual(response.status_code, requests.codes.created)
# Sends span attributes for the ROADMC01-DEG2 -> ROADMA01-DEG1 link through
# test_utils.add_oms_attr_request() and expects HTTP 201 (created).
# NOTE(review): listing lines 207-208, 214-215 and 217 are absent, so the
# opening of the `data` literal, part of the link-concatenation entry and the
# closing brackets are not visible here.
205 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
206 # Config ROADMC-ROADMA oms-attributes
209 "auto-spanloss": "true",
210 "spanloss-base": 11.4,
211 "spanloss-current": 12,
212 "engineered-spanloss": 12.2,
213 "link-concatenation": [{
216 "SRLG-length": 100000,
218 response = test_utils.add_oms_attr_request("ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX", data)
219 self.assertEqual(response.status_code, requests.codes.created)
221 # test service-create for Eth service from xpdr to xpdr
# Renames the shared payload to "service1", posts it with
# test_utils.service_create_request(), expects HTTP 200 and a
# 'PCE calculation in progress' text under configuration-response-common,
# then sleeps for self.WAITING (defined outside this extract).
# NOTE(review): listing line 229, which closes the subscript and the assertIn
# call, is absent from this extract.
222 def test_11_create_eth_service1(self):
223 self.cr_serv_sample_data["input"]["service-name"] = "service1"
224 response = test_utils.service_create_request(self.cr_serv_sample_data)
225 self.assertEqual(response.status_code, requests.codes.ok)
226 res = response.json()
227 self.assertIn('PCE calculation in progress',
228 res['output']['configuration-response-common'][
230 time.sleep(self.WAITING)
# Fetches "services/service1", expects HTTP 200 and inspects the first entry's
# administrative-state, service-name ('service1'), connection-type ('service')
# and lifecycle-state ('planned').
# NOTE(review): listing lines 236, 238-239, 241 and 243 are absent, so the
# assertion call heads and the expected administrative-state are not visible.
232 def test_12_get_eth_service1(self):
233 response = test_utils.get_service_list_request("services/service1")
234 self.assertEqual(response.status_code, requests.codes.ok)
235 res = response.json()
237 res['services'][0]['administrative-state'],
240 res['services'][0]['service-name'], 'service1')
242 res['services'][0]['connection-type'], 'service')
244 res['services'][0]['lifecycle-state'], 'planned')
# Reads roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 from ROADMA01, expects
# HTTP 200, then checks the connection fields and its source / destination
# interfaces.
# NOTE(review): listing lines 253 and 260 are absent (opening of the expected
# mapping and the closing parenthesis of the first assertion).
# NOTE(review): if the missing line opens `dict({`, the merge is
# dict(expected, **actual); the actual values then override the expected ones,
# so the first assertion would only verify that the keys exist - confirm.
247 def test_13_check_xc1_ROADMA(self):
248 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
249 self.assertEqual(response.status_code, requests.codes.ok)
250 res = response.json()
251 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
252 self.assertDictEqual(
254 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
255 'wavelength-number': 1,
256 'opticalControlMode': 'gainLoss',
257 'target-output-power': -3.0
258 }, **res['roadm-connections'][0]),
259 res['roadm-connections'][0]
261 self.assertDictEqual(
262 {'src-if': 'SRG1-PP1-TXRX-1'},
263 res['roadm-connections'][0]['source'])
264 self.assertDictEqual(
265 {'dst-if': 'DEG1-TTP-TXRX-1'},
266 res['roadm-connections'][0]['destination'])
# Reads roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 from ROADMC01, expects
# HTTP 200, then checks the connection fields and its source / destination
# interfaces.
# NOTE(review): listing lines 275 and 282 are absent (opening of the expected
# mapping and the closing parenthesis of the first assertion).
# NOTE(review): if the missing line opens `dict({`, the merge is
# dict(expected, **actual); the actual values then override the expected ones,
# so the first assertion would only verify that the keys exist - confirm.
269 def test_14_check_xc1_ROADMC(self):
270 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
271 self.assertEqual(response.status_code, requests.codes.ok)
272 res = response.json()
273 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
274 self.assertDictEqual(
276 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
277 'wavelength-number': 1,
278 'opticalControlMode': 'gainLoss',
279 'target-output-power': 2.0
280 }, **res['roadm-connections'][0]),
281 res['roadm-connections'][0]
283 self.assertDictEqual(
284 {'src-if': 'SRG1-PP1-TXRX-1'},
285 res['roadm-connections'][0]['source'])
286 self.assertDictEqual(
287 {'dst-if': 'DEG2-TTP-TXRX-1'},
288 res['roadm-connections'][0]['destination'])
# Fetches topology node XPDRA01-XPDR1, expects HTTP 200 and inspects its
# termination points: XPDR1-NETWORK1 must carry wavelength 196.1 / width 40;
# XPDR1-CLIENT1/3 and XPDR1-NETWORK2 are checked against attribute names.
# NOTE(review): listing lines 296, 301, 303, 305 and 307 are absent - the loop
# header that binds `ele` and the assertion calls around the two attribute
# names are not visible, so their exact form cannot be stated from here.
291 def test_15_check_topo_XPDRA(self):
292 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
293 self.assertEqual(response.status_code, requests.codes.ok)
294 res = response.json()
295 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
297 if ele['tp-id'] == 'XPDR1-NETWORK1':
298 self.assertEqual({u'frequency': 196.1, u'width': 40},
299 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
300 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT3':
302 'org-openroadm-network-topology:xpdr-client-attributes',
304 if ele['tp-id'] == 'XPDR1-NETWORK2':
306 'org-openroadm-network-topology:xpdr-network-attributes',
# Fetches topology node ROADMA01-SRG1, expects HTTP 200, checks that index 1 is
# no longer listed in the SRG available-wavelengths, that SRG1-PP1-TXRX reports
# wavelength 1 (196.1 / 40) as used, and that SRG1-PP2-TXRX has no
# 'used-wavelength' key.
# NOTE(review): listing lines 315, 319 and 324 are absent (start of the
# subscript chain, the loop header binding `ele`, and a closing parenthesis).
310 def test_16_check_topo_ROADMA_SRG1(self):
311 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
312 self.assertEqual(response.status_code, requests.codes.ok)
313 res = response.json()
314 self.assertNotIn({u'index': 1},
316 u'org-openroadm-network-topology:srg-attributes'][
317 'available-wavelengths'])
318 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
320 if ele['tp-id'] == 'SRG1-PP1-TXRX':
321 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
322 ele['org-openroadm-network-topology:'
323 'pp-attributes']['used-wavelength']
325 if ele['tp-id'] == 'SRG1-PP2-TXRX':
326 self.assertNotIn('used-wavelength', dict.keys(ele))
# Fetches topology node ROADMA01-DEG1, expects HTTP 200, checks that index 1 is
# no longer listed in the degree available-wavelengths and that the
# DEG1-CTP-TXRX and DEG1-TTP-TXRX termination points report wavelength 1
# (196.1 / 40).
# NOTE(review): listing lines 334, 339, 343-344 and 349-350 are absent (start
# of the subscript chain, the loop header binding `ele`, and the tails of both
# assertIn calls), so the exact attribute keys read there are not visible.
329 def test_17_check_topo_ROADMA_DEG1(self):
330 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
331 self.assertEqual(response.status_code, requests.codes.ok)
332 res = response.json()
333 self.assertNotIn({u'index': 1},
335 u'org-openroadm-network-topology:'
336 u'degree-attributes'][
337 'available-wavelengths'])
338 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
340 if ele['tp-id'] == 'DEG1-CTP-TXRX':
341 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
342 ele['org-openroadm-network-topology:'
345 if ele['tp-id'] == 'DEG1-TTP-TXRX':
346 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
347 ele['org-openroadm-network-topology:'
348 'tx-ttp-attributes'][
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
    """Request the xpdr-to-roadm link XPDRA01 (1, 2) -> ROADMA01 SRG1-PP2-TXRX.

    Expects HTTP 200 and the success text in the RPC output.
    """
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDRA01", "1", "2", "ROADMA01", "1", "SRG1-PP2-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    result_text = link_reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
    """Request the roadm-to-xpdr link ROADMA01 SRG1-PP2-TXRX -> XPDRA01 (1, 2).

    Expects HTTP 200 and the success text in the RPC output.
    """
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDRA01", "1", "2", "ROADMA01", "1", "SRG1-PP2-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    result_text = link_reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
    """Request the xpdr-to-roadm link XPDRC01 (1, 2) -> ROADMC01 SRG1-PP2-TXRX.

    Expects HTTP 200 and the success text in the RPC output.
    """
    link_reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDRC01", "1", "2", "ROADMC01", "1", "SRG1-PP2-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    result_text = link_reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
    """Request the roadm-to-xpdr link ROADMC01 SRG1-PP2-TXRX -> XPDRC01 (1, 2).

    Expects HTTP 200 and the success text in the RPC output.
    """
    link_reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDRC01", "1", "2", "ROADMC01", "1", "SRG1-PP2-TXRX")
    self.assertEqual(link_reply.status_code, requests.codes.ok)
    result_text = link_reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
# Renames the shared payload to "service2", posts it with
# test_utils.service_create_request(), expects HTTP 200 and a
# 'PCE calculation in progress' text under configuration-response-common,
# then sleeps for self.WAITING (defined outside this extract).
# NOTE(review): listing line 395, which closes the subscript and the assertIn
# call, is absent from this extract.
388 def test_22_create_eth_service2(self):
389 self.cr_serv_sample_data["input"]["service-name"] = "service2"
390 response = test_utils.service_create_request(self.cr_serv_sample_data)
391 self.assertEqual(response.status_code, requests.codes.ok)
392 res = response.json()
393 self.assertIn('PCE calculation in progress',
394 res['output']['configuration-response-common'][
396 time.sleep(self.WAITING)
# Fetches "services/service2", expects HTTP 200 and inspects the first entry's
# administrative-state, service-name ('service2'), connection-type ('service')
# and lifecycle-state ('planned').
# NOTE(review): listing lines 402, 404-405, 407 and 409 are absent, so the
# assertion call heads and the expected administrative-state are not visible.
398 def test_23_get_eth_service2(self):
399 response = test_utils.get_service_list_request("services/service2")
400 self.assertEqual(response.status_code, requests.codes.ok)
401 res = response.json()
403 res['services'][0]['administrative-state'],
406 res['services'][0]['service-name'], 'service2')
408 res['services'][0]['connection-type'], 'service')
410 res['services'][0]['lifecycle-state'], 'planned')
# Reads roadm-connection DEG1-TTP-TXRX-SRG1-PP2-TXRX-2 from ROADMA01, expects
# HTTP 200, then checks the connection fields and its source / destination
# interfaces.
# NOTE(review): listing lines 419 and 425 are absent (opening of the expected
# mapping and the closing parenthesis of the first assertion).
# NOTE(review): if the missing line opens `dict({`, the merge is
# dict(expected, **actual); the actual values then override the expected ones,
# so the first assertion would only verify that the keys exist - confirm.
413 def test_24_check_xc2_ROADMA(self):
414 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-2")
415 self.assertEqual(response.status_code, requests.codes.ok)
416 res = response.json()
417 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
418 self.assertDictEqual(
420 'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-2',
421 'wavelength-number': 2,
422 'opticalControlMode': 'power'
423 }, **res['roadm-connections'][0]),
424 res['roadm-connections'][0]
426 self.assertDictEqual(
427 {'src-if': 'DEG1-TTP-TXRX-2'},
428 res['roadm-connections'][0]['source'])
429 self.assertDictEqual(
430 {'dst-if': 'SRG1-PP2-TXRX-2'},
431 res['roadm-connections'][0]['destination'])
# Fetches topology node XPDRA01-XPDR1, expects HTTP 200 and inspects its
# termination points: XPDR1-NETWORK1 is compared with 196.1 / 40,
# XPDR1-NETWORK2 with 196.05 / 40, and XPDR1-CLIENT1/3 are checked against the
# xpdr-client-attributes name.
# NOTE(review): listing lines 438, 443, 448, 451 and 453 are absent - the loop
# header binding `ele`, the final subscripts of both assertEqual calls and the
# assertion around the client attribute name are not visible.
433 def test_25_check_topo_XPDRA(self):
434 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
435 self.assertEqual(response.status_code, requests.codes.ok)
436 res = response.json()
437 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
439 if ele['tp-id'] == 'XPDR1-NETWORK1':
440 self.assertEqual({u'frequency': 196.1, u'width': 40},
441 ele['org-openroadm-network-topology:'
442 'xpdr-network-attributes'][
444 if ele['tp-id'] == 'XPDR1-NETWORK2':
445 self.assertEqual({u'frequency': 196.05, u'width': 40},
446 ele['org-openroadm-network-topology:'
447 'xpdr-network-attributes'][
449 if ele['tp-id'] == 'XPDR1-CLIENT1' or \
450 ele['tp-id'] == 'XPDR1-CLIENT3':
452 'org-openroadm-network-topology:xpdr-client-attributes',
# Fetches topology node ROADMA01-SRG1, expects HTTP 200, checks that indexes 1
# and 2 are both gone from the SRG available-wavelengths, that SRG1-PP1-TXRX
# uses wavelength 1 only and SRG1-PP2-TXRX uses wavelength 2 only, and that
# SRG1-PP3-TXRX is checked for the absence of pp-attributes.
# This method is also invoked directly by test_43_check_topo_ROADMA.
# NOTE(review): listing lines 467, 473, 481 and 486 are absent (the loop header
# binding `ele`, the tails of two dict literals, and the last argument of the
# final assertNotIn).
456 def test_26_check_topo_ROADMA_SRG1(self):
457 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
458 self.assertEqual(response.status_code, requests.codes.ok)
459 res = response.json()
460 self.assertNotIn({u'index': 1}, res['node'][0][
461 u'org-openroadm-network-topology:srg-attributes'][
462 'available-wavelengths'])
463 self.assertNotIn({u'index': 2}, res['node'][0][
464 u'org-openroadm-network-topology:srg-attributes'][
465 'available-wavelengths'])
466 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
468 if ele['tp-id'] == 'SRG1-PP1-TXRX':
469 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
470 ele['org-openroadm-network-topology:'
471 'pp-attributes']['used-wavelength'])
472 self.assertNotIn({u'index': 2, u'frequency': 196.05,
474 ele['org-openroadm-network-topology:'
475 'pp-attributes']['used-wavelength'])
476 if ele['tp-id'] == 'SRG1-PP2-TXRX':
477 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
478 ele['org-openroadm-network-topology:'
479 'pp-attributes']['used-wavelength'])
480 self.assertNotIn({u'index': 1, u'frequency': 196.1,
482 ele['org-openroadm-network-topology:'
483 'pp-attributes']['used-wavelength'])
484 if ele['tp-id'] == 'SRG1-PP3-TXRX':
485 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
# Fetches topology node ROADMA01-DEG1, expects HTTP 200, checks that indexes 1
# and 2 are both gone from the degree available-wavelengths and that both
# DEG1-CTP-TXRX (ctp-attributes) and DEG1-TTP-TXRX (tx-ttp-attributes) list
# wavelengths 1 (196.1 / 40) and 2 (196.05 / 40) as used.
# This method is also invoked directly by test_43_check_topo_ROADMA.
# NOTE(review): listing line 500 is absent; it presumably holds the loop header
# that binds `ele` from `liste_tp` - confirm against the original file.
489 def test_27_check_topo_ROADMA_DEG1(self):
490 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
491 self.assertEqual(response.status_code, requests.codes.ok)
492 res = response.json()
493 self.assertNotIn({u'index': 1}, res['node'][0][
494 u'org-openroadm-network-topology:degree-attributes'][
495 'available-wavelengths'])
496 self.assertNotIn({u'index': 2}, res['node'][0][
497 u'org-openroadm-network-topology:degree-attributes'][
498 'available-wavelengths'])
499 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
501 if ele['tp-id'] == 'DEG1-CTP-TXRX':
502 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
503 ele['org-openroadm-network-topology:'
504 'ctp-attributes']['used-wavelengths'])
505 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
506 ele['org-openroadm-network-topology:'
507 'ctp-attributes']['used-wavelengths'])
508 if ele['tp-id'] == 'DEG1-TTP-TXRX':
509 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
510 ele['org-openroadm-network-topology:'
511 'tx-ttp-attributes']['used-wavelengths'])
512 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
513 ele['org-openroadm-network-topology:'
514 'tx-ttp-attributes']['used-wavelengths'])
517 # creation service test on a non-available resource
# Renames the shared payload to "service3", posts it with
# test_utils.service_create_request(), expects HTTP 200, a
# 'PCE calculation in progress' text and a '200' value under
# configuration-response-common, then sleeps for self.WAITING.
# NOTE(review): listing lines 525 and 527, which close the two subscripts and
# assertIn calls, are absent from this extract.
518 def test_28_create_eth_service3(self):
519 self.cr_serv_sample_data["input"]["service-name"] = "service3"
520 response = test_utils.service_create_request(self.cr_serv_sample_data)
521 self.assertEqual(response.status_code, requests.codes.ok)
522 res = response.json()
523 self.assertIn('PCE calculation in progress',
524 res['output']['configuration-response-common'][
526 self.assertIn('200', res['output']['configuration-response-common'][
528 time.sleep(self.WAITING)
530 # add a test that check the openroadm-service-list still only
531 # contains 2 elements
# Posts a service-delete request for "service3" through
# test_utils.post_request(), expects HTTP 200, and expects the reply to say the
# service does not exist in the datastore together with a '500' value.
# NOTE(review): listing lines 535, 540, 542, 546-548, 554 and 556 are absent,
# so the opening of `data`, one header key, the closing braces and the final
# subscripts of both assertIn calls are not visible.
533 def test_29_delete_eth_service3(self):
534 url = "{}/operations/org-openroadm-service:service-delete"
536 "sdnc-request-header": {
537 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
538 "rpc-action": "service-delete",
539 "request-system-id": "appname",
541 "http://localhost:8585/NotificationServer/notify"
543 "service-delete-req-info": {
544 "service-name": "service3",
545 "tail-retention": "no"
549 response = test_utils.post_request(url, data)
550 self.assertEqual(response.status_code, requests.codes.ok)
551 res = response.json()
552 self.assertIn('Service \'service3\' does not exist in datastore',
553 res['output']['configuration-response-common'][
555 self.assertIn('500', res['output']['configuration-response-common'][
# Posts a service-delete request for "service1" through
# test_utils.post_request(), expects HTTP 200 and a
# 'Renderer service delete in progress' text in the reply.
# This method is also invoked directly by test_49_loop_create_eth_service.
# NOTE(review): listing lines 561, 566, 568, 572-574 and 580 are absent, so the
# opening of `data`, one header key, the closing braces and the final subscript
# of the assertIn call are not visible.
559 def test_30_delete_eth_service1(self):
560 url = "{}/operations/org-openroadm-service:service-delete"
562 "sdnc-request-header": {
563 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
564 "rpc-action": "service-delete",
565 "request-system-id": "appname",
567 "http://localhost:8585/NotificationServer/notify"
569 "service-delete-req-info": {
570 "service-name": "service1",
571 "tail-retention": "no"
575 response = test_utils.post_request(url, data)
576 self.assertEqual(response.status_code, requests.codes.ok)
577 res = response.json()
578 self.assertIn('Renderer service delete in progress',
579 res['output']['configuration-response-common'][
# Posts a service-delete request for "service2" through
# test_utils.post_request(), expects HTTP 200 and a
# 'Renderer service delete in progress' text in the reply.
# NOTE(review): listing lines 585, 590, 592, 596-598 and 604 are absent, so the
# opening of `data`, one header key, the closing braces and the final subscript
# of the assertIn call are not visible.
583 def test_31_delete_eth_service2(self):
584 url = "{}/operations/org-openroadm-service:service-delete"
586 "sdnc-request-header": {
587 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
588 "rpc-action": "service-delete",
589 "request-system-id": "appname",
591 "http://localhost:8585/NotificationServer/notify"
593 "service-delete-req-info": {
594 "service-name": "service2",
595 "tail-retention": "no"
599 response = test_utils.post_request(url, data)
600 self.assertEqual(response.status_code, requests.codes.ok)
601 res = response.json()
602 self.assertIn('Renderer service delete in progress',
603 res['output']['configuration-response-common'][
def test_32_check_no_xc_ROADMA(self):
    """Verify that ROADMA01's device data holds no 'roadm-connections' entry.

    The status code is asserted before the body is parsed, so a non-200 reply
    is reported as an assertion failure instead of a JSON decoding error; this
    is the same order the other checks in this class use.
    """
    response = test_utils.check_netconf_node_request("ROADMA01", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Membership on the mapping itself already tests its keys, so the
    # dict.keys() wrapper is not needed.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
# Fetches topology node XPDRA01-XPDR1, expects HTTP 200 and inspects its
# termination points by tp-type: the XPDR1-CLIENT1/3 branch is checked against
# the xpdr-client-attributes name, while the other visible branch requires
# 'tail-equipment-id' and forbids 'wavelength' in xpdr-network-attributes.
# NOTE(review): listing lines 620, 622, 625, 627 and 629 are absent - the loop
# header binding `ele`, both tp-type values being compared and the assertion
# around the client attribute name are not visible.
615 def test_33_check_topo_XPDRA(self):
616 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
617 self.assertEqual(response.status_code, requests.codes.ok)
618 res = response.json()
619 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
621 if ((ele[u'org-openroadm-common-network:tp-type'] ==
623 and (ele['tp-id'] == 'XPDR1-CLIENT1' or ele[
624 'tp-id'] == 'XPDR1-CLIENT3')):
626 'org-openroadm-network-topology:xpdr-client-attributes',
628 elif (ele[u'org-openroadm-common-network:tp-type'] ==
630 self.assertIn(u'tail-equipment-id', dict.keys(
631 ele[u'org-openroadm-network-topology:'
632 u'xpdr-network-attributes']))
633 self.assertNotIn('wavelength', dict.keys(
634 ele[u'org-openroadm-network-topology:'
635 u'xpdr-network-attributes']))
# Fetches topology node ROADMA01-SRG1, expects HTTP 200, checks that indexes 1
# and 2 are listed again in the SRG available-wavelengths, and checks
# termination points for the absence of pp-attributes.
# This method is also invoked directly by test_48_check_topo_ROADMA.
# NOTE(review): listing lines 649, 653-654 and 656 are absent (the loop header
# binding `ele`, the last arguments of both assertNotIn calls and whatever
# separates the two calls).
# NOTE(review): both sides of the `or` below compare against 'SRG1-PP1-TXRX';
# the second operand was probably meant to name another port (e.g.
# 'SRG1-PP2-TXRX') - confirm with the test author.
638 def test_34_check_topo_ROADMA_SRG1(self):
639 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
640 self.assertEqual(response.status_code, requests.codes.ok)
641 res = response.json()
642 self.assertIn({u'index': 1}, res['node'][0][
643 u'org-openroadm-network-topology:srg-attributes'][
644 'available-wavelengths'])
645 self.assertIn({u'index': 2}, res['node'][0][
646 u'org-openroadm-network-topology:srg-attributes'][
647 'available-wavelengths'])
648 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
650 if ele['tp-id'] == 'SRG1-PP1-TXRX' or \
651 ele['tp-id'] == 'SRG1-PP1-TXRX':
652 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
655 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
# Fetches topology node ROADMA01-DEG1, expects HTTP 200, checks that indexes 1
# and 2 are listed again in the degree available-wavelengths and that
# DEG1-CTP-TXRX / DEG1-TTP-TXRX no longer carry ctp-attributes /
# tx-ttp-attributes keys.
# This method is also invoked directly by test_48_check_topo_ROADMA.
# NOTE(review): listing line 670 is absent; it presumably holds the loop header
# that binds `ele` from `liste_tp` - confirm against the original file.
659 def test_35_check_topo_ROADMA_DEG1(self):
660 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
661 self.assertEqual(response.status_code, requests.codes.ok)
662 res = response.json()
663 self.assertIn({u'index': 1}, res['node'][0][
664 u'org-openroadm-network-topology:degree-attributes'][
665 'available-wavelengths'])
666 self.assertIn({u'index': 2}, res['node'][0][
667 u'org-openroadm-network-topology:degree-attributes'][
668 'available-wavelengths'])
669 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
671 if ele['tp-id'] == 'DEG1-CTP-TXRX':
672 self.assertNotIn('org-openroadm-network-topology:'
673 'ctp-attributes', dict.keys(ele))
674 if ele['tp-id'] == 'DEG1-TTP-TXRX':
675 self.assertNotIn('org-openroadm-network-topology:'
676 'tx-ttp-attributes', dict.keys(ele))
679 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
# Rewrites the shared payload in place for an optical-channel request
# (service-name "service1", connection-type "roadm-line", a/z node-ids
# ROADMA01 / ROADMC01, service-format "OC"), posts it, expects HTTP 200 and a
# 'PCE calculation in progress' text, then sleeps for self.WAITING.
# The payload is a class attribute, so these edits remain in effect for any
# test that runs afterwards. Also invoked by test_50_loop_create_oc_service.
# NOTE(review): listing line 692, which closes the subscript and the assertIn
# call, is absent from this extract.
680 def test_36_create_oc_service1(self):
681 self.cr_serv_sample_data["input"]["service-name"] = "service1"
682 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
683 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
684 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
685 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
686 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
687 response = test_utils.service_create_request(self.cr_serv_sample_data)
688 self.assertEqual(response.status_code, requests.codes.ok)
689 res = response.json()
690 self.assertIn('PCE calculation in progress',
691 res['output']['configuration-response-common'][
693 time.sleep(self.WAITING)
# Fetches "services/service1", expects HTTP 200 and inspects the first entry's
# administrative-state, service-name ('service1'), connection-type
# ('roadm-line') and lifecycle-state ('planned').
# NOTE(review): listing lines 699, 701-702, 704 and 706 are absent, so the
# assertion call heads and the expected administrative-state are not visible.
695 def test_37_get_oc_service1(self):
696 response = test_utils.get_service_list_request("services/service1")
697 self.assertEqual(response.status_code, requests.codes.ok)
698 res = response.json()
700 res['services'][0]['administrative-state'],
703 res['services'][0]['service-name'], 'service1')
705 res['services'][0]['connection-type'], 'roadm-line')
707 res['services'][0]['lifecycle-state'], 'planned')
# Reads roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 from ROADMA01, expects
# HTTP 200, then checks the connection fields and its source / destination
# interfaces. Also invoked by test_50_loop_create_oc_service.
# NOTE(review): listing lines 716 and 723 are absent (opening of the expected
# mapping and the closing parenthesis of the first assertion).
# NOTE(review): if the missing line opens `dict({`, the merge is
# dict(expected, **actual); the actual values then override the expected ones,
# so the first assertion would only verify that the keys exist - confirm.
710 def test_38_check_xc1_ROADMA(self):
711 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
712 self.assertEqual(response.status_code, requests.codes.ok)
713 res = response.json()
714 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
715 self.assertDictEqual(
717 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
718 'wavelength-number': 1,
719 'opticalControlMode': 'gainLoss',
720 'target-output-power': -3.0
721 }, **res['roadm-connections'][0]),
722 res['roadm-connections'][0]
724 self.assertDictEqual(
725 {'src-if': 'SRG1-PP1-TXRX-1'},
726 res['roadm-connections'][0]['source'])
727 self.assertDictEqual(
728 {'dst-if': 'DEG1-TTP-TXRX-1'},
729 res['roadm-connections'][0]['destination'])
# Reads roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 from ROADMC01, expects
# HTTP 200, then checks the connection fields and its source / destination
# interfaces. Also invoked by test_50_loop_create_oc_service.
# NOTE(review): listing lines 738 and 745 are absent (opening of the expected
# mapping and the closing parenthesis of the first assertion).
# NOTE(review): if the missing line opens `dict({`, the merge is
# dict(expected, **actual); the actual values then override the expected ones,
# so the first assertion would only verify that the keys exist - confirm.
732 def test_39_check_xc1_ROADMC(self):
733 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
734 self.assertEqual(response.status_code, requests.codes.ok)
735 res = response.json()
736 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
737 self.assertDictEqual(
739 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
740 'wavelength-number': 1,
741 'opticalControlMode': 'gainLoss',
742 'target-output-power': 2.0
743 }, **res['roadm-connections'][0]),
744 res['roadm-connections'][0]
746 self.assertDictEqual(
747 {'src-if': 'SRG1-PP1-TXRX-1'},
748 res['roadm-connections'][0]['source'])
749 self.assertDictEqual(
750 {'dst-if': 'DEG2-TTP-TXRX-1'},
751 res['roadm-connections'][0]['destination'])
# Rewrites the shared payload in place for an optical-channel request
# (service-name "service2", connection-type "roadm-line", a/z node-ids
# ROADMA01 / ROADMC01, service-format "OC"), posts it, expects HTTP 200 and a
# 'PCE calculation in progress' text, then sleeps for self.WAITING.
# NOTE(review): listing line 766, which closes the subscript and the assertIn
# call, is absent from this extract.
754 def test_40_create_oc_service2(self):
755 self.cr_serv_sample_data["input"]["service-name"] = "service2"
756 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
757 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
758 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
759 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
760 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
761 response = test_utils.service_create_request(self.cr_serv_sample_data)
762 self.assertEqual(response.status_code, requests.codes.ok)
763 res = response.json()
764 self.assertIn('PCE calculation in progress',
765 res['output']['configuration-response-common'][
767 time.sleep(self.WAITING)
# Fetches "services/service2", expects HTTP 200 and inspects the first entry's
# administrative-state, service-name ('service2'), connection-type
# ('roadm-line') and lifecycle-state ('planned').
# NOTE(review): listing lines 773, 775-776, 778 and 780 are absent, so the
# assertion call heads and the expected administrative-state are not visible.
769 def test_41_get_oc_service2(self):
770 response = test_utils.get_service_list_request("services/service2")
771 self.assertEqual(response.status_code, requests.codes.ok)
772 res = response.json()
774 res['services'][0]['administrative-state'],
777 res['services'][0]['service-name'], 'service2')
779 res['services'][0]['connection-type'], 'roadm-line')
781 res['services'][0]['lifecycle-state'], 'planned')
# Reads roadm-connection SRG1-PP2-TXRX-DEG1-TTP-TXRX-2 from ROADMA01, expects
# HTTP 200, then checks the connection fields and its source / destination
# interfaces.
# NOTE(review): listing lines 790 and 797 are absent (opening of the expected
# mapping and the closing parenthesis of the first assertion).
# NOTE(review): if the missing line opens `dict({`, the merge is
# dict(expected, **actual); the actual values then override the expected ones,
# so the first assertion would only verify that the keys exist - confirm.
784 def test_42_check_xc2_ROADMA(self):
785 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-2")
786 self.assertEqual(response.status_code, requests.codes.ok)
787 res = response.json()
788 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
789 self.assertDictEqual(
791 'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-2',
792 'wavelength-number': 2,
793 'opticalControlMode': 'gainLoss',
794 'target-output-power': -3.0
795 }, **res['roadm-connections'][0]),
796 res['roadm-connections'][0]
798 self.assertDictEqual(
799 {'src-if': 'SRG1-PP2-TXRX-2'},
800 res['roadm-connections'][0]['source'])
801 self.assertDictEqual(
802 {'dst-if': 'DEG1-TTP-TXRX-2'},
803 res['roadm-connections'][0]['destination'])
def test_43_check_topo_ROADMA(self):
    """Re-run the SRG1 and DEG1 topology checks of tests 26 and 27, in that order."""
    for recheck in (self.test_26_check_topo_ROADMA_SRG1,
                    self.test_27_check_topo_ROADMA_DEG1):
        recheck()
# Posts a service-delete request for "service1" through
# test_utils.post_request(), expects HTTP 200 and a
# 'Renderer service delete in progress' text in the reply.
# This method is also invoked directly by test_50_loop_create_oc_service.
# NOTE(review): listing lines 813, 818, 820, 824-826 and 832 are absent, so the
# opening of `data`, one header key, the closing braces and the final subscript
# of the assertIn call are not visible.
811 def test_44_delete_oc_service1(self):
812 url = "{}/operations/org-openroadm-service:service-delete"
814 "sdnc-request-header": {
815 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
816 "rpc-action": "service-delete",
817 "request-system-id": "appname",
819 "http://localhost:8585/NotificationServer/notify"
821 "service-delete-req-info": {
822 "service-name": "service1",
823 "tail-retention": "no"
827 response = test_utils.post_request(url, data)
828 self.assertEqual(response.status_code, requests.codes.ok)
829 res = response.json()
830 self.assertIn('Renderer service delete in progress',
831 res['output']['configuration-response-common'][
# Posts a service-delete request for "service2" through
# test_utils.post_request(), expects HTTP 200 and a
# 'Renderer service delete in progress' text in the reply.
# NOTE(review): listing lines 837, 842, 844, 848-850 and 856 are absent, so the
# opening of `data`, one header key, the closing braces and the final subscript
# of the assertIn call are not visible.
835 def test_45_delete_oc_service2(self):
836 url = "{}/operations/org-openroadm-service:service-delete"
838 "sdnc-request-header": {
839 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
840 "rpc-action": "service-delete",
841 "request-system-id": "appname",
843 "http://localhost:8585/NotificationServer/notify"
845 "service-delete-req-info": {
846 "service-name": "service2",
847 "tail-retention": "no"
851 response = test_utils.post_request(url, data)
852 self.assertEqual(response.status_code, requests.codes.ok)
853 res = response.json()
854 self.assertIn('Renderer service delete in progress',
855 res['output']['configuration-response-common'][
# Requests the service list with an empty path, expects HTTP 404 (not_found)
# and compares an "application" / "data-missing" error entry carrying the
# 'relevant data model content does not exist' text with res['errors']['error'].
# NOTE(review): listing lines 860, 864-865, 868 and 871 are absent, so the
# assertion call head, the opening of the expected entry and the key that
# holds the message text are not visible.
859 def test_46_get_no_oc_services(self):
861 response = test_utils.get_service_list_request("")
862 self.assertEqual(response.status_code, requests.codes.not_found)
863 res = response.json()
866 "error-type": "application",
867 "error-tag": "data-missing",
869 "Request could not be completed because the relevant data "
870 "model content does not exist"
872 res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    """Verify that ROADMA01's device data holds no 'roadm-connections' entry.

    Expects HTTP 200 from test_utils.check_netconf_node_request() and then
    checks that the key is absent from the 'org-openroadm-device' container.
    """
    response = test_utils.check_netconf_node_request("ROADMA01", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The former ['roadm-connections'][0] built a one-item list only to index
    # it again; the plain string is the same value.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
def test_48_check_topo_ROADMA(self):
    """Re-run the SRG1 and DEG1 topology checks of tests 34 and 35, in that order."""
    for recheck in (self.test_34_check_topo_ROADMA_SRG1,
                    self.test_35_check_topo_ROADMA_DEG1):
        recheck()
def test_49_loop_create_eth_service(self):
    """Run five create / check / delete cycles by re-invoking tests 11, 13, 14 and 30.

    Each step prints its banner immediately before it runs.
    """
    cycle = (
        ("eth service creation", self.test_11_create_eth_service1),
        ("check xc in ROADMA01", self.test_13_check_xc1_ROADMA),
        ("check xc in ROADMC01", self.test_14_check_xc1_ROADMC),
        ("eth service deletion\n", self.test_30_delete_eth_service1),
    )
    for iteration in range(1, 6):
        print("iteration number {}".format(iteration))
        for banner, step in cycle:
            print(banner)
            step()
# First looks up "services/service1"; when the reply is anything other than
# 404 it posts a service-delete request for "service1" (the reply to that
# request is not checked). It then runs five cycles that re-invoke tests 36,
# 38, 39 and 44, printing a banner before each step.
# NOTE(review): listing lines 902, 907, 909, 913-915 and 917-918 are absent, so
# the opening of `data`, one header key, the closing braces and whatever
# follows the post_request call are not visible.
898 def test_50_loop_create_oc_service(self):
899 response = test_utils.get_service_list_request("services/service1")
900 if response.status_code != 404:
901 url = "{}/operations/org-openroadm-service:service-delete"
903 "sdnc-request-header": {
904 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
905 "rpc-action": "service-delete",
906 "request-system-id": "appname",
908 "http://localhost:8585/NotificationServer/notify"
910 "service-delete-req-info": {
911 "service-name": "service1",
912 "tail-retention": "no"
916 test_utils.post_request(url, data)
919 for i in range(1, 6):
920 print("iteration number {}".format(i))
921 print("oc service creation")
922 self.test_36_create_oc_service1()
923 print("check xc in ROADMA01")
924 self.test_38_check_xc1_ROADMA()
925 print("check xc in ROADMC01")
926 self.test_39_check_xc1_ROADMC()
927 print("oc service deletion\n")
928 self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
    """Unmount device XPDRA01 through test_utils.unmount_device and expect HTTP 200."""
    unmount_reply = test_utils.unmount_device("XPDRA01")
    self.assertEqual(unmount_reply.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_52_disconnect_XPDRC(self):
    """Unmount device XPDRC01 through test_utils.unmount_device and expect HTTP 200."""
    unmount_reply = test_utils.unmount_device("XPDRC01")
    self.assertEqual(unmount_reply.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_53_disconnect_ROADMA(self):
    """Unmount device ROADMA01 through test_utils.unmount_device and expect HTTP 200."""
    unmount_reply = test_utils.unmount_device("ROADMA01")
    self.assertEqual(unmount_reply.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_54_disconnect_ROADMC(self):
    """Unmount device ROADMC01 through test_utils.unmount_device and expect HTTP 200."""
    unmount_reply = test_utils.unmount_device("ROADMC01")
    self.assertEqual(unmount_reply.status_code,
                     requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
# Script entry point: run the tests through unittest with verbosity level 2.
if "__main__" == __name__:
    unittest.main(verbosity=2)