2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
24 class TransportPCEFulltesting(unittest.TestCase):
# Root URL of the RESTCONF API; the request URLs in this class are built from it.
31 restconf_baseurl = "http://localhost:8181/restconf"
# Passed to time.sleep() after the service-create requests (value in seconds).
32 WAITING = 20 # nominal value is 300
34 # START_IGNORE_XTESTING
# Class-level start-up: launches four simulators (xpdra, roadma, roadmc, xpdrc)
# through test_utils.start_sim(), then test_utils.start_tpce(), and keeps every
# returned handle on `cls` so that tearDownClass can signal and wait on it.
# NOTE(review): the `def` line of this method is not visible in this extract;
# it is presumably the setUpClass classmethod -- confirm against the full file.
38 cls.sim_process1 = test_utils.start_sim('xpdra')
40 cls.sim_process2 = test_utils.start_sim('roadma')
42 cls.sim_process3 = test_utils.start_sim('roadmc')
44 cls.sim_process4 = test_utils.start_sim('xpdrc')
45 print("all sims started")
47 cls.odl_process = test_utils.start_tpce()
49 print("opendaylight started")
# Stops everything that was started for the class. For the ODL process first and
# then each of the four simulators: SIGINT is sent to the children reported by
# psutil, then to the process itself, and the process is waited on.
# NOTE(review): one statement per child loop appears to be missing from this
# extract (the original numbering skips a line after each send_signal).
52 def tearDownClass(cls):
53 for child in psutil.Process(cls.odl_process.pid).children():
54 child.send_signal(signal.SIGINT)
56 cls.odl_process.send_signal(signal.SIGINT)
57 cls.odl_process.wait()
58 for child in psutil.Process(cls.sim_process1.pid).children():
59 child.send_signal(signal.SIGINT)
61 cls.sim_process1.send_signal(signal.SIGINT)
62 cls.sim_process1.wait()
63 for child in psutil.Process(cls.sim_process2.pid).children():
64 child.send_signal(signal.SIGINT)
66 cls.sim_process2.send_signal(signal.SIGINT)
67 cls.sim_process2.wait()
68 for child in psutil.Process(cls.sim_process3.pid).children():
69 child.send_signal(signal.SIGINT)
71 cls.sim_process3.send_signal(signal.SIGINT)
72 cls.sim_process3.wait()
73 for child in psutil.Process(cls.sim_process4.pid).children():
74 child.send_signal(signal.SIGINT)
76 cls.sim_process4.send_signal(signal.SIGINT)
77 cls.sim_process4.wait()
78 print("all processes killed")
def setUp(self):
    """Print the name of the test method about to run (executed before each test)."""
    # self.id() is a dotted path; only its last component is reported.
    test_name = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(test_name))
