2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
11 # pylint: disable=no-member
12 # pylint: disable=too-many-public-methods
17 from common import test_utils
# unittest.TestCase subclass whose test methods call test_utils helpers to
# mount devices, create/delete services and inspect ROADM connections/topology.
20 class TransportPCEFulltesting(unittest.TestCase):
# Template payload handed to test_utils.service_create_request(). It is a
# class-level dict that the tests mutate in place through self: they overwrite
# "service-name", and the OC tests also overwrite "connection-type" and the
# "node-id"/"service-format" entries of "service-a-end"/"service-z-end".
# NOTE(review): several lines of this literal (nested keys and closing
# braces) are not visible in this view of the file.
23 cr_serv_sample_data = {"input": {
24 "sdnc-request-header": {
25 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
26 "rpc-action": "service-create",
27 "request-system-id": "appname",
28 "notification-url": "http://localhost:8585/NotificationServer/notify"
30 "service-name": "service1",
31 "common-id": "ASATT1234567",
32 "connection-type": "service",
34 "service-rate": "100",
36 "service-format": "Ethernet",
37 "clli": "SNJSCAMCJP8",
40 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
41 "port-type": "router",
42 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
43 "port-rack": "000000.00",
47 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
48 "lgx-port-name": "LGX Back.3",
49 "lgx-port-rack": "000000.00",
50 "lgx-port-shelf": "00"
55 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
56 "port-type": "router",
57 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
58 "port-rack": "000000.00",
62 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
63 "lgx-port-name": "LGX Back.4",
64 "lgx-port-rack": "000000.00",
65 "lgx-port-shelf": "00"
71 "service-rate": "100",
73 "service-format": "Ethernet",
74 "clli": "SNJSCAMCJT4",
77 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
78 "port-type": "router",
79 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
80 "port-rack": "000000.00",
84 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
85 "lgx-port-name": "LGX Back.29",
86 "lgx-port-rack": "000000.00",
87 "lgx-port-shelf": "00"
92 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
93 "port-type": "router",
94 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
95 "port-rack": "000000.00",
99 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
100 "lgx-port-name": "LGX Back.30",
101 "lgx-port-rack": "000000.00",
102 "lgx-port-shelf": "00"
107 "due-date": "2016-11-28T00:00:01Z",
108 "operator-contact": "pw1234"
# Value passed to time.sleep() by the service-create tests after sending a request.
112 WAITING = 20 # nominal value is 300
# Start the controller and the four simulators used by the tests
# (the enclosing method header is not visible in this view).
# NOTE(review): the second assignment rebinds cls.processes. Unless
# start_sims() returns a collection that also contains what start_tpce()
# returned, tearDownClass will not shut the latter down — confirm in test_utils.
116 cls.processes = test_utils.start_tpce()
117 cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
def tearDownClass(cls):
    """Shut down every process held in ``cls.processes`` and report completion."""
    running = cls.processes
    for proc in running:
        test_utils.shutdown_process(proc)
    print("all processes killed")
def setUp(self):
    """Print the name of the test method about to run (executed before each test)."""
    # self.id() is a dotted path; only its last component is reported.
    method_name = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(method_name))
def test_01_connect_xpdrA(self):
    """Mount device "XPDR-A1" (simulator key 'xpdra') and expect status ``created``."""
    status = test_utils.mount_device("XPDR-A1", 'xpdra').status_code
    self.assertEqual(status, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
    """Mount device "XPDR-C1" (simulator key 'xpdrc') and expect status ``created``."""
    status = test_utils.mount_device("XPDR-C1", 'xpdrc').status_code
    self.assertEqual(status, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
    """Mount device "ROADM-A1" (simulator key 'roadma') and expect status ``created``."""
    status = test_utils.mount_device("ROADM-A1", 'roadma').status_code
    self.assertEqual(status, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
    """Mount device "ROADM-C1" (simulator key 'roadmc') and expect status ``created``."""
    status = test_utils.mount_device("ROADM-C1", 'roadmc').status_code
    self.assertEqual(status, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    """Request the XPDR-A1 -> ROADM-A1 (SRG1-PP1-TXRX) link and check the result message."""
    reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    """Request the ROADM-A1 (SRG1-PP1-TXRX) -> XPDR-A1 link and check the result message."""
    reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    """Request the XPDR-C1 -> ROADM-C1 (SRG1-PP1-TXRX) link and check the result message."""
    reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    """Request the ROADM-C1 (SRG1-PP1-TXRX) -> XPDR-C1 link and check the result message."""
    reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
# Send OMS span attributes for the ROADM-A1 -> ROADM-C1 link through
# test_utils.add_oms_attr_request() and expect status ``created``.
# NOTE(review): the opening of the ``data`` literal and some of its entries
# are not visible in this view.
176 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
177 # Config ROADMA-ROADMC oms-attributes
179 "auto-spanloss": "true",
180 "spanloss-base": 11.4,
181 "spanloss-current": 12,
182 "engineered-spanloss": 12.2,
183 "link-concatenation": [{
186 "SRLG-length": 100000,
188 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
189 self.assertEqual(response.status_code, requests.codes.created)
# Send OMS span attributes for the ROADM-C1 -> ROADM-A1 link through
# test_utils.add_oms_attr_request() and expect status ``created``.
# NOTE(review): the opening of the ``data`` literal and some of its entries
# are not visible in this view.
191 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
192 # Config ROADMC-ROADMA oms-attributes
194 "auto-spanloss": "true",
195 "spanloss-base": 11.4,
196 "spanloss-current": 12,
197 "engineered-spanloss": 12.2,
198 "link-concatenation": [{
201 "SRLG-length": 100000,
203 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
204 self.assertEqual(response.status_code, requests.codes.created)
206 # test service-create for Eth service from xpdr to xpdr
def test_11_create_eth_service1(self):
    """Send the shared service-create payload as "service1" and wait ``WAITING`` seconds.

    Only "service-name" is set here; every other field keeps whatever value
    the shared ``cr_serv_sample_data`` dict currently holds.
    """
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service1"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
# Read "services/service1" from the service list, expect status ``ok`` and
# compare administrative-state, service-name, connection-type and
# lifecycle-state of the first entry with the literals below.
# NOTE(review): the lines opening each assertion are not visible in this view.
216 def test_12_get_eth_service1(self):
217 response = test_utils.get_service_list_request("services/service1")
218 self.assertEqual(response.status_code, requests.codes.ok)
219 res = response.json()
221 res['services'][0]['administrative-state'], 'inService')
223 res['services'][0]['service-name'], 'service1')
225 res['services'][0]['connection-type'], 'service')
227 res['services'][0]['lifecycle-state'], 'planned')
# Read roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on ROADM-A1, expect
# status ``ok``, and check its name, control mode, target output power and
# the source/destination interface names.
# NOTE(review): the opening of the first assertDictEqual argument is not
# visible in this view.
230 def test_13_check_xc1_ROADMA(self):
231 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
232 self.assertEqual(response.status_code, requests.codes.ok)
233 res = response.json()
234 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
235 self.assertDictEqual(
237 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
238 'opticalControlMode': 'gainLoss',
239 'target-output-power': -3.0
240 }, **res['roadm-connections'][0]),
241 res['roadm-connections'][0]
243 self.assertDictEqual(
244 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
245 res['roadm-connections'][0]['source'])
246 self.assertDictEqual(
247 {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
248 res['roadm-connections'][0]['destination'])
# Read roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADM-C1, expect
# status ``ok``, and check its name, control mode, target output power and
# the source/destination interface names.
# NOTE(review): the opening of the first assertDictEqual argument is not
# visible in this view.
251 def test_14_check_xc1_ROADMC(self):
252 response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
253 self.assertEqual(response.status_code, requests.codes.ok)
254 res = response.json()
255 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
256 self.assertDictEqual(
258 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
259 'opticalControlMode': 'gainLoss',
260 'target-output-power': -3.0
261 }, **res['roadm-connections'][0]),
262 res['roadm-connections'][0]
264 self.assertDictEqual(
265 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
266 res['roadm-connections'][0]['source'])
267 self.assertDictEqual(
268 {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
269 res['roadm-connections'][0]['destination'])
# Read topology node XPDR-A1-XPDR1 and inspect its termination points:
# XPDR1-NETWORK1 must carry the expected wavelength, the two client TPs must
# have no xpdr-client-attributes and XPDR1-NETWORK2 no xpdr-network-attributes.
# NOTE(review): the loop header binding ``ele`` and the tail of the expected
# wavelength dict are not visible in this view.
272 def test_15_check_topo_XPDRA(self):
273 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
274 self.assertEqual(response.status_code, requests.codes.ok)
275 res = response.json()
276 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
278 if ele['tp-id'] == 'XPDR1-NETWORK1':
279 self.assertEqual({u'frequency': 196.1,
281 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
282 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
283 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
284 if ele['tp-id'] == 'XPDR1-NETWORK2':
285 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
# Read topology node ROADM-A1-SRG1: index 1 must no longer be listed in the
# SRG available-wavelengths, SRG1-PP1-TXRX must list it as used, and
# SRG1-PP2-TXRX must have no 'used-wavelength' key.
# NOTE(review): the loop header binding ``ele`` and the tail of the expected
# wavelength dict are not visible in this view.
288 def test_16_check_topo_ROADMA_SRG1(self):
289 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
290 self.assertEqual(response.status_code, requests.codes.ok)
291 res = response.json()
292 self.assertNotIn({u'index': 1},
293 res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
294 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
296 if ele['tp-id'] == 'SRG1-PP1-TXRX':
297 self.assertIn({u'index': 1, u'frequency': 196.1,
299 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
300 if ele['tp-id'] == 'SRG1-PP2-TXRX':
301 self.assertNotIn('used-wavelength', dict.keys(ele))
# Read topology node ROADM-A1-DEG2: index 1 must no longer be listed in the
# degree available-wavelengths, and DEG2-CTP-TXRX / DEG2-TTP-TXRX must list it
# as used.
# NOTE(review): the method name says DEG1 but the node queried and the TP ids
# checked are DEG2 — confirm which is intended.
# NOTE(review): the loop header binding ``ele`` and the tails of the expected
# wavelength dicts are not visible in this view.
304 def test_17_check_topo_ROADMA_DEG1(self):
305 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
306 self.assertEqual(response.status_code, requests.codes.ok)
307 res = response.json()
308 self.assertNotIn({u'index': 1},
309 res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
310 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
312 if ele['tp-id'] == 'DEG2-CTP-TXRX':
313 self.assertIn({u'index': 1, u'frequency': 196.1,
315 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
316 if ele['tp-id'] == 'DEG2-TTP-TXRX':
317 self.assertIn({u'index': 1, u'frequency': 196.1,
319 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
    """Request the XPDR-A1 -> ROADM-A1 (SRG1-PP2-TXRX) link and check the result message."""
    reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
    """Request the ROADM-A1 (SRG1-PP2-TXRX) -> XPDR-A1 link and check the result message."""
    reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
    """Request the XPDR-C1 -> ROADM-C1 (SRG1-PP2-TXRX) link and check the result message."""
    reply = test_utils.connect_xpdr_to_rdm_request(
        "XPDR-C1", "1", "2", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', outcome)
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
    """Request the ROADM-C1 (SRG1-PP2-TXRX) -> XPDR-C1 link and check the result message."""
    reply = test_utils.connect_rdm_to_xpdr_request(
        "XPDR-C1", "1", "2", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    self.assertEqual(reply.status_code, requests.codes.ok)
    outcome = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', outcome)
def test_22_create_eth_service2(self):
    """Send the shared service-create payload as "service2" and wait ``WAITING`` seconds."""
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service2"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
# Read "services/service2" from the service list, expect status ``ok`` and
# check administrative-state, service-name, connection-type and
# lifecycle-state of the first entry.
# NOTE(review): the lines opening each assertion (and the expected
# administrative-state value) are not visible in this view.
363 def test_23_get_eth_service2(self):
364 response = test_utils.get_service_list_request("services/service2")
365 self.assertEqual(response.status_code, requests.codes.ok)
366 res = response.json()
368 res['services'][0]['administrative-state'],
371 res['services'][0]['service-name'], 'service2')
373 res['services'][0]['connection-type'], 'service')
375 res['services'][0]['lifecycle-state'], 'planned')
# Read roadm-connection DEG2-TTP-TXRX-SRG1-PP2-TXRX-2 on ROADM-A1, expect
# status ``ok``, and check its name, control mode ('power') and the
# source/destination interface names.
# NOTE(review): the opening of the first assertDictEqual argument is not
# visible in this view.
378 def test_24_check_xc2_ROADMA(self):
379 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-2")
380 self.assertEqual(response.status_code, requests.codes.ok)
381 res = response.json()
382 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
383 self.assertDictEqual(
385 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-2',
386 'opticalControlMode': 'power'
387 }, **res['roadm-connections'][0]),
388 res['roadm-connections'][0]
390 self.assertDictEqual(
391 {'src-if': 'DEG2-TTP-TXRX-nmc-2'},
392 res['roadm-connections'][0]['source'])
393 self.assertDictEqual(
394 {'dst-if': 'SRG1-PP2-TXRX-nmc-2'},
395 res['roadm-connections'][0]['destination'])
# Read topology node XPDR-A1-XPDR1: XPDR1-NETWORK1 and XPDR1-NETWORK2 must
# each carry their expected wavelength, and the two client TPs must have no
# xpdr-client-attributes.
# NOTE(review): the loop header binding ``ele`` and the tails of the expected
# wavelength dicts are not visible in this view.
397 def test_25_check_topo_XPDRA(self):
398 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
399 self.assertEqual(response.status_code, requests.codes.ok)
400 res = response.json()
401 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
403 if ele['tp-id'] == 'XPDR1-NETWORK1':
404 self.assertEqual({u'frequency': 196.1,
406 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
407 if ele['tp-id'] == 'XPDR1-NETWORK2':
408 self.assertEqual({u'frequency': 196.05,
410 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
411 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
412 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
# Read topology node ROADM-A1-SRG1: indexes 1 and 2 must be absent from the
# SRG available-wavelengths; SRG1-PP1-TXRX must use wavelength 1 only,
# SRG1-PP2-TXRX wavelength 2 only, and SRG1-PP3-TXRX must have no pp-attributes.
# Also invoked directly by test_43_check_topo_ROADMA.
# NOTE(review): the loop header binding ``ele`` and the tails of some expected
# wavelength dicts are not visible in this view.
415 def test_26_check_topo_ROADMA_SRG1(self):
416 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
417 self.assertEqual(response.status_code, requests.codes.ok)
418 res = response.json()
419 self.assertNotIn({u'index': 1}, res['node'][0]
420 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
421 self.assertNotIn({u'index': 2}, res['node'][0]
422 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
423 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
425 if ele['tp-id'] == 'SRG1-PP1-TXRX':
426 self.assertIn({u'index': 1, u'frequency': 196.1,
428 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
429 self.assertNotIn({u'index': 2, u'frequency': 196.05,
431 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
432 if ele['tp-id'] == 'SRG1-PP2-TXRX':
433 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
434 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
435 self.assertNotIn({u'index': 1, u'frequency': 196.1,
437 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
438 if ele['tp-id'] == 'SRG1-PP3-TXRX':
439 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
# Read topology node ROADM-A1-DEG2: indexes 1 and 2 must be absent from the
# degree available-wavelengths, and both DEG2-CTP-TXRX and DEG2-TTP-TXRX must
# list wavelengths 1 and 2 as used.
# Also invoked directly by test_43_check_topo_ROADMA.
# NOTE(review): the loop header binding ``ele`` and the tails of some expected
# wavelength dicts are not visible in this view.
442 def test_27_check_topo_ROADMA_DEG2(self):
443 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
444 self.assertEqual(response.status_code, requests.codes.ok)
445 res = response.json()
446 self.assertNotIn({u'index': 1}, res['node'][0]
447 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
448 self.assertNotIn({u'index': 2}, res['node'][0]
449 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
450 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
452 if ele['tp-id'] == 'DEG2-CTP-TXRX':
453 self.assertIn({u'index': 1, u'frequency': 196.1,
455 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
456 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
457 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
458 if ele['tp-id'] == 'DEG2-TTP-TXRX':
459 self.assertIn({u'index': 1, u'frequency': 196.1,
461 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
462 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
463 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
466 # creation service test on a non-available resource
def test_28_create_eth_service3(self):
    """Send the shared payload as "service3"; the request itself must be accepted.

    The reply is expected to announce PCE calculation with response-code
    '200'; the method then waits ``WAITING`` seconds.
    """
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service3"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('PCE calculation in progress',
                  body['output']['configuration-response-common']['response-message'])
    self.assertIn('200', body['output']['configuration-response-common']['response-code'])
    time.sleep(self.WAITING)
477 # add a test that check the openroadm-service-list still only contains 2 elements
def test_29_delete_eth_service3(self):
    """Deleting "service3" must report that it does not exist (response-code '500')."""
    reply = test_utils.service_delete_request("service3")
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('Service \'service3\' does not exist in datastore',
                  body['output']['configuration-response-common']['response-message'])
    self.assertIn('500', body['output']['configuration-response-common']['response-code'])
def test_30_delete_eth_service1(self):
    """Request deletion of "service1" and expect the renderer to report it in progress."""
    reply = test_utils.service_delete_request("service1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
def test_31_delete_eth_service2(self):
    """Request deletion of "service2" and expect the renderer to report it in progress."""
    reply = test_utils.service_delete_request("service2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
def test_32_check_no_xc_ROADMA(self):
    """Check that ROADM-A1's device data no longer contains 'roadm-connections'.

    The status code is asserted before the body is decoded, as in the other
    tests, so an HTTP failure is reported as a status mismatch rather than as
    a JSON decoding error.
    """
    response = test_utils.check_netconf_node_request("ROADM-A1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Membership on the mapping itself tests its keys; no dict.keys() needed.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
# Read topology node XPDR-A1-XPDR1: XPONDER-CLIENT TPs must have no
# xpdr-client-attributes; XPONDER-NETWORK TPs must keep 'tail-equipment-id'
# but no longer have a 'wavelength' in their xpdr-network-attributes.
# NOTE(review): the loop header binding ``ele`` is not visible in this view.
510 def test_33_check_topo_XPDRA(self):
511 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
512 self.assertEqual(response.status_code, requests.codes.ok)
513 res = response.json()
514 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
516 if ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
517 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
518 elif (ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK'):
519 self.assertIn(u'tail-equipment-id',
520 dict.keys(ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
521 self.assertNotIn('wavelength', dict.keys(
522 ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
# Read topology node ROADM-A1-SRG1: indexes 1 and 2 must be back in the SRG
# available-wavelengths and the termination points must have no pp-attributes.
# Also invoked directly by test_48_check_topo_ROADMA.
# NOTE(review): the loop header binding ``ele`` and the line between the two
# assertNotIn calls are not visible in this view.
525 def test_34_check_topo_ROADMA_SRG1(self):
526 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
527 self.assertEqual(response.status_code, requests.codes.ok)
528 res = response.json()
529 self.assertIn({u'index': 1}, res['node'][0]
530 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
531 self.assertIn({u'index': 2}, res['node'][0]
532 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
533 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
# NOTE(review): both operands of this ``or`` test the same tp-id
# ('SRG1-PP1-TXRX'); the second was presumably meant to be 'SRG1-PP2-TXRX'.
# The assertion that follows is repeated verbatim two lines below — confirm intent.
535 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP1-TXRX':
536 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
538 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
# Read topology node ROADM-A1-DEG2: indexes 1 and 2 must be back in the degree
# available-wavelengths, DEG2-CTP-TXRX must have no ctp-attributes and
# DEG2-TTP-TXRX no tx-ttp-attributes.
# Also invoked directly by test_48_check_topo_ROADMA.
# NOTE(review): the loop header binding ``ele`` is not visible in this view.
541 def test_35_check_topo_ROADMA_DEG2(self):
542 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
543 self.assertEqual(response.status_code, requests.codes.ok)
544 res = response.json()
545 self.assertIn({u'index': 1}, res['node'][0]
546 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
547 self.assertIn({u'index': 2}, res['node'][0]
548 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
549 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
551 if ele['tp-id'] == 'DEG2-CTP-TXRX':
552 self.assertNotIn('org-openroadm-network-topology:ctp-attributes', dict.keys(ele))
553 if ele['tp-id'] == 'DEG2-TTP-TXRX':
554 self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
557 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
def test_36_create_oc_service1(self):
    """Create "service1" as a roadm-line OC service between ROADM-A1 and ROADM-C1.

    The shared ``cr_serv_sample_data`` dict is modified in place, so these
    field values remain set for any later test that reuses it.
    """
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service1"
    request["connection-type"] = "roadm-line"
    for end_key, node in (("service-a-end", "ROADM-A1"), ("service-z-end", "ROADM-C1")):
        request[end_key]["node-id"] = node
        request[end_key]["service-format"] = "OC"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
# Read "services/service1" from the service list, expect status ``ok`` and
# check administrative-state, service-name, connection-type ('roadm-line')
# and lifecycle-state of the first entry.
# NOTE(review): the lines opening each assertion (and the expected
# administrative-state value) are not visible in this view.
572 def test_37_get_oc_service1(self):
573 response = test_utils.get_service_list_request("services/service1")
574 self.assertEqual(response.status_code, requests.codes.ok)
575 res = response.json()
577 res['services'][0]['administrative-state'],
580 res['services'][0]['service-name'], 'service1')
582 res['services'][0]['connection-type'], 'roadm-line')
584 res['services'][0]['lifecycle-state'], 'planned')
# Read roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on ROADM-A1, expect
# status ``ok``, and check its name, control mode, target output power and
# the source/destination interface names.
# Also invoked directly by test_50_loop_create_oc_service.
# NOTE(review): the opening of the first assertDictEqual argument is not
# visible in this view.
587 def test_38_check_xc1_ROADMA(self):
588 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
589 self.assertEqual(response.status_code, requests.codes.ok)
590 res = response.json()
591 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
592 self.assertDictEqual(
594 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
595 'opticalControlMode': 'gainLoss',
596 'target-output-power': -3.0
597 }, **res['roadm-connections'][0]),
598 res['roadm-connections'][0]
600 self.assertDictEqual(
601 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
602 res['roadm-connections'][0]['source'])
603 self.assertDictEqual(
604 {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
605 res['roadm-connections'][0]['destination'])
# Read roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADM-C1, expect
# status ``ok``, and check its name, control mode, target output power and
# the source/destination interface names.
# Also invoked directly by test_50_loop_create_oc_service.
# NOTE(review): the opening of the first assertDictEqual argument is not
# visible in this view.
608 def test_39_check_xc1_ROADMC(self):
609 response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
610 self.assertEqual(response.status_code, requests.codes.ok)
611 res = response.json()
612 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
613 self.assertDictEqual(
615 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
616 'opticalControlMode': 'gainLoss',
617 'target-output-power': -3.0
618 }, **res['roadm-connections'][0]),
619 res['roadm-connections'][0]
621 self.assertDictEqual(
622 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
623 res['roadm-connections'][0]['source'])
624 self.assertDictEqual(
625 {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
626 res['roadm-connections'][0]['destination'])
def test_40_create_oc_service2(self):
    """Create "service2" as a roadm-line OC service between ROADM-A1 and ROADM-C1.

    The shared ``cr_serv_sample_data`` dict is modified in place, so these
    field values remain set for any later test that reuses it.
    """
    request = self.cr_serv_sample_data["input"]
    request["service-name"] = "service2"
    request["connection-type"] = "roadm-line"
    for end_key, node in (("service-a-end", "ROADM-A1"), ("service-z-end", "ROADM-C1")):
        request[end_key]["node-id"] = node
        request[end_key]["service-format"] = "OC"
    reply = test_utils.service_create_request(self.cr_serv_sample_data)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    time.sleep(self.WAITING)
# Read "services/service2" from the service list, expect status ``ok`` and
# check administrative-state, service-name, connection-type ('roadm-line')
# and lifecycle-state of the first entry.
# NOTE(review): the lines opening each assertion (and the expected
# administrative-state value) are not visible in this view.
643 def test_41_get_oc_service2(self):
644 response = test_utils.get_service_list_request("services/service2")
645 self.assertEqual(response.status_code, requests.codes.ok)
646 res = response.json()
648 res['services'][0]['administrative-state'],
651 res['services'][0]['service-name'], 'service2')
653 res['services'][0]['connection-type'], 'roadm-line')
655 res['services'][0]['lifecycle-state'], 'planned')
# Read roadm-connection SRG1-PP2-TXRX-DEG2-TTP-TXRX-2 on ROADM-A1, expect
# status ``ok``, and check its name, control mode, target output power and
# the source/destination interface names.
# NOTE(review): the opening of the first assertDictEqual argument is not
# visible in this view.
658 def test_42_check_xc2_ROADMA(self):
659 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-2")
660 self.assertEqual(response.status_code, requests.codes.ok)
661 res = response.json()
662 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
663 self.assertDictEqual(
665 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-2',
666 'opticalControlMode': 'gainLoss',
667 'target-output-power': -3.0
668 }, **res['roadm-connections'][0]),
669 res['roadm-connections'][0]
671 self.assertDictEqual(
672 {'src-if': 'SRG1-PP2-TXRX-nmc-2'},
673 res['roadm-connections'][0]['source'])
674 self.assertDictEqual(
675 {'dst-if': 'DEG2-TTP-TXRX-nmc-2'},
676 res['roadm-connections'][0]['destination'])
679 def test_43_check_topo_ROADMA(self):
680 self.test_26_check_topo_ROADMA_SRG1()
681 self.test_27_check_topo_ROADMA_DEG2()
def test_44_delete_oc_service1(self):
    """Request deletion of "service1" and expect the renderer to report it in progress."""
    reply = test_utils.service_delete_request("service1")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
def test_45_delete_oc_service2(self):
    """Request deletion of "service2" and expect the renderer to report it in progress."""
    reply = test_utils.service_delete_request("service2")
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress', common['response-message'])
# Read the whole service list (empty path) and expect status ``conflict``
# with a "data-missing" application error in res['errors']['error'].
# NOTE(review): the first body line and the line opening the final assertion
# are not visible in this view.
700 def test_46_get_no_oc_services(self):
702 response = test_utils.get_service_list_request("")
703 self.assertEqual(response.status_code, requests.codes.conflict)
704 res = response.json()
706 {"error-type": "application", "error-tag": "data-missing",
707 "error-message": "Request could not be completed because the relevant data model content does not exist"},
708 res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    """Check that ROADM-A1's device data no longer contains 'roadm-connections'."""
    response = test_utils.check_netconf_node_request("ROADM-A1", "")
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # The original spelled the key as ['roadm-connections'][0], which is just
    # the string itself; the plain literal states the intent directly.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
718 def test_48_check_topo_ROADMA(self):
719 self.test_34_check_topo_ROADMA_SRG1()
720 self.test_35_check_topo_ROADMA_DEG2()
722 def test_49_loop_create_eth_service(self):
723 for i in range(1, 6):
724 print("iteration number {}".format(i))
725 print("eth service creation")
726 self.test_11_create_eth_service1()
727 print("check xc in ROADM-A1")
728 self.test_13_check_xc1_ROADMA()
729 print("check xc in ROADM-C1")
730 self.test_14_check_xc1_ROADMC()
731 print("eth service deletion\n")
732 self.test_30_delete_eth_service1()
# If looking up "services/service1" does not return 404, request its deletion
# first; then run five cycles of test_36 (create OC service), test_38 and
# test_39 (cross-connection checks) and test_44 (delete).
# NOTE(review): the lines following the initial delete request are not
# visible in this view.
734 def test_50_loop_create_oc_service(self):
735 response = test_utils.get_service_list_request("services/service1")
736 if response.status_code != 404:
737 response = test_utils.service_delete_request("service1")
740 for i in range(1, 6):
741 print("iteration number {}".format(i))
742 print("oc service creation")
743 self.test_36_create_oc_service1()
744 print("check xc in ROADM-A1")
745 self.test_38_check_xc1_ROADMA()
746 print("check xc in ROADM-C1")
747 self.test_39_check_xc1_ROADMC()
748 print("oc service deletion\n")
749 self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
    """Unmount device "XPDR-A1" and expect status ``ok``."""
    status = test_utils.unmount_device("XPDR-A1").status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_52_disconnect_XPDRC(self):
    """Unmount device "XPDR-C1" and expect status ``ok``."""
    status = test_utils.unmount_device("XPDR-C1").status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_53_disconnect_ROADMA(self):
    """Unmount device "ROADM-A1" and expect status ``ok``."""
    status = test_utils.unmount_device("ROADM-A1").status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_54_disconnect_ROADMC(self):
    """Unmount device "ROADM-C1" and expect status ``ok``."""
    status = test_utils.unmount_device("ROADM-C1").status_code
    self.assertEqual(status, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
# When executed as a script, run the tests through unittest with verbose output.
768 if __name__ == "__main__":
769 unittest.main(verbosity=2)