2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
class TransportPCEFulltesting(unittest.TestCase):
    """End-to-end scenario driven through the controller's RESTCONF API.

    The test methods are numbered and rely on state left by the earlier ones.
    """

    # Base URL every RESTCONF request in this class is built from.
    restconf_baseurl = "http://localhost:8181/restconf"
    # Seconds to sleep after a service-create request.
    WAITING = 20  # nominal value is 300
34 # START_IGNORE_XTESTING
@classmethod
def setUpClass(cls):
    """Start the controller and the four device simulators used by the tests.

    NOTE(review): the decorator and ``def`` line were dropped from the
    extracted source; the ``setUpClass`` name is inferred from the ``cls``
    usage and the sibling ``tearDownClass`` - confirm.
    """
    cls.odl_process = test_utils.start_tpce()
    cls.sim_process1 = test_utils.start_sim('xpdra')
    cls.sim_process2 = test_utils.start_sim('roadma')
    cls.sim_process3 = test_utils.start_sim('roadmc')
    cls.sim_process4 = test_utils.start_sim('xpdrc')
@classmethod
def tearDownClass(cls):
    """Send SIGINT to the controller and the simulators and wait for them to exit.

    Processes are stopped in the order they were started: controller first,
    then simulators 1 to 4. For each one, its direct children are interrupted
    before the process itself.
    """
    processes = (cls.odl_process, cls.sim_process1, cls.sim_process2,
                 cls.sim_process3, cls.sim_process4)
    for process in processes:
        for child in psutil.Process(process.pid).children():
            child.send_signal(signal.SIGINT)
            # NOTE(review): one statement per loop was dropped from the
            # extracted source; waiting on the child is assumed - confirm.
            child.wait()
        process.send_signal(signal.SIGINT)
        process.wait()
    print("all processes killed")
def setUp(self):
    """Print the name of the test method about to run (executed before each test)."""
    # self.id() is a dotted path; its last component is the method name.
    print("execution of {}".format(self.id().split(".")[-1]))
