2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
16 from common import test_utils
19 class TransportPCEFulltesting(unittest.TestCase):
22 cr_serv_sample_data = {"input": {
23 "sdnc-request-header": {
24 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
25 "rpc-action": "service-create",
26 "request-system-id": "appname",
27 "notification-url": "http://localhost:8585/NotificationServer/notify"
29 "service-name": "service1",
30 "common-id": "ASATT1234567",
31 "connection-type": "service",
33 "service-rate": "100",
35 "service-format": "Ethernet",
36 "clli": "SNJSCAMCJP8",
39 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
40 "port-type": "router",
41 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
42 "port-rack": "000000.00",
46 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
47 "lgx-port-name": "LGX Back.3",
48 "lgx-port-rack": "000000.00",
49 "lgx-port-shelf": "00"
54 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
55 "port-type": "router",
56 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
57 "port-rack": "000000.00",
61 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
62 "lgx-port-name": "LGX Back.4",
63 "lgx-port-rack": "000000.00",
64 "lgx-port-shelf": "00"
70 "service-rate": "100",
72 "service-format": "Ethernet",
73 "clli": "SNJSCAMCJT4",
76 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
77 "port-type": "router",
78 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
79 "port-rack": "000000.00",
83 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
84 "lgx-port-name": "LGX Back.29",
85 "lgx-port-rack": "000000.00",
86 "lgx-port-shelf": "00"
91 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
92 "port-type": "router",
93 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
94 "port-rack": "000000.00",
98 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
99 "lgx-port-name": "LGX Back.30",
100 "lgx-port-rack": "000000.00",
101 "lgx-port-shelf": "00"
106 "due-date": "2016-11-28T00:00:01Z",
107 "operator-contact": "pw1234"
111 WAITING = 20 # nominal value is 300
115 cls.processes = test_utils.start_tpce()
116 cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
119 def tearDownClass(cls):
120 for process in cls.processes:
121 test_utils.shutdown_process(process)
122 print("all processes killed")
124 def setUp(self): # instruction executed before each test method
125 print("execution of {}".format(self.id().split(".")[-1]))
127 def test_01_connect_xpdrA(self):
128 response = test_utils.mount_device("XPDR-A1", 'xpdra')
129 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
131 def test_02_connect_xpdrC(self):
132 response = test_utils.mount_device("XPDR-C1", 'xpdrc')
133 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
135 def test_03_connect_rdmA(self):
136 response = test_utils.mount_device("ROADM-A1", 'roadma')
137 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
139 def test_04_connect_rdmC(self):
140 response = test_utils.mount_device("ROADM-C1", 'roadmc')
141 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
143 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
144 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1",
145 "ROADM-A1", "1", "SRG1-PP1-TXRX")
146 self.assertEqual(response.status_code, requests.codes.ok)
147 res = response.json()
148 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
151 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
152 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1",
153 "ROADM-A1", "1", "SRG1-PP1-TXRX")
154 self.assertEqual(response.status_code, requests.codes.ok)
155 res = response.json()
156 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
159 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
160 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
161 "ROADM-C1", "1", "SRG1-PP1-TXRX")
162 self.assertEqual(response.status_code, requests.codes.ok)
163 res = response.json()
164 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
167 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
168 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
169 "ROADM-C1", "1", "SRG1-PP1-TXRX")
170 self.assertEqual(response.status_code, requests.codes.ok)
171 res = response.json()
172 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
175 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
176 # Config ROADMA-ROADMC oms-attributes
178 "auto-spanloss": "true",
179 "spanloss-base": 11.4,
180 "spanloss-current": 12,
181 "engineered-spanloss": 12.2,
182 "link-concatenation": [{
185 "SRLG-length": 100000,
187 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
188 self.assertEqual(response.status_code, requests.codes.created)
190 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
191 # Config ROADMC-ROADMA oms-attributes
193 "auto-spanloss": "true",
194 "spanloss-base": 11.4,
195 "spanloss-current": 12,
196 "engineered-spanloss": 12.2,
197 "link-concatenation": [{
200 "SRLG-length": 100000,
202 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
203 self.assertEqual(response.status_code, requests.codes.created)
205 # test service-create for Eth service from xpdr to xpdr
206 def test_11_create_eth_service1(self):
207 self.cr_serv_sample_data["input"]["service-name"] = "service1"
208 response = test_utils.service_create_request(self.cr_serv_sample_data)
209 self.assertEqual(response.status_code, requests.codes.ok)
210 res = response.json()
211 self.assertIn('PCE calculation in progress',
212 res['output']['configuration-response-common']['response-message'])
213 time.sleep(self.WAITING)
215 def test_12_get_eth_service1(self):
216 response = test_utils.get_service_list_request("services/service1")
217 self.assertEqual(response.status_code, requests.codes.ok)
218 res = response.json()
220 res['services'][0]['administrative-state'], 'inService')
222 res['services'][0]['service-name'], 'service1')
224 res['services'][0]['connection-type'], 'service')
226 res['services'][0]['lifecycle-state'], 'planned')
229 def test_13_check_xc1_ROADMA(self):
230 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
231 self.assertEqual(response.status_code, requests.codes.ok)
232 res = response.json()
233 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
234 self.assertDictEqual(
236 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
237 'opticalControlMode': 'gainLoss',
238 'target-output-power': -3.0
239 }, **res['roadm-connections'][0]),
240 res['roadm-connections'][0]
242 self.assertDictEqual(
243 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
244 res['roadm-connections'][0]['source'])
245 self.assertDictEqual(
246 {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
247 res['roadm-connections'][0]['destination'])
250 def test_14_check_xc1_ROADMC(self):
251 response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
252 self.assertEqual(response.status_code, requests.codes.ok)
253 res = response.json()
254 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
255 self.assertDictEqual(
257 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
258 'opticalControlMode': 'gainLoss',
259 'target-output-power': -3.0
260 }, **res['roadm-connections'][0]),
261 res['roadm-connections'][0]
263 self.assertDictEqual(
264 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
265 res['roadm-connections'][0]['source'])
266 self.assertDictEqual(
267 {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
268 res['roadm-connections'][0]['destination'])
271 def test_15_check_topo_XPDRA(self):
272 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
273 self.assertEqual(response.status_code, requests.codes.ok)
274 res = response.json()
275 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
277 if ele['tp-id'] == 'XPDR1-NETWORK1':
278 self.assertEqual({u'frequency': 196.1,
280 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
281 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
282 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
283 if ele['tp-id'] == 'XPDR1-NETWORK2':
284 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
287 def test_16_check_topo_ROADMA_SRG1(self):
288 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
289 self.assertEqual(response.status_code, requests.codes.ok)
290 res = response.json()
291 self.assertNotIn({u'index': 1},
292 res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
293 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
295 if ele['tp-id'] == 'SRG1-PP1-TXRX':
296 self.assertIn({u'index': 1, u'frequency': 196.1,
298 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
299 if ele['tp-id'] == 'SRG1-PP2-TXRX':
300 self.assertNotIn('used-wavelength', dict.keys(ele))
303 def test_17_check_topo_ROADMA_DEG1(self):
304 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
305 self.assertEqual(response.status_code, requests.codes.ok)
306 res = response.json()
307 self.assertNotIn({u'index': 1},
308 res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
309 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
311 if ele['tp-id'] == 'DEG2-CTP-TXRX':
312 self.assertIn({u'index': 1, u'frequency': 196.1,
314 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
315 if ele['tp-id'] == 'DEG2-TTP-TXRX':
316 self.assertIn({u'index': 1, u'frequency': 196.1,
318 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
321 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
322 response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "2",
323 "ROADM-A1", "1", "SRG1-PP2-TXRX")
324 self.assertEqual(response.status_code, requests.codes.ok)
325 res = response.json()
326 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
329 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
330 response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "2",
331 "ROADM-A1", "1", "SRG1-PP2-TXRX")
332 self.assertEqual(response.status_code, requests.codes.ok)
333 res = response.json()
334 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
337 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
338 response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "2",
339 "ROADM-C1", "1", "SRG1-PP2-TXRX")
340 self.assertEqual(response.status_code, requests.codes.ok)
341 res = response.json()
342 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
345 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
346 response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "2",
347 "ROADM-C1", "1", "SRG1-PP2-TXRX")
348 self.assertEqual(response.status_code, requests.codes.ok)
349 res = response.json()
350 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
353 def test_22_create_eth_service2(self):
354 self.cr_serv_sample_data["input"]["service-name"] = "service2"
355 response = test_utils.service_create_request(self.cr_serv_sample_data)
356 self.assertEqual(response.status_code, requests.codes.ok)
357 res = response.json()
358 self.assertIn('PCE calculation in progress',
359 res['output']['configuration-response-common']['response-message'])
360 time.sleep(self.WAITING)
362 def test_23_get_eth_service2(self):
363 response = test_utils.get_service_list_request("services/service2")
364 self.assertEqual(response.status_code, requests.codes.ok)
365 res = response.json()
367 res['services'][0]['administrative-state'],
370 res['services'][0]['service-name'], 'service2')
372 res['services'][0]['connection-type'], 'service')
374 res['services'][0]['lifecycle-state'], 'planned')
377 def test_24_check_xc2_ROADMA(self):
378 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-2")
379 self.assertEqual(response.status_code, requests.codes.ok)
380 res = response.json()
381 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
382 self.assertDictEqual(
384 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-2',
385 'opticalControlMode': 'power'
386 }, **res['roadm-connections'][0]),
387 res['roadm-connections'][0]
389 self.assertDictEqual(
390 {'src-if': 'DEG2-TTP-TXRX-nmc-2'},
391 res['roadm-connections'][0]['source'])
392 self.assertDictEqual(
393 {'dst-if': 'SRG1-PP2-TXRX-nmc-2'},
394 res['roadm-connections'][0]['destination'])
396 def test_25_check_topo_XPDRA(self):
397 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
398 self.assertEqual(response.status_code, requests.codes.ok)
399 res = response.json()
400 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
402 if ele['tp-id'] == 'XPDR1-NETWORK1':
403 self.assertEqual({u'frequency': 196.1,
405 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
406 if ele['tp-id'] == 'XPDR1-NETWORK2':
407 self.assertEqual({u'frequency': 196.05,
409 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
410 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
411 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
414 def test_26_check_topo_ROADMA_SRG1(self):
415 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
416 self.assertEqual(response.status_code, requests.codes.ok)
417 res = response.json()
418 self.assertNotIn({u'index': 1}, res['node'][0]
419 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
420 self.assertNotIn({u'index': 2}, res['node'][0]
421 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
422 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
424 if ele['tp-id'] == 'SRG1-PP1-TXRX':
425 self.assertIn({u'index': 1, u'frequency': 196.1,
427 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
428 self.assertNotIn({u'index': 2, u'frequency': 196.05,
430 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
431 if ele['tp-id'] == 'SRG1-PP2-TXRX':
432 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
433 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
434 self.assertNotIn({u'index': 1, u'frequency': 196.1,
436 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
437 if ele['tp-id'] == 'SRG1-PP3-TXRX':
438 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
441 def test_27_check_topo_ROADMA_DEG2(self):
442 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
443 self.assertEqual(response.status_code, requests.codes.ok)
444 res = response.json()
445 self.assertNotIn({u'index': 1}, res['node'][0]
446 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
447 self.assertNotIn({u'index': 2}, res['node'][0]
448 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
449 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
451 if ele['tp-id'] == 'DEG2-CTP-TXRX':
452 self.assertIn({u'index': 1, u'frequency': 196.1,
454 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
455 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
456 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
457 if ele['tp-id'] == 'DEG2-TTP-TXRX':
458 self.assertIn({u'index': 1, u'frequency': 196.1,
460 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
461 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
462 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
465 # creation service test on a non-available resource
466 def test_28_create_eth_service3(self):
467 self.cr_serv_sample_data["input"]["service-name"] = "service3"
468 response = test_utils.service_create_request(self.cr_serv_sample_data)
469 self.assertEqual(response.status_code, requests.codes.ok)
470 res = response.json()
471 self.assertIn('PCE calculation in progress',
472 res['output']['configuration-response-common']['response-message'])
473 self.assertIn('200', res['output']['configuration-response-common']['response-code'])
474 time.sleep(self.WAITING)
476 # add a test that check the openroadm-service-list still only contains 2 elements
477 def test_29_delete_eth_service3(self):
478 response = test_utils.service_delete_request("service3")
479 self.assertEqual(response.status_code, requests.codes.ok)
480 res = response.json()
481 self.assertIn('Service \'service3\' does not exist in datastore',
482 res['output']['configuration-response-common']['response-message'])
483 self.assertIn('500', res['output']['configuration-response-common']['response-code'])
486 def test_30_delete_eth_service1(self):
487 response = test_utils.service_delete_request("service1")
488 self.assertEqual(response.status_code, requests.codes.ok)
489 res = response.json()
490 self.assertIn('Renderer service delete in progress',
491 res['output']['configuration-response-common']['response-message'])
494 def test_31_delete_eth_service2(self):
495 response = test_utils.service_delete_request("service2")
496 self.assertEqual(response.status_code, requests.codes.ok)
497 res = response.json()
498 self.assertIn('Renderer service delete in progress',
499 res['output']['configuration-response-common']['response-message'])
502 def test_32_check_no_xc_ROADMA(self):
503 response = test_utils.check_netconf_node_request("ROADM-A1", "")
504 res = response.json()
505 self.assertEqual(response.status_code, requests.codes.ok)
506 self.assertNotIn('roadm-connections', dict.keys(res['org-openroadm-device']))
509 def test_33_check_topo_XPDRA(self):
510 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
511 self.assertEqual(response.status_code, requests.codes.ok)
512 res = response.json()
513 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
515 if ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
516 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
517 elif (ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK'):
518 self.assertIn(u'tail-equipment-id',
519 dict.keys(ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
520 self.assertNotIn('wavelength', dict.keys(
521 ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
524 def test_34_check_topo_ROADMA_SRG1(self):
525 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
526 self.assertEqual(response.status_code, requests.codes.ok)
527 res = response.json()
528 self.assertIn({u'index': 1}, res['node'][0]
529 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
530 self.assertIn({u'index': 2}, res['node'][0]
531 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
532 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
534 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP1-TXRX':
535 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
537 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
540 def test_35_check_topo_ROADMA_DEG2(self):
541 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
542 self.assertEqual(response.status_code, requests.codes.ok)
543 res = response.json()
544 self.assertIn({u'index': 1}, res['node'][0]
545 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
546 self.assertIn({u'index': 2}, res['node'][0]
547 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
548 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
550 if ele['tp-id'] == 'DEG2-CTP-TXRX':
551 self.assertNotIn('org-openroadm-network-topology:ctp-attributes', dict.keys(ele))
552 if ele['tp-id'] == 'DEG2-TTP-TXRX':
553 self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
556 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
557 def test_36_create_oc_service1(self):
558 self.cr_serv_sample_data["input"]["service-name"] = "service1"
559 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
560 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADM-A1"
561 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
562 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADM-C1"
563 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
564 response = test_utils.service_create_request(self.cr_serv_sample_data)
565 self.assertEqual(response.status_code, requests.codes.ok)
566 res = response.json()
567 self.assertIn('PCE calculation in progress',
568 res['output']['configuration-response-common']['response-message'])
569 time.sleep(self.WAITING)
571 def test_37_get_oc_service1(self):
572 response = test_utils.get_service_list_request("services/service1")
573 self.assertEqual(response.status_code, requests.codes.ok)
574 res = response.json()
576 res['services'][0]['administrative-state'],
579 res['services'][0]['service-name'], 'service1')
581 res['services'][0]['connection-type'], 'roadm-line')
583 res['services'][0]['lifecycle-state'], 'planned')
586 def test_38_check_xc1_ROADMA(self):
587 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
588 self.assertEqual(response.status_code, requests.codes.ok)
589 res = response.json()
590 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
591 self.assertDictEqual(
593 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
594 'opticalControlMode': 'gainLoss',
595 'target-output-power': -3.0
596 }, **res['roadm-connections'][0]),
597 res['roadm-connections'][0]
599 self.assertDictEqual(
600 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
601 res['roadm-connections'][0]['source'])
602 self.assertDictEqual(
603 {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
604 res['roadm-connections'][0]['destination'])
607 def test_39_check_xc1_ROADMC(self):
608 response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
609 self.assertEqual(response.status_code, requests.codes.ok)
610 res = response.json()
611 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
612 self.assertDictEqual(
614 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
615 'opticalControlMode': 'gainLoss',
616 'target-output-power': -3.0
617 }, **res['roadm-connections'][0]),
618 res['roadm-connections'][0]
620 self.assertDictEqual(
621 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
622 res['roadm-connections'][0]['source'])
623 self.assertDictEqual(
624 {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
625 res['roadm-connections'][0]['destination'])
628 def test_40_create_oc_service2(self):
629 self.cr_serv_sample_data["input"]["service-name"] = "service2"
630 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
631 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADM-A1"
632 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
633 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADM-C1"
634 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
635 response = test_utils.service_create_request(self.cr_serv_sample_data)
636 self.assertEqual(response.status_code, requests.codes.ok)
637 res = response.json()
638 self.assertIn('PCE calculation in progress',
639 res['output']['configuration-response-common']['response-message'])
640 time.sleep(self.WAITING)
642 def test_41_get_oc_service2(self):
643 response = test_utils.get_service_list_request("services/service2")
644 self.assertEqual(response.status_code, requests.codes.ok)
645 res = response.json()
647 res['services'][0]['administrative-state'],
650 res['services'][0]['service-name'], 'service2')
652 res['services'][0]['connection-type'], 'roadm-line')
654 res['services'][0]['lifecycle-state'], 'planned')
657 def test_42_check_xc2_ROADMA(self):
658 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-2")
659 self.assertEqual(response.status_code, requests.codes.ok)
660 res = response.json()
661 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
662 self.assertDictEqual(
664 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-2',
665 'opticalControlMode': 'gainLoss',
666 'target-output-power': -3.0
667 }, **res['roadm-connections'][0]),
668 res['roadm-connections'][0]
670 self.assertDictEqual(
671 {'src-if': 'SRG1-PP2-TXRX-nmc-2'},
672 res['roadm-connections'][0]['source'])
673 self.assertDictEqual(
674 {'dst-if': 'DEG2-TTP-TXRX-nmc-2'},
675 res['roadm-connections'][0]['destination'])
678 def test_43_check_topo_ROADMA(self):
679 self.test_26_check_topo_ROADMA_SRG1()
680 self.test_27_check_topo_ROADMA_DEG2()
683 def test_44_delete_oc_service1(self):
684 response = test_utils.service_delete_request("service1")
685 self.assertEqual(response.status_code, requests.codes.ok)
686 res = response.json()
687 self.assertIn('Renderer service delete in progress',
688 res['output']['configuration-response-common']['response-message'])
691 def test_45_delete_oc_service2(self):
692 response = test_utils.service_delete_request("service2")
693 self.assertEqual(response.status_code, requests.codes.ok)
694 res = response.json()
695 self.assertIn('Renderer service delete in progress',
696 res['output']['configuration-response-common']['response-message'])
699 def test_46_get_no_oc_services(self):
701 response = test_utils.get_service_list_request("")
702 self.assertEqual(response.status_code, requests.codes.conflict)
703 res = response.json()
705 {"error-type": "application", "error-tag": "data-missing",
706 "error-message": "Request could not be completed because the relevant data model content does not exist"},
707 res['errors']['error'])
710 def test_47_get_no_xc_ROADMA(self):
711 response = test_utils.check_netconf_node_request("ROADM-A1", "")
712 self.assertEqual(response.status_code, requests.codes.ok)
713 res = response.json()
714 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
717 def test_48_check_topo_ROADMA(self):
718 self.test_34_check_topo_ROADMA_SRG1()
719 self.test_35_check_topo_ROADMA_DEG2()
721 def test_49_loop_create_eth_service(self):
722 for i in range(1, 6):
723 print("iteration number {}".format(i))
724 print("eth service creation")
725 self.test_11_create_eth_service1()
726 print("check xc in ROADM-A1")
727 self.test_13_check_xc1_ROADMA()
728 print("check xc in ROADM-C1")
729 self.test_14_check_xc1_ROADMC()
730 print("eth service deletion\n")
731 self.test_30_delete_eth_service1()
733 def test_50_loop_create_oc_service(self):
734 response = test_utils.get_service_list_request("services/service1")
735 if response.status_code != 404:
736 response = test_utils.service_delete_request("service1")
739 for i in range(1, 6):
740 print("iteration number {}".format(i))
741 print("oc service creation")
742 self.test_36_create_oc_service1()
743 print("check xc in ROADM-A1")
744 self.test_38_check_xc1_ROADMA()
745 print("check xc in ROADM-C1")
746 self.test_39_check_xc1_ROADMC()
747 print("oc service deletion\n")
748 self.test_44_delete_oc_service1()
750 def test_51_disconnect_XPDRA(self):
751 response = test_utils.unmount_device("XPDR-A1")
752 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
754 def test_52_disconnect_XPDRC(self):
755 response = test_utils.unmount_device("XPDR-C1")
756 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
758 def test_53_disconnect_ROADMA(self):
759 response = test_utils.unmount_device("ROADM-A1")
760 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
762 def test_54_disconnect_ROADMC(self):
763 response = test_utils.unmount_device("ROADM-C1")
764 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
767 if __name__ == "__main__":
768 unittest.main(verbosity=2)