2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
17 from common import test_utils
20 class TransportPCEFulltesting(unittest.TestCase):
22 WAITING = 20 # nominal value is 300
28 cls.processes = test_utils.start_tpce()
29 cls.processes = test_utils.start_sims(['xpdra', 'roadma-full', 'roadmc-full', 'xpdrc'])
32 def tearDownClass(cls):
33 for process in cls.processes:
34 test_utils.shutdown_process(process)
35 print("all processes killed")
37 def setUp(self): # instruction executed before each test method
38 print("execution of {}".format(self.id().split(".")[-1]))
40 # connect netconf devices
41 def test_01_connect_xpdrA(self):
42 response = test_utils.mount_device("XPDRA01", 'xpdra')
43 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
45 def test_02_connect_xpdrC(self):
46 response = test_utils.mount_device("XPDRC01", 'xpdrc')
47 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
49 def test_03_connect_rdmA(self):
50 response = test_utils.mount_device("ROADMA01", 'roadma-full')
51 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
53 def test_04_connect_rdmC(self):
54 response = test_utils.mount_device("ROADMC01", 'roadmc-full')
55 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
57 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
58 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
59 format(test_utils.RESTCONF_BASE_URL)
61 "networkutils:input": {
62 "networkutils:links-input": {
63 "networkutils:xpdr-node": "XPDRA01",
64 "networkutils:xpdr-num": "1",
65 "networkutils:network-num": "1",
66 "networkutils:rdm-node": "ROADMA01",
67 "networkutils:srg-num": "1",
68 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
72 response = requests.request(
73 "POST", url, data=json.dumps(data),
74 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
75 self.assertEqual(response.status_code, requests.codes.ok)
77 self.assertIn('Xponder Roadm Link created successfully',
78 res["output"]["result"])
81 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
82 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
83 format(test_utils.RESTCONF_BASE_URL)
85 "networkutils:input": {
86 "networkutils:links-input": {
87 "networkutils:xpdr-node": "XPDRA01",
88 "networkutils:xpdr-num": "1",
89 "networkutils:network-num": "1",
90 "networkutils:rdm-node": "ROADMA01",
91 "networkutils:srg-num": "1",
92 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
96 response = requests.request(
97 "POST", url, data=json.dumps(data),
98 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
99 self.assertEqual(response.status_code, requests.codes.ok)
100 res = response.json()
101 self.assertIn('Roadm Xponder links created successfully',
102 res["output"]["result"])
105 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
106 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
107 format(test_utils.RESTCONF_BASE_URL)
109 "networkutils:input": {
110 "networkutils:links-input": {
111 "networkutils:xpdr-node": "XPDRC01",
112 "networkutils:xpdr-num": "1",
113 "networkutils:network-num": "1",
114 "networkutils:rdm-node": "ROADMC01",
115 "networkutils:srg-num": "1",
116 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
120 response = requests.request(
121 "POST", url, data=json.dumps(data),
122 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
123 self.assertEqual(response.status_code, requests.codes.ok)
124 res = response.json()
125 self.assertIn('Xponder Roadm Link created successfully',
126 res["output"]["result"])
129 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
130 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
131 format(test_utils.RESTCONF_BASE_URL)
133 "networkutils:input": {
134 "networkutils:links-input": {
135 "networkutils:xpdr-node": "XPDRC01",
136 "networkutils:xpdr-num": "1",
137 "networkutils:network-num": "1",
138 "networkutils:rdm-node": "ROADMC01",
139 "networkutils:srg-num": "1",
140 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
144 response = requests.request(
145 "POST", url, data=json.dumps(data),
146 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
147 self.assertEqual(response.status_code, requests.codes.ok)
148 res = response.json()
149 self.assertIn('Roadm Xponder links created successfully',
150 res["output"]["result"])
153 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
154 # Config ROADMA-ROADMC oms-attributes
156 "{}/config/ietf-network:"
157 "networks/network/openroadm-topology/ietf-network-topology:"
158 "link/ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX/"
159 "org-openroadm-network-topology:"
160 "OMS-attributes/span"
161 .format(test_utils.RESTCONF_BASE_URL))
164 "auto-spanloss": "true",
165 "spanloss-base": 11.4,
166 "spanloss-current": 12,
167 "engineered-spanloss": 12.2,
168 "link-concatenation": [{
171 "SRLG-length": 100000,
173 response = requests.request(
174 "PUT", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
175 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
176 self.assertEqual(response.status_code, requests.codes.created)
178 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
179 # Config ROADMC-ROADMA oms-attributes
181 "{}/config/ietf-network:"
182 "networks/network/openroadm-topology/ietf-network-topology:"
183 "link/ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX/"
184 "org-openroadm-network-topology:"
185 "OMS-attributes/span"
186 .format(test_utils.RESTCONF_BASE_URL))
189 "auto-spanloss": "true",
190 "spanloss-base": 11.4,
191 "spanloss-current": 12,
192 "engineered-spanloss": 12.2,
193 "link-concatenation": [{
196 "SRLG-length": 100000,
198 response = requests.request(
199 "PUT", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
200 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
201 self.assertEqual(response.status_code, requests.codes.created)
203 # test service-create for Eth service from xpdr to xpdr
204 def test_11_create_eth_service1(self):
205 url = ("{}/operations/org-openroadm-service:service-create"
206 .format(test_utils.RESTCONF_BASE_URL))
209 "sdnc-request-header": {
210 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
211 "rpc-action": "service-create",
212 "request-system-id": "appname",
214 "http://localhost:8585/NotificationServer/notify"
216 "service-name": "service1",
217 "common-id": "ASATT1234567",
218 "connection-type": "service",
220 "service-rate": "100",
221 "node-id": "XPDRA01",
222 "service-format": "Ethernet",
223 "clli": "SNJSCAMCJP8",
227 "ROUTER_SNJSCAMCJP8_000000.00_00",
228 "port-type": "router",
229 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
230 "port-rack": "000000.00",
235 "LGX Panel_SNJSCAMCJP8_000000.00_00",
236 "lgx-port-name": "LGX Back.3",
237 "lgx-port-rack": "000000.00",
238 "lgx-port-shelf": "00"
244 "ROUTER_SNJSCAMCJP8_000000.00_00",
245 "port-type": "router",
246 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
247 "port-rack": "000000.00",
252 "LGX Panel_SNJSCAMCJP8_000000.00_00",
253 "lgx-port-name": "LGX Back.4",
254 "lgx-port-rack": "000000.00",
255 "lgx-port-shelf": "00"
261 "service-rate": "100",
262 "node-id": "XPDRC01",
263 "service-format": "Ethernet",
264 "clli": "SNJSCAMCJT4",
268 "ROUTER_SNJSCAMCJT4_000000.00_00",
269 "port-type": "router",
270 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
271 "port-rack": "000000.00",
276 "LGX Panel_SNJSCAMCJT4_000000.00_00",
277 "lgx-port-name": "LGX Back.29",
278 "lgx-port-rack": "000000.00",
279 "lgx-port-shelf": "00"
285 "ROUTER_SNJSCAMCJT4_000000.00_00",
286 "port-type": "router",
287 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
288 "port-rack": "000000.00",
293 "LGX Panel_SNJSCAMCJT4_000000.00_00",
294 "lgx-port-name": "LGX Back.30",
295 "lgx-port-rack": "000000.00",
296 "lgx-port-shelf": "00"
301 "due-date": "2016-11-28T00:00:01Z",
302 "operator-contact": "pw1234"
305 response = requests.request(
306 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
307 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
308 self.assertEqual(response.status_code, requests.codes.ok)
309 res = response.json()
310 self.assertIn('PCE calculation in progress',
311 res['output']['configuration-response-common'][
313 time.sleep(self.WAITING)
315 def test_12_get_eth_service1(self):
316 url = ("{}/operational/org-openroadm-service:service-list/services/"
317 "service1".format(test_utils.RESTCONF_BASE_URL))
318 response = requests.request(
319 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
320 self.assertEqual(response.status_code, requests.codes.ok)
321 res = response.json()
323 res['services'][0]['administrative-state'],
326 res['services'][0]['service-name'], 'service1')
328 res['services'][0]['connection-type'], 'service')
330 res['services'][0]['lifecycle-state'], 'planned')
333 def test_13_check_xc1_ROADMA(self):
335 "{}/config/network-topology:"
336 "network-topology/topology/topology-netconf/"
337 "node/ROADMA01/yang-ext:"
338 "mount/org-openroadm-device:org-openroadm-device/"
339 "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
340 .format(test_utils.RESTCONF_BASE_URL))
341 response = requests.request(
342 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
343 self.assertEqual(response.status_code, requests.codes.ok)
344 res = response.json()
345 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
346 self.assertDictEqual(
348 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
349 'wavelength-number': 1,
350 'opticalControlMode': 'gainLoss',
351 'target-output-power': -3.0
352 }, **res['roadm-connections'][0]),
353 res['roadm-connections'][0]
355 self.assertDictEqual(
356 {'src-if': 'SRG1-PP1-TXRX-1'},
357 res['roadm-connections'][0]['source'])
358 self.assertDictEqual(
359 {'dst-if': 'DEG1-TTP-TXRX-1'},
360 res['roadm-connections'][0]['destination'])
363 def test_14_check_xc1_ROADMC(self):
365 "{}/config/network-topology:"
366 "network-topology/topology/topology-netconf/"
367 "node/ROADMC01/yang-ext:mount/org-openroadm-device:"
368 "org-openroadm-device/"
369 "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
370 .format(test_utils.RESTCONF_BASE_URL))
371 response = requests.request(
372 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
373 self.assertEqual(response.status_code, requests.codes.ok)
374 res = response.json()
375 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
376 self.assertDictEqual(
378 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
379 'wavelength-number': 1,
380 'opticalControlMode': 'gainLoss',
381 'target-output-power': 2.0
382 }, **res['roadm-connections'][0]),
383 res['roadm-connections'][0]
385 self.assertDictEqual(
386 {'src-if': 'SRG1-PP1-TXRX-1'},
387 res['roadm-connections'][0]['source'])
388 self.assertDictEqual(
389 {'dst-if': 'DEG2-TTP-TXRX-1'},
390 res['roadm-connections'][0]['destination'])
393 def test_15_check_topo_XPDRA(self):
395 "{}/config/ietf-network:"
396 "networks/network/openroadm-topology/node/XPDRA01-XPDR1"
397 .format(test_utils.RESTCONF_BASE_URL))
398 response = requests.request(
399 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
400 self.assertEqual(response.status_code, requests.codes.ok)
401 res = response.json()
402 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
404 if ele['tp-id'] == 'XPDR1-NETWORK1':
405 self.assertEqual({u'frequency': 196.1, u'width': 40},
406 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
407 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT3':
409 'org-openroadm-network-topology:xpdr-client-attributes',
411 if ele['tp-id'] == 'XPDR1-NETWORK2':
413 'org-openroadm-network-topology:xpdr-network-attributes',
417 def test_16_check_topo_ROADMA_SRG1(self):
419 "{}/config/ietf-network:"
420 "networks/network/openroadm-topology/node/ROADMA01-SRG1"
421 .format(test_utils.RESTCONF_BASE_URL))
422 response = requests.request(
423 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
424 self.assertEqual(response.status_code, requests.codes.ok)
425 res = response.json()
426 self.assertNotIn({u'index': 1},
428 u'org-openroadm-network-topology:srg-attributes'][
429 'available-wavelengths'])
430 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
432 if ele['tp-id'] == 'SRG1-PP1-TXRX':
433 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
434 ele['org-openroadm-network-topology:'
435 'pp-attributes']['used-wavelength']
437 if ele['tp-id'] == 'SRG1-PP2-TXRX':
438 self.assertNotIn('used-wavelength', dict.keys(ele))
441 def test_17_check_topo_ROADMA_DEG1(self):
443 "{}/config/ietf-network:"
444 "networks/network/openroadm-topology/node/ROADMA01-DEG1"
445 .format(test_utils.RESTCONF_BASE_URL))
446 response = requests.request(
447 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
448 self.assertEqual(response.status_code, requests.codes.ok)
449 res = response.json()
450 self.assertNotIn({u'index': 1},
452 u'org-openroadm-network-topology:'
453 u'degree-attributes'][
454 'available-wavelengths'])
455 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
457 if ele['tp-id'] == 'DEG1-CTP-TXRX':
458 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
459 ele['org-openroadm-network-topology:'
462 if ele['tp-id'] == 'DEG1-TTP-TXRX':
463 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
464 ele['org-openroadm-network-topology:'
465 'tx-ttp-attributes'][
469 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
470 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
471 format(test_utils.RESTCONF_BASE_URL)
473 "networkutils:input": {
474 "networkutils:links-input": {
475 "networkutils:xpdr-node": "XPDRA01",
476 "networkutils:xpdr-num": "1",
477 "networkutils:network-num": "2",
478 "networkutils:rdm-node": "ROADMA01",
479 "networkutils:srg-num": "1",
480 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
484 response = requests.request(
485 "POST", url, data=json.dumps(data),
486 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
487 self.assertEqual(response.status_code, requests.codes.ok)
488 res = response.json()
489 self.assertIn('Xponder Roadm Link created successfully',
490 res["output"]["result"])
493 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
494 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
495 format(test_utils.RESTCONF_BASE_URL)
497 "networkutils:input": {
498 "networkutils:links-input": {
499 "networkutils:xpdr-node": "XPDRA01",
500 "networkutils:xpdr-num": "1",
501 "networkutils:network-num": "2",
502 "networkutils:rdm-node": "ROADMA01",
503 "networkutils:srg-num": "1",
504 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
508 response = requests.request(
509 "POST", url, data=json.dumps(data),
510 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
511 self.assertEqual(response.status_code, requests.codes.ok)
512 res = response.json()
513 self.assertIn('Roadm Xponder links created successfully',
514 res["output"]["result"])
517 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
518 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
519 format(test_utils.RESTCONF_BASE_URL)
521 "networkutils:input": {
522 "networkutils:links-input": {
523 "networkutils:xpdr-node": "XPDRC01",
524 "networkutils:xpdr-num": "1",
525 "networkutils:network-num": "2",
526 "networkutils:rdm-node": "ROADMC01",
527 "networkutils:srg-num": "1",
528 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
532 response = requests.request(
533 "POST", url, data=json.dumps(data),
534 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
535 self.assertEqual(response.status_code, requests.codes.ok)
536 res = response.json()
537 self.assertIn('Xponder Roadm Link created successfully',
538 res["output"]["result"])
541 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
542 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
543 format(test_utils.RESTCONF_BASE_URL)
545 "networkutils:input": {
546 "networkutils:links-input": {
547 "networkutils:xpdr-node": "XPDRC01",
548 "networkutils:xpdr-num": "1",
549 "networkutils:network-num": "2",
550 "networkutils:rdm-node": "ROADMC01",
551 "networkutils:srg-num": "1",
552 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
556 response = requests.request(
557 "POST", url, data=json.dumps(data),
558 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
559 self.assertEqual(response.status_code, requests.codes.ok)
560 res = response.json()
561 self.assertIn('Roadm Xponder links created successfully',
562 res["output"]["result"])
565 def test_22_create_eth_service2(self):
566 url = ("{}/operations/org-openroadm-service:service-create"
567 .format(test_utils.RESTCONF_BASE_URL))
570 "sdnc-request-header": {
571 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
572 "rpc-action": "service-create",
573 "request-system-id": "appname",
575 "http://localhost:8585/NotificationServer/notify"
577 "service-name": "service2",
578 "common-id": "ASATT1234567",
579 "connection-type": "service",
581 "service-rate": "100",
582 "node-id": "XPDRA01",
583 "service-format": "Ethernet",
584 "clli": "SNJSCAMCJP8",
588 "ROUTER_SNJSCAMCJP8_000000.00_00",
589 "port-type": "router",
590 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
591 "port-rack": "000000.00",
596 "LGX Panel_SNJSCAMCJP8_000000.00_00",
597 "lgx-port-name": "LGX Back.3",
598 "lgx-port-rack": "000000.00",
599 "lgx-port-shelf": "00"
605 "ROUTER_SNJSCAMCJP8_000000.00_00",
606 "port-type": "router",
607 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
608 "port-rack": "000000.00",
613 "LGX Panel_SNJSCAMCJP8_000000.00_00",
614 "lgx-port-name": "LGX Back.4",
615 "lgx-port-rack": "000000.00",
616 "lgx-port-shelf": "00"
622 "service-rate": "100",
623 "node-id": "XPDRC01",
624 "service-format": "Ethernet",
625 "clli": "SNJSCAMCJT4",
629 "ROUTER_SNJSCAMCJT4_000000.00_00",
630 "port-type": "router",
631 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
632 "port-rack": "000000.00",
637 "LGX Panel_SNJSCAMCJT4_000000.00_00",
638 "lgx-port-name": "LGX Back.29",
639 "lgx-port-rack": "000000.00",
640 "lgx-port-shelf": "00"
646 "ROUTER_SNJSCAMCJT4_000000.00_00",
647 "port-type": "router",
648 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
649 "port-rack": "000000.00",
654 "LGX Panel_SNJSCAMCJT4_000000.00_00",
655 "lgx-port-name": "LGX Back.30",
656 "lgx-port-rack": "000000.00",
657 "lgx-port-shelf": "00"
662 "due-date": "2016-11-28T00:00:01Z",
663 "operator-contact": "pw1234"
666 response = requests.request(
667 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
668 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
669 self.assertEqual(response.status_code, requests.codes.ok)
670 res = response.json()
671 self.assertIn('PCE calculation in progress',
672 res['output']['configuration-response-common'][
674 time.sleep(self.WAITING)
676 def test_23_get_eth_service2(self):
677 url = ("{}/operational/org-openroadm-service:"
678 "service-list/services/service2"
679 .format(test_utils.RESTCONF_BASE_URL))
680 response = requests.request(
681 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
682 self.assertEqual(response.status_code, requests.codes.ok)
683 res = response.json()
685 res['services'][0]['administrative-state'],
688 res['services'][0]['service-name'], 'service2')
690 res['services'][0]['connection-type'], 'service')
692 res['services'][0]['lifecycle-state'], 'planned')
695 def test_24_check_xc2_ROADMA(self):
697 "{}/config/network-topology:"
698 "network-topology/topology/topology-netconf/"
699 "node/ROADMA01/yang-ext:"
700 "mount/org-openroadm-device:org-openroadm-device/"
701 "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-2"
702 .format(test_utils.RESTCONF_BASE_URL))
703 response = requests.request(
704 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
705 self.assertEqual(response.status_code, requests.codes.ok)
706 res = response.json()
707 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
708 self.assertDictEqual(
710 'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-2',
711 'wavelength-number': 2,
712 'opticalControlMode': 'power'
713 }, **res['roadm-connections'][0]),
714 res['roadm-connections'][0]
716 self.assertDictEqual(
717 {'src-if': 'DEG1-TTP-TXRX-2'},
718 res['roadm-connections'][0]['source'])
719 self.assertDictEqual(
720 {'dst-if': 'SRG1-PP2-TXRX-2'},
721 res['roadm-connections'][0]['destination'])
723 def test_25_check_topo_XPDRA(self):
725 "{}/config/ietf-network:"
726 "networks/network/openroadm-topology/node/XPDRA01-XPDR1"
727 .format(test_utils.RESTCONF_BASE_URL))
728 response = requests.request(
729 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
730 self.assertEqual(response.status_code, requests.codes.ok)
731 res = response.json()
732 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
734 if ele['tp-id'] == 'XPDR1-NETWORK1':
735 self.assertEqual({u'frequency': 196.1, u'width': 40},
736 ele['org-openroadm-network-topology:'
737 'xpdr-network-attributes'][
739 if ele['tp-id'] == 'XPDR1-NETWORK2':
740 self.assertEqual({u'frequency': 196.05, u'width': 40},
741 ele['org-openroadm-network-topology:'
742 'xpdr-network-attributes'][
744 if ele['tp-id'] == 'XPDR1-CLIENT1' or \
745 ele['tp-id'] == 'XPDR1-CLIENT3':
747 'org-openroadm-network-topology:xpdr-client-attributes',
751 def test_26_check_topo_ROADMA_SRG1(self):
753 "{}/config/ietf-network:"
754 "networks/network/openroadm-topology/node/ROADMA01-SRG1"
755 .format(test_utils.RESTCONF_BASE_URL))
756 response = requests.request(
757 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
758 self.assertEqual(response.status_code, requests.codes.ok)
759 res = response.json()
760 self.assertNotIn({u'index': 1}, res['node'][0][
761 u'org-openroadm-network-topology:srg-attributes'][
762 'available-wavelengths'])
763 self.assertNotIn({u'index': 2}, res['node'][0][
764 u'org-openroadm-network-topology:srg-attributes'][
765 'available-wavelengths'])
766 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
768 if ele['tp-id'] == 'SRG1-PP1-TXRX':
769 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
770 ele['org-openroadm-network-topology:'
771 'pp-attributes']['used-wavelength'])
772 self.assertNotIn({u'index': 2, u'frequency': 196.05,
774 ele['org-openroadm-network-topology:'
775 'pp-attributes']['used-wavelength'])
776 if ele['tp-id'] == 'SRG1-PP2-TXRX':
777 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
778 ele['org-openroadm-network-topology:'
779 'pp-attributes']['used-wavelength'])
780 self.assertNotIn({u'index': 1, u'frequency': 196.1,
782 ele['org-openroadm-network-topology:'
783 'pp-attributes']['used-wavelength'])
784 if ele['tp-id'] == 'SRG1-PP3-TXRX':
785 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
789 def test_27_check_topo_ROADMA_DEG1(self):
791 "{}/config/ietf-network:"
792 "networks/network/openroadm-topology/node/ROADMA01-DEG1"
793 .format(test_utils.RESTCONF_BASE_URL))
794 response = requests.request(
795 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
796 self.assertEqual(response.status_code, requests.codes.ok)
797 res = response.json()
798 self.assertNotIn({u'index': 1}, res['node'][0][
799 u'org-openroadm-network-topology:degree-attributes'][
800 'available-wavelengths'])
801 self.assertNotIn({u'index': 2}, res['node'][0][
802 u'org-openroadm-network-topology:degree-attributes'][
803 'available-wavelengths'])
804 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
806 if ele['tp-id'] == 'DEG1-CTP-TXRX':
807 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
808 ele['org-openroadm-network-topology:'
809 'ctp-attributes']['used-wavelengths'])
810 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
811 ele['org-openroadm-network-topology:'
812 'ctp-attributes']['used-wavelengths'])
813 if ele['tp-id'] == 'DEG1-TTP-TXRX':
814 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
815 ele['org-openroadm-network-topology:'
816 'tx-ttp-attributes']['used-wavelengths'])
817 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
818 ele['org-openroadm-network-topology:'
819 'tx-ttp-attributes']['used-wavelengths'])
822 # creation service test on a non-available resource
823 def test_28_create_eth_service3(self):
824 url = ("{}/operations/org-openroadm-service:service-create"
825 .format(test_utils.RESTCONF_BASE_URL))
828 "sdnc-request-header": {
829 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
830 "rpc-action": "service-create",
831 "request-system-id": "appname",
833 "http://localhost:8585/NotificationServer/notify"
835 "service-name": "service3",
836 "common-id": "ASATT1234567",
837 "connection-type": "service",
839 "service-rate": "100",
840 "node-id": "XPDRA01",
841 "service-format": "Ethernet",
842 "clli": "SNJSCAMCJP8",
846 "ROUTER_SNJSCAMCJP8_000000.00_00",
847 "port-type": "router",
848 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
849 "port-rack": "000000.00",
854 "LGX Panel_SNJSCAMCJP8_000000.00_00",
855 "lgx-port-name": "LGX Back.3",
856 "lgx-port-rack": "000000.00",
857 "lgx-port-shelf": "00"
863 "ROUTER_SNJSCAMCJP8_000000.00_00",
864 "port-type": "router",
865 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
866 "port-rack": "000000.00",
871 "LGX Panel_SNJSCAMCJP8_000000.00_00",
872 "lgx-port-name": "LGX Back.4",
873 "lgx-port-rack": "000000.00",
874 "lgx-port-shelf": "00"
880 "service-rate": "100",
881 "node-id": "XPDRC01",
882 "service-format": "Ethernet",
883 "clli": "SNJSCAMCJT4",
887 "ROUTER_SNJSCAMCJT4_000000.00_00",
888 "port-type": "router",
889 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
890 "port-rack": "000000.00",
895 "LGX Panel_SNJSCAMCJT4_000000.00_00",
896 "lgx-port-name": "LGX Back.29",
897 "lgx-port-rack": "000000.00",
898 "lgx-port-shelf": "00"
904 "ROUTER_SNJSCAMCJT4_000000.00_00",
905 "port-type": "router",
906 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
907 "port-rack": "000000.00",
912 "LGX Panel_SNJSCAMCJT4_000000.00_00",
913 "lgx-port-name": "LGX Back.30",
914 "lgx-port-rack": "000000.00",
915 "lgx-port-shelf": "00"
920 "due-date": "2016-11-28T00:00:01Z",
921 "operator-contact": "pw1234"
924 response = requests.request(
925 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
926 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
927 self.assertEqual(response.status_code, requests.codes.ok)
928 res = response.json()
929 self.assertIn('PCE calculation in progress',
930 res['output']['configuration-response-common'][
932 self.assertIn('200', res['output']['configuration-response-common'][
934 time.sleep(self.WAITING)
936 # add a test that check the openroadm-service-list still only
937 # contains 2 elements
939 def test_29_delete_eth_service3(self):
940 url = ("{}/operations/org-openroadm-service:service-delete"
941 .format(test_utils.RESTCONF_BASE_URL))
943 "sdnc-request-header": {
944 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
945 "rpc-action": "service-delete",
946 "request-system-id": "appname",
948 "http://localhost:8585/NotificationServer/notify"
950 "service-delete-req-info": {
951 "service-name": "service3",
952 "tail-retention": "no"
956 response = requests.request(
957 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
958 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
959 self.assertEqual(response.status_code, requests.codes.ok)
960 res = response.json()
961 self.assertIn('Service \'service3\' does not exist in datastore',
962 res['output']['configuration-response-common'][
964 self.assertIn('500', res['output']['configuration-response-common'][
968 def test_30_delete_eth_service1(self):
969 url = ("{}/operations/org-openroadm-service:service-delete"
970 .format(test_utils.RESTCONF_BASE_URL))
972 "sdnc-request-header": {
973 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
974 "rpc-action": "service-delete",
975 "request-system-id": "appname",
977 "http://localhost:8585/NotificationServer/notify"
979 "service-delete-req-info": {
980 "service-name": "service1",
981 "tail-retention": "no"
985 response = requests.request(
986 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
987 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
988 self.assertEqual(response.status_code, requests.codes.ok)
989 res = response.json()
990 self.assertIn('Renderer service delete in progress',
991 res['output']['configuration-response-common'][
995 def test_31_delete_eth_service2(self):
996 url = ("{}/operations/org-openroadm-service:service-delete"
997 .format(test_utils.RESTCONF_BASE_URL))
999 "sdnc-request-header": {
1000 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1001 "rpc-action": "service-delete",
1002 "request-system-id": "appname",
1004 "http://localhost:8585/NotificationServer/notify"
1006 "service-delete-req-info": {
1007 "service-name": "service2",
1008 "tail-retention": "no"
1012 response = requests.request(
1013 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1014 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1015 self.assertEqual(response.status_code, requests.codes.ok)
1016 res = response.json()
1017 self.assertIn('Renderer service delete in progress',
1018 res['output']['configuration-response-common'][
1019 'response-message'])
1022 def test_32_check_no_xc_ROADMA(self):
1024 "{}/config/network-topology:"
1025 "network-topology/topology/topology-netconf/"
1026 "node/ROADMA01/yang-ext:"
1027 "mount/org-openroadm-device:org-openroadm-device/"
1028 .format(test_utils.RESTCONF_BASE_URL))
1029 response = requests.request(
1030 "GET", url, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1031 res = response.json()
1032 self.assertEqual(response.status_code, requests.codes.ok)
1033 self.assertNotIn('roadm-connections',
1034 dict.keys(res['org-openroadm-device']))
1037 def test_33_check_topo_XPDRA(self):
1039 "{}/config/ietf-network:"
1040 "networks/network/openroadm-topology/node/XPDRA01-XPDR1"
1041 .format(test_utils.RESTCONF_BASE_URL))
1042 response = requests.request(
1043 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1044 self.assertEqual(response.status_code, requests.codes.ok)
1045 res = response.json()
1046 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1047 for ele in liste_tp:
1048 if ((ele[u'org-openroadm-common-network:tp-type'] ==
1050 and (ele['tp-id'] == 'XPDR1-CLIENT1' or ele[
1051 'tp-id'] == 'XPDR1-CLIENT3')):
1053 'org-openroadm-network-topology:xpdr-client-attributes',
1055 elif (ele[u'org-openroadm-common-network:tp-type'] ==
1057 self.assertIn(u'tail-equipment-id', dict.keys(
1058 ele[u'org-openroadm-network-topology:'
1059 u'xpdr-network-attributes']))
1060 self.assertNotIn('wavelength', dict.keys(
1061 ele[u'org-openroadm-network-topology:'
1062 u'xpdr-network-attributes']))
1065 def test_34_check_topo_ROADMA_SRG1(self):
1067 "{}/config/ietf-network:"
1068 "networks/network/openroadm-topology/node/ROADMA01-SRG1"
1069 .format(test_utils.RESTCONF_BASE_URL))
1070 response = requests.request(
1071 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1072 self.assertEqual(response.status_code, requests.codes.ok)
1073 res = response.json()
1074 self.assertIn({u'index': 1}, res['node'][0][
1075 u'org-openroadm-network-topology:srg-attributes'][
1076 'available-wavelengths'])
1077 self.assertIn({u'index': 2}, res['node'][0][
1078 u'org-openroadm-network-topology:srg-attributes'][
1079 'available-wavelengths'])
1080 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1081 for ele in liste_tp:
1082 if ele['tp-id'] == 'SRG1-PP1-TXRX' or \
1083 ele['tp-id'] == 'SRG1-PP1-TXRX':
1084 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
1087 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
1091 def test_35_check_topo_ROADMA_DEG1(self):
1093 "{}/config/ietf-network:"
1094 "networks/network/openroadm-topology/node/ROADMA01-DEG1"
1095 .format(test_utils.RESTCONF_BASE_URL))
1096 response = requests.request(
1097 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1098 self.assertEqual(response.status_code, requests.codes.ok)
1099 res = response.json()
1100 self.assertIn({u'index': 1}, res['node'][0][
1101 u'org-openroadm-network-topology:degree-attributes'][
1102 'available-wavelengths'])
1103 self.assertIn({u'index': 2}, res['node'][0][
1104 u'org-openroadm-network-topology:degree-attributes'][
1105 'available-wavelengths'])
1106 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1107 for ele in liste_tp:
1108 if ele['tp-id'] == 'DEG1-CTP-TXRX':
1109 self.assertNotIn('org-openroadm-network-topology:'
1110 'ctp-attributes', dict.keys(ele))
1111 if ele['tp-id'] == 'DEG1-TTP-TXRX':
1112 self.assertNotIn('org-openroadm-network-topology:'
1113 'tx-ttp-attributes', dict.keys(ele))
1116 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
1117 def test_36_create_oc_service1(self):
1118 url = ("{}/operations/org-openroadm-service:service-create"
1119 .format(test_utils.RESTCONF_BASE_URL))
1122 "sdnc-request-header": {
1123 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1124 "rpc-action": "service-create",
1125 "request-system-id": "appname",
1127 "http://localhost:8585/NotificationServer/notify"
1129 "service-name": "service1",
1130 "common-id": "ASATT1234567",
1131 "connection-type": "roadm-line",
1133 "service-rate": "100",
1134 "node-id": "ROADMA01",
1135 "service-format": "OC",
1136 "clli": "SNJSCAMCJP8",
1140 "ROUTER_SNJSCAMCJP8_000000.00_00",
1141 "port-type": "router",
1142 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1143 "port-rack": "000000.00",
1148 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1149 "lgx-port-name": "LGX Back.3",
1150 "lgx-port-rack": "000000.00",
1151 "lgx-port-shelf": "00"
1157 "ROUTER_SNJSCAMCJP8_000000.00_00",
1158 "port-type": "router",
1159 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1160 "port-rack": "000000.00",
1165 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1166 "lgx-port-name": "LGX Back.4",
1167 "lgx-port-rack": "000000.00",
1168 "lgx-port-shelf": "00"
1171 "optic-type": "gray"
1174 "service-rate": "100",
1175 "node-id": "ROADMC01",
1176 "service-format": "OC",
1177 "clli": "SNJSCAMCJT4",
1181 "ROUTER_SNJSCAMCJT4_000000.00_00",
1182 "port-type": "router",
1183 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1184 "port-rack": "000000.00",
1189 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1190 "lgx-port-name": "LGX Back.29",
1191 "lgx-port-rack": "000000.00",
1192 "lgx-port-shelf": "00"
1198 "ROUTER_SNJSCAMCJT4_000000.00_00",
1199 "port-type": "router",
1200 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1201 "port-rack": "000000.00",
1206 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1207 "lgx-port-name": "LGX Back.30",
1208 "lgx-port-rack": "000000.00",
1209 "lgx-port-shelf": "00"
1212 "optic-type": "gray"
1214 "due-date": "2016-11-28T00:00:01Z",
1215 "operator-contact": "pw1234"
1218 response = requests.request(
1219 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1220 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1221 self.assertEqual(response.status_code, requests.codes.ok)
1222 res = response.json()
1223 self.assertIn('PCE calculation in progress',
1224 res['output']['configuration-response-common'][
1225 'response-message'])
1226 time.sleep(self.WAITING)
1228 def test_37_get_oc_service1(self):
1229 url = ("{}/operational/org-openroadm-service:"
1230 "service-list/services/service1"
1231 .format(test_utils.RESTCONF_BASE_URL))
1232 response = requests.request(
1233 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1234 self.assertEqual(response.status_code, requests.codes.ok)
1235 res = response.json()
1237 res['services'][0]['administrative-state'],
1240 res['services'][0]['service-name'], 'service1')
1242 res['services'][0]['connection-type'], 'roadm-line')
1244 res['services'][0]['lifecycle-state'], 'planned')
1247 def test_38_check_xc1_ROADMA(self):
1249 "{}/config/network-topology:"
1250 "network-topology/topology/topology-netconf/"
1251 "node/ROADMA01/yang-ext:"
1252 "mount/org-openroadm-device:org-openroadm-device/"
1253 "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
1254 .format(test_utils.RESTCONF_BASE_URL))
1255 response = requests.request(
1256 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1257 self.assertEqual(response.status_code, requests.codes.ok)
1258 res = response.json()
1259 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1260 self.assertDictEqual(
1262 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
1263 'wavelength-number': 1,
1264 'opticalControlMode': 'gainLoss',
1265 'target-output-power': -3.0
1266 }, **res['roadm-connections'][0]),
1267 res['roadm-connections'][0]
1269 self.assertDictEqual(
1270 {'src-if': 'SRG1-PP1-TXRX-1'},
1271 res['roadm-connections'][0]['source'])
1272 self.assertDictEqual(
1273 {'dst-if': 'DEG1-TTP-TXRX-1'},
1274 res['roadm-connections'][0]['destination'])
1277 def test_39_check_xc1_ROADMC(self):
1279 "{}/config/network-topology:"
1280 "network-topology/topology/topology-netconf/"
1281 "node/ROADMC01/yang-ext:mount/org-openroadm-device:"
1282 "org-openroadm-device/"
1283 "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
1284 .format(test_utils.RESTCONF_BASE_URL))
1285 response = requests.request(
1286 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1287 self.assertEqual(response.status_code, requests.codes.ok)
1288 res = response.json()
1289 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1290 self.assertDictEqual(
1292 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
1293 'wavelength-number': 1,
1294 'opticalControlMode': 'gainLoss',
1295 'target-output-power': 2.0
1296 }, **res['roadm-connections'][0]),
1297 res['roadm-connections'][0]
1299 self.assertDictEqual(
1300 {'src-if': 'SRG1-PP1-TXRX-1'},
1301 res['roadm-connections'][0]['source'])
1302 self.assertDictEqual(
1303 {'dst-if': 'DEG2-TTP-TXRX-1'},
1304 res['roadm-connections'][0]['destination'])
1307 def test_40_create_oc_service2(self):
1308 url = ("{}/operations/org-openroadm-service:service-create"
1309 .format(test_utils.RESTCONF_BASE_URL))
1312 "sdnc-request-header": {
1313 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1314 "rpc-action": "service-create",
1315 "request-system-id": "appname",
1317 "http://localhost:8585/NotificationServer/notify"
1319 "service-name": "service2",
1320 "common-id": "ASATT1234567",
1321 "connection-type": "roadm-line",
1323 "service-rate": "100",
1324 "node-id": "ROADMA01",
1325 "service-format": "OC",
1326 "clli": "SNJSCAMCJP8",
1330 "ROUTER_SNJSCAMCJP8_000000.00_00",
1331 "port-type": "router",
1332 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1333 "port-rack": "000000.00",
1338 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1339 "lgx-port-name": "LGX Back.3",
1340 "lgx-port-rack": "000000.00",
1341 "lgx-port-shelf": "00"
1347 "ROUTER_SNJSCAMCJP8_000000.00_00",
1348 "port-type": "router",
1349 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1350 "port-rack": "000000.00",
1355 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1356 "lgx-port-name": "LGX Back.4",
1357 "lgx-port-rack": "000000.00",
1358 "lgx-port-shelf": "00"
1361 "optic-type": "gray"
1364 "service-rate": "100",
1365 "node-id": "ROADMC01",
1366 "service-format": "OC",
1367 "clli": "SNJSCAMCJT4",
1371 "ROUTER_SNJSCAMCJT4_000000.00_00",
1372 "port-type": "router",
1373 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1374 "port-rack": "000000.00",
1379 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1380 "lgx-port-name": "LGX Back.29",
1381 "lgx-port-rack": "000000.00",
1382 "lgx-port-shelf": "00"
1388 "ROUTER_SNJSCAMCJT4_000000.00_00",
1389 "port-type": "router",
1390 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1391 "port-rack": "000000.00",
1396 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1397 "lgx-port-name": "LGX Back.30",
1398 "lgx-port-rack": "000000.00",
1399 "lgx-port-shelf": "00"
1402 "optic-type": "gray"
1404 "due-date": "2016-11-28T00:00:01Z",
1405 "operator-contact": "pw1234"
1408 response = requests.request(
1409 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1410 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1411 self.assertEqual(response.status_code, requests.codes.ok)
1412 res = response.json()
1413 self.assertIn('PCE calculation in progress',
1414 res['output']['configuration-response-common'][
1415 'response-message'])
1416 time.sleep(self.WAITING)
1418 def test_41_get_oc_service2(self):
1419 url = ("{}/operational/org-openroadm-service:"
1420 "service-list/services/service2"
1421 .format(test_utils.RESTCONF_BASE_URL))
1422 response = requests.request(
1423 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1424 self.assertEqual(response.status_code, requests.codes.ok)
1425 res = response.json()
1427 res['services'][0]['administrative-state'],
1430 res['services'][0]['service-name'], 'service2')
1432 res['services'][0]['connection-type'], 'roadm-line')
1434 res['services'][0]['lifecycle-state'], 'planned')
1437 def test_42_check_xc2_ROADMA(self):
1439 "{}/config/network-topology:"
1440 "network-topology/topology/topology-netconf/"
1441 "node/ROADMA01/yang-ext:mount/org-openroadm-device:"
1442 "org-openroadm-device/"
1443 "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-2"
1444 .format(test_utils.RESTCONF_BASE_URL))
1445 response = requests.request(
1446 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1447 self.assertEqual(response.status_code, requests.codes.ok)
1448 res = response.json()
1449 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1450 self.assertDictEqual(
1452 'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-2',
1453 'wavelength-number': 2,
1454 'opticalControlMode': 'gainLoss',
1455 'target-output-power': -3.0
1456 }, **res['roadm-connections'][0]),
1457 res['roadm-connections'][0]
1459 self.assertDictEqual(
1460 {'src-if': 'SRG1-PP2-TXRX-2'},
1461 res['roadm-connections'][0]['source'])
1462 self.assertDictEqual(
1463 {'dst-if': 'DEG1-TTP-TXRX-2'},
1464 res['roadm-connections'][0]['destination'])
1467 def test_43_check_topo_ROADMA(self):
1468 self.test_26_check_topo_ROADMA_SRG1()
1469 self.test_27_check_topo_ROADMA_DEG1()
1472 def test_44_delete_oc_service1(self):
1473 url = ("{}/operations/org-openroadm-service:service-delete"
1474 .format(test_utils.RESTCONF_BASE_URL))
1476 "sdnc-request-header": {
1477 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1478 "rpc-action": "service-delete",
1479 "request-system-id": "appname",
1481 "http://localhost:8585/NotificationServer/notify"
1483 "service-delete-req-info": {
1484 "service-name": "service1",
1485 "tail-retention": "no"
1489 response = requests.request(
1490 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1491 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1492 self.assertEqual(response.status_code, requests.codes.ok)
1493 res = response.json()
1494 self.assertIn('Renderer service delete in progress',
1495 res['output']['configuration-response-common'][
1496 'response-message'])
1499 def test_45_delete_oc_service2(self):
1500 url = ("{}/operations/org-openroadm-service:service-delete"
1501 .format(test_utils.RESTCONF_BASE_URL))
1503 "sdnc-request-header": {
1504 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1505 "rpc-action": "service-delete",
1506 "request-system-id": "appname",
1508 "http://localhost:8585/NotificationServer/notify"
1510 "service-delete-req-info": {
1511 "service-name": "service2",
1512 "tail-retention": "no"
1516 response = requests.request(
1517 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1518 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1519 self.assertEqual(response.status_code, requests.codes.ok)
1520 res = response.json()
1521 self.assertIn('Renderer service delete in progress',
1522 res['output']['configuration-response-common'][
1523 'response-message'])
1526 def test_46_get_no_oc_services(self):
1528 url = ("{}/operational/org-openroadm-service:service-list"
1529 .format(test_utils.RESTCONF_BASE_URL))
1530 response = requests.request(
1531 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1532 self.assertEqual(response.status_code, requests.codes.not_found)
1533 res = response.json()
1536 "error-type": "application",
1537 "error-tag": "data-missing",
1539 "Request could not be completed because the relevant data "
1540 "model content does not exist"
1542 res['errors']['error'])
1545 def test_47_get_no_xc_ROADMA(self):
1547 "{}/config/network-topology:"
1548 "network-topology/topology/topology-netconf"
1549 "/node/ROADMA01/yang-ext:mount/org-openroadm-device:"
1550 "org-openroadm-device/"
1551 .format(test_utils.RESTCONF_BASE_URL))
1552 response = requests.request(
1553 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1554 self.assertEqual(response.status_code, requests.codes.ok)
1555 res = response.json()
1556 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
1559 def test_48_check_topo_ROADMA(self):
1560 self.test_34_check_topo_ROADMA_SRG1()
1561 self.test_35_check_topo_ROADMA_DEG1()
1563 def test_49_loop_create_eth_service(self):
1564 for i in range(1, 6):
1565 print("iteration number {}".format(i))
1566 print("eth service creation")
1567 self.test_11_create_eth_service1()
1568 print("check xc in ROADMA01")
1569 self.test_13_check_xc1_ROADMA()
1570 print("check xc in ROADMC01")
1571 self.test_14_check_xc1_ROADMC()
1572 print("eth service deletion\n")
1573 self.test_30_delete_eth_service1()
1575 def test_50_loop_create_oc_service(self):
1576 url = ("{}/operational/org-openroadm-service:"
1577 "service-list/services/service1"
1578 .format(test_utils.RESTCONF_BASE_URL))
1579 response = requests.request("GET", url, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1580 if response.status_code != 404:
1581 url = ("{}/operations/org-openroadm-service:service-delete"
1582 .format(test_utils.RESTCONF_BASE_URL))
1584 "sdnc-request-header": {
1585 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1586 "rpc-action": "service-delete",
1587 "request-system-id": "appname",
1589 "http://localhost:8585/NotificationServer/notify"
1591 "service-delete-req-info": {
1592 "service-name": "service1",
1593 "tail-retention": "no"
1597 requests.request("POST", url, data=json.dumps(data),
1598 headers=test_utils.TYPE_APPLICATION_JSON,
1599 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD)
1603 for i in range(1, 6):
1604 print("iteration number {}".format(i))
1605 print("oc service creation")
1606 self.test_36_create_oc_service1()
1607 print("check xc in ROADMA01")
1608 self.test_38_check_xc1_ROADMA()
1609 print("check xc in ROADMC01")
1610 self.test_39_check_xc1_ROADMC()
1611 print("oc service deletion\n")
1612 self.test_44_delete_oc_service1()
1614 def test_51_disconnect_XPDRA(self):
1615 response = test_utils.unmount_device("XPDRA01")
1616 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
1618 def test_52_disconnect_XPDRC(self):
1619 response = test_utils.unmount_device("XPDRC01")
1620 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
1622 def test_53_disconnect_ROADMA(self):
1623 response = test_utils.unmount_device("ROADMA01")
1624 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
1626 def test_54_disconnect_ROADMC(self):
1627 response = test_utils.unmount_device("ROADMC01")
1628 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
1631 if __name__ == "__main__":
1632 unittest.main(verbosity=2)