2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
19 from common import test_utils
20 from common.flexgrid_utils import INDEX_1_USED_FREQ_MAP, INDEX_1_2_USED_FREQ_MAP, AVAILABLE_FREQ_MAP, check_freq_map
23 class TransportPCEFulltesting(unittest.TestCase):
24 cr_serv_sample_data = {"input": {
25 "sdnc-request-header": {
26 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
27 "rpc-action": "service-create",
28 "request-system-id": "appname",
30 "http://localhost:8585/NotificationServer/notify"
32 "service-name": "service1",
33 "common-id": "ASATT1234567",
34 "connection-type": "service",
36 "service-rate": "100",
38 "service-format": "Ethernet",
39 "clli": "SNJSCAMCJP8",
43 "ROUTER_SNJSCAMCJP8_000000.00_00",
44 "port-type": "router",
45 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
46 "port-rack": "000000.00",
51 "LGX Panel_SNJSCAMCJP8_000000.00_00",
52 "lgx-port-name": "LGX Back.3",
53 "lgx-port-rack": "000000.00",
54 "lgx-port-shelf": "00"
60 "ROUTER_SNJSCAMCJP8_000000.00_00",
61 "port-type": "router",
62 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
63 "port-rack": "000000.00",
68 "LGX Panel_SNJSCAMCJP8_000000.00_00",
69 "lgx-port-name": "LGX Back.4",
70 "lgx-port-rack": "000000.00",
71 "lgx-port-shelf": "00"
77 "service-rate": "100",
79 "service-format": "Ethernet",
80 "clli": "SNJSCAMCJT4",
84 "ROUTER_SNJSCAMCJT4_000000.00_00",
85 "port-type": "router",
86 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
87 "port-rack": "000000.00",
92 "LGX Panel_SNJSCAMCJT4_000000.00_00",
93 "lgx-port-name": "LGX Back.29",
94 "lgx-port-rack": "000000.00",
95 "lgx-port-shelf": "00"
101 "ROUTER_SNJSCAMCJT4_000000.00_00",
102 "port-type": "router",
103 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
104 "port-rack": "000000.00",
109 "LGX Panel_SNJSCAMCJT4_000000.00_00",
110 "lgx-port-name": "LGX Back.30",
111 "lgx-port-rack": "000000.00",
112 "lgx-port-shelf": "00"
117 "due-date": "2016-11-28T00:00:01Z",
118 "operator-contact": "pw1234"
126 cls.processes = test_utils.start_tpce()
127 cls.processes = test_utils.start_sims(['xpdra', 'roadma-full', 'roadmc-full', 'xpdrc'])
def tearDownClass(cls):
    # Hand every entry of cls.processes to test_utils.shutdown_process, then
    # print a single confirmation line once the loop is done.
    # pylint: disable=not-an-iterable
    for spawned in cls.processes:
        test_utils.shutdown_process(spawned)
    print("all processes killed")
def setUp(self):  # instruction executed before each test method
    # self.id() is a dotted path; only the part after the last dot is printed
    # (the whole id when it contains no dot).
    short_name = self.id().rpartition(".")[2]
    print("execution of {}".format(short_name))
139 # connect netconf devices
def test_01_connect_xpdrA(self):
    # Mount "XPDRA01" (paired with 'xpdra') via test_utils.mount_device and
    # require the "created" status code.
    mount_reply = test_utils.mount_device("XPDRA01", 'xpdra')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_02_connect_xpdrC(self):
    # Mount "XPDRC01" (paired with 'xpdrc') via test_utils.mount_device and
    # require the "created" status code.
    mount_reply = test_utils.mount_device("XPDRC01", 'xpdrc')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_03_connect_rdmA(self):
    # Mount "ROADMA01" (paired with 'roadma-full') via test_utils.mount_device
    # and require the "created" status code.
    mount_reply = test_utils.mount_device("ROADMA01", 'roadma-full')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_04_connect_rdmC(self):
    # Mount "ROADMC01" (paired with 'roadmc-full') via test_utils.mount_device
    # and require the "created" status code.
    mount_reply = test_utils.mount_device("ROADMC01", 'roadmc-full')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    # Request the XPDRA01 -> ROADMA01 "SRG1-PP1-TXRX" link; the positional
    # arguments are forwarded unchanged to the test_utils helper.
    link_args = ("XPDRA01", "1", "1", "ROADMA01", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The confirmation text is looked up under output/result of the JSON body.
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    # Request the ROADMA01 "SRG1-PP1-TXRX" -> XPDRA01 link; the positional
    # arguments are forwarded unchanged to the test_utils helper.
    link_args = ("XPDRA01", "1", "1", "ROADMA01", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The confirmation text is looked up under output/result of the JSON body.
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    # Request the XPDRC01 -> ROADMC01 "SRG1-PP1-TXRX" link; the positional
    # arguments are forwarded unchanged to the test_utils helper.
    link_args = ("XPDRC01", "1", "1", "ROADMC01", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The confirmation text is looked up under output/result of the JSON body.
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    # Request the ROADMC01 "SRG1-PP1-TXRX" -> XPDRC01 link; the positional
    # arguments are forwarded unchanged to the test_utils helper.
    link_args = ("XPDRC01", "1", "1", "ROADMC01", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The confirmation text is looked up under output/result of the JSON body.
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
192 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
193 # Config ROADMA-ROADMC oms-attributes
195 "auto-spanloss": "true",
196 "spanloss-base": 11.4,
197 "spanloss-current": 12,
198 "engineered-spanloss": 12.2,
199 "link-concatenation": [{
202 "SRLG-length": 100000,
204 response = test_utils.add_oms_attr_request("ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX", data)
205 self.assertEqual(response.status_code, requests.codes.created)
207 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
208 # Config ROADMC-ROADMA oms-attributes
210 "auto-spanloss": "true",
211 "spanloss-base": 11.4,
212 "spanloss-current": 12,
213 "engineered-spanloss": 12.2,
214 "link-concatenation": [{
217 "SRLG-length": 100000,
219 response = test_utils.add_oms_attr_request("ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX", data)
220 self.assertEqual(response.status_code, requests.codes.created)
222 # test service-create for Eth service from xpdr to xpdr
223 def test_11_create_eth_service1(self):
224 self.cr_serv_sample_data["input"]["service-name"] = "service1"
225 response = test_utils.service_create_request(self.cr_serv_sample_data)
226 self.assertEqual(response.status_code, requests.codes.ok)
227 res = response.json()
228 self.assertIn('PCE calculation in progress',
229 res['output']['configuration-response-common'][
231 time.sleep(self.WAITING)
233 def test_12_get_eth_service1(self):
234 response = test_utils.get_service_list_request("services/service1")
235 self.assertEqual(response.status_code, requests.codes.ok)
236 res = response.json()
238 res['services'][0]['administrative-state'],
241 res['services'][0]['service-name'], 'service1')
243 res['services'][0]['connection-type'], 'service')
245 res['services'][0]['lifecycle-state'], 'planned')
248 def test_13_check_xc1_ROADMA(self):
249 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
250 self.assertEqual(response.status_code, requests.codes.ok)
251 res = response.json()
252 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
253 self.assertDictEqual(
255 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
256 'wavelength-number': 1,
257 'opticalControlMode': 'gainLoss',
258 'target-output-power': -3.0
259 }, **res['roadm-connections'][0]),
260 res['roadm-connections'][0]
262 self.assertDictEqual(
263 {'src-if': 'SRG1-PP1-TXRX-1'},
264 res['roadm-connections'][0]['source'])
265 self.assertDictEqual(
266 {'dst-if': 'DEG1-TTP-TXRX-1'},
267 res['roadm-connections'][0]['destination'])
270 def test_14_check_xc1_ROADMC(self):
271 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
272 self.assertEqual(response.status_code, requests.codes.ok)
273 res = response.json()
274 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
275 self.assertDictEqual(
277 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
278 'wavelength-number': 1,
279 'opticalControlMode': 'gainLoss',
280 'target-output-power': 2.0
281 }, **res['roadm-connections'][0]),
282 res['roadm-connections'][0]
284 self.assertDictEqual(
285 {'src-if': 'SRG1-PP1-TXRX-1'},
286 res['roadm-connections'][0]['source'])
287 self.assertDictEqual(
288 {'dst-if': 'DEG2-TTP-TXRX-1'},
289 res['roadm-connections'][0]['destination'])
292 def test_15_check_topo_XPDRA(self):
293 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
294 self.assertEqual(response.status_code, requests.codes.ok)
295 res = response.json()
296 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
298 if ele['tp-id'] == 'XPDR1-NETWORK1':
299 self.assertEqual({u'frequency': 196.1, u'width': 40},
300 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
301 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT3':
303 'org-openroadm-network-topology:xpdr-client-attributes',
305 if ele['tp-id'] == 'XPDR1-NETWORK2':
307 'org-openroadm-network-topology:xpdr-network-attributes',
311 def test_16_check_topo_ROADMA_SRG1(self):
312 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
313 self.assertEqual(response.status_code, requests.codes.ok)
314 res = response.json()
315 freq_map = base64.b64decode(
316 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
317 freq_map_array = [int(x) for x in freq_map]
318 self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
319 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
321 if ele['tp-id'] == 'SRG1-PP1-TXRX':
322 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
323 ele['org-openroadm-network-topology:'
324 'pp-attributes']['used-wavelength']
326 if ele['tp-id'] == 'SRG1-PP2-TXRX':
327 self.assertNotIn('used-wavelength', dict.keys(ele))
330 def test_17_check_topo_ROADMA_DEG1(self):
331 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
332 self.assertEqual(response.status_code, requests.codes.ok)
333 res = response.json()
334 freq_map = base64.b64decode(
335 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
336 freq_map_array = [int(x) for x in freq_map]
337 self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
338 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
340 if ele['tp-id'] == 'DEG1-CTP-TXRX':
341 self.assertIn({u'map-name': 'cband', u'freq-map-granularity': 6.25, u'start-edge-freq': 191.325,
342 u'effective-bits': 768, u'freq-map': INDEX_1_USED_FREQ_MAP},
343 ele['org-openroadm-network-topology:'
346 if ele['tp-id'] == 'DEG1-TTP-TXRX':
347 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
348 ele['org-openroadm-network-topology:'
349 'tx-ttp-attributes'][
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
    # Request the XPDRA01 -> ROADMA01 "SRG1-PP2-TXRX" link; the positional
    # arguments are forwarded unchanged to the test_utils helper.
    link_args = ("XPDRA01", "1", "2", "ROADMA01", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The confirmation text is looked up under output/result of the JSON body.
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
    # Request the ROADMA01 "SRG1-PP2-TXRX" -> XPDRA01 link; the positional
    # arguments are forwarded unchanged to the test_utils helper.
    link_args = ("XPDRA01", "1", "2", "ROADMA01", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The confirmation text is looked up under output/result of the JSON body.
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
    # Request the XPDRC01 -> ROADMC01 "SRG1-PP2-TXRX" link; the positional
    # arguments are forwarded unchanged to the test_utils helper.
    link_args = ("XPDRC01", "1", "2", "ROADMC01", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The confirmation text is looked up under output/result of the JSON body.
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
    # Request the ROADMC01 "SRG1-PP2-TXRX" -> XPDRC01 link; the positional
    # arguments are forwarded unchanged to the test_utils helper.
    link_args = ("XPDRC01", "1", "2", "ROADMC01", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    # The confirmation text is looked up under output/result of the JSON body.
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
389 def test_22_create_eth_service2(self):
390 self.cr_serv_sample_data["input"]["service-name"] = "service2"
391 response = test_utils.service_create_request(self.cr_serv_sample_data)
392 self.assertEqual(response.status_code, requests.codes.ok)
393 res = response.json()
394 self.assertIn('PCE calculation in progress',
395 res['output']['configuration-response-common'][
397 time.sleep(self.WAITING)
399 def test_23_get_eth_service2(self):
400 response = test_utils.get_service_list_request("services/service2")
401 self.assertEqual(response.status_code, requests.codes.ok)
402 res = response.json()
404 res['services'][0]['administrative-state'],
407 res['services'][0]['service-name'], 'service2')
409 res['services'][0]['connection-type'], 'service')
411 res['services'][0]['lifecycle-state'], 'planned')
414 def test_24_check_xc2_ROADMA(self):
415 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-2")
416 self.assertEqual(response.status_code, requests.codes.ok)
417 res = response.json()
418 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
419 self.assertDictEqual(
421 'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-2',
422 'wavelength-number': 2,
423 'opticalControlMode': 'power'
424 }, **res['roadm-connections'][0]),
425 res['roadm-connections'][0]
427 self.assertDictEqual(
428 {'src-if': 'DEG1-TTP-TXRX-2'},
429 res['roadm-connections'][0]['source'])
430 self.assertDictEqual(
431 {'dst-if': 'SRG1-PP2-TXRX-2'},
432 res['roadm-connections'][0]['destination'])
434 def test_25_check_topo_XPDRA(self):
435 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
436 self.assertEqual(response.status_code, requests.codes.ok)
437 res = response.json()
438 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
440 if ele['tp-id'] == 'XPDR1-NETWORK1':
441 self.assertEqual({u'frequency': 196.1, u'width': 40},
442 ele['org-openroadm-network-topology:'
443 'xpdr-network-attributes'][
445 if ele['tp-id'] == 'XPDR1-NETWORK2':
446 self.assertEqual({u'frequency': 196.05, u'width': 40},
447 ele['org-openroadm-network-topology:'
448 'xpdr-network-attributes'][
450 if ele['tp-id'] == 'XPDR1-CLIENT1' or \
451 ele['tp-id'] == 'XPDR1-CLIENT3':
453 'org-openroadm-network-topology:xpdr-client-attributes',
457 def test_26_check_topo_ROADMA_SRG1(self):
458 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
459 self.assertEqual(response.status_code, requests.codes.ok)
460 res = response.json()
461 freq_map = base64.b64decode(
462 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
463 freq_map_array = [int(x) for x in freq_map]
464 self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
465 self.assertEqual(freq_map_array[1], 0, "Index 2 should not be available")
466 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
468 if ele['tp-id'] == 'SRG1-PP1-TXRX':
469 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
470 ele['org-openroadm-network-topology:'
471 'pp-attributes']['used-wavelength'])
472 self.assertNotIn({u'index': 2, u'frequency': 196.05,
474 ele['org-openroadm-network-topology:'
475 'pp-attributes']['used-wavelength'])
476 if ele['tp-id'] == 'SRG1-PP2-TXRX':
477 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
478 ele['org-openroadm-network-topology:'
479 'pp-attributes']['used-wavelength'])
480 self.assertNotIn({u'index': 1, u'frequency': 196.1,
482 ele['org-openroadm-network-topology:'
483 'pp-attributes']['used-wavelength'])
484 if ele['tp-id'] == 'SRG1-PP3-TXRX':
485 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
489 def test_27_check_topo_ROADMA_DEG1(self):
490 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
491 self.assertEqual(response.status_code, requests.codes.ok)
492 res = response.json()
493 freq_map = base64.b64decode(
494 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
495 freq_map_array = [int(x) for x in freq_map]
496 self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
497 self.assertEqual(freq_map_array[1], 0, "Index 2 should not be available")
498 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
500 if ele['tp-id'] == 'DEG1-CTP-TXRX':
501 self.assertIn({u'map-name': 'cband', u'freq-map-granularity': 6.25, u'start-edge-freq': 191.325,
502 u'effective-bits': 768, u'freq-map': INDEX_1_2_USED_FREQ_MAP},
503 ele['org-openroadm-network-topology:'
506 if ele['tp-id'] == 'DEG1-TTP-TXRX':
507 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
508 ele['org-openroadm-network-topology:'
509 'tx-ttp-attributes']['used-wavelengths'])
510 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
511 ele['org-openroadm-network-topology:'
512 'tx-ttp-attributes']['used-wavelengths'])
515 # creation service test on a non-available resource
516 def test_28_create_eth_service3(self):
517 self.cr_serv_sample_data["input"]["service-name"] = "service3"
518 response = test_utils.service_create_request(self.cr_serv_sample_data)
519 self.assertEqual(response.status_code, requests.codes.ok)
520 res = response.json()
521 self.assertIn('PCE calculation in progress',
522 res['output']['configuration-response-common'][
524 self.assertIn('200', res['output']['configuration-response-common'][
526 time.sleep(self.WAITING)
# add a test that checks that the openroadm-service-list still
# contains only 2 elements
531 def test_29_delete_eth_service3(self):
532 response = test_utils.service_delete_request("service3")
533 self.assertEqual(response.status_code, requests.codes.ok)
534 res = response.json()
535 self.assertIn('Service \'service3\' does not exist in datastore',
536 res['output']['configuration-response-common'][
538 self.assertIn('500', res['output']['configuration-response-common'][
542 def test_30_delete_eth_service1(self):
543 response = test_utils.service_delete_request("service1")
544 self.assertEqual(response.status_code, requests.codes.ok)
545 res = response.json()
546 self.assertIn('Renderer service delete in progress',
547 res['output']['configuration-response-common'][
551 def test_31_delete_eth_service2(self):
552 response = test_utils.service_delete_request("service2")
553 self.assertEqual(response.status_code, requests.codes.ok)
554 res = response.json()
555 self.assertIn('Renderer service delete in progress',
556 res['output']['configuration-response-common'][
def test_32_check_no_xc_ROADMA(self):
    # Fetch the ROADMA01 device data (empty sub-path) and verify that no
    # 'roadm-connections' key is present under 'org-openroadm-device'.
    response = test_utils.check_netconf_node_request("ROADMA01", "")
    # Assert the status before decoding the body, so an error reply is
    # reported as a status mismatch instead of a JSON decoding failure.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Membership on the mapping itself already tests its keys.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
568 def test_33_check_topo_XPDRA(self):
569 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
570 self.assertEqual(response.status_code, requests.codes.ok)
571 res = response.json()
572 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
574 if ((ele[u'org-openroadm-common-network:tp-type'] ==
576 and (ele['tp-id'] == 'XPDR1-CLIENT1' or ele[
577 'tp-id'] == 'XPDR1-CLIENT3')):
579 'org-openroadm-network-topology:xpdr-client-attributes',
581 elif (ele[u'org-openroadm-common-network:tp-type'] ==
583 self.assertIn(u'tail-equipment-id', dict.keys(
584 ele[u'org-openroadm-network-topology:'
585 u'xpdr-network-attributes']))
586 self.assertNotIn('wavelength', dict.keys(
587 ele[u'org-openroadm-network-topology:'
588 u'xpdr-network-attributes']))
591 def test_34_check_topo_ROADMA_SRG1(self):
592 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
593 self.assertEqual(response.status_code, requests.codes.ok)
594 res = response.json()
595 freq_map = base64.b64decode(
596 res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
597 self.assertTrue(check_freq_map(freq_map), "Index 1 and 2 should be available")
598 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
600 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
603 def test_35_check_topo_ROADMA_DEG1(self):
604 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
605 self.assertEqual(response.status_code, requests.codes.ok)
606 res = response.json()
607 freq_map = base64.b64decode(
608 res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
609 self.assertTrue(check_freq_map(freq_map), "Index 1 and 2 should be available")
610 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
612 if ele['tp-id'] == 'DEG1-CTP-TXRX':
613 self.assertEqual(ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'],
615 if ele['tp-id'] == 'DEG1-TTP-TXRX':
616 self.assertNotIn('org-openroadm-network-topology:'
617 'tx-ttp-attributes', dict.keys(ele))
620 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
621 def test_36_create_oc_service1(self):
622 self.cr_serv_sample_data["input"]["service-name"] = "service1"
623 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
624 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
625 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
626 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
627 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
628 response = test_utils.service_create_request(self.cr_serv_sample_data)
629 self.assertEqual(response.status_code, requests.codes.ok)
630 res = response.json()
631 self.assertIn('PCE calculation in progress',
632 res['output']['configuration-response-common'][
634 time.sleep(self.WAITING)
636 def test_37_get_oc_service1(self):
637 response = test_utils.get_service_list_request("services/service1")
638 self.assertEqual(response.status_code, requests.codes.ok)
639 res = response.json()
641 res['services'][0]['administrative-state'],
644 res['services'][0]['service-name'], 'service1')
646 res['services'][0]['connection-type'], 'roadm-line')
648 res['services'][0]['lifecycle-state'], 'planned')
651 def test_38_check_xc1_ROADMA(self):
652 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
653 self.assertEqual(response.status_code, requests.codes.ok)
654 res = response.json()
655 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
656 self.assertDictEqual(
658 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
659 'wavelength-number': 1,
660 'opticalControlMode': 'gainLoss',
661 'target-output-power': -3.0
662 }, **res['roadm-connections'][0]),
663 res['roadm-connections'][0]
665 self.assertDictEqual(
666 {'src-if': 'SRG1-PP1-TXRX-1'},
667 res['roadm-connections'][0]['source'])
668 self.assertDictEqual(
669 {'dst-if': 'DEG1-TTP-TXRX-1'},
670 res['roadm-connections'][0]['destination'])
673 def test_39_check_xc1_ROADMC(self):
674 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
675 self.assertEqual(response.status_code, requests.codes.ok)
676 res = response.json()
677 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
678 self.assertDictEqual(
680 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
681 'wavelength-number': 1,
682 'opticalControlMode': 'gainLoss',
683 'target-output-power': 2.0
684 }, **res['roadm-connections'][0]),
685 res['roadm-connections'][0]
687 self.assertDictEqual(
688 {'src-if': 'SRG1-PP1-TXRX-1'},
689 res['roadm-connections'][0]['source'])
690 self.assertDictEqual(
691 {'dst-if': 'DEG2-TTP-TXRX-1'},
692 res['roadm-connections'][0]['destination'])
695 def test_40_create_oc_service2(self):
696 self.cr_serv_sample_data["input"]["service-name"] = "service2"
697 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
698 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
699 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
700 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
701 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
702 response = test_utils.service_create_request(self.cr_serv_sample_data)
703 self.assertEqual(response.status_code, requests.codes.ok)
704 res = response.json()
705 self.assertIn('PCE calculation in progress',
706 res['output']['configuration-response-common'][
708 time.sleep(self.WAITING)
710 def test_41_get_oc_service2(self):
711 response = test_utils.get_service_list_request("services/service2")
712 self.assertEqual(response.status_code, requests.codes.ok)
713 res = response.json()
715 res['services'][0]['administrative-state'],
718 res['services'][0]['service-name'], 'service2')
720 res['services'][0]['connection-type'], 'roadm-line')
722 res['services'][0]['lifecycle-state'], 'planned')
725 def test_42_check_xc2_ROADMA(self):
726 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-2")
727 self.assertEqual(response.status_code, requests.codes.ok)
728 res = response.json()
729 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
730 self.assertDictEqual(
732 'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-2',
733 'wavelength-number': 2,
734 'opticalControlMode': 'gainLoss',
735 'target-output-power': -3.0
736 }, **res['roadm-connections'][0]),
737 res['roadm-connections'][0]
739 self.assertDictEqual(
740 {'src-if': 'SRG1-PP2-TXRX-2'},
741 res['roadm-connections'][0]['source'])
742 self.assertDictEqual(
743 {'dst-if': 'DEG1-TTP-TXRX-2'},
744 res['roadm-connections'][0]['destination'])
747 def test_43_check_topo_ROADMA(self):
748 self.test_26_check_topo_ROADMA_SRG1()
749 self.test_27_check_topo_ROADMA_DEG1()
752 def test_44_delete_oc_service1(self):
753 response = test_utils.service_delete_request("service1")
754 self.assertEqual(response.status_code, requests.codes.ok)
755 res = response.json()
756 self.assertIn('Renderer service delete in progress',
757 res['output']['configuration-response-common'][
761 def test_45_delete_oc_service2(self):
762 response = test_utils.service_delete_request("service2")
763 self.assertEqual(response.status_code, requests.codes.ok)
764 res = response.json()
765 self.assertIn('Renderer service delete in progress',
766 res['output']['configuration-response-common'][
770 def test_46_get_no_oc_services(self):
772 response = test_utils.get_service_list_request("")
773 self.assertEqual(response.status_code, requests.codes.conflict)
774 res = response.json()
777 "error-type": "application",
778 "error-tag": "data-missing",
780 "Request could not be completed because the relevant data "
781 "model content does not exist"
783 res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    # Fetch the ROADMA01 device data (empty sub-path) and verify that no
    # 'roadm-connections' key is present under 'org-openroadm-device'.
    response = test_utils.check_netconf_node_request("ROADMA01", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The key is written directly; the former ['roadm-connections'][0] was a
    # one-element list indexed at 0, i.e. the very same string.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
793 def test_48_check_topo_ROADMA(self):
794 self.test_34_check_topo_ROADMA_SRG1()
795 self.test_35_check_topo_ROADMA_DEG1()
797 def test_49_loop_create_eth_service(self):
798 for i in range(1, 6):
799 print("iteration number {}".format(i))
800 print("eth service creation")
801 self.test_11_create_eth_service1()
802 print("check xc in ROADMA01")
803 self.test_13_check_xc1_ROADMA()
804 print("check xc in ROADMC01")
805 self.test_14_check_xc1_ROADMC()
806 print("eth service deletion\n")
807 self.test_30_delete_eth_service1()
809 def test_50_loop_create_oc_service(self):
810 response = test_utils.get_service_list_request("services/service1")
811 if response.status_code != 404:
812 response = test_utils.service_delete_request("service1")
815 for i in range(1, 6):
816 print("iteration number {}".format(i))
817 print("oc service creation")
818 self.test_36_create_oc_service1()
819 print("check xc in ROADMA01")
820 self.test_38_check_xc1_ROADMA()
821 print("check xc in ROADMC01")
822 self.test_39_check_xc1_ROADMC()
823 print("oc service deletion\n")
824 self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
    # Unmount "XPDRA01" via test_utils.unmount_device and require status "ok".
    unmount_reply = test_utils.unmount_device("XPDRA01")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
def test_52_disconnect_XPDRC(self):
    # Unmount "XPDRC01" via test_utils.unmount_device and require status "ok".
    unmount_reply = test_utils.unmount_device("XPDRC01")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
def test_53_disconnect_ROADMA(self):
    # Unmount "ROADMA01" via test_utils.unmount_device and require status "ok".
    unmount_reply = test_utils.unmount_device("ROADMA01")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
def test_54_disconnect_ROADMC(self):
    # Unmount "ROADMC01" via test_utils.unmount_device and require status "ok".
    unmount_reply = test_utils.unmount_device("ROADMC01")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
# Script entry point: when the file is executed directly, hand control to the
# unittest runner with verbosity level 2.
if __name__ == "__main__":
    unittest.main(verbosity=2)