78 # connect netconf devices
def test_01_connect_xpdrA(self):
    """PUT the XPDR-A1 node into topology-netconf and expect 201 Created."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDR-A1"
           .format(self.restconf_baseurl))
    # NOTE(review): the "node-id" entry was dropped from the extracted source;
    # it is rebuilt from the URL and the sibling connect tests - confirm.
    data = {"node": [{
        "node-id": "XPDR-A1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(10)
def test_02_connect_xpdrC(self):
    """PUT the XPDR-C1 node into topology-netconf and expect 201 Created."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/XPDR-C1"
           .format(self.restconf_baseurl))
    data = {"node": [{
        "node-id": "XPDR-C1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['xpdrc']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(10)
def test_03_connect_rdmA(self):
    """PUT the ROADM-A1 node into topology-netconf and expect 201 Created."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-A1"
           .format(self.restconf_baseurl))
    data = {"node": [{
        "node-id": "ROADM-A1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['roadma']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(10)
def test_04_connect_rdmC(self):
    """PUT the ROADM-C1 node into topology-netconf and expect 201 Created."""
    url = ("{}/config/network-topology:"
           "network-topology/topology/topology-netconf/node/ROADM-C1"
           .format(self.restconf_baseurl))
    data = {"node": [{
        "node-id": "ROADM-C1",
        "netconf-node-topology:username": "admin",
        "netconf-node-topology:password": "admin",
        "netconf-node-topology:host": "127.0.0.1",
        "netconf-node-topology:port": test_utils.sims['roadmc']['port'],
        "netconf-node-topology:tcp-only": "false",
        "netconf-node-topology:pass-through": {}}]}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(10)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    """Call init-xpdr-rdm-links for XPDR-A1 network 1 and ROADM-A1 SRG1-PP1-TXRX."""
    url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-A1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADM-A1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(2)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    """Call init-rdm-xpdr-links for ROADM-A1 SRG1-PP1-TXRX and XPDR-A1 network 1."""
    url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-A1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADM-A1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(2)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    """Call init-xpdr-rdm-links for XPDR-C1 network 1 and ROADM-C1 SRG1-PP1-TXRX."""
    url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-C1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADM-C1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(2)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    """Call init-rdm-xpdr-links for ROADM-C1 SRG1-PP1-TXRX and XPDR-C1 network 1."""
    url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-C1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "1",
                "networkutils:rdm-node": "ROADM-C1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP1-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(2)
def test_09_add_omsAttributes_ROADMA_ROADMC(self):
    """PUT the OMS span attributes on the ROADM-A1 to ROADM-C1 link; expect 201."""
    # Config ROADMA-ROADMC oms-attributes
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): the "span" wrapper and the "SRLG-Id", "fiber-type" and
    # "pmd" entries were dropped from the extracted source; the values below
    # are assumed - confirm against the original file.
    data = {"span": {
        "auto-spanloss": "true",
        "spanloss-base": 11.4,
        "spanloss-current": 12,
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
def test_10_add_omsAttributes_ROADMC_ROADMA(self):
    """PUT the OMS span attributes on the ROADM-C1 to ROADM-A1 link; expect 201."""
    # Config ROADMC-ROADMA oms-attributes
    url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
           "link/ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
           "OMS-attributes/span"
           .format(self.restconf_baseurl))
    # NOTE(review): the "span" wrapper and the "SRLG-Id", "fiber-type" and
    # "pmd" entries were dropped from the extracted source; the values below
    # are assumed - confirm against the original file.
    data = {"span": {
        "auto-spanloss": "true",
        "spanloss-base": 11.4,
        "spanloss-current": 12,
        "engineered-spanloss": 12.2,
        "link-concatenation": [{
            "SRLG-Id": 0,
            "fiber-type": "smf",
            "SRLG-length": 100000,
            "pmd": 0.5}]}}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "PUT", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.created)
291 # test service-create for Eth service from xpdr to xpdr
def test_11_create_eth_service1(self):
    """POST service-create for Ethernet "service1" from XPDR-A1 to XPDR-C1.

    Expects HTTP 200 and a 'PCE calculation in progress' response message,
    then sleeps WAITING seconds.
    """

    def direction(clli, port_name, lgx_port_name):
        # One tx/rx direction: a router port and an LGX port at site `clli`.
        # NOTE(review): "port-shelf" was dropped from the extracted source;
        # its value is assumed to mirror "lgx-port-shelf" - confirm.
        return {
            "port": {
                "port-device-name": "ROUTER_{}_000000.00_00".format(clli),
                "port-type": "router",
                "port-name": port_name,
                "port-rack": "000000.00",
                "port-shelf": "00"
            },
            "lgx": {
                "lgx-device-name": "LGX Panel_{}_000000.00_00".format(clli),
                "lgx-port-name": lgx_port_name,
                "lgx-port-rack": "000000.00",
                "lgx-port-shelf": "00"
            }
        }

    def service_end(node_id, clli, ge_port, lgx_tx, lgx_rx):
        # One service end-point with its tx and rx directions.
        # NOTE(review): "optic-type" was dropped from the extracted source;
        # the value is assumed - confirm.
        return {
            "service-rate": "100",
            "node-id": node_id,
            "service-format": "Ethernet",
            "clli": clli,
            "tx-direction": direction(clli, "Gigabit Ethernet_Tx." + ge_port, lgx_tx),
            "rx-direction": direction(clli, "Gigabit Ethernet_Rx." + ge_port, lgx_rx),
            "optic-type": "gray"
        }

    url = ("{}/operations/org-openroadm-service:service-create"
           .format(self.restconf_baseurl))
    data = {"input": {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-create",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify"
        },
        "service-name": "service1",
        "common-id": "ASATT1234567",
        "connection-type": "service",
        "service-a-end": service_end("XPDR-A1", "SNJSCAMCJP8", "ge-5/0/0.0",
                                     "LGX Back.3", "LGX Back.4"),
        "service-z-end": service_end("XPDR-C1", "SNJSCAMCJT4", "ge-1/0/0.0",
                                     "LGX Back.29", "LGX Back.30"),
        "due-date": "2016-11-28T00:00:01Z",
        "operator-contact": "pw1234"
    }}
    headers = {'content-type': 'application/json',
               "Accept": "application/json"}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('PCE calculation in progress',
                  res['output']['configuration-response-common']['response-message'])
    time.sleep(self.WAITING)
def test_12_get_eth_service1(self):
    """GET service1 from the operational service list and check its state fields."""
    url = ("{}/operational/org-openroadm-service:service-list/services/service1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json',
               "Accept": "application/json"}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    service = response.json()['services'][0]
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service1')
    self.assertEqual(service['connection-type'], 'service')
    self.assertEqual(service['lifecycle-state'], 'planned')
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(2)
def test_13_check_xc1_ROADMA(self):
    """GET roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on ROADM-A1 and check it."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Replaces self.assertDictContainsSubset (deprecated in python 3.2).
    # The expected entries are overlaid ON TOP of the actual dict, so a
    # differing value makes the comparison fail. The previous form,
    # dict(expected, **actual), let the actual values win and therefore only
    # verified that the keys were present.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
        connection['destination'])
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(5)
def test_14_check_xc1_ROADMC(self):
    """GET roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADM-C1 and check it."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Replaces self.assertDictContainsSubset (deprecated in python 3.2).
    # The expected entries are overlaid ON TOP of the actual dict, so a
    # differing value makes the comparison fail. The previous form,
    # dict(expected, **actual), let the actual values win and therefore only
    # verified that the keys were present.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
        connection['destination'])
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(5)
def test_15_check_topo_XPDRA(self):
    """Check wavelength attributes on the XPDR-A1-XPDR1 termination points.

    NETWORK1 must carry the 196.1 wavelength; the client ports and NETWORK2
    must not carry client/network attributes.
    """
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
            .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'XPDR1-NETWORK1':
            self.assertEqual({'frequency': 196.1,
                              'width': 40},
                             ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
        if ele['tp-id'] in ('XPDR1-CLIENT2', 'XPDR1-CLIENT1'):
            self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', ele)
        if ele['tp-id'] == 'XPDR1-NETWORK2':
            self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', ele)
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(10)
def test_16_check_topo_ROADMA_SRG1(self):
    """Check that wavelength index 1 is used on ROADM-A1-SRG1 (port PP1 only)."""
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
            .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertNotIn({'index': 1},
                     res['node'][0]['org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'SRG1-PP1-TXRX':
            self.assertIn({'index': 1, 'frequency': 196.1,
                           'width': 40},
                          ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
        if ele['tp-id'] == 'SRG1-PP2-TXRX':
            # NOTE(review): 'used-wavelength' is read from under
            # 'pp-attributes' just above, so this top-level key check may be
            # vacuous - confirm the intended assertion.
            self.assertNotIn('used-wavelength', ele)
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(10)
def test_17_check_topo_ROADMA_DEG1(self):
    """Check that wavelength index 1 is used on the ROADM-A1-DEG2 CTP and TTP.

    NOTE(review): the method name says DEG1 but the node queried is
    ROADM-A1-DEG2; the name is kept so test selection by name keeps working.
    """
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
            .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertNotIn({'index': 1},
                     res['node'][0]['org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'DEG2-CTP-TXRX':
            self.assertIn({'index': 1, 'frequency': 196.1,
                           'width': 40},
                          ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
        if ele['tp-id'] == 'DEG2-TTP-TXRX':
            self.assertIn({'index': 1, 'frequency': 196.1,
                           'width': 40},
                          ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(10)
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
    """Call init-xpdr-rdm-links for XPDR-A1 network 2 and ROADM-A1 SRG1-PP2-TXRX."""
    url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-A1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "2",
                "networkutils:rdm-node": "ROADM-A1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP2-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(2)
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
    """Call init-rdm-xpdr-links for ROADM-A1 SRG1-PP2-TXRX and XPDR-A1 network 2."""
    url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-A1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "2",
                "networkutils:rdm-node": "ROADM-A1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP2-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(2)
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
    """Call init-xpdr-rdm-links for XPDR-C1 network 2 and ROADM-C1 SRG1-PP2-TXRX."""
    url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-C1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "2",
                "networkutils:rdm-node": "ROADM-C1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP2-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(2)
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
    """Call init-rdm-xpdr-links for ROADM-C1 SRG1-PP2-TXRX and XPDR-C1 network 2."""
    url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
    data = {
        "networkutils:input": {
            "networkutils:links-input": {
                "networkutils:xpdr-node": "XPDR-C1",
                "networkutils:xpdr-num": "1",
                "networkutils:network-num": "2",
                "networkutils:rdm-node": "ROADM-C1",
                "networkutils:srg-num": "1",
                "networkutils:termination-point-num": "SRG1-PP2-TXRX"
            }
        }
    }
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data),
        headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(2)
def test_22_create_eth_service2(self):
    """POST service-create for Ethernet "service2" from XPDR-A1 to XPDR-C1.

    Expects HTTP 200 and a 'PCE calculation in progress' response message,
    then sleeps WAITING seconds.
    """

    def direction(clli, port_name, lgx_port_name):
        # One tx/rx direction: a router port and an LGX port at site `clli`.
        # NOTE(review): "port-shelf" was dropped from the extracted source;
        # its value is assumed to mirror "lgx-port-shelf" - confirm.
        return {
            "port": {
                "port-device-name": "ROUTER_{}_000000.00_00".format(clli),
                "port-type": "router",
                "port-name": port_name,
                "port-rack": "000000.00",
                "port-shelf": "00"
            },
            "lgx": {
                "lgx-device-name": "LGX Panel_{}_000000.00_00".format(clli),
                "lgx-port-name": lgx_port_name,
                "lgx-port-rack": "000000.00",
                "lgx-port-shelf": "00"
            }
        }

    def service_end(node_id, clli, ge_port, lgx_tx, lgx_rx):
        # One service end-point with its tx and rx directions.
        # NOTE(review): "optic-type" was dropped from the extracted source;
        # the value is assumed - confirm.
        return {
            "service-rate": "100",
            "node-id": node_id,
            "service-format": "Ethernet",
            "clli": clli,
            "tx-direction": direction(clli, "Gigabit Ethernet_Tx." + ge_port, lgx_tx),
            "rx-direction": direction(clli, "Gigabit Ethernet_Rx." + ge_port, lgx_rx),
            "optic-type": "gray"
        }

    url = ("{}/operations/org-openroadm-service:service-create"
           .format(self.restconf_baseurl))
    data = {"input": {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-create",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify"
        },
        "service-name": "service2",
        "common-id": "ASATT1234567",
        "connection-type": "service",
        "service-a-end": service_end("XPDR-A1", "SNJSCAMCJP8", "ge-5/0/0.0",
                                     "LGX Back.3", "LGX Back.4"),
        "service-z-end": service_end("XPDR-C1", "SNJSCAMCJT4", "ge-1/0/0.0",
                                     "LGX Back.29", "LGX Back.30"),
        "due-date": "2016-11-28T00:00:01Z",
        "operator-contact": "pw1234"
    }}
    headers = {'content-type': 'application/json',
               "Accept": "application/json"}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('PCE calculation in progress',
                  res['output']['configuration-response-common']['response-message'])
    time.sleep(self.WAITING)
def test_23_get_eth_service2(self):
    """GET service2 from the operational service list and check its state fields."""
    url = ("{}/operational/org-openroadm-service:service-list/services/service2"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json',
               "Accept": "application/json"}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    service = response.json()['services'][0]
    # NOTE(review): the expected administrative-state value was dropped from
    # the extracted source; 'inService' is taken from the service1 check.
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service2')
    self.assertEqual(service['connection-type'], 'service')
    self.assertEqual(service['lifecycle-state'], 'planned')
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(1)
def test_24_check_xc2_ROADMA(self):
    """GET roadm-connection DEG2-TTP-TXRX-SRG1-PP2-TXRX-2 on ROADM-A1 and check it."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-2"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-2',
        'opticalControlMode': 'power'
    }
    # Replaces self.assertDictContainsSubset (deprecated in python 3.2).
    # The expected entries are overlaid ON TOP of the actual dict, so a
    # differing value makes the comparison fail. The previous form,
    # dict(expected, **actual), let the actual values win and therefore only
    # verified that the keys were present.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'DEG2-TTP-TXRX-nmc-2'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'SRG1-PP2-TXRX-nmc-2'},
        connection['destination'])