85 # connect netconf devices
# Mounts node XPDR-A1 in the topology-netconf topology with a RESTCONF PUT
# (host 127.0.0.1, port taken from test_utils.sims['xpdra']) and expects the
# "created" status code.
# NOTE(review): the statement that opens the `data` payload is not visible in
# this extract.
86 def test_01_connect_xpdrA(self):
87 url = ("{}/config/network-topology:"
88 "network-topology/topology/topology-netconf/node/XPDR-A1"
89 .format(self.restconf_baseurl))
92 "netconf-node-topology:username": "admin",
93 "netconf-node-topology:password": "admin",
94 "netconf-node-topology:host": "127.0.0.1",
95 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
96 "netconf-node-topology:tcp-only": "false",
97 "netconf-node-topology:pass-through": {}}]}
98 headers = {'content-type': 'application/json'}
99 response = requests.request(
100 "PUT", url, data=json.dumps(data), headers=headers,
101 auth=('admin', 'admin'))
102 self.assertEqual(response.status_code, requests.codes.created)
# Mounts node XPDR-C1 in the topology-netconf topology with a RESTCONF PUT
# (host 127.0.0.1, port taken from test_utils.sims['xpdrc']) and expects the
# "created" status code.
# NOTE(review): the statement that opens the `data` payload is not visible in
# this extract.
105 def test_02_connect_xpdrC(self):
106 url = ("{}/config/network-topology:"
107 "network-topology/topology/topology-netconf/node/XPDR-C1"
108 .format(self.restconf_baseurl))
110 "node-id": "XPDR-C1",
111 "netconf-node-topology:username": "admin",
112 "netconf-node-topology:password": "admin",
113 "netconf-node-topology:host": "127.0.0.1",
114 "netconf-node-topology:port": test_utils.sims['xpdrc']['port'],
115 "netconf-node-topology:tcp-only": "false",
116 "netconf-node-topology:pass-through": {}}]}
117 headers = {'content-type': 'application/json'}
118 response = requests.request(
119 "PUT", url, data=json.dumps(data), headers=headers,
120 auth=('admin', 'admin'))
121 self.assertEqual(response.status_code, requests.codes.created)
# Mounts node ROADM-A1 in the topology-netconf topology with a RESTCONF PUT
# (host 127.0.0.1, port taken from test_utils.sims['roadma']) and expects the
# "created" status code.
# NOTE(review): the statement that opens the `data` payload is not visible in
# this extract.
124 def test_03_connect_rdmA(self):
125 url = ("{}/config/network-topology:"
126 "network-topology/topology/topology-netconf/node/ROADM-A1"
127 .format(self.restconf_baseurl))
129 "node-id": "ROADM-A1",
130 "netconf-node-topology:username": "admin",
131 "netconf-node-topology:password": "admin",
132 "netconf-node-topology:host": "127.0.0.1",
133 "netconf-node-topology:port": test_utils.sims['roadma']['port'],
134 "netconf-node-topology:tcp-only": "false",
135 "netconf-node-topology:pass-through": {}}]}
136 headers = {'content-type': 'application/json'}
137 response = requests.request(
138 "PUT", url, data=json.dumps(data), headers=headers,
139 auth=('admin', 'admin'))
140 self.assertEqual(response.status_code, requests.codes.created)
# Mounts node ROADM-C1 in the topology-netconf topology with a RESTCONF PUT
# (host 127.0.0.1, port taken from test_utils.sims['roadmc']) and expects the
# "created" status code.
# NOTE(review): the statement that opens the `data` payload is not visible in
# this extract.
143 def test_04_connect_rdmC(self):
144 url = ("{}/config/network-topology:"
145 "network-topology/topology/topology-netconf/node/ROADM-C1"
146 .format(self.restconf_baseurl))
148 "node-id": "ROADM-C1",
149 "netconf-node-topology:username": "admin",
150 "netconf-node-topology:password": "admin",
151 "netconf-node-topology:host": "127.0.0.1",
152 "netconf-node-topology:port": test_utils.sims['roadmc']['port'],
153 "netconf-node-topology:tcp-only": "false",
154 "netconf-node-topology:pass-through": {}}]}
155 headers = {'content-type': 'application/json'}
156 response = requests.request(
157 "PUT", url, data=json.dumps(data), headers=headers,
158 auth=('admin', 'admin'))
159 self.assertEqual(response.status_code, requests.codes.created)
# Calls the init-xpdr-rdm-links RPC for XPDR-A1 (xpdr 1, network 1) and
# ROADM-A1 (SRG 1, termination point SRG1-PP1-TXRX). Expects HTTP "ok" and the
# message 'Xponder Roadm Link created successfully' in output/result.
# NOTE(review): the lines opening and closing the `data` literal are not
# visible in this extract.
162 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
163 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
165 "networkutils:input": {
166 "networkutils:links-input": {
167 "networkutils:xpdr-node": "XPDR-A1",
168 "networkutils:xpdr-num": "1",
169 "networkutils:network-num": "1",
170 "networkutils:rdm-node": "ROADM-A1",
171 "networkutils:srg-num": "1",
172 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
176 headers = {'content-type': 'application/json'}
177 response = requests.request(
178 "POST", url, data=json.dumps(data),
179 headers=headers, auth=('admin', 'admin'))
180 self.assertEqual(response.status_code, requests.codes.ok)
181 res = response.json()
182 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
# Calls the init-rdm-xpdr-links RPC for ROADM-A1 (SRG 1, termination point
# SRG1-PP1-TXRX) and XPDR-A1 (xpdr 1, network 1). Expects HTTP "ok" and the
# message 'Roadm Xponder links created successfully' in output/result.
# NOTE(review): the lines opening and closing the `data` literal are not
# visible in this extract.
185 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
186 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
188 "networkutils:input": {
189 "networkutils:links-input": {
190 "networkutils:xpdr-node": "XPDR-A1",
191 "networkutils:xpdr-num": "1",
192 "networkutils:network-num": "1",
193 "networkutils:rdm-node": "ROADM-A1",
194 "networkutils:srg-num": "1",
195 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
199 headers = {'content-type': 'application/json'}
200 response = requests.request(
201 "POST", url, data=json.dumps(data),
202 headers=headers, auth=('admin', 'admin'))
203 self.assertEqual(response.status_code, requests.codes.ok)
204 res = response.json()
205 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
# Calls the init-xpdr-rdm-links RPC for XPDR-C1 (xpdr 1, network 1) and
# ROADM-C1 (SRG 1, termination point SRG1-PP1-TXRX). Expects HTTP "ok" and the
# message 'Xponder Roadm Link created successfully' in output/result.
# NOTE(review): the lines opening and closing the `data` literal are not
# visible in this extract.
208 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
209 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
211 "networkutils:input": {
212 "networkutils:links-input": {
213 "networkutils:xpdr-node": "XPDR-C1",
214 "networkutils:xpdr-num": "1",
215 "networkutils:network-num": "1",
216 "networkutils:rdm-node": "ROADM-C1",
217 "networkutils:srg-num": "1",
218 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
222 headers = {'content-type': 'application/json'}
223 response = requests.request(
224 "POST", url, data=json.dumps(data),
225 headers=headers, auth=('admin', 'admin'))
226 self.assertEqual(response.status_code, requests.codes.ok)
227 res = response.json()
228 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
# Calls the init-rdm-xpdr-links RPC for ROADM-C1 (SRG 1, termination point
# SRG1-PP1-TXRX) and XPDR-C1 (xpdr 1, network 1). Expects HTTP "ok" and the
# message 'Roadm Xponder links created successfully' in output/result.
# NOTE(review): the lines opening and closing the `data` literal are not
# visible in this extract.
231 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
232 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
234 "networkutils:input": {
235 "networkutils:links-input": {
236 "networkutils:xpdr-node": "XPDR-C1",
237 "networkutils:xpdr-num": "1",
238 "networkutils:network-num": "1",
239 "networkutils:rdm-node": "ROADM-C1",
240 "networkutils:srg-num": "1",
241 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
245 headers = {'content-type': 'application/json'}
246 response = requests.request(
247 "POST", url, data=json.dumps(data),
248 headers=headers, auth=('admin', 'admin'))
249 self.assertEqual(response.status_code, requests.codes.ok)
250 res = response.json()
251 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
# PUTs the OMS-attributes/span container of the openroadm-topology link
# ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX and expects the
# "created" status code.
# NOTE(review): parts of the `data` literal (its opening line and some
# link-concatenation entries) are not visible in this extract.
254 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
255 # Config ROADMA-ROADMC oms-attributes
256 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
257 "link/ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
258 "OMS-attributes/span"
259 .format(self.restconf_baseurl))
261 "auto-spanloss": "true",
262 "spanloss-base": 11.4,
263 "spanloss-current": 12,
264 "engineered-spanloss": 12.2,
265 "link-concatenation": [{
268 "SRLG-length": 100000,
270 headers = {'content-type': 'application/json'}
271 response = requests.request(
272 "PUT", url, data=json.dumps(data), headers=headers,
273 auth=('admin', 'admin'))
274 self.assertEqual(response.status_code, requests.codes.created)
# PUTs the OMS-attributes/span container of the openroadm-topology link
# ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX (the reverse
# direction of the A-to-C link) and expects the "created" status code.
# NOTE(review): parts of the `data` literal (its opening line and some
# link-concatenation entries) are not visible in this extract.
276 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
277 # Config ROADMC-ROADMA oms-attributes
278 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
279 "link/ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
280 "OMS-attributes/span"
281 .format(self.restconf_baseurl))
283 "auto-spanloss": "true",
284 "spanloss-base": 11.4,
285 "spanloss-current": 12,
286 "engineered-spanloss": 12.2,
287 "link-concatenation": [{
290 "SRLG-length": 100000,
292 headers = {'content-type': 'application/json'}
293 response = requests.request(
294 "PUT", url, data=json.dumps(data), headers=headers,
295 auth=('admin', 'admin'))
296 self.assertEqual(response.status_code, requests.codes.created)
298 # test service-create for Eth service from xpdr to xpdr
# POSTs a service-create request for "service1": an Ethernet service at rate
# "100" whose two ends are on node XPDR-A1 and node XPDR-C1. Expects HTTP "ok"
# and a response-message containing 'PCE calculation in progress', then sleeps
# for self.WAITING seconds.
# NOTE(review): the sleep presumably gives the controller time to finish the
# service set-up before the next test runs -- confirm.
# NOTE(review): the structural lines of the `data` literal (opening line and
# the keys that introduce the nested end-point sections) are not visible in
# this extract.
299 def test_11_create_eth_service1(self):
300 url = ("{}/operations/org-openroadm-service:service-create"
301 .format(self.restconf_baseurl))
303 "sdnc-request-header": {
304 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
305 "rpc-action": "service-create",
306 "request-system-id": "appname",
307 "notification-url": "http://localhost:8585/NotificationServer/notify"
309 "service-name": "service1",
310 "common-id": "ASATT1234567",
311 "connection-type": "service",
313 "service-rate": "100",
314 "node-id": "XPDR-A1",
315 "service-format": "Ethernet",
316 "clli": "SNJSCAMCJP8",
319 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
320 "port-type": "router",
321 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
322 "port-rack": "000000.00",
326 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
327 "lgx-port-name": "LGX Back.3",
328 "lgx-port-rack": "000000.00",
329 "lgx-port-shelf": "00"
334 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
335 "port-type": "router",
336 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
337 "port-rack": "000000.00",
341 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
342 "lgx-port-name": "LGX Back.4",
343 "lgx-port-rack": "000000.00",
344 "lgx-port-shelf": "00"
350 "service-rate": "100",
351 "node-id": "XPDR-C1",
352 "service-format": "Ethernet",
353 "clli": "SNJSCAMCJT4",
356 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
357 "port-type": "router",
358 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
359 "port-rack": "000000.00",
363 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
364 "lgx-port-name": "LGX Back.29",
365 "lgx-port-rack": "000000.00",
366 "lgx-port-shelf": "00"
371 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
372 "port-type": "router",
373 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
374 "port-rack": "000000.00",
378 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
379 "lgx-port-name": "LGX Back.30",
380 "lgx-port-rack": "000000.00",
381 "lgx-port-shelf": "00"
386 "due-date": "2016-11-28T00:00:01Z",
387 "operator-contact": "pw1234"
390 headers = {'content-type': 'application/json',
391 "Accept": "application/json"}
392 response = requests.request(
393 "POST", url, data=json.dumps(data), headers=headers,
394 auth=('admin', 'admin'))
395 self.assertEqual(response.status_code, requests.codes.ok)
396 res = response.json()
397 self.assertIn('PCE calculation in progress',
398 res['output']['configuration-response-common']['response-message'])
399 time.sleep(self.WAITING)
# GETs "service1" from the operational service-list and expects HTTP "ok".
# The first returned service entry is then checked field by field:
# administrative-state 'inService', service-name 'service1', connection-type
# 'service', lifecycle-state 'planned'.
# NOTE(review): the assertion calls wrapping the four field/value pairs are
# not visible in this extract; presumably each pair is compared for equality.
401 def test_12_get_eth_service1(self):
402 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
403 .format(self.restconf_baseurl))
404 headers = {'content-type': 'application/json',
405 "Accept": "application/json"}
406 response = requests.request(
407 "GET", url, headers=headers, auth=('admin', 'admin'))
408 self.assertEqual(response.status_code, requests.codes.ok)
409 res = response.json()
411 res['services'][0]['administrative-state'], 'inService')
413 res['services'][0]['service-name'], 'service1')
415 res['services'][0]['connection-type'], 'service')
417 res['services'][0]['lifecycle-state'], 'planned')
# Reads roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 from the ROADM-A1
# netconf mount point and expects HTTP "ok". The returned connection must
# carry the expected name, opticalControlMode 'gainLoss' and
# target-output-power -3.0; its source must be exactly
# {'src-if': 'SRG1-PP1-TXRX-nmc-1'} and its destination exactly
# {'dst-if': 'DEG2-TTP-TXRX-nmc-1'}.
# NOTE(review): the opening and closing lines of the first assertDictEqual
# argument are not visible in this extract.
420 def test_13_check_xc1_ROADMA(self):
421 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
422 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
423 "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
424 .format(self.restconf_baseurl))
425 headers = {'content-type': 'application/json'}
426 response = requests.request(
427 "GET", url, headers=headers, auth=('admin', 'admin'))
428 self.assertEqual(response.status_code, requests.codes.ok)
429 res = response.json()
430 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
# The expected keys are merged with the actual entry and compared with that
# same entry, so extra keys in the response are tolerated.
431 self.assertDictEqual(
433 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
434 'opticalControlMode': 'gainLoss',
435 'target-output-power': -3.0
436 }, **res['roadm-connections'][0]),
437 res['roadm-connections'][0]
439 self.assertDictEqual(
440 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
441 res['roadm-connections'][0]['source'])
442 self.assertDictEqual(
443 {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
444 res['roadm-connections'][0]['destination'])
# Reads roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 from the ROADM-C1
# netconf mount point and expects HTTP "ok". The returned connection must
# carry the expected name, opticalControlMode 'gainLoss' and
# target-output-power -3.0; its source must be exactly
# {'src-if': 'SRG1-PP1-TXRX-nmc-1'} and its destination exactly
# {'dst-if': 'DEG1-TTP-TXRX-nmc-1'}.
# NOTE(review): the opening and closing lines of the first assertDictEqual
# argument are not visible in this extract.
447 def test_14_check_xc1_ROADMC(self):
448 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
449 "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
450 "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
451 .format(self.restconf_baseurl))
452 headers = {'content-type': 'application/json'}
453 response = requests.request(
454 "GET", url, headers=headers, auth=('admin', 'admin'))
455 self.assertEqual(response.status_code, requests.codes.ok)
456 res = response.json()
457 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
# The expected keys are merged with the actual entry and compared with that
# same entry, so extra keys in the response are tolerated.
458 self.assertDictEqual(
460 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
461 'opticalControlMode': 'gainLoss',
462 'target-output-power': -3.0
463 }, **res['roadm-connections'][0]),
464 res['roadm-connections'][0]
466 self.assertDictEqual(
467 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
468 res['roadm-connections'][0]['source'])
469 self.assertDictEqual(
470 {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
471 res['roadm-connections'][0]['destination'])
# GETs node XPDR-A1-XPDR1 from openroadm-topology, expects HTTP "ok", and
# checks its termination points by tp-id:
#   - XPDR1-NETWORK1: the wavelength under xpdr-network-attributes equals an
#     expected dict whose frequency is 196.1;
#   - XPDR1-CLIENT1 / XPDR1-CLIENT2: no xpdr-client-attributes key;
#   - XPDR1-NETWORK2: no xpdr-network-attributes key.
# NOTE(review): the loop header that binds `ele` and the remainder of the
# expected wavelength dict are not visible in this extract.
474 def test_15_check_topo_XPDRA(self):
475 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
476 .format(self.restconf_baseurl))
477 response = requests.request(
478 "GET", url1, auth=('admin', 'admin'))
479 self.assertEqual(response.status_code, requests.codes.ok)
480 res = response.json()
481 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
483 if ele['tp-id'] == 'XPDR1-NETWORK1':
484 self.assertEqual({u'frequency': 196.1,
486 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
487 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
488 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
489 if ele['tp-id'] == 'XPDR1-NETWORK2':
490 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
# GETs node ROADM-A1-SRG1 from openroadm-topology, expects HTTP "ok", and
# checks that {index: 1} is no longer listed in the SRG available-wavelengths.
# Termination points are then checked by tp-id:
#   - SRG1-PP1-TXRX: used-wavelength (under pp-attributes) contains an entry
#     with index 1 and frequency 196.1;
#   - SRG1-PP2-TXRX: the key 'used-wavelength' is absent.
# NOTE(review): the SRG1-PP2-TXRX check looks at the top-level keys of the
# termination point, whereas used-wavelength is read under pp-attributes for
# SRG1-PP1-TXRX; the check may therefore always pass -- confirm the intent.
# NOTE(review): the loop header that binds `ele` and the remainder of the
# expected wavelength dict are not visible in this extract.
493 def test_16_check_topo_ROADMA_SRG1(self):
494 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
495 .format(self.restconf_baseurl))
496 response = requests.request(
497 "GET", url1, auth=('admin', 'admin'))
498 self.assertEqual(response.status_code, requests.codes.ok)
499 res = response.json()
500 self.assertNotIn({u'index': 1},
501 res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
502 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
504 if ele['tp-id'] == 'SRG1-PP1-TXRX':
505 self.assertIn({u'index': 1, u'frequency': 196.1,
507 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
508 if ele['tp-id'] == 'SRG1-PP2-TXRX':
509 self.assertNotIn('used-wavelength', dict.keys(ele))
# GETs node ROADM-A1-DEG2 from openroadm-topology, expects HTTP "ok", and
# checks that {index: 1} is no longer listed in the degree
# available-wavelengths. Termination points are then checked by tp-id:
#   - DEG2-CTP-TXRX: used-wavelengths under ctp-attributes contains an entry
#     with index 1 and frequency 196.1;
#   - DEG2-TTP-TXRX: used-wavelengths under tx-ttp-attributes contains an
#     entry with index 1 and frequency 196.1.
# NOTE(review): the method name says DEG1 but the URL and tp-ids target DEG2.
# NOTE(review): the loop header that binds `ele` and the remainder of the
# expected wavelength dicts are not visible in this extract.
512 def test_17_check_topo_ROADMA_DEG1(self):
513 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
514 .format(self.restconf_baseurl))
515 response = requests.request(
516 "GET", url1, auth=('admin', 'admin'))
517 self.assertEqual(response.status_code, requests.codes.ok)
518 res = response.json()
519 self.assertNotIn({u'index': 1},
520 res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
521 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
523 if ele['tp-id'] == 'DEG2-CTP-TXRX':
524 self.assertIn({u'index': 1, u'frequency': 196.1,
526 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
527 if ele['tp-id'] == 'DEG2-TTP-TXRX':
528 self.assertIn({u'index': 1, u'frequency': 196.1,
530 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
# Calls the init-xpdr-rdm-links RPC for XPDR-A1 (xpdr 1, network 2) and
# ROADM-A1 (SRG 1, termination point SRG1-PP2-TXRX). Expects HTTP "ok" and the
# message 'Xponder Roadm Link created successfully' in output/result.
# NOTE(review): the lines opening and closing the `data` literal are not
# visible in this extract.
533 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
534 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
536 "networkutils:input": {
537 "networkutils:links-input": {
538 "networkutils:xpdr-node": "XPDR-A1",
539 "networkutils:xpdr-num": "1",
540 "networkutils:network-num": "2",
541 "networkutils:rdm-node": "ROADM-A1",
542 "networkutils:srg-num": "1",
543 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
547 headers = {'content-type': 'application/json'}
548 response = requests.request(
549 "POST", url, data=json.dumps(data),
550 headers=headers, auth=('admin', 'admin'))
551 self.assertEqual(response.status_code, requests.codes.ok)
552 res = response.json()
553 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
# Calls the init-rdm-xpdr-links RPC for ROADM-A1 (SRG 1, termination point
# SRG1-PP2-TXRX) and XPDR-A1 (xpdr 1, network 2). Expects HTTP "ok" and the
# message 'Roadm Xponder links created successfully' in output/result.
# NOTE(review): the lines opening and closing the `data` literal are not
# visible in this extract.
556 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
557 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
559 "networkutils:input": {
560 "networkutils:links-input": {
561 "networkutils:xpdr-node": "XPDR-A1",
562 "networkutils:xpdr-num": "1",
563 "networkutils:network-num": "2",
564 "networkutils:rdm-node": "ROADM-A1",
565 "networkutils:srg-num": "1",
566 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
570 headers = {'content-type': 'application/json'}
571 response = requests.request(
572 "POST", url, data=json.dumps(data),
573 headers=headers, auth=('admin', 'admin'))
574 self.assertEqual(response.status_code, requests.codes.ok)
575 res = response.json()
576 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
# Calls the init-xpdr-rdm-links RPC for XPDR-C1 (xpdr 1, network 2) and
# ROADM-C1 (SRG 1, termination point SRG1-PP2-TXRX). Expects HTTP "ok" and the
# message 'Xponder Roadm Link created successfully' in output/result.
# NOTE(review): the lines opening and closing the `data` literal are not
# visible in this extract.
579 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
580 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
582 "networkutils:input": {
583 "networkutils:links-input": {
584 "networkutils:xpdr-node": "XPDR-C1",
585 "networkutils:xpdr-num": "1",
586 "networkutils:network-num": "2",
587 "networkutils:rdm-node": "ROADM-C1",
588 "networkutils:srg-num": "1",
589 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
593 headers = {'content-type': 'application/json'}
594 response = requests.request(
595 "POST", url, data=json.dumps(data),
596 headers=headers, auth=('admin', 'admin'))
597 self.assertEqual(response.status_code, requests.codes.ok)
598 res = response.json()
599 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
# Calls the init-rdm-xpdr-links RPC for ROADM-C1 (SRG 1, termination point
# SRG1-PP2-TXRX) and XPDR-C1 (xpdr 1, network 2). Expects HTTP "ok" and the
# message 'Roadm Xponder links created successfully' in output/result.
# NOTE(review): the lines opening and closing the `data` literal are not
# visible in this extract.
602 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
603 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
605 "networkutils:input": {
606 "networkutils:links-input": {
607 "networkutils:xpdr-node": "XPDR-C1",
608 "networkutils:xpdr-num": "1",
609 "networkutils:network-num": "2",
610 "networkutils:rdm-node": "ROADM-C1",
611 "networkutils:srg-num": "1",
612 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
616 headers = {'content-type': 'application/json'}
617 response = requests.request(
618 "POST", url, data=json.dumps(data),
619 headers=headers, auth=('admin', 'admin'))
620 self.assertEqual(response.status_code, requests.codes.ok)
621 res = response.json()
622 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
# POSTs a service-create request for "service2": an Ethernet service at rate
# "100" whose two ends are on node XPDR-A1 and node XPDR-C1. Expects HTTP "ok"
# and a response-message containing 'PCE calculation in progress', then sleeps
# for self.WAITING seconds.
# NOTE(review): the sleep presumably gives the controller time to finish the
# service set-up before the next test runs -- confirm.
# NOTE(review): the structural lines of the `data` literal (opening line and
# the keys that introduce the nested end-point sections) are not visible in
# this extract.
625 def test_22_create_eth_service2(self):
626 url = ("{}/operations/org-openroadm-service:service-create"
627 .format(self.restconf_baseurl))
629 "sdnc-request-header": {
630 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
631 "rpc-action": "service-create",
632 "request-system-id": "appname",
633 "notification-url": "http://localhost:8585/NotificationServer/notify"
635 "service-name": "service2",
636 "common-id": "ASATT1234567",
637 "connection-type": "service",
639 "service-rate": "100",
640 "node-id": "XPDR-A1",
641 "service-format": "Ethernet",
642 "clli": "SNJSCAMCJP8",
645 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
646 "port-type": "router",
647 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
648 "port-rack": "000000.00",
652 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
653 "lgx-port-name": "LGX Back.3",
654 "lgx-port-rack": "000000.00",
655 "lgx-port-shelf": "00"
660 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
661 "port-type": "router",
662 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
663 "port-rack": "000000.00",
667 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
668 "lgx-port-name": "LGX Back.4",
669 "lgx-port-rack": "000000.00",
670 "lgx-port-shelf": "00"
676 "service-rate": "100",
677 "node-id": "XPDR-C1",
678 "service-format": "Ethernet",
679 "clli": "SNJSCAMCJT4",
682 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
683 "port-type": "router",
684 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
685 "port-rack": "000000.00",
689 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
690 "lgx-port-name": "LGX Back.29",
691 "lgx-port-rack": "000000.00",
692 "lgx-port-shelf": "00"
697 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
698 "port-type": "router",
699 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
700 "port-rack": "000000.00",
704 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
705 "lgx-port-name": "LGX Back.30",
706 "lgx-port-rack": "000000.00",
707 "lgx-port-shelf": "00"
712 "due-date": "2016-11-28T00:00:01Z",
713 "operator-contact": "pw1234"
716 headers = {'content-type': 'application/json',
717 "Accept": "application/json"}
718 response = requests.request(
719 "POST", url, data=json.dumps(data), headers=headers,
720 auth=('admin', 'admin'))
721 self.assertEqual(response.status_code, requests.codes.ok)
722 res = response.json()
723 self.assertIn('PCE calculation in progress',
724 res['output']['configuration-response-common']['response-message'])
725 time.sleep(self.WAITING)
# GETs "service2" from the operational service-list and expects HTTP "ok".
# The first returned service entry is then checked on administrative-state,
# service-name ('service2'), connection-type ('service') and lifecycle-state
# ('planned').
# NOTE(review): the assertion calls wrapping these field/value pairs, and the
# expected administrative-state value, are not visible in this extract.
727 def test_23_get_eth_service2(self):
728 url = ("{}/operational/org-openroadm-service:service-list/services/service2"
729 .format(self.restconf_baseurl))
730 headers = {'content-type': 'application/json',
731 "Accept": "application/json"}
732 response = requests.request(
733 "GET", url, headers=headers, auth=('admin', 'admin'))
734 self.assertEqual(response.status_code, requests.codes.ok)
735 res = response.json()
737 res['services'][0]['administrative-state'],
740 res['services'][0]['service-name'], 'service2')
742 res['services'][0]['connection-type'], 'service')
744 res['services'][0]['lifecycle-state'], 'planned')
# Reads roadm-connection DEG2-TTP-TXRX-SRG1-PP2-TXRX-2 from the ROADM-A1
# netconf mount point and expects HTTP "ok". The returned connection must
# carry the expected name and opticalControlMode 'power'; its source must be
# exactly {'src-if': 'DEG2-TTP-TXRX-nmc-2'} and its destination exactly
# {'dst-if': 'SRG1-PP2-TXRX-nmc-2'}.
# NOTE(review): the opening and closing lines of the first assertDictEqual
# argument are not visible in this extract.
747 def test_24_check_xc2_ROADMA(self):
748 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
749 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
750 "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-2"
751 .format(self.restconf_baseurl))
752 headers = {'content-type': 'application/json'}
753 response = requests.request(
754 "GET", url, headers=headers, auth=('admin', 'admin'))
755 self.assertEqual(response.status_code, requests.codes.ok)
756 res = response.json()
757 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
# The expected keys are merged with the actual entry and compared with that
# same entry, so extra keys in the response are tolerated.
758 self.assertDictEqual(
760 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-2',
761 'opticalControlMode': 'power'
762 }, **res['roadm-connections'][0]),
763 res['roadm-connections'][0]
765 self.assertDictEqual(
766 {'src-if': 'DEG2-TTP-TXRX-nmc-2'},
767 res['roadm-connections'][0]['source'])
768 self.assertDictEqual(
769 {'dst-if': 'SRG1-PP2-TXRX-nmc-2'},
770 res['roadm-connections'][0]['destination'])
# GETs node XPDR-A1-XPDR1 from openroadm-topology, expects HTTP "ok", and
# checks its termination points by tp-id:
#   - XPDR1-NETWORK1: the wavelength under xpdr-network-attributes equals an
#     expected dict whose frequency is 196.1;
#   - XPDR1-NETWORK2: the wavelength equals an expected dict whose frequency
#     is 196.05;
#   - XPDR1-CLIENT1 / XPDR1-CLIENT2: no xpdr-client-attributes key.
# NOTE(review): the loop header that binds `ele` and the remainder of the
# expected wavelength dicts are not visible in this extract.
772 def test_25_check_topo_XPDRA(self):
773 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
774 .format(self.restconf_baseurl))
775 response = requests.request(
776 "GET", url1, auth=('admin', 'admin'))
777 self.assertEqual(response.status_code, requests.codes.ok)
778 res = response.json()
779 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
781 if ele['tp-id'] == 'XPDR1-NETWORK1':
782 self.assertEqual({u'frequency': 196.1,
784 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
785 if ele['tp-id'] == 'XPDR1-NETWORK2':
786 self.assertEqual({u'frequency': 196.05,
788 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
789 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
790 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
# GETs node ROADM-A1-SRG1 from openroadm-topology, expects HTTP "ok", and
# checks that neither {index: 1} nor {index: 2} is listed in the SRG
# available-wavelengths. Termination points are then checked by tp-id:
#   - SRG1-PP1-TXRX: used-wavelength contains the index-1 / 196.1 entry and
#     not the index-2 / 196.05 entry;
#   - SRG1-PP2-TXRX: used-wavelength contains the index-2 / 196.05 / width-40
#     entry and not the index-1 / 196.1 entry;
#   - SRG1-PP3-TXRX: no pp-attributes key.
# NOTE(review): the loop header that binds `ele` and the remainder of some
# expected wavelength dicts are not visible in this extract.
793 def test_26_check_topo_ROADMA_SRG1(self):
794 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
795 .format(self.restconf_baseurl))
796 response = requests.request(
797 "GET", url1, auth=('admin', 'admin'))
798 self.assertEqual(response.status_code, requests.codes.ok)
799 res = response.json()
800 self.assertNotIn({u'index': 1}, res['node'][0]
801 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
802 self.assertNotIn({u'index': 2}, res['node'][0]
803 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
804 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
806 if ele['tp-id'] == 'SRG1-PP1-TXRX':
807 self.assertIn({u'index': 1, u'frequency': 196.1,
809 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
810 self.assertNotIn({u'index': 2, u'frequency': 196.05,
812 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
813 if ele['tp-id'] == 'SRG1-PP2-TXRX':
814 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
815 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
816 self.assertNotIn({u'index': 1, u'frequency': 196.1,
818 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
819 if ele['tp-id'] == 'SRG1-PP3-TXRX':
820 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
# GETs node ROADM-A1-DEG2 from openroadm-topology, expects HTTP "ok", and
# checks that neither {index: 1} nor {index: 2} is listed in the degree
# available-wavelengths. Termination points are then checked by tp-id:
#   - DEG2-CTP-TXRX: used-wavelengths under ctp-attributes contains both the
#     index-1 / 196.1 entry and the index-2 / 196.05 / width-40 entry;
#   - DEG2-TTP-TXRX: used-wavelengths under tx-ttp-attributes contains the
#     same two entries.
# NOTE(review): the loop header that binds `ele` and the remainder of the
# index-1 expected dicts are not visible in this extract.
823 def test_27_check_topo_ROADMA_DEG2(self):
824 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
825 .format(self.restconf_baseurl))
826 response = requests.request(
827 "GET", url1, auth=('admin', 'admin'))
828 self.assertEqual(response.status_code, requests.codes.ok)
829 res = response.json()
830 self.assertNotIn({u'index': 1}, res['node'][0]
831 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
832 self.assertNotIn({u'index': 2}, res['node'][0]
833 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
834 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
836 if ele['tp-id'] == 'DEG2-CTP-TXRX':
837 self.assertIn({u'index': 1, u'frequency': 196.1,
839 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
840 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
841 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
842 if ele['tp-id'] == 'DEG2-TTP-TXRX':
843 self.assertIn({u'index': 1, u'frequency': 196.1,
845 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
846 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
847 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
850 # service creation test on a non-available resource
# POSTs a service-create request for "service3" between node XPDR-A1 and node
# XPDR-C1 (Ethernet, rate "100"). The RPC itself is expected to be accepted:
# HTTP "ok", a response-message containing 'PCE calculation in progress' and
# a response-code containing '200'. The test then sleeps for self.WAITING
# seconds.
# NOTE(review): the structural lines of the `data` literal (opening line and
# the keys that introduce the nested end-point sections) are not visible in
# this extract.
851 def test_28_create_eth_service3(self):
852 url = ("{}/operations/org-openroadm-service:service-create"
853 .format(self.restconf_baseurl))
855 "sdnc-request-header": {
856 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
857 "rpc-action": "service-create",
858 "request-system-id": "appname",
859 "notification-url": "http://localhost:8585/NotificationServer/notify"
861 "service-name": "service3",
862 "common-id": "ASATT1234567",
863 "connection-type": "service",
865 "service-rate": "100",
866 "node-id": "XPDR-A1",
867 "service-format": "Ethernet",
868 "clli": "SNJSCAMCJP8",
871 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
872 "port-type": "router",
873 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
874 "port-rack": "000000.00",
878 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
879 "lgx-port-name": "LGX Back.3",
880 "lgx-port-rack": "000000.00",
881 "lgx-port-shelf": "00"
886 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
887 "port-type": "router",
888 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
889 "port-rack": "000000.00",
893 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
894 "lgx-port-name": "LGX Back.4",
895 "lgx-port-rack": "000000.00",
896 "lgx-port-shelf": "00"
902 "service-rate": "100",
903 "node-id": "XPDR-C1",
904 "service-format": "Ethernet",
905 "clli": "SNJSCAMCJT4",
908 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
909 "port-type": "router",
910 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
911 "port-rack": "000000.00",
915 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
916 "lgx-port-name": "LGX Back.29",
917 "lgx-port-rack": "000000.00",
918 "lgx-port-shelf": "00"
923 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
924 "port-type": "router",
925 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
926 "port-rack": "000000.00",
930 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
931 "lgx-port-name": "LGX Back.30",
932 "lgx-port-rack": "000000.00",
933 "lgx-port-shelf": "00"
938 "due-date": "2016-11-28T00:00:01Z",
939 "operator-contact": "pw1234"
942 headers = {'content-type': 'application/json',
943 "Accept": "application/json"}
944 response = requests.request(
945 "POST", url, data=json.dumps(data), headers=headers,
946 auth=('admin', 'admin'))
947 self.assertEqual(response.status_code, requests.codes.ok)
948 res = response.json()
949 self.assertIn('PCE calculation in progress',
950 res['output']['configuration-response-common']['response-message'])
951 self.assertIn('200', res['output']['configuration-response-common']['response-code'])
952 time.sleep(self.WAITING)
954 # TODO: add a test that checks that the openroadm-service-list still contains only 2 elements
# POSTs a service-delete request for "service3" (tail-retention "no"). The
# HTTP call is expected to return "ok", while the RPC output must report the
# failure: a response-message containing
# "Service 'service3' does not exist in datastore" and a response-code
# containing '500'.
# NOTE(review): the lines opening and closing the `data` literal are not
# visible in this extract.
955 def test_29_delete_eth_service3(self):
956 url = ("{}/operations/org-openroadm-service:service-delete"
957 .format(self.restconf_baseurl))
959 "sdnc-request-header": {
960 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
961 "rpc-action": "service-delete",
962 "request-system-id": "appname",
963 "notification-url": "http://localhost:8585/NotificationServer/notify"
965 "service-delete-req-info": {
966 "service-name": "service3",
967 "tail-retention": "no"
971 headers = {'content-type': 'application/json'}
972 response = requests.request(
973 "POST", url, data=json.dumps(data), headers=headers,
974 auth=('admin', 'admin'))
975 self.assertEqual(response.status_code, requests.codes.ok)
976 res = response.json()
977 self.assertIn('Service \'service3\' does not exist in datastore',
978 res['output']['configuration-response-common']['response-message'])
979 self.assertIn('500', res['output']['configuration-response-common']['response-code'])
# Send a service-delete RPC for "service1" and check that the reply is
# HTTP 200 with a 'Renderer service delete in progress' message.
def test_30_delete_eth_service1(self):
    rpc_input = {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-delete",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify",
        },
        "service-delete-req-info": {
            "service-name": "service1",
            "tail-retention": "no",
        },
    }
    reply = requests.request(
        "POST",
        "{}/operations/org-openroadm-service:service-delete".format(
            self.restconf_baseurl),
        data=json.dumps({"input": rpc_input}),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
# Send a service-delete RPC for "service2" and check that the reply is
# HTTP 200 with a 'Renderer service delete in progress' message.
def test_31_delete_eth_service2(self):
    rpc_input = {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-delete",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify",
        },
        "service-delete-req-info": {
            "service-name": "service2",
            "tail-retention": "no",
        },
    }
    reply = requests.request(
        "POST",
        "{}/operations/org-openroadm-service:service-delete".format(
            self.restconf_baseurl),
        data=json.dumps({"input": rpc_input}),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
# Read the org-openroadm-device tree of ROADM-A1 through the
# topology-netconf / yang-ext:mount RESTCONF path and check that it holds
# no 'roadm-connections' entry.
def test_32_check_no_xc_ROADMA(self):
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url, auth=('admin', 'admin'))
    # Assert on the status before decoding the body, as the other tests do:
    # decoding first would turn an error reply without a JSON body into a
    # decoding exception and hide the actual HTTP status.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Membership on the dict itself tests its keys.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
# Read node XPDR-A1-XPDR1 from the openroadm-topology and inspect each of
# its termination points:
#   - XPONDER-CLIENT points must carry no xpdr-client-attributes;
#   - XPONDER-NETWORK points must keep 'tail-equipment-id' in their
#     xpdr-network-attributes and must hold no 'wavelength'.
def test_33_check_topo_XPDRA(self):
    node_url = (
        "{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
        .format(self.restconf_baseurl))
    reply = requests.request("GET", node_url, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    termination_points = reply.json()['node'][0][
        'ietf-network-topology:termination-point']
    for tp in termination_points:
        tp_type = tp[u'org-openroadm-common-network:tp-type']
        if tp_type == 'XPONDER-CLIENT':
            self.assertNotIn(
                'org-openroadm-network-topology:xpdr-client-attributes', tp)
            continue
        if tp_type != 'XPONDER-NETWORK':
            # Other termination-point types are not checked.
            continue
        network_attrs = tp[u'org-openroadm-network-topology:xpdr-network-attributes']
        self.assertIn(u'tail-equipment-id', network_attrs)
        self.assertNotIn('wavelength', network_attrs)
# Read node ROADM-A1-SRG1 from the openroadm-topology and check that
# wavelength indexes 1 and 2 are listed in its available-wavelengths and
# that none of its termination points carries pp-attributes.
def test_34_check_topo_ROADMA_SRG1(self):
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
            .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    node = res['node'][0]
    available = node[u'org-openroadm-network-topology:srg-attributes']['available-wavelengths']
    self.assertIn({u'index': 1}, available)
    self.assertIn({u'index': 2}, available)
    # The former code branched on the tp-id (comparing it twice with
    # 'SRG1-PP1-TXRX') but ran the very same assertion in both branches, so
    # the check applies to every termination point regardless of its id.
    for ele in node['ietf-network-topology:termination-point']:
        self.assertNotIn('org-openroadm-network-topology:pp-attributes', ele)
# Read node ROADM-A1-DEG2 from the openroadm-topology and check that
# wavelength indexes 1 and 2 are listed in its available-wavelengths, that
# DEG2-CTP-TXRX has no ctp-attributes and DEG2-TTP-TXRX no tx-ttp-attributes.
def test_35_check_topo_ROADMA_DEG2(self):
    node_url = (
        "{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
        .format(self.restconf_baseurl))
    reply = requests.request("GET", node_url, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    body = reply.json()
    for wavelength_index in (1, 2):
        self.assertIn(
            {u'index': wavelength_index},
            body['node'][0][u'org-openroadm-network-topology:degree-attributes']
            ['available-wavelengths'])
    # (tp-id, attribute container that must be absent from that tp)
    absent_by_tp = (
        ('DEG2-CTP-TXRX', 'org-openroadm-network-topology:ctp-attributes'),
        ('DEG2-TTP-TXRX', 'org-openroadm-network-topology:tx-ttp-attributes'),
    )
    for tp in body['node'][0]['ietf-network-topology:termination-point']:
        for wanted_id, absent_key in absent_by_tp:
            if tp['tp-id'] == wanted_id:
                self.assertNotIn(absent_key, tp)
1101 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
# POST a service-create RPC for an OC service named "service1" with
# connection-type "roadm-line"; the payload names ROADM-A1 and ROADM-C1 as
# node-ids. The test expects HTTP 200 and a 'PCE calculation in progress'
# response-message, then sleeps self.WAITING seconds before returning.
1102 def test_36_create_oc_service1(self):
1103 url = ("{}/operations/org-openroadm-service:service-create"
1104 .format(self.restconf_baseurl))
1106 "sdnc-request-header": {
1107 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1108 "rpc-action": "service-create",
1109 "request-system-id": "appname",
1110 "notification-url": "http://localhost:8585/NotificationServer/notify"
1112 "service-name": "service1",
1113 "common-id": "ASATT1234567",
1114 "connection-type": "roadm-line",
# end-point description using node ROADM-A1
1116 "service-rate": "100",
1117 "node-id": "ROADM-A1",
1118 "service-format": "OC",
1119 "clli": "SNJSCAMCJP8",
1122 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1123 "port-type": "router",
1124 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1125 "port-rack": "000000.00",
1129 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1130 "lgx-port-name": "LGX Back.3",
1131 "lgx-port-rack": "000000.00",
1132 "lgx-port-shelf": "00"
1137 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1138 "port-type": "router",
1139 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1140 "port-rack": "000000.00",
1144 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1145 "lgx-port-name": "LGX Back.4",
1146 "lgx-port-rack": "000000.00",
1147 "lgx-port-shelf": "00"
1150 "optic-type": "gray"
# end-point description using node ROADM-C1
1153 "service-rate": "100",
1154 "node-id": "ROADM-C1",
1155 "service-format": "OC",
1156 "clli": "SNJSCAMCJT4",
1159 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1160 "port-type": "router",
1161 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1162 "port-rack": "000000.00",
1166 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1167 "lgx-port-name": "LGX Back.29",
1168 "lgx-port-rack": "000000.00",
1169 "lgx-port-shelf": "00"
1174 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1175 "port-type": "router",
1176 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1177 "port-rack": "000000.00",
1181 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1182 "lgx-port-name": "LGX Back.30",
1183 "lgx-port-rack": "000000.00",
1184 "lgx-port-shelf": "00"
1187 "optic-type": "gray"
1189 "due-date": "2016-11-28T00:00:01Z",
1190 "operator-contact": "pw1234"
1193 headers = {'content-type': 'application/json',
1194 "Accept": "application/json"}
1195 response = requests.request(
1196 "POST", url, data=json.dumps(data), headers=headers,
1197 auth=('admin', 'admin'))
1198 self.assertEqual(response.status_code, requests.codes.ok)
1199 res = response.json()
1200 self.assertIn('PCE calculation in progress',
1201 res['output']['configuration-response-common']['response-message'])
# pause after the request is accepted (WAITING is a class attribute)
1202 time.sleep(self.WAITING)
# GET "service1" from the operational service-list and inspect the first
# returned entry: its administrative-state, its service-name ('service1'),
# its connection-type ('roadm-line') and its lifecycle-state ('planned').
1204 def test_37_get_oc_service1(self):
1205 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
1206 .format(self.restconf_baseurl))
1207 headers = {'content-type': 'application/json',
1208 "Accept": "application/json"}
1209 response = requests.request(
1210 "GET", url, headers=headers, auth=('admin', 'admin'))
1211 self.assertEqual(response.status_code, requests.codes.ok)
1212 res = response.json()
1214 res['services'][0]['administrative-state'],
1217 res['services'][0]['service-name'], 'service1')
1219 res['services'][0]['connection-type'], 'roadm-line')
1221 res['services'][0]['lifecycle-state'], 'planned')
# Read roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 from the ROADM-A1
# device and check its name, optical control mode, target output power and
# its source / destination interfaces.
def test_38_check_xc1_ROADMA(self):
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    connection = res['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # The following statements replace self.assertDictContainsSubset,
    # deprecated in python 3.2. The expected values must be laid over a copy
    # of the device data: merging the other way round lets the device data
    # overwrite the expected values, and the comparison then only proves that
    # the keys exist, never that their values are right.
    merged = dict(connection)
    merged.update(expected)
    self.assertDictEqual(merged, connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
        connection['destination'])
# Read roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 from the ROADM-C1
# device and check its name, optical control mode, target output power and
# its source / destination interfaces.
def test_39_check_xc1_ROADMC(self):
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    connection = res['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # The following statements replace self.assertDictContainsSubset,
    # deprecated in python 3.2. The expected values must be laid over a copy
    # of the device data: merging the other way round lets the device data
    # overwrite the expected values, and the comparison then only proves that
    # the keys exist, never that their values are right.
    merged = dict(connection)
    merged.update(expected)
    self.assertDictEqual(merged, connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
        connection['destination'])
# POST a service-create RPC for an OC service named "service2" with
# connection-type "roadm-line"; the payload names ROADM-A1 and ROADM-C1 as
# node-ids. The test expects HTTP 200 and a 'PCE calculation in progress'
# response-message, then sleeps self.WAITING seconds before returning.
1278 def test_40_create_oc_service2(self):
1279 url = ("{}/operations/org-openroadm-service:service-create"
1280 .format(self.restconf_baseurl))
1282 "sdnc-request-header": {
1283 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1284 "rpc-action": "service-create",
1285 "request-system-id": "appname",
1286 "notification-url": "http://localhost:8585/NotificationServer/notify"
1288 "service-name": "service2",
1289 "common-id": "ASATT1234567",
1290 "connection-type": "roadm-line",
# end-point description using node ROADM-A1
1292 "service-rate": "100",
1293 "node-id": "ROADM-A1",
1294 "service-format": "OC",
1295 "clli": "SNJSCAMCJP8",
1298 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1299 "port-type": "router",
1300 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1301 "port-rack": "000000.00",
1305 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1306 "lgx-port-name": "LGX Back.3",
1307 "lgx-port-rack": "000000.00",
1308 "lgx-port-shelf": "00"
1313 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1314 "port-type": "router",
1315 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1316 "port-rack": "000000.00",
1320 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1321 "lgx-port-name": "LGX Back.4",
1322 "lgx-port-rack": "000000.00",
1323 "lgx-port-shelf": "00"
1326 "optic-type": "gray"
# end-point description using node ROADM-C1
1329 "service-rate": "100",
1330 "node-id": "ROADM-C1",
1331 "service-format": "OC",
1332 "clli": "SNJSCAMCJT4",
1335 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1336 "port-type": "router",
1337 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1338 "port-rack": "000000.00",
1342 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1343 "lgx-port-name": "LGX Back.29",
1344 "lgx-port-rack": "000000.00",
1345 "lgx-port-shelf": "00"
1350 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1351 "port-type": "router",
1352 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1353 "port-rack": "000000.00",
1357 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1358 "lgx-port-name": "LGX Back.30",
1359 "lgx-port-rack": "000000.00",
1360 "lgx-port-shelf": "00"
1363 "optic-type": "gray"
1365 "due-date": "2016-11-28T00:00:01Z",
1366 "operator-contact": "pw1234"
1369 headers = {'content-type': 'application/json',
1370 "Accept": "application/json"}
1371 response = requests.request(
1372 "POST", url, data=json.dumps(data), headers=headers,
1373 auth=('admin', 'admin'))
1374 self.assertEqual(response.status_code, requests.codes.ok)
1375 res = response.json()
1376 self.assertIn('PCE calculation in progress',
1377 res['output']['configuration-response-common']['response-message'])
# pause after the request is accepted (WAITING is a class attribute)
1378 time.sleep(self.WAITING)
# GET "service2" from the operational service-list and inspect the first
# returned entry: its administrative-state, its service-name ('service2'),
# its connection-type ('roadm-line') and its lifecycle-state ('planned').
1380 def test_41_get_oc_service2(self):
1381 url = ("{}/operational/org-openroadm-service:service-list/services/service2"
1382 .format(self.restconf_baseurl))
1383 headers = {'content-type': 'application/json',
1384 "Accept": "application/json"}
1385 response = requests.request(
1386 "GET", url, headers=headers, auth=('admin', 'admin'))
1387 self.assertEqual(response.status_code, requests.codes.ok)
1388 res = response.json()
1390 res['services'][0]['administrative-state'],
1393 res['services'][0]['service-name'], 'service2')
1395 res['services'][0]['connection-type'], 'roadm-line')
1397 res['services'][0]['lifecycle-state'], 'planned')
# Read roadm-connection SRG1-PP2-TXRX-DEG2-TTP-TXRX-2 from the ROADM-A1
# device and check its name, optical control mode, target output power and
# its source / destination interfaces.
def test_42_check_xc2_ROADMA(self):
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-2"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    connection = res['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-2',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # The following statements replace self.assertDictContainsSubset,
    # deprecated in python 3.2. The expected values must be laid over a copy
    # of the device data: merging the other way round lets the device data
    # overwrite the expected values, and the comparison then only proves that
    # the keys exist, never that their values are right.
    merged = dict(connection)
    merged.update(expected)
    self.assertDictEqual(merged, connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP2-TXRX-nmc-2'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-nmc-2'},
        connection['destination'])
# Run again, in order, the SRG1 and DEG2 topology checks implemented by
# test_26 and test_27.
def test_43_check_topo_ROADMA(self):
    for topology_check in (self.test_26_check_topo_ROADMA_SRG1,
                           self.test_27_check_topo_ROADMA_DEG2):
        topology_check()
# Send a service-delete RPC for the OC service "service1" and check that
# the reply is HTTP 200 with a 'Renderer service delete in progress' message.
def test_44_delete_oc_service1(self):
    rpc_input = {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-delete",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify",
        },
        "service-delete-req-info": {
            "service-name": "service1",
            "tail-retention": "no",
        },
    }
    reply = requests.request(
        "POST",
        "{}/operations/org-openroadm-service:service-delete".format(
            self.restconf_baseurl),
        data=json.dumps({"input": rpc_input}),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
# Send a service-delete RPC for the OC service "service2" and check that
# the reply is HTTP 200 with a 'Renderer service delete in progress' message.
def test_45_delete_oc_service2(self):
    rpc_input = {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-delete",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify",
        },
        "service-delete-req-info": {
            "service-name": "service2",
            "tail-retention": "no",
        },
    }
    reply = requests.request(
        "POST",
        "{}/operations/org-openroadm-service:service-delete".format(
            self.restconf_baseurl),
        data=json.dumps({"input": rpc_input}),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Renderer service delete in progress',
                  common['response-message'])
# GET the whole operational service-list and expect HTTP 404; the JSON error
# body is then compared with a "data-missing" application error entry.
1484 def test_46_get_no_oc_services(self):
1486 url = ("{}/operational/org-openroadm-service:service-list"
1487 .format(self.restconf_baseurl))
1488 headers = {'content-type': 'application/json',
1489 "Accept": "application/json"}
1490 response = requests.request(
1491 "GET", url, headers=headers, auth=('admin', 'admin'))
1492 self.assertEqual(response.status_code, requests.codes.not_found)
1493 res = response.json()
# expected error entry, looked up in res['errors']['error']
1495 {"error-type": "application", "error-tag": "data-missing",
1496 "error-message": "Request could not be completed because the relevant data model content does not exist"},
1497 res['errors']['error'])
# Read the org-openroadm-device tree of ROADM-A1 through the
# topology-netconf / yang-ext:mount RESTCONF path and check that it holds
# no 'roadm-connections' entry.
def test_47_get_no_xc_ROADMA(self):
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf"
           "/node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json',
               "Accept": "application/json"}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Plain key name: the former ['roadm-connections'][0] built a one-item
    # list only to index it back to this very string.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
# Run again, in order, the SRG1 and DEG2 topology checks implemented by
# test_34 and test_35.
def test_48_check_topo_ROADMA(self):
    for topology_check in (self.test_34_check_topo_ROADMA_SRG1,
                           self.test_35_check_topo_ROADMA_DEG2):
        topology_check()
# Repeat five times the sequence "create eth service1, check the
# cross-connects on ROADM-A1 and ROADM-C1, delete service1" by calling the
# corresponding test methods, printing a label before each step.
def test_49_loop_create_eth_service(self):
    # (label printed before the step, test method implementing the step)
    sequence = (
        ("eth service creation", self.test_11_create_eth_service1),
        ("check xc in ROADM-A1", self.test_13_check_xc1_ROADMA),
        ("check xc in ROADM-C1", self.test_14_check_xc1_ROADMC),
        ("eth service deletion\n", self.test_30_delete_eth_service1),
    )
    for trial in range(1, 6):
        print("trial number {}".format(trial))
        for label, step in sequence:
            print(label)
            step()
# First look up "service1" in the operational service-list; unless that GET
# answers 404, a service-delete RPC for service1 is posted (its reply is not
# checked). Then, five times in a row, the OC service1 creation, the two
# cross-connect checks (ROADM-A1, ROADM-C1) and the service deletion are run
# by calling the corresponding test methods.
1529 def test_50_loop_create_oc_service(self):
1530 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
1531 .format(self.restconf_baseurl))
1532 response = requests.request("GET", url, auth=('admin', 'admin'))
# anything but "not found" is treated as "service1 is still there"
1533 if response.status_code != 404:
1534 url = ("{}/operations/org-openroadm-service:service-delete"
1535 .format(self.restconf_baseurl))
1537 "sdnc-request-header": {
1538 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1539 "rpc-action": "service-delete",
1540 "request-system-id": "appname",
1541 "notification-url": "http://localhost:8585/NotificationServer/notify"
1543 "service-delete-req-info": {
1544 "service-name": "service1",
1545 "tail-retention": "no"
1549 headers = {'content-type': 'application/json'}
1550 requests.request("POST", url, data=json.dumps(data), headers=headers, auth=('admin', 'admin'))
# range(1, 6) yields trials 1 to 5
1553 for i in range(1, 6):
1554 print("trial number {}".format(i))
1555 print("oc service creation")
1556 self.test_36_create_oc_service1()
1557 print("check xc in ROADM-A1")
1558 self.test_38_check_xc1_ROADMA()
1559 print("check xc in ROADM-C1")
1560 self.test_39_check_xc1_ROADMC()
1561 print("oc service deletion\n")
1562 self.test_44_delete_oc_service1()
# DELETE the XPDR-A1 node entry from the topology-netconf configuration and
# expect HTTP 200.
def test_51_disconnect_XPDRA(self):
    node_url = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-A1"
        .format(self.restconf_baseurl))
    reply = requests.request(
        "DELETE", node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
# DELETE the XPDR-C1 node entry from the topology-netconf configuration and
# expect HTTP 200.
def test_52_disconnect_XPDRC(self):
    node_url = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/XPDR-C1"
        .format(self.restconf_baseurl))
    reply = requests.request(
        "DELETE", node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
# DELETE the ROADM-A1 node entry from the topology-netconf configuration and
# expect HTTP 200.
def test_53_disconnect_ROADMA(self):
    node_url = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1"
        .format(self.restconf_baseurl))
    reply = requests.request(
        "DELETE", node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
# DELETE the ROADM-C1 node entry from the topology-netconf configuration and
# expect HTTP 200.
def test_54_disconnect_ROADMC(self):
    node_url = (
        "{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-C1"
        .format(self.restconf_baseurl))
    reply = requests.request(
        "DELETE", node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
# Script entry point: when the file is executed directly, hand over to the
# unittest runner with verbosity level 2.
1609 if __name__ == "__main__":
1610 unittest.main(verbosity=2)