2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
19 from common import test_utils
22 class TransportPCEFulltesting(unittest.TestCase):
24 WAITING = 20 # nominal value is 300
30 cls.processes = test_utils.start_tpce()
31 cls.processes = test_utils.start_sims(['xpdra', 'roadma-full', 'roadmc-full', 'xpdrc'])
34 def tearDownClass(cls):
35 for process in cls.processes:
36 test_utils.shutdown_process(process)
37 print("all processes killed")
39 def setUp(self): # instruction executed before each test method
40 print("execution of {}".format(self.id().split(".")[-1]))
42 # connect netconf devices
43 def test_01_connect_xpdrA(self):
44 response = test_utils.mount_device("XPDRA01", 'xpdra')
45 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
47 def test_02_connect_xpdrC(self):
48 response = test_utils.mount_device("XPDRC01", 'xpdrc')
49 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
51 def test_03_connect_rdmA(self):
52 response = test_utils.mount_device("ROADMA01", 'roadma-full')
53 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
55 def test_04_connect_rdmC(self):
56 response = test_utils.mount_device("ROADMC01", 'roadmc-full')
57 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
59 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
60 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
61 format(test_utils.RESTCONF_BASE_URL)
63 "networkutils:input": {
64 "networkutils:links-input": {
65 "networkutils:xpdr-node": "XPDRA01",
66 "networkutils:xpdr-num": "1",
67 "networkutils:network-num": "1",
68 "networkutils:rdm-node": "ROADMA01",
69 "networkutils:srg-num": "1",
70 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
74 response = requests.request(
75 "POST", url, data=json.dumps(data),
76 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
77 self.assertEqual(response.status_code, requests.codes.ok)
79 self.assertIn('Xponder Roadm Link created successfully',
80 res["output"]["result"])
83 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
84 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
85 format(test_utils.RESTCONF_BASE_URL)
87 "networkutils:input": {
88 "networkutils:links-input": {
89 "networkutils:xpdr-node": "XPDRA01",
90 "networkutils:xpdr-num": "1",
91 "networkutils:network-num": "1",
92 "networkutils:rdm-node": "ROADMA01",
93 "networkutils:srg-num": "1",
94 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
98 response = requests.request(
99 "POST", url, data=json.dumps(data),
100 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
101 self.assertEqual(response.status_code, requests.codes.ok)
102 res = response.json()
103 self.assertIn('Roadm Xponder links created successfully',
104 res["output"]["result"])
107 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
108 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
109 format(test_utils.RESTCONF_BASE_URL)
111 "networkutils:input": {
112 "networkutils:links-input": {
113 "networkutils:xpdr-node": "XPDRC01",
114 "networkutils:xpdr-num": "1",
115 "networkutils:network-num": "1",
116 "networkutils:rdm-node": "ROADMC01",
117 "networkutils:srg-num": "1",
118 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
122 response = requests.request(
123 "POST", url, data=json.dumps(data),
124 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
125 self.assertEqual(response.status_code, requests.codes.ok)
126 res = response.json()
127 self.assertIn('Xponder Roadm Link created successfully',
128 res["output"]["result"])
131 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
132 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
133 format(test_utils.RESTCONF_BASE_URL)
135 "networkutils:input": {
136 "networkutils:links-input": {
137 "networkutils:xpdr-node": "XPDRC01",
138 "networkutils:xpdr-num": "1",
139 "networkutils:network-num": "1",
140 "networkutils:rdm-node": "ROADMC01",
141 "networkutils:srg-num": "1",
142 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
146 response = requests.request(
147 "POST", url, data=json.dumps(data),
148 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
149 self.assertEqual(response.status_code, requests.codes.ok)
150 res = response.json()
151 self.assertIn('Roadm Xponder links created successfully',
152 res["output"]["result"])
155 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
156 # Config ROADMA-ROADMC oms-attributes
158 "{}/config/ietf-network:"
159 "networks/network/openroadm-topology/ietf-network-topology:"
160 "link/ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX/"
161 "org-openroadm-network-topology:"
162 "OMS-attributes/span"
163 .format(test_utils.RESTCONF_BASE_URL))
166 "auto-spanloss": "true",
167 "spanloss-base": 11.4,
168 "spanloss-current": 12,
169 "engineered-spanloss": 12.2,
170 "link-concatenation": [{
173 "SRLG-length": 100000,
175 response = requests.request(
176 "PUT", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
177 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
178 self.assertEqual(response.status_code, requests.codes.created)
180 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
181 # Config ROADMC-ROADMA oms-attributes
183 "{}/config/ietf-network:"
184 "networks/network/openroadm-topology/ietf-network-topology:"
185 "link/ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX/"
186 "org-openroadm-network-topology:"
187 "OMS-attributes/span"
188 .format(test_utils.RESTCONF_BASE_URL))
191 "auto-spanloss": "true",
192 "spanloss-base": 11.4,
193 "spanloss-current": 12,
194 "engineered-spanloss": 12.2,
195 "link-concatenation": [{
198 "SRLG-length": 100000,
200 response = requests.request(
201 "PUT", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
202 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
203 self.assertEqual(response.status_code, requests.codes.created)
205 # test service-create for Eth service from xpdr to xpdr
206 def test_11_create_eth_service1(self):
207 url = ("{}/operations/org-openroadm-service:service-create"
208 .format(test_utils.RESTCONF_BASE_URL))
211 "sdnc-request-header": {
212 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
213 "rpc-action": "service-create",
214 "request-system-id": "appname",
216 "http://localhost:8585/NotificationServer/notify"
218 "service-name": "service1",
219 "common-id": "ASATT1234567",
220 "connection-type": "service",
222 "service-rate": "100",
223 "node-id": "XPDRA01",
224 "service-format": "Ethernet",
225 "clli": "SNJSCAMCJP8",
229 "ROUTER_SNJSCAMCJP8_000000.00_00",
230 "port-type": "router",
231 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
232 "port-rack": "000000.00",
237 "LGX Panel_SNJSCAMCJP8_000000.00_00",
238 "lgx-port-name": "LGX Back.3",
239 "lgx-port-rack": "000000.00",
240 "lgx-port-shelf": "00"
246 "ROUTER_SNJSCAMCJP8_000000.00_00",
247 "port-type": "router",
248 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
249 "port-rack": "000000.00",
254 "LGX Panel_SNJSCAMCJP8_000000.00_00",
255 "lgx-port-name": "LGX Back.4",
256 "lgx-port-rack": "000000.00",
257 "lgx-port-shelf": "00"
263 "service-rate": "100",
264 "node-id": "XPDRC01",
265 "service-format": "Ethernet",
266 "clli": "SNJSCAMCJT4",
270 "ROUTER_SNJSCAMCJT4_000000.00_00",
271 "port-type": "router",
272 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
273 "port-rack": "000000.00",
278 "LGX Panel_SNJSCAMCJT4_000000.00_00",
279 "lgx-port-name": "LGX Back.29",
280 "lgx-port-rack": "000000.00",
281 "lgx-port-shelf": "00"
287 "ROUTER_SNJSCAMCJT4_000000.00_00",
288 "port-type": "router",
289 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
290 "port-rack": "000000.00",
295 "LGX Panel_SNJSCAMCJT4_000000.00_00",
296 "lgx-port-name": "LGX Back.30",
297 "lgx-port-rack": "000000.00",
298 "lgx-port-shelf": "00"
303 "due-date": "2016-11-28T00:00:01Z",
304 "operator-contact": "pw1234"
307 response = requests.request(
308 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
309 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
310 self.assertEqual(response.status_code, requests.codes.ok)
311 res = response.json()
312 self.assertIn('PCE calculation in progress',
313 res['output']['configuration-response-common'][
315 time.sleep(self.WAITING)
317 def test_12_get_eth_service1(self):
318 url = ("{}/operational/org-openroadm-service:service-list/services/"
319 "service1".format(test_utils.RESTCONF_BASE_URL))
320 response = requests.request(
321 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
322 self.assertEqual(response.status_code, requests.codes.ok)
323 res = response.json()
325 res['services'][0]['administrative-state'],
328 res['services'][0]['service-name'], 'service1')
330 res['services'][0]['connection-type'], 'service')
332 res['services'][0]['lifecycle-state'], 'planned')
335 def test_13_check_xc1_ROADMA(self):
337 "{}/config/network-topology:"
338 "network-topology/topology/topology-netconf/"
339 "node/ROADMA01/yang-ext:"
340 "mount/org-openroadm-device:org-openroadm-device/"
341 "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
342 .format(test_utils.RESTCONF_BASE_URL))
343 response = requests.request(
344 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
345 self.assertEqual(response.status_code, requests.codes.ok)
346 res = response.json()
347 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
348 self.assertDictEqual(
350 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
351 'wavelength-number': 1,
352 'opticalControlMode': 'gainLoss',
353 'target-output-power': -3.0
354 }, **res['roadm-connections'][0]),
355 res['roadm-connections'][0]
357 self.assertDictEqual(
358 {'src-if': 'SRG1-PP1-TXRX-1'},
359 res['roadm-connections'][0]['source'])
360 self.assertDictEqual(
361 {'dst-if': 'DEG1-TTP-TXRX-1'},
362 res['roadm-connections'][0]['destination'])
365 def test_14_check_xc1_ROADMC(self):
367 "{}/config/network-topology:"
368 "network-topology/topology/topology-netconf/"
369 "node/ROADMC01/yang-ext:mount/org-openroadm-device:"
370 "org-openroadm-device/"
371 "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
372 .format(test_utils.RESTCONF_BASE_URL))
373 response = requests.request(
374 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
375 self.assertEqual(response.status_code, requests.codes.ok)
376 res = response.json()
377 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
378 self.assertDictEqual(
380 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
381 'wavelength-number': 1,
382 'opticalControlMode': 'gainLoss',
383 'target-output-power': 2.0
384 }, **res['roadm-connections'][0]),
385 res['roadm-connections'][0]
387 self.assertDictEqual(
388 {'src-if': 'SRG1-PP1-TXRX-1'},
389 res['roadm-connections'][0]['source'])
390 self.assertDictEqual(
391 {'dst-if': 'DEG2-TTP-TXRX-1'},
392 res['roadm-connections'][0]['destination'])
395 def test_15_check_topo_XPDRA(self):
397 "{}/config/ietf-network:"
398 "networks/network/openroadm-topology/node/XPDRA01-XPDR1"
399 .format(test_utils.RESTCONF_BASE_URL))
400 response = requests.request(
401 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
402 self.assertEqual(response.status_code, requests.codes.ok)
403 res = response.json()
404 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
406 if ele['tp-id'] == 'XPDR1-NETWORK1':
407 self.assertEqual({u'frequency': 196.1, u'width': 40},
409 'org-openroadm-network-topology:'
410 'xpdr-network-attributes'][
412 if ele['tp-id'] == 'XPDR1-CLIENT1' or \
413 ele['tp-id'] == 'XPDR1-CLIENT3':
415 'org-openroadm-network-topology:xpdr-client-attributes',
417 if ele['tp-id'] == 'XPDR1-NETWORK2':
419 'org-openroadm-network-topology:xpdr-network-attributes',
423 def test_16_check_topo_ROADMA_SRG1(self):
425 "{}/config/ietf-network:"
426 "networks/network/openroadm-topology/node/ROADMA01-SRG1"
427 .format(test_utils.RESTCONF_BASE_URL))
428 response = requests.request(
429 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
430 self.assertEqual(response.status_code, requests.codes.ok)
431 res = response.json()
432 self.assertNotIn({u'index': 1},
434 u'org-openroadm-network-topology:srg-attributes'][
435 'available-wavelengths'])
436 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
438 if ele['tp-id'] == 'SRG1-PP1-TXRX':
439 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
440 ele['org-openroadm-network-topology:'
441 'pp-attributes']['used-wavelength']
443 if ele['tp-id'] == 'SRG1-PP2-TXRX':
444 self.assertNotIn('used-wavelength', dict.keys(ele))
447 def test_17_check_topo_ROADMA_DEG1(self):
449 "{}/config/ietf-network:"
450 "networks/network/openroadm-topology/node/ROADMA01-DEG1"
451 .format(test_utils.RESTCONF_BASE_URL))
452 response = requests.request(
453 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
454 self.assertEqual(response.status_code, requests.codes.ok)
455 res = response.json()
456 self.assertNotIn({u'index': 1},
458 u'org-openroadm-network-topology:'
459 u'degree-attributes'][
460 'available-wavelengths'])
461 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
463 if ele['tp-id'] == 'DEG1-CTP-TXRX':
464 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
465 ele['org-openroadm-network-topology:'
468 if ele['tp-id'] == 'DEG1-TTP-TXRX':
469 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
470 ele['org-openroadm-network-topology:'
471 'tx-ttp-attributes'][
475 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
476 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
477 format(test_utils.RESTCONF_BASE_URL)
479 "networkutils:input": {
480 "networkutils:links-input": {
481 "networkutils:xpdr-node": "XPDRA01",
482 "networkutils:xpdr-num": "1",
483 "networkutils:network-num": "2",
484 "networkutils:rdm-node": "ROADMA01",
485 "networkutils:srg-num": "1",
486 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
490 response = requests.request(
491 "POST", url, data=json.dumps(data),
492 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
493 self.assertEqual(response.status_code, requests.codes.ok)
494 res = response.json()
495 self.assertIn('Xponder Roadm Link created successfully',
496 res["output"]["result"])
499 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
500 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
501 format(test_utils.RESTCONF_BASE_URL)
503 "networkutils:input": {
504 "networkutils:links-input": {
505 "networkutils:xpdr-node": "XPDRA01",
506 "networkutils:xpdr-num": "1",
507 "networkutils:network-num": "2",
508 "networkutils:rdm-node": "ROADMA01",
509 "networkutils:srg-num": "1",
510 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
514 response = requests.request(
515 "POST", url, data=json.dumps(data),
516 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
517 self.assertEqual(response.status_code, requests.codes.ok)
518 res = response.json()
519 self.assertIn('Roadm Xponder links created successfully',
520 res["output"]["result"])
523 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
524 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
525 format(test_utils.RESTCONF_BASE_URL)
527 "networkutils:input": {
528 "networkutils:links-input": {
529 "networkutils:xpdr-node": "XPDRC01",
530 "networkutils:xpdr-num": "1",
531 "networkutils:network-num": "2",
532 "networkutils:rdm-node": "ROADMC01",
533 "networkutils:srg-num": "1",
534 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
538 response = requests.request(
539 "POST", url, data=json.dumps(data),
540 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
541 self.assertEqual(response.status_code, requests.codes.ok)
542 res = response.json()
543 self.assertIn('Xponder Roadm Link created successfully',
544 res["output"]["result"])
547 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
548 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
549 format(test_utils.RESTCONF_BASE_URL)
551 "networkutils:input": {
552 "networkutils:links-input": {
553 "networkutils:xpdr-node": "XPDRC01",
554 "networkutils:xpdr-num": "1",
555 "networkutils:network-num": "2",
556 "networkutils:rdm-node": "ROADMC01",
557 "networkutils:srg-num": "1",
558 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
562 response = requests.request(
563 "POST", url, data=json.dumps(data),
564 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
565 self.assertEqual(response.status_code, requests.codes.ok)
566 res = response.json()
567 self.assertIn('Roadm Xponder links created successfully',
568 res["output"]["result"])
571 def test_22_create_eth_service2(self):
572 url = ("{}/operations/org-openroadm-service:service-create"
573 .format(test_utils.RESTCONF_BASE_URL))
576 "sdnc-request-header": {
577 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
578 "rpc-action": "service-create",
579 "request-system-id": "appname",
581 "http://localhost:8585/NotificationServer/notify"
583 "service-name": "service2",
584 "common-id": "ASATT1234567",
585 "connection-type": "service",
587 "service-rate": "100",
588 "node-id": "XPDRA01",
589 "service-format": "Ethernet",
590 "clli": "SNJSCAMCJP8",
594 "ROUTER_SNJSCAMCJP8_000000.00_00",
595 "port-type": "router",
596 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
597 "port-rack": "000000.00",
602 "LGX Panel_SNJSCAMCJP8_000000.00_00",
603 "lgx-port-name": "LGX Back.3",
604 "lgx-port-rack": "000000.00",
605 "lgx-port-shelf": "00"
611 "ROUTER_SNJSCAMCJP8_000000.00_00",
612 "port-type": "router",
613 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
614 "port-rack": "000000.00",
619 "LGX Panel_SNJSCAMCJP8_000000.00_00",
620 "lgx-port-name": "LGX Back.4",
621 "lgx-port-rack": "000000.00",
622 "lgx-port-shelf": "00"
628 "service-rate": "100",
629 "node-id": "XPDRC01",
630 "service-format": "Ethernet",
631 "clli": "SNJSCAMCJT4",
635 "ROUTER_SNJSCAMCJT4_000000.00_00",
636 "port-type": "router",
637 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
638 "port-rack": "000000.00",
643 "LGX Panel_SNJSCAMCJT4_000000.00_00",
644 "lgx-port-name": "LGX Back.29",
645 "lgx-port-rack": "000000.00",
646 "lgx-port-shelf": "00"
652 "ROUTER_SNJSCAMCJT4_000000.00_00",
653 "port-type": "router",
654 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
655 "port-rack": "000000.00",
660 "LGX Panel_SNJSCAMCJT4_000000.00_00",
661 "lgx-port-name": "LGX Back.30",
662 "lgx-port-rack": "000000.00",
663 "lgx-port-shelf": "00"
668 "due-date": "2016-11-28T00:00:01Z",
669 "operator-contact": "pw1234"
672 response = requests.request(
673 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
674 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
675 self.assertEqual(response.status_code, requests.codes.ok)
676 res = response.json()
677 self.assertIn('PCE calculation in progress',
678 res['output']['configuration-response-common'][
680 time.sleep(self.WAITING)
682 def test_23_get_eth_service2(self):
683 url = ("{}/operational/org-openroadm-service:"
684 "service-list/services/service2"
685 .format(test_utils.RESTCONF_BASE_URL))
686 response = requests.request(
687 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
688 self.assertEqual(response.status_code, requests.codes.ok)
689 res = response.json()
691 res['services'][0]['administrative-state'],
694 res['services'][0]['service-name'], 'service2')
696 res['services'][0]['connection-type'], 'service')
698 res['services'][0]['lifecycle-state'], 'planned')
701 def test_24_check_xc2_ROADMA(self):
703 "{}/config/network-topology:"
704 "network-topology/topology/topology-netconf/"
705 "node/ROADMA01/yang-ext:"
706 "mount/org-openroadm-device:org-openroadm-device/"
707 "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-2"
708 .format(test_utils.RESTCONF_BASE_URL))
709 response = requests.request(
710 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
711 self.assertEqual(response.status_code, requests.codes.ok)
712 res = response.json()
713 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
714 self.assertDictEqual(
716 'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-2',
717 'wavelength-number': 2,
718 'opticalControlMode': 'power'
719 }, **res['roadm-connections'][0]),
720 res['roadm-connections'][0]
722 self.assertDictEqual(
723 {'src-if': 'DEG1-TTP-TXRX-2'},
724 res['roadm-connections'][0]['source'])
725 self.assertDictEqual(
726 {'dst-if': 'SRG1-PP2-TXRX-2'},
727 res['roadm-connections'][0]['destination'])
729 def test_25_check_topo_XPDRA(self):
731 "{}/config/ietf-network:"
732 "networks/network/openroadm-topology/node/XPDRA01-XPDR1"
733 .format(test_utils.RESTCONF_BASE_URL))
734 response = requests.request(
735 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
736 self.assertEqual(response.status_code, requests.codes.ok)
737 res = response.json()
738 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
740 if ele['tp-id'] == 'XPDR1-NETWORK1':
741 self.assertEqual({u'frequency': 196.1, u'width': 40},
742 ele['org-openroadm-network-topology:'
743 'xpdr-network-attributes'][
745 if ele['tp-id'] == 'XPDR1-NETWORK2':
746 self.assertEqual({u'frequency': 196.05, u'width': 40},
747 ele['org-openroadm-network-topology:'
748 'xpdr-network-attributes'][
750 if ele['tp-id'] == 'XPDR1-CLIENT1' or \
751 ele['tp-id'] == 'XPDR1-CLIENT3':
753 'org-openroadm-network-topology:xpdr-client-attributes',
757 def test_26_check_topo_ROADMA_SRG1(self):
759 "{}/config/ietf-network:"
760 "networks/network/openroadm-topology/node/ROADMA01-SRG1"
761 .format(test_utils.RESTCONF_BASE_URL))
762 response = requests.request(
763 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
764 self.assertEqual(response.status_code, requests.codes.ok)
765 res = response.json()
766 self.assertNotIn({u'index': 1}, res['node'][0][
767 u'org-openroadm-network-topology:srg-attributes'][
768 'available-wavelengths'])
769 self.assertNotIn({u'index': 2}, res['node'][0][
770 u'org-openroadm-network-topology:srg-attributes'][
771 'available-wavelengths'])
772 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
774 if ele['tp-id'] == 'SRG1-PP1-TXRX':
775 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
776 ele['org-openroadm-network-topology:'
777 'pp-attributes']['used-wavelength'])
778 self.assertNotIn({u'index': 2, u'frequency': 196.05,
780 ele['org-openroadm-network-topology:'
781 'pp-attributes']['used-wavelength'])
782 if ele['tp-id'] == 'SRG1-PP2-TXRX':
783 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
784 ele['org-openroadm-network-topology:'
785 'pp-attributes']['used-wavelength'])
786 self.assertNotIn({u'index': 1, u'frequency': 196.1,
788 ele['org-openroadm-network-topology:'
789 'pp-attributes']['used-wavelength'])
790 if ele['tp-id'] == 'SRG1-PP3-TXRX':
791 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
795 def test_27_check_topo_ROADMA_DEG1(self):
797 "{}/config/ietf-network:"
798 "networks/network/openroadm-topology/node/ROADMA01-DEG1"
799 .format(test_utils.RESTCONF_BASE_URL))
800 response = requests.request(
801 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
802 self.assertEqual(response.status_code, requests.codes.ok)
803 res = response.json()
804 self.assertNotIn({u'index': 1}, res['node'][0][
805 u'org-openroadm-network-topology:degree-attributes'][
806 'available-wavelengths'])
807 self.assertNotIn({u'index': 2}, res['node'][0][
808 u'org-openroadm-network-topology:degree-attributes'][
809 'available-wavelengths'])
810 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
812 if ele['tp-id'] == 'DEG1-CTP-TXRX':
813 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
814 ele['org-openroadm-network-topology:'
815 'ctp-attributes']['used-wavelengths'])
816 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
817 ele['org-openroadm-network-topology:'
818 'ctp-attributes']['used-wavelengths'])
819 if ele['tp-id'] == 'DEG1-TTP-TXRX':
820 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
821 ele['org-openroadm-network-topology:'
822 'tx-ttp-attributes']['used-wavelengths'])
823 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
824 ele['org-openroadm-network-topology:'
825 'tx-ttp-attributes']['used-wavelengths'])
828 # creation service test on a non-available resource
829 def test_28_create_eth_service3(self):
830 url = ("{}/operations/org-openroadm-service:service-create"
831 .format(test_utils.RESTCONF_BASE_URL))
834 "sdnc-request-header": {
835 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
836 "rpc-action": "service-create",
837 "request-system-id": "appname",
839 "http://localhost:8585/NotificationServer/notify"
841 "service-name": "service3",
842 "common-id": "ASATT1234567",
843 "connection-type": "service",
845 "service-rate": "100",
846 "node-id": "XPDRA01",
847 "service-format": "Ethernet",
848 "clli": "SNJSCAMCJP8",
852 "ROUTER_SNJSCAMCJP8_000000.00_00",
853 "port-type": "router",
854 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
855 "port-rack": "000000.00",
860 "LGX Panel_SNJSCAMCJP8_000000.00_00",
861 "lgx-port-name": "LGX Back.3",
862 "lgx-port-rack": "000000.00",
863 "lgx-port-shelf": "00"
869 "ROUTER_SNJSCAMCJP8_000000.00_00",
870 "port-type": "router",
871 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
872 "port-rack": "000000.00",
877 "LGX Panel_SNJSCAMCJP8_000000.00_00",
878 "lgx-port-name": "LGX Back.4",
879 "lgx-port-rack": "000000.00",
880 "lgx-port-shelf": "00"
886 "service-rate": "100",
887 "node-id": "XPDRC01",
888 "service-format": "Ethernet",
889 "clli": "SNJSCAMCJT4",
893 "ROUTER_SNJSCAMCJT4_000000.00_00",
894 "port-type": "router",
895 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
896 "port-rack": "000000.00",
901 "LGX Panel_SNJSCAMCJT4_000000.00_00",
902 "lgx-port-name": "LGX Back.29",
903 "lgx-port-rack": "000000.00",
904 "lgx-port-shelf": "00"
910 "ROUTER_SNJSCAMCJT4_000000.00_00",
911 "port-type": "router",
912 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
913 "port-rack": "000000.00",
918 "LGX Panel_SNJSCAMCJT4_000000.00_00",
919 "lgx-port-name": "LGX Back.30",
920 "lgx-port-rack": "000000.00",
921 "lgx-port-shelf": "00"
926 "due-date": "2016-11-28T00:00:01Z",
927 "operator-contact": "pw1234"
930 response = requests.request(
931 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
932 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
933 self.assertEqual(response.status_code, requests.codes.ok)
934 res = response.json()
935 self.assertIn('PCE calculation in progress',
936 res['output']['configuration-response-common'][
938 self.assertIn('200', res['output']['configuration-response-common'][
940 time.sleep(self.WAITING)
942 # add a test that check the openroadm-service-list still only
943 # contains 2 elements
945 def test_29_delete_eth_service3(self):
946 url = ("{}/operations/org-openroadm-service:service-delete"
947 .format(test_utils.RESTCONF_BASE_URL))
949 "sdnc-request-header": {
950 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
951 "rpc-action": "service-delete",
952 "request-system-id": "appname",
954 "http://localhost:8585/NotificationServer/notify"
956 "service-delete-req-info": {
957 "service-name": "service3",
958 "tail-retention": "no"
962 response = requests.request(
963 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
964 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
965 self.assertEqual(response.status_code, requests.codes.ok)
966 res = response.json()
967 self.assertIn('Service \'service3\' does not exist in datastore',
968 res['output']['configuration-response-common'][
970 self.assertIn('500', res['output']['configuration-response-common'][
974 def test_30_delete_eth_service1(self):
975 url = ("{}/operations/org-openroadm-service:service-delete"
976 .format(test_utils.RESTCONF_BASE_URL))
978 "sdnc-request-header": {
979 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
980 "rpc-action": "service-delete",
981 "request-system-id": "appname",
983 "http://localhost:8585/NotificationServer/notify"
985 "service-delete-req-info": {
986 "service-name": "service1",
987 "tail-retention": "no"
991 response = requests.request(
992 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
993 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
994 self.assertEqual(response.status_code, requests.codes.ok)
995 res = response.json()
996 self.assertIn('Renderer service delete in progress',
997 res['output']['configuration-response-common'][
1001 def test_31_delete_eth_service2(self):
1002 url = ("{}/operations/org-openroadm-service:service-delete"
1003 .format(test_utils.RESTCONF_BASE_URL))
1005 "sdnc-request-header": {
1006 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1007 "rpc-action": "service-delete",
1008 "request-system-id": "appname",
1010 "http://localhost:8585/NotificationServer/notify"
1012 "service-delete-req-info": {
1013 "service-name": "service2",
1014 "tail-retention": "no"
1018 response = requests.request(
1019 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1020 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1021 self.assertEqual(response.status_code, requests.codes.ok)
1022 res = response.json()
1023 self.assertIn('Renderer service delete in progress',
1024 res['output']['configuration-response-common'][
1025 'response-message'])
1028 def test_32_check_no_xc_ROADMA(self):
1030 "{}/config/network-topology:"
1031 "network-topology/topology/topology-netconf/"
1032 "node/ROADMA01/yang-ext:"
1033 "mount/org-openroadm-device:org-openroadm-device/"
1034 .format(test_utils.RESTCONF_BASE_URL))
1035 response = requests.request(
1036 "GET", url, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1037 res = response.json()
1038 self.assertEqual(response.status_code, requests.codes.ok)
1039 self.assertNotIn('roadm-connections',
1040 dict.keys(res['org-openroadm-device']))
1043 def test_33_check_topo_XPDRA(self):
1045 "{}/config/ietf-network:"
1046 "networks/network/openroadm-topology/node/XPDRA01-XPDR1"
1047 .format(test_utils.RESTCONF_BASE_URL))
1048 response = requests.request(
1049 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1050 self.assertEqual(response.status_code, requests.codes.ok)
1051 res = response.json()
1052 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1053 for ele in liste_tp:
1054 if ((ele[u'org-openroadm-common-network:tp-type'] ==
1056 and (ele['tp-id'] == 'XPDR1-CLIENT1' or ele[
1057 'tp-id'] == 'XPDR1-CLIENT3')):
1059 'org-openroadm-network-topology:xpdr-client-attributes',
1061 elif (ele[u'org-openroadm-common-network:tp-type'] ==
1063 self.assertIn(u'tail-equipment-id', dict.keys(
1064 ele[u'org-openroadm-network-topology:'
1065 u'xpdr-network-attributes']))
1066 self.assertNotIn('wavelength', dict.keys(
1067 ele[u'org-openroadm-network-topology:'
1068 u'xpdr-network-attributes']))
1071 def test_34_check_topo_ROADMA_SRG1(self):
1073 "{}/config/ietf-network:"
1074 "networks/network/openroadm-topology/node/ROADMA01-SRG1"
1075 .format(test_utils.RESTCONF_BASE_URL))
1076 response = requests.request(
1077 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1078 self.assertEqual(response.status_code, requests.codes.ok)
1079 res = response.json()
1080 self.assertIn({u'index': 1}, res['node'][0][
1081 u'org-openroadm-network-topology:srg-attributes'][
1082 'available-wavelengths'])
1083 self.assertIn({u'index': 2}, res['node'][0][
1084 u'org-openroadm-network-topology:srg-attributes'][
1085 'available-wavelengths'])
1086 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1087 for ele in liste_tp:
1088 if ele['tp-id'] == 'SRG1-PP1-TXRX' or \
1089 ele['tp-id'] == 'SRG1-PP1-TXRX':
1090 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
1093 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
1097 def test_35_check_topo_ROADMA_DEG1(self):
1099 "{}/config/ietf-network:"
1100 "networks/network/openroadm-topology/node/ROADMA01-DEG1"
1101 .format(test_utils.RESTCONF_BASE_URL))
1102 response = requests.request(
1103 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1104 self.assertEqual(response.status_code, requests.codes.ok)
1105 res = response.json()
1106 self.assertIn({u'index': 1}, res['node'][0][
1107 u'org-openroadm-network-topology:degree-attributes'][
1108 'available-wavelengths'])
1109 self.assertIn({u'index': 2}, res['node'][0][
1110 u'org-openroadm-network-topology:degree-attributes'][
1111 'available-wavelengths'])
1112 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1113 for ele in liste_tp:
1114 if ele['tp-id'] == 'DEG1-CTP-TXRX':
1115 self.assertNotIn('org-openroadm-network-topology:'
1116 'ctp-attributes', dict.keys(ele))
1117 if ele['tp-id'] == 'DEG1-TTP-TXRX':
1118 self.assertNotIn('org-openroadm-network-topology:'
1119 'tx-ttp-attributes', dict.keys(ele))
1122 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
1123 def test_36_create_oc_service1(self):
1124 url = ("{}/operations/org-openroadm-service:service-create"
1125 .format(test_utils.RESTCONF_BASE_URL))
1128 "sdnc-request-header": {
1129 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1130 "rpc-action": "service-create",
1131 "request-system-id": "appname",
1133 "http://localhost:8585/NotificationServer/notify"
1135 "service-name": "service1",
1136 "common-id": "ASATT1234567",
1137 "connection-type": "roadm-line",
1139 "service-rate": "100",
1140 "node-id": "ROADMA01",
1141 "service-format": "OC",
1142 "clli": "SNJSCAMCJP8",
1146 "ROUTER_SNJSCAMCJP8_000000.00_00",
1147 "port-type": "router",
1148 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1149 "port-rack": "000000.00",
1154 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1155 "lgx-port-name": "LGX Back.3",
1156 "lgx-port-rack": "000000.00",
1157 "lgx-port-shelf": "00"
1163 "ROUTER_SNJSCAMCJP8_000000.00_00",
1164 "port-type": "router",
1165 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1166 "port-rack": "000000.00",
1171 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1172 "lgx-port-name": "LGX Back.4",
1173 "lgx-port-rack": "000000.00",
1174 "lgx-port-shelf": "00"
1177 "optic-type": "gray"
1180 "service-rate": "100",
1181 "node-id": "ROADMC01",
1182 "service-format": "OC",
1183 "clli": "SNJSCAMCJT4",
1187 "ROUTER_SNJSCAMCJT4_000000.00_00",
1188 "port-type": "router",
1189 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1190 "port-rack": "000000.00",
1195 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1196 "lgx-port-name": "LGX Back.29",
1197 "lgx-port-rack": "000000.00",
1198 "lgx-port-shelf": "00"
1204 "ROUTER_SNJSCAMCJT4_000000.00_00",
1205 "port-type": "router",
1206 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1207 "port-rack": "000000.00",
1212 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1213 "lgx-port-name": "LGX Back.30",
1214 "lgx-port-rack": "000000.00",
1215 "lgx-port-shelf": "00"
1218 "optic-type": "gray"
1220 "due-date": "2016-11-28T00:00:01Z",
1221 "operator-contact": "pw1234"
1224 response = requests.request(
1225 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1226 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1227 self.assertEqual(response.status_code, requests.codes.ok)
1228 res = response.json()
1229 self.assertIn('PCE calculation in progress',
1230 res['output']['configuration-response-common'][
1231 'response-message'])
1232 time.sleep(self.WAITING)
1234 def test_37_get_oc_service1(self):
1235 url = ("{}/operational/org-openroadm-service:"
1236 "service-list/services/service1"
1237 .format(test_utils.RESTCONF_BASE_URL))
1238 response = requests.request(
1239 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1240 self.assertEqual(response.status_code, requests.codes.ok)
1241 res = response.json()
1243 res['services'][0]['administrative-state'],
1246 res['services'][0]['service-name'], 'service1')
1248 res['services'][0]['connection-type'], 'roadm-line')
1250 res['services'][0]['lifecycle-state'], 'planned')
1253 def test_38_check_xc1_ROADMA(self):
1255 "{}/config/network-topology:"
1256 "network-topology/topology/topology-netconf/"
1257 "node/ROADMA01/yang-ext:"
1258 "mount/org-openroadm-device:org-openroadm-device/"
1259 "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
1260 .format(test_utils.RESTCONF_BASE_URL))
1261 response = requests.request(
1262 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1263 self.assertEqual(response.status_code, requests.codes.ok)
1264 res = response.json()
1265 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1266 self.assertDictEqual(
1268 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
1269 'wavelength-number': 1,
1270 'opticalControlMode': 'gainLoss',
1271 'target-output-power': -3.0
1272 }, **res['roadm-connections'][0]),
1273 res['roadm-connections'][0]
1275 self.assertDictEqual(
1276 {'src-if': 'SRG1-PP1-TXRX-1'},
1277 res['roadm-connections'][0]['source'])
1278 self.assertDictEqual(
1279 {'dst-if': 'DEG1-TTP-TXRX-1'},
1280 res['roadm-connections'][0]['destination'])
1283 def test_39_check_xc1_ROADMC(self):
1285 "{}/config/network-topology:"
1286 "network-topology/topology/topology-netconf/"
1287 "node/ROADMC01/yang-ext:mount/org-openroadm-device:"
1288 "org-openroadm-device/"
1289 "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
1290 .format(test_utils.RESTCONF_BASE_URL))
1291 response = requests.request(
1292 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1293 self.assertEqual(response.status_code, requests.codes.ok)
1294 res = response.json()
1295 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1296 self.assertDictEqual(
1298 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
1299 'wavelength-number': 1,
1300 'opticalControlMode': 'gainLoss',
1301 'target-output-power': 2.0
1302 }, **res['roadm-connections'][0]),
1303 res['roadm-connections'][0]
1305 self.assertDictEqual(
1306 {'src-if': 'SRG1-PP1-TXRX-1'},
1307 res['roadm-connections'][0]['source'])
1308 self.assertDictEqual(
1309 {'dst-if': 'DEG2-TTP-TXRX-1'},
1310 res['roadm-connections'][0]['destination'])
1313 def test_40_create_oc_service2(self):
1314 url = ("{}/operations/org-openroadm-service:service-create"
1315 .format(test_utils.RESTCONF_BASE_URL))
1318 "sdnc-request-header": {
1319 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1320 "rpc-action": "service-create",
1321 "request-system-id": "appname",
1323 "http://localhost:8585/NotificationServer/notify"
1325 "service-name": "service2",
1326 "common-id": "ASATT1234567",
1327 "connection-type": "roadm-line",
1329 "service-rate": "100",
1330 "node-id": "ROADMA01",
1331 "service-format": "OC",
1332 "clli": "SNJSCAMCJP8",
1336 "ROUTER_SNJSCAMCJP8_000000.00_00",
1337 "port-type": "router",
1338 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1339 "port-rack": "000000.00",
1344 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1345 "lgx-port-name": "LGX Back.3",
1346 "lgx-port-rack": "000000.00",
1347 "lgx-port-shelf": "00"
1353 "ROUTER_SNJSCAMCJP8_000000.00_00",
1354 "port-type": "router",
1355 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1356 "port-rack": "000000.00",
1361 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1362 "lgx-port-name": "LGX Back.4",
1363 "lgx-port-rack": "000000.00",
1364 "lgx-port-shelf": "00"
1367 "optic-type": "gray"
1370 "service-rate": "100",
1371 "node-id": "ROADMC01",
1372 "service-format": "OC",
1373 "clli": "SNJSCAMCJT4",
1377 "ROUTER_SNJSCAMCJT4_000000.00_00",
1378 "port-type": "router",
1379 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1380 "port-rack": "000000.00",
1385 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1386 "lgx-port-name": "LGX Back.29",
1387 "lgx-port-rack": "000000.00",
1388 "lgx-port-shelf": "00"
1394 "ROUTER_SNJSCAMCJT4_000000.00_00",
1395 "port-type": "router",
1396 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1397 "port-rack": "000000.00",
1402 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1403 "lgx-port-name": "LGX Back.30",
1404 "lgx-port-rack": "000000.00",
1405 "lgx-port-shelf": "00"
1408 "optic-type": "gray"
1410 "due-date": "2016-11-28T00:00:01Z",
1411 "operator-contact": "pw1234"
1414 response = requests.request(
1415 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1416 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1417 self.assertEqual(response.status_code, requests.codes.ok)
1418 res = response.json()
1419 self.assertIn('PCE calculation in progress',
1420 res['output']['configuration-response-common'][
1421 'response-message'])
1422 time.sleep(self.WAITING)
1424 def test_41_get_oc_service2(self):
1425 url = ("{}/operational/org-openroadm-service:"
1426 "service-list/services/service2"
1427 .format(test_utils.RESTCONF_BASE_URL))
1428 response = requests.request(
1429 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1430 self.assertEqual(response.status_code, requests.codes.ok)
1431 res = response.json()
1433 res['services'][0]['administrative-state'],
1436 res['services'][0]['service-name'], 'service2')
1438 res['services'][0]['connection-type'], 'roadm-line')
1440 res['services'][0]['lifecycle-state'], 'planned')
1443 def test_42_check_xc2_ROADMA(self):
1445 "{}/config/network-topology:"
1446 "network-topology/topology/topology-netconf/"
1447 "node/ROADMA01/yang-ext:mount/org-openroadm-device:"
1448 "org-openroadm-device/"
1449 "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-2"
1450 .format(test_utils.RESTCONF_BASE_URL))
1451 response = requests.request(
1452 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1453 self.assertEqual(response.status_code, requests.codes.ok)
1454 res = response.json()
1455 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1456 self.assertDictEqual(
1458 'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-2',
1459 'wavelength-number': 2,
1460 'opticalControlMode': 'gainLoss',
1461 'target-output-power': -3.0
1462 }, **res['roadm-connections'][0]),
1463 res['roadm-connections'][0]
1465 self.assertDictEqual(
1466 {'src-if': 'SRG1-PP2-TXRX-2'},
1467 res['roadm-connections'][0]['source'])
1468 self.assertDictEqual(
1469 {'dst-if': 'DEG1-TTP-TXRX-2'},
1470 res['roadm-connections'][0]['destination'])
1473 def test_43_check_topo_ROADMA(self):
1474 self.test_26_check_topo_ROADMA_SRG1()
1475 self.test_27_check_topo_ROADMA_DEG1()
1478 def test_44_delete_oc_service1(self):
1479 url = ("{}/operations/org-openroadm-service:service-delete"
1480 .format(test_utils.RESTCONF_BASE_URL))
1482 "sdnc-request-header": {
1483 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1484 "rpc-action": "service-delete",
1485 "request-system-id": "appname",
1487 "http://localhost:8585/NotificationServer/notify"
1489 "service-delete-req-info": {
1490 "service-name": "service1",
1491 "tail-retention": "no"
1495 response = requests.request(
1496 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1497 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1498 self.assertEqual(response.status_code, requests.codes.ok)
1499 res = response.json()
1500 self.assertIn('Renderer service delete in progress',
1501 res['output']['configuration-response-common'][
1502 'response-message'])
1505 def test_45_delete_oc_service2(self):
1506 url = ("{}/operations/org-openroadm-service:service-delete"
1507 .format(test_utils.RESTCONF_BASE_URL))
1509 "sdnc-request-header": {
1510 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1511 "rpc-action": "service-delete",
1512 "request-system-id": "appname",
1514 "http://localhost:8585/NotificationServer/notify"
1516 "service-delete-req-info": {
1517 "service-name": "service2",
1518 "tail-retention": "no"
1522 response = requests.request(
1523 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1524 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1525 self.assertEqual(response.status_code, requests.codes.ok)
1526 res = response.json()
1527 self.assertIn('Renderer service delete in progress',
1528 res['output']['configuration-response-common'][
1529 'response-message'])
1532 def test_46_get_no_oc_services(self):
1534 url = ("{}/operational/org-openroadm-service:service-list"
1535 .format(test_utils.RESTCONF_BASE_URL))
1536 response = requests.request(
1537 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1538 self.assertEqual(response.status_code, requests.codes.not_found)
1539 res = response.json()
1542 "error-type": "application",
1543 "error-tag": "data-missing",
1545 "Request could not be completed because the relevant data "
1546 "model content does not exist"
1548 res['errors']['error'])
1551 def test_47_get_no_xc_ROADMA(self):
1553 "{}/config/network-topology:"
1554 "network-topology/topology/topology-netconf"
1555 "/node/ROADMA01/yang-ext:mount/org-openroadm-device:"
1556 "org-openroadm-device/"
1557 .format(test_utils.RESTCONF_BASE_URL))
1558 response = requests.request(
1559 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1560 self.assertEqual(response.status_code, requests.codes.ok)
1561 res = response.json()
1562 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
1565 def test_48_check_topo_ROADMA(self):
1566 self.test_34_check_topo_ROADMA_SRG1()
1567 self.test_35_check_topo_ROADMA_DEG1()
1569 def test_49_loop_create_eth_service(self):
1570 for i in range(1, 6):
1571 print("trial number {}".format(i))
1572 print("eth service creation")
1573 self.test_11_create_eth_service1()
1574 print("check xc in ROADMA01")
1575 self.test_13_check_xc1_ROADMA()
1576 print("check xc in ROADMC01")
1577 self.test_14_check_xc1_ROADMC()
1578 print("eth service deletion\n")
1579 self.test_30_delete_eth_service1()
1581 def test_50_loop_create_oc_service(self):
1582 url = ("{}/operational/org-openroadm-service:"
1583 "service-list/services/service1"
1584 .format(test_utils.RESTCONF_BASE_URL))
1585 response = requests.request("GET", url, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1586 if response.status_code != 404:
1587 url = ("{}/operations/org-openroadm-service:service-delete"
1588 .format(test_utils.RESTCONF_BASE_URL))
1590 "sdnc-request-header": {
1591 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1592 "rpc-action": "service-delete",
1593 "request-system-id": "appname",
1595 "http://localhost:8585/NotificationServer/notify"
1597 "service-delete-req-info": {
1598 "service-name": "service1",
1599 "tail-retention": "no"
1603 requests.request("POST", url, data=json.dumps(data),
1604 headers=test_utils.TYPE_APPLICATION_JSON,
1605 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD)
1609 for i in range(1, 6):
1610 print("trial number {}".format(i))
1611 print("oc service creation")
1612 self.test_36_create_oc_service1()
1613 print("check xc in ROADMA01")
1614 self.test_38_check_xc1_ROADMA()
1615 print("check xc in ROADMC01")
1616 self.test_39_check_xc1_ROADMC()
1617 print("oc service deletion\n")
1618 self.test_44_delete_oc_service1()
1620 def test_51_disconnect_XPDRA(self):
1621 response = test_utils.unmount_device("XPDRA01")
1622 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
1624 def test_52_disconnect_XPDRC(self):
1625 response = test_utils.unmount_device("XPDRC01")
1626 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
1628 def test_53_disconnect_ROADMA(self):
1629 response = test_utils.unmount_device("ROADMA01")
1630 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
1632 def test_54_disconnect_ROADMC(self):
1633 response = test_utils.unmount_device("ROADMC01")
1634 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
1637 if __name__ == "__main__":
1638 unittest.main(verbosity=2)