class TransportPCEFulltesting(unittest.TestCase):
- cr_serv_sample_data = { "input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-create",
- "request-system-id": "appname",
- "notification-url":
- "http://localhost:8585/NotificationServer/notify"
- },
- "service-name": "service1",
- "common-id": "ASATT1234567",
- "connection-type": "service",
- "service-a-end": {
- "service-rate": "100",
- "node-id": "XPDRA01",
- "service-format": "Ethernet",
- "clli": "SNJSCAMCJP8",
+ cr_serv_sample_data = {"input": {
+ "sdnc-request-header": {
+ "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
+ "rpc-action": "service-create",
+ "request-system-id": "appname",
+ "notification-url":
+ "http://localhost:8585/NotificationServer/notify"
+ },
+ "service-name": "service1",
+ "common-id": "ASATT1234567",
+ "connection-type": "service",
+ "service-a-end": {
+ "service-rate": "100",
+ "node-id": "XPDRA01",
+ "service-format": "Ethernet",
+ "clli": "SNJSCAMCJP8",
"tx-direction": {
"port": {
"port-device-name":
"lgx-port-shelf": "00"
}
},
- "rx-direction": {
+ "rx-direction": {
"port": {
"port-device-name":
"ROUTER_SNJSCAMCJP8_000000.00_00",
"lgx-port-shelf": "00"
}
},
- "optic-type": "gray"
- },
- "service-z-end": {
- "service-rate": "100",
- "node-id": "XPDRC01",
- "service-format": "Ethernet",
- "clli": "SNJSCAMCJT4",
+ "optic-type": "gray"
+ },
+ "service-z-end": {
+ "service-rate": "100",
+ "node-id": "XPDRC01",
+ "service-format": "Ethernet",
+ "clli": "SNJSCAMCJT4",
"tx-direction": {
"port": {
"port-device-name":
"lgx-port-shelf": "00"
}
},
- "rx-direction": {
+ "rx-direction": {
"port": {
"port-device-name":
"ROUTER_SNJSCAMCJT4_000000.00_00",
"lgx-port-shelf": "00"
}
},
- "optic-type": "gray"
- },
- "due-date": "2016-11-28T00:00:01Z",
- "operator-contact": "pw1234"
- }
- }
+ "optic-type": "gray"
+ },
+ "due-date": "2016-11-28T00:00:01Z",
+ "operator-contact": "pw1234"
+ }
+ }
processes = None
WAITING = 20
self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
def test_09_create_OTS_ROADMA(self):
- url = "{}/operations/transportpce-device-renderer:create-ots-oms"
- data = {
- "input": {
- "node-id": "ROADMA01",
- "logical-connection-point": "DEG1-TTP-TXRX"
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.create_ots_oms_request("ROADMA01", "DEG1-TTP-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Interfaces OTS-DEG1-TTP-TXRX - OMS-DEG1-TTP-TXRX successfully created on node ROADMA01',
res["output"]["result"])
def test_10_create_OTS_ROADMC(self):
- url = "{}/operations/transportpce-device-renderer:create-ots-oms"
- data = {
- "input": {
- "node-id": "ROADMC01",
- "logical-connection-point": "DEG2-TTP-TXRX"
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.create_ots_oms_request("ROADMC01", "DEG2-TTP-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADMC01',
self.assertEqual(5.7, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
def test_17_servicePath_create_AToZ(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {
- "input": {
- "service-name": "test",
- "wave-number": "1",
- "modulation-format": "qpsk",
- "operation": "create",
- "nodes": [
- {
- "dest-tp": "XPDR1-NETWORK1",
- "src-tp": "XPDR1-CLIENT1",
- "node-id": "XPDRA01"
- },
- {
- "dest-tp": "DEG1-TTP-TXRX",
- "src-tp": "SRG1-PP1-TXRX",
- "node-id": "ROADMA01"
- },
- {
- "dest-tp": "SRG1-PP1-TXRX",
- "src-tp": "DEG2-TTP-TXRX",
- "node-id": "ROADMC01"
- },
- {
- "dest-tp": "XPDR1-CLIENT1",
- "src-tp": "XPDR1-NETWORK1",
- "node-id": "XPDRC01"
- }
- ]
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("create", "test", "1",
+ [{"node-id": "XPDRA01",
+ "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
+ {"node-id": "ROADMA01",
+ "dest-tp": "DEG1-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
+ {"node-id": "ROADMC01",
+ "dest-tp": "SRG1-PP1-TXRX", "src-tp": "DEG2-TTP-TXRX"},
+ {"node-id": "XPDRC01",
+ "dest-tp": "XPDR1-CLIENT1", "src-tp": "XPDR1-NETWORK1"}])
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
time.sleep(10)
def test_18_servicePath_create_ZToA(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {
- "input": {
- "service-name": "test",
- "wave-number": "1",
- "modulation-format": "qpsk",
- "operation": "create",
- "nodes": [
- {
- "dest-tp": "XPDR1-NETWORK1",
- "src-tp": "XPDR1-CLIENT1",
- "node-id": "XPDRC01"
- },
- {
- "dest-tp": "DEG2-TTP-TXRX",
- "src-tp": "SRG1-PP1-TXRX",
- "node-id": "ROADMC01"
- },
- {
- "src-tp": "DEG1-TTP-TXRX",
- "dest-tp": "SRG1-PP1-TXRX",
- "node-id": "ROADMA01"
- },
- {
- "src-tp": "XPDR1-NETWORK1",
- "dest-tp": "XPDR1-CLIENT1",
- "node-id": "XPDRA01"
- }
- ]
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("create", "test", "1",
+ [{"node-id": "XPDRC01",
+ "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
+ {"node-id": "ROADMC01",
+ "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
+ {"node-id": "ROADMA01",
+ "src-tp": "DEG1-TTP-TXRX", "dest-tp": "SRG1-PP1-TXRX"},
+ {"node-id": "XPDRA01",
+ "src-tp": "XPDR1-NETWORK1", "dest-tp": "XPDR1-CLIENT1"}])
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
def test_29_servicePath_delete_AToZ(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {
- "input": {
- "service-name": "test",
- "wave-number": "1",
- "modulation-format": "qpsk",
- "operation": "delete",
- "nodes": [
- {
- "dest-tp": "XPDR1-NETWORK1",
- "src-tp": "XPDR1-CLIENT1",
- "node-id": "XPDRA01"
- },
- {
- "dest-tp": "DEG1-TTP-TXRX",
- "src-tp": "SRG1-PP1-TXRX",
- "node-id": "ROADMA01"
- },
- {
- "dest-tp": "SRG1-PP1-TXRX",
- "src-tp": "DEG2-TTP-TXRX",
- "node-id": "ROADMC01"
- },
- {
- "dest-tp": "XPDR1-CLIENT1",
- "src-tp": "XPDR1-NETWORK1",
- "node-id": "XPDRC01"
- }
- ]
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("delete", "test", "1",
+ [{"node-id": "XPDRA01",
+ "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
+ {"node-id": "ROADMA01",
+ "dest-tp": "DEG1-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
+ {"node-id": "ROADMC01",
+ "dest-tp": "SRG1-PP1-TXRX", "src-tp": "DEG2-TTP-TXRX"},
+ {"node-id": "XPDRC01",
+ "dest-tp": "XPDR1-CLIENT1", "src-tp": "XPDR1-NETWORK1"}])
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Request processed', res["output"]["result"])
time.sleep(10)
def test_30_servicePath_delete_ZToA(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {
- "input": {
- "service-name": "test",
- "wave-number": "1",
- "modulation-format": "qpsk",
- "operation": "delete",
- "nodes": [
- {
- "dest-tp": "XPDR1-NETWORK1",
- "src-tp": "XPDR1-CLIENT1",
- "node-id": "XPDRC01"
- },
- {
- "dest-tp": "DEG2-TTP-TXRX",
- "src-tp": "SRG1-PP1-TXRX",
- "node-id": "ROADMC01"
- },
- {
- "src-tp": "DEG1-TTP-TXRX",
- "dest-tp": "SRG1-PP1-TXRX",
- "node-id": "ROADMA01"
- },
- {
- "src-tp": "XPDR1-NETWORK1",
- "dest-tp": "XPDR1-CLIENT1",
- "node-id": "XPDRA01"
- }
- ]
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("delete", "test", "1",
+ [{"node-id": "XPDRC01",
+ "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
+ {"node-id": "ROADMC01",
+ "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
+ {"node-id": "ROADMA01",
+ "src-tp": "DEG1-TTP-TXRX", "dest-tp": "SRG1-PP1-TXRX"},
+ {"node-id": "XPDRA01",
+ "src-tp": "XPDR1-NETWORK1", "dest-tp": "XPDR1-CLIENT1"}])
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Request processed', res["output"]["result"])
self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
def test_33_servicePath_create_AToZ(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {
- "input": {
- "service-name": "test2",
- "wave-number": "2",
- "modulation-format": "qpsk",
- "operation": "create",
- "nodes": [
- {
- "dest-tp": "XPDR1-NETWORK2",
- "src-tp": "XPDR1-CLIENT2",
- "node-id": "XPDRA01"
- },
- {
- "dest-tp": "DEG1-TTP-TXRX",
- "src-tp": "SRG1-PP2-TXRX",
- "node-id": "ROADMA01"
- }
- ]
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("create", "test2", "2",
+ [{"node-id": "XPDRA01",
+ "dest-tp": "XPDR1-NETWORK2", "src-tp": "XPDR1-CLIENT2"},
+ {"node-id": "ROADMA01",
+ "dest-tp": "DEG1-TTP-TXRX", "src-tp": "SRG1-PP2-TXRX"}])
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
self.assertEqual(2, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
def test_35_servicePath_delete_AToZ(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {
- "input": {
- "service-name": "test",
- "wave-number": "1",
- "modulation-format": "qpsk",
- "operation": "delete",
- "nodes": [
- {
- "dest-tp": "XPDR1-NETWORK2",
- "src-tp": "XPDR1-CLIENT2",
- "node-id": "XPDRA01"
- },
- {
- "dest-tp": "DEG1-TTP-TXRX",
- "src-tp": "SRG1-PP2-TXRX",
- "node-id": "ROADMA01"
- }
- ]
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("delete", "test", "1",
+ [{"node-id": "XPDRA01",
+ "dest-tp": "XPDR1-NETWORK2", "src-tp": "XPDR1-CLIENT2"},
+ {"node-id": "ROADMA01",
+ "dest-tp": "DEG1-TTP-TXRX", "src-tp": "SRG1-PP2-TXRX"}])
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Request processed', res["output"]["result"])
res['nodes'][0]['mapping'])
def test_05_service_path_create(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {"renderer:input": {
- "renderer:service-name": "service_test",
- "renderer:wave-number": "7",
- "renderer:modulation-format": "qpsk",
- "renderer:operation": "create",
- "renderer:nodes": [
- {"renderer:node-id": "ROADMA01",
- "renderer:src-tp": "SRG1-PP7-TXRX",
- "renderer:dest-tp": "DEG1-TTP-TXRX"},
- {"renderer:node-id": "XPDRA01",
- "renderer:src-tp": "XPDR1-CLIENT1",
- "renderer:dest-tp": "XPDR1-NETWORK1"}]}}
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("create", "service_test", "7",
+ [{"renderer:node-id": "ROADMA01",
+ "renderer:src-tp": "SRG1-PP7-TXRX", "renderer:dest-tp": "DEG1-TTP-TXRX"},
+ {"renderer:node-id": "XPDRA01",
+ "renderer:src-tp": "XPDR1-CLIENT1", "renderer:dest-tp": "XPDR1-NETWORK1"}])
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm-connection successfully created for nodes: ROADMA01', res["output"]["result"])
self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
def test_14_service_path_delete(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {"renderer:input": {
- "renderer:service-name": "service_test",
- "renderer:wave-number": "7",
- "renderer:operation": "delete",
- "renderer:nodes": [
- {"renderer:node-id": "ROADMA01",
- "renderer:src-tp": "SRG1-PP7-TXRX",
- "renderer:dest-tp": "DEG1-TTP-TXRX"},
- {"renderer:node-id": "XPDRA01",
- "renderer:src-tp": "XPDR1-CLIENT1",
- "renderer:dest-tp": "XPDR1-NETWORK1"}]}}
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("delete", "service_test", "7",
+ [{"renderer:node-id": "ROADMA01",
+ "renderer:src-tp": "SRG1-PP7-TXRX", "renderer:dest-tp": "DEG1-TTP-TXRX"},
+ {"renderer:node-id": "XPDRA01",
+ "renderer:src-tp": "XPDR1-CLIENT1", "renderer:dest-tp": "XPDR1-NETWORK1"}])
self.assertEqual(response.status_code, requests.codes.ok)
self.assertEqual(response.json(), {
'output': {'result': 'Request processed', 'success': True}})
processes = None
cr_serv_sample_data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-create",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-name": "service1",
- "common-id": "ASATT1234567",
- "connection-type": "service",
- "service-a-end": {
- "service-rate": "100",
- "node-id": "XPDR-A1",
- "service-format": "Ethernet",
- "clli": "SNJSCAMCJP8",
+ "sdnc-request-header": {
+ "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
+ "rpc-action": "service-create",
+ "request-system-id": "appname",
+ "notification-url": "http://localhost:8585/NotificationServer/notify"
+ },
+ "service-name": "service1",
+ "common-id": "ASATT1234567",
+ "connection-type": "service",
+ "service-a-end": {
+ "service-rate": "100",
+ "node-id": "XPDR-A1",
+ "service-format": "Ethernet",
+ "clli": "SNJSCAMCJP8",
"tx-direction": {
"port": {
"port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
"lgx-port-shelf": "00"
}
},
- "rx-direction": {
+ "rx-direction": {
"port": {
"port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
"port-type": "router",
"lgx-port-shelf": "00"
}
},
- "optic-type": "gray"
- },
- "service-z-end": {
- "service-rate": "100",
- "node-id": "XPDR-C1",
- "service-format": "Ethernet",
- "clli": "SNJSCAMCJT4",
+ "optic-type": "gray"
+ },
+ "service-z-end": {
+ "service-rate": "100",
+ "node-id": "XPDR-C1",
+ "service-format": "Ethernet",
+ "clli": "SNJSCAMCJT4",
"tx-direction": {
"port": {
"port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
"lgx-port-shelf": "00"
}
},
- "rx-direction": {
+ "rx-direction": {
"port": {
"port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
"port-type": "router",
"lgx-port-shelf": "00"
}
},
- "optic-type": "gray"
- },
- "due-date": "2016-11-28T00:00:01Z",
- "operator-contact": "pw1234"
- }
- }
+ "optic-type": "gray"
+ },
+ "due-date": "2016-11-28T00:00:01Z",
+ "operator-contact": "pw1234"
+ }
+ }
WAITING = 20 # nominal value is 300
self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
def test_09_create_OTS_ROADMA(self):
- url = "{}/operations/transportpce-device-renderer:create-ots-oms"
- data = {
- "input": {
- "node-id": "ROADM-A1",
- "logical-connection-point": "DEG1-TTP-TXRX"
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.create_ots_oms_request("ROADM-A1", "DEG1-TTP-TXRX")
time.sleep(10)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
res["output"]["result"])
def test_10_create_OTS_ROADMC(self):
- url = "{}/operations/transportpce-device-renderer:create-ots-oms"
- data = {
- "input": {
- "node-id": "ROADM-C1",
- "logical-connection-point": "DEG2-TTP-TXRX"
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.create_ots_oms_request("ROADM-C1", "DEG2-TTP-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Interfaces OTS-DEG2-TTP-TXRX - OMS-DEG2-TTP-TXRX successfully created on node ROADM-C1',
self.assertEqual(17.6, res['org-openroadm-optical-transport-interfaces:ots']['span-loss-receive'])
def test_17_servicePath_create_AToZ(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {
- "input": {
- "service-name": "test",
- "wave-number": "1",
- "modulation-format": "qpsk",
- "operation": "create",
- "nodes": [
- {
- "dest-tp": "XPDR1-NETWORK1",
- "src-tp": "XPDR1-CLIENT1",
- "node-id": "XPDR-A1"
- },
- {
- "dest-tp": "DEG2-TTP-TXRX",
- "src-tp": "SRG1-PP1-TXRX",
- "node-id": "ROADM-A1"
- },
- {
- "dest-tp": "SRG1-PP1-TXRX",
- "src-tp": "DEG1-TTP-TXRX",
- "node-id": "ROADM-C1"
- },
- {
- "dest-tp": "XPDR1-CLIENT1",
- "src-tp": "XPDR1-NETWORK1",
- "node-id": "XPDR-C1"
- }
- ]
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("create", "test", "1",
+ [{"node-id": "XPDR-A1",
+ "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
+ {"node-id": "ROADM-A1",
+ "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
+ {"node-id": "ROADM-C1",
+ "dest-tp": "SRG1-PP1-TXRX", "src-tp": "DEG1-TTP-TXRX"},
+ {"node-id": "XPDR-C1",
+ "dest-tp": "XPDR1-CLIENT1", "src-tp": "XPDR1-NETWORK1"}])
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
time.sleep(10)
def test_18_servicePath_create_ZToA(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {
- "input": {
- "service-name": "test",
- "wave-number": "1",
- "modulation-format": "qpsk",
- "operation": "create",
- "nodes": [
- {
- "dest-tp": "XPDR1-NETWORK1",
- "src-tp": "XPDR1-CLIENT1",
- "node-id": "XPDR-C1"
- },
- {
- "dest-tp": "DEG1-TTP-TXRX",
- "src-tp": "SRG1-PP1-TXRX",
- "node-id": "ROADM-C1"
- },
- {
- "src-tp": "DEG2-TTP-TXRX",
- "dest-tp": "SRG1-PP1-TXRX",
- "node-id": "ROADM-A1"
- },
- {
- "src-tp": "XPDR1-NETWORK1",
- "dest-tp": "XPDR1-CLIENT1",
- "node-id": "XPDR-A1"
- }
- ]
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("create", "test", "1",
+ [{"node-id": "XPDR-C1",
+ "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
+ {"node-id": "ROADM-C1",
+ "dest-tp": "DEG1-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
+ {"node-id": "ROADM-A1",
+ "src-tp": "DEG2-TTP-TXRX", "dest-tp": "SRG1-PP1-TXRX"},
+ {"node-id": "XPDR-A1",
+ "src-tp": "XPDR1-NETWORK1", "dest-tp": "XPDR1-CLIENT1"}])
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
self.assertEqual("off", res['roadm-connections'][0]['opticalControlMode'])
def test_29_servicePath_delete_AToZ(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {
- "input": {
- "service-name": "test",
- "wave-number": "1",
- "modulation-format": "qpsk",
- "operation": "delete",
- "nodes": [
- {
- "dest-tp": "XPDR1-NETWORK1",
- "src-tp": "XPDR1-CLIENT1",
- "node-id": "XPDR-A1"
- },
- {
- "dest-tp": "DEG2-TTP-TXRX",
- "src-tp": "SRG1-PP1-TXRX",
- "node-id": "ROADM-A1"
- },
- {
- "dest-tp": "SRG1-PP1-TXRX",
- "src-tp": "DEG1-TTP-TXRX",
- "node-id": "ROADM-C1"
- },
- {
- "dest-tp": "XPDR1-CLIENT1",
- "src-tp": "XPDR1-NETWORK1",
- "node-id": "XPDR-C1"
- }
- ]
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("delete", "test", "1",
+ [{"node-id": "XPDR-A1",
+ "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
+ {"node-id": "ROADM-A1",
+ "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
+ {"node-id": "ROADM-C1",
+ "dest-tp": "SRG1-PP1-TXRX", "src-tp": "DEG1-TTP-TXRX"},
+ {"node-id": "XPDR-C1",
+ "dest-tp": "XPDR1-CLIENT1", "src-tp": "XPDR1-NETWORK1"}])
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Request processed', res["output"]["result"])
time.sleep(10)
def test_30_servicePath_delete_ZToA(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {
- "input": {
- "service-name": "test",
- "wave-number": "1",
- "modulation-format": "qpsk",
- "operation": "delete",
- "nodes": [
- {
- "dest-tp": "XPDR1-NETWORK1",
- "src-tp": "XPDR1-CLIENT1",
- "node-id": "XPDR-C1"
- },
- {
- "dest-tp": "DEG1-TTP-TXRX",
- "src-tp": "SRG1-PP1-TXRX",
- "node-id": "ROADM-C1"
- },
- {
- "src-tp": "DEG2-TTP-TXRX",
- "dest-tp": "SRG1-PP1-TXRX",
- "node-id": "ROADM-A1"
- },
- {
- "src-tp": "XPDR1-NETWORK1",
- "dest-tp": "XPDR1-CLIENT1",
- "node-id": "XPDR-A1"
- }
- ]
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("delete", "test", "1",
+ [{"node-id": "XPDR-C1",
+ "dest-tp": "XPDR1-NETWORK1", "src-tp": "XPDR1-CLIENT1"},
+ {"node-id": "ROADM-C1",
+ "dest-tp": "DEG1-TTP-TXRX", "src-tp": "SRG1-PP1-TXRX"},
+ {"node-id": "ROADM-A1",
+ "src-tp": "DEG2-TTP-TXRX", "dest-tp": "SRG1-PP1-TXRX"},
+ {"node-id": "XPDR-A1",
+ "src-tp": "XPDR1-NETWORK1", "dest-tp": "XPDR1-CLIENT1"}])
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Request processed', res["output"]["result"])
self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
def test_33_servicePath_create_AToZ(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {
- "input": {
- "service-name": "test2",
- "wave-number": "2",
- "modulation-format": "qpsk",
- "operation": "create",
- "nodes": [
- {
- "dest-tp": "XPDR1-NETWORK2",
- "src-tp": "XPDR1-CLIENT2",
- "node-id": "XPDR-A1"
- },
- {
- "dest-tp": "DEG2-TTP-TXRX",
- "src-tp": "SRG1-PP2-TXRX",
- "node-id": "ROADM-A1"
- }
- ]
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("create", "test2", "2",
+ [{"node-id": "XPDR-A1",
+ "dest-tp": "XPDR1-NETWORK2", "src-tp": "XPDR1-CLIENT2"},
+ {"node-id": "ROADM-A1",
+ "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP2-TXRX"}])
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm-connection successfully created for nodes', res["output"]["result"])
# self.assertEqual(2, res['org-openroadm-optical-channel-interfaces:och']['wavelength-number'])
def test_35_servicePath_delete_AToZ(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {
- "input": {
- "service-name": "test",
- "wave-number": "1",
- "modulation-format": "qpsk",
- "operation": "delete",
- "nodes": [
- {
- "dest-tp": "XPDR1-NETWORK2",
- "src-tp": "XPDR1-CLIENT2",
- "node-id": "XPDR-A1"
- },
- {
- "dest-tp": "DEG2-TTP-TXRX",
- "src-tp": "SRG1-PP2-TXRX",
- "node-id": "ROADM-A1"
- }
- ]
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("delete", "test", "1",
+ [{"node-id": "XPDR-A1",
+ "dest-tp": "XPDR1-NETWORK2", "src-tp": "XPDR1-CLIENT2"},
+ {"node-id": "ROADM-A1",
+ "dest-tp": "DEG2-TTP-TXRX", "src-tp": "SRG1-PP2-TXRX"}])
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Request processed', res["output"]["result"])
WAITING = 20 # nominal value is 300
cr_serv_sample_data = {"input": {
- "sdnc-request-header": {
- "request-id": "request-1",
- "rpc-action": "service-create",
- "request-system-id": "appname"
- },
- "service-name": "service1-OCH-OTU4",
- "common-id": "commonId",
- "connection-type": "infrastructure",
- "service-a-end": {
- "service-rate": "100",
- "node-id": "SPDR-SA1",
- "service-format": "OTU",
- "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
- "clli": "NodeSA",
- "subrate-eth-sla": {
+ "sdnc-request-header": {
+ "request-id": "request-1",
+ "rpc-action": "service-create",
+ "request-system-id": "appname"
+ },
+ "service-name": "service1-OCH-OTU4",
+ "common-id": "commonId",
+ "connection-type": "infrastructure",
+ "service-a-end": {
+ "service-rate": "100",
+ "node-id": "SPDR-SA1",
+ "service-format": "OTU",
+ "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
+ "clli": "NodeSA",
+ "subrate-eth-sla": {
"subrate-eth-sla": {
"committed-info-rate": "100000",
"committed-burst-size": "64"
}
+ },
+ "tx-direction": {
+ "port": {
+ "port-device-name": "SPDR-SA1-XPDR1",
+ "port-type": "fixed",
+ "port-name": "XPDR1-NETWORK1",
+ "port-rack": "000000.00",
+ "port-shelf": "Chassis#1"
},
- "tx-direction": {
- "port": {
- "port-device-name": "SPDR-SA1-XPDR1",
- "port-type": "fixed",
- "port-name": "XPDR1-NETWORK1",
- "port-rack": "000000.00",
- "port-shelf": "Chassis#1"
- },
- "lgx": {
- "lgx-device-name": "Some lgx-device-name",
- "lgx-port-name": "Some lgx-port-name",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "SPDR-SA1-XPDR1",
- "port-type": "fixed",
- "port-name": "XPDR1-NETWORK1",
- "port-rack": "000000.00",
- "port-shelf": "Chassis#1"
- },
- "lgx": {
- "lgx-device-name": "Some lgx-device-name",
- "lgx-port-name": "Some lgx-port-name",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
+ "lgx": {
+ "lgx-device-name": "Some lgx-device-name",
+ "lgx-port-name": "Some lgx-port-name",
+ "lgx-port-rack": "000000.00",
+ "lgx-port-shelf": "00"
+ }
+ },
+ "rx-direction": {
+ "port": {
+ "port-device-name": "SPDR-SA1-XPDR1",
+ "port-type": "fixed",
+ "port-name": "XPDR1-NETWORK1",
+ "port-rack": "000000.00",
+ "port-shelf": "Chassis#1"
},
- "optic-type": "gray"
+ "lgx": {
+ "lgx-device-name": "Some lgx-device-name",
+ "lgx-port-name": "Some lgx-port-name",
+ "lgx-port-rack": "000000.00",
+ "lgx-port-shelf": "00"
+ }
},
- "service-z-end": {
- "service-rate": "100",
- "node-id": "SPDR-SC1",
- "service-format": "OTU",
- "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
- "clli": "NodeSC",
- "subrate-eth-sla": {
+ "optic-type": "gray"
+ },
+ "service-z-end": {
+ "service-rate": "100",
+ "node-id": "SPDR-SC1",
+ "service-format": "OTU",
+ "otu-service-rate": "org-openroadm-otn-common-types:OTU4",
+ "clli": "NodeSC",
+ "subrate-eth-sla": {
"subrate-eth-sla": {
"committed-info-rate": "100000",
"committed-burst-size": "64"
}
+ },
+ "tx-direction": {
+ "port": {
+ "port-device-name": "SPDR-SC1-XPDR1",
+ "port-type": "fixed",
+ "port-name": "XPDR1-NETWORK1",
+ "port-rack": "000000.00",
+ "port-shelf": "Chassis#1"
},
- "tx-direction": {
- "port": {
- "port-device-name": "SPDR-SC1-XPDR1",
- "port-type": "fixed",
- "port-name": "XPDR1-NETWORK1",
- "port-rack": "000000.00",
- "port-shelf": "Chassis#1"
- },
- "lgx": {
- "lgx-device-name": "Some lgx-device-name",
- "lgx-port-name": "Some lgx-port-name",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "SPDR-SC1-XPDR1",
- "port-type": "fixed",
- "port-name": "XPDR1-NETWORK1",
- "port-rack": "000000.00",
- "port-shelf": "Chassis#1"
- },
- "lgx": {
- "lgx-device-name": "Some lgx-device-name",
- "lgx-port-name": "Some lgx-port-name",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
+ "lgx": {
+ "lgx-device-name": "Some lgx-device-name",
+ "lgx-port-name": "Some lgx-port-name",
+ "lgx-port-rack": "000000.00",
+ "lgx-port-shelf": "00"
+ }
+ },
+ "rx-direction": {
+ "port": {
+ "port-device-name": "SPDR-SC1-XPDR1",
+ "port-type": "fixed",
+ "port-name": "XPDR1-NETWORK1",
+ "port-rack": "000000.00",
+ "port-shelf": "Chassis#1"
},
- "optic-type": "gray"
+ "lgx": {
+ "lgx-device-name": "Some lgx-device-name",
+ "lgx-port-name": "Some lgx-port-name",
+ "lgx-port-rack": "000000.00",
+ "lgx-port-shelf": "00"
+ }
},
+ "optic-type": "gray"
+ },
"due-date": "2018-06-15T00:00:01Z",
"operator-contact": "pw1234"
- }
}
-
-
+ }
@classmethod
def setUpClass(cls):
self.assertEqual(nbNode, 4)
self.assertNotIn('ietf-network-topology:link', res['network'][0])
-
def test_12_create_OCH_OTU4_service(self):
response = test_utils.service_create_request(self.cr_serv_sample_data)
self.assertEqual(response.status_code, requests.codes.ok)
res['mapping'])
def test_04_service_path_create_OCH_OTU4(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {"renderer:input": {
- "service-name": "service_OCH_OTU4",
- "wave-number": "1",
- "modulation-format": "qpsk",
- "operation": "create",
- "nodes": [
- {"node-id": "SPDR-SA1",
- "dest-tp": "XPDR1-NETWORK1"}]}}
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("create", "service_OCH_OTU4", "1",
+ [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}])
time.sleep(3)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
res['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_08_otn_service_path_create_ODU4(self):
- url = "{}/operations/transportpce-device-renderer:otn-service-path"
- data = {"renderer:input": {
- "service-name": "service_ODU4",
- "operation": "create",
- "service-rate": "100G",
- "service-type": "ODU",
- "nodes": [
- {"node-id": "SPDR-SA1",
- "network-tp": "XPDR1-NETWORK1"}]}}
- response = test_utils.post_request(url, data)
+ response = test_utils.otn_service_path_request("create", "service_ODU4", "100G", "ODU",
+ [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
time.sleep(3)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_11_otn_service_path_create_10GE(self):
- url = "{}/operations/transportpce-device-renderer:otn-service-path"
- data = {"renderer:input": {
- "service-name": "service1",
- "operation": "create",
- "service-rate": "10G",
- "service-type": "Ethernet",
- "ethernet-encoding": "eth encode",
- "trib-slot": ["1"],
- "trib-port-number": "1",
- "nodes": [
- {"node-id": "SPDR-SA1",
- "client-tp": "XPDR1-CLIENT1",
- "network-tp": "XPDR1-NETWORK1"}]}}
- response = test_utils.post_request(url, data)
+ response = test_utils.otn_service_path_request("create", "service1", "10G", "Ethernet",
+ [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT1",
+ "network-tp": "XPDR1-NETWORK1"}],
+ {"ethernet-encoding": "eth encode", "trib-slot": ["1"], "trib-port-number": "1"})
time.sleep(3)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
res['odu-connection'][0]['source'])
def test_16_otn_service_path_delete_10GE(self):
- url = "{}/operations/transportpce-device-renderer:otn-service-path"
- data = {"renderer:input": {
- "service-name": "service1",
- "operation": "delete",
- "service-rate": "10G",
- "service-type": "Ethernet",
- "ethernet-encoding": "eth encode",
- "trib-slot": ["1"],
- "trib-port-number": "1",
- "nodes": [
- {"node-id": "SPDR-SA1",
- "client-tp": "XPDR1-CLIENT1",
- "network-tp": "XPDR1-NETWORK1"}]}}
- response = test_utils.post_request(url, data)
+ response = test_utils.otn_service_path_request("delete", "service1", "10G", "Ethernet",
+ [{"node-id": "SPDR-SA1", "client-tp": "XPDR1-CLIENT1",
+ "network-tp": "XPDR1-NETWORK1"}],
+ {"ethernet-encoding": "eth encode", "trib-slot": ["1"], "trib-port-number": "1"})
time.sleep(3)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(response.status_code, requests.codes.not_found)
def test_21_otn_service_path_delete_ODU4(self):
- url = "{}/operations/transportpce-device-renderer:otn-service-path"
- data = {"renderer:input": {
- "service-name": "service_ODU4",
- "operation": "delete",
- "service-rate": "100G",
- "service-type": "ODU",
- "nodes": [
- {"node-id": "SPDR-SA1",
- "network-tp": "XPDR1-NETWORK1"}]}}
- response = test_utils.post_request(url, data)
+ response = test_utils.otn_service_path_request("delete", "service_ODU4", "100G", "ODU",
+ [{"node-id": "SPDR-SA1", "network-tp": "XPDR1-NETWORK1"}])
time.sleep(3)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(response.status_code, requests.codes.not_found)
def test_23_service_path_delete_OCH_OTU4(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {"renderer:input": {
- "service-name": "service_OTU4",
- "wave-number": "1",
- "modulation-format": "qpsk",
- "operation": "delete",
- "nodes": [
- {"node-id": "SPDR-SA1",
- "dest-tp": "XPDR1-NETWORK1"}]}}
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("delete", "service_OCH_OTU4", "1",
+ [{"node-id": "SPDR-SA1", "dest-tp": "XPDR1-NETWORK1"}])
time.sleep(3)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
res['nodes'][0]['mapping'])
def test_05_service_path_create(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {"renderer:input": {
- "renderer:service-name": "service_test",
- "renderer:wave-number": "7",
- "renderer:modulation-format": "qpsk",
- "renderer:operation": "create",
- "renderer:nodes": [
- {"renderer:node-id": "ROADM-A1",
- "renderer:src-tp": "SRG1-PP3-TXRX",
- "renderer:dest-tp": "DEG1-TTP-TXRX"},
- {"renderer:node-id": "XPDR-A1",
- "renderer:src-tp": "XPDR1-CLIENT1",
- "renderer:dest-tp": "XPDR1-NETWORK1"}]}}
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("create", "service_test", "7",
+ [{"renderer:node-id": "ROADM-A1",
+ "renderer:src-tp": "SRG1-PP3-TXRX", "renderer:dest-tp": "DEG1-TTP-TXRX"},
+ {"renderer:node-id": "XPDR-A1",
+ "renderer:src-tp": "XPDR1-CLIENT1", "renderer:dest-tp": "XPDR1-NETWORK1"}])
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm-connection successfully created for nodes: ROADM-A1', res["output"]["result"])
self.assertIn('not-reserved-inuse', res['circuit-packs'][0]["equipment-state"])
def test_17_service_path_delete(self):
- url = "{}/operations/transportpce-device-renderer:service-path"
- data = {"renderer:input": {
- "renderer:service-name": "service_test",
- "renderer:wave-number": "7",
- "renderer:operation": "delete",
- "renderer:nodes": [
- {"renderer:node-id": "ROADM-A1",
- "renderer:src-tp": "SRG1-PP3-TXRX",
- "renderer:dest-tp": "DEG1-TTP-TXRX"},
- {"renderer:node-id": "XPDR-A1",
- "renderer:src-tp": "XPDR1-CLIENT1",
- "renderer:dest-tp": "XPDR1-NETWORK1"}]}}
- response = test_utils.post_request(url, data)
+ response = test_utils.service_path_request("delete", "service_test", "7",
+ [{"renderer:node-id": "ROADM-A1",
+ "renderer:src-tp": "SRG1-PP3-TXRX", "renderer:dest-tp": "DEG1-TTP-TXRX"},
+ {"renderer:node-id": "XPDR-A1",
+ "renderer:src-tp": "XPDR1-CLIENT1", "renderer:dest-tp": "XPDR1-NETWORK1"}])
self.assertEqual(response.status_code, requests.codes.ok)
self.assertEqual(response.json(), {
'output': {'result': 'Request processed', 'success': True}})
URL_PORTMAPPING = "{}/config/transportpce-portmapping:network/nodes/"
URL_OPER_SERV_LIST = "{}/operational/org-openroadm-service:service-list/"
URL_SERV_CREATE = "{}/operations/org-openroadm-service:service-create"
+URL_SERVICE_PATH = "{}/operations/transportpce-device-renderer:service-path"
+URL_OTN_SERVICE_PATH = "{}/operations/transportpce-device-renderer:otn-service-path"
+URL_CREATE_OTS_OMS = "{}/operations/transportpce-device-renderer:create-ots-oms"
TYPE_APPLICATION_JSON = {'Content-Type': 'application/json', 'Accept': 'application/json'}
TYPE_APPLICATION_XML = {'Content-Type': 'application/xml', 'Accept': 'application/xml'}
url = URL_PORTMAPPING + suffix
return get_request(url)
+
def get_service_list_request(suffix: str):
url = URL_OPER_SERV_LIST + suffix
return get_request(url)
+
def service_create_request(attr):
return post_request(URL_SERV_CREATE, attr)
+def service_path_request(operation: str, servicename: str, wavenumber: str, nodes):
+ attr = {"renderer:input": {
+ "renderer:service-name": servicename,
+ "renderer:wave-number": wavenumber,
+ "renderer:modulation-format": "qpsk",
+ "renderer:operation": operation,
+ "renderer:nodes": nodes}}
+ return post_request(URL_SERVICE_PATH, attr)
+
+
+def otn_service_path_request(operation: str, servicename: str, servicerate: str, servicetype: str, nodes, eth_attr=None):
+ attr = {"service-name": servicename,
+ "operation": operation,
+ "service-rate": servicerate,
+ "service-type": servicetype,
+ "nodes": nodes}
+ if eth_attr:
+ attr.update(eth_attr)
+ return post_request(URL_OTN_SERVICE_PATH, {"renderer:input": attr})
+
+
+def create_ots_oms_request(nodeid: str, lcp: str):
+ attr = {"input": {
+ "node-id": nodeid,
+ "logical-connection-point": lcp}}
+ return post_request(URL_CREATE_OTS_OMS, attr)
+
+
def shutdown_process(process):
if process is not None:
for child in psutil.Process(process.pid).children():