def test_25_check_topo_XPDRA(self):
    """Check that XPDR-A1-XPDR1 NETWORK1 and NETWORK2 both carry a wavelength.

    NETWORK1 is expected at 196.1 and NETWORK2 at 196.05; the client ports
    must not carry client attributes.
    """
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
            .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'XPDR1-NETWORK1':
            self.assertEqual({'frequency': 196.1,
                              'width': 40},
                             ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
        if ele['tp-id'] == 'XPDR1-NETWORK2':
            self.assertEqual({'frequency': 196.05,
                              'width': 40},
                             ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
        if ele['tp-id'] in ('XPDR1-CLIENT1', 'XPDR1-CLIENT2'):
            self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', ele)
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(10)
def test_26_check_topo_ROADMA_SRG1(self):
    """Check wavelength indexes 1 and 2 are used on ROADM-A1-SRG1 (PP1 and PP2)."""
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
            .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    available = res['node'][0]['org-openroadm-network-topology:srg-attributes']['available-wavelengths']
    self.assertNotIn({'index': 1}, available)
    self.assertNotIn({'index': 2}, available)
    wavelength_1 = {'index': 1, 'frequency': 196.1, 'width': 40}
    wavelength_2 = {'index': 2, 'frequency': 196.05, 'width': 40}
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'SRG1-PP1-TXRX':
            self.assertIn(wavelength_1,
                          ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
            self.assertNotIn(wavelength_2,
                             ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
        if ele['tp-id'] == 'SRG1-PP2-TXRX':
            self.assertIn(wavelength_2,
                          ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
            self.assertNotIn(wavelength_1,
                             ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
        if ele['tp-id'] == 'SRG1-PP3-TXRX':
            self.assertNotIn('org-openroadm-network-topology:pp-attributes', ele)
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(10)
def test_27_check_topo_ROADMA_DEG2(self):
    """Check wavelength indexes 1 and 2 are used on the ROADM-A1-DEG2 CTP and TTP."""
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
            .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    available = res['node'][0]['org-openroadm-network-topology:degree-attributes']['available-wavelengths']
    self.assertNotIn({'index': 1}, available)
    self.assertNotIn({'index': 2}, available)
    wavelength_1 = {'index': 1, 'frequency': 196.1, 'width': 40}
    wavelength_2 = {'index': 2, 'frequency': 196.05, 'width': 40}
    liste_tp = res['node'][0]['ietf-network-topology:termination-point']
    for ele in liste_tp:
        if ele['tp-id'] == 'DEG2-CTP-TXRX':
            self.assertIn(wavelength_1,
                          ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
            self.assertIn(wavelength_2,
                          ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
        if ele['tp-id'] == 'DEG2-TTP-TXRX':
            self.assertIn(wavelength_1,
                          ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
            self.assertIn(wavelength_2,
                          ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(10)
843 # creation service test on a non-available resource
def test_28_create_eth_service3(self):
    """POST service-create for "service3", a request on a non-available resource.

    The RPC itself is still expected to answer HTTP 200 with a
    'PCE calculation in progress' message and a response-code containing
    '200'; the method then sleeps WAITING seconds.
    """

    def direction(clli, port_name, lgx_port_name):
        # One tx/rx direction: a router port and an LGX port at site `clli`.
        # NOTE(review): "port-shelf" was dropped from the extracted source;
        # its value is assumed to mirror "lgx-port-shelf" - confirm.
        return {
            "port": {
                "port-device-name": "ROUTER_{}_000000.00_00".format(clli),
                "port-type": "router",
                "port-name": port_name,
                "port-rack": "000000.00",
                "port-shelf": "00"
            },
            "lgx": {
                "lgx-device-name": "LGX Panel_{}_000000.00_00".format(clli),
                "lgx-port-name": lgx_port_name,
                "lgx-port-rack": "000000.00",
                "lgx-port-shelf": "00"
            }
        }

    def service_end(node_id, clli, ge_port, lgx_tx, lgx_rx):
        # One service end-point with its tx and rx directions.
        # NOTE(review): "optic-type" was dropped from the extracted source;
        # the value is assumed - confirm.
        return {
            "service-rate": "100",
            "node-id": node_id,
            "service-format": "Ethernet",
            "clli": clli,
            "tx-direction": direction(clli, "Gigabit Ethernet_Tx." + ge_port, lgx_tx),
            "rx-direction": direction(clli, "Gigabit Ethernet_Rx." + ge_port, lgx_rx),
            "optic-type": "gray"
        }

    url = ("{}/operations/org-openroadm-service:service-create"
           .format(self.restconf_baseurl))
    data = {"input": {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-create",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify"
        },
        "service-name": "service3",
        "common-id": "ASATT1234567",
        "connection-type": "service",
        "service-a-end": service_end("XPDR-A1", "SNJSCAMCJP8", "ge-5/0/0.0",
                                     "LGX Back.3", "LGX Back.4"),
        "service-z-end": service_end("XPDR-C1", "SNJSCAMCJT4", "ge-1/0/0.0",
                                     "LGX Back.29", "LGX Back.30"),
        "due-date": "2016-11-28T00:00:01Z",
        "operator-contact": "pw1234"
    }}
    headers = {'content-type': 'application/json',
               "Accept": "application/json"}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('PCE calculation in progress',
                  res['output']['configuration-response-common']['response-message'])
    self.assertIn('200', res['output']['configuration-response-common']['response-code'])
    time.sleep(self.WAITING)
# TODO: add a test that checks the openroadm-service-list still contains only 2 elements
def test_29_delete_eth_service3(self):
    """POST service-delete for "service3" and expect a 'does not exist' answer.

    The HTTP status is 200; the failure is reported in the RPC output with a
    response-code containing '500'.
    """
    url = ("{}/operations/org-openroadm-service:service-delete"
           .format(self.restconf_baseurl))
    data = {"input": {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-delete",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify"
        },
        "service-delete-req-info": {
            "service-name": "service3",
            "tail-retention": "no"
        }
    }}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Service \'service3\' does not exist in datastore',
                  res['output']['configuration-response-common']['response-message'])
    self.assertIn('500', res['output']['configuration-response-common']['response-code'])
    # NOTE(review): pause restored for a line dropped from the extracted
    # source; confirm the duration.
    time.sleep(20)
975 def test_30_delete_eth_service1(self):
# Calls the service-delete RPC for "service1" and expects the reply message
# to report that the renderer deletion is in progress.
976 url = ("{}/operations/org-openroadm-service:service-delete"
977 .format(self.restconf_baseurl))
# NOTE(review): the line that opens the request payload (the `data` object
# serialized in the POST below) and its closing braces are not visible in
# this listing - confirm against the complete file.
979 "sdnc-request-header": {
980 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
981 "rpc-action": "service-delete",
982 "request-system-id": "appname",
983 "notification-url": "http://localhost:8585/NotificationServer/notify"
985 "service-delete-req-info": {
986 "service-name": "service1",
987 "tail-retention": "no"
991 headers = {'content-type': 'application/json'}
992 response = requests.request(
993 "POST", url, data=json.dumps(data), headers=headers,
994 auth=('admin', 'admin'))
995 self.assertEqual(response.status_code, requests.codes.ok)
996 res = response.json()
997 self.assertIn('Renderer service delete in progress',
998 res['output']['configuration-response-common']['response-message'])
1001 def test_31_delete_eth_service2(self):
# Calls the service-delete RPC for "service2" and expects the reply message
# to report that the renderer deletion is in progress.
1002 url = ("{}/operations/org-openroadm-service:service-delete"
1003 .format(self.restconf_baseurl))
# NOTE(review): the line that opens the request payload (the `data` object
# serialized in the POST below) and its closing braces are not visible in
# this listing - confirm against the complete file.
1005 "sdnc-request-header": {
1006 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1007 "rpc-action": "service-delete",
1008 "request-system-id": "appname",
1009 "notification-url": "http://localhost:8585/NotificationServer/notify"
1011 "service-delete-req-info": {
1012 "service-name": "service2",
1013 "tail-retention": "no"
1017 headers = {'content-type': 'application/json'}
1018 response = requests.request(
1019 "POST", url, data=json.dumps(data), headers=headers,
1020 auth=('admin', 'admin'))
1021 self.assertEqual(response.status_code, requests.codes.ok)
1022 res = response.json()
1023 self.assertIn('Renderer service delete in progress',
1024 res['output']['configuration-response-common']['response-message'])
def test_32_check_no_xc_ROADMA(self):
    """Check that ROADM-A1 holds no roadm-connections.

    Reads the org-openroadm-device config subtree of the mounted ROADM-A1
    node through RESTCONF and verifies that it contains no
    'roadm-connections' entry.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url, auth=('admin', 'admin'))
    # Assert the status before decoding the body: decoding the body of a
    # failed request first would raise a JSON error and hide the real cause.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
def test_33_check_topo_XPDRA(self):
    """Inspect the termination points of node XPDR-A1-XPDR1 in openroadm-topology.

    XPONDER-CLIENT termination points must carry no xpdr-client-attributes.
    XPONDER-NETWORK termination points must expose 'tail-equipment-id' but
    no 'wavelength' in their xpdr-network-attributes.
    """
    node_url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
                .format(self.restconf_baseurl))
    reply = requests.request("GET", node_url, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    for tp in body['node'][0]['ietf-network-topology:termination-point']:
        tp_type = tp['org-openroadm-common-network:tp-type']
        if tp_type == 'XPONDER-CLIENT':
            self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', tp.keys())
            continue
        if tp_type != 'XPONDER-NETWORK':
            # Other termination point types are not checked by this test.
            continue
        network_attrs = tp['org-openroadm-network-topology:xpdr-network-attributes']
        self.assertIn('tail-equipment-id', network_attrs.keys())
        self.assertNotIn('wavelength', network_attrs.keys())
def test_34_check_topo_ROADMA_SRG1(self):
    """Check node ROADM-A1-SRG1 of openroadm-topology.

    Wavelength indexes 1 and 2 must be listed in the SRG
    'available-wavelengths', and no termination point of the node may
    carry pp-attributes.
    """
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
            .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    node = res['node'][0]
    available = node['org-openroadm-network-topology:srg-attributes']['available-wavelengths']
    for index in (1, 2):
        self.assertIn({'index': index}, available)
    # The previous version branched on the tp-id (the same id repeated on
    # both sides of an `or`) and then ran this identical assertion in either
    # case, so the check always applied to every termination point.
    for ele in node['ietf-network-topology:termination-point']:
        self.assertNotIn('org-openroadm-network-topology:pp-attributes', ele)
def test_35_check_topo_ROADMA_DEG2(self):
    """Check node ROADM-A1-DEG2 of openroadm-topology.

    Wavelength indexes 1 and 2 must be listed in the degree
    'available-wavelengths'. DEG2-CTP-TXRX must carry no ctp-attributes and
    DEG2-TTP-TXRX must carry no tx-ttp-attributes.
    """
    node_url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
                .format(self.restconf_baseurl))
    reply = requests.request("GET", node_url, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    node = reply.json()['node'][0]
    free_wavelengths = node['org-openroadm-network-topology:degree-attributes']['available-wavelengths']
    self.assertIn({'index': 1}, free_wavelengths)
    self.assertIn({'index': 2}, free_wavelengths)
    # Attribute container that must be absent, per termination point id.
    forbidden_attribute = {
        'DEG2-CTP-TXRX': 'org-openroadm-network-topology:ctp-attributes',
        'DEG2-TTP-TXRX': 'org-openroadm-network-topology:tx-ttp-attributes',
    }
    for tp in node['ietf-network-topology:termination-point']:
        attribute = forbidden_attribute.get(tp['tp-id'])
        if attribute is not None:
            self.assertNotIn(attribute, tp.keys())
1094 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
1095 def test_36_create_oc_service1(self):
# Calls the service-create RPC for "service1" with connection-type
# "roadm-line" and service-format "OC", then waits self.WAITING seconds.
1096 url = ("{}/operations/org-openroadm-service:service-create"
1097 .format(self.restconf_baseurl))
# NOTE(review): the line that opens the request payload (the `data` object
# serialized in the POST below) and the lines that open/close the nested
# containers are not visible in this listing - confirm against the
# complete file.
1099 "sdnc-request-header": {
1100 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1101 "rpc-action": "service-create",
1102 "request-system-id": "appname",
1103 "notification-url": "http://localhost:8585/NotificationServer/notify"
1105 "service-name": "service1",
1106 "common-id": "ASATT1234567",
1107 "connection-type": "roadm-line",
# First endpoint description: node ROADM-A1.
1109 "service-rate": "100",
1110 "node-id": "ROADM-A1",
1111 "service-format": "OC",
1112 "clli": "SNJSCAMCJP8",
1115 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1116 "port-type": "router",
1117 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1118 "port-rack": "000000.00",
1122 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1123 "lgx-port-name": "LGX Back.3",
1124 "lgx-port-rack": "000000.00",
1125 "lgx-port-shelf": "00"
1130 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1131 "port-type": "router",
1132 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1133 "port-rack": "000000.00",
1137 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1138 "lgx-port-name": "LGX Back.4",
1139 "lgx-port-rack": "000000.00",
1140 "lgx-port-shelf": "00"
1143 "optic-type": "gray"
# Second endpoint description: node ROADM-C1.
1146 "service-rate": "100",
1147 "node-id": "ROADM-C1",
1148 "service-format": "OC",
1149 "clli": "SNJSCAMCJT4",
1152 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1153 "port-type": "router",
1154 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1155 "port-rack": "000000.00",
1159 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1160 "lgx-port-name": "LGX Back.29",
1161 "lgx-port-rack": "000000.00",
1162 "lgx-port-shelf": "00"
1167 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1168 "port-type": "router",
1169 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1170 "port-rack": "000000.00",
1174 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1175 "lgx-port-name": "LGX Back.30",
1176 "lgx-port-rack": "000000.00",
1177 "lgx-port-shelf": "00"
1180 "optic-type": "gray"
1182 "due-date": "2016-11-28T00:00:01Z",
1183 "operator-contact": "pw1234"
1186 headers = {'content-type': 'application/json',
1187 "Accept": "application/json"}
1188 response = requests.request(
1189 "POST", url, data=json.dumps(data), headers=headers,
1190 auth=('admin', 'admin'))
1191 self.assertEqual(response.status_code, requests.codes.ok)
1192 res = response.json()
1193 self.assertIn('PCE calculation in progress',
1194 res['output']['configuration-response-common']['response-message'])
# Pause for the class-level WAITING delay before the next test runs
# (presumably to let the service creation complete - TODO confirm).
1195 time.sleep(self.WAITING)
1197 def test_37_get_oc_service1(self):
# Reads "service1" from the operational service-list and inspects its
# administrative-state, service-name, connection-type and lifecycle-state.
1198 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
1199 .format(self.restconf_baseurl))
1200 headers = {'content-type': 'application/json',
1201 "Accept": "application/json"}
1202 response = requests.request(
1203 "GET", url, headers=headers, auth=('admin', 'admin'))
1204 self.assertEqual(response.status_code, requests.codes.ok)
1205 res = response.json()
# NOTE(review): the lines that open the assertion calls below (and the
# expected administrative-state value) are not visible in this listing;
# presumably each fragment is the argument list of an equality assertion -
# confirm against the complete file.
1207 res['services'][0]['administrative-state'],
1210 res['services'][0]['service-name'], 'service1')
1212 res['services'][0]['connection-type'], 'roadm-line')
1214 res['services'][0]['lifecycle-state'], 'planned')
def test_38_check_xc1_ROADMA(self):
    """Check cross-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on ROADM-A1.

    Reads the roadm-connection from the mounted device and verifies its
    name, optical control mode, target output power, and its source and
    destination interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    connection = res['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Subset check standing in for assertDictContainsSubset (deprecated since
    # Python 3.2). The expected entries must be laid OVER the actual ones:
    # merging in the opposite order lets the actual values win, which only
    # proves that the keys exist and never compares the values.
    merged = dict(connection)
    merged.update(expected)
    self.assertDictEqual(merged, connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
        connection['destination'])
def test_39_check_xc1_ROADMC(self):
    """Check cross-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADM-C1.

    Reads the roadm-connection from the mounted device and verifies its
    name, optical control mode, target output power, and its source and
    destination interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    connection = res['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Subset check standing in for assertDictContainsSubset (deprecated since
    # Python 3.2). The expected entries must be laid OVER the actual ones:
    # merging in the opposite order lets the actual values win, which only
    # proves that the keys exist and never compares the values.
    merged = dict(connection)
    merged.update(expected)
    self.assertDictEqual(merged, connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
        connection['destination'])
1271 def test_40_create_oc_service2(self):
# Calls the service-create RPC for "service2" with connection-type
# "roadm-line" and service-format "OC", then waits self.WAITING seconds.
1272 url = ("{}/operations/org-openroadm-service:service-create"
1273 .format(self.restconf_baseurl))
# NOTE(review): the line that opens the request payload (the `data` object
# serialized in the POST below) and the lines that open/close the nested
# containers are not visible in this listing - confirm against the
# complete file.
1275 "sdnc-request-header": {
1276 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1277 "rpc-action": "service-create",
1278 "request-system-id": "appname",
1279 "notification-url": "http://localhost:8585/NotificationServer/notify"
1281 "service-name": "service2",
1282 "common-id": "ASATT1234567",
1283 "connection-type": "roadm-line",
# First endpoint description: node ROADM-A1.
1285 "service-rate": "100",
1286 "node-id": "ROADM-A1",
1287 "service-format": "OC",
1288 "clli": "SNJSCAMCJP8",
1291 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1292 "port-type": "router",
1293 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1294 "port-rack": "000000.00",
1298 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1299 "lgx-port-name": "LGX Back.3",
1300 "lgx-port-rack": "000000.00",
1301 "lgx-port-shelf": "00"
1306 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1307 "port-type": "router",
1308 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1309 "port-rack": "000000.00",
1313 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1314 "lgx-port-name": "LGX Back.4",
1315 "lgx-port-rack": "000000.00",
1316 "lgx-port-shelf": "00"
1319 "optic-type": "gray"
# Second endpoint description: node ROADM-C1.
1322 "service-rate": "100",
1323 "node-id": "ROADM-C1",
1324 "service-format": "OC",
1325 "clli": "SNJSCAMCJT4",
1328 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1329 "port-type": "router",
1330 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1331 "port-rack": "000000.00",
1335 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1336 "lgx-port-name": "LGX Back.29",
1337 "lgx-port-rack": "000000.00",
1338 "lgx-port-shelf": "00"
1343 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1344 "port-type": "router",
1345 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1346 "port-rack": "000000.00",
1350 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1351 "lgx-port-name": "LGX Back.30",
1352 "lgx-port-rack": "000000.00",
1353 "lgx-port-shelf": "00"
1356 "optic-type": "gray"
1358 "due-date": "2016-11-28T00:00:01Z",
1359 "operator-contact": "pw1234"
1362 headers = {'content-type': 'application/json',
1363 "Accept": "application/json"}
1364 response = requests.request(
1365 "POST", url, data=json.dumps(data), headers=headers,
1366 auth=('admin', 'admin'))
1367 self.assertEqual(response.status_code, requests.codes.ok)
1368 res = response.json()
1369 self.assertIn('PCE calculation in progress',
1370 res['output']['configuration-response-common']['response-message'])
# Pause for the class-level WAITING delay before the next test runs
# (presumably to let the service creation complete - TODO confirm).
1371 time.sleep(self.WAITING)
1373 def test_41_get_oc_service2(self):
# Reads "service2" from the operational service-list and inspects its
# administrative-state, service-name, connection-type and lifecycle-state.
1374 url = ("{}/operational/org-openroadm-service:service-list/services/service2"
1375 .format(self.restconf_baseurl))
1376 headers = {'content-type': 'application/json',
1377 "Accept": "application/json"}
1378 response = requests.request(
1379 "GET", url, headers=headers, auth=('admin', 'admin'))
1380 self.assertEqual(response.status_code, requests.codes.ok)
1381 res = response.json()
# NOTE(review): the lines that open the assertion calls below (and the
# expected administrative-state value) are not visible in this listing;
# presumably each fragment is the argument list of an equality assertion -
# confirm against the complete file.
1383 res['services'][0]['administrative-state'],
1386 res['services'][0]['service-name'], 'service2')
1388 res['services'][0]['connection-type'], 'roadm-line')
1390 res['services'][0]['lifecycle-state'], 'planned')
def test_42_check_xc2_ROADMA(self):
    """Check cross-connection SRG1-PP2-TXRX-DEG2-TTP-TXRX-2 on ROADM-A1.

    Reads the roadm-connection from the mounted device and verifies its
    name, optical control mode, target output power, and its source and
    destination interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-2"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    connection = res['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-2',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Subset check standing in for assertDictContainsSubset (deprecated since
    # Python 3.2). The expected entries must be laid OVER the actual ones:
    # merging in the opposite order lets the actual values win, which only
    # proves that the keys exist and never compares the values.
    merged = dict(connection)
    merged.update(expected)
    self.assertDictEqual(merged, connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP2-TXRX-nmc-2'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-nmc-2'},
        connection['destination'])
def test_43_check_topo_ROADMA(self):
    """Run the ROADM-A1 topology checks of tests 26 (SRG1) and 27 (DEG2) again."""
    for check_name in ('test_26_check_topo_ROADMA_SRG1',
                       'test_27_check_topo_ROADMA_DEG2'):
        # Resolve each check only when it is about to run, in order.
        getattr(self, check_name)()
1425 def test_44_delete_oc_service1(self):
# Calls the service-delete RPC for "service1" and expects the reply message
# to report that the renderer deletion is in progress.
1426 url = ("{}/operations/org-openroadm-service:service-delete"
1427 .format(self.restconf_baseurl))
# NOTE(review): the line that opens the request payload (the `data` object
# serialized in the POST below) and its closing braces are not visible in
# this listing - confirm against the complete file.
1429 "sdnc-request-header": {
1430 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1431 "rpc-action": "service-delete",
1432 "request-system-id": "appname",
1433 "notification-url": "http://localhost:8585/NotificationServer/notify"
1435 "service-delete-req-info": {
1436 "service-name": "service1",
1437 "tail-retention": "no"
1441 headers = {'content-type': 'application/json'}
1442 response = requests.request(
1443 "POST", url, data=json.dumps(data), headers=headers,
1444 auth=('admin', 'admin'))
1445 self.assertEqual(response.status_code, requests.codes.ok)
1446 res = response.json()
1447 self.assertIn('Renderer service delete in progress',
1448 res['output']['configuration-response-common']['response-message'])
1451 def test_45_delete_oc_service2(self):
# Calls the service-delete RPC for "service2" and expects the reply message
# to report that the renderer deletion is in progress.
1452 url = ("{}/operations/org-openroadm-service:service-delete"
1453 .format(self.restconf_baseurl))
# NOTE(review): the line that opens the request payload (the `data` object
# serialized in the POST below) and its closing braces are not visible in
# this listing - confirm against the complete file.
1455 "sdnc-request-header": {
1456 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1457 "rpc-action": "service-delete",
1458 "request-system-id": "appname",
1459 "notification-url": "http://localhost:8585/NotificationServer/notify"
1461 "service-delete-req-info": {
1462 "service-name": "service2",
1463 "tail-retention": "no"
1467 headers = {'content-type': 'application/json'}
1468 response = requests.request(
1469 "POST", url, data=json.dumps(data), headers=headers,
1470 auth=('admin', 'admin'))
1471 self.assertEqual(response.status_code, requests.codes.ok)
1472 res = response.json()
1473 self.assertIn('Renderer service delete in progress',
1474 res['output']['configuration-response-common']['response-message'])
1477 def test_46_get_no_oc_services(self):
# Reads the whole operational service-list and expects a "not found" status
# whose error body reports missing data.
1479 url = ("{}/operational/org-openroadm-service:service-list"
1480 .format(self.restconf_baseurl))
1481 headers = {'content-type': 'application/json',
1482 "Accept": "application/json"}
1483 response = requests.request(
1484 "GET", url, headers=headers, auth=('admin', 'admin'))
1485 self.assertEqual(response.status_code, requests.codes.not_found)
1486 res = response.json()
# NOTE(review): the line that opens the assertion call below is not visible
# in this listing; the visible arguments relate the expected error entry to
# res['errors']['error'] - confirm the assertion used against the complete
# file.
1488 {"error-type": "application", "error-tag": "data-missing",
1489 "error-message": "Request could not be completed because the relevant data model content does not exist"},
1490 res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    """Check that ROADM-A1 holds no roadm-connections.

    Reads the org-openroadm-device config subtree of the mounted ROADM-A1
    node through RESTCONF and verifies that it contains no
    'roadm-connections' entry.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf"
           "/node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json',
               "Accept": "application/json"}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Plain key name: the former `['roadm-connections'][0]` built a
    # one-element list only to index it, yielding this same string.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
def test_48_check_topo_ROADMA(self):
    """Run the ROADM-A1 topology checks of tests 34 (SRG1) and 35 (DEG2) again."""
    for check_name in ('test_34_check_topo_ROADMA_SRG1',
                       'test_35_check_topo_ROADMA_DEG2'):
        # Resolve each check only when it is about to run, in order.
        getattr(self, check_name)()
def test_49_loop_create_eth_service(self):
    """Repeat the Ethernet service cycle five times.

    Each trial replays, in order, the tests that create service1, check
    its cross-connections on ROADM-A1 and ROADM-C1, and delete it again.
    """
    # (message printed before the step, name of the test method to replay)
    steps = (
        ("eth service creation", "test_11_create_eth_service1"),
        ("check xc in ROADM-A1", "test_13_check_xc1_ROADMA"),
        ("check xc in ROADM-C1", "test_14_check_xc1_ROADMC"),
        ("eth service deletion\n", "test_30_delete_eth_service1"),
    )
    trial = 1
    while trial < 6:
        print("trial number {}".format(trial))
        for message, method_name in steps:
            print(message)
            getattr(self, method_name)()
        trial += 1
1522 def test_50_loop_create_oc_service(self):
# Looks up "service1" in the operational service-list, issues a
# service-delete for it, then replays the OC create / check / delete tests
# five times.
1523 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
1524 .format(self.restconf_baseurl))
1525 response = requests.request("GET", url, auth=('admin', 'admin'))
# NOTE(review): indentation is not visible in this listing; presumably the
# delete request below is only sent when the lookup did not answer 404
# (i.e. service1 still exists) - confirm against the complete file.
1526 if response.status_code != 404:
1527 url = ("{}/operations/org-openroadm-service:service-delete"
1528 .format(self.restconf_baseurl))
# NOTE(review): the line that opens the request payload (the `data` object
# serialized in the POST below) and its closing braces are not visible in
# this listing.
1530 "sdnc-request-header": {
1531 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1532 "rpc-action": "service-delete",
1533 "request-system-id": "appname",
1534 "notification-url": "http://localhost:8585/NotificationServer/notify"
1536 "service-delete-req-info": {
1537 "service-name": "service1",
1538 "tail-retention": "no"
1542 headers = {'content-type': 'application/json'}
# The reply of this clean-up request is not checked.
1543 requests.request("POST", url, data=json.dumps(data), headers=headers, auth=('admin', 'admin'))
# Trials are numbered 1 to 5.
1546 for i in range(1, 6):
1547 print("trial number {}".format(i))
1548 print("oc service creation")
1549 self.test_36_create_oc_service1()
1550 print("check xc in ROADM-A1")
1551 self.test_38_check_xc1_ROADMA()
1552 print("check xc in ROADM-C1")
1553 self.test_39_check_xc1_ROADMC()
1554 print("oc service deletion\n")
1555 self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
    """Delete the XPDR-A1 node entry from the topology-netconf configuration.

    Sends a RESTCONF DELETE for the node and expects an OK status.
    """
    template = ("{}/config/network-topology:network-topology"
                "/topology/topology-netconf/node/XPDR-A1")
    node_url = template.format(self.restconf_baseurl)
    reply = requests.request(
        method="DELETE",
        url=node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_52_disconnect_XPDRC(self):
    """Delete the XPDR-C1 node entry from the topology-netconf configuration.

    Sends a RESTCONF DELETE for the node and expects an OK status.
    """
    template = ("{}/config/network-topology:network-topology"
                "/topology/topology-netconf/node/XPDR-C1")
    node_url = template.format(self.restconf_baseurl)
    reply = requests.request(
        method="DELETE",
        url=node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_53_disconnect_ROADMA(self):
    """Delete the ROADM-A1 node entry from the topology-netconf configuration.

    Sends a RESTCONF DELETE for the node and expects an OK status.
    """
    template = ("{}/config/network-topology:network-topology"
                "/topology/topology-netconf/node/ROADM-A1")
    node_url = template.format(self.restconf_baseurl)
    reply = requests.request(
        method="DELETE",
        url=node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_54_disconnect_ROADMC(self):
    """Delete the ROADM-C1 node entry from the topology-netconf configuration.

    Sends a RESTCONF DELETE for the node and expects an OK status.
    """
    template = ("{}/config/network-topology:network-topology"
                "/topology/topology-netconf/node/ROADM-C1")
    node_url = template.format(self.restconf_baseurl)
    reply = requests.request(
        method="DELETE",
        url=node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
# When this file is executed as a script, run the tests through unittest
# with verbosity level 2.
1602 if __name__ == "__main__":
1603 unittest.main(verbosity=2)