2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
24 class TransportPCEFulltesting(unittest.TestCase):
27 honeynode_process1 = None
28 honeynode_process2 = None
29 honeynode_process3 = None
30 honeynode_process4 = None
31 restconf_baseurl = "http://localhost:8181/restconf"
32 WAITING = 20 # nominal value is 300
34 # START_IGNORE_XTESTING
38 print("starting honeynode1...")
39 cls.honeynode_process1 = test_utils.start_xpdra_honeynode()
42 print("starting honeynode2...")
43 cls.honeynode_process2 = test_utils.start_roadma_honeynode()
46 print("starting honeynode3...")
47 cls.honeynode_process3 = test_utils.start_roadmc_honeynode()
50 print("starting honeynode4...")
51 cls.honeynode_process4 = test_utils.start_xpdrc_honeynode()
53 print("all honeynodes started")
55 print("starting opendaylight...")
56 cls.odl_process = test_utils.start_tpce()
58 print("opendaylight started")
def tearDownClass(cls):
    """Stop OpenDaylight and the four honeynode processes of this class.

    Each process is handled in the same way, OpenDaylight first and then
    honeynodes 1 to 4: SIGINT is sent to the children reported by psutil,
    then to the process itself, and the process is waited for before
    moving on to the next one.

    A handle that is still ``None`` (its class-level default, i.e. the
    process was never started) is skipped instead of raising
    ``AttributeError`` on ``.pid``.
    """
    processes = (
        # getattr: no class-level default for odl_process is visible here.
        getattr(cls, "odl_process", None),
        cls.honeynode_process1,
        cls.honeynode_process2,
        cls.honeynode_process3,
        cls.honeynode_process4,
    )
    for process in processes:
        if process is None:
            continue
        # Interrupt the children first, then the launcher itself.
        for child in psutil.Process(process.pid).children():
            child.send_signal(signal.SIGINT)
        process.send_signal(signal.SIGINT)
        process.wait()
    print("all processes killed")
def setUp(self):
    """Print the name of the test about to run (executed before each test method)."""
    # self.id() is a dotted path; only its last component is printed.
    test_name = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(test_name))
