2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
19 from common import test_utils
22 class TransportPCEFulltesting(unittest.TestCase):
24 headers = {'content-type': 'application/json'}
25 WAITING = 20 # nominal value is 300
28 restconf_baseurl = "http://localhost:8181/restconf"
32 cls.processes = test_utils.start_tpce()
33 cls.processes = test_utils.start_sims(['xpdra', 'roadma-full', 'roadmc-full', 'xpdrc'])
def tearDownClass(cls):
    """Shut down the processes recorded on the class once the suite is done.

    Each entry of ``cls.processes`` is handed to
    ``test_utils.shutdown_process`` in order, then a completion message is
    printed.
    """
    started = cls.processes
    for proc in started:
        test_utils.shutdown_process(proc)
    print("all processes killed")
def setUp(self):
    """Print the name of the test method that is about to run.

    Executed before each test method. Only the text after the last dot of
    ``self.id()`` is kept for the message.
    """
    test_name = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(test_name))
44 # connect netconf devices
45 def test_01_connect_xpdrA(self):
46 url = ("{}/config/network-topology:"
47 "network-topology/topology/topology-netconf/node/XPDRA01"
48 .format(self.restconf_baseurl))
51 "netconf-node-topology:username": "admin",
52 "netconf-node-topology:password": "admin",
53 "netconf-node-topology:host": "127.0.0.1",
54 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
55 "netconf-node-topology:tcp-only": "false",
56 "netconf-node-topology:pass-through": {}}]}
57 headers = {'content-type': 'application/json'}
58 response = requests.request(
59 "PUT", url, data=json.dumps(data), headers=headers,
60 auth=('admin', 'admin'))
61 self.assertEqual(response.status_code, requests.codes.created)
64 def test_02_connect_xpdrC(self):
65 url = ("{}/config/network-topology:"
66 "network-topology/topology/topology-netconf/node/XPDRC01"
67 .format(self.restconf_baseurl))
70 "netconf-node-topology:username": "admin",
71 "netconf-node-topology:password": "admin",
72 "netconf-node-topology:host": "127.0.0.1",
73 "netconf-node-topology:port": test_utils.sims['xpdrc']['port'],
74 "netconf-node-topology:tcp-only": "false",
75 "netconf-node-topology:pass-through": {}}]}
76 headers = {'content-type': 'application/json'}
77 response = requests.request(
78 "PUT", url, data=json.dumps(data), headers=headers,
79 auth=('admin', 'admin'))
80 self.assertEqual(response.status_code, requests.codes.created)
83 def test_03_connect_rdmA(self):
84 url = ("{}/config/network-topology:"
85 "network-topology/topology/topology-netconf/node/ROADMA01"
86 .format(self.restconf_baseurl))
88 "node-id": "ROADMA01",
89 "netconf-node-topology:username": "admin",
90 "netconf-node-topology:password": "admin",
91 "netconf-node-topology:host": "127.0.0.1",
92 "netconf-node-topology:port": test_utils.sims['roadma-full']['port'],
93 "netconf-node-topology:tcp-only": "false",
94 "netconf-node-topology:pass-through": {}}]}
95 headers = {'content-type': 'application/json'}
96 response = requests.request(
97 "PUT", url, data=json.dumps(data), headers=headers,
98 auth=('admin', 'admin'))
99 self.assertEqual(response.status_code, requests.codes.created)
102 def test_04_connect_rdmC(self):
103 url = ("{}/config/network-topology:"
104 "network-topology/topology/topology-netconf/node/ROADMC01"
105 .format(self.restconf_baseurl))
107 "node-id": "ROADMC01",
108 "netconf-node-topology:username": "admin",
109 "netconf-node-topology:password": "admin",
110 "netconf-node-topology:host": "127.0.0.1",
111 "netconf-node-topology:port": test_utils.sims['roadmc-full']['port'],
112 "netconf-node-topology:tcp-only": "false",
113 "netconf-node-topology:pass-through": {}}]}
114 headers = {'content-type': 'application/json'}
115 response = requests.request(
116 "PUT", url, data=json.dumps(data), headers=headers,
117 auth=('admin', 'admin'))
118 self.assertEqual(response.status_code, requests.codes.created)
121 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
122 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
123 format(self.restconf_baseurl)
125 "networkutils:input": {
126 "networkutils:links-input": {
127 "networkutils:xpdr-node": "XPDRA01",
128 "networkutils:xpdr-num": "1",
129 "networkutils:network-num": "1",
130 "networkutils:rdm-node": "ROADMA01",
131 "networkutils:srg-num": "1",
132 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
136 headers = {'content-type': 'application/json'}
137 response = requests.request(
138 "POST", url, data=json.dumps(data),
139 headers=headers, auth=('admin', 'admin'))
140 self.assertEqual(response.status_code, requests.codes.ok)
141 res = response.json()
142 self.assertIn('Xponder Roadm Link created successfully',
143 res["output"]["result"])
146 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
147 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
148 format(self.restconf_baseurl)
150 "networkutils:input": {
151 "networkutils:links-input": {
152 "networkutils:xpdr-node": "XPDRA01",
153 "networkutils:xpdr-num": "1",
154 "networkutils:network-num": "1",
155 "networkutils:rdm-node": "ROADMA01",
156 "networkutils:srg-num": "1",
157 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
161 headers = {'content-type': 'application/json'}
162 response = requests.request(
163 "POST", url, data=json.dumps(data),
164 headers=headers, auth=('admin', 'admin'))
165 self.assertEqual(response.status_code, requests.codes.ok)
166 res = response.json()
167 self.assertIn('Roadm Xponder links created successfully',
168 res["output"]["result"])
171 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
172 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
173 format(self.restconf_baseurl)
175 "networkutils:input": {
176 "networkutils:links-input": {
177 "networkutils:xpdr-node": "XPDRC01",
178 "networkutils:xpdr-num": "1",
179 "networkutils:network-num": "1",
180 "networkutils:rdm-node": "ROADMC01",
181 "networkutils:srg-num": "1",
182 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
186 headers = {'content-type': 'application/json'}
187 response = requests.request(
188 "POST", url, data=json.dumps(data),
189 headers=headers, auth=('admin', 'admin'))
190 self.assertEqual(response.status_code, requests.codes.ok)
191 res = response.json()
192 self.assertIn('Xponder Roadm Link created successfully',
193 res["output"]["result"])
196 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
197 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
198 format(self.restconf_baseurl)
200 "networkutils:input": {
201 "networkutils:links-input": {
202 "networkutils:xpdr-node": "XPDRC01",
203 "networkutils:xpdr-num": "1",
204 "networkutils:network-num": "1",
205 "networkutils:rdm-node": "ROADMC01",
206 "networkutils:srg-num": "1",
207 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
211 headers = {'content-type': 'application/json'}
212 response = requests.request(
213 "POST", url, data=json.dumps(data),
214 headers=headers, auth=('admin', 'admin'))
215 self.assertEqual(response.status_code, requests.codes.ok)
216 res = response.json()
217 self.assertIn('Roadm Xponder links created successfully',
218 res["output"]["result"])
221 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
222 # Config ROADMA-ROADMC oms-attributes
224 "{}/config/ietf-network:"
225 "networks/network/openroadm-topology/ietf-network-topology:"
226 "link/ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX/"
227 "org-openroadm-network-topology:"
228 "OMS-attributes/span"
229 .format(self.restconf_baseurl))
232 "auto-spanloss": "true",
233 "spanloss-base": 11.4,
234 "spanloss-current": 12,
235 "engineered-spanloss": 12.2,
236 "link-concatenation": [{
239 "SRLG-length": 100000,
241 headers = {'content-type': 'application/json'}
242 response = requests.request(
243 "PUT", url, data=json.dumps(data), headers=headers,
244 auth=('admin', 'admin'))
245 self.assertEqual(response.status_code, requests.codes.created)
247 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
248 # Config ROADMC-ROADMA oms-attributes
250 "{}/config/ietf-network:"
251 "networks/network/openroadm-topology/ietf-network-topology:"
252 "link/ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX/"
253 "org-openroadm-network-topology:"
254 "OMS-attributes/span"
255 .format(self.restconf_baseurl))
258 "auto-spanloss": "true",
259 "spanloss-base": 11.4,
260 "spanloss-current": 12,
261 "engineered-spanloss": 12.2,
262 "link-concatenation": [{
265 "SRLG-length": 100000,
267 headers = {'content-type': 'application/json'}
268 response = requests.request(
269 "PUT", url, data=json.dumps(data), headers=headers,
270 auth=('admin', 'admin'))
271 self.assertEqual(response.status_code, requests.codes.created)
273 # test service-create for Eth service from xpdr to xpdr
274 def test_11_create_eth_service1(self):
275 url = ("{}/operations/org-openroadm-service:service-create"
276 .format(self.restconf_baseurl))
279 "sdnc-request-header": {
280 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
281 "rpc-action": "service-create",
282 "request-system-id": "appname",
284 "http://localhost:8585/NotificationServer/notify"
286 "service-name": "service1",
287 "common-id": "ASATT1234567",
288 "connection-type": "service",
290 "service-rate": "100",
291 "node-id": "XPDRA01",
292 "service-format": "Ethernet",
293 "clli": "SNJSCAMCJP8",
297 "ROUTER_SNJSCAMCJP8_000000.00_00",
298 "port-type": "router",
299 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
300 "port-rack": "000000.00",
305 "LGX Panel_SNJSCAMCJP8_000000.00_00",
306 "lgx-port-name": "LGX Back.3",
307 "lgx-port-rack": "000000.00",
308 "lgx-port-shelf": "00"
314 "ROUTER_SNJSCAMCJP8_000000.00_00",
315 "port-type": "router",
316 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
317 "port-rack": "000000.00",
322 "LGX Panel_SNJSCAMCJP8_000000.00_00",
323 "lgx-port-name": "LGX Back.4",
324 "lgx-port-rack": "000000.00",
325 "lgx-port-shelf": "00"
331 "service-rate": "100",
332 "node-id": "XPDRC01",
333 "service-format": "Ethernet",
334 "clli": "SNJSCAMCJT4",
338 "ROUTER_SNJSCAMCJT4_000000.00_00",
339 "port-type": "router",
340 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
341 "port-rack": "000000.00",
346 "LGX Panel_SNJSCAMCJT4_000000.00_00",
347 "lgx-port-name": "LGX Back.29",
348 "lgx-port-rack": "000000.00",
349 "lgx-port-shelf": "00"
355 "ROUTER_SNJSCAMCJT4_000000.00_00",
356 "port-type": "router",
357 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
358 "port-rack": "000000.00",
363 "LGX Panel_SNJSCAMCJT4_000000.00_00",
364 "lgx-port-name": "LGX Back.30",
365 "lgx-port-rack": "000000.00",
366 "lgx-port-shelf": "00"
371 "due-date": "2016-11-28T00:00:01Z",
372 "operator-contact": "pw1234"
375 headers = {'content-type': 'application/json',
376 "Accept": "application/json"}
377 response = requests.request(
378 "POST", url, data=json.dumps(data), headers=headers,
379 auth=('admin', 'admin'))
380 self.assertEqual(response.status_code, requests.codes.ok)
381 res = response.json()
382 self.assertIn('PCE calculation in progress',
383 res['output']['configuration-response-common'][
385 time.sleep(self.WAITING)
387 def test_12_get_eth_service1(self):
388 url = ("{}/operational/org-openroadm-service:service-list/services/"
389 "service1".format(self.restconf_baseurl))
390 headers = {'content-type': 'application/json',
391 "Accept": "application/json"}
392 response = requests.request(
393 "GET", url, headers=headers, auth=('admin', 'admin'))
394 self.assertEqual(response.status_code, requests.codes.ok)
395 res = response.json()
397 res['services'][0]['administrative-state'],
400 res['services'][0]['service-name'], 'service1')
402 res['services'][0]['connection-type'], 'service')
404 res['services'][0]['lifecycle-state'], 'planned')
407 def test_13_check_xc1_ROADMA(self):
409 "{}/config/network-topology:"
410 "network-topology/topology/topology-netconf/"
411 "node/ROADMA01/yang-ext:"
412 "mount/org-openroadm-device:org-openroadm-device/"
413 "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
414 .format(self.restconf_baseurl))
415 headers = {'content-type': 'application/json'}
416 response = requests.request(
417 "GET", url, headers=headers, auth=('admin', 'admin'))
418 self.assertEqual(response.status_code, requests.codes.ok)
419 res = response.json()
420 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
421 self.assertDictEqual(
423 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
424 'wavelength-number': 1,
425 'opticalControlMode': 'gainLoss',
426 'target-output-power': -3.0
427 }, **res['roadm-connections'][0]),
428 res['roadm-connections'][0]
430 self.assertDictEqual(
431 {'src-if': 'SRG1-PP1-TXRX-1'},
432 res['roadm-connections'][0]['source'])
433 self.assertDictEqual(
434 {'dst-if': 'DEG1-TTP-TXRX-1'},
435 res['roadm-connections'][0]['destination'])
438 def test_14_check_xc1_ROADMC(self):
440 "{}/config/network-topology:"
441 "network-topology/topology/topology-netconf/"
442 "node/ROADMC01/yang-ext:mount/org-openroadm-device:"
443 "org-openroadm-device/"
444 "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
445 .format(self.restconf_baseurl))
446 headers = {'content-type': 'application/json'}
447 response = requests.request(
448 "GET", url, headers=headers, auth=('admin', 'admin'))
449 self.assertEqual(response.status_code, requests.codes.ok)
450 res = response.json()
451 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
452 self.assertDictEqual(
454 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
455 'wavelength-number': 1,
456 'opticalControlMode': 'gainLoss',
457 'target-output-power': 2.0
458 }, **res['roadm-connections'][0]),
459 res['roadm-connections'][0]
461 self.assertDictEqual(
462 {'src-if': 'SRG1-PP1-TXRX-1'},
463 res['roadm-connections'][0]['source'])
464 self.assertDictEqual(
465 {'dst-if': 'DEG2-TTP-TXRX-1'},
466 res['roadm-connections'][0]['destination'])
469 def test_15_check_topo_XPDRA(self):
471 "{}/config/ietf-network:"
472 "networks/network/openroadm-topology/node/XPDRA01-XPDR1"
473 .format(self.restconf_baseurl))
474 response = requests.request(
475 "GET", url1, auth=('admin', 'admin'))
476 self.assertEqual(response.status_code, requests.codes.ok)
477 res = response.json()
478 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
480 if ele['tp-id'] == 'XPDR1-NETWORK1':
481 self.assertEqual({u'frequency': 196.1, u'width': 40},
483 'org-openroadm-network-topology:'
484 'xpdr-network-attributes'][
486 if ele['tp-id'] == 'XPDR1-CLIENT1' or \
487 ele['tp-id'] == 'XPDR1-CLIENT3':
489 'org-openroadm-network-topology:xpdr-client-attributes',
491 if ele['tp-id'] == 'XPDR1-NETWORK2':
493 'org-openroadm-network-topology:xpdr-network-attributes',
497 def test_16_check_topo_ROADMA_SRG1(self):
499 "{}/config/ietf-network:"
500 "networks/network/openroadm-topology/node/ROADMA01-SRG1"
501 .format(self.restconf_baseurl))
502 response = requests.request(
503 "GET", url1, auth=('admin', 'admin'))
504 self.assertEqual(response.status_code, requests.codes.ok)
505 res = response.json()
506 self.assertNotIn({u'index': 1},
508 u'org-openroadm-network-topology:srg-attributes'][
509 'available-wavelengths'])
510 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
512 if ele['tp-id'] == 'SRG1-PP1-TXRX':
513 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
514 ele['org-openroadm-network-topology:'
515 'pp-attributes']['used-wavelength']
517 if ele['tp-id'] == 'SRG1-PP2-TXRX':
518 self.assertNotIn('used-wavelength', dict.keys(ele))
521 def test_17_check_topo_ROADMA_DEG1(self):
523 "{}/config/ietf-network:"
524 "networks/network/openroadm-topology/node/ROADMA01-DEG1"
525 .format(self.restconf_baseurl))
526 response = requests.request(
527 "GET", url1, auth=('admin', 'admin'))
528 self.assertEqual(response.status_code, requests.codes.ok)
529 res = response.json()
530 self.assertNotIn({u'index': 1},
532 u'org-openroadm-network-topology:'
533 u'degree-attributes'][
534 'available-wavelengths'])
535 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
537 if ele['tp-id'] == 'DEG1-CTP-TXRX':
538 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
539 ele['org-openroadm-network-topology:'
542 if ele['tp-id'] == 'DEG1-TTP-TXRX':
543 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
544 ele['org-openroadm-network-topology:'
545 'tx-ttp-attributes'][
549 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
550 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
551 format(self.restconf_baseurl)
553 "networkutils:input": {
554 "networkutils:links-input": {
555 "networkutils:xpdr-node": "XPDRA01",
556 "networkutils:xpdr-num": "1",
557 "networkutils:network-num": "2",
558 "networkutils:rdm-node": "ROADMA01",
559 "networkutils:srg-num": "1",
560 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
564 headers = {'content-type': 'application/json'}
565 response = requests.request(
566 "POST", url, data=json.dumps(data),
567 headers=headers, auth=('admin', 'admin'))
568 self.assertEqual(response.status_code, requests.codes.ok)
569 res = response.json()
570 self.assertIn('Xponder Roadm Link created successfully',
571 res["output"]["result"])
574 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
575 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
576 format(self.restconf_baseurl)
578 "networkutils:input": {
579 "networkutils:links-input": {
580 "networkutils:xpdr-node": "XPDRA01",
581 "networkutils:xpdr-num": "1",
582 "networkutils:network-num": "2",
583 "networkutils:rdm-node": "ROADMA01",
584 "networkutils:srg-num": "1",
585 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
589 headers = {'content-type': 'application/json'}
590 response = requests.request(
591 "POST", url, data=json.dumps(data),
592 headers=headers, auth=('admin', 'admin'))
593 self.assertEqual(response.status_code, requests.codes.ok)
594 res = response.json()
595 self.assertIn('Roadm Xponder links created successfully',
596 res["output"]["result"])
599 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
600 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
601 format(self.restconf_baseurl)
603 "networkutils:input": {
604 "networkutils:links-input": {
605 "networkutils:xpdr-node": "XPDRC01",
606 "networkutils:xpdr-num": "1",
607 "networkutils:network-num": "2",
608 "networkutils:rdm-node": "ROADMC01",
609 "networkutils:srg-num": "1",
610 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
614 headers = {'content-type': 'application/json'}
615 response = requests.request(
616 "POST", url, data=json.dumps(data),
617 headers=headers, auth=('admin', 'admin'))
618 self.assertEqual(response.status_code, requests.codes.ok)
619 res = response.json()
620 self.assertIn('Xponder Roadm Link created successfully',
621 res["output"]["result"])
624 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
625 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
626 format(self.restconf_baseurl)
628 "networkutils:input": {
629 "networkutils:links-input": {
630 "networkutils:xpdr-node": "XPDRC01",
631 "networkutils:xpdr-num": "1",
632 "networkutils:network-num": "2",
633 "networkutils:rdm-node": "ROADMC01",
634 "networkutils:srg-num": "1",
635 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
639 headers = {'content-type': 'application/json'}
640 response = requests.request(
641 "POST", url, data=json.dumps(data),
642 headers=headers, auth=('admin', 'admin'))
643 self.assertEqual(response.status_code, requests.codes.ok)
644 res = response.json()
645 self.assertIn('Roadm Xponder links created successfully',
646 res["output"]["result"])
649 def test_22_create_eth_service2(self):
650 url = ("{}/operations/org-openroadm-service:service-create"
651 .format(self.restconf_baseurl))
654 "sdnc-request-header": {
655 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
656 "rpc-action": "service-create",
657 "request-system-id": "appname",
659 "http://localhost:8585/NotificationServer/notify"
661 "service-name": "service2",
662 "common-id": "ASATT1234567",
663 "connection-type": "service",
665 "service-rate": "100",
666 "node-id": "XPDRA01",
667 "service-format": "Ethernet",
668 "clli": "SNJSCAMCJP8",
672 "ROUTER_SNJSCAMCJP8_000000.00_00",
673 "port-type": "router",
674 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
675 "port-rack": "000000.00",
680 "LGX Panel_SNJSCAMCJP8_000000.00_00",
681 "lgx-port-name": "LGX Back.3",
682 "lgx-port-rack": "000000.00",
683 "lgx-port-shelf": "00"
689 "ROUTER_SNJSCAMCJP8_000000.00_00",
690 "port-type": "router",
691 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
692 "port-rack": "000000.00",
697 "LGX Panel_SNJSCAMCJP8_000000.00_00",
698 "lgx-port-name": "LGX Back.4",
699 "lgx-port-rack": "000000.00",
700 "lgx-port-shelf": "00"
706 "service-rate": "100",
707 "node-id": "XPDRC01",
708 "service-format": "Ethernet",
709 "clli": "SNJSCAMCJT4",
713 "ROUTER_SNJSCAMCJT4_000000.00_00",
714 "port-type": "router",
715 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
716 "port-rack": "000000.00",
721 "LGX Panel_SNJSCAMCJT4_000000.00_00",
722 "lgx-port-name": "LGX Back.29",
723 "lgx-port-rack": "000000.00",
724 "lgx-port-shelf": "00"
730 "ROUTER_SNJSCAMCJT4_000000.00_00",
731 "port-type": "router",
732 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
733 "port-rack": "000000.00",
738 "LGX Panel_SNJSCAMCJT4_000000.00_00",
739 "lgx-port-name": "LGX Back.30",
740 "lgx-port-rack": "000000.00",
741 "lgx-port-shelf": "00"
746 "due-date": "2016-11-28T00:00:01Z",
747 "operator-contact": "pw1234"
750 headers = {'content-type': 'application/json',
751 "Accept": "application/json"}
752 response = requests.request(
753 "POST", url, data=json.dumps(data), headers=headers,
754 auth=('admin', 'admin'))
755 self.assertEqual(response.status_code, requests.codes.ok)
756 res = response.json()
757 self.assertIn('PCE calculation in progress',
758 res['output']['configuration-response-common'][
760 time.sleep(self.WAITING)
762 def test_23_get_eth_service2(self):
763 url = ("{}/operational/org-openroadm-service:"
764 "service-list/services/service2"
765 .format(self.restconf_baseurl))
766 headers = {'content-type': 'application/json',
767 "Accept": "application/json"}
768 response = requests.request(
769 "GET", url, headers=headers, auth=('admin', 'admin'))
770 self.assertEqual(response.status_code, requests.codes.ok)
771 res = response.json()
773 res['services'][0]['administrative-state'],
776 res['services'][0]['service-name'], 'service2')
778 res['services'][0]['connection-type'], 'service')
780 res['services'][0]['lifecycle-state'], 'planned')
783 def test_24_check_xc2_ROADMA(self):
785 "{}/config/network-topology:"
786 "network-topology/topology/topology-netconf/"
787 "node/ROADMA01/yang-ext:"
788 "mount/org-openroadm-device:org-openroadm-device/"
789 "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-2"
790 .format(self.restconf_baseurl))
791 headers = {'content-type': 'application/json'}
792 response = requests.request(
793 "GET", url, headers=headers, auth=('admin', 'admin'))
794 self.assertEqual(response.status_code, requests.codes.ok)
795 res = response.json()
796 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
797 self.assertDictEqual(
799 'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-2',
800 'wavelength-number': 2,
801 'opticalControlMode': 'power'
802 }, **res['roadm-connections'][0]),
803 res['roadm-connections'][0]
805 self.assertDictEqual(
806 {'src-if': 'DEG1-TTP-TXRX-2'},
807 res['roadm-connections'][0]['source'])
808 self.assertDictEqual(
809 {'dst-if': 'SRG1-PP2-TXRX-2'},
810 res['roadm-connections'][0]['destination'])
812 def test_25_check_topo_XPDRA(self):
814 "{}/config/ietf-network:"
815 "networks/network/openroadm-topology/node/XPDRA01-XPDR1"
816 .format(self.restconf_baseurl))
817 response = requests.request(
818 "GET", url1, auth=('admin', 'admin'))
819 self.assertEqual(response.status_code, requests.codes.ok)
820 res = response.json()
821 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
823 if ele['tp-id'] == 'XPDR1-NETWORK1':
824 self.assertEqual({u'frequency': 196.1, u'width': 40},
825 ele['org-openroadm-network-topology:'
826 'xpdr-network-attributes'][
828 if ele['tp-id'] == 'XPDR1-NETWORK2':
829 self.assertEqual({u'frequency': 196.05, u'width': 40},
830 ele['org-openroadm-network-topology:'
831 'xpdr-network-attributes'][
833 if ele['tp-id'] == 'XPDR1-CLIENT1' or \
834 ele['tp-id'] == 'XPDR1-CLIENT3':
836 'org-openroadm-network-topology:xpdr-client-attributes',
840 def test_26_check_topo_ROADMA_SRG1(self):
842 "{}/config/ietf-network:"
843 "networks/network/openroadm-topology/node/ROADMA01-SRG1"
844 .format(self.restconf_baseurl))
845 response = requests.request(
846 "GET", url1, auth=('admin', 'admin'))
847 self.assertEqual(response.status_code, requests.codes.ok)
848 res = response.json()
849 self.assertNotIn({u'index': 1}, res['node'][0][
850 u'org-openroadm-network-topology:srg-attributes'][
851 'available-wavelengths'])
852 self.assertNotIn({u'index': 2}, res['node'][0][
853 u'org-openroadm-network-topology:srg-attributes'][
854 'available-wavelengths'])
855 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
857 if ele['tp-id'] == 'SRG1-PP1-TXRX':
858 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
859 ele['org-openroadm-network-topology:'
860 'pp-attributes']['used-wavelength'])
861 self.assertNotIn({u'index': 2, u'frequency': 196.05,
863 ele['org-openroadm-network-topology:'
864 'pp-attributes']['used-wavelength'])
865 if ele['tp-id'] == 'SRG1-PP2-TXRX':
866 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
867 ele['org-openroadm-network-topology:'
868 'pp-attributes']['used-wavelength'])
869 self.assertNotIn({u'index': 1, u'frequency': 196.1,
871 ele['org-openroadm-network-topology:'
872 'pp-attributes']['used-wavelength'])
873 if ele['tp-id'] == 'SRG1-PP3-TXRX':
874 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
878 def test_27_check_topo_ROADMA_DEG1(self):
880 "{}/config/ietf-network:"
881 "networks/network/openroadm-topology/node/ROADMA01-DEG1"
882 .format(self.restconf_baseurl))
883 response = requests.request(
884 "GET", url1, auth=('admin', 'admin'))
885 self.assertEqual(response.status_code, requests.codes.ok)
886 res = response.json()
887 self.assertNotIn({u'index': 1}, res['node'][0][
888 u'org-openroadm-network-topology:degree-attributes'][
889 'available-wavelengths'])
890 self.assertNotIn({u'index': 2}, res['node'][0][
891 u'org-openroadm-network-topology:degree-attributes'][
892 'available-wavelengths'])
893 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
895 if ele['tp-id'] == 'DEG1-CTP-TXRX':
896 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
897 ele['org-openroadm-network-topology:'
898 'ctp-attributes']['used-wavelengths'])
899 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
900 ele['org-openroadm-network-topology:'
901 'ctp-attributes']['used-wavelengths'])
902 if ele['tp-id'] == 'DEG1-TTP-TXRX':
903 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
904 ele['org-openroadm-network-topology:'
905 'tx-ttp-attributes']['used-wavelengths'])
906 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
907 ele['org-openroadm-network-topology:'
908 'tx-ttp-attributes']['used-wavelengths'])
# service creation test on a non-available resource
912 def test_28_create_eth_service3(self):
913 url = ("{}/operations/org-openroadm-service:service-create"
914 .format(self.restconf_baseurl))
917 "sdnc-request-header": {
918 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
919 "rpc-action": "service-create",
920 "request-system-id": "appname",
922 "http://localhost:8585/NotificationServer/notify"
924 "service-name": "service3",
925 "common-id": "ASATT1234567",
926 "connection-type": "service",
928 "service-rate": "100",
929 "node-id": "XPDRA01",
930 "service-format": "Ethernet",
931 "clli": "SNJSCAMCJP8",
935 "ROUTER_SNJSCAMCJP8_000000.00_00",
936 "port-type": "router",
937 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
938 "port-rack": "000000.00",
943 "LGX Panel_SNJSCAMCJP8_000000.00_00",
944 "lgx-port-name": "LGX Back.3",
945 "lgx-port-rack": "000000.00",
946 "lgx-port-shelf": "00"
952 "ROUTER_SNJSCAMCJP8_000000.00_00",
953 "port-type": "router",
954 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
955 "port-rack": "000000.00",
960 "LGX Panel_SNJSCAMCJP8_000000.00_00",
961 "lgx-port-name": "LGX Back.4",
962 "lgx-port-rack": "000000.00",
963 "lgx-port-shelf": "00"
969 "service-rate": "100",
970 "node-id": "XPDRC01",
971 "service-format": "Ethernet",
972 "clli": "SNJSCAMCJT4",
976 "ROUTER_SNJSCAMCJT4_000000.00_00",
977 "port-type": "router",
978 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
979 "port-rack": "000000.00",
984 "LGX Panel_SNJSCAMCJT4_000000.00_00",
985 "lgx-port-name": "LGX Back.29",
986 "lgx-port-rack": "000000.00",
987 "lgx-port-shelf": "00"
993 "ROUTER_SNJSCAMCJT4_000000.00_00",
994 "port-type": "router",
995 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
996 "port-rack": "000000.00",
1001 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1002 "lgx-port-name": "LGX Back.30",
1003 "lgx-port-rack": "000000.00",
1004 "lgx-port-shelf": "00"
1007 "optic-type": "gray"
1009 "due-date": "2016-11-28T00:00:01Z",
1010 "operator-contact": "pw1234"
1013 headers = {'content-type': 'application/json',
1014 "Accept": "application/json"}
1015 response = requests.request(
1016 "POST", url, data=json.dumps(data), headers=headers,
1017 auth=('admin', 'admin'))
1018 self.assertEqual(response.status_code, requests.codes.ok)
1019 res = response.json()
1020 self.assertIn('PCE calculation in progress',
1021 res['output']['configuration-response-common'][
1022 'response-message'])
1023 self.assertIn('200', res['output']['configuration-response-common'][
1025 time.sleep(self.WAITING)
# TODO: add a test that checks the openroadm-service-list still
# contains only 2 elements
1030 def test_29_delete_eth_service3(self):
1031 url = ("{}/operations/org-openroadm-service:service-delete"
1032 .format(self.restconf_baseurl))
1034 "sdnc-request-header": {
1035 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1036 "rpc-action": "service-delete",
1037 "request-system-id": "appname",
1039 "http://localhost:8585/NotificationServer/notify"
1041 "service-delete-req-info": {
1042 "service-name": "service3",
1043 "tail-retention": "no"
1047 headers = {'content-type': 'application/json'}
1048 response = requests.request(
1049 "POST", url, data=json.dumps(data), headers=headers,
1050 auth=('admin', 'admin'))
1051 self.assertEqual(response.status_code, requests.codes.ok)
1052 res = response.json()
1053 self.assertIn('Service \'service3\' does not exist in datastore',
1054 res['output']['configuration-response-common'][
1055 'response-message'])
1056 self.assertIn('500', res['output']['configuration-response-common'][
1060 def test_30_delete_eth_service1(self):
1061 url = ("{}/operations/org-openroadm-service:service-delete"
1062 .format(self.restconf_baseurl))
1064 "sdnc-request-header": {
1065 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1066 "rpc-action": "service-delete",
1067 "request-system-id": "appname",
1069 "http://localhost:8585/NotificationServer/notify"
1071 "service-delete-req-info": {
1072 "service-name": "service1",
1073 "tail-retention": "no"
1077 headers = {'content-type': 'application/json'}
1078 response = requests.request(
1079 "POST", url, data=json.dumps(data), headers=headers,
1080 auth=('admin', 'admin'))
1081 self.assertEqual(response.status_code, requests.codes.ok)
1082 res = response.json()
1083 self.assertIn('Renderer service delete in progress',
1084 res['output']['configuration-response-common'][
1085 'response-message'])
# Ask the org-openroadm-service:service-delete RPC to remove "service2"
# (tail-retention "no").
# Expects HTTP 200 and a response-message containing
# 'Renderer service delete in progress'.
1088 def test_31_delete_eth_service2(self):
1089 url = ("{}/operations/org-openroadm-service:service-delete"
1090 .format(self.restconf_baseurl))
1092 "sdnc-request-header": {
1093 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1094 "rpc-action": "service-delete",
1095 "request-system-id": "appname",
1097 "http://localhost:8585/NotificationServer/notify"
1099 "service-delete-req-info": {
1100 "service-name": "service2",
1101 "tail-retention": "no"
1105 headers = {'content-type': 'application/json'}
1106 response = requests.request(
1107 "POST", url, data=json.dumps(data), headers=headers,
1108 auth=('admin', 'admin'))
1109 self.assertEqual(response.status_code, requests.codes.ok)
1110 res = response.json()
1111 self.assertIn('Renderer service delete in progress',
1112 res['output']['configuration-response-common'][
1113 'response-message'])
# Read the org-openroadm-device config subtree of ROADMA01 through its
# netconf mount point and check that it holds no 'roadm-connections' key,
# i.e. no cross-connection is configured on the device.
1116 def test_32_check_no_xc_ROADMA(self):
1118 "{}/config/network-topology:"
1119 "network-topology/topology/topology-netconf/"
1120 "node/ROADMA01/yang-ext:"
1121 "mount/org-openroadm-device:org-openroadm-device/"
1122 .format(self.restconf_baseurl))
1123 response = requests.request(
1124 "GET", url, auth=('admin', 'admin'))
# NOTE(review): the body is decoded before the status code is asserted, so a
# non-JSON error answer would raise here instead of failing the assertion
1125 res = response.json()
1126 self.assertEqual(response.status_code, requests.codes.ok)
1127 self.assertNotIn('roadm-connections',
1128 dict.keys(res['org-openroadm-device']))
# Inspect node XPDRA01-XPDR1 of the openroadm-topology (config datastore).
# Expects HTTP 200, then walks every termination point of the node:
#   - first branch: a tp-type match combined with tp-id XPDR1-CLIENT1 or
#     XPDR1-CLIENT3; the check there concerns 'xpdr-client-attributes'
#   - second branch (another tp-type): 'xpdr-network-attributes' must still
#     expose 'tail-equipment-id' but must not expose 'wavelength'
1131 def test_33_check_topo_XPDRA(self):
1133 "{}/config/ietf-network:"
1134 "networks/network/openroadm-topology/node/XPDRA01-XPDR1"
1135 .format(self.restconf_baseurl))
1136 response = requests.request(
1137 "GET", url1, auth=('admin', 'admin'))
1138 self.assertEqual(response.status_code, requests.codes.ok)
1139 res = response.json()
# only the first entry of the returned 'node' list is inspected
1140 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1141 for ele in liste_tp:
1142 if ((ele[u'org-openroadm-common-network:tp-type'] ==
1144 and (ele['tp-id'] == 'XPDR1-CLIENT1' or ele[
1145 'tp-id'] == 'XPDR1-CLIENT3')):
1147 'org-openroadm-network-topology:xpdr-client-attributes',
1149 elif (ele[u'org-openroadm-common-network:tp-type'] ==
1151 self.assertIn(u'tail-equipment-id', dict.keys(
1152 ele[u'org-openroadm-network-topology:'
1153 u'xpdr-network-attributes']))
1154 self.assertNotIn('wavelength', dict.keys(
1155 ele[u'org-openroadm-network-topology:'
1156 u'xpdr-network-attributes']))
# Inspect node ROADMA01-SRG1 of the openroadm-topology (config datastore).
# Expects HTTP 200, wavelength indexes 1 and 2 both listed in the SRG
# 'available-wavelengths', and termination points without 'pp-attributes'.
# This method is also invoked directly by test_48_check_topo_ROADMA.
1159 def test_34_check_topo_ROADMA_SRG1(self):
1161 "{}/config/ietf-network:"
1162 "networks/network/openroadm-topology/node/ROADMA01-SRG1"
1163 .format(self.restconf_baseurl))
1164 response = requests.request(
1165 "GET", url1, auth=('admin', 'admin'))
1166 self.assertEqual(response.status_code, requests.codes.ok)
1167 res = response.json()
1168 self.assertIn({u'index': 1}, res['node'][0][
1169 u'org-openroadm-network-topology:srg-attributes'][
1170 'available-wavelengths'])
1171 self.assertIn({u'index': 2}, res['node'][0][
1172 u'org-openroadm-network-topology:srg-attributes'][
1173 'available-wavelengths'])
1174 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1175 for ele in liste_tp:
# NOTE(review): both operands of the "or" test the same tp-id
# ('SRG1-PP1-TXRX'); the second one was presumably meant to name another
# port - confirm. Both visible assertions below are the same assertNotIn.
1176 if ele['tp-id'] == 'SRG1-PP1-TXRX' or \
1177 ele['tp-id'] == 'SRG1-PP1-TXRX':
1178 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
1181 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
# Inspect node ROADMA01-DEG1 of the openroadm-topology (config datastore).
# Expects HTTP 200, wavelength indexes 1 and 2 both listed in the degree
# 'available-wavelengths', no 'ctp-attributes' on DEG1-CTP-TXRX and no
# 'tx-ttp-attributes' on DEG1-TTP-TXRX.
# This method is also invoked directly by test_48_check_topo_ROADMA.
1185 def test_35_check_topo_ROADMA_DEG1(self):
1187 "{}/config/ietf-network:"
1188 "networks/network/openroadm-topology/node/ROADMA01-DEG1"
1189 .format(self.restconf_baseurl))
1190 response = requests.request(
1191 "GET", url1, auth=('admin', 'admin'))
1192 self.assertEqual(response.status_code, requests.codes.ok)
1193 res = response.json()
1194 self.assertIn({u'index': 1}, res['node'][0][
1195 u'org-openroadm-network-topology:degree-attributes'][
1196 'available-wavelengths'])
1197 self.assertIn({u'index': 2}, res['node'][0][
1198 u'org-openroadm-network-topology:degree-attributes'][
1199 'available-wavelengths'])
1200 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1201 for ele in liste_tp:
# the two tp-ids are tested independently (two "if", not "if/elif")
1202 if ele['tp-id'] == 'DEG1-CTP-TXRX':
1203 self.assertNotIn('org-openroadm-network-topology:'
1204 'ctp-attributes', dict.keys(ele))
1205 if ele['tp-id'] == 'DEG1-TTP-TXRX':
1206 self.assertNotIn('org-openroadm-network-topology:'
1207 'tx-ttp-attributes', dict.keys(ele))
1210 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
# Call the org-openroadm-service:service-create RPC for "service1":
# connection-type "roadm-line", service-format "OC", service-rate "100",
# with ROADMA01 and ROADMC01 as the two end nodes.
# Expects HTTP 200 and a response-message containing
# 'PCE calculation in progress', then sleeps self.WAITING seconds before
# returning.
# This method is also invoked directly by test_50_loop_create_oc_service.
1211 def test_36_create_oc_service1(self):
1212 url = ("{}/operations/org-openroadm-service:service-create"
1213 .format(self.restconf_baseurl))
1216 "sdnc-request-header": {
1217 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1218 "rpc-action": "service-create",
1219 "request-system-id": "appname",
1221 "http://localhost:8585/NotificationServer/notify"
1223 "service-name": "service1",
1224 "common-id": "ASATT1234567",
1225 "connection-type": "roadm-line",
1227 "service-rate": "100",
1228 "node-id": "ROADMA01",
1229 "service-format": "OC",
1230 "clli": "SNJSCAMCJP8",
1234 "ROUTER_SNJSCAMCJP8_000000.00_00",
1235 "port-type": "router",
1236 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1237 "port-rack": "000000.00",
1242 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1243 "lgx-port-name": "LGX Back.3",
1244 "lgx-port-rack": "000000.00",
1245 "lgx-port-shelf": "00"
1251 "ROUTER_SNJSCAMCJP8_000000.00_00",
1252 "port-type": "router",
1253 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1254 "port-rack": "000000.00",
1259 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1260 "lgx-port-name": "LGX Back.4",
1261 "lgx-port-rack": "000000.00",
1262 "lgx-port-shelf": "00"
1265 "optic-type": "gray"
1268 "service-rate": "100",
1269 "node-id": "ROADMC01",
1270 "service-format": "OC",
1271 "clli": "SNJSCAMCJT4",
1275 "ROUTER_SNJSCAMCJT4_000000.00_00",
1276 "port-type": "router",
1277 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1278 "port-rack": "000000.00",
1283 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1284 "lgx-port-name": "LGX Back.29",
1285 "lgx-port-rack": "000000.00",
1286 "lgx-port-shelf": "00"
1292 "ROUTER_SNJSCAMCJT4_000000.00_00",
1293 "port-type": "router",
1294 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1295 "port-rack": "000000.00",
1300 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1301 "lgx-port-name": "LGX Back.30",
1302 "lgx-port-rack": "000000.00",
1303 "lgx-port-shelf": "00"
1306 "optic-type": "gray"
1308 "due-date": "2016-11-28T00:00:01Z",
1309 "operator-contact": "pw1234"
1312 headers = {'content-type': 'application/json',
1313 "Accept": "application/json"}
1314 response = requests.request(
1315 "POST", url, data=json.dumps(data), headers=headers,
1316 auth=('admin', 'admin'))
1317 self.assertEqual(response.status_code, requests.codes.ok)
1318 res = response.json()
1319 self.assertIn('PCE calculation in progress',
1320 res['output']['configuration-response-common'][
1321 'response-message'])
# the answer only says the computation has started: wait before returning
1322 time.sleep(self.WAITING)
# Read "service1" back from the operational service-list.
# Expects HTTP 200, then checks fields of the first returned service entry:
# administrative-state, service-name ('service1'), connection-type
# ('roadm-line') and lifecycle-state ('planned').
1324 def test_37_get_oc_service1(self):
1325 url = ("{}/operational/org-openroadm-service:"
1326 "service-list/services/service1"
1327 .format(self.restconf_baseurl))
1328 headers = {'content-type': 'application/json',
1329 "Accept": "application/json"}
1330 response = requests.request(
1331 "GET", url, headers=headers, auth=('admin', 'admin'))
1332 self.assertEqual(response.status_code, requests.codes.ok)
1333 res = response.json()
1335 res['services'][0]['administrative-state'],
1338 res['services'][0]['service-name'], 'service1')
1340 res['services'][0]['connection-type'], 'roadm-line')
1342 res['services'][0]['lifecycle-state'], 'planned')
# Read roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 from ROADMA01 through
# its netconf mount point.
# Expects HTTP 200 and, on the first returned connection entry:
#   - connection-number, wavelength-number 1, opticalControlMode 'gainLoss'
#     and target-output-power -3.0
#   - source exactly {'src-if': 'SRG1-PP1-TXRX-1'}
#   - destination exactly {'dst-if': 'DEG1-TTP-TXRX-1'}
# This method is also invoked directly by test_50_loop_create_oc_service.
1345 def test_38_check_xc1_ROADMA(self):
1347 "{}/config/network-topology:"
1348 "network-topology/topology/topology-netconf/"
1349 "node/ROADMA01/yang-ext:"
1350 "mount/org-openroadm-device:org-openroadm-device/"
1351 "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
1352 .format(self.restconf_baseurl))
1353 headers = {'content-type': 'application/json'}
1354 response = requests.request(
1355 "GET", url, headers=headers, auth=('admin', 'admin'))
1356 self.assertEqual(response.status_code, requests.codes.ok)
1357 res = response.json()
1358 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
# the expected pairs are combined with the returned entry (**res[...]) and
# the result is compared with the returned entry itself
1359 self.assertDictEqual(
1361 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
1362 'wavelength-number': 1,
1363 'opticalControlMode': 'gainLoss',
1364 'target-output-power': -3.0
1365 }, **res['roadm-connections'][0]),
1366 res['roadm-connections'][0]
1368 self.assertDictEqual(
1369 {'src-if': 'SRG1-PP1-TXRX-1'},
1370 res['roadm-connections'][0]['source'])
1371 self.assertDictEqual(
1372 {'dst-if': 'DEG1-TTP-TXRX-1'},
1373 res['roadm-connections'][0]['destination'])
# Read roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 from ROADMC01 through
# its netconf mount point.
# Expects HTTP 200 and, on the first returned connection entry:
#   - connection-number, wavelength-number 1, opticalControlMode 'gainLoss'
#     and target-output-power 2.0
#   - source exactly {'src-if': 'SRG1-PP1-TXRX-1'}
#   - destination exactly {'dst-if': 'DEG2-TTP-TXRX-1'}
# This method is also invoked directly by test_50_loop_create_oc_service.
1376 def test_39_check_xc1_ROADMC(self):
1378 "{}/config/network-topology:"
1379 "network-topology/topology/topology-netconf/"
1380 "node/ROADMC01/yang-ext:mount/org-openroadm-device:"
1381 "org-openroadm-device/"
1382 "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
1383 .format(self.restconf_baseurl))
1384 headers = {'content-type': 'application/json'}
1385 response = requests.request(
1386 "GET", url, headers=headers, auth=('admin', 'admin'))
1387 self.assertEqual(response.status_code, requests.codes.ok)
1388 res = response.json()
1389 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
# the expected pairs are combined with the returned entry (**res[...]) and
# the result is compared with the returned entry itself
1390 self.assertDictEqual(
1392 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
1393 'wavelength-number': 1,
1394 'opticalControlMode': 'gainLoss',
1395 'target-output-power': 2.0
1396 }, **res['roadm-connections'][0]),
1397 res['roadm-connections'][0]
1399 self.assertDictEqual(
1400 {'src-if': 'SRG1-PP1-TXRX-1'},
1401 res['roadm-connections'][0]['source'])
1402 self.assertDictEqual(
1403 {'dst-if': 'DEG2-TTP-TXRX-1'},
1404 res['roadm-connections'][0]['destination'])
# Call the org-openroadm-service:service-create RPC for "service2":
# connection-type "roadm-line", service-format "OC", service-rate "100",
# with ROADMA01 and ROADMC01 as the two end nodes.
# Expects HTTP 200 and a response-message containing
# 'PCE calculation in progress', then sleeps self.WAITING seconds before
# returning.
1407 def test_40_create_oc_service2(self):
1408 url = ("{}/operations/org-openroadm-service:service-create"
1409 .format(self.restconf_baseurl))
1412 "sdnc-request-header": {
1413 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1414 "rpc-action": "service-create",
1415 "request-system-id": "appname",
1417 "http://localhost:8585/NotificationServer/notify"
1419 "service-name": "service2",
1420 "common-id": "ASATT1234567",
1421 "connection-type": "roadm-line",
1423 "service-rate": "100",
1424 "node-id": "ROADMA01",
1425 "service-format": "OC",
1426 "clli": "SNJSCAMCJP8",
1430 "ROUTER_SNJSCAMCJP8_000000.00_00",
1431 "port-type": "router",
1432 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1433 "port-rack": "000000.00",
1438 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1439 "lgx-port-name": "LGX Back.3",
1440 "lgx-port-rack": "000000.00",
1441 "lgx-port-shelf": "00"
1447 "ROUTER_SNJSCAMCJP8_000000.00_00",
1448 "port-type": "router",
1449 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1450 "port-rack": "000000.00",
1455 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1456 "lgx-port-name": "LGX Back.4",
1457 "lgx-port-rack": "000000.00",
1458 "lgx-port-shelf": "00"
1461 "optic-type": "gray"
1464 "service-rate": "100",
1465 "node-id": "ROADMC01",
1466 "service-format": "OC",
1467 "clli": "SNJSCAMCJT4",
1471 "ROUTER_SNJSCAMCJT4_000000.00_00",
1472 "port-type": "router",
1473 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1474 "port-rack": "000000.00",
1479 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1480 "lgx-port-name": "LGX Back.29",
1481 "lgx-port-rack": "000000.00",
1482 "lgx-port-shelf": "00"
1488 "ROUTER_SNJSCAMCJT4_000000.00_00",
1489 "port-type": "router",
1490 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1491 "port-rack": "000000.00",
1496 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1497 "lgx-port-name": "LGX Back.30",
1498 "lgx-port-rack": "000000.00",
1499 "lgx-port-shelf": "00"
1502 "optic-type": "gray"
1504 "due-date": "2016-11-28T00:00:01Z",
1505 "operator-contact": "pw1234"
1508 headers = {'content-type': 'application/json',
1509 "Accept": "application/json"}
1510 response = requests.request(
1511 "POST", url, data=json.dumps(data), headers=headers,
1512 auth=('admin', 'admin'))
1513 self.assertEqual(response.status_code, requests.codes.ok)
1514 res = response.json()
1515 self.assertIn('PCE calculation in progress',
1516 res['output']['configuration-response-common'][
1517 'response-message'])
# the answer only says the computation has started: wait before returning
1518 time.sleep(self.WAITING)
# Read "service2" back from the operational service-list.
# Expects HTTP 200, then checks fields of the first returned service entry:
# administrative-state, service-name ('service2'), connection-type
# ('roadm-line') and lifecycle-state ('planned').
1520 def test_41_get_oc_service2(self):
1521 url = ("{}/operational/org-openroadm-service:"
1522 "service-list/services/service2"
1523 .format(self.restconf_baseurl))
1524 headers = {'content-type': 'application/json',
1525 "Accept": "application/json"}
1526 response = requests.request(
1527 "GET", url, headers=headers, auth=('admin', 'admin'))
1528 self.assertEqual(response.status_code, requests.codes.ok)
1529 res = response.json()
1531 res['services'][0]['administrative-state'],
1534 res['services'][0]['service-name'], 'service2')
1536 res['services'][0]['connection-type'], 'roadm-line')
1538 res['services'][0]['lifecycle-state'], 'planned')
# Read roadm-connection SRG1-PP2-TXRX-DEG1-TTP-TXRX-2 from ROADMA01 through
# its netconf mount point.
# Expects HTTP 200 and, on the first returned connection entry:
#   - connection-number, wavelength-number 2, opticalControlMode 'gainLoss'
#     and target-output-power -3.0
#   - source exactly {'src-if': 'SRG1-PP2-TXRX-2'}
#   - destination exactly {'dst-if': 'DEG1-TTP-TXRX-2'}
1541 def test_42_check_xc2_ROADMA(self):
1543 "{}/config/network-topology:"
1544 "network-topology/topology/topology-netconf/"
1545 "node/ROADMA01/yang-ext:mount/org-openroadm-device:"
1546 "org-openroadm-device/"
1547 "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-2"
1548 .format(self.restconf_baseurl))
1549 headers = {'content-type': 'application/json'}
1550 response = requests.request(
1551 "GET", url, headers=headers, auth=('admin', 'admin'))
1552 self.assertEqual(response.status_code, requests.codes.ok)
1553 res = response.json()
1554 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
# the expected pairs are combined with the returned entry (**res[...]) and
# the result is compared with the returned entry itself
1555 self.assertDictEqual(
1557 'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-2',
1558 'wavelength-number': 2,
1559 'opticalControlMode': 'gainLoss',
1560 'target-output-power': -3.0
1561 }, **res['roadm-connections'][0]),
1562 res['roadm-connections'][0]
1564 self.assertDictEqual(
1565 {'src-if': 'SRG1-PP2-TXRX-2'},
1566 res['roadm-connections'][0]['source'])
1567 self.assertDictEqual(
1568 {'dst-if': 'DEG1-TTP-TXRX-2'},
1569 res['roadm-connections'][0]['destination'])
# Re-run two earlier topology checks of this class, SRG1 first, then DEG1.
# NOTE(review): test_48_check_topo_ROADMA calls the test_34/test_35 variants
# instead; presumably test_26/test_27 are the checks matching the state where
# two services are up - confirm against their definitions.
1572 def test_43_check_topo_ROADMA(self):
1573 self.test_26_check_topo_ROADMA_SRG1()
1574 self.test_27_check_topo_ROADMA_DEG1()
# Ask the org-openroadm-service:service-delete RPC to remove "service1"
# (tail-retention "no").
# Expects HTTP 200 and a response-message containing
# 'Renderer service delete in progress'.
# This method is also invoked directly by test_50_loop_create_oc_service.
1577 def test_44_delete_oc_service1(self):
1578 url = ("{}/operations/org-openroadm-service:service-delete"
1579 .format(self.restconf_baseurl))
1581 "sdnc-request-header": {
1582 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1583 "rpc-action": "service-delete",
1584 "request-system-id": "appname",
1586 "http://localhost:8585/NotificationServer/notify"
1588 "service-delete-req-info": {
1589 "service-name": "service1",
1590 "tail-retention": "no"
1594 headers = {'content-type': 'application/json'}
1595 response = requests.request(
1596 "POST", url, data=json.dumps(data), headers=headers,
1597 auth=('admin', 'admin'))
1598 self.assertEqual(response.status_code, requests.codes.ok)
1599 res = response.json()
1600 self.assertIn('Renderer service delete in progress',
1601 res['output']['configuration-response-common'][
1602 'response-message'])
# Ask the org-openroadm-service:service-delete RPC to remove "service2"
# (tail-retention "no").
# Expects HTTP 200 and a response-message containing
# 'Renderer service delete in progress'.
1605 def test_45_delete_oc_service2(self):
1606 url = ("{}/operations/org-openroadm-service:service-delete"
1607 .format(self.restconf_baseurl))
1609 "sdnc-request-header": {
1610 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1611 "rpc-action": "service-delete",
1612 "request-system-id": "appname",
1614 "http://localhost:8585/NotificationServer/notify"
1616 "service-delete-req-info": {
1617 "service-name": "service2",
1618 "tail-retention": "no"
1622 headers = {'content-type': 'application/json'}
1623 response = requests.request(
1624 "POST", url, data=json.dumps(data), headers=headers,
1625 auth=('admin', 'admin'))
1626 self.assertEqual(response.status_code, requests.codes.ok)
1627 res = response.json()
1628 self.assertIn('Renderer service delete in progress',
1629 res['output']['configuration-response-common'][
1630 'response-message'])
# Read the whole operational service-list and check that it no longer exists.
# Expects HTTP 404 and an error entry in res['errors']['error'] with
# error-type "application", error-tag "data-missing" and the
# "Request could not be completed ..." message.
1633 def test_46_get_no_oc_services(self):
1635 url = ("{}/operational/org-openroadm-service:service-list"
1636 .format(self.restconf_baseurl))
1637 headers = {'content-type': 'application/json',
1638 "Accept": "application/json"}
1639 response = requests.request(
1640 "GET", url, headers=headers, auth=('admin', 'admin'))
1641 self.assertEqual(response.status_code, requests.codes.not_found)
# the 404 answer still carries a JSON body describing the error
1642 res = response.json()
1645 "error-type": "application",
1646 "error-tag": "data-missing",
1648 "Request could not be completed because the relevant data "
1649 "model content does not exist"
1651 res['errors']['error'])
# Read the org-openroadm-device config subtree of ROADMA01 through its
# netconf mount point and check that it holds no 'roadm-connections' key.
# Expects HTTP 200.
1654 def test_47_get_no_xc_ROADMA(self):
1656 "{}/config/network-topology:"
1657 "network-topology/topology/topology-netconf"
1658 "/node/ROADMA01/yang-ext:mount/org-openroadm-device:"
1659 "org-openroadm-device/"
1660 .format(self.restconf_baseurl))
1661 headers = {'content-type': 'application/json',
1662 "Accept": "application/json"}
1663 response = requests.request(
1664 "GET", url, headers=headers, auth=('admin', 'admin'))
1665 self.assertEqual(response.status_code, requests.codes.ok)
1666 res = response.json()
# NOTE(review): ['roadm-connections'][0] is just the string
# 'roadm-connections' (first item of a one-element list literal)
1667 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
def test_48_check_topo_ROADMA(self):
    # Replay the ROADMA01 topology checks: SRG1 first, then DEG1.
    topology_checks = (
        self.test_34_check_topo_ROADMA_SRG1,
        self.test_35_check_topo_ROADMA_DEG1,
    )
    for run_check in topology_checks:
        run_check()
def test_49_loop_create_eth_service(self):
    # Run the create / check ROADMA01 / check ROADMC01 / delete sequence of
    # the Ethernet service1 five times in a row, announcing every step.
    sequence = (
        ("eth service creation", self.test_11_create_eth_service1),
        ("check xc in ROADMA01", self.test_13_check_xc1_ROADMA),
        ("check xc in ROADMC01", self.test_14_check_xc1_ROADMC),
        ("eth service deletion\n", self.test_30_delete_eth_service1),
    )
    trial = 1
    while trial < 6:
        print("trial number {}".format(trial))
        for announcement, step in sequence:
            print(announcement)
            step()
        trial += 1
# Run the create / check ROADMA01 / check ROADMC01 / delete sequence of the
# OC service1 five times in a row.
# Before looping, "service1" is looked up in the operational service-list;
# if the lookup does not answer 404, a service-delete request is sent for it.
1686 def test_50_loop_create_oc_service(self):
1687 url = ("{}/operational/org-openroadm-service:"
1688 "service-list/services/service1"
1689 .format(self.restconf_baseurl))
1690 response = requests.request("GET", url, auth=('admin', 'admin'))
# anything but "not found" is treated as "service1 is still there"
1691 if response.status_code != 404:
1692 url = ("{}/operations/org-openroadm-service:service-delete"
1693 .format(self.restconf_baseurl))
1695 "sdnc-request-header": {
1696 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1697 "rpc-action": "service-delete",
1698 "request-system-id": "appname",
1700 "http://localhost:8585/NotificationServer/notify"
1702 "service-delete-req-info": {
1703 "service-name": "service1",
1704 "tail-retention": "no"
1708 headers = {'content-type': 'application/json'}
# best-effort clean-up: the answer of this request is not stored or asserted
1709 requests.request("POST", url, data=json.dumps(data),
1711 auth=('admin', 'admin')
1715 for i in range(1, 6):
1716 print("trial number {}".format(i))
1717 print("oc service creation")
1718 self.test_36_create_oc_service1()
1719 print("check xc in ROADMA01")
1720 self.test_38_check_xc1_ROADMA()
1721 print("check xc in ROADMC01")
1722 self.test_39_check_xc1_ROADMC()
1723 print("oc service deletion\n")
1724 self.test_44_delete_oc_service1()
# Remove node XPDRA01 from the topology-netconf config datastore with a
# RESTCONF DELETE. Expects HTTP 200.
1726 def test_51_disconnect_XPDRA(self):
1727 url = ("{}/config/network-topology:"
1728 "network-topology/topology/topology-netconf/node/XPDRA01"
1729 .format(self.restconf_baseurl))
1730 headers = {'content-type': 'application/json'}
1731 response = requests.request(
1732 "DELETE", url, headers=headers,
1733 auth=('admin', 'admin'))
1734 self.assertEqual(response.status_code, requests.codes.ok)
# Remove node XPDRC01 from the topology-netconf config datastore with a
# RESTCONF DELETE. Expects HTTP 200.
1737 def test_52_disconnect_XPDRC(self):
1738 url = ("{}/config/network-topology:"
1739 "network-topology/topology/topology-netconf/node/XPDRC01"
1740 .format(self.restconf_baseurl))
1741 headers = {'content-type': 'application/json'}
1742 response = requests.request(
1743 "DELETE", url, headers=headers,
1744 auth=('admin', 'admin'))
1745 self.assertEqual(response.status_code, requests.codes.ok)
# Remove node ROADMA01 from the topology-netconf config datastore with a
# RESTCONF DELETE. Expects HTTP 200.
1748 def test_53_disconnect_ROADMA(self):
1749 url = ("{}/config/network-topology:"
1750 "network-topology/topology/topology-netconf/node/ROADMA01"
1751 .format(self.restconf_baseurl))
1752 headers = {'content-type': 'application/json'}
1753 response = requests.request(
1754 "DELETE", url, headers=headers,
1755 auth=('admin', 'admin'))
1756 self.assertEqual(response.status_code, requests.codes.ok)
# Remove node ROADMC01 from the topology-netconf config datastore with a
# RESTCONF DELETE. Expects HTTP 200.
1759 def test_54_disconnect_ROADMC(self):
1760 url = ("{}/config/network-topology:"
1761 "network-topology/topology/topology-netconf/node/ROADMC01"
1762 .format(self.restconf_baseurl))
1763 headers = {'content-type': 'application/json'}
1764 response = requests.request(
1765 "DELETE", url, headers=headers,
1766 auth=('admin', 'admin'))
1767 self.assertEqual(response.status_code, requests.codes.ok)
# Script entry point: run the test cases of this module with unittest,
# using verbosity level 2.
1771 if __name__ == "__main__":
1772 unittest.main(verbosity=2)