2 ##############################################################################
3 # Copyright (c) 2017 Orange, Inc. and others. All rights reserved.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 ##############################################################################
16 from common import test_utils
19 class TransportPCEFulltesting(unittest.TestCase):
22 WAITING = 20 # nominal value is 300
26 cls.processes = test_utils.start_tpce()
27 cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
30 def tearDownClass(cls):
31 for process in cls.processes:
32 test_utils.shutdown_process(process)
33 print("all processes killed")
35 def setUp(self): # instruction executed before each test method
36 print("execution of {}".format(self.id().split(".")[-1]))
38 def test_01_connect_xpdrA(self):
39 response = test_utils.mount_device("XPDR-A1", 'xpdra')
40 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
42 def test_02_connect_xpdrC(self):
43 response = test_utils.mount_device("XPDR-C1", 'xpdrc')
44 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
46 def test_03_connect_rdmA(self):
47 response = test_utils.mount_device("ROADM-A1", 'roadma')
48 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
50 def test_04_connect_rdmC(self):
51 response = test_utils.mount_device("ROADM-C1", 'roadmc')
52 self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
54 def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
55 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
57 "networkutils:input": {
58 "networkutils:links-input": {
59 "networkutils:xpdr-node": "XPDR-A1",
60 "networkutils:xpdr-num": "1",
61 "networkutils:network-num": "1",
62 "networkutils:rdm-node": "ROADM-A1",
63 "networkutils:srg-num": "1",
64 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
68 response = requests.request(
69 "POST", url, data=json.dumps(data),
70 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
71 self.assertEqual(response.status_code, requests.codes.ok)
73 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
76 def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
77 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
79 "networkutils:input": {
80 "networkutils:links-input": {
81 "networkutils:xpdr-node": "XPDR-A1",
82 "networkutils:xpdr-num": "1",
83 "networkutils:network-num": "1",
84 "networkutils:rdm-node": "ROADM-A1",
85 "networkutils:srg-num": "1",
86 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
90 response = requests.request(
91 "POST", url, data=json.dumps(data),
92 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
93 self.assertEqual(response.status_code, requests.codes.ok)
95 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
98 def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
99 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
101 "networkutils:input": {
102 "networkutils:links-input": {
103 "networkutils:xpdr-node": "XPDR-C1",
104 "networkutils:xpdr-num": "1",
105 "networkutils:network-num": "1",
106 "networkutils:rdm-node": "ROADM-C1",
107 "networkutils:srg-num": "1",
108 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
112 response = requests.request(
113 "POST", url, data=json.dumps(data),
114 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
115 self.assertEqual(response.status_code, requests.codes.ok)
116 res = response.json()
117 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
120 def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
121 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
123 "networkutils:input": {
124 "networkutils:links-input": {
125 "networkutils:xpdr-node": "XPDR-C1",
126 "networkutils:xpdr-num": "1",
127 "networkutils:network-num": "1",
128 "networkutils:rdm-node": "ROADM-C1",
129 "networkutils:srg-num": "1",
130 "networkutils:termination-point-num": "SRG1-PP1-TXRX"
134 response = requests.request(
135 "POST", url, data=json.dumps(data),
136 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
137 self.assertEqual(response.status_code, requests.codes.ok)
138 res = response.json()
139 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
142 def test_09_add_omsAttributes_ROADMA_ROADMC(self):
143 # Config ROADMA-ROADMC oms-attributes
144 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
145 "link/ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
146 "OMS-attributes/span"
147 .format(test_utils.RESTCONF_BASE_URL))
149 "auto-spanloss": "true",
150 "spanloss-base": 11.4,
151 "spanloss-current": 12,
152 "engineered-spanloss": 12.2,
153 "link-concatenation": [{
156 "SRLG-length": 100000,
158 response = requests.request(
159 "PUT", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
160 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
161 self.assertEqual(response.status_code, requests.codes.created)
163 def test_10_add_omsAttributes_ROADMC_ROADMA(self):
164 # Config ROADMC-ROADMA oms-attributes
165 url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
166 "link/ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
167 "OMS-attributes/span"
168 .format(test_utils.RESTCONF_BASE_URL))
170 "auto-spanloss": "true",
171 "spanloss-base": 11.4,
172 "spanloss-current": 12,
173 "engineered-spanloss": 12.2,
174 "link-concatenation": [{
177 "SRLG-length": 100000,
179 response = requests.request(
180 "PUT", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
181 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
182 self.assertEqual(response.status_code, requests.codes.created)
184 # test service-create for Eth service from xpdr to xpdr
185 def test_11_create_eth_service1(self):
186 url = ("{}/operations/org-openroadm-service:service-create"
187 .format(test_utils.RESTCONF_BASE_URL))
189 "sdnc-request-header": {
190 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
191 "rpc-action": "service-create",
192 "request-system-id": "appname",
193 "notification-url": "http://localhost:8585/NotificationServer/notify"
195 "service-name": "service1",
196 "common-id": "ASATT1234567",
197 "connection-type": "service",
199 "service-rate": "100",
200 "node-id": "XPDR-A1",
201 "service-format": "Ethernet",
202 "clli": "SNJSCAMCJP8",
205 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
206 "port-type": "router",
207 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
208 "port-rack": "000000.00",
212 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
213 "lgx-port-name": "LGX Back.3",
214 "lgx-port-rack": "000000.00",
215 "lgx-port-shelf": "00"
220 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
221 "port-type": "router",
222 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
223 "port-rack": "000000.00",
227 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
228 "lgx-port-name": "LGX Back.4",
229 "lgx-port-rack": "000000.00",
230 "lgx-port-shelf": "00"
236 "service-rate": "100",
237 "node-id": "XPDR-C1",
238 "service-format": "Ethernet",
239 "clli": "SNJSCAMCJT4",
242 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
243 "port-type": "router",
244 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
245 "port-rack": "000000.00",
249 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
250 "lgx-port-name": "LGX Back.29",
251 "lgx-port-rack": "000000.00",
252 "lgx-port-shelf": "00"
257 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
258 "port-type": "router",
259 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
260 "port-rack": "000000.00",
264 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
265 "lgx-port-name": "LGX Back.30",
266 "lgx-port-rack": "000000.00",
267 "lgx-port-shelf": "00"
272 "due-date": "2016-11-28T00:00:01Z",
273 "operator-contact": "pw1234"
276 response = requests.request(
277 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
278 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
279 self.assertEqual(response.status_code, requests.codes.ok)
280 res = response.json()
281 self.assertIn('PCE calculation in progress',
282 res['output']['configuration-response-common']['response-message'])
283 time.sleep(self.WAITING)
285 def test_12_get_eth_service1(self):
286 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
287 .format(test_utils.RESTCONF_BASE_URL))
288 response = requests.request(
289 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
290 self.assertEqual(response.status_code, requests.codes.ok)
291 res = response.json()
293 res['services'][0]['administrative-state'], 'inService')
295 res['services'][0]['service-name'], 'service1')
297 res['services'][0]['connection-type'], 'service')
299 res['services'][0]['lifecycle-state'], 'planned')
302 def test_13_check_xc1_ROADMA(self):
303 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
304 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
305 "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
306 .format(test_utils.RESTCONF_BASE_URL))
307 response = requests.request(
308 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
309 self.assertEqual(response.status_code, requests.codes.ok)
310 res = response.json()
311 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
312 self.assertDictEqual(
314 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
315 'opticalControlMode': 'gainLoss',
316 'target-output-power': -3.0
317 }, **res['roadm-connections'][0]),
318 res['roadm-connections'][0]
320 self.assertDictEqual(
321 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
322 res['roadm-connections'][0]['source'])
323 self.assertDictEqual(
324 {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
325 res['roadm-connections'][0]['destination'])
328 def test_14_check_xc1_ROADMC(self):
329 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
330 "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
331 "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
332 .format(test_utils.RESTCONF_BASE_URL))
333 response = requests.request(
334 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
335 self.assertEqual(response.status_code, requests.codes.ok)
336 res = response.json()
337 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
338 self.assertDictEqual(
340 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
341 'opticalControlMode': 'gainLoss',
342 'target-output-power': -3.0
343 }, **res['roadm-connections'][0]),
344 res['roadm-connections'][0]
346 self.assertDictEqual(
347 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
348 res['roadm-connections'][0]['source'])
349 self.assertDictEqual(
350 {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
351 res['roadm-connections'][0]['destination'])
354 def test_15_check_topo_XPDRA(self):
355 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
356 .format(test_utils.RESTCONF_BASE_URL))
357 response = requests.request(
358 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
359 self.assertEqual(response.status_code, requests.codes.ok)
360 res = response.json()
361 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
363 if ele['tp-id'] == 'XPDR1-NETWORK1':
364 self.assertEqual({u'frequency': 196.1,
366 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
367 if ele['tp-id'] == 'XPDR1-CLIENT2' or ele['tp-id'] == 'XPDR1-CLIENT1':
368 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
369 if ele['tp-id'] == 'XPDR1-NETWORK2':
370 self.assertNotIn('org-openroadm-network-topology:xpdr-network-attributes', dict.keys(ele))
373 def test_16_check_topo_ROADMA_SRG1(self):
374 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
375 .format(test_utils.RESTCONF_BASE_URL))
376 response = requests.request(
377 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
378 self.assertEqual(response.status_code, requests.codes.ok)
379 res = response.json()
380 self.assertNotIn({u'index': 1},
381 res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
382 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
384 if ele['tp-id'] == 'SRG1-PP1-TXRX':
385 self.assertIn({u'index': 1, u'frequency': 196.1,
387 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
388 if ele['tp-id'] == 'SRG1-PP2-TXRX':
389 self.assertNotIn('used-wavelength', dict.keys(ele))
392 def test_17_check_topo_ROADMA_DEG1(self):
393 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
394 .format(test_utils.RESTCONF_BASE_URL))
395 response = requests.request(
396 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
397 self.assertEqual(response.status_code, requests.codes.ok)
398 res = response.json()
399 self.assertNotIn({u'index': 1},
400 res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
401 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
403 if ele['tp-id'] == 'DEG2-CTP-TXRX':
404 self.assertIn({u'index': 1, u'frequency': 196.1,
406 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
407 if ele['tp-id'] == 'DEG2-TTP-TXRX':
408 self.assertIn({u'index': 1, u'frequency': 196.1,
410 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
413 def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
414 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
416 "networkutils:input": {
417 "networkutils:links-input": {
418 "networkutils:xpdr-node": "XPDR-A1",
419 "networkutils:xpdr-num": "1",
420 "networkutils:network-num": "2",
421 "networkutils:rdm-node": "ROADM-A1",
422 "networkutils:srg-num": "1",
423 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
427 response = requests.request(
428 "POST", url, data=json.dumps(data),
429 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
430 self.assertEqual(response.status_code, requests.codes.ok)
431 res = response.json()
432 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
435 def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
436 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
438 "networkutils:input": {
439 "networkutils:links-input": {
440 "networkutils:xpdr-node": "XPDR-A1",
441 "networkutils:xpdr-num": "1",
442 "networkutils:network-num": "2",
443 "networkutils:rdm-node": "ROADM-A1",
444 "networkutils:srg-num": "1",
445 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
449 response = requests.request(
450 "POST", url, data=json.dumps(data),
451 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
452 self.assertEqual(response.status_code, requests.codes.ok)
453 res = response.json()
454 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
457 def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
458 url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
460 "networkutils:input": {
461 "networkutils:links-input": {
462 "networkutils:xpdr-node": "XPDR-C1",
463 "networkutils:xpdr-num": "1",
464 "networkutils:network-num": "2",
465 "networkutils:rdm-node": "ROADM-C1",
466 "networkutils:srg-num": "1",
467 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
471 response = requests.request(
472 "POST", url, data=json.dumps(data),
473 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
474 self.assertEqual(response.status_code, requests.codes.ok)
475 res = response.json()
476 self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
479 def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
480 url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
482 "networkutils:input": {
483 "networkutils:links-input": {
484 "networkutils:xpdr-node": "XPDR-C1",
485 "networkutils:xpdr-num": "1",
486 "networkutils:network-num": "2",
487 "networkutils:rdm-node": "ROADM-C1",
488 "networkutils:srg-num": "1",
489 "networkutils:termination-point-num": "SRG1-PP2-TXRX"
493 response = requests.request(
494 "POST", url, data=json.dumps(data),
495 headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
496 self.assertEqual(response.status_code, requests.codes.ok)
497 res = response.json()
498 self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
501 def test_22_create_eth_service2(self):
502 url = ("{}/operations/org-openroadm-service:service-create"
503 .format(test_utils.RESTCONF_BASE_URL))
505 "sdnc-request-header": {
506 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
507 "rpc-action": "service-create",
508 "request-system-id": "appname",
509 "notification-url": "http://localhost:8585/NotificationServer/notify"
511 "service-name": "service2",
512 "common-id": "ASATT1234567",
513 "connection-type": "service",
515 "service-rate": "100",
516 "node-id": "XPDR-A1",
517 "service-format": "Ethernet",
518 "clli": "SNJSCAMCJP8",
521 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
522 "port-type": "router",
523 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
524 "port-rack": "000000.00",
528 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
529 "lgx-port-name": "LGX Back.3",
530 "lgx-port-rack": "000000.00",
531 "lgx-port-shelf": "00"
536 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
537 "port-type": "router",
538 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
539 "port-rack": "000000.00",
543 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
544 "lgx-port-name": "LGX Back.4",
545 "lgx-port-rack": "000000.00",
546 "lgx-port-shelf": "00"
552 "service-rate": "100",
553 "node-id": "XPDR-C1",
554 "service-format": "Ethernet",
555 "clli": "SNJSCAMCJT4",
558 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
559 "port-type": "router",
560 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
561 "port-rack": "000000.00",
565 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
566 "lgx-port-name": "LGX Back.29",
567 "lgx-port-rack": "000000.00",
568 "lgx-port-shelf": "00"
573 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
574 "port-type": "router",
575 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
576 "port-rack": "000000.00",
580 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
581 "lgx-port-name": "LGX Back.30",
582 "lgx-port-rack": "000000.00",
583 "lgx-port-shelf": "00"
588 "due-date": "2016-11-28T00:00:01Z",
589 "operator-contact": "pw1234"
592 response = requests.request(
593 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
594 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
595 self.assertEqual(response.status_code, requests.codes.ok)
596 res = response.json()
597 self.assertIn('PCE calculation in progress',
598 res['output']['configuration-response-common']['response-message'])
599 time.sleep(self.WAITING)
601 def test_23_get_eth_service2(self):
602 url = ("{}/operational/org-openroadm-service:service-list/services/service2"
603 .format(test_utils.RESTCONF_BASE_URL))
604 response = requests.request(
605 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
606 self.assertEqual(response.status_code, requests.codes.ok)
607 res = response.json()
609 res['services'][0]['administrative-state'],
612 res['services'][0]['service-name'], 'service2')
614 res['services'][0]['connection-type'], 'service')
616 res['services'][0]['lifecycle-state'], 'planned')
619 def test_24_check_xc2_ROADMA(self):
620 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
621 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
622 "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-2"
623 .format(test_utils.RESTCONF_BASE_URL))
624 response = requests.request(
625 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
626 self.assertEqual(response.status_code, requests.codes.ok)
627 res = response.json()
628 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
629 self.assertDictEqual(
631 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-2',
632 'opticalControlMode': 'power'
633 }, **res['roadm-connections'][0]),
634 res['roadm-connections'][0]
636 self.assertDictEqual(
637 {'src-if': 'DEG2-TTP-TXRX-nmc-2'},
638 res['roadm-connections'][0]['source'])
639 self.assertDictEqual(
640 {'dst-if': 'SRG1-PP2-TXRX-nmc-2'},
641 res['roadm-connections'][0]['destination'])
643 def test_25_check_topo_XPDRA(self):
644 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
645 .format(test_utils.RESTCONF_BASE_URL))
646 response = requests.request(
647 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
648 self.assertEqual(response.status_code, requests.codes.ok)
649 res = response.json()
650 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
652 if ele['tp-id'] == 'XPDR1-NETWORK1':
653 self.assertEqual({u'frequency': 196.1,
655 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
656 if ele['tp-id'] == 'XPDR1-NETWORK2':
657 self.assertEqual({u'frequency': 196.05,
659 ele['org-openroadm-network-topology:xpdr-network-attributes']['wavelength'])
660 if ele['tp-id'] == 'XPDR1-CLIENT1' or ele['tp-id'] == 'XPDR1-CLIENT2':
661 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
664 def test_26_check_topo_ROADMA_SRG1(self):
665 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
666 .format(test_utils.RESTCONF_BASE_URL))
667 response = requests.request(
668 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
669 self.assertEqual(response.status_code, requests.codes.ok)
670 res = response.json()
671 self.assertNotIn({u'index': 1}, res['node'][0]
672 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
673 self.assertNotIn({u'index': 2}, res['node'][0]
674 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
675 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
677 if ele['tp-id'] == 'SRG1-PP1-TXRX':
678 self.assertIn({u'index': 1, u'frequency': 196.1,
680 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
681 self.assertNotIn({u'index': 2, u'frequency': 196.05,
683 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
684 if ele['tp-id'] == 'SRG1-PP2-TXRX':
685 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
686 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
687 self.assertNotIn({u'index': 1, u'frequency': 196.1,
689 ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
690 if ele['tp-id'] == 'SRG1-PP3-TXRX':
691 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
694 def test_27_check_topo_ROADMA_DEG2(self):
695 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
696 .format(test_utils.RESTCONF_BASE_URL))
697 response = requests.request(
698 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
699 self.assertEqual(response.status_code, requests.codes.ok)
700 res = response.json()
701 self.assertNotIn({u'index': 1}, res['node'][0]
702 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
703 self.assertNotIn({u'index': 2}, res['node'][0]
704 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
705 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
707 if ele['tp-id'] == 'DEG2-CTP-TXRX':
708 self.assertIn({u'index': 1, u'frequency': 196.1,
710 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
711 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
712 ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
713 if ele['tp-id'] == 'DEG2-TTP-TXRX':
714 self.assertIn({u'index': 1, u'frequency': 196.1,
716 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
717 self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
718 ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
721 # creation service test on a non-available resource
722 def test_28_create_eth_service3(self):
723 url = ("{}/operations/org-openroadm-service:service-create"
724 .format(test_utils.RESTCONF_BASE_URL))
726 "sdnc-request-header": {
727 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
728 "rpc-action": "service-create",
729 "request-system-id": "appname",
730 "notification-url": "http://localhost:8585/NotificationServer/notify"
732 "service-name": "service3",
733 "common-id": "ASATT1234567",
734 "connection-type": "service",
736 "service-rate": "100",
737 "node-id": "XPDR-A1",
738 "service-format": "Ethernet",
739 "clli": "SNJSCAMCJP8",
742 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
743 "port-type": "router",
744 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
745 "port-rack": "000000.00",
749 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
750 "lgx-port-name": "LGX Back.3",
751 "lgx-port-rack": "000000.00",
752 "lgx-port-shelf": "00"
757 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
758 "port-type": "router",
759 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
760 "port-rack": "000000.00",
764 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
765 "lgx-port-name": "LGX Back.4",
766 "lgx-port-rack": "000000.00",
767 "lgx-port-shelf": "00"
773 "service-rate": "100",
774 "node-id": "XPDR-C1",
775 "service-format": "Ethernet",
776 "clli": "SNJSCAMCJT4",
779 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
780 "port-type": "router",
781 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
782 "port-rack": "000000.00",
786 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
787 "lgx-port-name": "LGX Back.29",
788 "lgx-port-rack": "000000.00",
789 "lgx-port-shelf": "00"
794 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
795 "port-type": "router",
796 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
797 "port-rack": "000000.00",
801 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
802 "lgx-port-name": "LGX Back.30",
803 "lgx-port-rack": "000000.00",
804 "lgx-port-shelf": "00"
809 "due-date": "2016-11-28T00:00:01Z",
810 "operator-contact": "pw1234"
813 response = requests.request(
814 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
815 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
816 self.assertEqual(response.status_code, requests.codes.ok)
817 res = response.json()
818 self.assertIn('PCE calculation in progress',
819 res['output']['configuration-response-common']['response-message'])
820 self.assertIn('200', res['output']['configuration-response-common']['response-code'])
821 time.sleep(self.WAITING)
823 # add a test that check the openroadm-service-list still only contains 2 elements
824 def test_29_delete_eth_service3(self):
825 url = ("{}/operations/org-openroadm-service:service-delete"
826 .format(test_utils.RESTCONF_BASE_URL))
828 "sdnc-request-header": {
829 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
830 "rpc-action": "service-delete",
831 "request-system-id": "appname",
832 "notification-url": "http://localhost:8585/NotificationServer/notify"
834 "service-delete-req-info": {
835 "service-name": "service3",
836 "tail-retention": "no"
840 response = requests.request(
841 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
842 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
843 self.assertEqual(response.status_code, requests.codes.ok)
844 res = response.json()
845 self.assertIn('Service \'service3\' does not exist in datastore',
846 res['output']['configuration-response-common']['response-message'])
847 self.assertIn('500', res['output']['configuration-response-common']['response-code'])
850 def test_30_delete_eth_service1(self):
851 url = ("{}/operations/org-openroadm-service:service-delete"
852 .format(test_utils.RESTCONF_BASE_URL))
854 "sdnc-request-header": {
855 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
856 "rpc-action": "service-delete",
857 "request-system-id": "appname",
858 "notification-url": "http://localhost:8585/NotificationServer/notify"
860 "service-delete-req-info": {
861 "service-name": "service1",
862 "tail-retention": "no"
866 response = requests.request(
867 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
868 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
869 self.assertEqual(response.status_code, requests.codes.ok)
870 res = response.json()
871 self.assertIn('Renderer service delete in progress',
872 res['output']['configuration-response-common']['response-message'])
875 def test_31_delete_eth_service2(self):
876 url = ("{}/operations/org-openroadm-service:service-delete"
877 .format(test_utils.RESTCONF_BASE_URL))
879 "sdnc-request-header": {
880 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
881 "rpc-action": "service-delete",
882 "request-system-id": "appname",
883 "notification-url": "http://localhost:8585/NotificationServer/notify"
885 "service-delete-req-info": {
886 "service-name": "service2",
887 "tail-retention": "no"
891 response = requests.request(
892 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
893 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
894 self.assertEqual(response.status_code, requests.codes.ok)
895 res = response.json()
896 self.assertIn('Renderer service delete in progress',
897 res['output']['configuration-response-common']['response-message'])
900 def test_32_check_no_xc_ROADMA(self):
901 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
902 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
903 .format(test_utils.RESTCONF_BASE_URL))
904 response = requests.request(
905 "GET", url, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
906 res = response.json()
907 self.assertEqual(response.status_code, requests.codes.ok)
908 self.assertNotIn('roadm-connections', dict.keys(res['org-openroadm-device']))
911 def test_33_check_topo_XPDRA(self):
912 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
913 .format(test_utils.RESTCONF_BASE_URL))
914 response = requests.request(
915 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
916 self.assertEqual(response.status_code, requests.codes.ok)
917 res = response.json()
918 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
920 if ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
921 self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
922 elif (ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK'):
923 self.assertIn(u'tail-equipment-id',
924 dict.keys(ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
925 self.assertNotIn('wavelength', dict.keys(
926 ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
929 def test_34_check_topo_ROADMA_SRG1(self):
930 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
931 .format(test_utils.RESTCONF_BASE_URL))
932 response = requests.request(
933 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
934 self.assertEqual(response.status_code, requests.codes.ok)
935 res = response.json()
936 self.assertIn({u'index': 1}, res['node'][0]
937 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
938 self.assertIn({u'index': 2}, res['node'][0]
939 [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
940 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
942 if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP1-TXRX':
943 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
945 self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
948 def test_35_check_topo_ROADMA_DEG2(self):
949 url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
950 .format(test_utils.RESTCONF_BASE_URL))
951 response = requests.request(
952 "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
953 self.assertEqual(response.status_code, requests.codes.ok)
954 res = response.json()
955 self.assertIn({u'index': 1}, res['node'][0]
956 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
957 self.assertIn({u'index': 2}, res['node'][0]
958 [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
959 liste_tp = res['node'][0]['ietf-network-topology:termination-point']
961 if ele['tp-id'] == 'DEG2-CTP-TXRX':
962 self.assertNotIn('org-openroadm-network-topology:ctp-attributes', dict.keys(ele))
963 if ele['tp-id'] == 'DEG2-TTP-TXRX':
964 self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
967 # test service-create for Optical Channel (OC) service from srg-pp to srg-pp
968 def test_36_create_oc_service1(self):
969 url = ("{}/operations/org-openroadm-service:service-create"
970 .format(test_utils.RESTCONF_BASE_URL))
972 "sdnc-request-header": {
973 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
974 "rpc-action": "service-create",
975 "request-system-id": "appname",
976 "notification-url": "http://localhost:8585/NotificationServer/notify"
978 "service-name": "service1",
979 "common-id": "ASATT1234567",
980 "connection-type": "roadm-line",
982 "service-rate": "100",
983 "node-id": "ROADM-A1",
984 "service-format": "OC",
985 "clli": "SNJSCAMCJP8",
988 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
989 "port-type": "router",
990 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
991 "port-rack": "000000.00",
995 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
996 "lgx-port-name": "LGX Back.3",
997 "lgx-port-rack": "000000.00",
998 "lgx-port-shelf": "00"
1003 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1004 "port-type": "router",
1005 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1006 "port-rack": "000000.00",
1010 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1011 "lgx-port-name": "LGX Back.4",
1012 "lgx-port-rack": "000000.00",
1013 "lgx-port-shelf": "00"
1016 "optic-type": "gray"
1019 "service-rate": "100",
1020 "node-id": "ROADM-C1",
1021 "service-format": "OC",
1022 "clli": "SNJSCAMCJT4",
1025 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1026 "port-type": "router",
1027 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1028 "port-rack": "000000.00",
1032 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1033 "lgx-port-name": "LGX Back.29",
1034 "lgx-port-rack": "000000.00",
1035 "lgx-port-shelf": "00"
1040 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1041 "port-type": "router",
1042 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1043 "port-rack": "000000.00",
1047 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1048 "lgx-port-name": "LGX Back.30",
1049 "lgx-port-rack": "000000.00",
1050 "lgx-port-shelf": "00"
1053 "optic-type": "gray"
1055 "due-date": "2016-11-28T00:00:01Z",
1056 "operator-contact": "pw1234"
1059 response = requests.request(
1060 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1061 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1062 self.assertEqual(response.status_code, requests.codes.ok)
1063 res = response.json()
1064 self.assertIn('PCE calculation in progress',
1065 res['output']['configuration-response-common']['response-message'])
1066 time.sleep(self.WAITING)
1068 def test_37_get_oc_service1(self):
1069 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
1070 .format(test_utils.RESTCONF_BASE_URL))
1071 response = requests.request(
1072 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1073 self.assertEqual(response.status_code, requests.codes.ok)
1074 res = response.json()
1076 res['services'][0]['administrative-state'],
1079 res['services'][0]['service-name'], 'service1')
1081 res['services'][0]['connection-type'], 'roadm-line')
1083 res['services'][0]['lifecycle-state'], 'planned')
1086 def test_38_check_xc1_ROADMA(self):
1087 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
1088 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
1089 "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
1090 .format(test_utils.RESTCONF_BASE_URL))
1091 response = requests.request(
1092 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1093 self.assertEqual(response.status_code, requests.codes.ok)
1094 res = response.json()
1095 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1096 self.assertDictEqual(
1098 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
1099 'opticalControlMode': 'gainLoss',
1100 'target-output-power': -3.0
1101 }, **res['roadm-connections'][0]),
1102 res['roadm-connections'][0]
1104 self.assertDictEqual(
1105 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
1106 res['roadm-connections'][0]['source'])
1107 self.assertDictEqual(
1108 {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
1109 res['roadm-connections'][0]['destination'])
1112 def test_39_check_xc1_ROADMC(self):
1113 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
1114 "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
1115 "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
1116 .format(test_utils.RESTCONF_BASE_URL))
1117 response = requests.request(
1118 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1119 self.assertEqual(response.status_code, requests.codes.ok)
1120 res = response.json()
1121 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1122 self.assertDictEqual(
1124 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
1125 'opticalControlMode': 'gainLoss',
1126 'target-output-power': -3.0
1127 }, **res['roadm-connections'][0]),
1128 res['roadm-connections'][0]
1130 self.assertDictEqual(
1131 {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
1132 res['roadm-connections'][0]['source'])
1133 self.assertDictEqual(
1134 {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
1135 res['roadm-connections'][0]['destination'])
1138 def test_40_create_oc_service2(self):
1139 url = ("{}/operations/org-openroadm-service:service-create"
1140 .format(test_utils.RESTCONF_BASE_URL))
1142 "sdnc-request-header": {
1143 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1144 "rpc-action": "service-create",
1145 "request-system-id": "appname",
1146 "notification-url": "http://localhost:8585/NotificationServer/notify"
1148 "service-name": "service2",
1149 "common-id": "ASATT1234567",
1150 "connection-type": "roadm-line",
1152 "service-rate": "100",
1153 "node-id": "ROADM-A1",
1154 "service-format": "OC",
1155 "clli": "SNJSCAMCJP8",
1158 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1159 "port-type": "router",
1160 "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
1161 "port-rack": "000000.00",
1165 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1166 "lgx-port-name": "LGX Back.3",
1167 "lgx-port-rack": "000000.00",
1168 "lgx-port-shelf": "00"
1173 "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
1174 "port-type": "router",
1175 "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
1176 "port-rack": "000000.00",
1180 "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
1181 "lgx-port-name": "LGX Back.4",
1182 "lgx-port-rack": "000000.00",
1183 "lgx-port-shelf": "00"
1186 "optic-type": "gray"
1189 "service-rate": "100",
1190 "node-id": "ROADM-C1",
1191 "service-format": "OC",
1192 "clli": "SNJSCAMCJT4",
1195 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1196 "port-type": "router",
1197 "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
1198 "port-rack": "000000.00",
1202 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1203 "lgx-port-name": "LGX Back.29",
1204 "lgx-port-rack": "000000.00",
1205 "lgx-port-shelf": "00"
1210 "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
1211 "port-type": "router",
1212 "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
1213 "port-rack": "000000.00",
1217 "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
1218 "lgx-port-name": "LGX Back.30",
1219 "lgx-port-rack": "000000.00",
1220 "lgx-port-shelf": "00"
1223 "optic-type": "gray"
1225 "due-date": "2016-11-28T00:00:01Z",
1226 "operator-contact": "pw1234"
1229 response = requests.request(
1230 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1231 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1232 self.assertEqual(response.status_code, requests.codes.ok)
1233 res = response.json()
1234 self.assertIn('PCE calculation in progress',
1235 res['output']['configuration-response-common']['response-message'])
1236 time.sleep(self.WAITING)
1238 def test_41_get_oc_service2(self):
1239 url = ("{}/operational/org-openroadm-service:service-list/services/service2"
1240 .format(test_utils.RESTCONF_BASE_URL))
1241 response = requests.request(
1242 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1243 self.assertEqual(response.status_code, requests.codes.ok)
1244 res = response.json()
1246 res['services'][0]['administrative-state'],
1249 res['services'][0]['service-name'], 'service2')
1251 res['services'][0]['connection-type'], 'roadm-line')
1253 res['services'][0]['lifecycle-state'], 'planned')
1256 def test_42_check_xc2_ROADMA(self):
1257 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
1258 "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
1259 "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-2"
1260 .format(test_utils.RESTCONF_BASE_URL))
1261 response = requests.request(
1262 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1263 self.assertEqual(response.status_code, requests.codes.ok)
1264 res = response.json()
1265 # the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
1266 self.assertDictEqual(
1268 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-2',
1269 'opticalControlMode': 'gainLoss',
1270 'target-output-power': -3.0
1271 }, **res['roadm-connections'][0]),
1272 res['roadm-connections'][0]
1274 self.assertDictEqual(
1275 {'src-if': 'SRG1-PP2-TXRX-nmc-2'},
1276 res['roadm-connections'][0]['source'])
1277 self.assertDictEqual(
1278 {'dst-if': 'DEG2-TTP-TXRX-nmc-2'},
1279 res['roadm-connections'][0]['destination'])
1282 def test_43_check_topo_ROADMA(self):
1283 self.test_26_check_topo_ROADMA_SRG1()
1284 self.test_27_check_topo_ROADMA_DEG2()
1287 def test_44_delete_oc_service1(self):
1288 url = ("{}/operations/org-openroadm-service:service-delete"
1289 .format(test_utils.RESTCONF_BASE_URL))
1291 "sdnc-request-header": {
1292 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1293 "rpc-action": "service-delete",
1294 "request-system-id": "appname",
1295 "notification-url": "http://localhost:8585/NotificationServer/notify"
1297 "service-delete-req-info": {
1298 "service-name": "service1",
1299 "tail-retention": "no"
1303 response = requests.request(
1304 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1305 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1306 self.assertEqual(response.status_code, requests.codes.ok)
1307 res = response.json()
1308 self.assertIn('Renderer service delete in progress',
1309 res['output']['configuration-response-common']['response-message'])
1312 def test_45_delete_oc_service2(self):
1313 url = ("{}/operations/org-openroadm-service:service-delete"
1314 .format(test_utils.RESTCONF_BASE_URL))
1316 "sdnc-request-header": {
1317 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1318 "rpc-action": "service-delete",
1319 "request-system-id": "appname",
1320 "notification-url": "http://localhost:8585/NotificationServer/notify"
1322 "service-delete-req-info": {
1323 "service-name": "service2",
1324 "tail-retention": "no"
1328 response = requests.request(
1329 "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1330 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1331 self.assertEqual(response.status_code, requests.codes.ok)
1332 res = response.json()
1333 self.assertIn('Renderer service delete in progress',
1334 res['output']['configuration-response-common']['response-message'])
1337 def test_46_get_no_oc_services(self):
1339 url = ("{}/operational/org-openroadm-service:service-list"
1340 .format(test_utils.RESTCONF_BASE_URL))
1341 response = requests.request(
1342 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1343 self.assertEqual(response.status_code, requests.codes.not_found)
1344 res = response.json()
1346 {"error-type": "application", "error-tag": "data-missing",
1347 "error-message": "Request could not be completed because the relevant data model content does not exist"},
1348 res['errors']['error'])
1351 def test_47_get_no_xc_ROADMA(self):
1352 url = ("{}/config/network-topology:network-topology/topology/topology-netconf"
1353 "/node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
1354 .format(test_utils.RESTCONF_BASE_URL))
1355 response = requests.request(
1356 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1357 self.assertEqual(response.status_code, requests.codes.ok)
1358 res = response.json()
1359 self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
1362 def test_48_check_topo_ROADMA(self):
1363 self.test_34_check_topo_ROADMA_SRG1()
1364 self.test_35_check_topo_ROADMA_DEG2()
1366 def test_49_loop_create_eth_service(self):
1367 for i in range(1, 6):
1368 print("iteration number {}".format(i))
1369 print("eth service creation")
1370 self.test_11_create_eth_service1()
1371 print("check xc in ROADM-A1")
1372 self.test_13_check_xc1_ROADMA()
1373 print("check xc in ROADM-C1")
1374 self.test_14_check_xc1_ROADMC()
1375 print("eth service deletion\n")
1376 self.test_30_delete_eth_service1()
1378 def test_50_loop_create_oc_service(self):
1379 url = ("{}/operational/org-openroadm-service:service-list/services/service1"
1380 .format(test_utils.RESTCONF_BASE_URL))
1381 response = requests.request("GET", url, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1382 if response.status_code != 404:
1383 url = ("{}/operations/org-openroadm-service:service-delete"
1384 .format(test_utils.RESTCONF_BASE_URL))
1386 "sdnc-request-header": {
1387 "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
1388 "rpc-action": "service-delete",
1389 "request-system-id": "appname",
1390 "notification-url": "http://localhost:8585/NotificationServer/notify"
1392 "service-delete-req-info": {
1393 "service-name": "service1",
1394 "tail-retention": "no"
1398 requests.request("POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
1399 auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
1402 for i in range(1, 6):
1403 print("iteration number {}".format(i))
1404 print("oc service creation")
1405 self.test_36_create_oc_service1()
1406 print("check xc in ROADM-A1")
1407 self.test_38_check_xc1_ROADMA()
1408 print("check xc in ROADM-C1")
1409 self.test_39_check_xc1_ROADMC()
1410 print("oc service deletion\n")
1411 self.test_44_delete_oc_service1()
1413 def test_51_disconnect_XPDRA(self):
1414 response = test_utils.unmount_device("XPDR-A1")
1415 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
1417 def test_52_disconnect_XPDRC(self):
1418 response = test_utils.unmount_device("XPDR-C1")
1419 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
1421 def test_53_disconnect_ROADMA(self):
1422 response = test_utils.unmount_device("ROADM-A1")
1423 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
1425 def test_54_disconnect_ROADMC(self):
1426 response = test_utils.unmount_device("ROADM-C1")
1427 self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
1430 if __name__ == "__main__":
1431 unittest.main(verbosity=2)