94 # connect netconf devices
95 def test_01_connect_xpdrA(self):
96 url = ("{}/config/network-topology:"
97 "network-topology/topology/topology-netconf/node/XPDR-A1"
98 .format(self.restconf_baseurl))
100 "node-id": "XPDR-A1",
101 "netconf-node-topology:username": "admin",
102 "netconf-node-topology:password": "admin",
103 "netconf-node-topology:host": "127.0.0.1",
104 "netconf-node-topology:port": "17840",
105 "netconf-node-topology:tcp-only": "false",
106 "netconf-node-topology:pass-through": {}}]}
107 headers = {'content-type': 'application/json'}
108 response = requests.request(
109 "PUT", url, data=json.dumps(data), headers=headers,
110 auth=('admin', 'admin'))
111 self.assertEqual(response.status_code, requests.codes.created)
114 def test_02_connect_xpdrC(self):
115 url = ("{}/config/network-topology:"
116 "network-topology/topology/topology-netconf/node/XPDR-C1"
117 .format(self.restconf_baseurl))
119 "node-id": "XPDR-C1",
120 "netconf-node-topology:username": "admin",
121 "netconf-node-topology:password": "admin",
122 "netconf-node-topology:host": "127.0.0.1",
123 "netconf-node-topology:port": "17844",
124 "netconf-node-topology:tcp-only": "false",
125 "netconf-node-topology:pass-through": {}}]}
126 headers = {'content-type': 'application/json'}
127 response = requests.request(
128 "PUT", url, data=json.dumps(data), headers=headers,
129 auth=('admin', 'admin'))
130 self.assertEqual(response.status_code, requests.codes.created)
133 def test_03_connect_rdmA(self):
134 url = ("{}/config/network-topology:"
135 "network-topology/topology/topology-netconf/node/ROADM-A1"
136 .format(self.restconf_baseurl))
138 "node-id": "ROADM-A1",
139 "netconf-node-topology:username": "admin",
140 "netconf-node-topology:password": "admin",
141 "netconf-node-topology:host": "127.0.0.1",
142 "netconf-node-topology:port": "17841",
143 "netconf-node-topology:tcp-only": "false",
144 "netconf-node-topology:pass-through": {}}]}
145 headers = {'content-type': 'application/json'}
146 response = requests.request(
147 "PUT", url, data=json.dumps(data), headers=headers,
148 auth=('admin', 'admin'))
149 self.assertEqual(response.status_code, requests.codes.created)
152 def test_04_connect_rdmC(self):
153 url = ("{}/config/network-topology:"
154 "network-topology/topology/topology-netconf/node/ROADM-C1"
155 .format(self.restconf_baseurl))
157 "node-id": "ROADM-C1",
158 "netconf-node-topology:username": "admin",
159 "netconf-node-topology:password": "admin",
160 "netconf-node-topology:host": "127.0.0.1",
161 "netconf-node-topology:port": "17843",
162 "netconf-node-topology:tcp-only": "false",
163 "netconf-node-topology:pass-through": {}}]}
164 headers = {'content-type': 'application/json'}
165 response = requests.request(
166 "PUT", url, data=json.dumps(data), headers=headers,
167 auth=('admin', 'admin'))
168 self.assertEqual(response.status_code, requests.codes.created)
171 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
172 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
174 "networkutils:input": {
175 "networkutils:links-input": {
176 "networkutils:xpdr-node": "XPDR-A1",
177 "networkutils:xpdr-num": "1",
178 "networkutils:network-num": "1",
179 "networkutils:rdm-node": "ROADM-A1",
180 "networkutils:srg-num": "1",
181 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
185 headers = {'content-type': 'application/json'}
186 response = requests.request(
187 "POST", url, data=json.dumps(data),
188 headers=headers, auth=('admin', 'admin'))
189 self.assertEqual(response.status_code, requests.codes.ok)
190 res = response.json()
191 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
194 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
195 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
197 "networkutils:input": {
198 "networkutils:links-input": {
199 "networkutils:xpdr-node": "XPDR-A1",
200 "networkutils:xpdr-num": "1",
201 "networkutils:network-num": "1",
202 "networkutils:rdm-node": "ROADM-A1",
203 "networkutils:srg-num": "1",
204 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
208 headers = {'content-type': 'application/json'}
209 response = requests.request(
210 "POST", url, data=json.dumps(data),
211 headers=headers, auth=('admin', 'admin'))
212 self.assertEqual(response.status_code, requests.codes.ok)
213 res = response.json()
214 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
217 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
218 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
220 "networkutils:input": {
221 "networkutils:links-input": {
222 "networkutils:xpdr-node": "XPDR-C1",
223 "networkutils:xpdr-num": "1",
224 "networkutils:network-num": "1",
225 "networkutils:rdm-node": "ROADM-C1",
226 "networkutils:srg-num": "1",
227 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
231 headers = {'content-type': 'application/json'}
232 response = requests.request(
233 "POST", url, data=json.dumps(data),
234 headers=headers, auth=('admin', 'admin'))
235 self.assertEqual(response.status_code, requests.codes.ok)
236 res = response.json()
237 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
240 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
241 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
243 "networkutils:input": {
244 "networkutils:links-input": {
245 "networkutils:xpdr-node": "XPDR-C1",
246 "networkutils:xpdr-num": "1",
247 "networkutils:network-num": "1",
248 "networkutils:rdm-node": "ROADM-C1",
249 "networkutils:srg-num": "1",
250 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
254 headers = {'content-type': 'application/json'}
255 response = requests.request(
256 "POST", url, data=json.dumps(data),
257 headers=headers, auth=('admin', 'admin'))
258 self.assertEqual(response.status_code, requests.codes.ok)
259 res = response.json()
260 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
263 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
264 # Config ROADMA-ROADMC oms-attributes
265 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
266 "link/ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
267 "OMS-attributes/span"
268 .format(self.restconf_baseurl))
270 "auto-spanloss": "true",
271 "spanloss-base": 11.4,
272 "spanloss-current": 12,
273 "engineered-spanloss": 12.2,
274 "link-concatenation": [{
277 "SRLG-length": 100000,
279 headers = {'content-type': 'application/json'}
280 response = requests.request(
281 "PUT", url, data=json.dumps(data), headers=headers,
282 auth=('admin', 'admin'))
283 self.assertEqual(response.status_code, requests.codes.created)
285 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
286 # Config ROADMC-ROADMA oms-attributes
287 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
288 "link/ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
289 "OMS-attributes/span"
290 .format(self.restconf_baseurl))
292 "auto-spanloss": "true",
293 "spanloss-base": 11.4,
294 "spanloss-current": 12,
295 "engineered-spanloss": 12.2,
296 "link-concatenation": [{
299 "SRLG-length": 100000,
301 headers = {'content-type': 'application/json'}
302 response = requests.request(
303 "PUT", url, data=json.dumps(data), headers=headers,
304 auth=('admin', 'admin'))
305 self.assertEqual(response.status_code, requests.codes.created)
307 # test service-create for Eth service from xpdr to xpdr
308 def test_11_create_eth_service1(self):
309 url = ("{}/operations/org-openroadm-service:service-create"
310 .format(self.restconf_baseurl))
312 "sdnc-request-header": {
313 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
314 "rpc-action": "service-create",
315 "request-system-id": "appname",
316 "notification-url": "http://localhost:8585/NotificationServer/notify"
318 "service-name": "service1",
319 "common-id": "ASATT1234567",
320 "connection-type": "service",
322 "service-rate": "100",
323 "node-id": "XPDR-A1",
324 "service-format": "Ethernet",
325 "clli": "SNJSCAMCJP8",
328 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
329 "port-type": "router",
330 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
331 "port-rack": "000000.00",
335 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
336 "lgx-port-name": "LGX Back.3",
337 "lgx-port-rack": "000000.00",
338 "lgx-port-shelf": "00"
343 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
344 "port-type": "router",
345 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
346 "port-rack": "000000.00",
350 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
351 "lgx-port-name": "LGX Back.4",
352 "lgx-port-rack": "000000.00",
353 "lgx-port-shelf": "00"
359 "service-rate": "100",
360 "node-id": "XPDR-C1",
361 "service-format": "Ethernet",
362 "clli": "SNJSCAMCJT4",
365 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
366 "port-type": "router",
367 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
368 "port-rack": "000000.00",
372 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
373 "lgx-port-name": "LGX Back.29",
374 "lgx-port-rack": "000000.00",
375 "lgx-port-shelf": "00"
380 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
381 "port-type": "router",
382 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
383 "port-rack": "000000.00",
387 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
388 "lgx-port-name": "LGX Back.30",
389 "lgx-port-rack": "000000.00",
390 "lgx-port-shelf": "00"
395 "due-date": "2016-11-28T00:00:01Z",
396 "operator-contact": "pw1234"
399 headers = {'content-type': 'application/json',
400 "Accept": "application/json"}
401 response = requests.request(
402 "POST", url, data=json.dumps(data), headers=headers,
403 auth=('admin', 'admin'))
404 self.assertEqual(response.status_code, requests.codes.ok)
405 res = response.json()
406 self.assertIn('PCE calculation in progress',
407 res['output']['configuration-response-common']['response-message'])
408 time.sleep(self.WAITING)
410 def test_12_get_eth_service1(self):
411 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
412 .format(self.restconf_baseurl))
413 headers = {'content-type': 'application/json',
414 "Accept": "application/json"}
415 response = requests.request(
416 "GET", url, headers=headers, auth=('admin', 'admin'))
417 self.assertEqual(response.status_code, requests.codes.ok)
418 res = response.json()
420 res['services'][0]['administrative-state'], 'inService')
422 res['services'][0]['service-name'], 'service1')
424 res['services'][0]['connection-type'], 'service')
426 res['services'][0]['lifecycle-state'], 'planned')
def test_13_check_xc1_ROADMA(self):
    """Check roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on ROADM-A1.

    Reads the connection through the netconf mount point of ROADM-A1 and
    verifies its name, optical control mode, target output power and its
    source / destination interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    connection = res['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Subset check (assertDictContainsSubset is deprecated since Python 3.2).
    # The expected entries must be laid OVER the actual ones: merging the
    # other way round lets the actual values overwrite the expected ones,
    # which would only verify that the keys exist.
    merged = dict(connection)
    merged.update(expected)
    self.assertDictEqual(merged, connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
        connection['destination'])
def test_14_check_xc1_ROADMC(self):
    """Check roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADM-C1.

    Reads the connection through the netconf mount point of ROADM-C1 and
    verifies its name, optical control mode, target output power and its
    source / destination interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    connection = res['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Subset check (assertDictContainsSubset is deprecated since Python 3.2).
    # The expected entries must be laid OVER the actual ones: merging the
    # other way round lets the actual values overwrite the expected ones,
    # which would only verify that the keys exist.
    merged = dict(connection)
    merged.update(expected)
    self.assertDictEqual(merged, connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
        connection['destination'])
483 def test_15_check_topo_XPDRA(self):
484 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
485 .format(self.restconf_baseurl))
486 response = requests.request(
487 "GET", url1, auth=('admin', 'admin'))
488 self.assertEqual(response.status_code, requests.codes.ok)
489 res = response.json()
490 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
492 if ele['tp-id'] == 'XPDR1-NETWORK1':
493 self.assertEqual({u'frequency': 196.1,
495 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
496 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
497 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
498 if ele['tp-id'] == 'XPDR1-NETWORK2':
499 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
502 def test_16_check_topo_ROADMA_SRG1(self):
503 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
504 .format(self.restconf_baseurl))
505 response = requests.request(
506 "GET", url1, auth=('admin', 'admin'))
507 self.assertEqual(response.status_code, requests.codes.ok)
508 res = response.json()
509 self.assertNotIn({u'index': 1},
510 res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
511 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
513 if ele['tp-id'] == 'SRG1-PP1-TXRX':
514 self.assertIn({u'index': 1, u'frequency': 196.1,
516 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
517 if ele['tp-id'] == 'SRG1-PP2-TXRX':
518 self.assertNotIn('used-wavelength', dict.keys(ele))
521 def test_17_check_topo_ROADMA_DEG1(self):
522 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
523 .format(self.restconf_baseurl))
524 response = requests.request(
525 "GET", url1, auth=('admin', 'admin'))
526 self.assertEqual(response.status_code, requests.codes.ok)
527 res = response.json()
528 self.assertNotIn({u'index': 1},
529 res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
530 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
532 if ele['tp-id'] == 'DEG2-CTP-TXRX':
533 self.assertIn({u'index': 1, u'frequency': 196.1,
535 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
536 if ele['tp-id'] == 'DEG2-TTP-TXRX':
537 self.assertIn({u'index': 1, u'frequency': 196.1,
539 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
542 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
543 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
545 "networkutils:input": {
546 "networkutils:links-input": {
547 "networkutils:xpdr-node": "XPDR-A1",
548 "networkutils:xpdr-num": "1",
549 "networkutils:network-num": "2",
550 "networkutils:rdm-node": "ROADM-A1",
551 "networkutils:srg-num": "1",
552 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
556 headers = {'content-type': 'application/json'}
557 response = requests.request(
558 "POST", url, data=json.dumps(data),
559 headers=headers, auth=('admin', 'admin'))
560 self.assertEqual(response.status_code, requests.codes.ok)
561 res = response.json()
562 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
565 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
566 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
568 "networkutils:input": {
569 "networkutils:links-input": {
570 "networkutils:xpdr-node": "XPDR-A1",
571 "networkutils:xpdr-num": "1",
572 "networkutils:network-num": "2",
573 "networkutils:rdm-node": "ROADM-A1",
574 "networkutils:srg-num": "1",
575 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
579 headers = {'content-type': 'application/json'}
580 response = requests.request(
581 "POST", url, data=json.dumps(data),
582 headers=headers, auth=('admin', 'admin'))
583 self.assertEqual(response.status_code, requests.codes.ok)
584 res = response.json()
585 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
588 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
589 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
591 "networkutils:input": {
592 "networkutils:links-input": {
593 "networkutils:xpdr-node": "XPDR-C1",
594 "networkutils:xpdr-num": "1",
595 "networkutils:network-num": "2",
596 "networkutils:rdm-node": "ROADM-C1",
597 "networkutils:srg-num": "1",
598 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
602 headers = {'content-type': 'application/json'}
603 response = requests.request(
604 "POST", url, data=json.dumps(data),
605 headers=headers, auth=('admin', 'admin'))
606 self.assertEqual(response.status_code, requests.codes.ok)
607 res = response.json()
608 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
611 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
612 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
614 "networkutils:input": {
615 "networkutils:links-input": {
616 "networkutils:xpdr-node": "XPDR-C1",
617 "networkutils:xpdr-num": "1",
618 "networkutils:network-num": "2",
619 "networkutils:rdm-node": "ROADM-C1",
620 "networkutils:srg-num": "1",
621 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
625 headers = {'content-type': 'application/json'}
626 response = requests.request(
627 "POST", url, data=json.dumps(data),
628 headers=headers, auth=('admin', 'admin'))
629 self.assertEqual(response.status_code, requests.codes.ok)
630 res = response.json()
631 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
634 def test_22_create_eth_service2(self):
635 url = ("{}/operations/org-openroadm-service:service-create"
636 .format(self.restconf_baseurl))
638 "sdnc-request-header": {
639 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
640 "rpc-action": "service-create",
641 "request-system-id": "appname",
642 "notification-url": "http://localhost:8585/NotificationServer/notify"
644 "service-name": "service2",
645 "common-id": "ASATT1234567",
646 "connection-type": "service",
648 "service-rate": "100",
649 "node-id": "XPDR-A1",
650 "service-format": "Ethernet",
651 "clli": "SNJSCAMCJP8",
654 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
655 "port-type": "router",
656 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
657 "port-rack": "000000.00",
661 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
662 "lgx-port-name": "LGX Back.3",
663 "lgx-port-rack": "000000.00",
664 "lgx-port-shelf": "00"
669 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
670 "port-type": "router",
671 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
672 "port-rack": "000000.00",
676 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
677 "lgx-port-name": "LGX Back.4",
678 "lgx-port-rack": "000000.00",
679 "lgx-port-shelf": "00"
685 "service-rate": "100",
686 "node-id": "XPDR-C1",
687 "service-format": "Ethernet",
688 "clli": "SNJSCAMCJT4",
691 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
692 "port-type": "router",
693 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
694 "port-rack": "000000.00",
698 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
699 "lgx-port-name": "LGX Back.29",
700 "lgx-port-rack": "000000.00",
701 "lgx-port-shelf": "00"
706 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
707 "port-type": "router",
708 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
709 "port-rack": "000000.00",
713 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
714 "lgx-port-name": "LGX Back.30",
715 "lgx-port-rack": "000000.00",
716 "lgx-port-shelf": "00"
721 "due-date": "2016-11-28T00:00:01Z",
722 "operator-contact": "pw1234"
725 headers = {'content-type': 'application/json',
726 "Accept": "application/json"}
727 response = requests.request(
728 "POST", url, data=json.dumps(data), headers=headers,
729 auth=('admin', 'admin'))
730 self.assertEqual(response.status_code, requests.codes.ok)
731 res = response.json()
732 self.assertIn('PCE calculation in progress',
733 res['output']['configuration-response-common']['response-message'])
734 time.sleep(self.WAITING)
736 def test_23_get_eth_service2(self):
737 url = ("{}/operational/org-openroadm-service:service-list/services/service2"
738 .format(self.restconf_baseurl))
739 headers = {'content-type': 'application/json',
740 "Accept": "application/json"}
741 response = requests.request(
742 "GET", url, headers=headers, auth=('admin', 'admin'))
743 self.assertEqual(response.status_code, requests.codes.ok)
744 res = response.json()
746 res['services'][0]['administrative-state'],
749 res['services'][0]['service-name'], 'service2')
751 res['services'][0]['connection-type'], 'service')
753 res['services'][0]['lifecycle-state'], 'planned')
def test_24_check_xc2_ROADMA(self):
    """Check roadm-connection DEG2-TTP-TXRX-SRG1-PP2-TXRX-2 on ROADM-A1.

    Reads the connection through the netconf mount point of ROADM-A1 and
    verifies its name, optical control mode and its source / destination
    interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-2"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    connection = res['roadm-connections'][0]
    expected = {
        'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-2',
        'opticalControlMode': 'power'
    }
    # Subset check (assertDictContainsSubset is deprecated since Python 3.2).
    # The expected entries must be laid OVER the actual ones: merging the
    # other way round lets the actual values overwrite the expected ones,
    # which would only verify that the keys exist.
    merged = dict(connection)
    merged.update(expected)
    self.assertDictEqual(merged, connection)
    self.assertDictEqual(
        {'src-if': 'DEG2-TTP-TXRX-nmc-2'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'SRG1-PP2-TXRX-nmc-2'},
        connection['destination'])
781 def test_25_check_topo_XPDRA(self):
782 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
783 .format(self.restconf_baseurl))
784 response = requests.request(
785 "GET", url1, auth=('admin', 'admin'))
786 self.assertEqual(response.status_code, requests.codes.ok)
787 res = response.json()
788 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
790 if ele['tp-id'] == 'XPDR1-NETWORK1':
791 self.assertEqual({u'frequency': 196.1,
793 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
794 if ele['tp-id'] == 'XPDR1-NETWORK2':
795 self.assertEqual({u'frequency': 196.05,
797 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
798 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
799 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
802 def test_26_check_topo_ROADMA_SRG1(self):
803 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
804 .format(self.restconf_baseurl))
805 response = requests.request(
806 "GET", url1, auth=('admin', 'admin'))
807 self.assertEqual(response.status_code, requests.codes.ok)
808 res = response.json()
809 self.assertNotIn({u'index': 1}, res['node'][0]
810 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
811 self.assertNotIn({u'index': 2}, res['node'][0]
812 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
813 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
815 if ele['tp-id'] == 'SRG1-PP1-TXRX':
816 self.assertIn({u'index': 1, u'frequency': 196.1,
818 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
819 self.assertNotIn({u'index': 2, u'frequency': 196.05,
821 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
822 if ele['tp-id'] == 'SRG1-PP2-TXRX':
823 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
824 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
825 self.assertNotIn({u'index': 1, u'frequency': 196.1,
827 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
828 if ele['tp-id'] == 'SRG1-PP3-TXRX':
829 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
832 def test_27_check_topo_ROADMA_DEG2(self):
833 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
834 .format(self.restconf_baseurl))
835 response = requests.request(
836 "GET", url1, auth=('admin', 'admin'))
837 self.assertEqual(response.status_code, requests.codes.ok)
838 res = response.json()
839 self.assertNotIn({u'index': 1}, res['node'][0]
840 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
841 self.assertNotIn({u'index': 2}, res['node'][0]
842 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
843 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
845 if ele['tp-id'] == 'DEG2-CTP-TXRX':
846 self.assertIn({u'index': 1, u'frequency': 196.1,
848 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
849 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
850 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
851 if ele['tp-id'] == 'DEG2-TTP-TXRX':
852 self.assertIn({u'index': 1, u'frequency': 196.1,
854 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
855 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
856 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
859 # creation service test on a non-available resource
860 def test_28_create_eth_service3(self):
861 url = ("{}/operations/org-openroadm-service:service-create"
862 .format(self.restconf_baseurl))
864 "sdnc-request-header": {
865 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
866 "rpc-action": "service-create",
867 "request-system-id": "appname",
868 "notification-url": "http://localhost:8585/NotificationServer/notify"
870 "service-name": "service3",
871 "common-id": "ASATT1234567",
872 "connection-type": "service",
874 "service-rate": "100",
875 "node-id": "XPDR-A1",
876 "service-format": "Ethernet",
877 "clli": "SNJSCAMCJP8",
880 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
881 "port-type": "router",
882 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
883 "port-rack": "000000.00",
887 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
888 "lgx-port-name": "LGX Back.3",
889 "lgx-port-rack": "000000.00",
890 "lgx-port-shelf": "00"
895 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
896 "port-type": "router",
897 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
898 "port-rack": "000000.00",
902 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
903 "lgx-port-name": "LGX Back.4",
904 "lgx-port-rack": "000000.00",
905 "lgx-port-shelf": "00"
911 "service-rate": "100",
912 "node-id": "XPDR-C1",
913 "service-format": "Ethernet",
914 "clli": "SNJSCAMCJT4",
917 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
918 "port-type": "router",
919 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
920 "port-rack": "000000.00",
924 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
925 "lgx-port-name": "LGX Back.29",
926 "lgx-port-rack": "000000.00",
927 "lgx-port-shelf": "00"
932 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
933 "port-type": "router",
934 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
935 "port-rack": "000000.00",
939 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
940 "lgx-port-name": "LGX Back.30",
941 "lgx-port-rack": "000000.00",
942 "lgx-port-shelf": "00"
947 "due-date": "2016-11-28T00:00:01Z",
948 "operator-contact": "pw1234"
951 headers = {'content-type': 'application/json',
952 "Accept": "application/json"}
953 response = requests.request(
954 "POST", url, data=json.dumps(data), headers=headers,
955 auth=('admin', 'admin'))
956 self.assertEqual(response.status_code, requests.codes.ok)
957 res = response.json()
958 self.assertIn('PCE calculation in progress',
959 res['output']['configuration-response-common']['response-message'])
960 self.assertIn('200', res['output']['configuration-response-common']['response-code'])
961 time.sleep(self.WAITING)
# TODO: add a test that checks the openroadm service-list still only contains 2 elements
def test_29_delete_eth_service3(self):
    """Request deletion of ``service3`` and expect the RPC to refuse it.

    The HTTP exchange itself must succeed (200), while the RPC output has
    to carry response-code 500 and a "does not exist in datastore" message.
    """
    rpc_url = ("{}/operations/org-openroadm-service:service-delete"
               .format(self.restconf_baseurl))
    request_header = {
        "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
        "rpc-action": "service-delete",
        "request-system-id": "appname",
        "notification-url": "http://localhost:8585/NotificationServer/notify"
    }
    delete_info = {
        "service-name": "service3",
        "tail-retention": "no"
    }
    # RESTCONF RPC arguments travel inside an "input" container.
    payload = {"input": {
        "sdnc-request-header": request_header,
        "service-delete-req-info": delete_info
    }}
    reply = requests.post(
        rpc_url, data=json.dumps(payload),
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('Service \'service3\' does not exist in datastore',
                  common['response-message'])
    self.assertIn('500', common['response-code'])
def test_30_delete_eth_service1(self):
    """Delete ``service1`` through the service-delete RPC and wait.

    The RPC must answer HTTP 200 with a "Renderer service delete in
    progress" message. Because that answer only acknowledges that the
    deletion has started, the method then pauses for ``self.WAITING``
    seconds so that whatever runs next (e.g. the re-creation of service1
    in test_49_loop_create_eth_service) does not race the renderer.
    """
    url = ("{}/operations/org-openroadm-service:service-delete"
           .format(self.restconf_baseurl))
    # RESTCONF RPC arguments travel inside an "input" container.
    data = {"input": {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-delete",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify"
        },
        "service-delete-req-info": {
            "service-name": "service1",
            "tail-retention": "no"
        }
    }}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Renderer service delete in progress',
                  res['output']['configuration-response-common']['response-message'])
    # Same pause the service-create tests apply after their own
    # "... in progress" acknowledgement.
    time.sleep(self.WAITING)
def test_31_delete_eth_service2(self):
    """Delete ``service2`` through the service-delete RPC and wait.

    The RPC must answer HTTP 200 with a "Renderer service delete in
    progress" message. That answer only acknowledges the start of the
    deletion, so the method pauses for ``self.WAITING`` seconds; the test
    that follows checks that no cross-connection is left on ROADM-A1 and
    would otherwise race the renderer.
    """
    url = ("{}/operations/org-openroadm-service:service-delete"
           .format(self.restconf_baseurl))
    # RESTCONF RPC arguments travel inside an "input" container.
    data = {"input": {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-delete",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify"
        },
        "service-delete-req-info": {
            "service-name": "service2",
            "tail-retention": "no"
        }
    }}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Renderer service delete in progress',
                  res['output']['configuration-response-common']['response-message'])
    # Same pause the service-create tests apply after their own
    # "... in progress" acknowledgement.
    time.sleep(self.WAITING)
def test_32_check_no_xc_ROADMA(self):
    """Read the ROADM-A1 device config and expect no ``roadm-connections``."""
    device_url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
                  "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
                  .format(self.restconf_baseurl))
    reply = requests.get(device_url, auth=('admin', 'admin'))
    # The body is decoded before the status check, as a non-JSON answer
    # should surface as an error rather than as a status mismatch.
    body = reply.json()
    self.assertEqual(reply.status_code, requests.codes.ok)
    self.assertNotIn('roadm-connections', body['org-openroadm-device'])
def test_33_check_topo_XPDRA(self):
    """Inspect the termination points of node XPDR-A1-XPDR1.

    XPONDER-CLIENT points must not carry ``xpdr-client-attributes``.
    XPONDER-NETWORK points must expose ``tail-equipment-id`` in their
    ``xpdr-network-attributes`` but no ``wavelength``.
    """
    node_url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
                .format(self.restconf_baseurl))
    reply = requests.get(node_url, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    node = reply.json()['node'][0]
    for tp in node['ietf-network-topology:termination-point']:
        tp_type = tp[u'org-openroadm-common-network:tp-type']
        if tp_type == 'XPONDER-CLIENT':
            self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', tp)
        elif tp_type == 'XPONDER-NETWORK':
            network_attrs = tp[u'org-openroadm-network-topology:xpdr-network-attributes']
            self.assertIn(u'tail-equipment-id', network_attrs)
            self.assertNotIn('wavelength', network_attrs)
def test_34_check_topo_ROADMA_SRG1(self):
    """Check node ROADM-A1-SRG1 of the openroadm-topology.

    Wavelength indexes 1 and 2 must be listed in the SRG's
    ``available-wavelengths`` and no termination point may carry
    ``pp-attributes``.
    """
    url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
            .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url1, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    node = response.json()['node'][0]
    available = node[u'org-openroadm-network-topology:srg-attributes']['available-wavelengths']
    self.assertIn({u'index': 1}, available)
    self.assertIn({u'index': 2}, available)
    # One unconditional check per termination point. The former branching
    # compared tp-id with 'SRG1-PP1-TXRX' twice and ran this very assertion
    # on both paths, so the condition never selected anything.
    for tp in node['ietf-network-topology:termination-point']:
        self.assertNotIn('org-openroadm-network-topology:pp-attributes', tp)
def test_35_check_topo_ROADMA_DEG2(self):
    """Check node ROADM-A1-DEG2 of the openroadm-topology.

    Wavelength indexes 1 and 2 must be listed in the degree's
    ``available-wavelengths``; DEG2-CTP-TXRX must have no
    ``ctp-attributes`` and DEG2-TTP-TXRX no ``tx-ttp-attributes``.
    """
    node_url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
                .format(self.restconf_baseurl))
    reply = requests.get(node_url, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    node = reply.json()['node'][0]
    free_wavelengths = node[u'org-openroadm-network-topology:degree-attributes']['available-wavelengths']
    for index in (1, 2):
        self.assertIn({u'index': index}, free_wavelengths)
    for tp in node['ietf-network-topology:termination-point']:
        tp_id = tp['tp-id']
        if tp_id == 'DEG2-CTP-TXRX':
            self.assertNotIn('org-openroadm-network-topology:ctp-attributes', tp)
        if tp_id == 'DEG2-TTP-TXRX':
            self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', tp)
1110 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
def test_36_create_oc_service1(self):
    """Create OC ``service1`` (roadm-line) between ROADM-A1 and ROADM-C1.

    Posts the service-create RPC, expects HTTP 200 with a "PCE calculation
    in progress" message, then sleeps ``self.WAITING`` seconds.
    """
    rpc_url = ("{}/operations/org-openroadm-service:service-create"
               .format(self.restconf_baseurl))
    # NOTE(review): the container keys (service-a-end / service-z-end,
    # tx-direction / rx-direction, port, lgx) and the "port-shelf" leaves
    # are assumed from the OpenROADM service-create input model - confirm.
    a_end = {
        "service-rate": "100",
        "node-id": "ROADM-A1",
        "service-format": "OC",
        "clli": "SNJSCAMCJP8",
        "tx-direction": {
            "port": {
                "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
                "port-type": "router",
                "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
                "port-rack": "000000.00",
                "port-shelf": "00"
            },
            "lgx": {
                "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
                "lgx-port-name": "LGX Back.3",
                "lgx-port-rack": "000000.00",
                "lgx-port-shelf": "00"
            }
        },
        "rx-direction": {
            "port": {
                "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
                "port-type": "router",
                "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
                "port-rack": "000000.00",
                "port-shelf": "00"
            },
            "lgx": {
                "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
                "lgx-port-name": "LGX Back.4",
                "lgx-port-rack": "000000.00",
                "lgx-port-shelf": "00"
            }
        },
        "optic-type": "gray"
    }
    z_end = {
        "service-rate": "100",
        "node-id": "ROADM-C1",
        "service-format": "OC",
        "clli": "SNJSCAMCJT4",
        "tx-direction": {
            "port": {
                "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
                "port-type": "router",
                "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
                "port-rack": "000000.00",
                "port-shelf": "00"
            },
            "lgx": {
                "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
                "lgx-port-name": "LGX Back.29",
                "lgx-port-rack": "000000.00",
                "lgx-port-shelf": "00"
            }
        },
        "rx-direction": {
            "port": {
                "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
                "port-type": "router",
                "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
                "port-rack": "000000.00",
                "port-shelf": "00"
            },
            "lgx": {
                "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
                "lgx-port-name": "LGX Back.30",
                "lgx-port-rack": "000000.00",
                "lgx-port-shelf": "00"
            }
        },
        "optic-type": "gray"
    }
    payload = {"input": {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-create",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify"
        },
        "service-name": "service1",
        "common-id": "ASATT1234567",
        "connection-type": "roadm-line",
        "service-a-end": a_end,
        "service-z-end": z_end,
        "due-date": "2016-11-28T00:00:01Z",
        "operator-contact": "pw1234"
    }}
    reply = requests.post(
        rpc_url, data=json.dumps(payload),
        headers={'content-type': 'application/json',
                 "Accept": "application/json"},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # The RPC only acknowledges the request; leave time for the set-up.
    time.sleep(self.WAITING)
def test_37_get_oc_service1(self):
    """Read ``service1`` from the operational service-list and check it.

    Verifies the administrative state, service name, connection type
    (``roadm-line``) and lifecycle state (``planned``) of the first entry.
    """
    service_url = ("{}/operational/org-openroadm-service:service-list/services/service1"
                   .format(self.restconf_baseurl))
    reply = requests.get(
        service_url,
        headers={'content-type': 'application/json',
                 "Accept": "application/json"},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    service = reply.json()['services'][0]
    # NOTE(review): the expected administrative state is not visible in
    # the source this was rebuilt from; 'inService' is assumed - confirm.
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service1')
    self.assertEqual(service['connection-type'], 'roadm-line')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_38_check_xc1_ROADMA(self):
    """Check cross-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on ROADM-A1.

    The connection must contain the expected name, optical control mode
    and target output power, and its source / destination must be exactly
    the expected nmc interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Subset check standing in for assertDictContainsSubset (deprecated
    # since Python 3.2). The expected entries must be laid OVER the actual
    # dict: merged the other way round, the actual values win and the
    # comparison only proves that the keys exist, whatever their values.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
        connection['destination'])
def test_39_check_xc1_ROADMC(self):
    """Check cross-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADM-C1.

    The connection must contain the expected name, optical control mode
    and target output power, and its source / destination must be exactly
    the expected nmc interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Subset check standing in for assertDictContainsSubset (deprecated
    # since Python 3.2). The expected entries must be laid OVER the actual
    # dict: merged the other way round, the actual values win and the
    # comparison only proves that the keys exist, whatever their values.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
        connection['destination'])
def test_40_create_oc_service2(self):
    """Create OC ``service2`` (roadm-line) between ROADM-A1 and ROADM-C1.

    Posts the service-create RPC, expects HTTP 200 with a "PCE calculation
    in progress" message, then sleeps ``self.WAITING`` seconds.
    """
    rpc_url = ("{}/operations/org-openroadm-service:service-create"
               .format(self.restconf_baseurl))
    # NOTE(review): the container keys (service-a-end / service-z-end,
    # tx-direction / rx-direction, port, lgx) and the "port-shelf" leaves
    # are assumed from the OpenROADM service-create input model - confirm.
    a_end = {
        "service-rate": "100",
        "node-id": "ROADM-A1",
        "service-format": "OC",
        "clli": "SNJSCAMCJP8",
        "tx-direction": {
            "port": {
                "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
                "port-type": "router",
                "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
                "port-rack": "000000.00",
                "port-shelf": "00"
            },
            "lgx": {
                "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
                "lgx-port-name": "LGX Back.3",
                "lgx-port-rack": "000000.00",
                "lgx-port-shelf": "00"
            }
        },
        "rx-direction": {
            "port": {
                "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
                "port-type": "router",
                "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
                "port-rack": "000000.00",
                "port-shelf": "00"
            },
            "lgx": {
                "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
                "lgx-port-name": "LGX Back.4",
                "lgx-port-rack": "000000.00",
                "lgx-port-shelf": "00"
            }
        },
        "optic-type": "gray"
    }
    z_end = {
        "service-rate": "100",
        "node-id": "ROADM-C1",
        "service-format": "OC",
        "clli": "SNJSCAMCJT4",
        "tx-direction": {
            "port": {
                "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
                "port-type": "router",
                "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
                "port-rack": "000000.00",
                "port-shelf": "00"
            },
            "lgx": {
                "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
                "lgx-port-name": "LGX Back.29",
                "lgx-port-rack": "000000.00",
                "lgx-port-shelf": "00"
            }
        },
        "rx-direction": {
            "port": {
                "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
                "port-type": "router",
                "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
                "port-rack": "000000.00",
                "port-shelf": "00"
            },
            "lgx": {
                "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
                "lgx-port-name": "LGX Back.30",
                "lgx-port-rack": "000000.00",
                "lgx-port-shelf": "00"
            }
        },
        "optic-type": "gray"
    }
    payload = {"input": {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-create",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify"
        },
        "service-name": "service2",
        "common-id": "ASATT1234567",
        "connection-type": "roadm-line",
        "service-a-end": a_end,
        "service-z-end": z_end,
        "due-date": "2016-11-28T00:00:01Z",
        "operator-contact": "pw1234"
    }}
    reply = requests.post(
        rpc_url, data=json.dumps(payload),
        headers={'content-type': 'application/json',
                 "Accept": "application/json"},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    common = reply.json()['output']['configuration-response-common']
    self.assertIn('PCE calculation in progress', common['response-message'])
    # The RPC only acknowledges the request; leave time for the set-up.
    time.sleep(self.WAITING)
def test_41_get_oc_service2(self):
    """Read ``service2`` from the operational service-list and check it.

    Verifies the administrative state, service name, connection type
    (``roadm-line``) and lifecycle state (``planned``) of the first entry.
    """
    service_url = ("{}/operational/org-openroadm-service:service-list/services/service2"
                   .format(self.restconf_baseurl))
    reply = requests.get(
        service_url,
        headers={'content-type': 'application/json',
                 "Accept": "application/json"},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    service = reply.json()['services'][0]
    # NOTE(review): the expected administrative state is not visible in
    # the source this was rebuilt from; 'inService' is assumed - confirm.
    self.assertEqual(service['administrative-state'], 'inService')
    self.assertEqual(service['service-name'], 'service2')
    self.assertEqual(service['connection-type'], 'roadm-line')
    self.assertEqual(service['lifecycle-state'], 'planned')
def test_42_check_xc2_ROADMA(self):
    """Check cross-connection SRG1-PP2-TXRX-DEG2-TTP-TXRX-2 on ROADM-A1.

    The connection must contain the expected name, optical control mode
    and target output power, and its source / destination must be exactly
    the expected nmc interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-2"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-2',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Subset check standing in for assertDictContainsSubset (deprecated
    # since Python 3.2). The expected entries must be laid OVER the actual
    # dict: merged the other way round, the actual values win and the
    # comparison only proves that the keys exist, whatever their values.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP2-TXRX-nmc-2'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-nmc-2'},
        connection['destination'])
def test_43_check_topo_ROADMA(self):
    """Re-run the ROADM-A1 SRG1 and DEG2 topology checks of tests 26 and 27."""
    for topology_check in (self.test_26_check_topo_ROADMA_SRG1,
                           self.test_27_check_topo_ROADMA_DEG2):
        topology_check()
def test_44_delete_oc_service1(self):
    """Delete ``service1`` through the service-delete RPC and wait.

    The RPC must answer HTTP 200 with a "Renderer service delete in
    progress" message. Because that answer only acknowledges that the
    deletion has started, the method then pauses for ``self.WAITING``
    seconds so that whatever runs next (e.g. the re-creation of service1
    in test_50_loop_create_oc_service) does not race the renderer.
    """
    url = ("{}/operations/org-openroadm-service:service-delete"
           .format(self.restconf_baseurl))
    # RESTCONF RPC arguments travel inside an "input" container.
    data = {"input": {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-delete",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify"
        },
        "service-delete-req-info": {
            "service-name": "service1",
            "tail-retention": "no"
        }
    }}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Renderer service delete in progress',
                  res['output']['configuration-response-common']['response-message'])
    # Same pause the service-create tests apply after their own
    # "... in progress" acknowledgement.
    time.sleep(self.WAITING)
def test_45_delete_oc_service2(self):
    """Delete ``service2`` through the service-delete RPC and wait.

    The RPC must answer HTTP 200 with a "Renderer service delete in
    progress" message. That answer only acknowledges the start of the
    deletion, so the method pauses for ``self.WAITING`` seconds; the test
    that follows expects the service list to be gone and would otherwise
    race the renderer.
    """
    url = ("{}/operations/org-openroadm-service:service-delete"
           .format(self.restconf_baseurl))
    # RESTCONF RPC arguments travel inside an "input" container.
    data = {"input": {
        "sdnc-request-header": {
            "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
            "rpc-action": "service-delete",
            "request-system-id": "appname",
            "notification-url": "http://localhost:8585/NotificationServer/notify"
        },
        "service-delete-req-info": {
            "service-name": "service2",
            "tail-retention": "no"
        }
    }}
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "POST", url, data=json.dumps(data), headers=headers,
        auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertIn('Renderer service delete in progress',
                  res['output']['configuration-response-common']['response-message'])
    # Same pause the service-create tests apply after their own
    # "... in progress" acknowledgement.
    time.sleep(self.WAITING)
def test_46_get_no_oc_services(self):
    """Expect the operational service-list to be absent.

    The GET must answer 404 and list a ``data-missing`` application error.
    """
    list_url = ("{}/operational/org-openroadm-service:service-list"
                .format(self.restconf_baseurl))
    reply = requests.get(
        list_url,
        headers={'content-type': 'application/json',
                 "Accept": "application/json"},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.not_found)
    reported_errors = reply.json()['errors']['error']
    data_missing = {
        "error-type": "application", "error-tag": "data-missing",
        "error-message": "Request could not be completed because the relevant data model content does not exist"}
    self.assertIn(data_missing, reported_errors)
def test_47_get_no_xc_ROADMA(self):
    """Read the ROADM-A1 device config and expect no ``roadm-connections``."""
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf"
           "/node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json',
               "Accept": "application/json"}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Plain key name, as in test_32_check_no_xc_ROADMA; the former
    # ``['roadm-connections'][0]`` built a one-element list only to index it.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
def test_48_check_topo_ROADMA(self):
    """Re-run the ROADM-A1 SRG1 and DEG2 topology checks of tests 34 and 35."""
    for topology_check in (self.test_34_check_topo_ROADMA_SRG1,
                           self.test_35_check_topo_ROADMA_DEG2):
        topology_check()
def test_49_loop_create_eth_service(self):
    """Create, verify and delete the Ethernet service five times in a row.

    Each trial replays tests 11, 13, 14 and 30, printing a banner first.
    """
    steps = (
        ("eth service creation", self.test_11_create_eth_service1),
        ("check xc in ROADM-A1", self.test_13_check_xc1_ROADMA),
        ("check xc in ROADM-C1", self.test_14_check_xc1_ROADMC),
        ("eth service deletion\n", self.test_30_delete_eth_service1),
    )
    for trial in range(1, 6):
        print("trial number {}".format(trial))
        for banner, step in steps:
            print(banner)
            step()
def test_50_loop_create_oc_service(self):
    """Create, verify and delete the OC service five times in a row.

    If ``service1`` is still present in the operational service-list it is
    deleted first. Each trial then replays tests 36, 38, 39 and 44.
    """
    url = ("{}/operational/org-openroadm-service:service-list/services/service1"
           .format(self.restconf_baseurl))
    response = requests.request("GET", url, auth=('admin', 'admin'))
    if response.status_code != requests.codes.not_found:
        # A leftover service1 would make the first creation fail: remove it.
        url = ("{}/operations/org-openroadm-service:service-delete"
               .format(self.restconf_baseurl))
        # RESTCONF RPC arguments travel inside an "input" container.
        data = {"input": {
            "sdnc-request-header": {
                "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
                "rpc-action": "service-delete",
                "request-system-id": "appname",
                "notification-url": "http://localhost:8585/NotificationServer/notify"
            },
            "service-delete-req-info": {
                "service-name": "service1",
                "tail-retention": "no"
            }
        }}
        headers = {'content-type': 'application/json'}
        requests.request("POST", url, data=json.dumps(data), headers=headers, auth=('admin', 'admin'))
        # The delete RPC only acknowledges the request ("... in progress"
        # in the delete tests), so let it finish before re-creating the
        # very same service below.
        time.sleep(self.WAITING)

    for i in range(1, 6):
        print("trial number {}".format(i))
        print("oc service creation")
        self.test_36_create_oc_service1()
        print("check xc in ROADM-A1")
        self.test_38_check_xc1_ROADMA()
        print("check xc in ROADM-C1")
        self.test_39_check_xc1_ROADMC()
        print("oc service deletion\n")
        self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
    """Remove node XPDR-A1 from topology-netconf and expect HTTP 200."""
    node_url = ("{}/config/network-topology:"
                "network-topology/topology/topology-netconf/node/XPDR-A1"
                .format(self.restconf_baseurl))
    reply = requests.delete(
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_52_disconnect_XPDRC(self):
    """Remove node XPDR-C1 from topology-netconf and expect HTTP 200."""
    node_url = ("{}/config/network-topology:"
                "network-topology/topology/topology-netconf/node/XPDR-C1"
                .format(self.restconf_baseurl))
    reply = requests.delete(
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_53_disconnect_ROADMA(self):
    """Remove node ROADM-A1 from topology-netconf and expect HTTP 200."""
    node_url = ("{}/config/network-topology:"
                "network-topology/topology/topology-netconf/node/ROADM-A1"
                .format(self.restconf_baseurl))
    reply = requests.delete(
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_54_disconnect_ROADMC(self):
    """Remove node ROADM-C1 from topology-netconf and expect HTTP 200."""
    node_url = ("{}/config/network-topology:"
                "network-topology/topology/topology-netconf/node/ROADM-C1"
                .format(self.restconf_baseurl))
    reply = requests.delete(
        node_url,
        headers={'content-type': 'application/json'},
        auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
# Run the suite with per-test output when the file is executed as a script.
if __name__ == "__main__":
    unittest.main(verbosity=2)