2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
21 from common import test_utils
24 class TransportPCEFulltesting(unittest.TestCase):
27 restconf_baseurl = "http://localhost:8181/restconf"
28 WAITING = 20 # nominal value is 300
32 cls.processes = test_utils.start_tpce()
33 cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
def tearDownClass(cls):
    """Shut down every process held in ``cls.processes`` and report it.

    Each entry is passed to ``test_utils.shutdown_process`` in turn; a
    confirmation line is printed once the loop has finished.
    """
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
def setUp(self):
    """Announce the test about to run (executed before each test method)."""
    # Keep only the text after the last "." of self.id().
    method_name = self.id().rsplit(".", 1)[-1]
    print("execution of {}".format(method_name))
44 # connect netconf devices
45 def test_01_connect_xpdrA(self):
46 url = ("{}/config/network-topology:"
47 "network-topology/topology/topology-netconf/node/XPDR-A1"
48 .format(self.restconf_baseurl))
51 "netconf-node-topology:username": "admin",
52 "netconf-node-topology:password": "admin",
53 "netconf-node-topology:host": "127.0.0.1",
54 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
55 "netconf-node-topology:tcp-only": "false",
56 "netconf-node-topology:pass-through": {}}]}
57 headers = {'content-type': 'application/json'}
58 response = requests.request(
59 "PUT", url, data=json.dumps(data), headers=headers,
60 auth=('admin', 'admin'))
61 self.assertEqual(response.status_code, requests.codes.created)
64 def test_02_connect_xpdrC(self):
65 url = ("{}/config/network-topology:"
66 "network-topology/topology/topology-netconf/node/XPDR-C1"
67 .format(self.restconf_baseurl))
70 "netconf-node-topology:username": "admin",
71 "netconf-node-topology:password": "admin",
72 "netconf-node-topology:host": "127.0.0.1",
73 "netconf-node-topology:port": test_utils.sims['xpdrc']['port'],
74 "netconf-node-topology:tcp-only": "false",
75 "netconf-node-topology:pass-through": {}}]}
76 headers = {'content-type': 'application/json'}
77 response = requests.request(
78 "PUT", url, data=json.dumps(data), headers=headers,
79 auth=('admin', 'admin'))
80 self.assertEqual(response.status_code, requests.codes.created)
83 def test_03_connect_rdmA(self):
84 url = ("{}/config/network-topology:"
85 "network-topology/topology/topology-netconf/node/ROADM-A1"
86 .format(self.restconf_baseurl))
88 "node-id": "ROADM-A1",
89 "netconf-node-topology:username": "admin",
90 "netconf-node-topology:password": "admin",
91 "netconf-node-topology:host": "127.0.0.1",
92 "netconf-node-topology:port": test_utils.sims['roadma']['port'],
93 "netconf-node-topology:tcp-only": "false",
94 "netconf-node-topology:pass-through": {}}]}
95 headers = {'content-type': 'application/json'}
96 response = requests.request(
97 "PUT", url, data=json.dumps(data), headers=headers,
98 auth=('admin', 'admin'))
99 self.assertEqual(response.status_code, requests.codes.created)
102 def test_04_connect_rdmC(self):
103 url = ("{}/config/network-topology:"
104 "network-topology/topology/topology-netconf/node/ROADM-C1"
105 .format(self.restconf_baseurl))
107 "node-id": "ROADM-C1",
108 "netconf-node-topology:username": "admin",
109 "netconf-node-topology:password": "admin",
110 "netconf-node-topology:host": "127.0.0.1",
111 "netconf-node-topology:port": test_utils.sims['roadmc']['port'],
112 "netconf-node-topology:tcp-only": "false",
113 "netconf-node-topology:pass-through": {}}]}
114 headers = {'content-type': 'application/json'}
115 response = requests.request(
116 "PUT", url, data=json.dumps(data), headers=headers,
117 auth=('admin', 'admin'))
118 self.assertEqual(response.status_code, requests.codes.created)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
    """POST init-xpdr-rdm-links for XPDR-A1 network 1 and ROADM-A1 SRG1-PP1-TXRX.

    Passes when the response status equals ``requests.codes.ok`` and the
    RPC result text contains 'Xponder Roadm Link created successfully'.
    """
    rpc_url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
    links_input = {
        "networkutils:xpdr-node": "XPDR-A1",
        "networkutils:xpdr-num": "1",
        "networkutils:network-num": "1",
        "networkutils:rdm-node": "ROADM-A1",
        "networkutils:srg-num": "1",
        "networkutils:termination-point-num": "SRG1-PP1-TXRX"
    }
    payload = {"networkutils:input": {"networkutils:links-input": links_input}}
    reply = requests.request(
        "POST", rpc_url, data=json.dumps(payload),
        headers={'content-type': 'application/json'}, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    rpc_result = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', rpc_result)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
    """POST init-rdm-xpdr-links for ROADM-A1 SRG1-PP1-TXRX and XPDR-A1 network 1.

    Passes when the response status equals ``requests.codes.ok`` and the
    RPC result text contains 'Roadm Xponder links created successfully'.
    """
    rpc_url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
    links_input = {
        "networkutils:xpdr-node": "XPDR-A1",
        "networkutils:xpdr-num": "1",
        "networkutils:network-num": "1",
        "networkutils:rdm-node": "ROADM-A1",
        "networkutils:srg-num": "1",
        "networkutils:termination-point-num": "SRG1-PP1-TXRX"
    }
    payload = {"networkutils:input": {"networkutils:links-input": links_input}}
    reply = requests.request(
        "POST", rpc_url, data=json.dumps(payload),
        headers={'content-type': 'application/json'}, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    rpc_result = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', rpc_result)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
    """POST init-xpdr-rdm-links for XPDR-C1 network 1 and ROADM-C1 SRG1-PP1-TXRX.

    Passes when the response status equals ``requests.codes.ok`` and the
    RPC result text contains 'Xponder Roadm Link created successfully'.
    """
    rpc_url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
    links_input = {
        "networkutils:xpdr-node": "XPDR-C1",
        "networkutils:xpdr-num": "1",
        "networkutils:network-num": "1",
        "networkutils:rdm-node": "ROADM-C1",
        "networkutils:srg-num": "1",
        "networkutils:termination-point-num": "SRG1-PP1-TXRX"
    }
    payload = {"networkutils:input": {"networkutils:links-input": links_input}}
    reply = requests.request(
        "POST", rpc_url, data=json.dumps(payload),
        headers={'content-type': 'application/json'}, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    rpc_result = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', rpc_result)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
    """POST init-rdm-xpdr-links for ROADM-C1 SRG1-PP1-TXRX and XPDR-C1 network 1.

    Passes when the response status equals ``requests.codes.ok`` and the
    RPC result text contains 'Roadm Xponder links created successfully'.
    """
    rpc_url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
    links_input = {
        "networkutils:xpdr-node": "XPDR-C1",
        "networkutils:xpdr-num": "1",
        "networkutils:network-num": "1",
        "networkutils:rdm-node": "ROADM-C1",
        "networkutils:srg-num": "1",
        "networkutils:termination-point-num": "SRG1-PP1-TXRX"
    }
    payload = {"networkutils:input": {"networkutils:links-input": links_input}}
    reply = requests.request(
        "POST", rpc_url, data=json.dumps(payload),
        headers={'content-type': 'application/json'}, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    rpc_result = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', rpc_result)
213 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
214 # Config ROADMA-ROADMC oms-attributes
215 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
216 "link/ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
217 "OMS-attributes/span"
218 .format(self.restconf_baseurl))
220 "auto-spanloss": "true",
221 "spanloss-base": 11.4,
222 "spanloss-current": 12,
223 "engineered-spanloss": 12.2,
224 "link-concatenation": [{
227 "SRLG-length": 100000,
229 headers = {'content-type': 'application/json'}
230 response = requests.request(
231 "PUT", url, data=json.dumps(data), headers=headers,
232 auth=('admin', 'admin'))
233 self.assertEqual(response.status_code, requests.codes.created)
235 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
236 # Config ROADMC-ROADMA oms-attributes
237 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
238 "link/ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
239 "OMS-attributes/span"
240 .format(self.restconf_baseurl))
242 "auto-spanloss": "true",
243 "spanloss-base": 11.4,
244 "spanloss-current": 12,
245 "engineered-spanloss": 12.2,
246 "link-concatenation": [{
249 "SRLG-length": 100000,
251 headers = {'content-type': 'application/json'}
252 response = requests.request(
253 "PUT", url, data=json.dumps(data), headers=headers,
254 auth=('admin', 'admin'))
255 self.assertEqual(response.status_code, requests.codes.created)
257 # test service-create for Eth service from xpdr to xpdr
258 def test_11_create_eth_service1(self):
259 url = ("{}/operations/org-openroadm-service:service-create"
260 .format(self.restconf_baseurl))
262 "sdnc-request-header": {
263 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
264 "rpc-action": "service-create",
265 "request-system-id": "appname",
266 "notification-url": "http://localhost:8585/NotificationServer/notify"
268 "service-name": "service1",
269 "common-id": "ASATT1234567",
270 "connection-type": "service",
272 "service-rate": "100",
273 "node-id": "XPDR-A1",
274 "service-format": "Ethernet",
275 "clli": "SNJSCAMCJP8",
278 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
279 "port-type": "router",
280 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
281 "port-rack": "000000.00",
285 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
286 "lgx-port-name": "LGX Back.3",
287 "lgx-port-rack": "000000.00",
288 "lgx-port-shelf": "00"
293 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
294 "port-type": "router",
295 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
296 "port-rack": "000000.00",
300 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
301 "lgx-port-name": "LGX Back.4",
302 "lgx-port-rack": "000000.00",
303 "lgx-port-shelf": "00"
309 "service-rate": "100",
310 "node-id": "XPDR-C1",
311 "service-format": "Ethernet",
312 "clli": "SNJSCAMCJT4",
315 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
316 "port-type": "router",
317 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
318 "port-rack": "000000.00",
322 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
323 "lgx-port-name": "LGX Back.29",
324 "lgx-port-rack": "000000.00",
325 "lgx-port-shelf": "00"
330 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
331 "port-type": "router",
332 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
333 "port-rack": "000000.00",
337 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
338 "lgx-port-name": "LGX Back.30",
339 "lgx-port-rack": "000000.00",
340 "lgx-port-shelf": "00"
345 "due-date": "2016-11-28T00:00:01Z",
346 "operator-contact": "pw1234"
349 headers = {'content-type': 'application/json',
350 "Accept": "application/json"}
351 response = requests.request(
352 "POST", url, data=json.dumps(data), headers=headers,
353 auth=('admin', 'admin'))
354 self.assertEqual(response.status_code, requests.codes.ok)
355 res = response.json()
356 self.assertIn('PCE calculation in progress',
357 res['output']['configuration-response-common']['response-message'])
358 time.sleep(self.WAITING)
360 def test_12_get_eth_service1(self):
361 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
362 .format(self.restconf_baseurl))
363 headers = {'content-type': 'application/json',
364 "Accept": "application/json"}
365 response = requests.request(
366 "GET", url, headers=headers, auth=('admin', 'admin'))
367 self.assertEqual(response.status_code, requests.codes.ok)
368 res = response.json()
370 res['services'][0]['administrative-state'], 'inService')
372 res['services'][0]['service-name'], 'service1')
374 res['services'][0]['connection-type'], 'service')
376 res['services'][0]['lifecycle-state'], 'planned')
def test_13_check_xc1_ROADMA(self):
    """Check roadm-connection SRG1-PP1-TXRX-DEG2-TTP-TXRX-1 on ROADM-A1.

    Reads the connection through the ROADM-A1 netconf mount point and
    verifies its name, optical control mode and target output power, then
    its source and destination interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Subset check standing in for assertDictContainsSubset (deprecated in
    # Python 3.2). The expected entries are laid over the returned
    # connection, so a missing key or a differing value makes the two dicts
    # unequal. With the arguments the other way round the returned values
    # override the expected ones and only key presence is verified.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
        connection['destination'])
def test_14_check_xc1_ROADMC(self):
    """Check roadm-connection SRG1-PP1-TXRX-DEG1-TTP-TXRX-1 on ROADM-C1.

    Reads the connection through the ROADM-C1 netconf mount point and
    verifies its name, optical control mode and target output power, then
    its source and destination interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
        'opticalControlMode': 'gainLoss',
        'target-output-power': -3.0
    }
    # Subset check standing in for assertDictContainsSubset (deprecated in
    # Python 3.2). The expected entries are laid over the returned
    # connection, so a missing key or a differing value makes the two dicts
    # unequal. With the arguments the other way round the returned values
    # override the expected ones and only key presence is verified.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
        connection['destination'])
