2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
16 from common import test_utils
19 class TransportPCEFulltesting(unittest.TestCase):
22 cr_serv_sample_data = {"input": {
23 "sdnc-request-header": {
24 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
25 "rpc-action": "service-create",
26 "request-system-id": "appname",
27 "notification-url": "http://localhost:8585/NotificationServer/notify"
29 "service-name": "service1",
30 "common-id": "ASATT1234567",
31 "connection-type": "service",
33 "service-rate": "100",
35 "service-format": "Ethernet",
36 "clli": "SNJSCAMCJP8",
39 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
40 "port-type": "router",
41 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
42 "port-rack": "000000.00",
46 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
47 "lgx-port-name": "LGX Back.3",
48 "lgx-port-rack": "000000.00",
49 "lgx-port-shelf": "00"
54 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
55 "port-type": "router",
56 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
57 "port-rack": "000000.00",
61 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
62 "lgx-port-name": "LGX Back.4",
63 "lgx-port-rack": "000000.00",
64 "lgx-port-shelf": "00"
70 "service-rate": "100",
72 "service-format": "Ethernet",
73 "clli": "SNJSCAMCJT4",
76 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
77 "port-type": "router",
78 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
79 "port-rack": "000000.00",
83 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
84 "lgx-port-name": "LGX Back.29",
85 "lgx-port-rack": "000000.00",
86 "lgx-port-shelf": "00"
91 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
92 "port-type": "router",
93 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
94 "port-rack": "000000.00",
98 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
99 "lgx-port-name": "LGX Back.30",
100 "lgx-port-rack": "000000.00",
101 "lgx-port-shelf": "00"
106 "due-date": "2016-11-28T00:00:01Z",
107 "operator-contact": "pw1234"
111 WAITING = 20 # nominal value is 300
115 cls.processes = test_utils.start_tpce()
116 cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
def tearDownClass(cls):
    # Class-level clean-up: hand every entry of cls.processes to
    # test_utils.shutdown_process, then report that the loop has finished.
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
def setUp(self):
    # Executed before each test method: announce which test is about to run.
    # self.id() is a dotted path; only its last component is printed.
    test_name = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(test_name))
def test_01_connect_xpdrA(self):
    # Call mount_device for node XPDR-A1 / 'xpdra' and expect HTTP 201 (created).
    mount_reply = test_utils.mount_device("XPDR-A1", 'xpdra')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_02_connect_xpdrC(self):
    # Call mount_device for node XPDR-C1 / 'xpdrc' and expect HTTP 201 (created).
    mount_reply = test_utils.mount_device("XPDR-C1", 'xpdrc')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_03_connect_rdmA(self):
    # Call mount_device for node ROADM-A1 / 'roadma' and expect HTTP 201 (created).
    mount_reply = test_utils.mount_device("ROADM-A1", 'roadma')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_04_connect_rdmC(self):
    # Call mount_device for node ROADM-C1 / 'roadmc' and expect HTTP 201 (created).
    mount_reply = test_utils.mount_device("ROADM-C1", 'roadmc')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    # Issue the xpdr-to-roadm link request between XPDR-A1 and ROADM-A1
    # (SRG1-PP1-TXRX), then check the HTTP status and the confirmation text.
    # The argument values are passed through unchanged to test_utils.
    link_args = ("XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    # Issue the roadm-to-xpdr link request between ROADM-A1 (SRG1-PP1-TXRX)
    # and XPDR-A1, then check the HTTP status and the confirmation text.
    link_args = ("XPDR-A1", "1", "1", "ROADM-A1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    # Issue the xpdr-to-roadm link request between XPDR-C1 and ROADM-C1
    # (SRG1-PP1-TXRX), then check the HTTP status and the confirmation text.
    link_args = ("XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    # Issue the roadm-to-xpdr link request between ROADM-C1 (SRG1-PP1-TXRX)
    # and XPDR-C1, then check the HTTP status and the confirmation text.
    link_args = ("XPDR-C1", "1", "1", "ROADM-C1", "1", "SRG1-PP1-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
175 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
176 # Config ROADMA-ROADMC oms-attributes
178 "auto-spanloss": "true",
179 "spanloss-base": 11.4,
180 "spanloss-current": 12,
181 "engineered-spanloss": 12.2,
182 "link-concatenation": [{
185 "SRLG-length": 100000,
187 response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
188 self.assertEqual(response.status_code, requests.codes.created)
190 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
191 # Config ROADMC-ROADMA oms-attributes
193 "auto-spanloss": "true",
194 "spanloss-base": 11.4,
195 "spanloss-current": 12,
196 "engineered-spanloss": 12.2,
197 "link-concatenation": [{
200 "SRLG-length": 100000,
202 response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
203 self.assertEqual(response.status_code, requests.codes.created)
205 # test service-create for Eth service from xpdr to xpdr
def test_11_create_eth_service1(self):
    # Submit the shared sample request under the name "service1" and expect
    # the controller to answer that PCE calculation has started.
    # NOTE: `payload` is the class-level dict itself, so the rename is
    # visible to every later test that reuses cr_serv_sample_data.
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service1"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
215 def test_12_get_eth_service1(self):
216 response = test_utils.get_service_list_request("services/service1")
217 self.assertEqual(response.status_code, requests.codes.ok)
218 res = response.json()
220 res['services'][0]['administrative-state'], 'inService')
222 res['services'][0]['service-name'], 'service1')
224 res['services'][0]['connection-type'], 'service')
226 res['services'][0]['lifecycle-state'], 'planned')
229 def test_13_check_xc1_ROADMA(self):
230 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
231 self.assertEqual(response.status_code, requests.codes.ok)
232 res = response.json()
233 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
234 self.assertDictEqual(
236 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
237 'opticalControlMode': 'gainLoss',
238 'target-output-power': -3.0
239 }, **res['roadm-connections'][0]),
240 res['roadm-connections'][0]
242 self.assertDictEqual(
243 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
244 res['roadm-connections'][0]['source'])
245 self.assertDictEqual(
246 {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
247 res['roadm-connections'][0]['destination'])
250 def test_14_check_xc1_ROADMC(self):
251 response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
252 self.assertEqual(response.status_code, requests.codes.ok)
253 res = response.json()
254 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
255 self.assertDictEqual(
257 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
258 'opticalControlMode': 'gainLoss',
259 'target-output-power': -3.0
260 }, **res['roadm-connections'][0]),
261 res['roadm-connections'][0]
263 self.assertDictEqual(
264 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
265 res['roadm-connections'][0]['source'])
266 self.assertDictEqual(
267 {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
268 res['roadm-connections'][0]['destination'])
271 def test_15_check_topo_XPDRA(self):
272 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
273 self.assertEqual(response.status_code, requests.codes.ok)
274 res = response.json()
275 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
277 if ele['tp-id'] == 'XPDR1-NETWORK1':
278 self.assertEqual({u'frequency': 196.1,
280 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
281 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
282 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
283 if ele['tp-id'] == 'XPDR1-NETWORK2':
284 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
287 def test_16_check_topo_ROADMA_SRG1(self):
288 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
289 self.assertEqual(response.status_code, requests.codes.ok)
290 res = response.json()
291 self.assertNotIn({u'index': 1},
292 res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
293 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
295 if ele['tp-id'] == 'SRG1-PP1-TXRX':
296 self.assertIn({u'index': 1, u'frequency': 196.1,
298 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
299 if ele['tp-id'] == 'SRG1-PP2-TXRX':
300 self.assertNotIn('used-wavelength', dict.keys(ele))
303 def test_17_check_topo_ROADMA_DEG1(self):
304 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
305 self.assertEqual(response.status_code, requests.codes.ok)
306 res = response.json()
307 self.assertNotIn({u'index': 1},
308 res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
309 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
311 if ele['tp-id'] == 'DEG2-CTP-TXRX':
312 self.assertIn({u'index': 1, u'frequency': 196.1,
314 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
315 if ele['tp-id'] == 'DEG2-TTP-TXRX':
316 self.assertIn({u'index': 1, u'frequency': 196.1,
318 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
    # Issue the xpdr-to-roadm link request between XPDR-A1 and ROADM-A1
    # (SRG1-PP2-TXRX), then check the HTTP status and the confirmation text.
    link_args = ("XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
    # Issue the roadm-to-xpdr link request between ROADM-A1 (SRG1-PP2-TXRX)
    # and XPDR-A1, then check the HTTP status and the confirmation text.
    link_args = ("XPDR-A1", "1", "2", "ROADM-A1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
    # Issue the xpdr-to-roadm link request between XPDR-C1 and ROADM-C1
    # (SRG1-PP2-TXRX), then check the HTTP status and the confirmation text.
    link_args = ("XPDR-C1", "1", "2", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_xpdr_to_rdm_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', result_text)
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
    # Issue the roadm-to-xpdr link request between ROADM-C1 (SRG1-PP2-TXRX)
    # and XPDR-C1, then check the HTTP status and the confirmation text.
    link_args = ("XPDR-C1", "1", "2", "ROADM-C1", "1", "SRG1-PP2-TXRX")
    reply = test_utils.connect_rdm_to_xpdr_request(*link_args)
    self.assertEqual(reply.status_code, requests.codes.ok)
    result_text = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', result_text)
def test_22_create_eth_service2(self):
    # Submit the shared sample request under the name "service2" and expect
    # the controller to answer that PCE calculation has started.
    # NOTE: `payload` is the class-level dict itself, so the rename is
    # visible to every later test that reuses cr_serv_sample_data.
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service2"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
362 def test_23_get_eth_service2(self):
363 response = test_utils.get_service_list_request("services/service2")
364 self.assertEqual(response.status_code, requests.codes.ok)
365 res = response.json()
367 res['services'][0]['administrative-state'],
370 res['services'][0]['service-name'], 'service2')
372 res['services'][0]['connection-type'], 'service')
374 res['services'][0]['lifecycle-state'], 'planned')
377 def test_24_check_xc2_ROADMA(self):
378 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-2")
379 self.assertEqual(response.status_code, requests.codes.ok)
380 res = response.json()
381 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
382 self.assertDictEqual(
384 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-2',
385 'opticalControlMode': 'power'
386 }, **res['roadm-connections'][0]),
387 res['roadm-connections'][0]
389 self.assertDictEqual(
390 {'src-if': 'DEG2-TTP-TXRX-nmc-2'},
391 res['roadm-connections'][0]['source'])
392 self.assertDictEqual(
393 {'dst-if': 'SRG1-PP2-TXRX-nmc-2'},
394 res['roadm-connections'][0]['destination'])
396 def test_25_check_topo_XPDRA(self):
397 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
398 self.assertEqual(response.status_code, requests.codes.ok)
399 res = response.json()
400 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
402 if ele['tp-id'] == 'XPDR1-NETWORK1':
403 self.assertEqual({u'frequency': 196.1,
405 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
406 if ele['tp-id'] == 'XPDR1-NETWORK2':
407 self.assertEqual({u'frequency': 196.05,
409 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
410 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
411 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
414 def test_26_check_topo_ROADMA_SRG1(self):
415 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
416 self.assertEqual(response.status_code, requests.codes.ok)
417 res = response.json()
418 self.assertNotIn({u'index': 1}, res['node'][0]
419 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
420 self.assertNotIn({u'index': 2}, res['node'][0]
421 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
422 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
424 if ele['tp-id'] == 'SRG1-PP1-TXRX':
425 self.assertIn({u'index': 1, u'frequency': 196.1,
427 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
428 self.assertNotIn({u'index': 2, u'frequency': 196.05,
430 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
431 if ele['tp-id'] == 'SRG1-PP2-TXRX':
432 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
433 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
434 self.assertNotIn({u'index': 1, u'frequency': 196.1,
436 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
437 if ele['tp-id'] == 'SRG1-PP3-TXRX':
438 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
441 def test_27_check_topo_ROADMA_DEG2(self):
442 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
443 self.assertEqual(response.status_code, requests.codes.ok)
444 res = response.json()
445 self.assertNotIn({u'index': 1}, res['node'][0]
446 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
447 self.assertNotIn({u'index': 2}, res['node'][0]
448 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
449 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
451 if ele['tp-id'] == 'DEG2-CTP-TXRX':
452 self.assertIn({u'index': 1, u'frequency': 196.1,
454 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
455 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
456 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
457 if ele['tp-id'] == 'DEG2-TTP-TXRX':
458 self.assertIn({u'index': 1, u'frequency': 196.1,
460 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
461 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
462 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
465 # creation service test on a non-available resource
def test_28_create_eth_service3(self):
    # Submit the shared sample request under the name "service3". The RPC
    # itself is still expected to be accepted: status OK, a
    # "PCE calculation in progress" message and a response code containing '200'.
    payload = self.cr_serv_sample_data
    payload["input"]["service-name"] = "service3"
    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    self.assertIn('PCE calculation in progress',
                  body['output']['configuration-response-common']['response-message'])
    self.assertIn('200', body['output']['configuration-response-common']['response-code'])
    # Pause for WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
# add a test that checks that the openroadm-service-list still contains only 2 elements
477 def test_29_delete_eth_service3(self):
478 url = "{}/operations/org-openroadm-service:service-delete"
480 "sdnc-request-header": {
481 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
482 "rpc-action": "service-delete",
483 "request-system-id": "appname",
484 "notification-url": "http://localhost:8585/NotificationServer/notify"
486 "service-delete-req-info": {
487 "service-name": "service3",
488 "tail-retention": "no"
492 response = test_utils.post_request(url, data)
493 self.assertEqual(response.status_code, requests.codes.ok)
494 res = response.json()
495 self.assertIn('Service \'service3\' does not exist in datastore',
496 res['output']['configuration-response-common']['response-message'])
497 self.assertIn('500', res['output']['configuration-response-common']['response-code'])
500 def test_30_delete_eth_service1(self):
501 url = "{}/operations/org-openroadm-service:service-delete"
503 "sdnc-request-header": {
504 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
505 "rpc-action": "service-delete",
506 "request-system-id": "appname",
507 "notification-url": "http://localhost:8585/NotificationServer/notify"
509 "service-delete-req-info": {
510 "service-name": "service1",
511 "tail-retention": "no"
515 response = test_utils.post_request(url, data)
516 self.assertEqual(response.status_code, requests.codes.ok)
517 res = response.json()
518 self.assertIn('Renderer service delete in progress',
519 res['output']['configuration-response-common']['response-message'])
522 def test_31_delete_eth_service2(self):
523 url = "{}/operations/org-openroadm-service:service-delete"
525 "sdnc-request-header": {
526 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
527 "rpc-action": "service-delete",
528 "request-system-id": "appname",
529 "notification-url": "http://localhost:8585/NotificationServer/notify"
531 "service-delete-req-info": {
532 "service-name": "service2",
533 "tail-retention": "no"
537 response = test_utils.post_request(url, data)
538 self.assertEqual(response.status_code, requests.codes.ok)
539 res = response.json()
540 self.assertIn('Renderer service delete in progress',
541 res['output']['configuration-response-common']['response-message'])
def test_32_check_no_xc_ROADMA(self):
    # Read the ROADM-A1 device data (empty sub-path) and verify that it no
    # longer carries any 'roadm-connections' entry.
    reply = test_utils.check_netconf_node_request("ROADM-A1", "")
    # Assert the status before decoding the body, so that an error reply is
    # reported as a status mismatch rather than as a JSON/KeyError failure.
    self.assertEqual(reply.status_code, requests.codes.ok)
    device = reply.json()['org-openroadm-device']
    # Membership on the mapping itself tests its keys.
    self.assertNotIn('roadm-connections', device)
551 def test_33_check_topo_XPDRA(self):
552 response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
553 self.assertEqual(response.status_code, requests.codes.ok)
554 res = response.json()
555 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
557 if ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
558 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
559 elif (ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK'):
560 self.assertIn(u'tail-equipment-id',
561 dict.keys(ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
562 self.assertNotIn('wavelength', dict.keys(
563 ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
566 def test_34_check_topo_ROADMA_SRG1(self):
567 response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
568 self.assertEqual(response.status_code, requests.codes.ok)
569 res = response.json()
570 self.assertIn({u'index': 1}, res['node'][0]
571 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
572 self.assertIn({u'index': 2}, res['node'][0]
573 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
574 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
576 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP1-TXRX':
577 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
579 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
582 def test_35_check_topo_ROADMA_DEG2(self):
583 response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
584 self.assertEqual(response.status_code, requests.codes.ok)
585 res = response.json()
586 self.assertIn({u'index': 1}, res['node'][0]
587 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
588 self.assertIn({u'index': 2}, res['node'][0]
589 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
590 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
592 if ele['tp-id'] == 'DEG2-CTP-TXRX':
593 self.assertNotIn('org-openroadm-network-topology:ctp-attributes', dict.keys(ele))
594 if ele['tp-id'] == 'DEG2-TTP-TXRX':
595 self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
598 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
def test_36_create_oc_service1(self):
    # Create OC service "service1" between ROADM-A1 and ROADM-C1 and expect
    # the controller to answer that PCE calculation has started.
    #
    # The request is built on a deep copy of the class-level sample data.
    # Editing cr_serv_sample_data in place would leave connection-type,
    # node-id and service-format set to the OC values for every later test
    # that reuses the sample and resets only the service name.
    import copy

    payload = copy.deepcopy(self.cr_serv_sample_data)
    request_input = payload["input"]
    request_input["service-name"] = "service1"
    request_input["connection-type"] = "roadm-line"
    for end_key, node_id in (("service-a-end", "ROADM-A1"),
                             ("service-z-end", "ROADM-C1")):
        request_input[end_key]["node-id"] = node_id
        request_input[end_key]["service-format"] = "OC"

    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
613 def test_37_get_oc_service1(self):
614 response = test_utils.get_service_list_request("services/service1")
615 self.assertEqual(response.status_code, requests.codes.ok)
616 res = response.json()
618 res['services'][0]['administrative-state'],
621 res['services'][0]['service-name'], 'service1')
623 res['services'][0]['connection-type'], 'roadm-line')
625 res['services'][0]['lifecycle-state'], 'planned')
628 def test_38_check_xc1_ROADMA(self):
629 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
630 self.assertEqual(response.status_code, requests.codes.ok)
631 res = response.json()
632 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
633 self.assertDictEqual(
635 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
636 'opticalControlMode': 'gainLoss',
637 'target-output-power': -3.0
638 }, **res['roadm-connections'][0]),
639 res['roadm-connections'][0]
641 self.assertDictEqual(
642 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
643 res['roadm-connections'][0]['source'])
644 self.assertDictEqual(
645 {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
646 res['roadm-connections'][0]['destination'])
649 def test_39_check_xc1_ROADMC(self):
650 response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
651 self.assertEqual(response.status_code, requests.codes.ok)
652 res = response.json()
653 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
654 self.assertDictEqual(
656 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
657 'opticalControlMode': 'gainLoss',
658 'target-output-power': -3.0
659 }, **res['roadm-connections'][0]),
660 res['roadm-connections'][0]
662 self.assertDictEqual(
663 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
664 res['roadm-connections'][0]['source'])
665 self.assertDictEqual(
666 {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
667 res['roadm-connections'][0]['destination'])
def test_40_create_oc_service2(self):
    # Create OC service "service2" between ROADM-A1 and ROADM-C1 and expect
    # the controller to answer that PCE calculation has started.
    #
    # The request is built on a deep copy of the class-level sample data.
    # Editing cr_serv_sample_data in place would leave connection-type,
    # node-id and service-format set to the OC values for every later test
    # that reuses the sample and resets only the service name.
    import copy

    payload = copy.deepcopy(self.cr_serv_sample_data)
    request_input = payload["input"]
    request_input["service-name"] = "service2"
    request_input["connection-type"] = "roadm-line"
    for end_key, node_id in (("service-a-end", "ROADM-A1"),
                             ("service-z-end", "ROADM-C1")):
        request_input[end_key]["node-id"] = node_id
        request_input[end_key]["service-format"] = "OC"

    reply = test_utils.service_create_request(payload)
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # Pause for WAITING seconds before the next test runs.
    time.sleep(self.WAITING)
684 def test_41_get_oc_service2(self):
685 response = test_utils.get_service_list_request("services/service2")
686 self.assertEqual(response.status_code, requests.codes.ok)
687 res = response.json()
689 res['services'][0]['administrative-state'],
692 res['services'][0]['service-name'], 'service2')
694 res['services'][0]['connection-type'], 'roadm-line')
696 res['services'][0]['lifecycle-state'], 'planned')
699 def test_42_check_xc2_ROADMA(self):
700 response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-2")
701 self.assertEqual(response.status_code, requests.codes.ok)
702 res = response.json()
703 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
704 self.assertDictEqual(
706 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-2',
707 'opticalControlMode': 'gainLoss',
708 'target-output-power': -3.0
709 }, **res['roadm-connections'][0]),
710 res['roadm-connections'][0]
712 self.assertDictEqual(
713 {'src-if': 'SRG1-PP2-TXRX-nmc-2'},
714 res['roadm-connections'][0]['source'])
715 self.assertDictEqual(
716 {'dst-if': 'DEG2-TTP-TXRX-nmc-2'},
717 res['roadm-connections'][0]['destination'])
720 def test_43_check_topo_ROADMA(self):
721 self.test_26_check_topo_ROADMA_SRG1()
722 self.test_27_check_topo_ROADMA_DEG2()
725 def test_44_delete_oc_service1(self):
726 url = "{}/operations/org-openroadm-service:service-delete"
728 "sdnc-request-header": {
729 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
730 "rpc-action": "service-delete",
731 "request-system-id": "appname",
732 "notification-url": "http://localhost:8585/NotificationServer/notify"
734 "service-delete-req-info": {
735 "service-name": "service1",
736 "tail-retention": "no"
740 response = test_utils.post_request(url, data)
741 self.assertEqual(response.status_code, requests.codes.ok)
742 res = response.json()
743 self.assertIn('Renderer service delete in progress',
744 res['output']['configuration-response-common']['response-message'])
747 def test_45_delete_oc_service2(self):
748 url = "{}/operations/org-openroadm-service:service-delete"
750 "sdnc-request-header": {
751 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
752 "rpc-action": "service-delete",
753 "request-system-id": "appname",
754 "notification-url": "http://localhost:8585/NotificationServer/notify"
756 "service-delete-req-info": {
757 "service-name": "service2",
758 "tail-retention": "no"
762 response = test_utils.post_request(url, data)
763 self.assertEqual(response.status_code, requests.codes.ok)
764 res = response.json()
765 self.assertIn('Renderer service delete in progress',
766 res['output']['configuration-response-common']['response-message'])
769 def test_46_get_no_oc_services(self):
771 response = test_utils.get_service_list_request("")
772 self.assertEqual(response.status_code, requests.codes.not_found)
773 res = response.json()
775 {"error-type": "application", "error-tag": "data-missing",
776 "error-message": "Request could not be completed because the relevant data model content does not exist"},
777 res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    # Read the ROADM-A1 device data (empty sub-path) and verify that it
    # carries no 'roadm-connections' entry.
    reply = test_utils.check_netconf_node_request("ROADM-A1", "")
    self.assertEqual(reply.status_code, requests.codes.ok)
    device = reply.json()['org-openroadm-device']
    # The key is written out directly; the former ['roadm-connections'][0]
    # evaluated to this same string.
    self.assertNotIn('roadm-connections', device)
787 def test_48_check_topo_ROADMA(self):
788 self.test_34_check_topo_ROADMA_SRG1()
789 self.test_35_check_topo_ROADMA_DEG2()
791 def test_49_loop_create_eth_service(self):
792 for i in range(1, 6):
793 print("iteration number {}".format(i))
794 print("eth service creation")
795 self.test_11_create_eth_service1()
796 print("check xc in ROADM-A1")
797 self.test_13_check_xc1_ROADMA()
798 print("check xc in ROADM-C1")
799 self.test_14_check_xc1_ROADMC()
800 print("eth service deletion\n")
801 self.test_30_delete_eth_service1()
803 def test_50_loop_create_oc_service(self):
804 response = test_utils.get_service_list_request("services/service1")
805 if response.status_code != 404:
806 url = "{}/operations/org-openroadm-service:service-delete"
808 "sdnc-request-header": {
809 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
810 "rpc-action": "service-delete",
811 "request-system-id": "appname",
812 "notification-url": "http://localhost:8585/NotificationServer/notify"
814 "service-delete-req-info": {
815 "service-name": "service1",
816 "tail-retention": "no"
820 test_utils.post_request(url, data)
823 for i in range(1, 6):
824 print("iteration number {}".format(i))
825 print("oc service creation")
826 self.test_36_create_oc_service1()
827 print("check xc in ROADM-A1")
828 self.test_38_check_xc1_ROADMA()
829 print("check xc in ROADM-C1")
830 self.test_39_check_xc1_ROADMC()
831 print("oc service deletion\n")
832 self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
    # Call unmount_device for node XPDR-A1 and expect HTTP 200 (ok).
    unmount_reply = test_utils.unmount_device("XPDR-A1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
def test_52_disconnect_XPDRC(self):
    # Call unmount_device for node XPDR-C1 and expect HTTP 200 (ok).
    unmount_reply = test_utils.unmount_device("XPDR-C1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
def test_53_disconnect_ROADMA(self):
    # Call unmount_device for node ROADM-A1 and expect HTTP 200 (ok).
    unmount_reply = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
def test_54_disconnect_ROADMC(self):
    # Call unmount_device for node ROADM-C1 and expect HTTP 200 (ok).
    unmount_reply = test_utils.unmount_device("ROADM-C1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
# Script entry point: when the file is executed directly, run every test in
# this module with verbose (per-test) reporting.
if __name__ == "__main__":
    unittest.main(verbosity=2)