2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
17 from common import test_utils
20 class TransportPCEFulltesting(unittest.TestCase):
22 cr_serv_sample_data = {"input": {
23 "sdnc-request-header": {
24 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
25 "rpc-action": "service-create",
26 "request-system-id": "appname",
28 "http://localhost:8585/NotificationServer/notify"
30 "service-name": "service1",
31 "common-id": "ASATT1234567",
32 "connection-type": "service",
34 "service-rate": "100",
36 "service-format": "Ethernet",
37 "clli": "SNJSCAMCJP8",
41 "ROUTER_SNJSCAMCJP8_000000.00_00",
42 "port-type": "router",
43 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
44 "port-rack": "000000.00",
49 "LGX Panel_SNJSCAMCJP8_000000.00_00",
50 "lgx-port-name": "LGX Back.3",
51 "lgx-port-rack": "000000.00",
52 "lgx-port-shelf": "00"
58 "ROUTER_SNJSCAMCJP8_000000.00_00",
59 "port-type": "router",
60 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
61 "port-rack": "000000.00",
66 "LGX Panel_SNJSCAMCJP8_000000.00_00",
67 "lgx-port-name": "LGX Back.4",
68 "lgx-port-rack": "000000.00",
69 "lgx-port-shelf": "00"
75 "service-rate": "100",
77 "service-format": "Ethernet",
78 "clli": "SNJSCAMCJT4",
82 "ROUTER_SNJSCAMCJT4_000000.00_00",
83 "port-type": "router",
84 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
85 "port-rack": "000000.00",
90 "LGX Panel_SNJSCAMCJT4_000000.00_00",
91 "lgx-port-name": "LGX Back.29",
92 "lgx-port-rack": "000000.00",
93 "lgx-port-shelf": "00"
99 "ROUTER_SNJSCAMCJT4_000000.00_00",
100 "port-type": "router",
101 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
102 "port-rack": "000000.00",
107 "LGX Panel_SNJSCAMCJT4_000000.00_00",
108 "lgx-port-name": "LGX Back.30",
109 "lgx-port-rack": "000000.00",
110 "lgx-port-shelf": "00"
115 "due-date": "2016-11-28T00:00:01Z",
116 "operator-contact": "pw1234"
124 cls.processes = test_utils.start_tpce()
125 cls.processes = test_utils.start_sims(['xpdra', 'roadma-full', 'roadmc-full', 'xpdrc'])
128 def tearDownClass(cls):
129 for process in cls.processes:
130 test_utils.shutdown_process(process)
131 print("all processes killed")
133 def setUp(self): # instruction executed before each test method
134 print("execution of {}".format(self.id().split(".")[-1]))
136 # connect netconf devices
137 def test_01_connect_xpdrA(self):
138 response = test_utils.mount_device("XPDRA01", 'xpdra')
139 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
141 def test_02_connect_xpdrC(self):
142 response = test_utils.mount_device("XPDRC01", 'xpdrc')
143 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
145 def test_03_connect_rdmA(self):
146 response = test_utils.mount_device("ROADMA01", 'roadma-full')
147 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
149 def test_04_connect_rdmC(self):
150 response = test_utils.mount_device("ROADMC01", 'roadmc-full')
151 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
153 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
154 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "1",
155 "ROADMA01", "1", "SRG1-PP1-TXRX")
156 self.assertEqual(response.status_code, requests.codes.ok)
157 res = response.json()
158 self.assertIn('Xponder Roadm Link created successfully',
159 res["output"]["result"])
162 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
163 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
165 "networkutils:input": {
166 "networkutils:links-input": {
167 "networkutils:xpdr-node": "XPDRA01",
168 "networkutils:xpdr-num": "1",
169 "networkutils:network-num": "1",
170 "networkutils:rdm-node": "ROADMA01",
171 "networkutils:srg-num": "1",
172 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
176 response = test_utils.post_request(url, data)
177 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "1",
178 "ROADMA01", "1", "SRG1-PP1-TXRX")
179 self.assertEqual(response.status_code, requests.codes.ok)
180 res = response.json()
181 self.assertIn('Roadm Xponder links created successfully',
182 res["output"]["result"])
185 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
186 response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "1",
187 "ROADMC01", "1", "SRG1-PP1-TXRX")
188 self.assertEqual(response.status_code, requests.codes.ok)
189 res = response.json()
190 self.assertIn('Xponder Roadm Link created successfully',
191 res["output"]["result"])
194 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
195 response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "1",
196 "ROADMC01", "1", "SRG1-PP1-TXRX")
197 self.assertEqual(response.status_code, requests.codes.ok)
198 res = response.json()
199 self.assertIn('Roadm Xponder links created successfully',
200 res["output"]["result"])
203 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
204 # Config ROADMA-ROADMC oms-attributes
207 "auto-spanloss": "true",
208 "spanloss-base": 11.4,
209 "spanloss-current": 12,
210 "engineered-spanloss": 12.2,
211 "link-concatenation": [{
214 "SRLG-length": 100000,
216 response = test_utils.add_oms_attr_request("ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX", data)
217 self.assertEqual(response.status_code, requests.codes.created)
219 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
220 # Config ROADMC-ROADMA oms-attributes
223 "auto-spanloss": "true",
224 "spanloss-base": 11.4,
225 "spanloss-current": 12,
226 "engineered-spanloss": 12.2,
227 "link-concatenation": [{
230 "SRLG-length": 100000,
232 response = test_utils.add_oms_attr_request("ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX", data)
233 self.assertEqual(response.status_code, requests.codes.created)
235 # test service-create for Eth service from xpdr to xpdr
236 def test_11_create_eth_service1(self):
237 self.cr_serv_sample_data["input"]["service-name"] = "service1"
238 response = test_utils.service_create_request(self.cr_serv_sample_data)
239 self.assertEqual(response.status_code, requests.codes.ok)
240 res = response.json()
241 self.assertIn('PCE calculation in progress',
242 res['output']['configuration-response-common'][
244 time.sleep(self.WAITING)
246 def test_12_get_eth_service1(self):
247 response = test_utils.get_service_list_request("services/service1")
248 self.assertEqual(response.status_code, requests.codes.ok)
249 res = response.json()
251 res['services'][0]['administrative-state'],
254 res['services'][0]['service-name'], 'service1')
256 res['services'][0]['connection-type'], 'service')
258 res['services'][0]['lifecycle-state'], 'planned')
261 def test_13_check_xc1_ROADMA(self):
262 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
263 self.assertEqual(response.status_code, requests.codes.ok)
264 res = response.json()
265 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
266 self.assertDictEqual(
268 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
269 'wavelength-number': 1,
270 'opticalControlMode': 'gainLoss',
271 'target-output-power': -3.0
272 }, **res['roadm-connections'][0]),
273 res['roadm-connections'][0]
275 self.assertDictEqual(
276 {'src-if': 'SRG1-PP1-TXRX-1'},
277 res['roadm-connections'][0]['source'])
278 self.assertDictEqual(
279 {'dst-if': 'DEG1-TTP-TXRX-1'},
280 res['roadm-connections'][0]['destination'])
283 def test_14_check_xc1_ROADMC(self):
284 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
285 self.assertEqual(response.status_code, requests.codes.ok)
286 res = response.json()
287 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
288 self.assertDictEqual(
290 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
291 'wavelength-number': 1,
292 'opticalControlMode': 'gainLoss',
293 'target-output-power': 2.0
294 }, **res['roadm-connections'][0]),
295 res['roadm-connections'][0]
297 self.assertDictEqual(
298 {'src-if': 'SRG1-PP1-TXRX-1'},
299 res['roadm-connections'][0]['source'])
300 self.assertDictEqual(
301 {'dst-if': 'DEG2-TTP-TXRX-1'},
302 res['roadm-connections'][0]['destination'])
305 def test_15_check_topo_XPDRA(self):
306 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
307 self.assertEqual(response.status_code, requests.codes.ok)
308 res = response.json()
309 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
311 if ele['tp-id'] == 'XPDR1-NETWORK1':
312 self.assertEqual({u'frequency': 196.1, u'width': 40},
313 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
314 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT3':
316 'org-openroadm-network-topology:xpdr-client-attributes',
318 if ele['tp-id'] == 'XPDR1-NETWORK2':
320 'org-openroadm-network-topology:xpdr-network-attributes',
324 def test_16_check_topo_ROADMA_SRG1(self):
325 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
326 self.assertEqual(response.status_code, requests.codes.ok)
327 res = response.json()
328 self.assertNotIn({u'index': 1},
330 u'org-openroadm-network-topology:srg-attributes'][
331 'available-wavelengths'])
332 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
334 if ele['tp-id'] == 'SRG1-PP1-TXRX':
335 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
336 ele['org-openroadm-network-topology:'
337 'pp-attributes']['used-wavelength']
339 if ele['tp-id'] == 'SRG1-PP2-TXRX':
340 self.assertNotIn('used-wavelength', dict.keys(ele))
343 def test_17_check_topo_ROADMA_DEG1(self):
344 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
345 self.assertEqual(response.status_code, requests.codes.ok)
346 res = response.json()
347 self.assertNotIn({u'index': 1},
349 u'org-openroadm-network-topology:'
350 u'degree-attributes'][
351 'available-wavelengths'])
352 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
354 if ele['tp-id'] == 'DEG1-CTP-TXRX':
355 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
356 ele['org-openroadm-network-topology:'
359 if ele['tp-id'] == 'DEG1-TTP-TXRX':
360 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
361 ele['org-openroadm-network-topology:'
362 'tx-ttp-attributes'][
366 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
367 response = test_utils.connect_xpdr_to_rdm_request("XPDRA01", "1", "2",
368 "ROADMA01", "1", "SRG1-PP2-TXRX")
369 self.assertEqual(response.status_code, requests.codes.ok)
370 res = response.json()
371 self.assertIn('Xponder Roadm Link created successfully',
372 res["output"]["result"])
375 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
376 response = test_utils.connect_rdm_to_xpdr_request("XPDRA01", "1", "2",
377 "ROADMA01", "1", "SRG1-PP2-TXRX")
378 self.assertEqual(response.status_code, requests.codes.ok)
379 res = response.json()
380 self.assertIn('Roadm Xponder links created successfully',
381 res["output"]["result"])
384 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
385 response = test_utils.connect_xpdr_to_rdm_request("XPDRC01", "1", "2",
386 "ROADMC01", "1", "SRG1-PP2-TXRX")
387 self.assertEqual(response.status_code, requests.codes.ok)
388 res = response.json()
389 self.assertIn('Xponder Roadm Link created successfully',
390 res["output"]["result"])
393 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
394 response = test_utils.connect_rdm_to_xpdr_request("XPDRC01", "1", "2",
395 "ROADMC01", "1", "SRG1-PP2-TXRX")
396 self.assertEqual(response.status_code, requests.codes.ok)
397 res = response.json()
398 self.assertIn('Roadm Xponder links created successfully',
399 res["output"]["result"])
402 def test_22_create_eth_service2(self):
403 self.cr_serv_sample_data["input"]["service-name"] = "service2"
404 response = test_utils.service_create_request(self.cr_serv_sample_data)
405 self.assertEqual(response.status_code, requests.codes.ok)
406 res = response.json()
407 self.assertIn('PCE calculation in progress',
408 res['output']['configuration-response-common'][
410 time.sleep(self.WAITING)
412 def test_23_get_eth_service2(self):
413 response = test_utils.get_service_list_request("services/service2")
414 self.assertEqual(response.status_code, requests.codes.ok)
415 res = response.json()
417 res['services'][0]['administrative-state'],
420 res['services'][0]['service-name'], 'service2')
422 res['services'][0]['connection-type'], 'service')
424 res['services'][0]['lifecycle-state'], 'planned')
427 def test_24_check_xc2_ROADMA(self):
428 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-2")
429 self.assertEqual(response.status_code, requests.codes.ok)
430 res = response.json()
431 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
432 self.assertDictEqual(
434 'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-2',
435 'wavelength-number': 2,
436 'opticalControlMode': 'power'
437 }, **res['roadm-connections'][0]),
438 res['roadm-connections'][0]
440 self.assertDictEqual(
441 {'src-if': 'DEG1-TTP-TXRX-2'},
442 res['roadm-connections'][0]['source'])
443 self.assertDictEqual(
444 {'dst-if': 'SRG1-PP2-TXRX-2'},
445 res['roadm-connections'][0]['destination'])
447 def test_25_check_topo_XPDRA(self):
448 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
449 self.assertEqual(response.status_code, requests.codes.ok)
450 res = response.json()
451 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
453 if ele['tp-id'] == 'XPDR1-NETWORK1':
454 self.assertEqual({u'frequency': 196.1, u'width': 40},
455 ele['org-openroadm-network-topology:'
456 'xpdr-network-attributes'][
458 if ele['tp-id'] == 'XPDR1-NETWORK2':
459 self.assertEqual({u'frequency': 196.05, u'width': 40},
460 ele['org-openroadm-network-topology:'
461 'xpdr-network-attributes'][
463 if ele['tp-id'] == 'XPDR1-CLIENT1' or \
464 ele['tp-id'] == 'XPDR1-CLIENT3':
466 'org-openroadm-network-topology:xpdr-client-attributes',
470 def test_26_check_topo_ROADMA_SRG1(self):
471 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
472 self.assertEqual(response.status_code, requests.codes.ok)
473 res = response.json()
474 self.assertNotIn({u'index': 1}, res['node'][0][
475 u'org-openroadm-network-topology:srg-attributes'][
476 'available-wavelengths'])
477 self.assertNotIn({u'index': 2}, res['node'][0][
478 u'org-openroadm-network-topology:srg-attributes'][
479 'available-wavelengths'])
480 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
482 if ele['tp-id'] == 'SRG1-PP1-TXRX':
483 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
484 ele['org-openroadm-network-topology:'
485 'pp-attributes']['used-wavelength'])
486 self.assertNotIn({u'index': 2, u'frequency': 196.05,
488 ele['org-openroadm-network-topology:'
489 'pp-attributes']['used-wavelength'])
490 if ele['tp-id'] == 'SRG1-PP2-TXRX':
491 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
492 ele['org-openroadm-network-topology:'
493 'pp-attributes']['used-wavelength'])
494 self.assertNotIn({u'index': 1, u'frequency': 196.1,
496 ele['org-openroadm-network-topology:'
497 'pp-attributes']['used-wavelength'])
498 if ele['tp-id'] == 'SRG1-PP3-TXRX':
499 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
503 def test_27_check_topo_ROADMA_DEG1(self):
504 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
505 self.assertEqual(response.status_code, requests.codes.ok)
506 res = response.json()
507 self.assertNotIn({u'index': 1}, res['node'][0][
508 u'org-openroadm-network-topology:degree-attributes'][
509 'available-wavelengths'])
510 self.assertNotIn({u'index': 2}, res['node'][0][
511 u'org-openroadm-network-topology:degree-attributes'][
512 'available-wavelengths'])
513 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
515 if ele['tp-id'] == 'DEG1-CTP-TXRX':
516 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
517 ele['org-openroadm-network-topology:'
518 'ctp-attributes']['used-wavelengths'])
519 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
520 ele['org-openroadm-network-topology:'
521 'ctp-attributes']['used-wavelengths'])
522 if ele['tp-id'] == 'DEG1-TTP-TXRX':
523 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
524 ele['org-openroadm-network-topology:'
525 'tx-ttp-attributes']['used-wavelengths'])
526 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
527 ele['org-openroadm-network-topology:'
528 'tx-ttp-attributes']['used-wavelengths'])
531 # creation service test on a non-available resource
532 def test_28_create_eth_service3(self):
533 self.cr_serv_sample_data["input"]["service-name"] = "service3"
534 response = test_utils.service_create_request(self.cr_serv_sample_data)
535 self.assertEqual(response.status_code, requests.codes.ok)
536 res = response.json()
537 self.assertIn('PCE calculation in progress',
538 res['output']['configuration-response-common'][
540 self.assertIn('200', res['output']['configuration-response-common'][
542 time.sleep(self.WAITING)
544 # add a test that check the openroadm-service-list still only
545 # contains 2 elements
547 def test_29_delete_eth_service3(self):
548 url = "{}/operations/org-openroadm-service:service-delete"
550 "sdnc-request-header": {
551 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
552 "rpc-action": "service-delete",
553 "request-system-id": "appname",
555 "http://localhost:8585/NotificationServer/notify"
557 "service-delete-req-info": {
558 "service-name": "service3",
559 "tail-retention": "no"
563 response = test_utils.post_request(url, data)
564 self.assertEqual(response.status_code, requests.codes.ok)
565 res = response.json()
566 self.assertIn('Service \'service3\' does not exist in datastore',
567 res['output']['configuration-response-common'][
569 self.assertIn('500', res['output']['configuration-response-common'][
573 def test_30_delete_eth_service1(self):
574 url = "{}/operations/org-openroadm-service:service-delete"
576 "sdnc-request-header": {
577 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
578 "rpc-action": "service-delete",
579 "request-system-id": "appname",
581 "http://localhost:8585/NotificationServer/notify"
583 "service-delete-req-info": {
584 "service-name": "service1",
585 "tail-retention": "no"
589 response = test_utils.post_request(url, data)
590 self.assertEqual(response.status_code, requests.codes.ok)
591 res = response.json()
592 self.assertIn('Renderer service delete in progress',
593 res['output']['configuration-response-common'][
597 def test_31_delete_eth_service2(self):
598 url = "{}/operations/org-openroadm-service:service-delete"
600 "sdnc-request-header": {
601 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
602 "rpc-action": "service-delete",
603 "request-system-id": "appname",
605 "http://localhost:8585/NotificationServer/notify"
607 "service-delete-req-info": {
608 "service-name": "service2",
609 "tail-retention": "no"
613 response = test_utils.post_request(url, data)
614 self.assertEqual(response.status_code, requests.codes.ok)
615 res = response.json()
616 self.assertIn('Renderer service delete in progress',
617 res['output']['configuration-response-common'][
621 def test_32_check_no_xc_ROADMA(self):
622 response = test_utils.check_netconf_node_request("ROADMA01", "")
623 res = response.json()
624 self.assertEqual(response.status_code, requests.codes.ok)
625 self.assertNotIn('roadm-connections',
626 dict.keys(res['org-openroadm-device']))
629 def test_33_check_topo_XPDRA(self):
630 response = test_utils.get_ordm_topo_request("node/XPDRA01-XPDR1")
631 self.assertEqual(response.status_code, requests.codes.ok)
632 res = response.json()
633 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
635 if ((ele[u'org-openroadm-common-network:tp-type'] ==
637 and (ele['tp-id'] == 'XPDR1-CLIENT1' or ele[
638 'tp-id'] == 'XPDR1-CLIENT3')):
640 'org-openroadm-network-topology:xpdr-client-attributes',
642 elif (ele[u'org-openroadm-common-network:tp-type'] ==
644 self.assertIn(u'tail-equipment-id', dict.keys(
645 ele[u'org-openroadm-network-topology:'
646 u'xpdr-network-attributes']))
647 self.assertNotIn('wavelength', dict.keys(
648 ele[u'org-openroadm-network-topology:'
649 u'xpdr-network-attributes']))
652 def test_34_check_topo_ROADMA_SRG1(self):
653 response = test_utils.get_ordm_topo_request("node/ROADMA01-SRG1")
654 self.assertEqual(response.status_code, requests.codes.ok)
655 res = response.json()
656 self.assertIn({u'index': 1}, res['node'][0][
657 u'org-openroadm-network-topology:srg-attributes'][
658 'available-wavelengths'])
659 self.assertIn({u'index': 2}, res['node'][0][
660 u'org-openroadm-network-topology:srg-attributes'][
661 'available-wavelengths'])
662 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
664 if ele['tp-id'] == 'SRG1-PP1-TXRX' or \
665 ele['tp-id'] == 'SRG1-PP1-TXRX':
666 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
669 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
673 def test_35_check_topo_ROADMA_DEG1(self):
674 response = test_utils.get_ordm_topo_request("node/ROADMA01-DEG1")
675 self.assertEqual(response.status_code, requests.codes.ok)
676 res = response.json()
677 self.assertIn({u'index': 1}, res['node'][0][
678 u'org-openroadm-network-topology:degree-attributes'][
679 'available-wavelengths'])
680 self.assertIn({u'index': 2}, res['node'][0][
681 u'org-openroadm-network-topology:degree-attributes'][
682 'available-wavelengths'])
683 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
685 if ele['tp-id'] == 'DEG1-CTP-TXRX':
686 self.assertNotIn('org-openroadm-network-topology:'
687 'ctp-attributes', dict.keys(ele))
688 if ele['tp-id'] == 'DEG1-TTP-TXRX':
689 self.assertNotIn('org-openroadm-network-topology:'
690 'tx-ttp-attributes', dict.keys(ele))
693 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
694 def test_36_create_oc_service1(self):
695 self.cr_serv_sample_data["input"]["service-name"] = "service1"
696 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
697 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
698 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
699 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
700 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
701 response = test_utils.service_create_request(self.cr_serv_sample_data)
702 self.assertEqual(response.status_code, requests.codes.ok)
703 res = response.json()
704 self.assertIn('PCE calculation in progress',
705 res['output']['configuration-response-common'][
707 time.sleep(self.WAITING)
709 def test_37_get_oc_service1(self):
710 response = test_utils.get_service_list_request("services/service1")
711 self.assertEqual(response.status_code, requests.codes.ok)
712 res = response.json()
714 res['services'][0]['administrative-state'],
717 res['services'][0]['service-name'], 'service1')
719 res['services'][0]['connection-type'], 'roadm-line')
721 res['services'][0]['lifecycle-state'], 'planned')
724 def test_38_check_xc1_ROADMA(self):
725 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
726 self.assertEqual(response.status_code, requests.codes.ok)
727 res = response.json()
728 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
729 self.assertDictEqual(
731 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
732 'wavelength-number': 1,
733 'opticalControlMode': 'gainLoss',
734 'target-output-power': -3.0
735 }, **res['roadm-connections'][0]),
736 res['roadm-connections'][0]
738 self.assertDictEqual(
739 {'src-if': 'SRG1-PP1-TXRX-1'},
740 res['roadm-connections'][0]['source'])
741 self.assertDictEqual(
742 {'dst-if': 'DEG1-TTP-TXRX-1'},
743 res['roadm-connections'][0]['destination'])
746 def test_39_check_xc1_ROADMC(self):
747 response = test_utils.check_netconf_node_request("ROADMC01", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
748 self.assertEqual(response.status_code, requests.codes.ok)
749 res = response.json()
750 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
751 self.assertDictEqual(
753 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
754 'wavelength-number': 1,
755 'opticalControlMode': 'gainLoss',
756 'target-output-power': 2.0
757 }, **res['roadm-connections'][0]),
758 res['roadm-connections'][0]
760 self.assertDictEqual(
761 {'src-if': 'SRG1-PP1-TXRX-1'},
762 res['roadm-connections'][0]['source'])
763 self.assertDictEqual(
764 {'dst-if': 'DEG2-TTP-TXRX-1'},
765 res['roadm-connections'][0]['destination'])
768 def test_40_create_oc_service2(self):
769 self.cr_serv_sample_data["input"]["service-name"] = "service2"
770 self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
771 self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADMA01"
772 self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
773 self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADMC01"
774 self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
775 response = test_utils.service_create_request(self.cr_serv_sample_data)
776 self.assertEqual(response.status_code, requests.codes.ok)
777 res = response.json()
778 self.assertIn('PCE calculation in progress',
779 res['output']['configuration-response-common'][
781 time.sleep(self.WAITING)
783 def test_41_get_oc_service2(self):
784 response = test_utils.get_service_list_request("services/service2")
785 self.assertEqual(response.status_code, requests.codes.ok)
786 res = response.json()
788 res['services'][0]['administrative-state'],
791 res['services'][0]['service-name'], 'service2')
793 res['services'][0]['connection-type'], 'roadm-line')
795 res['services'][0]['lifecycle-state'], 'planned')
798 def test_42_check_xc2_ROADMA(self):
799 response = test_utils.check_netconf_node_request("ROADMA01", "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-2")
800 self.assertEqual(response.status_code, requests.codes.ok)
801 res = response.json()
802 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
803 self.assertDictEqual(
805 'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-2',
806 'wavelength-number': 2,
807 'opticalControlMode': 'gainLoss',
808 'target-output-power': -3.0
809 }, **res['roadm-connections'][0]),
810 res['roadm-connections'][0]
812 self.assertDictEqual(
813 {'src-if': 'SRG1-PP2-TXRX-2'},
814 res['roadm-connections'][0]['source'])
815 self.assertDictEqual(
816 {'dst-if': 'DEG1-TTP-TXRX-2'},
817 res['roadm-connections'][0]['destination'])
820 def test_43_check_topo_ROADMA(self):
821 self.test_26_check_topo_ROADMA_SRG1()
822 self.test_27_check_topo_ROADMA_DEG1()
825 def test_44_delete_oc_service1(self):
826 url = "{}/operations/org-openroadm-service:service-delete"
828 "sdnc-request-header": {
829 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
830 "rpc-action": "service-delete",
831 "request-system-id": "appname",
833 "http://localhost:8585/NotificationServer/notify"
835 "service-delete-req-info": {
836 "service-name": "service1",
837 "tail-retention": "no"
841 response = test_utils.post_request(url, data)
842 self.assertEqual(response.status_code, requests.codes.ok)
843 res = response.json()
844 self.assertIn('Renderer service delete in progress',
845 res['output']['configuration-response-common'][
849 def test_45_delete_oc_service2(self):
850 url = "{}/operations/org-openroadm-service:service-delete"
852 "sdnc-request-header": {
853 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
854 "rpc-action": "service-delete",
855 "request-system-id": "appname",
857 "http://localhost:8585/NotificationServer/notify"
859 "service-delete-req-info": {
860 "service-name": "service2",
861 "tail-retention": "no"
865 response = test_utils.post_request(url, data)
866 self.assertEqual(response.status_code, requests.codes.ok)
867 res = response.json()
868 self.assertIn('Renderer service delete in progress',
869 res['output']['configuration-response-common'][
873 def test_46_get_no_oc_services(self):
875 response = test_utils.get_service_list_request("")
876 self.assertEqual(response.status_code, requests.codes.not_found)
877 res = response.json()
880 "error-type": "application",
881 "error-tag": "data-missing",
883 "Request could not be completed because the relevant data "
884 "model content does not exist"
886 res['errors']['error'])
889 def test_47_get_no_xc_ROADMA(self):
890 response = test_utils.check_netconf_node_request("ROADMA01", "")
891 self.assertEqual(response.status_code, requests.codes.ok)
892 res = response.json()
893 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
896 def test_48_check_topo_ROADMA(self):
897 self.test_34_check_topo_ROADMA_SRG1()
898 self.test_35_check_topo_ROADMA_DEG1()
900 def test_49_loop_create_eth_service(self):
901 for i in range(1, 6):
902 print("iteration number {}".format(i))
903 print("eth service creation")
904 self.test_11_create_eth_service1()
905 print("check xc in ROADMA01")
906 self.test_13_check_xc1_ROADMA()
907 print("check xc in ROADMC01")
908 self.test_14_check_xc1_ROADMC()
909 print("eth service deletion\n")
910 self.test_30_delete_eth_service1()
912 def test_50_loop_create_oc_service(self):
913 response = test_utils.get_service_list_request("services/service1")
914 if response.status_code != 404:
915 url = "{}/operations/org-openroadm-service:service-delete"
917 "sdnc-request-header": {
918 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
919 "rpc-action": "service-delete",
920 "request-system-id": "appname",
922 "http://localhost:8585/NotificationServer/notify"
924 "service-delete-req-info": {
925 "service-name": "service1",
926 "tail-retention": "no"
930 test_utils.post_request(url, data)
933 for i in range(1, 6):
934 print("iteration number {}".format(i))
935 print("oc service creation")
936 self.test_36_create_oc_service1()
937 print("check xc in ROADMA01")
938 self.test_38_check_xc1_ROADMA()
939 print("check xc in ROADMC01")
940 self.test_39_check_xc1_ROADMC()
941 print("oc service deletion\n")
942 self.test_44_delete_oc_service1()
944 def test_51_disconnect_XPDRA(self):
945 response = test_utils.unmount_device("XPDRA01")
946 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
948 def test_52_disconnect_XPDRC(self):
949 response = test_utils.unmount_device("XPDRC01")
950 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
952 def test_53_disconnect_ROADMA(self):
953 response = test_utils.unmount_device("ROADMA01")
954 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
956 def test_54_disconnect_ROADMC(self):
957 response = test_utils.unmount_device("ROADMC01")
958 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
961 if __name__ == "__main__":
962 unittest.main(verbosity=2)