433 def test_15_check_topo_XPDRA(self):
434 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
435 .format(self.restconf_baseurl))
436 response = requests.request(
437 "GET", url1, auth=('admin', 'admin'))
438 self.assertEqual(response.status_code, requests.codes.ok)
439 res = response.json()
440 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
442 if ele['tp-id'] == 'XPDR1-NETWORK1':
443 self.assertEqual({u'frequency': 196.1,
445 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
446 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
447 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
448 if ele['tp-id'] == 'XPDR1-NETWORK2':
449 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
452 def test_16_check_topo_ROADMA_SRG1(self):
453 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
454 .format(self.restconf_baseurl))
455 response = requests.request(
456 "GET", url1, auth=('admin', 'admin'))
457 self.assertEqual(response.status_code, requests.codes.ok)
458 res = response.json()
459 self.assertNotIn({u'index': 1},
460 res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
461 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
463 if ele['tp-id'] == 'SRG1-PP1-TXRX':
464 self.assertIn({u'index': 1, u'frequency': 196.1,
466 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
467 if ele['tp-id'] == 'SRG1-PP2-TXRX':
468 self.assertNotIn('used-wavelength', dict.keys(ele))
471 def test_17_check_topo_ROADMA_DEG1(self):
472 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
473 .format(self.restconf_baseurl))
474 response = requests.request(
475 "GET", url1, auth=('admin', 'admin'))
476 self.assertEqual(response.status_code, requests.codes.ok)
477 res = response.json()
478 self.assertNotIn({u'index': 1},
479 res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
480 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
482 if ele['tp-id'] == 'DEG2-CTP-TXRX':
483 self.assertIn({u'index': 1, u'frequency': 196.1,
485 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
486 if ele['tp-id'] == 'DEG2-TTP-TXRX':
487 self.assertIn({u'index': 1, u'frequency': 196.1,
489 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
    """POST init-xpdr-rdm-links for XPDR-A1 network 2 and ROADM-A1 SRG1-PP2-TXRX.

    Passes when the response status equals ``requests.codes.ok`` and the
    RPC result text contains 'Xponder Roadm Link created successfully'.
    """
    rpc_url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
    links_input = {
        "networkutils:xpdr-node": "XPDR-A1",
        "networkutils:xpdr-num": "1",
        "networkutils:network-num": "2",
        "networkutils:rdm-node": "ROADM-A1",
        "networkutils:srg-num": "1",
        "networkutils:termination-point-num": "SRG1-PP2-TXRX"
    }
    payload = {"networkutils:input": {"networkutils:links-input": links_input}}
    reply = requests.request(
        "POST", rpc_url, data=json.dumps(payload),
        headers={'content-type': 'application/json'}, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    rpc_result = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', rpc_result)
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
    """POST init-rdm-xpdr-links for ROADM-A1 SRG1-PP2-TXRX and XPDR-A1 network 2.

    Passes when the response status equals ``requests.codes.ok`` and the
    RPC result text contains 'Roadm Xponder links created successfully'.
    """
    rpc_url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
    links_input = {
        "networkutils:xpdr-node": "XPDR-A1",
        "networkutils:xpdr-num": "1",
        "networkutils:network-num": "2",
        "networkutils:rdm-node": "ROADM-A1",
        "networkutils:srg-num": "1",
        "networkutils:termination-point-num": "SRG1-PP2-TXRX"
    }
    payload = {"networkutils:input": {"networkutils:links-input": links_input}}
    reply = requests.request(
        "POST", rpc_url, data=json.dumps(payload),
        headers={'content-type': 'application/json'}, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    rpc_result = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', rpc_result)
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
    """POST init-xpdr-rdm-links for XPDR-C1 network 2 and ROADM-C1 SRG1-PP2-TXRX.

    Passes when the response status equals ``requests.codes.ok`` and the
    RPC result text contains 'Xponder Roadm Link created successfully'.
    """
    rpc_url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(self.restconf_baseurl)
    links_input = {
        "networkutils:xpdr-node": "XPDR-C1",
        "networkutils:xpdr-num": "1",
        "networkutils:network-num": "2",
        "networkutils:rdm-node": "ROADM-C1",
        "networkutils:srg-num": "1",
        "networkutils:termination-point-num": "SRG1-PP2-TXRX"
    }
    payload = {"networkutils:input": {"networkutils:links-input": links_input}}
    reply = requests.request(
        "POST", rpc_url, data=json.dumps(payload),
        headers={'content-type': 'application/json'}, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    rpc_result = reply.json()["output"]["result"]
    self.assertIn('Xponder Roadm Link created successfully', rpc_result)
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
    """POST init-rdm-xpdr-links for ROADM-C1 SRG1-PP2-TXRX and XPDR-C1 network 2.

    Passes when the response status equals ``requests.codes.ok`` and the
    RPC result text contains 'Roadm Xponder links created successfully'.
    """
    rpc_url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(self.restconf_baseurl)
    links_input = {
        "networkutils:xpdr-node": "XPDR-C1",
        "networkutils:xpdr-num": "1",
        "networkutils:network-num": "2",
        "networkutils:rdm-node": "ROADM-C1",
        "networkutils:srg-num": "1",
        "networkutils:termination-point-num": "SRG1-PP2-TXRX"
    }
    payload = {"networkutils:input": {"networkutils:links-input": links_input}}
    reply = requests.request(
        "POST", rpc_url, data=json.dumps(payload),
        headers={'content-type': 'application/json'}, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    rpc_result = reply.json()["output"]["result"]
    self.assertIn('Roadm Xponder links created successfully', rpc_result)
584 def test_22_create_eth_service2(self):
585 url = ("{}/operations/org-openroadm-service:service-create"
586 .format(self.restconf_baseurl))
588 "sdnc-request-header": {
589 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
590 "rpc-action": "service-create",
591 "request-system-id": "appname",
592 "notification-url": "http://localhost:8585/NotificationServer/notify"
594 "service-name": "service2",
595 "common-id": "ASATT1234567",
596 "connection-type": "service",
598 "service-rate": "100",
599 "node-id": "XPDR-A1",
600 "service-format": "Ethernet",
601 "clli": "SNJSCAMCJP8",
604 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
605 "port-type": "router",
606 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
607 "port-rack": "000000.00",
611 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
612 "lgx-port-name": "LGX Back.3",
613 "lgx-port-rack": "000000.00",
614 "lgx-port-shelf": "00"
619 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
620 "port-type": "router",
621 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
622 "port-rack": "000000.00",
626 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
627 "lgx-port-name": "LGX Back.4",
628 "lgx-port-rack": "000000.00",
629 "lgx-port-shelf": "00"
635 "service-rate": "100",
636 "node-id": "XPDR-C1",
637 "service-format": "Ethernet",
638 "clli": "SNJSCAMCJT4",
641 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
642 "port-type": "router",
643 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
644 "port-rack": "000000.00",
648 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
649 "lgx-port-name": "LGX Back.29",
650 "lgx-port-rack": "000000.00",
651 "lgx-port-shelf": "00"
656 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
657 "port-type": "router",
658 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
659 "port-rack": "000000.00",
663 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
664 "lgx-port-name": "LGX Back.30",
665 "lgx-port-rack": "000000.00",
666 "lgx-port-shelf": "00"
671 "due-date": "2016-11-28T00:00:01Z",
672 "operator-contact": "pw1234"
675 headers = {'content-type': 'application/json',
676 "Accept": "application/json"}
677 response = requests.request(
678 "POST", url, data=json.dumps(data), headers=headers,
679 auth=('admin', 'admin'))
680 self.assertEqual(response.status_code, requests.codes.ok)
681 res = response.json()
682 self.assertIn('PCE calculation in progress',
683 res['output']['configuration-response-common']['response-message'])
684 time.sleep(self.WAITING)
686 def test_23_get_eth_service2(self):
687 url = ("{}/operational/org-openroadm-service:service-list/services/service2"
688 .format(self.restconf_baseurl))
689 headers = {'content-type': 'application/json',
690 "Accept": "application/json"}
691 response = requests.request(
692 "GET", url, headers=headers, auth=('admin', 'admin'))
693 self.assertEqual(response.status_code, requests.codes.ok)
694 res = response.json()
696 res['services'][0]['administrative-state'],
699 res['services'][0]['service-name'], 'service2')
701 res['services'][0]['connection-type'], 'service')
703 res['services'][0]['lifecycle-state'], 'planned')
def test_24_check_xc2_ROADMA(self):
    """Check roadm-connection DEG2-TTP-TXRX-SRG1-PP2-TXRX-2 on ROADM-A1.

    Reads the connection through the ROADM-A1 netconf mount point and
    verifies its name and optical control mode, then its source and
    destination interfaces.
    """
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-2"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json'}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    connection = response.json()['roadm-connections'][0]
    expected = {
        'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-2',
        'opticalControlMode': 'power'
    }
    # Subset check standing in for assertDictContainsSubset (deprecated in
    # Python 3.2). The expected entries are laid over the returned
    # connection, so a missing key or a differing value makes the two dicts
    # unequal. With the arguments the other way round the returned values
    # override the expected ones and only key presence is verified.
    self.assertDictEqual(dict(connection, **expected), connection)
    self.assertDictEqual(
        {'src-if': 'DEG2-TTP-TXRX-nmc-2'},
        connection['source'])
    self.assertDictEqual(
        {'dst-if': 'SRG1-PP2-TXRX-nmc-2'},
        connection['destination'])
731 def test_25_check_topo_XPDRA(self):
732 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
733 .format(self.restconf_baseurl))
734 response = requests.request(
735 "GET", url1, auth=('admin', 'admin'))
736 self.assertEqual(response.status_code, requests.codes.ok)
737 res = response.json()
738 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
740 if ele['tp-id'] == 'XPDR1-NETWORK1':
741 self.assertEqual({u'frequency': 196.1,
743 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
744 if ele['tp-id'] == 'XPDR1-NETWORK2':
745 self.assertEqual({u'frequency': 196.05,
747 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
748 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
749 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
752 def test_26_check_topo_ROADMA_SRG1(self):
753 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
754 .format(self.restconf_baseurl))
755 response = requests.request(
756 "GET", url1, auth=('admin', 'admin'))
757 self.assertEqual(response.status_code, requests.codes.ok)
758 res = response.json()
759 self.assertNotIn({u'index': 1}, res['node'][0]
760 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
761 self.assertNotIn({u'index': 2}, res['node'][0]
762 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
763 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
765 if ele['tp-id'] == 'SRG1-PP1-TXRX':
766 self.assertIn({u'index': 1, u'frequency': 196.1,
768 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
769 self.assertNotIn({u'index': 2, u'frequency': 196.05,
771 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
772 if ele['tp-id'] == 'SRG1-PP2-TXRX':
773 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
774 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
775 self.assertNotIn({u'index': 1, u'frequency': 196.1,
777 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
778 if ele['tp-id'] == 'SRG1-PP3-TXRX':
779 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
782 def test_27_check_topo_ROADMA_DEG2(self):
783 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
784 .format(self.restconf_baseurl))
785 response = requests.request(
786 "GET", url1, auth=('admin', 'admin'))
787 self.assertEqual(response.status_code, requests.codes.ok)
788 res = response.json()
789 self.assertNotIn({u'index': 1}, res['node'][0]
790 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
791 self.assertNotIn({u'index': 2}, res['node'][0]
792 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
793 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
795 if ele['tp-id'] == 'DEG2-CTP-TXRX':
796 self.assertIn({u'index': 1, u'frequency': 196.1,
798 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
799 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
800 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
801 if ele['tp-id'] == 'DEG2-TTP-TXRX':
802 self.assertIn({u'index': 1, u'frequency': 196.1,
804 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
805 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
806 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
809 # creation service test on a non-available resource
810 def test_28_create_eth_service3(self):
811 url = ("{}/operations/org-openroadm-service:service-create"
812 .format(self.restconf_baseurl))
814 "sdnc-request-header": {
815 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
816 "rpc-action": "service-create",
817 "request-system-id": "appname",
818 "notification-url": "http://localhost:8585/NotificationServer/notify"
820 "service-name": "service3",
821 "common-id": "ASATT1234567",
822 "connection-type": "service",
824 "service-rate": "100",
825 "node-id": "XPDR-A1",
826 "service-format": "Ethernet",
827 "clli": "SNJSCAMCJP8",
830 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
831 "port-type": "router",
832 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
833 "port-rack": "000000.00",
837 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
838 "lgx-port-name": "LGX Back.3",
839 "lgx-port-rack": "000000.00",
840 "lgx-port-shelf": "00"
845 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
846 "port-type": "router",
847 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
848 "port-rack": "000000.00",
852 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
853 "lgx-port-name": "LGX Back.4",
854 "lgx-port-rack": "000000.00",
855 "lgx-port-shelf": "00"
861 "service-rate": "100",
862 "node-id": "XPDR-C1",
863 "service-format": "Ethernet",
864 "clli": "SNJSCAMCJT4",
867 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
868 "port-type": "router",
869 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
870 "port-rack": "000000.00",
874 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
875 "lgx-port-name": "LGX Back.29",
876 "lgx-port-rack": "000000.00",
877 "lgx-port-shelf": "00"
882 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
883 "port-type": "router",
884 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
885 "port-rack": "000000.00",
889 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
890 "lgx-port-name": "LGX Back.30",
891 "lgx-port-rack": "000000.00",
892 "lgx-port-shelf": "00"
897 "due-date": "2016-11-28T00:00:01Z",
898 "operator-contact": "pw1234"
901 headers = {'content-type': 'application/json',
902 "Accept": "application/json"}
903 response = requests.request(
904 "POST", url, data=json.dumps(data), headers=headers,
905 auth=('admin', 'admin'))
906 self.assertEqual(response.status_code, requests.codes.ok)
907 res = response.json()
908 self.assertIn('PCE calculation in progress',
909 res['output']['configuration-response-common']['response-message'])
910 self.assertIn('200', res['output']['configuration-response-common']['response-code'])
911 time.sleep(self.WAITING)
# add a test that checks that the openroadm-service-list still contains only 2 elements
914 def test_29_delete_eth_service3(self):
915 url = ("{}/operations/org-openroadm-service:service-delete"
916 .format(self.restconf_baseurl))
918 "sdnc-request-header": {
919 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
920 "rpc-action": "service-delete",
921 "request-system-id": "appname",
922 "notification-url": "http://localhost:8585/NotificationServer/notify"
924 "service-delete-req-info": {
925 "service-name": "service3",
926 "tail-retention": "no"
930 headers = {'content-type': 'application/json'}
931 response = requests.request(
932 "POST", url, data=json.dumps(data), headers=headers,
933 auth=('admin', 'admin'))
934 self.assertEqual(response.status_code, requests.codes.ok)
935 res = response.json()
936 self.assertIn('Service \'service3\' does not exist in datastore',
937 res['output']['configuration-response-common']['response-message'])
938 self.assertIn('500', res['output']['configuration-response-common']['response-code'])
# Calls the service-delete RPC for 'service1' and expects an HTTP 200 reply
# whose response-message contains 'Renderer service delete in progress'.
# NOTE(review): the statement that opens the `data` payload and its closing
# brackets are not visible in this excerpt; confirm against the full file.
941 def test_30_delete_eth_service1(self):
942 url = ("{}/operations/org-openroadm-service:service-delete"
943 .format(self.restconf_baseurl))
945 "sdnc-request-header": {
946 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
947 "rpc-action": "service-delete",
948 "request-system-id": "appname",
949 "notification-url": "http://localhost:8585/NotificationServer/notify"
951 "service-delete-req-info": {
952 "service-name": "service1",
953 "tail-retention": "no"
957 headers = {'content-type': 'application/json'}
958 response = requests.request(
959 "POST", url, data=json.dumps(data), headers=headers,
960 auth=('admin', 'admin'))
961 self.assertEqual(response.status_code, requests.codes.ok)
962 res = response.json()
963 self.assertIn('Renderer service delete in progress',
964 res['output']['configuration-response-common']['response-message'])
# Calls the service-delete RPC for 'service2' and expects an HTTP 200 reply
# whose response-message contains 'Renderer service delete in progress'.
# NOTE(review): the statement that opens the `data` payload and its closing
# brackets are not visible in this excerpt; confirm against the full file.
967 def test_31_delete_eth_service2(self):
968 url = ("{}/operations/org-openroadm-service:service-delete"
969 .format(self.restconf_baseurl))
971 "sdnc-request-header": {
972 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
973 "rpc-action": "service-delete",
974 "request-system-id": "appname",
975 "notification-url": "http://localhost:8585/NotificationServer/notify"
977 "service-delete-req-info": {
978 "service-name": "service2",
979 "tail-retention": "no"
983 headers = {'content-type': 'application/json'}
984 response = requests.request(
985 "POST", url, data=json.dumps(data), headers=headers,
986 auth=('admin', 'admin'))
987 self.assertEqual(response.status_code, requests.codes.ok)
988 res = response.json()
989 self.assertIn('Renderer service delete in progress',
990 res['output']['configuration-response-common']['response-message'])
def test_32_check_no_xc_ROADMA(self):
    # Read the ROADM-A1 device data mounted under topology-netconf and check
    # that it holds no 'roadm-connections' entry.
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
           "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           .format(self.restconf_baseurl))
    response = requests.request(
        "GET", url, auth=('admin', 'admin'))
    # Assert the status before decoding the body, so an error reply is
    # reported as a status mismatch rather than as a JSON decoding failure.
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
def test_33_check_topo_XPDRA(self):
    # Fetch node XPDR-A1-XPDR1 from openroadm-topology and inspect each
    # termination point: client ones must carry no 'xpdr-client-attributes';
    # network ones must expose 'tail-equipment-id' but no 'wavelength'.
    node_url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
                .format(self.restconf_baseurl))
    reply = requests.request("GET", node_url, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    node = reply.json()['node'][0]
    for tp in node['ietf-network-topology:termination-point']:
        tp_type = tp[u'org-openroadm-common-network:tp-type']
        if tp_type == 'XPONDER-CLIENT':
            self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', tp.keys())
            continue
        if tp_type != 'XPONDER-NETWORK':
            # Other termination-point types are not checked here.
            continue
        network_attrs = tp[u'org-openroadm-network-topology:xpdr-network-attributes']
        self.assertIn(u'tail-equipment-id', network_attrs.keys())
        self.assertNotIn('wavelength', network_attrs.keys())
# Fetches node ROADM-A1-SRG1 from openroadm-topology, checks that wavelength
# indexes 1 and 2 are listed in the SRG 'available-wavelengths', and asserts
# that termination points carry no 'pp-attributes'.
# NOTE(review): both operands of the `or` below compare 'tp-id' with the same
# value 'SRG1-PP1-TXRX'; the second presumably targets another port — confirm.
# NOTE(review): a line between the two assertNotIn calls is not visible in this
# excerpt (presumably an `else:`), so how the second assertion is guarded
# cannot be told from here.
1022 def test_34_check_topo_ROADMA_SRG1(self):
1023 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
1024 .format(self.restconf_baseurl))
1025 response = requests.request(
1026 "GET", url1, auth=('admin', 'admin'))
1027 self.assertEqual(response.status_code, requests.codes.ok)
1028 res = response.json()
1029 self.assertIn({u'index': 1}, res['node'][0]
1030 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
1031 self.assertIn({u'index': 2}, res['node'][0]
1032 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
1033 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
1034 for ele in liste_tp:
1035 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP1-TXRX':
1036 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
1038 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
def test_35_check_topo_ROADMA_DEG2(self):
    # Fetch node ROADM-A1-DEG2 from openroadm-topology, check that wavelength
    # indexes 1 and 2 are listed as available on the degree, and that the CTP
    # and TTP termination points carry no ctp / tx-ttp attributes.
    node_url = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
                .format(self.restconf_baseurl))
    reply = requests.request("GET", node_url, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
    node = reply.json()['node'][0]
    for index in (1, 2):
        self.assertIn(
            {u'index': index},
            node[u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
    # (tp-id, attribute that must be absent on that termination point)
    absent_attributes = (
        ('DEG2-CTP-TXRX', 'org-openroadm-network-topology:ctp-attributes'),
        ('DEG2-TTP-TXRX', 'org-openroadm-network-topology:tx-ttp-attributes'),
    )
    for tp in node['ietf-network-topology:termination-point']:
        for tp_id, attribute in absent_attributes:
            if tp['tp-id'] == tp_id:
                self.assertNotIn(attribute, tp.keys())
# Sends a service-create RPC for 'service1' (connection-type 'roadm-line',
# service-format 'OC') whose payload names ROADM-A1 and ROADM-C1 as node-ids,
# expects HTTP 200 with a response-message containing
# 'PCE calculation in progress', then sleeps self.WAITING seconds.
# NOTE(review): several payload lines (the `data` assignment, nested container
# keys and closing brackets) are not visible in this excerpt; confirm the
# payload structure against the full file.
1060 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
1061 def test_36_create_oc_service1(self):
1062 url = ("{}/operations/org-openroadm-service:service-create"
1063 .format(self.restconf_baseurl))
1065 "sdnc-request-header": {
1066 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1067 "rpc-action": "service-create",
1068 "request-system-id": "appname",
1069 "notification-url": "http://localhost:8585/NotificationServer/notify"
1071 "service-name": "service1",
1072 "common-id": "ASATT1234567",
1073 "connection-type": "roadm-line",
1075 "service-rate": "100",
1076 "node-id": "ROADM-A1",
1077 "service-format": "OC",
1078 "clli": "SNJSCAMCJP8",
1081 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1082 "port-type": "router",
1083 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1084 "port-rack": "000000.00",
1088 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1089 "lgx-port-name": "LGX Back.3",
1090 "lgx-port-rack": "000000.00",
1091 "lgx-port-shelf": "00"
1096 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1097 "port-type": "router",
1098 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1099 "port-rack": "000000.00",
1103 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1104 "lgx-port-name": "LGX Back.4",
1105 "lgx-port-rack": "000000.00",
1106 "lgx-port-shelf": "00"
1109 "optic-type": "gray"
1112 "service-rate": "100",
1113 "node-id": "ROADM-C1",
1114 "service-format": "OC",
1115 "clli": "SNJSCAMCJT4",
1118 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1119 "port-type": "router",
1120 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1121 "port-rack": "000000.00",
1125 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1126 "lgx-port-name": "LGX Back.29",
1127 "lgx-port-rack": "000000.00",
1128 "lgx-port-shelf": "00"
1133 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1134 "port-type": "router",
1135 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1136 "port-rack": "000000.00",
1140 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1141 "lgx-port-name": "LGX Back.30",
1142 "lgx-port-rack": "000000.00",
1143 "lgx-port-shelf": "00"
1146 "optic-type": "gray"
1148 "due-date": "2016-11-28T00:00:01Z",
1149 "operator-contact": "pw1234"
1152 headers = {'content-type': 'application/json',
1153 "Accept": "application/json"}
1154 response = requests.request(
1155 "POST", url, data=json.dumps(data), headers=headers,
1156 auth=('admin', 'admin'))
1157 self.assertEqual(response.status_code, requests.codes.ok)
1158 res = response.json()
1159 self.assertIn('PCE calculation in progress',
1160 res['output']['configuration-response-common']['response-message'])
1161 time.sleep(self.WAITING)
# Reads 'service1' from the operational service-list, expects HTTP 200 and
# inspects its administrative-state, service-name ('service1'),
# connection-type ('roadm-line') and lifecycle-state ('planned').
# NOTE(review): the assertion call heads and the expected administrative-state
# value are not visible in this excerpt; confirm against the full file.
1163 def test_37_get_oc_service1(self):
1164 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
1165 .format(self.restconf_baseurl))
1166 headers = {'content-type': 'application/json',
1167 "Accept": "application/json"}
1168 response = requests.request(
1169 "GET", url, headers=headers, auth=('admin', 'admin'))
1170 self.assertEqual(response.status_code, requests.codes.ok)
1171 res = response.json()
1173 res['services'][0]['administrative-state'],
1176 res['services'][0]['service-name'], 'service1')
1178 res['services'][0]['connection-type'], 'roadm-line')
1180 res['services'][0]['lifecycle-state'], 'planned')
# Reads roadm-connection 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1' from the ROADM-A1
# device, expects HTTP 200 and checks its connection-name, opticalControlMode
# ('gainLoss'), target-output-power (-3.0), source interface
# ('SRG1-PP1-TXRX-nmc-1') and destination interface ('DEG2-TTP-TXRX-nmc-1').
# NOTE(review): the opening of the first assertDictEqual argument and its
# closing parenthesis are not visible in this excerpt.
1183 def test_38_check_xc1_ROADMA(self):
1184 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
1185 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
1186 "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
1187 .format(self.restconf_baseurl))
1188 headers = {'content-type': 'application/json'}
1189 response = requests.request(
1190 "GET", url, headers=headers, auth=('admin', 'admin'))
1191 self.assertEqual(response.status_code, requests.codes.ok)
1192 res = response.json()
1193 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1194 self.assertDictEqual(
1196 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
1197 'opticalControlMode': 'gainLoss',
1198 'target-output-power': -3.0
1199 }, **res['roadm-connections'][0]),
1200 res['roadm-connections'][0]
1202 self.assertDictEqual(
1203 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
1204 res['roadm-connections'][0]['source'])
1205 self.assertDictEqual(
1206 {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
1207 res['roadm-connections'][0]['destination'])
# Reads roadm-connection 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1' from the ROADM-C1
# device, expects HTTP 200 and checks its connection-name, opticalControlMode
# ('gainLoss'), target-output-power (-3.0), source interface
# ('SRG1-PP1-TXRX-nmc-1') and destination interface ('DEG1-TTP-TXRX-nmc-1').
# NOTE(review): the opening of the first assertDictEqual argument and its
# closing parenthesis are not visible in this excerpt.
1210 def test_39_check_xc1_ROADMC(self):
1211 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
1212 "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
1213 "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
1214 .format(self.restconf_baseurl))
1215 headers = {'content-type': 'application/json'}
1216 response = requests.request(
1217 "GET", url, headers=headers, auth=('admin', 'admin'))
1218 self.assertEqual(response.status_code, requests.codes.ok)
1219 res = response.json()
1220 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1221 self.assertDictEqual(
1223 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
1224 'opticalControlMode': 'gainLoss',
1225 'target-output-power': -3.0
1226 }, **res['roadm-connections'][0]),
1227 res['roadm-connections'][0]
1229 self.assertDictEqual(
1230 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
1231 res['roadm-connections'][0]['source'])
1232 self.assertDictEqual(
1233 {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
1234 res['roadm-connections'][0]['destination'])
# Sends a service-create RPC for 'service2' (connection-type 'roadm-line',
# service-format 'OC') whose payload names ROADM-A1 and ROADM-C1 as node-ids,
# expects HTTP 200 with a response-message containing
# 'PCE calculation in progress', then sleeps self.WAITING seconds.
# NOTE(review): several payload lines (the `data` assignment, nested container
# keys and closing brackets) are not visible in this excerpt; confirm the
# payload structure against the full file.
1237 def test_40_create_oc_service2(self):
1238 url = ("{}/operations/org-openroadm-service:service-create"
1239 .format(self.restconf_baseurl))
1241 "sdnc-request-header": {
1242 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1243 "rpc-action": "service-create",
1244 "request-system-id": "appname",
1245 "notification-url": "http://localhost:8585/NotificationServer/notify"
1247 "service-name": "service2",
1248 "common-id": "ASATT1234567",
1249 "connection-type": "roadm-line",
1251 "service-rate": "100",
1252 "node-id": "ROADM-A1",
1253 "service-format": "OC",
1254 "clli": "SNJSCAMCJP8",
1257 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1258 "port-type": "router",
1259 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1260 "port-rack": "000000.00",
1264 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1265 "lgx-port-name": "LGX Back.3",
1266 "lgx-port-rack": "000000.00",
1267 "lgx-port-shelf": "00"
1272 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1273 "port-type": "router",
1274 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1275 "port-rack": "000000.00",
1279 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1280 "lgx-port-name": "LGX Back.4",
1281 "lgx-port-rack": "000000.00",
1282 "lgx-port-shelf": "00"
1285 "optic-type": "gray"
1288 "service-rate": "100",
1289 "node-id": "ROADM-C1",
1290 "service-format": "OC",
1291 "clli": "SNJSCAMCJT4",
1294 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1295 "port-type": "router",
1296 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1297 "port-rack": "000000.00",
1301 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1302 "lgx-port-name": "LGX Back.29",
1303 "lgx-port-rack": "000000.00",
1304 "lgx-port-shelf": "00"
1309 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1310 "port-type": "router",
1311 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1312 "port-rack": "000000.00",
1316 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1317 "lgx-port-name": "LGX Back.30",
1318 "lgx-port-rack": "000000.00",
1319 "lgx-port-shelf": "00"
1322 "optic-type": "gray"
1324 "due-date": "2016-11-28T00:00:01Z",
1325 "operator-contact": "pw1234"
1328 headers = {'content-type': 'application/json',
1329 "Accept": "application/json"}
1330 response = requests.request(
1331 "POST", url, data=json.dumps(data), headers=headers,
1332 auth=('admin', 'admin'))
1333 self.assertEqual(response.status_code, requests.codes.ok)
1334 res = response.json()
1335 self.assertIn('PCE calculation in progress',
1336 res['output']['configuration-response-common']['response-message'])
1337 time.sleep(self.WAITING)
# Reads 'service2' from the operational service-list, expects HTTP 200 and
# inspects its administrative-state, service-name ('service2'),
# connection-type ('roadm-line') and lifecycle-state ('planned').
# NOTE(review): the assertion call heads and the expected administrative-state
# value are not visible in this excerpt; confirm against the full file.
1339 def test_41_get_oc_service2(self):
1340 url = ("{}/operational/org-openroadm-service:service-list/services/service2"
1341 .format(self.restconf_baseurl))
1342 headers = {'content-type': 'application/json',
1343 "Accept": "application/json"}
1344 response = requests.request(
1345 "GET", url, headers=headers, auth=('admin', 'admin'))
1346 self.assertEqual(response.status_code, requests.codes.ok)
1347 res = response.json()
1349 res['services'][0]['administrative-state'],
1352 res['services'][0]['service-name'], 'service2')
1354 res['services'][0]['connection-type'], 'roadm-line')
1356 res['services'][0]['lifecycle-state'], 'planned')
# Reads roadm-connection 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-2' from the ROADM-A1
# device, expects HTTP 200 and checks its connection-name, opticalControlMode
# ('gainLoss'), target-output-power (-3.0), source interface
# ('SRG1-PP2-TXRX-nmc-2') and destination interface ('DEG2-TTP-TXRX-nmc-2').
# NOTE(review): the opening of the first assertDictEqual argument and its
# closing parenthesis are not visible in this excerpt.
1359 def test_42_check_xc2_ROADMA(self):
1360 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
1361 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
1362 "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-2"
1363 .format(self.restconf_baseurl))
1364 headers = {'content-type': 'application/json'}
1365 response = requests.request(
1366 "GET", url, headers=headers, auth=('admin', 'admin'))
1367 self.assertEqual(response.status_code, requests.codes.ok)
1368 res = response.json()
1369 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1370 self.assertDictEqual(
1372 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-2',
1373 'opticalControlMode': 'gainLoss',
1374 'target-output-power': -3.0
1375 }, **res['roadm-connections'][0]),
1376 res['roadm-connections'][0]
1378 self.assertDictEqual(
1379 {'src-if': 'SRG1-PP2-TXRX-nmc-2'},
1380 res['roadm-connections'][0]['source'])
1381 self.assertDictEqual(
1382 {'dst-if': 'DEG2-TTP-TXRX-nmc-2'},
1383 res['roadm-connections'][0]['destination'])
def test_43_check_topo_ROADMA(self):
    # Re-run two earlier ROADM-A1 topology checks on the current state:
    # the SRG1 check first, then the DEG2 check.
    self.test_26_check_topo_ROADMA_SRG1()
    self.test_27_check_topo_ROADMA_DEG2()
# Calls the service-delete RPC for 'service1' and expects an HTTP 200 reply
# whose response-message contains 'Renderer service delete in progress'.
# NOTE(review): the statement that opens the `data` payload and its closing
# brackets are not visible in this excerpt; confirm against the full file.
1391 def test_44_delete_oc_service1(self):
1392 url = ("{}/operations/org-openroadm-service:service-delete"
1393 .format(self.restconf_baseurl))
1395 "sdnc-request-header": {
1396 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1397 "rpc-action": "service-delete",
1398 "request-system-id": "appname",
1399 "notification-url": "http://localhost:8585/NotificationServer/notify"
1401 "service-delete-req-info": {
1402 "service-name": "service1",
1403 "tail-retention": "no"
1407 headers = {'content-type': 'application/json'}
1408 response = requests.request(
1409 "POST", url, data=json.dumps(data), headers=headers,
1410 auth=('admin', 'admin'))
1411 self.assertEqual(response.status_code, requests.codes.ok)
1412 res = response.json()
1413 self.assertIn('Renderer service delete in progress',
1414 res['output']['configuration-response-common']['response-message'])
# Calls the service-delete RPC for 'service2' and expects an HTTP 200 reply
# whose response-message contains 'Renderer service delete in progress'.
# NOTE(review): the statement that opens the `data` payload and its closing
# brackets are not visible in this excerpt; confirm against the full file.
1417 def test_45_delete_oc_service2(self):
1418 url = ("{}/operations/org-openroadm-service:service-delete"
1419 .format(self.restconf_baseurl))
1421 "sdnc-request-header": {
1422 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1423 "rpc-action": "service-delete",
1424 "request-system-id": "appname",
1425 "notification-url": "http://localhost:8585/NotificationServer/notify"
1427 "service-delete-req-info": {
1428 "service-name": "service2",
1429 "tail-retention": "no"
1433 headers = {'content-type': 'application/json'}
1434 response = requests.request(
1435 "POST", url, data=json.dumps(data), headers=headers,
1436 auth=('admin', 'admin'))
1437 self.assertEqual(response.status_code, requests.codes.ok)
1438 res = response.json()
1439 self.assertIn('Renderer service delete in progress',
1440 res['output']['configuration-response-common']['response-message'])
# Reads the operational service-list and expects HTTP 404 (not_found); the
# expected 'data-missing' application error dict is compared against
# res['errors']['error'].
# NOTE(review): the line right after `def` and the assertion call that wraps
# the expected error dict are not visible in this excerpt.
1443 def test_46_get_no_oc_services(self):
1445 url = ("{}/operational/org-openroadm-service:service-list"
1446 .format(self.restconf_baseurl))
1447 headers = {'content-type': 'application/json',
1448 "Accept": "application/json"}
1449 response = requests.request(
1450 "GET", url, headers=headers, auth=('admin', 'admin'))
1451 self.assertEqual(response.status_code, requests.codes.not_found)
1452 res = response.json()
1454 {"error-type": "application", "error-tag": "data-missing",
1455 "error-message": "Request could not be completed because the relevant data model content does not exist"},
1456 res['errors']['error'])
def test_47_get_no_xc_ROADMA(self):
    # Read the ROADM-A1 device data mounted under topology-netconf, expect
    # HTTP 200 and check that it holds no 'roadm-connections' entry.
    url = ("{}/config/network-topology:network-topology/topology/topology-netconf"
           "/node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
           .format(self.restconf_baseurl))
    headers = {'content-type': 'application/json',
               "Accept": "application/json"}
    response = requests.request(
        "GET", url, headers=headers, auth=('admin', 'admin'))
    self.assertEqual(response.status_code, requests.codes.ok)
    res = response.json()
    # Plain key lookup: building a one-element list just to index it back
    # yields the same string and only obscures the intent.
    self.assertNotIn('roadm-connections', res['org-openroadm-device'])
def test_48_check_topo_ROADMA(self):
    # Re-run the ROADM-A1 topology checks of test_34 (SRG1) and then
    # test_35 (DEG2) on the current state.
    self.test_34_check_topo_ROADMA_SRG1()
    self.test_35_check_topo_ROADMA_DEG2()
def test_49_loop_create_eth_service(self):
    # Run the Ethernet service life cycle five times by chaining earlier test
    # methods: create service1, check the cross-connections on ROADM-A1 and
    # ROADM-C1, then delete service1.
    steps = (
        ("eth service creation", self.test_11_create_eth_service1),
        ("check xc in ROADM-A1", self.test_13_check_xc1_ROADMA),
        ("check xc in ROADM-C1", self.test_14_check_xc1_ROADMC),
        ("eth service deletion\n", self.test_30_delete_eth_service1),
    )
    for trial in range(1, 6):
        print("trial number {}".format(trial))
        for banner, step in steps:
            print(banner)
            step()
# If 'service1' is still readable from the operational service-list (the GET
# does not return 404), posts a service-delete for it without checking the
# reply. Then runs five cycles that call the earlier test methods: create the
# OC service, check the cross-connection on ROADM-A1 and ROADM-C1, delete it.
# NOTE(review): the opening/closing lines of the `data` payload and whatever
# follows the cleanup POST are not visible in this excerpt.
1488 def test_50_loop_create_oc_service(self):
1489 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
1490 .format(self.restconf_baseurl))
1491 response = requests.request("GET", url, auth=('admin', 'admin'))
1492 if response.status_code != 404:
1493 url = ("{}/operations/org-openroadm-service:service-delete"
1494 .format(self.restconf_baseurl))
1496 "sdnc-request-header": {
1497 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1498 "rpc-action": "service-delete",
1499 "request-system-id": "appname",
1500 "notification-url": "http://localhost:8585/NotificationServer/notify"
1502 "service-delete-req-info": {
1503 "service-name": "service1",
1504 "tail-retention": "no"
1508 headers = {'content-type': 'application/json'}
1509 requests.request("POST", url, data=json.dumps(data), headers=headers, auth=('admin', 'admin'))
1512 for i in range(1, 6):
1513 print("trial number {}".format(i))
1514 print("oc service creation")
1515 self.test_36_create_oc_service1()
1516 print("check xc in ROADM-A1")
1517 self.test_38_check_xc1_ROADMA()
1518 print("check xc in ROADM-C1")
1519 self.test_39_check_xc1_ROADMC()
1520 print("oc service deletion\n")
1521 self.test_44_delete_oc_service1()
def test_51_disconnect_XPDRA(self):
    # Delete node XPDR-A1 from the topology-netconf configuration and expect
    # an HTTP 200 reply.
    node_url = ("{}/config/network-topology:network-topology"
                "/topology/topology-netconf/node/XPDR-A1"
                .format(self.restconf_baseurl))
    json_headers = {'content-type': 'application/json'}
    reply = requests.request(
        "DELETE", node_url, headers=json_headers, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_52_disconnect_XPDRC(self):
    # Delete node XPDR-C1 from the topology-netconf configuration and expect
    # an HTTP 200 reply.
    node_url = ("{}/config/network-topology:network-topology"
                "/topology/topology-netconf/node/XPDR-C1"
                .format(self.restconf_baseurl))
    json_headers = {'content-type': 'application/json'}
    reply = requests.request(
        "DELETE", node_url, headers=json_headers, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_53_disconnect_ROADMA(self):
    # Delete node ROADM-A1 from the topology-netconf configuration and expect
    # an HTTP 200 reply.
    node_url = ("{}/config/network-topology:network-topology"
                "/topology/topology-netconf/node/ROADM-A1"
                .format(self.restconf_baseurl))
    json_headers = {'content-type': 'application/json'}
    reply = requests.request(
        "DELETE", node_url, headers=json_headers, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
def test_54_disconnect_ROADMC(self):
    # Delete node ROADM-C1 from the topology-netconf configuration and expect
    # an HTTP 200 reply.
    node_url = ("{}/config/network-topology:network-topology"
                "/topology/topology-netconf/node/ROADM-C1"
                .format(self.restconf_baseurl))
    json_headers = {'content-type': 'application/json'}
    reply = requests.request(
        "DELETE", node_url, headers=json_headers, auth=('admin', 'admin'))
    self.assertEqual(reply.status_code, requests.codes.ok)
# Script entry point: run the suite with per-test verbose output.
if __name__ == "__main__":
    unittest.main(verbosity=2)