2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
18 from common import test_utils
21 class TransportPCEFulltesting(unittest.TestCase):
23 cr_serv_sample_data = {"input": {
24 "sdnc-request-header": {
25 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
26 "rpc-action": "service-create",
27 "request-system-id": "appname",
29 "http://localhost:8585/NotificationServer/notify"
31 "service-name": "service1",
32 "common-id": "ASATT1234567",
33 "connection-type": "service",
35 "service-rate": "100",
37 "service-format": "Ethernet",
38 "clli": "SNJSCAMCJP8",
42 "ROUTER_SNJSCAMCJP8_000000.00_00",
43 "port-type": "router",
44 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
45 "port-rack": "000000.00",
50 "LGX Panel_SNJSCAMCJP8_000000.00_00",
51 "lgx-port-name": "LGX Back.3",
52 "lgx-port-rack": "000000.00",
53 "lgx-port-shelf": "00"
59 "ROUTER_SNJSCAMCJP8_000000.00_00",
60 "port-type": "router",
61 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
62 "port-rack": "000000.00",
67 "LGX Panel_SNJSCAMCJP8_000000.00_00",
68 "lgx-port-name": "LGX Back.4",
69 "lgx-port-rack": "000000.00",
70 "lgx-port-shelf": "00"
76 "service-rate": "100",
78 "service-format": "Ethernet",
79 "clli": "SNJSCAMCJT4",
83 "ROUTER_SNJSCAMCJT4_000000.00_00",
84 "port-type": "router",
85 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
86 "port-rack": "000000.00",
91 "LGX Panel_SNJSCAMCJT4_000000.00_00",
92 "lgx-port-name": "LGX Back.29",
93 "lgx-port-rack": "000000.00",
94 "lgx-port-shelf": "00"
100 "ROUTER_SNJSCAMCJT4_000000.00_00",
101 "port-type": "router",
102 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
103 "port-rack": "000000.00",
108 "LGX Panel_SNJSCAMCJT4_000000.00_00",
109 "lgx-port-name": "LGX Back.30",
110 "lgx-port-rack": "000000.00",
111 "lgx-port-shelf": "00"
116 "due-date": "2016-11-28T00:00:01Z",
117 "operator-contact": "pw1234"
125 cls.processes = test_utils.start_tpce()
126 cls.processes = test_utils.start_sims(['xpdra', 'roadma-full', 'roadmc-full', 'xpdrc'])
129 def tearDownClass(cls):
130 for process in cls.processes:
131 test_utils.shutdown_process(process)
132 print("all processes killed")
134 def setUp(self): # instruction executed before each test method
135 print("execution of {}".format(self.id().split(".")[-1]))
137 # connect netconf devices
138 def test_01_connect_xpdrA(self):
139 response = test_utils.mount_device("XPDRA01", 'xpdra')
140 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
142 def test_02_connect_xpdrC(self):
143 response = test_utils.mount_device("XPDRC01", 'xpdrc')
144 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
146 def test_03_connect_rdmA(self):
147 response = test_utils.mount_device("ROADMA01", 'roadma-full')
148 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
150 def test_04_connect_rdmC(self):
151 response = test_utils.mount_device("ROADMC01", 'roadmc-full')
152 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
154 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
155 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
156 "ROADMA01", "1", "SRG1-PP1-TXRX")
157 self.assertEqual(response.status_code, requests.codes.ok)
158 res = response.json()
159 self.assertIn('Xponder Roadm Link created successfully',
160 res["output"]["result"])
163 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
164 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
165 "ROADMA01", "1", "SRG1-PP1-TXRX")
166 self.assertEqual(response.status_code, requests.codes.ok)
167 res = response.json()
168 self.assertIn('Roadm Xponder links created successfully',
169 res["output"]["result"])
172 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
173 response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "1",
174 "ROADMC01", "1", "SRG1-PP1-TXRX")
175 self.assertEqual(response.status_code, requests.codes.ok)
176 res = response.json()
177 self.assertIn('Xponder Roadm Link created successfully',
178 res["output"]["result"])
181 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
182 response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "1",
183 "ROADMC01", "1", "SRG1-PP1-TXRX")
184 self.assertEqual(response.status_code, requests.codes.ok)
185 res = response.json()
186 self.assertIn('Roadm Xponder links created successfully',
187 res["output"]["result"])
190 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
191 # Config ROADMA-ROADMC oms-attributes
194 "auto-spanloss": "true",
195 "spanloss-base": 11.4,
196 "spanloss-current": 12,
197 "engineered-spanloss": 12.2,
198 "link-concatenation": [{
201 "SRLG-length": 100000,
203 response = test_utils.add_oms_attr_request("ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX", data)
204 self.assertEqual(response.status_code, requests.codes.created)
206 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
207 # Config ROADMC-ROADMA oms-attributes
210 "auto-spanloss": "true",
211 "spanloss-base": 11.4,
212 "spanloss-current": 12,
213 "engineered-spanloss": 12.2,
214 "link-concatenation": [{
217 "SRLG-length": 100000,
219 response = test_utils.add_oms_attr_request("ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX", data)
220 self.assertEqual(response.status_code, requests.codes.created)
222 # test service-create for Eth service from xpdr to xpdr
223 def test_11_create_eth_service1(self):
224 self.cr_serv_sample_data["input"]["service-name"] = "service1"
225 response = test_utils.service_create_request(self.cr_serv_sample_data)
226 self.assertEqual(response.status_code, requests.codes.ok)
227 res = response.json()
228 self.assertIn('PCE calculation in progress',
229 res['output']['configuration-response-common'][
231 time.sleep(self.WAITING)
233 def test_12_get_eth_service1(self):
234 response = test_utils.get_service_list_request("services/service1")
235 self.assertEqual(response.status_code, requests.codes.ok)
236 res = response.json()
238 res['services'][0]['administrative-state'],
241 res['services'][0]['service-name'], 'service1')
243 res['services'][0]['connection-type'], 'service')
245 res['services'][0]['lifecycle-state'], 'planned')
248 def test_13_check_xc1_ROADMA(self):
249 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
250 self.assertEqual(response.status_code, requests.codes.ok)
251 res = response.json()
252 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
253 self.assertDictEqual(
255 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
256 'wavelength-number': 1,
257 'opticalControlMode': 'gainLoss',
258 'target-output-power': -3.0
259 }, **res['roadm-connections'][0]),
260 res['roadm-connections'][0]
262 self.assertDictEqual(
263 {'src-if': 'SRG1-PP1-TXRX-1'},
264 res['roadm-connections'][0]['source'])
265 self.assertDictEqual(
266 {'dst-if': 'DEG1-TTP-TXRX-1'},
267 res['roadm-connections'][0]['destination'])
270 def test_14_check_xc1_ROADMC(self):
271 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
272 self.assertEqual(response.status_code, requests.codes.ok)
273 res = response.json()
274 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
275 self.assertDictEqual(
277 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
278 'wavelength-number': 1,
279 'opticalControlMode': 'gainLoss',
280 'target-output-power': 2.0
281 }, **res['roadm-connections'][0]),
282 res['roadm-connections'][0]
284 self.assertDictEqual(
285 {'src-if': 'SRG1-PP1-TXRX-1'},
286 res['roadm-connections'][0]['source'])
287 self.assertDictEqual(
288 {'dst-if': 'DEG2-TTP-TXRX-1'},
289 res['roadm-connections'][0]['destination'])
292 def test_15_check_topo_XPDRA(self):
293 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
294 self.assertEqual(response.status_code, requests.codes.ok)
295 res = response.json()
296 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
298 if ele['tp-id'] == 'XPDR1-NETWORK1':
299 self.assertEqual({u'frequency': 196.1, u'width': 40},
300 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
301 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT3':
303 'org-openroadm-network-topology:xpdr-client-attributes',
305 if ele['tp-id'] == 'XPDR1-NETWORK2':
307 'org-openroadm-network-topology:xpdr-network-attributes',
311 def test_16_check_topo_ROADMA_SRG1(self):
312 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
313 self.assertEqual(response.status_code, requests.codes.ok)
314 res = response.json()
315 self.assertNotIn({u'index': 1},
317 u'org-openroadm-network-topology:srg-attributes'][
318 'available-wavelengths'])
319 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
321 if ele['tp-id'] == 'SRG1-PP1-TXRX':
322 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
323 ele['org-openroadm-network-topology:'
324 'pp-attributes']['used-wavelength']
326 if ele['tp-id'] == 'SRG1-PP2-TXRX':
327 self.assertNotIn('used-wavelength', dict.keys(ele))
330 def test_17_check_topo_ROADMA_DEG1(self):
331 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
332 self.assertEqual(response.status_code, requests.codes.ok)
333 res = response.json()
334 self.assertNotIn({u'index': 1},
336 u'org-openroadm-network-topology:'
337 u'degree-attributes'][
338 'available-wavelengths'])
339 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
341 if ele['tp-id'] == 'DEG1-CTP-TXRX':
342 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
343 ele['org-openroadm-network-topology:'
346 if ele['tp-id'] == 'DEG1-TTP-TXRX':
347 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
348 ele['org-openroadm-network-topology:'
349 'tx-ttp-attributes'][
353 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
354 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "2",
355 "ROADMA01", "1", "SRG1-PP2-TXRX")
356 self.assertEqual(response.status_code, requests.codes.ok)
357 res = response.json()
358 self.assertIn('Xponder Roadm Link created successfully',
359 res["output"]["result"])
362 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
363 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "2",
364 "ROADMA01", "1", "SRG1-PP2-TXRX")
365 self.assertEqual(response.status_code, requests.codes.ok)
366 res = response.json()
367 self.assertIn('Roadm Xponder links created successfully',
368 res["output"]["result"])
371 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
372 response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "2",
373 "ROADMC01", "1", "SRG1-PP2-TXRX")
374 self.assertEqual(response.status_code, requests.codes.ok)
375 res = response.json()
376 self.assertIn('Xponder Roadm Link created successfully',
377 res["output"]["result"])
380 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
381 response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "2",
382 "ROADMC01", "1", "SRG1-PP2-TXRX")
383 self.assertEqual(response.status_code, requests.codes.ok)
384 res = response.json()
385 self.assertIn('Roadm Xponder links created successfully',
386 res["output"]["result"])
389 def test_22_create_eth_service2(self):
390 self.cr_serv_sample_data["input"]["service-name"] = "service2"
391 response = test_utils.service_create_request(self.cr_serv_sample_data)
392 self.assertEqual(response.status_code, requests.codes.ok)
393 res = response.json()
394 self.assertIn('PCE calculation in progress',
395 res['output']['configuration-response-common'][
397 time.sleep(self.WAITING)
399 def test_23_get_eth_service2(self):
400 response = test_utils.get_service_list_request("services/service2")
401 self.assertEqual(response.status_code, requests.codes.ok)
402 res = response.json()
404 res['services'][0]['administrative-state'],
407 res['services'][0]['service-name'], 'service2')
409 res['services'][0]['connection-type'], 'service')
411 res['services'][0]['lifecycle-state'], 'planned')
414 def test_24_check_xc2_ROADMA(self):
415 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-2")
416 self.assertEqual(response.status_code, requests.codes.ok)
417 res = response.json()
418 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
419 self.assertDictEqual(
421 'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-2',
422 'wavelength-number': 2,
423 'opticalControlMode': 'power'
424 }, **res['roadm-connections'][0]),
425 res['roadm-connections'][0]
427 self.assertDictEqual(
428 {'src-if': 'DEG1-TTP-TXRX-2'},
429 res['roadm-connections'][0]['source'])
430 self.assertDictEqual(
431 {'dst-if': 'SRG1-PP2-TXRX-2'},
432 res['roadm-connections'][0]['destination'])
434 def test_25_check_topo_XPDRA(self):
435 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
436 self.assertEqual(response.status_code, requests.codes.ok)
437 res = response.json()
438 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
440 if ele['tp-id'] == 'XPDR1-NETWORK1':
441 self.assertEqual({u'frequency': 196.1, u'width': 40},
442 ele['org-openroadm-network-topology:'
443 'xpdr-network-attributes'][
445 if ele['tp-id'] == 'XPDR1-NETWORK2':
446 self.assertEqual({u'frequency': 196.05, u'width': 40},
447 ele['org-openroadm-network-topology:'
448 'xpdr-network-attributes'][
450 if ele['tp-id'] == 'XPDR1-CLIENT1' or \
451 ele['tp-id'] == 'XPDR1-CLIENT3':
453 'org-openroadm-network-topology:xpdr-client-attributes',
457 def test_26_check_topo_ROADMA_SRG1(self):
458 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
459 self.assertEqual(response.status_code, requests.codes.ok)
460 res = response.json()
461 self.assertNotIn({u'index': 1}, res['node'][0][
462 u'org-openroadm-network-topology:srg-attributes'][
463 'available-wavelengths'])
464 self.assertNotIn({u'index': 2}, res['node'][0][
465 u'org-openroadm-network-topology:srg-attributes'][
466 'available-wavelengths'])
467 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
469 if ele['tp-id'] == 'SRG1-PP1-TXRX':
470 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
471 ele['org-openroadm-network-topology:'
472 'pp-attributes']['used-wavelength'])
473 self.assertNotIn({u'index': 2, u'frequency': 196.05,
475 ele['org-openroadm-network-topology:'
476 'pp-attributes']['used-wavelength'])
477 if ele['tp-id'] == 'SRG1-PP2-TXRX':
478 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
479 ele['org-openroadm-network-topology:'
480 'pp-attributes']['used-wavelength'])
481 self.assertNotIn({u'index': 1, u'frequency': 196.1,
483 ele['org-openroadm-network-topology:'
484 'pp-attributes']['used-wavelength'])
485 if ele['tp-id'] == 'SRG1-PP3-TXRX':
486 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
490 def test_27_check_topo_ROADMA_DEG1(self):
491 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
492 self.assertEqual(response.status_code, requests.codes.ok)
493 res = response.json()
494 self.assertNotIn({u'index': 1}, res['node'][0][
495 u'org-openroadm-network-topology:degree-attributes'][
496 'available-wavelengths'])
497 self.assertNotIn({u'index': 2}, res['node'][0][
498 u'org-openroadm-network-topology:degree-attributes'][
499 'available-wavelengths'])
500 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
502 if ele['tp-id'] == 'DEG1-CTP-TXRX':
503 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
504 ele['org-openroadm-network-topology:'
505 'ctp-attributes']['used-wavelengths'])
506 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
507 ele['org-openroadm-network-topology:'
508 'ctp-attributes']['used-wavelengths'])
509 if ele['tp-id'] == 'DEG1-TTP-TXRX':
510 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
511 ele['org-openroadm-network-topology:'
512 'tx-ttp-attributes']['used-wavelengths'])
513 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
514 ele['org-openroadm-network-topology:'
515 'tx-ttp-attributes']['used-wavelengths'])
518 # creation service test on a non-available resource
519 def test_28_create_eth_service3(self):
520 self.cr_serv_sample_data["input"]["service-name"] = "service3"
521 response = test_utils.service_create_request(self.cr_serv_sample_data)
522 self.assertEqual(response.status_code, requests.codes.ok)
523 res = response.json()
524 self.assertIn('PCE calculation in progress',
525 res['output']['configuration-response-common'][
527 self.assertIn('200', res['output']['configuration-response-common'][
529 time.sleep(self.WAITING)
531 # add a test that check the openroadm-service-list still only
532 # contains 2 elements
534 def test_29_delete_eth_service3(self):
535 response = test_utils.service_delete_request("service3")
536 self.assertEqual(response.status_code, requests.codes.ok)
537 res = response.json()
538 self.assertIn('Service \'service3\' does not exist in datastore',
539 res['output']['configuration-response-common'][
541 self.assertIn('500', res['output']['configuration-response-common'][
545 def test_30_delete_eth_service1(self):
546 response = test_utils.service_delete_request("service1")
547 self.assertEqual(response.status_code, requests.codes.ok)
548 res = response.json()
549 self.assertIn('Renderer service delete in progress',
550 res['output']['configuration-response-common'][
554 def test_31_delete_eth_service2(self):
555 response = test_utils.service_delete_request("service2")
556 self.assertEqual(response.status_code, requests.codes.ok)
557 res = response.json()
558 self.assertIn('Renderer service delete in progress',
559 res['output']['configuration-response-common'][
563 def test_32_check_no_xc_ROADMA(self):
564 response = test_utils.check_netconf_node_request("ROADMA01", "")
565 res = response.json()
566 self.assertEqual(response.status_code, requests.codes.ok)
567 self.assertNotIn('roadm-connections',
568 dict.keys(res['org-openroadm-device']))
571 def test_33_check_topo_XPDRA(self):
572 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
573 self.assertEqual(response.status_code, requests.codes.ok)
574 res = response.json()
575 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
577 if ((ele[u'org-openroadm-common-network:tp-type'] ==
579 and (ele['tp-id'] == 'XPDR1-CLIENT1' or ele[
580 'tp-id'] == 'XPDR1-CLIENT3')):
582 'org-openroadm-network-topology:xpdr-client-attributes',
584 elif (ele[u'org-openroadm-common-network:tp-type'] ==
586 self.assertIn(u'tail-equipment-id', dict.keys(
587 ele[u'org-openroadm-network-topology:'
588 u'xpdr-network-attributes']))
589 self.assertNotIn('wavelength', dict.keys(
590 ele[u'org-openroadm-network-topology:'
591 u'xpdr-network-attributes']))
594 def test_34_check_topo_ROADMA_SRG1(self):
595 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
596 self.assertEqual(response.status_code, requests.codes.ok)
597 res = response.json()
598 self.assertIn({u'index': 1}, res['node'][0][
599 u'org-openroadm-network-topology:srg-attributes'][
600 'available-wavelengths'])
601 self.assertIn({u'index': 2}, res['node'][0][
602 u'org-openroadm-network-topology:srg-attributes'][
603 'available-wavelengths'])
604 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
606 if ele['tp-id'] == 'SRG1-PP1-TXRX' or \
607 ele['tp-id'] == 'SRG1-PP1-TXRX':
608 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
611 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
615 def test_35_check_topo_ROADMA_DEG1(self):
616 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
617 self.assertEqual(response.status_code, requests.codes.ok)
618 res = response.json()
619 self.assertIn({u'index': 1}, res['node'][0][
620 u'org-openroadm-network-topology:degree-attributes'][
621 'available-wavelengths'])
622 self.assertIn({u'index': 2}, res['node'][0][
623 u'org-openroadm-network-topology:degree-attributes'][
624 'available-wavelengths'])
625 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
627 if ele['tp-id'] == 'DEG1-CTP-TXRX':
628 self.assertNotIn('org-openroadm-network-topology:'
629 'ctp-attributes', dict.keys(ele))
630 if ele['tp-id'] == 'DEG1-TTP-TXRX':
631 self.assertNotIn('org-openroadm-network-topology:'
632 'tx-ttp-attributes', dict.keys(ele))
635 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
636 def test_36_create_oc_service1(self):
637 self.cr_serv_sample_data["input"]["service-name"] = "service1"
638 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
639 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
640 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
641 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
642 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
643 response = test_utils.service_create_request(self.cr_serv_sample_data)
644 self.assertEqual(response.status_code, requests.codes.ok)
645 res = response.json()
646 self.assertIn('PCE calculation in progress',
647 res['output']['configuration-response-common'][
649 time.sleep(self.WAITING)
651 def test_37_get_oc_service1(self):
652 response = test_utils.get_service_list_request("services/service1")
653 self.assertEqual(response.status_code, requests.codes.ok)
654 res = response.json()
656 res['services'][0]['administrative-state'],
659 res['services'][0]['service-name'], 'service1')
661 res['services'][0]['connection-type'], 'roadm-line')
663 res['services'][0]['lifecycle-state'], 'planned')
666 def test_38_check_xc1_ROADMA(self):
667 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
668 self.assertEqual(response.status_code, requests.codes.ok)
669 res = response.json()
670 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
671 self.assertDictEqual(
673 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
674 'wavelength-number': 1,
675 'opticalControlMode': 'gainLoss',
676 'target-output-power': -3.0
677 }, **res['roadm-connections'][0]),
678 res['roadm-connections'][0]
680 self.assertDictEqual(
681 {'src-if': 'SRG1-PP1-TXRX-1'},
682 res['roadm-connections'][0]['source'])
683 self.assertDictEqual(
684 {'dst-if': 'DEG1-TTP-TXRX-1'},
685 res['roadm-connections'][0]['destination'])
688 def test_39_check_xc1_ROADMC(self):
689 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
690 self.assertEqual(response.status_code, requests.codes.ok)
691 res = response.json()
692 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
693 self.assertDictEqual(
695 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
696 'wavelength-number': 1,
697 'opticalControlMode': 'gainLoss',
698 'target-output-power': 2.0
699 }, **res['roadm-connections'][0]),
700 res['roadm-connections'][0]
702 self.assertDictEqual(
703 {'src-if': 'SRG1-PP1-TXRX-1'},
704 res['roadm-connections'][0]['source'])
705 self.assertDictEqual(
706 {'dst-if': 'DEG2-TTP-TXRX-1'},
707 res['roadm-connections'][0]['destination'])
710 def test_40_create_oc_service2(self):
711 self.cr_serv_sample_data["input"]["service-name"] = "service2"
712 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
713 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
714 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
715 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
716 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
717 response = test_utils.service_create_request(self.cr_serv_sample_data)
718 self.assertEqual(response.status_code, requests.codes.ok)
719 res = response.json()
720 self.assertIn('PCE calculation in progress',
721 res['output']['configuration-response-common'][
723 time.sleep(self.WAITING)
725 def test_41_get_oc_service2(self):
726 response = test_utils.get_service_list_request("services/service2")
727 self.assertEqual(response.status_code, requests.codes.ok)
728 res = response.json()
730 res['services'][0]['administrative-state'],
733 res['services'][0]['service-name'], 'service2')
735 res['services'][0]['connection-type'], 'roadm-line')
737 res['services'][0]['lifecycle-state'], 'planned')
740 def test_42_check_xc2_ROADMA(self):
741 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-2")
742 self.assertEqual(response.status_code, requests.codes.ok)
743 res = response.json()
744 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
745 self.assertDictEqual(
747 'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-2',
748 'wavelength-number': 2,
749 'opticalControlMode': 'gainLoss',
750 'target-output-power': -3.0
751 }, **res['roadm-connections'][0]),
752 res['roadm-connections'][0]
754 self.assertDictEqual(
755 {'src-if': 'SRG1-PP2-TXRX-2'},
756 res['roadm-connections'][0]['source'])
757 self.assertDictEqual(
758 {'dst-if': 'DEG1-TTP-TXRX-2'},
759 res['roadm-connections'][0]['destination'])
762 def test_43_check_topo_ROADMA(self):
763 self.test_26_check_topo_ROADMA_SRG1()
764 self.test_27_check_topo_ROADMA_DEG1()
767 def test_44_delete_oc_service1(self):
768 response = test_utils.service_delete_request("service1")
769 self.assertEqual(response.status_code, requests.codes.ok)
770 res = response.json()
771 self.assertIn('Renderer service delete in progress',
772 res['output']['configuration-response-common'][
776 def test_45_delete_oc_service2(self):
777 response = test_utils.service_delete_request("service2")
778 self.assertEqual(response.status_code, requests.codes.ok)
779 res = response.json()
780 self.assertIn('Renderer service delete in progress',
781 res['output']['configuration-response-common'][
785 def test_46_get_no_oc_services(self):
787 response = test_utils.get_service_list_request("")
788 self.assertEqual(response.status_code, requests.codes.conflict)
789 res = response.json()
792 "error-type": "application",
793 "error-tag": "data-missing",
795 "Request could not be completed because the relevant data "
796 "model content does not exist"
798 res['errors']['error'])
801 def test_47_get_no_xc_ROADMA(self):
802 response = test_utils.check_netconf_node_request("ROADMA01", "")
803 self.assertEqual(response.status_code, requests.codes.ok)
804 res = response.json()
805 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
808 def test_48_check_topo_ROADMA(self):
809 self.test_34_check_topo_ROADMA_SRG1()
810 self.test_35_check_topo_ROADMA_DEG1()
812 def test_49_loop_create_eth_service(self):
813 for i in range(1, 6):
814 print("iteration number {}".format(i))
815 print("eth service creation")
816 self.test_11_create_eth_service1()
817 print("check xc in ROADMA01")
818 self.test_13_check_xc1_ROADMA()
819 print("check xc in ROADMC01")
820 self.test_14_check_xc1_ROADMC()
821 print("eth service deletion\n")
822 self.test_30_delete_eth_service1()
824 def test_50_loop_create_oc_service(self):
825 response = test_utils.get_service_list_request("services/service1")
826 if response.status_code != 404:
827 response = test_utils.service_delete_request("service1")
830 for i in range(1, 6):
831 print("iteration number {}".format(i))
832 print("oc service creation")
833 self.test_36_create_oc_service1()
834 print("check xc in ROADMA01")
835 self.test_38_check_xc1_ROADMA()
836 print("check xc in ROADMC01")
837 self.test_39_check_xc1_ROADMC()
838 print("oc service deletion\n")
839 self.test_44_delete_oc_service1()
841 def test_51_disconnect_XPDRA(self):
842 response = test_utils.unmount_device("XPDRA01")
843 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
845 def test_52_disconnect_XPDRC(self):
846 response = test_utils.unmount_device("XPDRC01")
847 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
849 def test_53_disconnect_ROADMA(self):
850 response = test_utils.unmount_device("ROADMA01")
851 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
853 def test_54_disconnect_ROADMC(self):
854 response = test_utils.unmount_device("ROADMC01")
855 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
858 if __name__ == "__main__":
859 unittest.main(verbosity=2)