2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
21 from common import test_utils
24 class TransportPCEFulltesting(unittest.TestCase):
27 WAITING = 20 # nominal value is 300
31 cls.processes = test_utils.start_tpce()
32 cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
35 def tearDownClass(cls):
36 for process in cls.processes:
37 test_utils.shutdown_process(process)
38 print("all processes killed")
40 def setUp(self): # instruction executed before each test method
41 print("execution of {}".format(self.id().split(".")[-1]))
43 def test_01_connect_xpdrA(self):
44 response = test_utils.mount_device("XPDR-A1", 'xpdra')
45 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
47 def test_02_connect_xpdrC(self):
48 response = test_utils.mount_device("XPDR-C1", 'xpdrc')
49 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
51 def test_03_connect_rdmA(self):
52 response = test_utils.mount_device("ROADM-A1", 'roadma')
53 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
55 def test_04_connect_rdmC(self):
56 response = test_utils.mount_device("ROADM-C1", 'roadmc')
57 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
59 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
60 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
62 "networkutils:input": {
63 "networkutils:links-input": {
64 "networkutils:xpdr-node": "XPDR-A1",
65 "networkutils:xpdr-num": "1",
66 "networkutils:network-num": "1",
67 "networkutils:rdm-node": "ROADM-A1",
68 "networkutils:srg-num": "1",
69 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
73 response = requests.request(
74 "POST", url, data=json.dumps(data),
75 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
76 self.assertEqual(response.status_code, requests.codes.ok)
78 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
81 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
82 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
84 "networkutils:input": {
85 "networkutils:links-input": {
86 "networkutils:xpdr-node": "XPDR-A1",
87 "networkutils:xpdr-num": "1",
88 "networkutils:network-num": "1",
89 "networkutils:rdm-node": "ROADM-A1",
90 "networkutils:srg-num": "1",
91 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
95 response = requests.request(
96 "POST", url, data=json.dumps(data),
97 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
98 self.assertEqual(response.status_code, requests.codes.ok)
100 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
103 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
104 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
106 "networkutils:input": {
107 "networkutils:links-input": {
108 "networkutils:xpdr-node": "XPDR-C1",
109 "networkutils:xpdr-num": "1",
110 "networkutils:network-num": "1",
111 "networkutils:rdm-node": "ROADM-C1",
112 "networkutils:srg-num": "1",
113 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
117 response = requests.request(
118 "POST", url, data=json.dumps(data),
119 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
120 self.assertEqual(response.status_code, requests.codes.ok)
121 res = response.json()
122 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
125 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
126 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
128 "networkutils:input": {
129 "networkutils:links-input": {
130 "networkutils:xpdr-node": "XPDR-C1",
131 "networkutils:xpdr-num": "1",
132 "networkutils:network-num": "1",
133 "networkutils:rdm-node": "ROADM-C1",
134 "networkutils:srg-num": "1",
135 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
139 response = requests.request(
140 "POST", url, data=json.dumps(data),
141 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
142 self.assertEqual(response.status_code, requests.codes.ok)
143 res = response.json()
144 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
147 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
148 # Config ROADMA-ROADMC oms-attributes
149 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
150 "link/ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
151 "OMS-attributes/span"
152 .format(test_utils.RESTCONF_BASE_URL))
154 "auto-spanloss": "true",
155 "spanloss-base": 11.4,
156 "spanloss-current": 12,
157 "engineered-spanloss": 12.2,
158 "link-concatenation": [{
161 "SRLG-length": 100000,
163 response = requests.request(
164 "PUT", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
165 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
166 self.assertEqual(response.status_code, requests.codes.created)
168 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
169 # Config ROADMC-ROADMA oms-attributes
170 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
171 "link/ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
172 "OMS-attributes/span"
173 .format(test_utils.RESTCONF_BASE_URL))
175 "auto-spanloss": "true",
176 "spanloss-base": 11.4,
177 "spanloss-current": 12,
178 "engineered-spanloss": 12.2,
179 "link-concatenation": [{
182 "SRLG-length": 100000,
184 response = requests.request(
185 "PUT", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
186 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
187 self.assertEqual(response.status_code, requests.codes.created)
189 # test service-create for Eth service from xpdr to xpdr
190 def test_11_create_eth_service1(self):
191 url = ("{}/operations/org-openroadm-service:service-create"
192 .format(test_utils.RESTCONF_BASE_URL))
194 "sdnc-request-header": {
195 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
196 "rpc-action": "service-create",
197 "request-system-id": "appname",
198 "notification-url": "http://localhost:8585/NotificationServer/notify"
200 "service-name": "service1",
201 "common-id": "ASATT1234567",
202 "connection-type": "service",
204 "service-rate": "100",
205 "node-id": "XPDR-A1",
206 "service-format": "Ethernet",
207 "clli": "SNJSCAMCJP8",
210 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
211 "port-type": "router",
212 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
213 "port-rack": "000000.00",
217 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
218 "lgx-port-name": "LGX Back.3",
219 "lgx-port-rack": "000000.00",
220 "lgx-port-shelf": "00"
225 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
226 "port-type": "router",
227 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
228 "port-rack": "000000.00",
232 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
233 "lgx-port-name": "LGX Back.4",
234 "lgx-port-rack": "000000.00",
235 "lgx-port-shelf": "00"
241 "service-rate": "100",
242 "node-id": "XPDR-C1",
243 "service-format": "Ethernet",
244 "clli": "SNJSCAMCJT4",
247 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
248 "port-type": "router",
249 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
250 "port-rack": "000000.00",
254 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
255 "lgx-port-name": "LGX Back.29",
256 "lgx-port-rack": "000000.00",
257 "lgx-port-shelf": "00"
262 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
263 "port-type": "router",
264 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
265 "port-rack": "000000.00",
269 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
270 "lgx-port-name": "LGX Back.30",
271 "lgx-port-rack": "000000.00",
272 "lgx-port-shelf": "00"
277 "due-date": "2016-11-28T00:00:01Z",
278 "operator-contact": "pw1234"
281 response = requests.request(
282 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
283 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
284 self.assertEqual(response.status_code, requests.codes.ok)
285 res = response.json()
286 self.assertIn('PCE calculation in progress',
287 res['output']['configuration-response-common']['response-message'])
288 time.sleep(self.WAITING)
290 def test_12_get_eth_service1(self):
291 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
292 .format(test_utils.RESTCONF_BASE_URL))
293 response = requests.request(
294 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
295 self.assertEqual(response.status_code, requests.codes.ok)
296 res = response.json()
298 res['services'][0]['administrative-state'], 'inService')
300 res['services'][0]['service-name'], 'service1')
302 res['services'][0]['connection-type'], 'service')
304 res['services'][0]['lifecycle-state'], 'planned')
307 def test_13_check_xc1_ROADMA(self):
308 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
309 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
310 "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
311 .format(test_utils.RESTCONF_BASE_URL))
312 response = requests.request(
313 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
314 self.assertEqual(response.status_code, requests.codes.ok)
315 res = response.json()
316 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
317 self.assertDictEqual(
319 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
320 'opticalControlMode': 'gainLoss',
321 'target-output-power': -3.0
322 }, **res['roadm-connections'][0]),
323 res['roadm-connections'][0]
325 self.assertDictEqual(
326 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
327 res['roadm-connections'][0]['source'])
328 self.assertDictEqual(
329 {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
330 res['roadm-connections'][0]['destination'])
333 def test_14_check_xc1_ROADMC(self):
334 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
335 "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
336 "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
337 .format(test_utils.RESTCONF_BASE_URL))
338 response = requests.request(
339 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
340 self.assertEqual(response.status_code, requests.codes.ok)
341 res = response.json()
342 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
343 self.assertDictEqual(
345 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
346 'opticalControlMode': 'gainLoss',
347 'target-output-power': -3.0
348 }, **res['roadm-connections'][0]),
349 res['roadm-connections'][0]
351 self.assertDictEqual(
352 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
353 res['roadm-connections'][0]['source'])
354 self.assertDictEqual(
355 {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
356 res['roadm-connections'][0]['destination'])
359 def test_15_check_topo_XPDRA(self):
360 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
361 .format(test_utils.RESTCONF_BASE_URL))
362 response = requests.request(
363 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
364 self.assertEqual(response.status_code, requests.codes.ok)
365 res = response.json()
366 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
368 if ele['tp-id'] == 'XPDR1-NETWORK1':
369 self.assertEqual({u'frequency': 196.1,
371 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
372 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
373 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
374 if ele['tp-id'] == 'XPDR1-NETWORK2':
375 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
378 def test_16_check_topo_ROADMA_SRG1(self):
379 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
380 .format(test_utils.RESTCONF_BASE_URL))
381 response = requests.request(
382 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
383 self.assertEqual(response.status_code, requests.codes.ok)
384 res = response.json()
385 self.assertNotIn({u'index': 1},
386 res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
387 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
389 if ele['tp-id'] == 'SRG1-PP1-TXRX':
390 self.assertIn({u'index': 1, u'frequency': 196.1,
392 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
393 if ele['tp-id'] == 'SRG1-PP2-TXRX':
394 self.assertNotIn('used-wavelength', dict.keys(ele))
397 def test_17_check_topo_ROADMA_DEG1(self):
398 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
399 .format(test_utils.RESTCONF_BASE_URL))
400 response = requests.request(
401 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
402 self.assertEqual(response.status_code, requests.codes.ok)
403 res = response.json()
404 self.assertNotIn({u'index': 1},
405 res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
406 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
408 if ele['tp-id'] == 'DEG2-CTP-TXRX':
409 self.assertIn({u'index': 1, u'frequency': 196.1,
411 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
412 if ele['tp-id'] == 'DEG2-TTP-TXRX':
413 self.assertIn({u'index': 1, u'frequency': 196.1,
415 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
418 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
419 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
421 "networkutils:input": {
422 "networkutils:links-input": {
423 "networkutils:xpdr-node": "XPDR-A1",
424 "networkutils:xpdr-num": "1",
425 "networkutils:network-num": "2",
426 "networkutils:rdm-node": "ROADM-A1",
427 "networkutils:srg-num": "1",
428 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
432 response = requests.request(
433 "POST", url, data=json.dumps(data),
434 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
435 self.assertEqual(response.status_code, requests.codes.ok)
436 res = response.json()
437 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
440 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
441 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
443 "networkutils:input": {
444 "networkutils:links-input": {
445 "networkutils:xpdr-node": "XPDR-A1",
446 "networkutils:xpdr-num": "1",
447 "networkutils:network-num": "2",
448 "networkutils:rdm-node": "ROADM-A1",
449 "networkutils:srg-num": "1",
450 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
454 response = requests.request(
455 "POST", url, data=json.dumps(data),
456 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
457 self.assertEqual(response.status_code, requests.codes.ok)
458 res = response.json()
459 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
462 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
463 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
465 "networkutils:input": {
466 "networkutils:links-input": {
467 "networkutils:xpdr-node": "XPDR-C1",
468 "networkutils:xpdr-num": "1",
469 "networkutils:network-num": "2",
470 "networkutils:rdm-node": "ROADM-C1",
471 "networkutils:srg-num": "1",
472 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
476 response = requests.request(
477 "POST", url, data=json.dumps(data),
478 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
479 self.assertEqual(response.status_code, requests.codes.ok)
480 res = response.json()
481 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
484 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
485 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
487 "networkutils:input": {
488 "networkutils:links-input": {
489 "networkutils:xpdr-node": "XPDR-C1",
490 "networkutils:xpdr-num": "1",
491 "networkutils:network-num": "2",
492 "networkutils:rdm-node": "ROADM-C1",
493 "networkutils:srg-num": "1",
494 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
498 response = requests.request(
499 "POST", url, data=json.dumps(data),
500 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
501 self.assertEqual(response.status_code, requests.codes.ok)
502 res = response.json()
503 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
506 def test_22_create_eth_service2(self):
507 url = ("{}/operations/org-openroadm-service:service-create"
508 .format(test_utils.RESTCONF_BASE_URL))
510 "sdnc-request-header": {
511 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
512 "rpc-action": "service-create",
513 "request-system-id": "appname",
514 "notification-url": "http://localhost:8585/NotificationServer/notify"
516 "service-name": "service2",
517 "common-id": "ASATT1234567",
518 "connection-type": "service",
520 "service-rate": "100",
521 "node-id": "XPDR-A1",
522 "service-format": "Ethernet",
523 "clli": "SNJSCAMCJP8",
526 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
527 "port-type": "router",
528 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
529 "port-rack": "000000.00",
533 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
534 "lgx-port-name": "LGX Back.3",
535 "lgx-port-rack": "000000.00",
536 "lgx-port-shelf": "00"
541 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
542 "port-type": "router",
543 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
544 "port-rack": "000000.00",
548 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
549 "lgx-port-name": "LGX Back.4",
550 "lgx-port-rack": "000000.00",
551 "lgx-port-shelf": "00"
557 "service-rate": "100",
558 "node-id": "XPDR-C1",
559 "service-format": "Ethernet",
560 "clli": "SNJSCAMCJT4",
563 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
564 "port-type": "router",
565 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
566 "port-rack": "000000.00",
570 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
571 "lgx-port-name": "LGX Back.29",
572 "lgx-port-rack": "000000.00",
573 "lgx-port-shelf": "00"
578 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
579 "port-type": "router",
580 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
581 "port-rack": "000000.00",
585 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
586 "lgx-port-name": "LGX Back.30",
587 "lgx-port-rack": "000000.00",
588 "lgx-port-shelf": "00"
593 "due-date": "2016-11-28T00:00:01Z",
594 "operator-contact": "pw1234"
597 response = requests.request(
598 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
599 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
600 self.assertEqual(response.status_code, requests.codes.ok)
601 res = response.json()
602 self.assertIn('PCE calculation in progress',
603 res['output']['configuration-response-common']['response-message'])
604 time.sleep(self.WAITING)
606 def test_23_get_eth_service2(self):
607 url = ("{}/operational/org-openroadm-service:service-list/services/service2"
608 .format(test_utils.RESTCONF_BASE_URL))
609 response = requests.request(
610 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
611 self.assertEqual(response.status_code, requests.codes.ok)
612 res = response.json()
614 res['services'][0]['administrative-state'],
617 res['services'][0]['service-name'], 'service2')
619 res['services'][0]['connection-type'], 'service')
621 res['services'][0]['lifecycle-state'], 'planned')
624 def test_24_check_xc2_ROADMA(self):
625 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
626 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
627 "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-2"
628 .format(test_utils.RESTCONF_BASE_URL))
629 response = requests.request(
630 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
631 self.assertEqual(response.status_code, requests.codes.ok)
632 res = response.json()
633 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
634 self.assertDictEqual(
636 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-2',
637 'opticalControlMode': 'power'
638 }, **res['roadm-connections'][0]),
639 res['roadm-connections'][0]
641 self.assertDictEqual(
642 {'src-if': 'DEG2-TTP-TXRX-nmc-2'},
643 res['roadm-connections'][0]['source'])
644 self.assertDictEqual(
645 {'dst-if': 'SRG1-PP2-TXRX-nmc-2'},
646 res['roadm-connections'][0]['destination'])
648 def test_25_check_topo_XPDRA(self):
649 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
650 .format(test_utils.RESTCONF_BASE_URL))
651 response = requests.request(
652 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
653 self.assertEqual(response.status_code, requests.codes.ok)
654 res = response.json()
655 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
657 if ele['tp-id'] == 'XPDR1-NETWORK1':
658 self.assertEqual({u'frequency': 196.1,
660 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
661 if ele['tp-id'] == 'XPDR1-NETWORK2':
662 self.assertEqual({u'frequency': 196.05,
664 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
665 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
666 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
669 def test_26_check_topo_ROADMA_SRG1(self):
670 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
671 .format(test_utils.RESTCONF_BASE_URL))
672 response = requests.request(
673 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
674 self.assertEqual(response.status_code, requests.codes.ok)
675 res = response.json()
676 self.assertNotIn({u'index': 1}, res['node'][0]
677 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
678 self.assertNotIn({u'index': 2}, res['node'][0]
679 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
680 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
682 if ele['tp-id'] == 'SRG1-PP1-TXRX':
683 self.assertIn({u'index': 1, u'frequency': 196.1,
685 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
686 self.assertNotIn({u'index': 2, u'frequency': 196.05,
688 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
689 if ele['tp-id'] == 'SRG1-PP2-TXRX':
690 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
691 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
692 self.assertNotIn({u'index': 1, u'frequency': 196.1,
694 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
695 if ele['tp-id'] == 'SRG1-PP3-TXRX':
696 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
699 def test_27_check_topo_ROADMA_DEG2(self):
700 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
701 .format(test_utils.RESTCONF_BASE_URL))
702 response = requests.request(
703 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
704 self.assertEqual(response.status_code, requests.codes.ok)
705 res = response.json()
706 self.assertNotIn({u'index': 1}, res['node'][0]
707 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
708 self.assertNotIn({u'index': 2}, res['node'][0]
709 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
710 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
712 if ele['tp-id'] == 'DEG2-CTP-TXRX':
713 self.assertIn({u'index': 1, u'frequency': 196.1,
715 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
716 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
717 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
718 if ele['tp-id'] == 'DEG2-TTP-TXRX':
719 self.assertIn({u'index': 1, u'frequency': 196.1,
721 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
722 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
723 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
726 # creation service test on a non-available resource
727 def test_28_create_eth_service3(self):
728 url = ("{}/operations/org-openroadm-service:service-create"
729 .format(test_utils.RESTCONF_BASE_URL))
731 "sdnc-request-header": {
732 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
733 "rpc-action": "service-create",
734 "request-system-id": "appname",
735 "notification-url": "http://localhost:8585/NotificationServer/notify"
737 "service-name": "service3",
738 "common-id": "ASATT1234567",
739 "connection-type": "service",
741 "service-rate": "100",
742 "node-id": "XPDR-A1",
743 "service-format": "Ethernet",
744 "clli": "SNJSCAMCJP8",
747 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
748 "port-type": "router",
749 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
750 "port-rack": "000000.00",
754 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
755 "lgx-port-name": "LGX Back.3",
756 "lgx-port-rack": "000000.00",
757 "lgx-port-shelf": "00"
762 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
763 "port-type": "router",
764 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
765 "port-rack": "000000.00",
769 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
770 "lgx-port-name": "LGX Back.4",
771 "lgx-port-rack": "000000.00",
772 "lgx-port-shelf": "00"
778 "service-rate": "100",
779 "node-id": "XPDR-C1",
780 "service-format": "Ethernet",
781 "clli": "SNJSCAMCJT4",
784 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
785 "port-type": "router",
786 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
787 "port-rack": "000000.00",
791 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
792 "lgx-port-name": "LGX Back.29",
793 "lgx-port-rack": "000000.00",
794 "lgx-port-shelf": "00"
799 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
800 "port-type": "router",
801 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
802 "port-rack": "000000.00",
806 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
807 "lgx-port-name": "LGX Back.30",
808 "lgx-port-rack": "000000.00",
809 "lgx-port-shelf": "00"
814 "due-date": "2016-11-28T00:00:01Z",
815 "operator-contact": "pw1234"
818 response = requests.request(
819 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
820 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
821 self.assertEqual(response.status_code, requests.codes.ok)
822 res = response.json()
823 self.assertIn('PCE calculation in progress',
824 res['output']['configuration-response-common']['response-message'])
825 self.assertIn('200', res['output']['configuration-response-common']['response-code'])
826 time.sleep(self.WAITING)
828 # add a test that check the openroadm-service-list still only contains 2 elements
829 def test_29_delete_eth_service3(self):
830 url = ("{}/operations/org-openroadm-service:service-delete"
831 .format(test_utils.RESTCONF_BASE_URL))
833 "sdnc-request-header": {
834 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
835 "rpc-action": "service-delete",
836 "request-system-id": "appname",
837 "notification-url": "http://localhost:8585/NotificationServer/notify"
839 "service-delete-req-info": {
840 "service-name": "service3",
841 "tail-retention": "no"
845 response = requests.request(
846 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
847 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
848 self.assertEqual(response.status_code, requests.codes.ok)
849 res = response.json()
850 self.assertIn('Service \'service3\' does not exist in datastore',
851 res['output']['configuration-response-common']['response-message'])
852 self.assertIn('500', res['output']['configuration-response-common']['response-code'])
855 def test_30_delete_eth_service1(self):
856 url = ("{}/operations/org-openroadm-service:service-delete"
857 .format(test_utils.RESTCONF_BASE_URL))
859 "sdnc-request-header": {
860 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
861 "rpc-action": "service-delete",
862 "request-system-id": "appname",
863 "notification-url": "http://localhost:8585/NotificationServer/notify"
865 "service-delete-req-info": {
866 "service-name": "service1",
867 "tail-retention": "no"
871 response = requests.request(
872 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
873 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
874 self.assertEqual(response.status_code, requests.codes.ok)
875 res = response.json()
876 self.assertIn('Renderer service delete in progress',
877 res['output']['configuration-response-common']['response-message'])
880 def test_31_delete_eth_service2(self):
881 url = ("{}/operations/org-openroadm-service:service-delete"
882 .format(test_utils.RESTCONF_BASE_URL))
884 "sdnc-request-header": {
885 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
886 "rpc-action": "service-delete",
887 "request-system-id": "appname",
888 "notification-url": "http://localhost:8585/NotificationServer/notify"
890 "service-delete-req-info": {
891 "service-name": "service2",
892 "tail-retention": "no"
896 response = requests.request(
897 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
898 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
899 self.assertEqual(response.status_code, requests.codes.ok)
900 res = response.json()
901 self.assertIn('Renderer service delete in progress',
902 res['output']['configuration-response-common']['response-message'])
905 def test_32_check_no_xc_ROADMA(self):
906 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
907 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
908 .format(test_utils.RESTCONF_BASE_URL))
909 response = requests.request(
910 "GET", url, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
911 res = response.json()
912 self.assertEqual(response.status_code, requests.codes.ok)
913 self.assertNotIn('roadm-connections', dict.keys(res['org-openroadm-device']))
916 def test_33_check_topo_XPDRA(self):
917 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
918 .format(test_utils.RESTCONF_BASE_URL))
919 response = requests.request(
920 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
921 self.assertEqual(response.status_code, requests.codes.ok)
922 res = response.json()
923 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
925 if ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
926 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
927 elif (ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK'):
928 self.assertIn(u'tail-equipment-id',
929 dict.keys(ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
930 self.assertNotIn('wavelength', dict.keys(
931 ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
934 def test_34_check_topo_ROADMA_SRG1(self):
935 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
936 .format(test_utils.RESTCONF_BASE_URL))
937 response = requests.request(
938 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
939 self.assertEqual(response.status_code, requests.codes.ok)
940 res = response.json()
941 self.assertIn({u'index': 1}, res['node'][0]
942 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
943 self.assertIn({u'index': 2}, res['node'][0]
944 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
945 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
947 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP1-TXRX':
948 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
950 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
953 def test_35_check_topo_ROADMA_DEG2(self):
954 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
955 .format(test_utils.RESTCONF_BASE_URL))
956 response = requests.request(
957 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
958 self.assertEqual(response.status_code, requests.codes.ok)
959 res = response.json()
960 self.assertIn({u'index': 1}, res['node'][0]
961 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
962 self.assertIn({u'index': 2}, res['node'][0]
963 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
964 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
966 if ele['tp-id'] == 'DEG2-CTP-TXRX':
967 self.assertNotIn('org-openroadm-network-topology:ctp-attributes', dict.keys(ele))
968 if ele['tp-id'] == 'DEG2-TTP-TXRX':
969 self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
972 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
973 def test_36_create_oc_service1(self):
974 url = ("{}/operations/org-openroadm-service:service-create"
975 .format(test_utils.RESTCONF_BASE_URL))
977 "sdnc-request-header": {
978 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
979 "rpc-action": "service-create",
980 "request-system-id": "appname",
981 "notification-url": "http://localhost:8585/NotificationServer/notify"
983 "service-name": "service1",
984 "common-id": "ASATT1234567",
985 "connection-type": "roadm-line",
987 "service-rate": "100",
988 "node-id": "ROADM-A1",
989 "service-format": "OC",
990 "clli": "SNJSCAMCJP8",
993 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
994 "port-type": "router",
995 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
996 "port-rack": "000000.00",
1000 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1001 "lgx-port-name": "LGX Back.3",
1002 "lgx-port-rack": "000000.00",
1003 "lgx-port-shelf": "00"
1008 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1009 "port-type": "router",
1010 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1011 "port-rack": "000000.00",
1015 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1016 "lgx-port-name": "LGX Back.4",
1017 "lgx-port-rack": "000000.00",
1018 "lgx-port-shelf": "00"
1021 "optic-type": "gray"
1024 "service-rate": "100",
1025 "node-id": "ROADM-C1",
1026 "service-format": "OC",
1027 "clli": "SNJSCAMCJT4",
1030 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1031 "port-type": "router",
1032 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1033 "port-rack": "000000.00",
1037 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1038 "lgx-port-name": "LGX Back.29",
1039 "lgx-port-rack": "000000.00",
1040 "lgx-port-shelf": "00"
1045 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1046 "port-type": "router",
1047 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1048 "port-rack": "000000.00",
1052 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1053 "lgx-port-name": "LGX Back.30",
1054 "lgx-port-rack": "000000.00",
1055 "lgx-port-shelf": "00"
1058 "optic-type": "gray"
1060 "due-date": "2016-11-28T00:00:01Z",
1061 "operator-contact": "pw1234"
1064 response = requests.request(
1065 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1066 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1067 self.assertEqual(response.status_code, requests.codes.ok)
1068 res = response.json()
1069 self.assertIn('PCE calculation in progress',
1070 res['output']['configuration-response-common']['response-message'])
1071 time.sleep(self.WAITING)
1073 def test_37_get_oc_service1(self):
1074 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
1075 .format(test_utils.RESTCONF_BASE_URL))
1076 response = requests.request(
1077 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1078 self.assertEqual(response.status_code, requests.codes.ok)
1079 res = response.json()
1081 res['services'][0]['administrative-state'],
1084 res['services'][0]['service-name'], 'service1')
1086 res['services'][0]['connection-type'], 'roadm-line')
1088 res['services'][0]['lifecycle-state'], 'planned')
1091 def test_38_check_xc1_ROADMA(self):
1092 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
1093 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
1094 "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
1095 .format(test_utils.RESTCONF_BASE_URL))
1096 response = requests.request(
1097 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1098 self.assertEqual(response.status_code, requests.codes.ok)
1099 res = response.json()
1100 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1101 self.assertDictEqual(
1103 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
1104 'opticalControlMode': 'gainLoss',
1105 'target-output-power': -3.0
1106 }, **res['roadm-connections'][0]),
1107 res['roadm-connections'][0]
1109 self.assertDictEqual(
1110 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
1111 res['roadm-connections'][0]['source'])
1112 self.assertDictEqual(
1113 {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
1114 res['roadm-connections'][0]['destination'])
1117 def test_39_check_xc1_ROADMC(self):
1118 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
1119 "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
1120 "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
1121 .format(test_utils.RESTCONF_BASE_URL))
1122 response = requests.request(
1123 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1124 self.assertEqual(response.status_code, requests.codes.ok)
1125 res = response.json()
1126 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1127 self.assertDictEqual(
1129 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
1130 'opticalControlMode': 'gainLoss',
1131 'target-output-power': -3.0
1132 }, **res['roadm-connections'][0]),
1133 res['roadm-connections'][0]
1135 self.assertDictEqual(
1136 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
1137 res['roadm-connections'][0]['source'])
1138 self.assertDictEqual(
1139 {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
1140 res['roadm-connections'][0]['destination'])
1143 def test_40_create_oc_service2(self):
1144 url = ("{}/operations/org-openroadm-service:service-create"
1145 .format(test_utils.RESTCONF_BASE_URL))
1147 "sdnc-request-header": {
1148 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1149 "rpc-action": "service-create",
1150 "request-system-id": "appname",
1151 "notification-url": "http://localhost:8585/NotificationServer/notify"
1153 "service-name": "service2",
1154 "common-id": "ASATT1234567",
1155 "connection-type": "roadm-line",
1157 "service-rate": "100",
1158 "node-id": "ROADM-A1",
1159 "service-format": "OC",
1160 "clli": "SNJSCAMCJP8",
1163 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1164 "port-type": "router",
1165 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1166 "port-rack": "000000.00",
1170 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1171 "lgx-port-name": "LGX Back.3",
1172 "lgx-port-rack": "000000.00",
1173 "lgx-port-shelf": "00"
1178 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1179 "port-type": "router",
1180 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1181 "port-rack": "000000.00",
1185 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1186 "lgx-port-name": "LGX Back.4",
1187 "lgx-port-rack": "000000.00",
1188 "lgx-port-shelf": "00"
1191 "optic-type": "gray"
1194 "service-rate": "100",
1195 "node-id": "ROADM-C1",
1196 "service-format": "OC",
1197 "clli": "SNJSCAMCJT4",
1200 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1201 "port-type": "router",
1202 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1203 "port-rack": "000000.00",
1207 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1208 "lgx-port-name": "LGX Back.29",
1209 "lgx-port-rack": "000000.00",
1210 "lgx-port-shelf": "00"
1215 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1216 "port-type": "router",
1217 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1218 "port-rack": "000000.00",
1222 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1223 "lgx-port-name": "LGX Back.30",
1224 "lgx-port-rack": "000000.00",
1225 "lgx-port-shelf": "00"
1228 "optic-type": "gray"
1230 "due-date": "2016-11-28T00:00:01Z",
1231 "operator-contact": "pw1234"
1234 response = requests.request(
1235 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1236 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1237 self.assertEqual(response.status_code, requests.codes.ok)
1238 res = response.json()
1239 self.assertIn('PCE calculation in progress',
1240 res['output']['configuration-response-common']['response-message'])
1241 time.sleep(self.WAITING)
1243 def test_41_get_oc_service2(self):
1244 url = ("{}/operational/org-openroadm-service:service-list/services/service2"
1245 .format(test_utils.RESTCONF_BASE_URL))
1246 response = requests.request(
1247 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1248 self.assertEqual(response.status_code, requests.codes.ok)
1249 res = response.json()
1251 res['services'][0]['administrative-state'],
1254 res['services'][0]['service-name'], 'service2')
1256 res['services'][0]['connection-type'], 'roadm-line')
1258 res['services'][0]['lifecycle-state'], 'planned')
1261 def test_42_check_xc2_ROADMA(self):
1262 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
1263 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
1264 "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-2"
1265 .format(test_utils.RESTCONF_BASE_URL))
1266 response = requests.request(
1267 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1268 self.assertEqual(response.status_code, requests.codes.ok)
1269 res = response.json()
1270 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1271 self.assertDictEqual(
1273 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-2',
1274 'opticalControlMode': 'gainLoss',
1275 'target-output-power': -3.0
1276 }, **res['roadm-connections'][0]),
1277 res['roadm-connections'][0]
1279 self.assertDictEqual(
1280 {'src-if': 'SRG1-PP2-TXRX-nmc-2'},
1281 res['roadm-connections'][0]['source'])
1282 self.assertDictEqual(
1283 {'dst-if': 'DEG2-TTP-TXRX-nmc-2'},
1284 res['roadm-connections'][0]['destination'])
1287 def test_43_check_topo_ROADMA(self):
1288 self.test_26_check_topo_ROADMA_SRG1()
1289 self.test_27_check_topo_ROADMA_DEG2()
1292 def test_44_delete_oc_service1(self):
1293 url = ("{}/operations/org-openroadm-service:service-delete"
1294 .format(test_utils.RESTCONF_BASE_URL))
1296 "sdnc-request-header": {
1297 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1298 "rpc-action": "service-delete",
1299 "request-system-id": "appname",
1300 "notification-url": "http://localhost:8585/NotificationServer/notify"
1302 "service-delete-req-info": {
1303 "service-name": "service1",
1304 "tail-retention": "no"
1308 response = requests.request(
1309 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1310 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1311 self.assertEqual(response.status_code, requests.codes.ok)
1312 res = response.json()
1313 self.assertIn('Renderer service delete in progress',
1314 res['output']['configuration-response-common']['response-message'])
1317 def test_45_delete_oc_service2(self):
1318 url = ("{}/operations/org-openroadm-service:service-delete"
1319 .format(test_utils.RESTCONF_BASE_URL))
1321 "sdnc-request-header": {
1322 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1323 "rpc-action": "service-delete",
1324 "request-system-id": "appname",
1325 "notification-url": "http://localhost:8585/NotificationServer/notify"
1327 "service-delete-req-info": {
1328 "service-name": "service2",
1329 "tail-retention": "no"
1333 response = requests.request(
1334 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1335 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1336 self.assertEqual(response.status_code, requests.codes.ok)
1337 res = response.json()
1338 self.assertIn('Renderer service delete in progress',
1339 res['output']['configuration-response-common']['response-message'])
1342 def test_46_get_no_oc_services(self):
1344 url = ("{}/operational/org-openroadm-service:service-list"
1345 .format(test_utils.RESTCONF_BASE_URL))
1346 response = requests.request(
1347 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1348 self.assertEqual(response.status_code, requests.codes.not_found)
1349 res = response.json()
1351 {"error-type": "application", "error-tag": "data-missing",
1352 "error-message": "Request could not be completed because the relevant data model content does not exist"},
1353 res['errors']['error'])
1356 def test_47_get_no_xc_ROADMA(self):
1357 url = ("{}/config/network-topology:network-topology/topology/topology-netconf"
1358 "/node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
1359 .format(test_utils.RESTCONF_BASE_URL))
1360 response = requests.request(
1361 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1362 self.assertEqual(response.status_code, requests.codes.ok)
1363 res = response.json()
1364 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
1367 def test_48_check_topo_ROADMA(self):
1368 self.test_34_check_topo_ROADMA_SRG1()
1369 self.test_35_check_topo_ROADMA_DEG2()
1371 def test_49_loop_create_eth_service(self):
1372 for i in range(1, 6):
1373 print("trial number {}".format(i))
1374 print("eth service creation")
1375 self.test_11_create_eth_service1()
1376 print("check xc in ROADM-A1")
1377 self.test_13_check_xc1_ROADMA()
1378 print("check xc in ROADM-C1")
1379 self.test_14_check_xc1_ROADMC()
1380 print("eth service deletion\n")
1381 self.test_30_delete_eth_service1()
1383 def test_50_loop_create_oc_service(self):
1384 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
1385 .format(test_utils.RESTCONF_BASE_URL))
1386 response = requests.request("GET", url, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1387 if response.status_code != 404:
1388 url = ("{}/operations/org-openroadm-service:service-delete"
1389 .format(test_utils.RESTCONF_BASE_URL))
1391 "sdnc-request-header": {
1392 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1393 "rpc-action": "service-delete",
1394 "request-system-id": "appname",
1395 "notification-url": "http://localhost:8585/NotificationServer/notify"
1397 "service-delete-req-info": {
1398 "service-name": "service1",
1399 "tail-retention": "no"
1403 requests.request("POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1404 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1407 for i in range(1, 6):
1408 print("trial number {}".format(i))
1409 print("oc service creation")
1410 self.test_36_create_oc_service1()
1411 print("check xc in ROADM-A1")
1412 self.test_38_check_xc1_ROADMA()
1413 print("check xc in ROADM-C1")
1414 self.test_39_check_xc1_ROADMC()
1415 print("oc service deletion\n")
1416 self.test_44_delete_oc_service1()
1418 def test_51_disconnect_XPDRA(self):
1419 response = test_utils.unmount_device("XPDR-A1")
1420 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
1422 def test_52_disconnect_XPDRC(self):
1423 response = test_utils.unmount_device("XPDR-C1")
1424 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
1426 def test_53_disconnect_ROADMA(self):
1427 response = test_utils.unmount_device("ROADM-A1")
1428 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
1430 def test_54_disconnect_ROADMC(self):
1431 response = test_utils.unmount_device("ROADM-C1")
1432 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
1435 if __name__ == "__main__":
1436 unittest.main(verbosity=2)