2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 from common import test_utils
21 class TransportPCEFulltesting(unittest.TestCase):
23 cr_serv_sample_data = {"input": {
24 "sdnc-request-header": {
25 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
26 "rpc-action": "service-create",
27 "request-system-id": "appname",
29 "http://localhost:8585/NotificationServer/notify"
31 "service-name": "service1",
32 "common-id": "ASATT1234567",
33 "connection-type": "service",
35 "service-rate": "100",
37 "service-format": "Ethernet",
38 "clli": "SNJSCAMCJP8",
42 "ROUTER_SNJSCAMCJP8_000000.00_00",
43 "port-type": "router",
44 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
45 "port-rack": "000000.00",
50 "LGX Panel_SNJSCAMCJP8_000000.00_00",
51 "lgx-port-name": "LGX Back.3",
52 "lgx-port-rack": "000000.00",
53 "lgx-port-shelf": "00"
59 "ROUTER_SNJSCAMCJP8_000000.00_00",
60 "port-type": "router",
61 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
62 "port-rack": "000000.00",
67 "LGX Panel_SNJSCAMCJP8_000000.00_00",
68 "lgx-port-name": "LGX Back.4",
69 "lgx-port-rack": "000000.00",
70 "lgx-port-shelf": "00"
76 "service-rate": "100",
78 "service-format": "Ethernet",
79 "clli": "SNJSCAMCJT4",
83 "ROUTER_SNJSCAMCJT4_000000.00_00",
84 "port-type": "router",
85 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
86 "port-rack": "000000.00",
91 "LGX Panel_SNJSCAMCJT4_000000.00_00",
92 "lgx-port-name": "LGX Back.29",
93 "lgx-port-rack": "000000.00",
94 "lgx-port-shelf": "00"
100 "ROUTER_SNJSCAMCJT4_000000.00_00",
101 "port-type": "router",
102 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
103 "port-rack": "000000.00",
108 "LGX Panel_SNJSCAMCJT4_000000.00_00",
109 "lgx-port-name": "LGX Back.30",
110 "lgx-port-rack": "000000.00",
111 "lgx-port-shelf": "00"
116 "due-date": "2016-11-28T00:00:01Z",
117 "operator-contact": "pw1234"
125 cls.processes = test_utils.start_tpce()
126 cls.processes = test_utils.start_sims(['xpdra', 'roadma-full', 'roadmc-full', 'xpdrc'])
def tearDownClass(cls):
    """Shut down every process recorded in ``cls.processes``.

    Each entry is passed to ``test_utils.shutdown_process``; a single
    confirmation line is printed once the whole collection has been handled.

    NOTE(review): the ``@classmethod`` decorator is presumably on the line
    directly above this block (not visible here) -- confirm it is still there.
    """
    # pylint: disable=not-an-iterable
    for process in cls.processes:
        test_utils.shutdown_process(process)
    # Printed once, after the loop, so it also shows up when nothing ran.
    print("all processes killed")
def setUp(self):
    """Print the name of the test about to run (executed before each test).

    ``self.id()`` is split on dots and only the last component is shown.
    """
    test_name = self.id().split(".")[-1]
    print("execution of {}".format(test_name))
138 # connect netconf devices
def test_01_connect_xpdrA(self):
    """Mount netconf device XPDRA01 and expect a 'created' HTTP status.

    The second ``mount_device`` argument ('xpdra') is presumably the
    simulator name, as it matches an entry passed to ``start_sims``.
    """
    response = test_utils.mount_device("XPDRA01", 'xpdra')
    self.assertEqual(response.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
    """Mount netconf device XPDRC01 and expect a 'created' HTTP status.

    The second ``mount_device`` argument ('xpdrc') is presumably the
    simulator name, as it matches an entry passed to ``start_sims``.
    """
    response = test_utils.mount_device("XPDRC01", 'xpdrc')
    self.assertEqual(response.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    """Mount netconf device ROADMA01 and expect a 'created' HTTP status.

    The second ``mount_device`` argument ('roadma-full') is presumably the
    simulator name, as it matches an entry passed to ``start_sims``.
    """
    response = test_utils.mount_device("ROADMA01", 'roadma-full')
    self.assertEqual(response.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    """Mount netconf device ROADMC01 and expect a 'created' HTTP status.

    The second ``mount_device`` argument ('roadmc-full') is presumably the
    simulator name, as it matches an entry passed to ``start_sims``.
    """
    response = test_utils.mount_device("ROADMC01", 'roadmc-full')
    self.assertEqual(response.status_code, requests.codes.created,
                     test_utils.CODE_SHOULD_BE_201)
155 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
156 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
157 "ROADMA01", "1", "SRG1-PP1-TXRX")
158 self.assertEqual(response.status_code, requests.codes.ok)
159 res = response.json()
160 self.assertIn('Xponder Roadm Link created successfully',
161 res["output"]["result"])
164 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
165 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
166 "ROADMA01", "1", "SRG1-PP1-TXRX")
167 self.assertEqual(response.status_code, requests.codes.ok)
168 res = response.json()
169 self.assertIn('Roadm Xponder links created successfully',
170 res["output"]["result"])
173 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
174 response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "1",
175 "ROADMC01", "1", "SRG1-PP1-TXRX")
176 self.assertEqual(response.status_code, requests.codes.ok)
177 res = response.json()
178 self.assertIn('Xponder Roadm Link created successfully',
179 res["output"]["result"])
182 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
183 response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "1",
184 "ROADMC01", "1", "SRG1-PP1-TXRX")
185 self.assertEqual(response.status_code, requests.codes.ok)
186 res = response.json()
187 self.assertIn('Roadm Xponder links created successfully',
188 res["output"]["result"])
191 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
192 # Config ROADMA-ROADMC oms-attributes
195 "auto-spanloss": "true",
196 "spanloss-base": 11.4,
197 "spanloss-current": 12,
198 "engineered-spanloss": 12.2,
199 "link-concatenation": [{
202 "SRLG-length": 100000,
204 response = test_utils.add_oms_attr_request("ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX", data)
205 self.assertEqual(response.status_code, requests.codes.created)
207 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
208 # Config ROADMC-ROADMA oms-attributes
211 "auto-spanloss": "true",
212 "spanloss-base": 11.4,
213 "spanloss-current": 12,
214 "engineered-spanloss": 12.2,
215 "link-concatenation": [{
218 "SRLG-length": 100000,
220 response = test_utils.add_oms_attr_request("ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX", data)
221 self.assertEqual(response.status_code, requests.codes.created)
223 # test service-create for Eth service from xpdr to xpdr
224 def test_11_create_eth_service1(self):
225 self.cr_serv_sample_data["input"]["service-name"] = "service1"
226 response = test_utils.service_create_request(self.cr_serv_sample_data)
227 self.assertEqual(response.status_code, requests.codes.ok)
228 res = response.json()
229 self.assertIn('PCE calculation in progress',
230 res['output']['configuration-response-common'][
232 time.sleep(self.WAITING)
234 def test_12_get_eth_service1(self):
235 response = test_utils.get_service_list_request("services/service1")
236 self.assertEqual(response.status_code, requests.codes.ok)
237 res = response.json()
239 res['services'][0]['administrative-state'],
242 res['services'][0]['service-name'], 'service1')
244 res['services'][0]['connection-type'], 'service')
246 res['services'][0]['lifecycle-state'], 'planned')
249 def test_13_check_xc1_ROADMA(self):
250 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
251 self.assertEqual(response.status_code, requests.codes.ok)
252 res = response.json()
253 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
254 self.assertDictEqual(
256 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
257 'wavelength-number': 1,
258 'opticalControlMode': 'gainLoss',
259 'target-output-power': -3.0
260 }, **res['roadm-connections'][0]),
261 res['roadm-connections'][0]
263 self.assertDictEqual(
264 {'src-if': 'SRG1-PP1-TXRX-1'},
265 res['roadm-connections'][0]['source'])
266 self.assertDictEqual(
267 {'dst-if': 'DEG1-TTP-TXRX-1'},
268 res['roadm-connections'][0]['destination'])
271 def test_14_check_xc1_ROADMC(self):
272 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
273 self.assertEqual(response.status_code, requests.codes.ok)
274 res = response.json()
275 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
276 self.assertDictEqual(
278 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
279 'wavelength-number': 1,
280 'opticalControlMode': 'gainLoss',
281 'target-output-power': 2.0
282 }, **res['roadm-connections'][0]),
283 res['roadm-connections'][0]
285 self.assertDictEqual(
286 {'src-if': 'SRG1-PP1-TXRX-1'},
287 res['roadm-connections'][0]['source'])
288 self.assertDictEqual(
289 {'dst-if': 'DEG2-TTP-TXRX-1'},
290 res['roadm-connections'][0]['destination'])
293 def test_15_check_topo_XPDRA(self):
294 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
295 self.assertEqual(response.status_code, requests.codes.ok)
296 res = response.json()
297 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
299 if ele['tp-id'] == 'XPDR1-NETWORK1':
300 self.assertEqual({u'frequency': 196.1, u'width': 40},
301 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
302 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT3':
304 'org-openroadm-network-topology:xpdr-client-attributes',
306 if ele['tp-id'] == 'XPDR1-NETWORK2':
308 'org-openroadm-network-topology:xpdr-network-attributes',
312 def test_16_check_topo_ROADMA_SRG1(self):
313 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
314 self.assertEqual(response.status_code, requests.codes.ok)
315 res = response.json()
316 self.assertNotIn({u'index': 1},
318 u'org-openroadm-network-topology:srg-attributes'][
319 'available-wavelengths'])
320 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
322 if ele['tp-id'] == 'SRG1-PP1-TXRX':
323 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
324 ele['org-openroadm-network-topology:'
325 'pp-attributes']['used-wavelength']
327 if ele['tp-id'] == 'SRG1-PP2-TXRX':
328 self.assertNotIn('used-wavelength', dict.keys(ele))
331 def test_17_check_topo_ROADMA_DEG1(self):
332 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
333 self.assertEqual(response.status_code, requests.codes.ok)
334 res = response.json()
335 self.assertNotIn({u'index': 1},
337 u'org-openroadm-network-topology:'
338 u'degree-attributes'][
339 'available-wavelengths'])
340 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
342 if ele['tp-id'] == 'DEG1-CTP-TXRX':
343 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
344 ele['org-openroadm-network-topology:'
347 if ele['tp-id'] == 'DEG1-TTP-TXRX':
348 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
349 ele['org-openroadm-network-topology:'
350 'tx-ttp-attributes'][
354 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
355 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "2",
356 "ROADMA01", "1", "SRG1-PP2-TXRX")
357 self.assertEqual(response.status_code, requests.codes.ok)
358 res = response.json()
359 self.assertIn('Xponder Roadm Link created successfully',
360 res["output"]["result"])
363 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
364 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "2",
365 "ROADMA01", "1", "SRG1-PP2-TXRX")
366 self.assertEqual(response.status_code, requests.codes.ok)
367 res = response.json()
368 self.assertIn('Roadm Xponder links created successfully',
369 res["output"]["result"])
372 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
373 response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "2",
374 "ROADMC01", "1", "SRG1-PP2-TXRX")
375 self.assertEqual(response.status_code, requests.codes.ok)
376 res = response.json()
377 self.assertIn('Xponder Roadm Link created successfully',
378 res["output"]["result"])
381 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
382 response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "2",
383 "ROADMC01", "1", "SRG1-PP2-TXRX")
384 self.assertEqual(response.status_code, requests.codes.ok)
385 res = response.json()
386 self.assertIn('Roadm Xponder links created successfully',
387 res["output"]["result"])
390 def test_22_create_eth_service2(self):
391 self.cr_serv_sample_data["input"]["service-name"] = "service2"
392 response = test_utils.service_create_request(self.cr_serv_sample_data)
393 self.assertEqual(response.status_code, requests.codes.ok)
394 res = response.json()
395 self.assertIn('PCE calculation in progress',
396 res['output']['configuration-response-common'][
398 time.sleep(self.WAITING)
400 def test_23_get_eth_service2(self):
401 response = test_utils.get_service_list_request("services/service2")
402 self.assertEqual(response.status_code, requests.codes.ok)
403 res = response.json()
405 res['services'][0]['administrative-state'],
408 res['services'][0]['service-name'], 'service2')
410 res['services'][0]['connection-type'], 'service')
412 res['services'][0]['lifecycle-state'], 'planned')
415 def test_24_check_xc2_ROADMA(self):
416 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-2")
417 self.assertEqual(response.status_code, requests.codes.ok)
418 res = response.json()
419 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
420 self.assertDictEqual(
422 'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-2',
423 'wavelength-number': 2,
424 'opticalControlMode': 'power'
425 }, **res['roadm-connections'][0]),
426 res['roadm-connections'][0]
428 self.assertDictEqual(
429 {'src-if': 'DEG1-TTP-TXRX-2'},
430 res['roadm-connections'][0]['source'])
431 self.assertDictEqual(
432 {'dst-if': 'SRG1-PP2-TXRX-2'},
433 res['roadm-connections'][0]['destination'])
435 def test_25_check_topo_XPDRA(self):
436 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
437 self.assertEqual(response.status_code, requests.codes.ok)
438 res = response.json()
439 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
441 if ele['tp-id'] == 'XPDR1-NETWORK1':
442 self.assertEqual({u'frequency': 196.1, u'width': 40},
443 ele['org-openroadm-network-topology:'
444 'xpdr-network-attributes'][
446 if ele['tp-id'] == 'XPDR1-NETWORK2':
447 self.assertEqual({u'frequency': 196.05, u'width': 40},
448 ele['org-openroadm-network-topology:'
449 'xpdr-network-attributes'][
451 if ele['tp-id'] == 'XPDR1-CLIENT1' or \
452 ele['tp-id'] == 'XPDR1-CLIENT3':
454 'org-openroadm-network-topology:xpdr-client-attributes',
458 def test_26_check_topo_ROADMA_SRG1(self):
459 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
460 self.assertEqual(response.status_code, requests.codes.ok)
461 res = response.json()
462 self.assertNotIn({u'index': 1}, res['node'][0][
463 u'org-openroadm-network-topology:srg-attributes'][
464 'available-wavelengths'])
465 self.assertNotIn({u'index': 2}, res['node'][0][
466 u'org-openroadm-network-topology:srg-attributes'][
467 'available-wavelengths'])
468 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
470 if ele['tp-id'] == 'SRG1-PP1-TXRX':
471 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
472 ele['org-openroadm-network-topology:'
473 'pp-attributes']['used-wavelength'])
474 self.assertNotIn({u'index': 2, u'frequency': 196.05,
476 ele['org-openroadm-network-topology:'
477 'pp-attributes']['used-wavelength'])
478 if ele['tp-id'] == 'SRG1-PP2-TXRX':
479 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
480 ele['org-openroadm-network-topology:'
481 'pp-attributes']['used-wavelength'])
482 self.assertNotIn({u'index': 1, u'frequency': 196.1,
484 ele['org-openroadm-network-topology:'
485 'pp-attributes']['used-wavelength'])
486 if ele['tp-id'] == 'SRG1-PP3-TXRX':
487 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
491 def test_27_check_topo_ROADMA_DEG1(self):
492 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
493 self.assertEqual(response.status_code, requests.codes.ok)
494 res = response.json()
495 self.assertNotIn({u'index': 1}, res['node'][0][
496 u'org-openroadm-network-topology:degree-attributes'][
497 'available-wavelengths'])
498 self.assertNotIn({u'index': 2}, res['node'][0][
499 u'org-openroadm-network-topology:degree-attributes'][
500 'available-wavelengths'])
501 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
503 if ele['tp-id'] == 'DEG1-CTP-TXRX':
504 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
505 ele['org-openroadm-network-topology:'
506 'ctp-attributes']['used-wavelengths'])
507 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
508 ele['org-openroadm-network-topology:'
509 'ctp-attributes']['used-wavelengths'])
510 if ele['tp-id'] == 'DEG1-TTP-TXRX':
511 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
512 ele['org-openroadm-network-topology:'
513 'tx-ttp-attributes']['used-wavelengths'])
514 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
515 ele['org-openroadm-network-topology:'
516 'tx-ttp-attributes']['used-wavelengths'])
519 # service creation test on an unavailable resource
520 def test_28_create_eth_service3(self):
521 self.cr_serv_sample_data["input"]["service-name"] = "service3"
522 response = test_utils.service_create_request(self.cr_serv_sample_data)
523 self.assertEqual(response.status_code, requests.codes.ok)
524 res = response.json()
525 self.assertIn('PCE calculation in progress',
526 res['output']['configuration-response-common'][
528 self.assertIn('200', res['output']['configuration-response-common'][
530 time.sleep(self.WAITING)
532 # add a test that checks that the openroadm-service-list still only
533 # contains 2 elements
535 def test_29_delete_eth_service3(self):
536 response = test_utils.service_delete_request("service3")
537 self.assertEqual(response.status_code, requests.codes.ok)
538 res = response.json()
539 self.assertIn('Service \'service3\' does not exist in datastore',
540 res['output']['configuration-response-common'][
542 self.assertIn('500', res['output']['configuration-response-common'][
546 def test_30_delete_eth_service1(self):
547 response = test_utils.service_delete_request("service1")
548 self.assertEqual(response.status_code, requests.codes.ok)
549 res = response.json()
550 self.assertIn('Renderer service delete in progress',
551 res['output']['configuration-response-common'][
555 def test_31_delete_eth_service2(self):
556 response = test_utils.service_delete_request("service2")
557 self.assertEqual(response.status_code, requests.codes.ok)
558 res = response.json()
559 self.assertIn('Renderer service delete in progress',
560 res['output']['configuration-response-common'][
564 def test_32_check_no_xc_ROADMA(self):
565 response = test_utils.check_netconf_node_request("ROADMA01", "")
566 res = response.json()
567 self.assertEqual(response.status_code, requests.codes.ok)
568 self.assertNotIn('roadm-connections',
569 dict.keys(res['org-openroadm-device']))
572 def test_33_check_topo_XPDRA(self):
573 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
574 self.assertEqual(response.status_code, requests.codes.ok)
575 res = response.json()
576 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
578 if ((ele[u'org-openroadm-common-network:tp-type'] ==
580 and (ele['tp-id'] == 'XPDR1-CLIENT1' or ele[
581 'tp-id'] == 'XPDR1-CLIENT3')):
583 'org-openroadm-network-topology:xpdr-client-attributes',
585 elif (ele[u'org-openroadm-common-network:tp-type'] ==
587 self.assertIn(u'tail-equipment-id', dict.keys(
588 ele[u'org-openroadm-network-topology:'
589 u'xpdr-network-attributes']))
590 self.assertNotIn('wavelength', dict.keys(
591 ele[u'org-openroadm-network-topology:'
592 u'xpdr-network-attributes']))
595 def test_34_check_topo_ROADMA_SRG1(self):
596 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
597 self.assertEqual(response.status_code, requests.codes.ok)
598 res = response.json()
599 self.assertIn({u'index': 1}, res['node'][0][
600 u'org-openroadm-network-topology:srg-attributes'][
601 'available-wavelengths'])
602 self.assertIn({u'index': 2}, res['node'][0][
603 u'org-openroadm-network-topology:srg-attributes'][
604 'available-wavelengths'])
605 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
607 if ele['tp-id'] == 'SRG1-PP1-TXRX' or \
608 ele['tp-id'] == 'SRG1-PP1-TXRX':
609 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
612 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
616 def test_35_check_topo_ROADMA_DEG1(self):
617 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
618 self.assertEqual(response.status_code, requests.codes.ok)
619 res = response.json()
620 self.assertIn({u'index': 1}, res['node'][0][
621 u'org-openroadm-network-topology:degree-attributes'][
622 'available-wavelengths'])
623 self.assertIn({u'index': 2}, res['node'][0][
624 u'org-openroadm-network-topology:degree-attributes'][
625 'available-wavelengths'])
626 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
628 if ele['tp-id'] == 'DEG1-CTP-TXRX':
629 self.assertNotIn('org-openroadm-network-topology:'
630 'ctp-attributes', dict.keys(ele))
631 if ele['tp-id'] == 'DEG1-TTP-TXRX':
632 self.assertNotIn('org-openroadm-network-topology:'
633 'tx-ttp-attributes', dict.keys(ele))
636 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
637 def test_36_create_oc_service1(self):
638 self.cr_serv_sample_data["input"]["service-name"] = "service1"
639 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
640 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
641 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
642 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
643 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
644 response = test_utils.service_create_request(self.cr_serv_sample_data)
645 self.assertEqual(response.status_code, requests.codes.ok)
646 res = response.json()
647 self.assertIn('PCE calculation in progress',
648 res['output']['configuration-response-common'][
650 time.sleep(self.WAITING)
652 def test_37_get_oc_service1(self):
653 response = test_utils.get_service_list_request("services/service1")
654 self.assertEqual(response.status_code, requests.codes.ok)
655 res = response.json()
657 res['services'][0]['administrative-state'],
660 res['services'][0]['service-name'], 'service1')
662 res['services'][0]['connection-type'], 'roadm-line')
664 res['services'][0]['lifecycle-state'], 'planned')
667 def test_38_check_xc1_ROADMA(self):
668 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
669 self.assertEqual(response.status_code, requests.codes.ok)
670 res = response.json()
671 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
672 self.assertDictEqual(
674 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
675 'wavelength-number': 1,
676 'opticalControlMode': 'gainLoss',
677 'target-output-power': -3.0
678 }, **res['roadm-connections'][0]),
679 res['roadm-connections'][0]
681 self.assertDictEqual(
682 {'src-if': 'SRG1-PP1-TXRX-1'},
683 res['roadm-connections'][0]['source'])
684 self.assertDictEqual(
685 {'dst-if': 'DEG1-TTP-TXRX-1'},
686 res['roadm-connections'][0]['destination'])
689 def test_39_check_xc1_ROADMC(self):
690 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
691 self.assertEqual(response.status_code, requests.codes.ok)
692 res = response.json()
693 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
694 self.assertDictEqual(
696 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
697 'wavelength-number': 1,
698 'opticalControlMode': 'gainLoss',
699 'target-output-power': 2.0
700 }, **res['roadm-connections'][0]),
701 res['roadm-connections'][0]
703 self.assertDictEqual(
704 {'src-if': 'SRG1-PP1-TXRX-1'},
705 res['roadm-connections'][0]['source'])
706 self.assertDictEqual(
707 {'dst-if': 'DEG2-TTP-TXRX-1'},
708 res['roadm-connections'][0]['destination'])
711 def test_40_create_oc_service2(self):
712 self.cr_serv_sample_data["input"]["service-name"] = "service2"
713 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
714 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
715 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
716 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
717 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
718 response = test_utils.service_create_request(self.cr_serv_sample_data)
719 self.assertEqual(response.status_code, requests.codes.ok)
720 res = response.json()
721 self.assertIn('PCE calculation in progress',
722 res['output']['configuration-response-common'][
724 time.sleep(self.WAITING)
726 def test_41_get_oc_service2(self):
727 response = test_utils.get_service_list_request("services/service2")
728 self.assertEqual(response.status_code, requests.codes.ok)
729 res = response.json()
731 res['services'][0]['administrative-state'],
734 res['services'][0]['service-name'], 'service2')
736 res['services'][0]['connection-type'], 'roadm-line')
738 res['services'][0]['lifecycle-state'], 'planned')
741 def test_42_check_xc2_ROADMA(self):
742 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-2")
743 self.assertEqual(response.status_code, requests.codes.ok)
744 res = response.json()
745 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
746 self.assertDictEqual(
748 'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-2',
749 'wavelength-number': 2,
750 'opticalControlMode': 'gainLoss',
751 'target-output-power': -3.0
752 }, **res['roadm-connections'][0]),
753 res['roadm-connections'][0]
755 self.assertDictEqual(
756 {'src-if': 'SRG1-PP2-TXRX-2'},
757 res['roadm-connections'][0]['source'])
758 self.assertDictEqual(
759 {'dst-if': 'DEG1-TTP-TXRX-2'},
760 res['roadm-connections'][0]['destination'])
763 def test_43_check_topo_ROADMA(self):
764 self.test_26_check_topo_ROADMA_SRG1()
765 self.test_27_check_topo_ROADMA_DEG1()
768 def test_44_delete_oc_service1(self):
769 response = test_utils.service_delete_request("service1")
770 self.assertEqual(response.status_code, requests.codes.ok)
771 res = response.json()
772 self.assertIn('Renderer service delete in progress',
773 res['output']['configuration-response-common'][
777 def test_45_delete_oc_service2(self):
778 response = test_utils.service_delete_request("service2")
779 self.assertEqual(response.status_code, requests.codes.ok)
780 res = response.json()
781 self.assertIn('Renderer service delete in progress',
782 res['output']['configuration-response-common'][
786 def test_46_get_no_oc_services(self):
788 response = test_utils.get_service_list_request("")
789 self.assertEqual(response.status_code, requests.codes.conflict)
790 res = response.json()
793 "error-type": "application",
794 "error-tag": "data-missing",
796 "Request could not be completed because the relevant data "
797 "model content does not exist"
799 res['errors']['error'])
802 def test_47_get_no_xc_ROADMA(self):
803 response = test_utils.check_netconf_node_request("ROADMA01", "")
804 self.assertEqual(response.status_code, requests.codes.ok)
805 res = response.json()
806 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
809 def test_48_check_topo_ROADMA(self):
810 self.test_34_check_topo_ROADMA_SRG1()
811 self.test_35_check_topo_ROADMA_DEG1()
813 def test_49_loop_create_eth_service(self):
814 for i in range(1, 6):
815 print("iteration number {}".format(i))
816 print("eth service creation")
817 self.test_11_create_eth_service1()
818 print("check xc in ROADMA01")
819 self.test_13_check_xc1_ROADMA()
820 print("check xc in ROADMC01")
821 self.test_14_check_xc1_ROADMC()
822 print("eth service deletion\n")
823 self.test_30_delete_eth_service1()
825 def test_50_loop_create_oc_service(self):
826 response = test_utils.get_service_list_request("services/service1")
827 if response.status_code != 404:
828 response = test_utils.service_delete_request("service1")
831 for i in range(1, 6):
832 print("iteration number {}".format(i))
833 print("oc service creation")
834 self.test_36_create_oc_service1()
835 print("check xc in ROADMA01")
836 self.test_38_check_xc1_ROADMA()
837 print("check xc in ROADMC01")
838 self.test_39_check_xc1_ROADMC()
839 print("oc service deletion\n")
840 self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
    """Unmount device XPDRA01 and expect an 'ok' HTTP status."""
    response = test_utils.unmount_device("XPDRA01")
    self.assertEqual(response.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_52_disconnect_XPDRC(self):
    """Unmount device XPDRC01 and expect an 'ok' HTTP status."""
    response = test_utils.unmount_device("XPDRC01")
    self.assertEqual(response.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_53_disconnect_ROADMA(self):
    """Unmount device ROADMA01 and expect an 'ok' HTTP status."""
    response = test_utils.unmount_device("ROADMA01")
    self.assertEqual(response.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
def test_54_disconnect_ROADMC(self):
    """Unmount device ROADMC01 and expect an 'ok' HTTP status."""
    response = test_utils.unmount_device("ROADMC01")
    self.assertEqual(response.status_code, requests.codes.ok,
                     test_utils.CODE_SHOULD_BE_200)
# Run the suite through unittest with verbosity level 2 when this file is
# executed directly as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)