2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
22 class TransportPCEFulltesting(unittest.TestCase):
23 headers = {'content-type': 'application/json'}
29 restconf_baseurl = "http://localhost:8181/restconf"
30 WAITING = 20 # nominal value is 300
32 # START_IGNORE_XTESTING
36 cls.sim_process1 = test_utils.start_sim('xpdra')
39 cls.sim_process2 = test_utils.start_sim('roadma-full')
42 cls.sim_process3 = test_utils.start_sim('roadmc-full')
45 cls.sim_process4 = test_utils.start_sim('xpdrc')
47 print("all sims started")
49 cls.odl_process = test_utils.start_tpce()
51 print("opendaylight started")
54 def tearDownClass(cls):
55 for child in psutil.Process(cls.odl_process.pid).children():
56 child.send_signal(signal.SIGINT)
58 cls.odl_process.send_signal(signal.SIGINT)
59 cls.odl_process.wait()
60 for child in psutil.Process(cls.sim_process1.pid).children():
61 child.send_signal(signal.SIGINT)
63 cls.sim_process1.send_signal(signal.SIGINT)
64 cls.sim_process1.wait()
65 for child in psutil.Process(cls.sim_process2.pid).children():
66 child.send_signal(signal.SIGINT)
68 cls.sim_process2.send_signal(signal.SIGINT)
69 cls.sim_process2.wait()
70 for child in psutil.Process(cls.sim_process3.pid).children():
71 child.send_signal(signal.SIGINT)
73 cls.sim_process3.send_signal(signal.SIGINT)
74 cls.sim_process3.wait()
75 for child in psutil.Process(cls.sim_process4.pid).children():
76 child.send_signal(signal.SIGINT)
78 cls.sim_process4.send_signal(signal.SIGINT)
79 cls.sim_process4.wait()
80 print("all processes killed")
82 def setUp(self): # instruction executed before each test method
83 print("execution of {}".format(self.id().split(".")[-1]))
87 # connect netconf devices
88 def test_01_connect_xpdrA(self):
89 url = ("{}/config/network-topology:"
90 "network-topology/topology/topology-netconf/node/XPDRA01"
91 .format(self.restconf_baseurl))
94 "netconf-node-topology:username": "admin",
95 "netconf-node-topology:password": "admin",
96 "netconf-node-topology:host": "127.0.0.1",
97 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
98 "netconf-node-topology:tcp-only": "false",
99 "netconf-node-topology:pass-through": {}}]}
100 headers = {'content-type': 'application/json'}
101 response = requests.request(
102 "PUT", url, data=json.dumps(data), headers=headers,
103 auth=('admin', 'admin'))
104 self.assertEqual(response.status_code, requests.codes.created)
107 def test_02_connect_xpdrC(self):
108 url = ("{}/config/network-topology:"
109 "network-topology/topology/topology-netconf/node/XPDRC01"
110 .format(self.restconf_baseurl))
112 "node-id": "XPDRC01",
113 "netconf-node-topology:username": "admin",
114 "netconf-node-topology:password": "admin",
115 "netconf-node-topology:host": "127.0.0.1",
116 "netconf-node-topology:port": test_utils.sims['xpdrc']['port'],
117 "netconf-node-topology:tcp-only": "false",
118 "netconf-node-topology:pass-through": {}}]}
119 headers = {'content-type': 'application/json'}
120 response = requests.request(
121 "PUT", url, data=json.dumps(data), headers=headers,
122 auth=('admin', 'admin'))
123 self.assertEqual(response.status_code, requests.codes.created)
126 def test_03_connect_rdmA(self):
127 url = ("{}/config/network-topology:"
128 "network-topology/topology/topology-netconf/node/ROADMA01"
129 .format(self.restconf_baseurl))
131 "node-id": "ROADMA01",
132 "netconf-node-topology:username": "admin",
133 "netconf-node-topology:password": "admin",
134 "netconf-node-topology:host": "127.0.0.1",
135 "netconf-node-topology:port": test_utils.sims['roadma-full']['port'],
136 "netconf-node-topology:tcp-only": "false",
137 "netconf-node-topology:pass-through": {}}]}
138 headers = {'content-type': 'application/json'}
139 response = requests.request(
140 "PUT", url, data=json.dumps(data), headers=headers,
141 auth=('admin', 'admin'))
142 self.assertEqual(response.status_code, requests.codes.created)
145 def test_04_connect_rdmC(self):
146 url = ("{}/config/network-topology:"
147 "network-topology/topology/topology-netconf/node/ROADMC01"
148 .format(self.restconf_baseurl))
150 "node-id": "ROADMC01",
151 "netconf-node-topology:username": "admin",
152 "netconf-node-topology:password": "admin",
153 "netconf-node-topology:host": "127.0.0.1",
154 "netconf-node-topology:port": test_utils.sims['roadmc-full']['port'],
155 "netconf-node-topology:tcp-only": "false",
156 "netconf-node-topology:pass-through": {}}]}
157 headers = {'content-type': 'application/json'}
158 response = requests.request(
159 "PUT", url, data=json.dumps(data), headers=headers,
160 auth=('admin', 'admin'))
161 self.assertEqual(response.status_code, requests.codes.created)
164 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
165 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
166 format(self.restconf_baseurl)
168 "networkutils:input": {
169 "networkutils:links-input": {
170 "networkutils:xpdr-node": "XPDRA01",
171 "networkutils:xpdr-num": "1",
172 "networkutils:network-num": "1",
173 "networkutils:rdm-node": "ROADMA01",
174 "networkutils:srg-num": "1",
175 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
179 headers = {'content-type': 'application/json'}
180 response = requests.request(
181 "POST", url, data=json.dumps(data),
182 headers=headers, auth=('admin', 'admin'))
183 self.assertEqual(response.status_code, requests.codes.ok)
184 res = response.json()
185 self.assertIn('Xponder Roadm Link created successfully',
186 res["output"]["result"])
189 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
190 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
191 format(self.restconf_baseurl)
193 "networkutils:input": {
194 "networkutils:links-input": {
195 "networkutils:xpdr-node": "XPDRA01",
196 "networkutils:xpdr-num": "1",
197 "networkutils:network-num": "1",
198 "networkutils:rdm-node": "ROADMA01",
199 "networkutils:srg-num": "1",
200 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
204 headers = {'content-type': 'application/json'}
205 response = requests.request(
206 "POST", url, data=json.dumps(data),
207 headers=headers, auth=('admin', 'admin'))
208 self.assertEqual(response.status_code, requests.codes.ok)
209 res = response.json()
210 self.assertIn('Roadm Xponder links created successfully',
211 res["output"]["result"])
214 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
215 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
216 format(self.restconf_baseurl)
218 "networkutils:input": {
219 "networkutils:links-input": {
220 "networkutils:xpdr-node": "XPDRC01",
221 "networkutils:xpdr-num": "1",
222 "networkutils:network-num": "1",
223 "networkutils:rdm-node": "ROADMC01",
224 "networkutils:srg-num": "1",
225 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
229 headers = {'content-type': 'application/json'}
230 response = requests.request(
231 "POST", url, data=json.dumps(data),
232 headers=headers, auth=('admin', 'admin'))
233 self.assertEqual(response.status_code, requests.codes.ok)
234 res = response.json()
235 self.assertIn('Xponder Roadm Link created successfully',
236 res["output"]["result"])
239 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
240 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
241 format(self.restconf_baseurl)
243 "networkutils:input": {
244 "networkutils:links-input": {
245 "networkutils:xpdr-node": "XPDRC01",
246 "networkutils:xpdr-num": "1",
247 "networkutils:network-num": "1",
248 "networkutils:rdm-node": "ROADMC01",
249 "networkutils:srg-num": "1",
250 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
254 headers = {'content-type': 'application/json'}
255 response = requests.request(
256 "POST", url, data=json.dumps(data),
257 headers=headers, auth=('admin', 'admin'))
258 self.assertEqual(response.status_code, requests.codes.ok)
259 res = response.json()
260 self.assertIn('Roadm Xponder links created successfully',
261 res["output"]["result"])
264 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
265 # Config ROADMA-ROADMC oms-attributes
267 "{}/config/ietf-network:"
268 "networks/network/openroadm-topology/ietf-network-topology:"
269 "link/ROADMA01-DEG1-DEG1-TTP-TXRXtoROADMC01-DEG2-DEG2-TTP-TXRX/"
270 "org-openroadm-network-topology:"
271 "OMS-attributes/span"
272 .format(self.restconf_baseurl))
275 "auto-spanloss": "true",
276 "spanloss-base": 11.4,
277 "spanloss-current": 12,
278 "engineered-spanloss": 12.2,
279 "link-concatenation": [{
282 "SRLG-length": 100000,
284 headers = {'content-type': 'application/json'}
285 response = requests.request(
286 "PUT", url, data=json.dumps(data), headers=headers,
287 auth=('admin', 'admin'))
288 self.assertEqual(response.status_code, requests.codes.created)
290 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
291 # Config ROADMC-ROADMA oms-attributes
293 "{}/config/ietf-network:"
294 "networks/network/openroadm-topology/ietf-network-topology:"
295 "link/ROADMC01-DEG2-DEG2-TTP-TXRXtoROADMA01-DEG1-DEG1-TTP-TXRX/"
296 "org-openroadm-network-topology:"
297 "OMS-attributes/span"
298 .format(self.restconf_baseurl))
301 "auto-spanloss": "true",
302 "spanloss-base": 11.4,
303 "spanloss-current": 12,
304 "engineered-spanloss": 12.2,
305 "link-concatenation": [{
308 "SRLG-length": 100000,
310 headers = {'content-type': 'application/json'}
311 response = requests.request(
312 "PUT", url, data=json.dumps(data), headers=headers,
313 auth=('admin', 'admin'))
314 self.assertEqual(response.status_code, requests.codes.created)
316 # test service-create for Eth service from xpdr to xpdr
317 def test_11_create_eth_service1(self):
318 url = ("{}/operations/org-openroadm-service:service-create"
319 .format(self.restconf_baseurl))
322 "sdnc-request-header": {
323 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
324 "rpc-action": "service-create",
325 "request-system-id": "appname",
327 "http://localhost:8585/NotificationServer/notify"
329 "service-name": "service1",
330 "common-id": "ASATT1234567",
331 "connection-type": "service",
333 "service-rate": "100",
334 "node-id": "XPDRA01",
335 "service-format": "Ethernet",
336 "clli": "SNJSCAMCJP8",
340 "ROUTER_SNJSCAMCJP8_000000.00_00",
341 "port-type": "router",
342 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
343 "port-rack": "000000.00",
348 "LGX Panel_SNJSCAMCJP8_000000.00_00",
349 "lgx-port-name": "LGX Back.3",
350 "lgx-port-rack": "000000.00",
351 "lgx-port-shelf": "00"
357 "ROUTER_SNJSCAMCJP8_000000.00_00",
358 "port-type": "router",
359 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
360 "port-rack": "000000.00",
365 "LGX Panel_SNJSCAMCJP8_000000.00_00",
366 "lgx-port-name": "LGX Back.4",
367 "lgx-port-rack": "000000.00",
368 "lgx-port-shelf": "00"
374 "service-rate": "100",
375 "node-id": "XPDRC01",
376 "service-format": "Ethernet",
377 "clli": "SNJSCAMCJT4",
381 "ROUTER_SNJSCAMCJT4_000000.00_00",
382 "port-type": "router",
383 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
384 "port-rack": "000000.00",
389 "LGX Panel_SNJSCAMCJT4_000000.00_00",
390 "lgx-port-name": "LGX Back.29",
391 "lgx-port-rack": "000000.00",
392 "lgx-port-shelf": "00"
398 "ROUTER_SNJSCAMCJT4_000000.00_00",
399 "port-type": "router",
400 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
401 "port-rack": "000000.00",
406 "LGX Panel_SNJSCAMCJT4_000000.00_00",
407 "lgx-port-name": "LGX Back.30",
408 "lgx-port-rack": "000000.00",
409 "lgx-port-shelf": "00"
414 "due-date": "2016-11-28T00:00:01Z",
415 "operator-contact": "pw1234"
418 headers = {'content-type': 'application/json',
419 "Accept": "application/json"}
420 response = requests.request(
421 "POST", url, data=json.dumps(data), headers=headers,
422 auth=('admin', 'admin'))
423 self.assertEqual(response.status_code, requests.codes.ok)
424 res = response.json()
425 self.assertIn('PCE calculation in progress',
426 res['output']['configuration-response-common'][
428 time.sleep(self.WAITING)
430 def test_12_get_eth_service1(self):
431 url = ("{}/operational/org-openroadm-service:service-list/services/"
432 "service1".format(self.restconf_baseurl))
433 headers = {'content-type': 'application/json',
434 "Accept": "application/json"}
435 response = requests.request(
436 "GET", url, headers=headers, auth=('admin', 'admin'))
437 self.assertEqual(response.status_code, requests.codes.ok)
438 res = response.json()
440 res['services'][0]['administrative-state'],
443 res['services'][0]['service-name'], 'service1')
445 res['services'][0]['connection-type'], 'service')
447 res['services'][0]['lifecycle-state'], 'planned')
450 def test_13_check_xc1_ROADMA(self):
452 "{}/config/network-topology:"
453 "network-topology/topology/topology-netconf/"
454 "node/ROADMA01/yang-ext:"
455 "mount/org-openroadm-device:org-openroadm-device/"
456 "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
457 .format(self.restconf_baseurl))
458 headers = {'content-type': 'application/json'}
459 response = requests.request(
460 "GET", url, headers=headers, auth=('admin', 'admin'))
461 self.assertEqual(response.status_code, requests.codes.ok)
462 res = response.json()
463 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
464 self.assertDictEqual(
466 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
467 'wavelength-number': 1,
468 'opticalControlMode': 'gainLoss',
469 'target-output-power': -3.0
470 }, **res['roadm-connections'][0]),
471 res['roadm-connections'][0]
473 self.assertDictEqual(
474 {'src-if': 'SRG1-PP1-TXRX-1'},
475 res['roadm-connections'][0]['source'])
476 self.assertDictEqual(
477 {'dst-if': 'DEG1-TTP-TXRX-1'},
478 res['roadm-connections'][0]['destination'])
481 def test_14_check_xc1_ROADMC(self):
483 "{}/config/network-topology:"
484 "network-topology/topology/topology-netconf/"
485 "node/ROADMC01/yang-ext:mount/org-openroadm-device:"
486 "org-openroadm-device/"
487 "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
488 .format(self.restconf_baseurl))
489 headers = {'content-type': 'application/json'}
490 response = requests.request(
491 "GET", url, headers=headers, auth=('admin', 'admin'))
492 self.assertEqual(response.status_code, requests.codes.ok)
493 res = response.json()
494 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
495 self.assertDictEqual(
497 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
498 'wavelength-number': 1,
499 'opticalControlMode': 'gainLoss',
500 'target-output-power': 2.0
501 }, **res['roadm-connections'][0]),
502 res['roadm-connections'][0]
504 self.assertDictEqual(
505 {'src-if': 'SRG1-PP1-TXRX-1'},
506 res['roadm-connections'][0]['source'])
507 self.assertDictEqual(
508 {'dst-if': 'DEG2-TTP-TXRX-1'},
509 res['roadm-connections'][0]['destination'])
512 def test_15_check_topo_XPDRA(self):
514 "{}/config/ietf-network:"
515 "networks/network/openroadm-topology/node/XPDRA01-XPDR1"
516 .format(self.restconf_baseurl))
517 response = requests.request(
518 "GET", url1, auth=('admin', 'admin'))
519 self.assertEqual(response.status_code, requests.codes.ok)
520 res = response.json()
521 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
523 if ele['tp-id'] == 'XPDR1-NETWORK1':
524 self.assertEqual({u'frequency': 196.1, u'width': 40},
526 'org-openroadm-network-topology:'
527 'xpdr-network-attributes'][
529 if ele['tp-id'] == 'XPDR1-CLIENT1' or \
530 ele['tp-id'] == 'XPDR1-CLIENT3':
532 'org-openroadm-network-topology:xpdr-client-attributes',
534 if ele['tp-id'] == 'XPDR1-NETWORK2':
536 'org-openroadm-network-topology:xpdr-network-attributes',
540 def test_16_check_topo_ROADMA_SRG1(self):
542 "{}/config/ietf-network:"
543 "networks/network/openroadm-topology/node/ROADMA01-SRG1"
544 .format(self.restconf_baseurl))
545 response = requests.request(
546 "GET", url1, auth=('admin', 'admin'))
547 self.assertEqual(response.status_code, requests.codes.ok)
548 res = response.json()
549 self.assertNotIn({u'index': 1},
551 u'org-openroadm-network-topology:srg-attributes'][
552 'available-wavelengths'])
553 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
555 if ele['tp-id'] == 'SRG1-PP1-TXRX':
556 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
557 ele['org-openroadm-network-topology:'
558 'pp-attributes']['used-wavelength']
560 if ele['tp-id'] == 'SRG1-PP2-TXRX':
561 self.assertNotIn('used-wavelength', dict.keys(ele))
564 def test_17_check_topo_ROADMA_DEG1(self):
566 "{}/config/ietf-network:"
567 "networks/network/openroadm-topology/node/ROADMA01-DEG1"
568 .format(self.restconf_baseurl))
569 response = requests.request(
570 "GET", url1, auth=('admin', 'admin'))
571 self.assertEqual(response.status_code, requests.codes.ok)
572 res = response.json()
573 self.assertNotIn({u'index': 1},
575 u'org-openroadm-network-topology:'
576 u'degree-attributes'][
577 'available-wavelengths'])
578 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
580 if ele['tp-id'] == 'DEG1-CTP-TXRX':
581 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
582 ele['org-openroadm-network-topology:'
585 if ele['tp-id'] == 'DEG1-TTP-TXRX':
586 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
587 ele['org-openroadm-network-topology:'
588 'tx-ttp-attributes'][
592 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
593 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
594 format(self.restconf_baseurl)
596 "networkutils:input": {
597 "networkutils:links-input": {
598 "networkutils:xpdr-node": "XPDRA01",
599 "networkutils:xpdr-num": "1",
600 "networkutils:network-num": "2",
601 "networkutils:rdm-node": "ROADMA01",
602 "networkutils:srg-num": "1",
603 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
607 headers = {'content-type': 'application/json'}
608 response = requests.request(
609 "POST", url, data=json.dumps(data),
610 headers=headers, auth=('admin', 'admin'))
611 self.assertEqual(response.status_code, requests.codes.ok)
612 res = response.json()
613 self.assertIn('Xponder Roadm Link created successfully',
614 res["output"]["result"])
617 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
618 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
619 format(self.restconf_baseurl)
621 "networkutils:input": {
622 "networkutils:links-input": {
623 "networkutils:xpdr-node": "XPDRA01",
624 "networkutils:xpdr-num": "1",
625 "networkutils:network-num": "2",
626 "networkutils:rdm-node": "ROADMA01",
627 "networkutils:srg-num": "1",
628 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
632 headers = {'content-type': 'application/json'}
633 response = requests.request(
634 "POST", url, data=json.dumps(data),
635 headers=headers, auth=('admin', 'admin'))
636 self.assertEqual(response.status_code, requests.codes.ok)
637 res = response.json()
638 self.assertIn('Roadm Xponder links created successfully',
639 res["output"]["result"])
642 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
643 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".\
644 format(self.restconf_baseurl)
646 "networkutils:input": {
647 "networkutils:links-input": {
648 "networkutils:xpdr-node": "XPDRC01",
649 "networkutils:xpdr-num": "1",
650 "networkutils:network-num": "2",
651 "networkutils:rdm-node": "ROADMC01",
652 "networkutils:srg-num": "1",
653 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
657 headers = {'content-type': 'application/json'}
658 response = requests.request(
659 "POST", url, data=json.dumps(data),
660 headers=headers, auth=('admin', 'admin'))
661 self.assertEqual(response.status_code, requests.codes.ok)
662 res = response.json()
663 self.assertIn('Xponder Roadm Link created successfully',
664 res["output"]["result"])
667 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
668 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".\
669 format(self.restconf_baseurl)
671 "networkutils:input": {
672 "networkutils:links-input": {
673 "networkutils:xpdr-node": "XPDRC01",
674 "networkutils:xpdr-num": "1",
675 "networkutils:network-num": "2",
676 "networkutils:rdm-node": "ROADMC01",
677 "networkutils:srg-num": "1",
678 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
682 headers = {'content-type': 'application/json'}
683 response = requests.request(
684 "POST", url, data=json.dumps(data),
685 headers=headers, auth=('admin', 'admin'))
686 self.assertEqual(response.status_code, requests.codes.ok)
687 res = response.json()
688 self.assertIn('Roadm Xponder links created successfully',
689 res["output"]["result"])
692 def test_22_create_eth_service2(self):
693 url = ("{}/operations/org-openroadm-service:service-create"
694 .format(self.restconf_baseurl))
697 "sdnc-request-header": {
698 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
699 "rpc-action": "service-create",
700 "request-system-id": "appname",
702 "http://localhost:8585/NotificationServer/notify"
704 "service-name": "service2",
705 "common-id": "ASATT1234567",
706 "connection-type": "service",
708 "service-rate": "100",
709 "node-id": "XPDRA01",
710 "service-format": "Ethernet",
711 "clli": "SNJSCAMCJP8",
715 "ROUTER_SNJSCAMCJP8_000000.00_00",
716 "port-type": "router",
717 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
718 "port-rack": "000000.00",
723 "LGX Panel_SNJSCAMCJP8_000000.00_00",
724 "lgx-port-name": "LGX Back.3",
725 "lgx-port-rack": "000000.00",
726 "lgx-port-shelf": "00"
732 "ROUTER_SNJSCAMCJP8_000000.00_00",
733 "port-type": "router",
734 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
735 "port-rack": "000000.00",
740 "LGX Panel_SNJSCAMCJP8_000000.00_00",
741 "lgx-port-name": "LGX Back.4",
742 "lgx-port-rack": "000000.00",
743 "lgx-port-shelf": "00"
749 "service-rate": "100",
750 "node-id": "XPDRC01",
751 "service-format": "Ethernet",
752 "clli": "SNJSCAMCJT4",
756 "ROUTER_SNJSCAMCJT4_000000.00_00",
757 "port-type": "router",
758 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
759 "port-rack": "000000.00",
764 "LGX Panel_SNJSCAMCJT4_000000.00_00",
765 "lgx-port-name": "LGX Back.29",
766 "lgx-port-rack": "000000.00",
767 "lgx-port-shelf": "00"
773 "ROUTER_SNJSCAMCJT4_000000.00_00",
774 "port-type": "router",
775 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
776 "port-rack": "000000.00",
781 "LGX Panel_SNJSCAMCJT4_000000.00_00",
782 "lgx-port-name": "LGX Back.30",
783 "lgx-port-rack": "000000.00",
784 "lgx-port-shelf": "00"
789 "due-date": "2016-11-28T00:00:01Z",
790 "operator-contact": "pw1234"
793 headers = {'content-type': 'application/json',
794 "Accept": "application/json"}
795 response = requests.request(
796 "POST", url, data=json.dumps(data), headers=headers,
797 auth=('admin', 'admin'))
798 self.assertEqual(response.status_code, requests.codes.ok)
799 res = response.json()
800 self.assertIn('PCE calculation in progress',
801 res['output']['configuration-response-common'][
803 time.sleep(self.WAITING)
805 def test_23_get_eth_service2(self):
806 url = ("{}/operational/org-openroadm-service:"
807 "service-list/services/service2"
808 .format(self.restconf_baseurl))
809 headers = {'content-type': 'application/json',
810 "Accept": "application/json"}
811 response = requests.request(
812 "GET", url, headers=headers, auth=('admin', 'admin'))
813 self.assertEqual(response.status_code, requests.codes.ok)
814 res = response.json()
816 res['services'][0]['administrative-state'],
819 res['services'][0]['service-name'], 'service2')
821 res['services'][0]['connection-type'], 'service')
823 res['services'][0]['lifecycle-state'], 'planned')
826 def test_24_check_xc2_ROADMA(self):
828 "{}/config/network-topology:"
829 "network-topology/topology/topology-netconf/"
830 "node/ROADMA01/yang-ext:"
831 "mount/org-openroadm-device:org-openroadm-device/"
832 "roadm-connections/DEG1-TTP-TXRX-SRG1-PP2-TXRX-2"
833 .format(self.restconf_baseurl))
834 headers = {'content-type': 'application/json'}
835 response = requests.request(
836 "GET", url, headers=headers, auth=('admin', 'admin'))
837 self.assertEqual(response.status_code, requests.codes.ok)
838 res = response.json()
839 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
840 self.assertDictEqual(
842 'connection-number': 'DEG1-TTP-TXRX-SRG1-PP2-TXRX-2',
843 'wavelength-number': 2,
844 'opticalControlMode': 'power'
845 }, **res['roadm-connections'][0]),
846 res['roadm-connections'][0]
848 self.assertDictEqual(
849 {'src-if': 'DEG1-TTP-TXRX-2'},
850 res['roadm-connections'][0]['source'])
851 self.assertDictEqual(
852 {'dst-if': 'SRG1-PP2-TXRX-2'},
853 res['roadm-connections'][0]['destination'])
855 def test_25_check_topo_XPDRA(self):
857 "{}/config/ietf-network:"
858 "networks/network/openroadm-topology/node/XPDRA01-XPDR1"
859 .format(self.restconf_baseurl))
860 response = requests.request(
861 "GET", url1, auth=('admin', 'admin'))
862 self.assertEqual(response.status_code, requests.codes.ok)
863 res = response.json()
864 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
866 if ele['tp-id'] == 'XPDR1-NETWORK1':
867 self.assertEqual({u'frequency': 196.1, u'width': 40},
868 ele['org-openroadm-network-topology:'
869 'xpdr-network-attributes'][
871 if ele['tp-id'] == 'XPDR1-NETWORK2':
872 self.assertEqual({u'frequency': 196.05, u'width': 40},
873 ele['org-openroadm-network-topology:'
874 'xpdr-network-attributes'][
876 if ele['tp-id'] == 'XPDR1-CLIENT1' or \
877 ele['tp-id'] == 'XPDR1-CLIENT3':
879 'org-openroadm-network-topology:xpdr-client-attributes',
883 def test_26_check_topo_ROADMA_SRG1(self):
885 "{}/config/ietf-network:"
886 "networks/network/openroadm-topology/node/ROADMA01-SRG1"
887 .format(self.restconf_baseurl))
888 response = requests.request(
889 "GET", url1, auth=('admin', 'admin'))
890 self.assertEqual(response.status_code, requests.codes.ok)
891 res = response.json()
892 self.assertNotIn({u'index': 1}, res['node'][0][
893 u'org-openroadm-network-topology:srg-attributes'][
894 'available-wavelengths'])
895 self.assertNotIn({u'index': 2}, res['node'][0][
896 u'org-openroadm-network-topology:srg-attributes'][
897 'available-wavelengths'])
898 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
900 if ele['tp-id'] == 'SRG1-PP1-TXRX':
901 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
902 ele['org-openroadm-network-topology:'
903 'pp-attributes']['used-wavelength'])
904 self.assertNotIn({u'index': 2, u'frequency': 196.05,
906 ele['org-openroadm-network-topology:'
907 'pp-attributes']['used-wavelength'])
908 if ele['tp-id'] == 'SRG1-PP2-TXRX':
909 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
910 ele['org-openroadm-network-topology:'
911 'pp-attributes']['used-wavelength'])
912 self.assertNotIn({u'index': 1, u'frequency': 196.1,
914 ele['org-openroadm-network-topology:'
915 'pp-attributes']['used-wavelength'])
916 if ele['tp-id'] == 'SRG1-PP3-TXRX':
917 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
921 def test_27_check_topo_ROADMA_DEG1(self):
923 "{}/config/ietf-network:"
924 "networks/network/openroadm-topology/node/ROADMA01-DEG1"
925 .format(self.restconf_baseurl))
926 response = requests.request(
927 "GET", url1, auth=('admin', 'admin'))
928 self.assertEqual(response.status_code, requests.codes.ok)
929 res = response.json()
930 self.assertNotIn({u'index': 1}, res['node'][0][
931 u'org-openroadm-network-topology:degree-attributes'][
932 'available-wavelengths'])
933 self.assertNotIn({u'index': 2}, res['node'][0][
934 u'org-openroadm-network-topology:degree-attributes'][
935 'available-wavelengths'])
936 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
938 if ele['tp-id'] == 'DEG1-CTP-TXRX':
939 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
940 ele['org-openroadm-network-topology:'
941 'ctp-attributes']['used-wavelengths'])
942 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
943 ele['org-openroadm-network-topology:'
944 'ctp-attributes']['used-wavelengths'])
945 if ele['tp-id'] == 'DEG1-TTP-TXRX':
946 self.assertIn({u'index': 1, u'frequency': 196.1, u'width': 40},
947 ele['org-openroadm-network-topology:'
948 'tx-ttp-attributes']['used-wavelengths'])
949 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
950 ele['org-openroadm-network-topology:'
951 'tx-ttp-attributes']['used-wavelengths'])
954 # creation service test on a non-available resource
955 def test_28_create_eth_service3(self):
956 url = ("{}/operations/org-openroadm-service:service-create"
957 .format(self.restconf_baseurl))
960 "sdnc-request-header": {
961 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
962 "rpc-action": "service-create",
963 "request-system-id": "appname",
965 "http://localhost:8585/NotificationServer/notify"
967 "service-name": "service3",
968 "common-id": "ASATT1234567",
969 "connection-type": "service",
971 "service-rate": "100",
972 "node-id": "XPDRA01",
973 "service-format": "Ethernet",
974 "clli": "SNJSCAMCJP8",
978 "ROUTER_SNJSCAMCJP8_000000.00_00",
979 "port-type": "router",
980 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
981 "port-rack": "000000.00",
986 "LGX Panel_SNJSCAMCJP8_000000.00_00",
987 "lgx-port-name": "LGX Back.3",
988 "lgx-port-rack": "000000.00",
989 "lgx-port-shelf": "00"
995 "ROUTER_SNJSCAMCJP8_000000.00_00",
996 "port-type": "router",
997 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
998 "port-rack": "000000.00",
1003 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1004 "lgx-port-name": "LGX Back.4",
1005 "lgx-port-rack": "000000.00",
1006 "lgx-port-shelf": "00"
1009 "optic-type": "gray"
1012 "service-rate": "100",
1013 "node-id": "XPDRC01",
1014 "service-format": "Ethernet",
1015 "clli": "SNJSCAMCJT4",
1019 "ROUTER_SNJSCAMCJT4_000000.00_00",
1020 "port-type": "router",
1021 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1022 "port-rack": "000000.00",
1027 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1028 "lgx-port-name": "LGX Back.29",
1029 "lgx-port-rack": "000000.00",
1030 "lgx-port-shelf": "00"
1036 "ROUTER_SNJSCAMCJT4_000000.00_00",
1037 "port-type": "router",
1038 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1039 "port-rack": "000000.00",
1044 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1045 "lgx-port-name": "LGX Back.30",
1046 "lgx-port-rack": "000000.00",
1047 "lgx-port-shelf": "00"
1050 "optic-type": "gray"
1052 "due-date": "2016-11-28T00:00:01Z",
1053 "operator-contact": "pw1234"
1056 headers = {'content-type': 'application/json',
1057 "Accept": "application/json"}
1058 response = requests.request(
1059 "POST", url, data=json.dumps(data), headers=headers,
1060 auth=('admin', 'admin'))
1061 self.assertEqual(response.status_code, requests.codes.ok)
1062 res = response.json()
1063 self.assertIn('PCE calculation in progress',
1064 res['output']['configuration-response-common'][
1065 'response-message'])
1066 self.assertIn('200', res['output']['configuration-response-common'][
1068 time.sleep(self.WAITING)
1070 # add a test that check the openroadm-service-list still only
1071 # contains 2 elements
1073 def test_29_delete_eth_service3(self):
1074 url = ("{}/operations/org-openroadm-service:service-delete"
1075 .format(self.restconf_baseurl))
1077 "sdnc-request-header": {
1078 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1079 "rpc-action": "service-delete",
1080 "request-system-id": "appname",
1082 "http://localhost:8585/NotificationServer/notify"
1084 "service-delete-req-info": {
1085 "service-name": "service3",
1086 "tail-retention": "no"
1090 headers = {'content-type': 'application/json'}
1091 response = requests.request(
1092 "POST", url, data=json.dumps(data), headers=headers,
1093 auth=('admin', 'admin'))
1094 self.assertEqual(response.status_code, requests.codes.ok)
1095 res = response.json()
1096 self.assertIn('Service \'service3\' does not exist in datastore',
1097 res['output']['configuration-response-common'][
1098 'response-message'])
1099 self.assertIn('500', res['output']['configuration-response-common'][
1103 def test_30_delete_eth_service1(self):
1104 url = ("{}/operations/org-openroadm-service:service-delete"
1105 .format(self.restconf_baseurl))
1107 "sdnc-request-header": {
1108 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1109 "rpc-action": "service-delete",
1110 "request-system-id": "appname",
1112 "http://localhost:8585/NotificationServer/notify"
1114 "service-delete-req-info": {
1115 "service-name": "service1",
1116 "tail-retention": "no"
1120 headers = {'content-type': 'application/json'}
1121 response = requests.request(
1122 "POST", url, data=json.dumps(data), headers=headers,
1123 auth=('admin', 'admin'))
1124 self.assertEqual(response.status_code, requests.codes.ok)
1125 res = response.json()
1126 self.assertIn('Renderer service delete in progress',
1127 res['output']['configuration-response-common'][
1128 'response-message'])
1131 def test_31_delete_eth_service2(self):
1132 url = ("{}/operations/org-openroadm-service:service-delete"
1133 .format(self.restconf_baseurl))
1135 "sdnc-request-header": {
1136 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1137 "rpc-action": "service-delete",
1138 "request-system-id": "appname",
1140 "http://localhost:8585/NotificationServer/notify"
1142 "service-delete-req-info": {
1143 "service-name": "service2",
1144 "tail-retention": "no"
1148 headers = {'content-type': 'application/json'}
1149 response = requests.request(
1150 "POST", url, data=json.dumps(data), headers=headers,
1151 auth=('admin', 'admin'))
1152 self.assertEqual(response.status_code, requests.codes.ok)
1153 res = response.json()
1154 self.assertIn('Renderer service delete in progress',
1155 res['output']['configuration-response-common'][
1156 'response-message'])
1159 def test_32_check_no_xc_ROADMA(self):
1161 "{}/config/network-topology:"
1162 "network-topology/topology/topology-netconf/"
1163 "node/ROADMA01/yang-ext:"
1164 "mount/org-openroadm-device:org-openroadm-device/"
1165 .format(self.restconf_baseurl))
1166 response = requests.request(
1167 "GET", url, auth=('admin', 'admin'))
1168 res = response.json()
1169 self.assertEqual(response.status_code, requests.codes.ok)
1170 self.assertNotIn('roadm-connections',
1171 dict.keys(res['org-openroadm-device']))
1174 def test_33_check_topo_XPDRA(self):
1176 "{}/config/ietf-network:"
1177 "networks/network/openroadm-topology/node/XPDRA01-XPDR1"
1178 .format(self.restconf_baseurl))
1179 response = requests.request(
1180 "GET", url1, auth=('admin', 'admin'))
1181 self.assertEqual(response.status_code, requests.codes.ok)
1182 res = response.json()
1183 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1184 for ele in liste_tp:
1185 if ((ele[u'org-openroadm-common-network:tp-type'] ==
1187 and (ele['tp-id'] == 'XPDR1-CLIENT1' or ele[
1188 'tp-id'] == 'XPDR1-CLIENT3')):
1190 'org-openroadm-network-topology:xpdr-client-attributes',
1192 elif (ele[u'org-openroadm-common-network:tp-type'] ==
1194 self.assertIn(u'tail-equipment-id', dict.keys(
1195 ele[u'org-openroadm-network-topology:'
1196 u'xpdr-network-attributes']))
1197 self.assertNotIn('wavelength', dict.keys(
1198 ele[u'org-openroadm-network-topology:'
1199 u'xpdr-network-attributes']))
1202 def test_34_check_topo_ROADMA_SRG1(self):
1204 "{}/config/ietf-network:"
1205 "networks/network/openroadm-topology/node/ROADMA01-SRG1"
1206 .format(self.restconf_baseurl))
1207 response = requests.request(
1208 "GET", url1, auth=('admin', 'admin'))
1209 self.assertEqual(response.status_code, requests.codes.ok)
1210 res = response.json()
1211 self.assertIn({u'index': 1}, res['node'][0][
1212 u'org-openroadm-network-topology:srg-attributes'][
1213 'available-wavelengths'])
1214 self.assertIn({u'index': 2}, res['node'][0][
1215 u'org-openroadm-network-topology:srg-attributes'][
1216 'available-wavelengths'])
1217 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1218 for ele in liste_tp:
1219 if ele['tp-id'] == 'SRG1-PP1-TXRX' or \
1220 ele['tp-id'] == 'SRG1-PP1-TXRX':
1221 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
1224 self.assertNotIn('org-openroadm-network-topology:pp-attributes',
1228 def test_35_check_topo_ROADMA_DEG1(self):
1230 "{}/config/ietf-network:"
1231 "networks/network/openroadm-topology/node/ROADMA01-DEG1"
1232 .format(self.restconf_baseurl))
1233 response = requests.request(
1234 "GET", url1, auth=('admin', 'admin'))
1235 self.assertEqual(response.status_code, requests.codes.ok)
1236 res = response.json()
1237 self.assertIn({u'index': 1}, res['node'][0][
1238 u'org-openroadm-network-topology:degree-attributes'][
1239 'available-wavelengths'])
1240 self.assertIn({u'index': 2}, res['node'][0][
1241 u'org-openroadm-network-topology:degree-attributes'][
1242 'available-wavelengths'])
1243 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1244 for ele in liste_tp:
1245 if ele['tp-id'] == 'DEG1-CTP-TXRX':
1246 self.assertNotIn('org-openroadm-network-topology:'
1247 'ctp-attributes', dict.keys(ele))
1248 if ele['tp-id'] == 'DEG1-TTP-TXRX':
1249 self.assertNotIn('org-openroadm-network-topology:'
1250 'tx-ttp-attributes', dict.keys(ele))
1253 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
1254 def test_36_create_oc_service1(self):
1255 url = ("{}/operations/org-openroadm-service:service-create"
1256 .format(self.restconf_baseurl))
1259 "sdnc-request-header": {
1260 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1261 "rpc-action": "service-create",
1262 "request-system-id": "appname",
1264 "http://localhost:8585/NotificationServer/notify"
1266 "service-name": "service1",
1267 "common-id": "ASATT1234567",
1268 "connection-type": "roadm-line",
1270 "service-rate": "100",
1271 "node-id": "ROADMA01",
1272 "service-format": "OC",
1273 "clli": "SNJSCAMCJP8",
1277 "ROUTER_SNJSCAMCJP8_000000.00_00",
1278 "port-type": "router",
1279 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1280 "port-rack": "000000.00",
1285 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1286 "lgx-port-name": "LGX Back.3",
1287 "lgx-port-rack": "000000.00",
1288 "lgx-port-shelf": "00"
1294 "ROUTER_SNJSCAMCJP8_000000.00_00",
1295 "port-type": "router",
1296 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1297 "port-rack": "000000.00",
1302 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1303 "lgx-port-name": "LGX Back.4",
1304 "lgx-port-rack": "000000.00",
1305 "lgx-port-shelf": "00"
1308 "optic-type": "gray"
1311 "service-rate": "100",
1312 "node-id": "ROADMC01",
1313 "service-format": "OC",
1314 "clli": "SNJSCAMCJT4",
1318 "ROUTER_SNJSCAMCJT4_000000.00_00",
1319 "port-type": "router",
1320 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1321 "port-rack": "000000.00",
1326 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1327 "lgx-port-name": "LGX Back.29",
1328 "lgx-port-rack": "000000.00",
1329 "lgx-port-shelf": "00"
1335 "ROUTER_SNJSCAMCJT4_000000.00_00",
1336 "port-type": "router",
1337 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1338 "port-rack": "000000.00",
1343 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1344 "lgx-port-name": "LGX Back.30",
1345 "lgx-port-rack": "000000.00",
1346 "lgx-port-shelf": "00"
1349 "optic-type": "gray"
1351 "due-date": "2016-11-28T00:00:01Z",
1352 "operator-contact": "pw1234"
1355 headers = {'content-type': 'application/json',
1356 "Accept": "application/json"}
1357 response = requests.request(
1358 "POST", url, data=json.dumps(data), headers=headers,
1359 auth=('admin', 'admin'))
1360 self.assertEqual(response.status_code, requests.codes.ok)
1361 res = response.json()
1362 self.assertIn('PCE calculation in progress',
1363 res['output']['configuration-response-common'][
1364 'response-message'])
1365 time.sleep(self.WAITING)
1367 def test_37_get_oc_service1(self):
1368 url = ("{}/operational/org-openroadm-service:"
1369 "service-list/services/service1"
1370 .format(self.restconf_baseurl))
1371 headers = {'content-type': 'application/json',
1372 "Accept": "application/json"}
1373 response = requests.request(
1374 "GET", url, headers=headers, auth=('admin', 'admin'))
1375 self.assertEqual(response.status_code, requests.codes.ok)
1376 res = response.json()
1378 res['services'][0]['administrative-state'],
1381 res['services'][0]['service-name'], 'service1')
1383 res['services'][0]['connection-type'], 'roadm-line')
1385 res['services'][0]['lifecycle-state'], 'planned')
1388 def test_38_check_xc1_ROADMA(self):
1390 "{}/config/network-topology:"
1391 "network-topology/topology/topology-netconf/"
1392 "node/ROADMA01/yang-ext:"
1393 "mount/org-openroadm-device:org-openroadm-device/"
1394 "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
1395 .format(self.restconf_baseurl))
1396 headers = {'content-type': 'application/json'}
1397 response = requests.request(
1398 "GET", url, headers=headers, auth=('admin', 'admin'))
1399 self.assertEqual(response.status_code, requests.codes.ok)
1400 res = response.json()
1401 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1402 self.assertDictEqual(
1404 'connection-number': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
1405 'wavelength-number': 1,
1406 'opticalControlMode': 'gainLoss',
1407 'target-output-power': -3.0
1408 }, **res['roadm-connections'][0]),
1409 res['roadm-connections'][0]
1411 self.assertDictEqual(
1412 {'src-if': 'SRG1-PP1-TXRX-1'},
1413 res['roadm-connections'][0]['source'])
1414 self.assertDictEqual(
1415 {'dst-if': 'DEG1-TTP-TXRX-1'},
1416 res['roadm-connections'][0]['destination'])
1419 def test_39_check_xc1_ROADMC(self):
1421 "{}/config/network-topology:"
1422 "network-topology/topology/topology-netconf/"
1423 "node/ROADMC01/yang-ext:mount/org-openroadm-device:"
1424 "org-openroadm-device/"
1425 "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
1426 .format(self.restconf_baseurl))
1427 headers = {'content-type': 'application/json'}
1428 response = requests.request(
1429 "GET", url, headers=headers, auth=('admin', 'admin'))
1430 self.assertEqual(response.status_code, requests.codes.ok)
1431 res = response.json()
1432 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1433 self.assertDictEqual(
1435 'connection-number': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
1436 'wavelength-number': 1,
1437 'opticalControlMode': 'gainLoss',
1438 'target-output-power': 2.0
1439 }, **res['roadm-connections'][0]),
1440 res['roadm-connections'][0]
1442 self.assertDictEqual(
1443 {'src-if': 'SRG1-PP1-TXRX-1'},
1444 res['roadm-connections'][0]['source'])
1445 self.assertDictEqual(
1446 {'dst-if': 'DEG2-TTP-TXRX-1'},
1447 res['roadm-connections'][0]['destination'])
1450 def test_40_create_oc_service2(self):
1451 url = ("{}/operations/org-openroadm-service:service-create"
1452 .format(self.restconf_baseurl))
1455 "sdnc-request-header": {
1456 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1457 "rpc-action": "service-create",
1458 "request-system-id": "appname",
1460 "http://localhost:8585/NotificationServer/notify"
1462 "service-name": "service2",
1463 "common-id": "ASATT1234567",
1464 "connection-type": "roadm-line",
1466 "service-rate": "100",
1467 "node-id": "ROADMA01",
1468 "service-format": "OC",
1469 "clli": "SNJSCAMCJP8",
1473 "ROUTER_SNJSCAMCJP8_000000.00_00",
1474 "port-type": "router",
1475 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1476 "port-rack": "000000.00",
1481 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1482 "lgx-port-name": "LGX Back.3",
1483 "lgx-port-rack": "000000.00",
1484 "lgx-port-shelf": "00"
1490 "ROUTER_SNJSCAMCJP8_000000.00_00",
1491 "port-type": "router",
1492 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1493 "port-rack": "000000.00",
1498 "LGX Panel_SNJSCAMCJP8_000000.00_00",
1499 "lgx-port-name": "LGX Back.4",
1500 "lgx-port-rack": "000000.00",
1501 "lgx-port-shelf": "00"
1504 "optic-type": "gray"
1507 "service-rate": "100",
1508 "node-id": "ROADMC01",
1509 "service-format": "OC",
1510 "clli": "SNJSCAMCJT4",
1514 "ROUTER_SNJSCAMCJT4_000000.00_00",
1515 "port-type": "router",
1516 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1517 "port-rack": "000000.00",
1522 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1523 "lgx-port-name": "LGX Back.29",
1524 "lgx-port-rack": "000000.00",
1525 "lgx-port-shelf": "00"
1531 "ROUTER_SNJSCAMCJT4_000000.00_00",
1532 "port-type": "router",
1533 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1534 "port-rack": "000000.00",
1539 "LGX Panel_SNJSCAMCJT4_000000.00_00",
1540 "lgx-port-name": "LGX Back.30",
1541 "lgx-port-rack": "000000.00",
1542 "lgx-port-shelf": "00"
1545 "optic-type": "gray"
1547 "due-date": "2016-11-28T00:00:01Z",
1548 "operator-contact": "pw1234"
1551 headers = {'content-type': 'application/json',
1552 "Accept": "application/json"}
1553 response = requests.request(
1554 "POST", url, data=json.dumps(data), headers=headers,
1555 auth=('admin', 'admin'))
1556 self.assertEqual(response.status_code, requests.codes.ok)
1557 res = response.json()
1558 self.assertIn('PCE calculation in progress',
1559 res['output']['configuration-response-common'][
1560 'response-message'])
1561 time.sleep(self.WAITING)
1563 def test_41_get_oc_service2(self):
1564 url = ("{}/operational/org-openroadm-service:"
1565 "service-list/services/service2"
1566 .format(self.restconf_baseurl))
1567 headers = {'content-type': 'application/json',
1568 "Accept": "application/json"}
1569 response = requests.request(
1570 "GET", url, headers=headers, auth=('admin', 'admin'))
1571 self.assertEqual(response.status_code, requests.codes.ok)
1572 res = response.json()
1574 res['services'][0]['administrative-state'],
1577 res['services'][0]['service-name'], 'service2')
1579 res['services'][0]['connection-type'], 'roadm-line')
1581 res['services'][0]['lifecycle-state'], 'planned')
1584 def test_42_check_xc2_ROADMA(self):
1586 "{}/config/network-topology:"
1587 "network-topology/topology/topology-netconf/"
1588 "node/ROADMA01/yang-ext:mount/org-openroadm-device:"
1589 "org-openroadm-device/"
1590 "roadm-connections/SRG1-PP2-TXRX-DEG1-TTP-TXRX-2"
1591 .format(self.restconf_baseurl))
1592 headers = {'content-type': 'application/json'}
1593 response = requests.request(
1594 "GET", url, headers=headers, auth=('admin', 'admin'))
1595 self.assertEqual(response.status_code, requests.codes.ok)
1596 res = response.json()
1597 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1598 self.assertDictEqual(
1600 'connection-number': 'SRG1-PP2-TXRX-DEG1-TTP-TXRX-2',
1601 'wavelength-number': 2,
1602 'opticalControlMode': 'gainLoss',
1603 'target-output-power': -3.0
1604 }, **res['roadm-connections'][0]),
1605 res['roadm-connections'][0]
1607 self.assertDictEqual(
1608 {'src-if': 'SRG1-PP2-TXRX-2'},
1609 res['roadm-connections'][0]['source'])
1610 self.assertDictEqual(
1611 {'dst-if': 'DEG1-TTP-TXRX-2'},
1612 res['roadm-connections'][0]['destination'])
1615 def test_43_check_topo_ROADMA(self):
1616 self.test_26_check_topo_ROADMA_SRG1()
1617 self.test_27_check_topo_ROADMA_DEG1()
1620 def test_44_delete_oc_service1(self):
1621 url = ("{}/operations/org-openroadm-service:service-delete"
1622 .format(self.restconf_baseurl))
1624 "sdnc-request-header": {
1625 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1626 "rpc-action": "service-delete",
1627 "request-system-id": "appname",
1629 "http://localhost:8585/NotificationServer/notify"
1631 "service-delete-req-info": {
1632 "service-name": "service1",
1633 "tail-retention": "no"
1637 headers = {'content-type': 'application/json'}
1638 response = requests.request(
1639 "POST", url, data=json.dumps(data), headers=headers,
1640 auth=('admin', 'admin'))
1641 self.assertEqual(response.status_code, requests.codes.ok)
1642 res = response.json()
1643 self.assertIn('Renderer service delete in progress',
1644 res['output']['configuration-response-common'][
1645 'response-message'])
1648 def test_45_delete_oc_service2(self):
1649 url = ("{}/operations/org-openroadm-service:service-delete"
1650 .format(self.restconf_baseurl))
1652 "sdnc-request-header": {
1653 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1654 "rpc-action": "service-delete",
1655 "request-system-id": "appname",
1657 "http://localhost:8585/NotificationServer/notify"
1659 "service-delete-req-info": {
1660 "service-name": "service2",
1661 "tail-retention": "no"
1665 headers = {'content-type': 'application/json'}
1666 response = requests.request(
1667 "POST", url, data=json.dumps(data), headers=headers,
1668 auth=('admin', 'admin'))
1669 self.assertEqual(response.status_code, requests.codes.ok)
1670 res = response.json()
1671 self.assertIn('Renderer service delete in progress',
1672 res['output']['configuration-response-common'][
1673 'response-message'])
1676 def test_46_get_no_oc_services(self):
1678 url = ("{}/operational/org-openroadm-service:service-list"
1679 .format(self.restconf_baseurl))
1680 headers = {'content-type': 'application/json',
1681 "Accept": "application/json"}
1682 response = requests.request(
1683 "GET", url, headers=headers, auth=('admin', 'admin'))
1684 self.assertEqual(response.status_code, requests.codes.not_found)
1685 res = response.json()
1688 "error-type": "application",
1689 "error-tag": "data-missing",
1691 "Request could not be completed because the relevant data "
1692 "model content does not exist"
1694 res['errors']['error'])
1697 def test_47_get_no_xc_ROADMA(self):
1699 "{}/config/network-topology:"
1700 "network-topology/topology/topology-netconf"
1701 "/node/ROADMA01/yang-ext:mount/org-openroadm-device:"
1702 "org-openroadm-device/"
1703 .format(self.restconf_baseurl))
1704 headers = {'content-type': 'application/json',
1705 "Accept": "application/json"}
1706 response = requests.request(
1707 "GET", url, headers=headers, auth=('admin', 'admin'))
1708 self.assertEqual(response.status_code, requests.codes.ok)
1709 res = response.json()
1710 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
1713 def test_48_check_topo_ROADMA(self):
1714 self.test_34_check_topo_ROADMA_SRG1()
1715 self.test_35_check_topo_ROADMA_DEG1()
1717 def test_49_loop_create_eth_service(self):
1718 for i in range(1, 6):
1719 print("trial number {}".format(i))
1720 print("eth service creation")
1721 self.test_11_create_eth_service1()
1722 print("check xc in ROADMA01")
1723 self.test_13_check_xc1_ROADMA()
1724 print("check xc in ROADMC01")
1725 self.test_14_check_xc1_ROADMC()
1726 print("eth service deletion\n")
1727 self.test_30_delete_eth_service1()
1729 def test_50_loop_create_oc_service(self):
1730 url = ("{}/operational/org-openroadm-service:"
1731 "service-list/services/service1"
1732 .format(self.restconf_baseurl))
1733 response = requests.request("GET", url, auth=('admin', 'admin'))
1734 if response.status_code != 404:
1735 url = ("{}/operations/org-openroadm-service:service-delete"
1736 .format(self.restconf_baseurl))
1738 "sdnc-request-header": {
1739 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1740 "rpc-action": "service-delete",
1741 "request-system-id": "appname",
1743 "http://localhost:8585/NotificationServer/notify"
1745 "service-delete-req-info": {
1746 "service-name": "service1",
1747 "tail-retention": "no"
1751 headers = {'content-type': 'application/json'}
1752 requests.request("POST", url, data=json.dumps(data),
1754 auth=('admin', 'admin')
1758 for i in range(1, 6):
1759 print("trial number {}".format(i))
1760 print("oc service creation")
1761 self.test_36_create_oc_service1()
1762 print("check xc in ROADMA01")
1763 self.test_38_check_xc1_ROADMA()
1764 print("check xc in ROADMC01")
1765 self.test_39_check_xc1_ROADMC()
1766 print("oc service deletion\n")
1767 self.test_44_delete_oc_service1()
1769 def test_51_disconnect_XPDRA(self):
1770 url = ("{}/config/network-topology:"
1771 "network-topology/topology/topology-netconf/node/XPDRA01"
1772 .format(self.restconf_baseurl))
1773 headers = {'content-type': 'application/json'}
1774 response = requests.request(
1775 "DELETE", url, headers=headers,
1776 auth=('admin', 'admin'))
1777 self.assertEqual(response.status_code, requests.codes.ok)
1780 def test_52_disconnect_XPDRC(self):
1781 url = ("{}/config/network-topology:"
1782 "network-topology/topology/topology-netconf/node/XPDRC01"
1783 .format(self.restconf_baseurl))
1784 headers = {'content-type': 'application/json'}
1785 response = requests.request(
1786 "DELETE", url, headers=headers,
1787 auth=('admin', 'admin'))
1788 self.assertEqual(response.status_code, requests.codes.ok)
1791 def test_53_disconnect_ROADMA(self):
1792 url = ("{}/config/network-topology:"
1793 "network-topology/topology/topology-netconf/node/ROADMA01"
1794 .format(self.restconf_baseurl))
1795 headers = {'content-type': 'application/json'}
1796 response = requests.request(
1797 "DELETE", url, headers=headers,
1798 auth=('admin', 'admin'))
1799 self.assertEqual(response.status_code, requests.codes.ok)
1802 def test_54_disconnect_ROADMC(self):
1803 url = ("{}/config/network-topology:"
1804 "network-topology/topology/topology-netconf/node/ROADMC01"
1805 .format(self.restconf_baseurl))
1806 headers = {'content-type': 'application/json'}
1807 response = requests.request(
1808 "DELETE", url, headers=headers,
1809 auth=('admin', 'admin'))
1810 self.assertEqual(response.status_code, requests.codes.ok)
1814 if __name__ == "__main__":
1815 unittest.main(verbosity=2)