import requests
import sys
sys.path.append('transportpce_tests/common/')
-import test_utils # nopep8
+import test_utils_rfc8040 # nopep8
class TransportPCEPortMappingTesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
+ cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils.shutdown_process(process)
+ test_utils_rfc8040.shutdown_process(process)
print("all processes killed")
def setUp(self):
time.sleep(10)
def test_01_rdm_device_connection(self):
- response = test_utils.mount_device("ROADMA01", ('roadma', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("ROADMA01", ('roadma', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_02_rdm_device_connected(self):
- response = test_utils.get_netconf_oper_request("ROADMA01")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['node'][0]['netconf-node-topology:connection-status'],
- 'connected')
+ response = test_utils_rfc8040.check_device_connection("ROADMA01")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['connection-status'], 'connected')
time.sleep(10)
def test_03_rdm_portmapping_info(self):
- response = test_utils.portmapping_request("ROADMA01/node-info")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.get_portmapping_node_info("ROADMA01")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(
- {'node-info': {'node-type': 'rdm',
- 'node-ip-address': '127.0.0.12',
- 'node-clli': 'NodeA',
- 'openroadm-version': '1.2.1',
- 'node-vendor': 'vendorA',
- 'node-model': '2'}},
- res)
+ {'node-type': 'rdm',
+ 'node-ip-address': '127.0.0.12',
+ 'node-clli': 'NodeA',
+ 'openroadm-version': '1.2.1',
+ 'node-vendor': 'vendorA',
+ 'node-model': '2'},
+ response['node-info'])
time.sleep(3)
def test_04_rdm_portmapping_DEG1_TTP_TXRX(self):
- response = test_utils.portmapping_request("ROADMA01/mapping/DEG1-TTP-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("ROADMA01", "DEG1-TTP-TXRX")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_05_rdm_portmapping_SRG1_PP7_TXRX(self):
- response = test_utils.portmapping_request("ROADMA01/mapping/SRG1-PP7-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("ROADMA01", "SRG1-PP7-TXRX")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supporting-port': 'C7', 'supporting-circuit-pack-name': '4/0',
'logical-connection-point': 'SRG1-PP7-TXRX', 'port-direction': 'bidirectional',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_06_rdm_portmapping_SRG3_PP1_TXRX(self):
- response = test_utils.portmapping_request("ROADMA01/mapping/SRG3-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("ROADMA01", "SRG3-PP1-TXRX")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supporting-port': 'C1', 'supporting-circuit-pack-name': '5/0',
'logical-connection-point': 'SRG3-PP1-TXRX', 'port-direction': 'bidirectional',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_07_xpdr_device_connection(self):
- response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_08_xpdr_device_connected(self):
- response = test_utils.get_netconf_oper_request("XPDRA01")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['node'][0]['netconf-node-topology:connection-status'],
- 'connected')
+ response = test_utils_rfc8040.check_device_connection("XPDRA01")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['connection-status'], 'connected')
time.sleep(10)
def test_09_xpdr_portmapping_info(self):
- response = test_utils.portmapping_request("XPDRA01/node-info")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.get_portmapping_node_info("XPDRA01")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(
- {'node-info': {'node-type': 'xpdr',
- 'node-ip-address': '127.0.0.10',
- 'node-clli': 'NodeA',
- 'openroadm-version': '1.2.1',
- 'node-vendor': 'vendorA',
- 'node-model': '1'}},
- res)
+ {'node-type': 'xpdr',
+ 'node-ip-address': '127.0.0.10',
+ 'node-clli': 'NodeA',
+ 'openroadm-version': '1.2.1',
+ 'node-vendor': 'vendorA',
+ 'node-model': '1'},
+ response['node-info'])
time.sleep(3)
def test_10_xpdr_portmapping_NETWORK1(self):
- response = test_utils.portmapping_request("XPDRA01/mapping/XPDR1-NETWORK1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("XPDRA01", "XPDR1-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
'logical-connection-point': 'XPDR1-NETWORK1', 'port-direction': 'bidirectional',
'connection-map-lcp': 'XPDR1-CLIENT1', 'port-qual': 'xpdr-network',
'lcp-hash-val': 'OSvMgUyP+mE=',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_11_xpdr_portmapping_NETWORK2(self):
- response = test_utils.portmapping_request("XPDRA01/mapping/XPDR1-NETWORK2")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("XPDRA01", "XPDR1-NETWORK2")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/2-PLUG-NET',
'logical-connection-point': 'XPDR1-NETWORK2', 'port-direction': 'bidirectional',
'connection-map-lcp': 'XPDR1-CLIENT3', 'port-qual': 'xpdr-network',
'lcp-hash-val': 'OSvMgUyP+mI=',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_12_xpdr_portmapping_CLIENT1(self):
- response = test_utils.portmapping_request("XPDRA01/mapping/XPDR1-CLIENT1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("XPDRA01", "XPDR1-CLIENT1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supporting-port': 'C1',
'supporting-circuit-pack-name': '1/0/C1-PLUG-CLIENT',
'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
'lcp-hash-val': 'AO9UFkY/TLYw',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_13_xpdr_portmapping_CLIENT2(self):
- response = test_utils.portmapping_request("XPDRA01/mapping/XPDR1-CLIENT2")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("XPDRA01", "XPDR1-CLIENT2")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supporting-port': 'C2',
'supporting-circuit-pack-name': '1/0/C2-PLUG-CLIENT',
'port-qual': 'xpdr-client',
'lcp-hash-val': 'AO9UFkY/TLYz',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_14_xpdr_portmapping_CLIENT3(self):
- response = test_utils.portmapping_request("XPDRA01/mapping/XPDR1-CLIENT3")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("XPDRA01", "XPDR1-CLIENT3")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supporting-port': 'C3',
'supporting-circuit-pack-name': '1/0/C3-PLUG-CLIENT',
'connection-map-lcp': 'XPDR1-NETWORK2', 'port-direction': 'bidirectional',
'port-qual': 'xpdr-client', 'lcp-hash-val': 'AO9UFkY/TLYy',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_15_xpdr_portmapping_CLIENT4(self):
- response = test_utils.portmapping_request("XPDRA01/mapping/XPDR1-CLIENT4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("XPDRA01", "XPDR1-CLIENT4")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supporting-port': 'C4',
'supporting-circuit-pack-name': '1/0/C4-PLUG-CLIENT',
'logical-connection-point': 'XPDR1-CLIENT4', 'port-direction': 'bidirectional',
'port-qual': 'xpdr-client', 'lcp-hash-val': 'AO9UFkY/TLY1',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_16_xpdr_device_disconnection(self):
- response = test_utils.unmount_device("XPDRA01")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("XPDRA01")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_17_xpdr_device_disconnected(self):
- response = test_utils.get_netconf_oper_request("XPDRA01")
- self.assertEqual(response.status_code, requests.codes.conflict)
- res = response.json()
+ response = test_utils_rfc8040.check_device_connection("XPDRA01")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(
- {"error-type": "application", "error-tag": "data-missing",
- "error-message": "Request could not be completed because the relevant data model content does not exist"},
- res['errors']['error'])
+ {"error-tag": "data-missing",
+ "error-message": "Request could not be completed because the relevant data model content does not exist",
+ "error-type": "protocol"},
+ response['connection-status'])
def test_18_xpdr_device_not_connected(self):
- response = test_utils.portmapping_request("XPDRA01")
- self.assertEqual(response.status_code, requests.codes.conflict)
- res = response.json()
+ response = test_utils_rfc8040.get_portmapping_node_info("XPDRA01")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(
- {"error-type": "application", "error-tag": "data-missing",
- "error-message": "Request could not be completed because the relevant data model content does not exist"},
- res['errors']['error'])
+ {"error-tag": "data-missing",
+ "error-message": "Request could not be completed because the relevant data model content does not exist",
+ "error-type": "protocol"},
+ response['node-info'])
def test_19_rdm_device_disconnection(self):
- response = test_utils.unmount_device("ROADMA01")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("ROADMA01")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_20_rdm_device_disconnected(self):
- response = test_utils.get_netconf_oper_request("ROADMA01")
- self.assertEqual(response.status_code, requests.codes.conflict)
- res = response.json()
+ response = test_utils_rfc8040.check_device_connection("ROADMA01")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(
- {"error-type": "application", "error-tag": "data-missing",
- "error-message": "Request could not be completed because the relevant data model content does not exist"},
- res['errors']['error'])
+ {"error-tag": "data-missing",
+ "error-message": "Request could not be completed because the relevant data model content does not exist",
+ "error-type": "protocol"},
+ response['connection-status'])
def test_21_rdm_device_not_connected(self):
- response = test_utils.portmapping_request("ROADMA01")
- self.assertEqual(response.status_code, requests.codes.conflict)
- res = response.json()
+ response = test_utils_rfc8040.get_portmapping_node_info("ROADMA01")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(
- {"error-type": "application", "error-tag": "data-missing",
- "error-message": "Request could not be completed because the relevant data model content does not exist"},
- res['errors']['error'])
+ {"error-tag": "data-missing",
+ "error-message": "Request could not be completed because the relevant data model content does not exist",
+ "error-type": "protocol"},
+ response['node-info'])
if __name__ == "__main__":
import requests
import sys
sys.path.append('transportpce_tests/common/')
-import test_utils # nopep8
+import test_utils_rfc8040 # nopep8
class TransportPCEtesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
+ cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils.shutdown_process(process)
+ test_utils_rfc8040.shutdown_process(process)
print("all processes killed")
def setUp(self):
# Connect the ROADMA
def test_01_connect_rdm(self):
- response = test_utils.mount_device("ROADMA01", ('roadma', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("ROADMA01", ('roadma', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
# Verify the termination points of the ROADMA
def test_02_compare_Openroadm_topology_portmapping_rdm(self):
- responseTopo = test_utils.get_ordm_topo_request("")
+ responseTopo = test_utils_rfc8040.get_request(test_utils_rfc8040.URL_CONFIG_ORDM_TOPO)
resTopo = responseTopo.json()
- nbNode = len(resTopo['network'][0]['node'])
- for i in range(0, nbNode):
- nodeId = resTopo['network'][0]['node'][i]['node-id']
+ firstEntry = resTopo['ietf-network:network'][0]['node']
+ for i in range(0, len(firstEntry)):
+ nodeId = firstEntry[i]['node-id']
nodeMapId = nodeId.split("-")[0]
- test_utils.portmapping_request(nodeMapId)
- nbTp = len(resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'])
+ response = test_utils_rfc8040.get_portmapping_node_info(nodeMapId)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ nbTp = len(firstEntry[i]['ietf-network-topology:termination-point'])
for j in range(0, nbTp):
- tpId = resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
+ tpId = firstEntry[i]['ietf-network-topology:termination-point'][j]['tp-id']
if((not "CP" in tpId) and (not "CTP" in tpId)):
- test_utils.portmapping_request(nodeMapId+"/mapping/"+tpId)
+ response2 = test_utils_rfc8040.portmapping_request(nodeMapId, tpId)
+ self.assertEqual(response2['status_code'], requests.codes.ok)
# Disconnect the ROADMA
def test_03_disconnect_rdm(self):
- response = test_utils.unmount_device("ROADMA01")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("ROADMA01")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
# #Connect the XPDRA
def test_04_connect_xpdr(self):
- response = test_utils.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("XPDRA01", ('xpdra', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
# #Verify the termination points related to XPDR
def test_05_compare_Openroadm_topology_portmapping_xpdr(self):
# Disconnect the XPDRA
def test_06_disconnect_device(self):
- response = test_utils.unmount_device("XPDRA01")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("XPDRA01")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
if __name__ == "__main__":
import requests
import sys
sys.path.append('transportpce_tests/common/')
-import test_utils # nopep8
+import test_utils_rfc8040 # nopep8
class TransportPCEPortMappingTesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
+ cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils.shutdown_process(process)
+ test_utils_rfc8040.shutdown_process(process)
print("all processes killed")
def setUp(self):
time.sleep(10)
def test_01_rdm_device_connection(self):
- response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_02_rdm_device_connected(self):
- response = test_utils.get_netconf_oper_request("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['node'][0]['netconf-node-topology:connection-status'],
- 'connected')
+ response = test_utils_rfc8040.check_device_connection("ROADM-A1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['connection-status'], 'connected')
time.sleep(10)
def test_03_rdm_portmapping_info(self):
- response = test_utils.portmapping_request("ROADM-A1/node-info")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.get_portmapping_node_info("ROADM-A1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(
- {'node-info': {'node-type': 'rdm',
- 'node-ip-address': '127.0.0.11',
- 'node-clli': 'NodeA',
- 'openroadm-version': '2.2.1',
- 'node-vendor': 'vendorA',
- 'node-model': 'model2'}},
- res)
+ {'node-type': 'rdm',
+ 'node-ip-address': '127.0.0.11',
+ 'node-clli': 'NodeA',
+ 'openroadm-version': '2.2.1',
+ 'node-vendor': 'vendorA',
+ 'node-model': 'model2'},
+ response['node-info'])
time.sleep(3)
def test_04_rdm_portmapping_DEG1_TTP_TXRX(self):
- response = test_utils.portmapping_request("ROADM-A1/mapping/DEG1-TTP-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("ROADM-A1", "DEG1-TTP-TXRX")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_05_rdm_portmapping_DEG2_TTP_TXRX_with_ots_oms(self):
- response = test_utils.portmapping_request("ROADM-A1/mapping/DEG2-TTP-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("ROADM-A1", "DEG2-TTP-TXRX")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
'logical-connection-point': 'DEG2-TTP-TXRX',
'supporting-oms': 'OMS-DEG2-TTP-TXRX', 'supporting-ots': 'OTS-DEG2-TTP-TXRX',
'port-direction': 'bidirectional',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_06_rdm_portmapping_SRG1_PP3_TXRX(self):
- response = test_utils.portmapping_request("ROADM-A1/mapping/SRG1-PP3-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("ROADM-A1", "SRG1-PP3-TXRX")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_07_rdm_portmapping_SRG3_PP1_TXRX(self):
- response = test_utils.portmapping_request("ROADM-A1/mapping/SRG3-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("ROADM-A1", "SRG3-PP1-TXRX")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supporting-port': 'C1', 'supporting-circuit-pack-name': '5/0',
'logical-connection-point': 'SRG3-PP1-TXRX', 'port-direction': 'bidirectional',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_08_xpdr_device_connection(self):
- response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_09_xpdr_device_connected(self):
- response = test_utils.get_netconf_oper_request("XPDR-A1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['node'][0]['netconf-node-topology:connection-status'],
- 'connected')
+ response = test_utils_rfc8040.check_device_connection("XPDR-A1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['connection-status'], 'connected')
time.sleep(10)
def test_10_xpdr_portmapping_info(self):
- response = test_utils.portmapping_request("XPDR-A1/node-info")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(
- {'node-info': {'node-type': 'xpdr',
- 'node-ip-address': '1.2.3.4',
- 'node-clli': 'NodeA',
- 'openroadm-version': '2.2.1',
- 'node-vendor': 'vendorA',
- 'node-model': 'model2'}},
- res)
+ {'node-type': 'xpdr',
+ 'node-ip-address': '1.2.3.4',
+ 'node-clli': 'NodeA',
+ 'openroadm-version': '2.2.1',
+ 'node-vendor': 'vendorA',
+ 'node-model': 'model2'},
+ response['node-info'])
time.sleep(3)
def test_11_xpdr_portmapping_NETWORK1(self):
- response = test_utils.portmapping_request("XPDR-A1/mapping/XPDR1-NETWORK1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("XPDR-A1", "XPDR1-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
'lcp-hash-val': 'AMkDwQ7xTmRI',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_12_xpdr_portmapping_NETWORK2(self):
- response = test_utils.portmapping_request("XPDR-A1/mapping/XPDR1-NETWORK2")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("XPDR-A1", "XPDR1-NETWORK2")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/2-PLUG-NET',
'connection-map-lcp': 'XPDR1-CLIENT2', 'port-qual': 'xpdr-network',
'lcp-hash-val': 'AMkDwQ7xTmRL',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_13_xpdr_portmapping_CLIENT1(self):
- response = test_utils.portmapping_request("XPDR-A1/mapping/XPDR1-CLIENT1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("XPDR-A1", "XPDR1-CLIENT1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
'supporting-port': 'C1',
'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
'lcp-hash-val': 'AJUUr6I5fALj',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_14_xpdr_portmapping_CLIENT2(self):
- response = test_utils.portmapping_request("XPDR-A1/mapping/XPDR1-CLIENT2")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("XPDR-A1", "XPDR1-CLIENT2")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
'supporting-port': 'C1',
'connection-map-lcp': 'XPDR1-NETWORK2', 'port-qual': 'xpdr-client',
'lcp-hash-val': 'AJUUr6I5fALg',
'port-admin-state': 'InService', 'port-oper-state': 'InService'},
- res['mapping'])
+ response['mapping'])
def test_15_xpdr_device_disconnection(self):
- response = test_utils.unmount_device("XPDR-A1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("XPDR-A1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_16_xpdr_device_disconnected(self):
- response = test_utils.get_netconf_oper_request("XPDR-A1")
- self.assertEqual(response.status_code, requests.codes.conflict)
- res = response.json()
+ response = test_utils_rfc8040.check_device_connection("XPDR-A1")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(
- {"error-type": "application", "error-tag": "data-missing",
- "error-message": "Request could not be completed because the relevant data model content does not exist"},
- res['errors']['error'])
+ {"error-tag": "data-missing",
+ "error-message": "Request could not be completed because the relevant data model content does not exist",
+ "error-type": "protocol"},
+ response['connection-status'])
def test_17_xpdr_device_not_connected(self):
- response = test_utils.portmapping_request("XPDR-A1")
- self.assertEqual(response.status_code, requests.codes.conflict)
- res = response.json()
+ response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A1")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(
- {"error-type": "application", "error-tag": "data-missing",
- "error-message": "Request could not be completed because the relevant data model content does not exist"},
- res['errors']['error'])
+ {"error-tag": "data-missing",
+ "error-message": "Request could not be completed because the relevant data model content does not exist",
+ "error-type": "protocol"},
+ response['node-info'])
def test_18_rdm_device_disconnection(self):
- response = test_utils.unmount_device("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("ROADM-A1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_19_rdm_device_disconnected(self):
- response = test_utils.get_netconf_oper_request("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.conflict)
- res = response.json()
+ response = test_utils_rfc8040.check_device_connection("ROADM-A1")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(
- {"error-type": "application", "error-tag": "data-missing",
- "error-message": "Request could not be completed because the relevant data model content does not exist"},
- res['errors']['error'])
+ {"error-tag": "data-missing",
+ "error-message": "Request could not be completed because the relevant data model content does not exist",
+ "error-type": "protocol"},
+ response['connection-status'])
def test_20_rdm_device_not_connected(self):
- response = test_utils.portmapping_request("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.conflict)
- res = response.json()
+ response = test_utils_rfc8040.get_portmapping_node_info("ROADM-A1")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(
- {"error-type": "application", "error-tag": "data-missing",
- "error-message": "Request could not be completed because the relevant data model content does not exist"},
- res['errors']['error'])
+ {"error-tag": "data-missing",
+ "error-message": "Request could not be completed because the relevant data model content does not exist",
+ "error-type": "protocol"},
+ response['node-info'])
if __name__ == "__main__":
import requests
import sys
sys.path.append('transportpce_tests/common/')
-import test_utils # nopep8
+import test_utils_rfc8040 # nopep8
class TransportPCEtesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
+ cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION), ('roadma', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils.shutdown_process(process)
+ test_utils_rfc8040.shutdown_process(process)
print("all processes killed")
def setUp(self):
# Connect the ROADMA
def test_01_connect_rdm(self):
- response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
# Verify the termination points of the ROADMA
def test_02_compareOpenroadmTopologyPortMapping_rdm(self):
- responseTopo = test_utils.get_ordm_topo_request("")
+ responseTopo = test_utils_rfc8040.get_request(test_utils_rfc8040.URL_CONFIG_ORDM_TOPO)
resTopo = responseTopo.json()
- nbNode = len(resTopo['network'][0]['node'])
+ firstEntry = resTopo['ietf-network:network'][0]['node']
nbMapCumul = 0
nbMappings = 0
- for i in range(0, nbNode):
- nodeId = resTopo['network'][0]['node'][i]['node-id']
+ for i in range(0, len(firstEntry)):
+ nodeId = firstEntry[i]['node-id']
print("nodeId={}".format(nodeId))
nodeMapId = nodeId.split("-")[0] + "-" + nodeId.split("-")[1]
print("nodeMapId={}".format(nodeMapId))
- responseMapList = test_utils.portmapping_request(nodeMapId)
- resMapList = responseMapList.json()
-
- nbMappings = len(resMapList['nodes'][0]['mapping']) - nbMapCumul
- nbTp = len(resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'])
+ response = test_utils_rfc8040.get_portmapping_node_info(nodeMapId)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ responseMapList = test_utils_rfc8040.get_portmapping(nodeMapId)
+ nbMappings = len(responseMapList['nodes'][0]['mapping']) - nbMapCumul
+ nbTp = len(firstEntry[i]['ietf-network-topology:termination-point'])
nbMapCurrent = 0
for j in range(0, nbTp):
- tpId = resTopo['network'][0]['node'][i]['ietf-network-topology:termination-point'][j]['tp-id']
+ tpId = firstEntry[i]['ietf-network-topology:termination-point'][j]['tp-id']
if (not "CP" in tpId) and (not "CTP" in tpId):
- responseMap = test_utils.portmapping_request(nodeMapId+"/mapping/"+tpId)
- self.assertEqual(responseMap.status_code, requests.codes.ok)
- if responseMap.status_code == requests.codes.ok:
+ responseMap = test_utils_rfc8040.portmapping_request(nodeMapId, tpId)
+ self.assertEqual(responseMap['status_code'], requests.codes.ok)
+ if responseMap['status_code'] == requests.codes.ok:
nbMapCurrent += 1
nbMapCumul += nbMapCurrent
nbMappings -= nbMapCurrent
# Disconnect the ROADMA
def test_03_disconnect_rdm(self):
- response = test_utils.unmount_device("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("ROADM-A1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
# #Connect the XPDRA
def test_04_connect_xpdr(self):
- response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
# #Verify the termination points related to XPDR
def test_05_compareOpenroadmTopologyPortMapping_xpdr(self):
# Disconnect the XPDRA
def test_06_disconnect_device(self):
- response = test_utils.unmount_device("XPDR-A1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("XPDR-A1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
if __name__ == "__main__":
import requests
import sys
sys.path.append('transportpce_tests/common')
-import test_utils # nopep8
+import test_utils_rfc8040 # nopep8
class TransportPCE400GPortMappingTesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims([('xpdra2', cls.NODE_VERSION)])
+ cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils_rfc8040.start_sims([('xpdra2', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils.shutdown_process(process)
+ test_utils_rfc8040.shutdown_process(process)
print("all processes killed")
def setUp(self):
time.sleep(10)
def test_01_xpdr_device_connection(self):
- response = test_utils.mount_device("XPDR-A2",
- ('xpdra2', self.NODE_VERSION))
+ response = test_utils_rfc8040.mount_device("XPDR-A2",
+ ('xpdra2', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created,
- test_utils.CODE_SHOULD_BE_201)
+ test_utils_rfc8040.CODE_SHOULD_BE_201)
# Check if the node appears in the ietf-network topology
def test_02_xpdr_device_connected(self):
- response = test_utils.get_netconf_oper_request("XPDR-A2")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['node'][0]['netconf-node-topology:connection-status'],
- 'connected')
+ response = test_utils_rfc8040.check_device_connection("XPDR-A2")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['connection-status'], 'connected')
time.sleep(10)
# Check node info in the port-mappings
def test_03_xpdr_portmapping_info(self):
- response = test_utils.portmapping_request("XPDR-A2/node-info")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertEqual(
- {'node-info': {'node-type': 'xpdr',
- 'node-ip-address': '1.2.3.4',
- 'node-clli': 'NodeA',
- 'openroadm-version': '7.1',
- 'node-vendor': 'vendorA',
- 'node-model': 'model'}},
- res)
+ {'node-type': 'xpdr',
+ 'node-ip-address': '1.2.3.4',
+ 'node-clli': 'NodeA',
+ 'openroadm-version': '7.1',
+ 'node-vendor': 'vendorA',
+ 'node-model': 'model'},
+ response['node-info'])
time.sleep(3)
# Check the if-capabilities and the other details for network
def test_04_tpdr_portmapping_NETWORK1(self):
- response = test_utils.portmapping_request("XPDR-A2/mapping/XPDR1-NETWORK1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR1-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supported-interface-capability':
['org-openroadm-port-types:if-otsi-otsigroup'],
'port-oper-state': 'InService',
'xponder-type': 'tpdr'
},
- res['mapping'])
+ response['mapping'])
def test_05_tpdr_portmapping_CLIENT1(self):
- response = test_utils.portmapping_request("XPDR-A2/mapping/XPDR1-CLIENT1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR1-CLIENT1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supported-interface-capability': ['org-openroadm-port-types:if-400GE'],
'supporting-port': 'C1',
'supporting-circuit-pack-name': '1/1/1-PLUG-CLIENT',
- 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
- 'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
+ 'logical-connection-point': 'XPDR1-CLIENT1',
+ 'port-direction': 'bidirectional',
+ 'connection-map-lcp': 'XPDR1-NETWORK1',
+ 'port-qual': 'xpdr-client',
'lcp-hash-val': 'AODABTVSOHH0',
- 'port-admin-state': 'InService', 'port-oper-state': 'InService',
- 'xponder-type': 'tpdr'},
- res['mapping'])
+ 'port-admin-state': 'InService',
+ 'port-oper-state': 'InService',
+ 'xponder-type': 'tpdr'
+ },
+ response['mapping']
+ )
# Check the port-mapping for the switch-client and switch-network port-quals
def test_06_mpdr_portmapping_NETWORK1(self):
- response = test_utils.portmapping_request("XPDR-A2/mapping/XPDR2-NETWORK1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-NETWORK1")
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn(
{'supported-interface-capability':
['org-openroadm-port-types:if-otsi-otsigroup'],
'port-oper-state': 'InService',
'xponder-type': 'mpdr'
},
- res['mapping'])
+ response['mapping'])
def test_07_mpdr_portmapping_CLIENT1(self):
- response = test_utils.portmapping_request("XPDR-A2/mapping/XPDR2-CLIENT1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ res = test_utils_rfc8040.portmapping_request("XPDR-A2", "XPDR2-CLIENT1")
+ self.assertEqual(res['status_code'], requests.codes.ok)
self.assertIn('org-openroadm-port-types:if-100GE-ODU4',
res['mapping'][0]['supported-interface-capability'])
self.assertIn('org-openroadm-port-types:if-OCH-OTU4-ODU4',
# Added test to check mc-capability-profile for a transponder
def test_08_check_mccapprofile(self):
- response = test_utils.portmapping_request("XPDR-A2/mc-capabilities/XPDR-mcprofile")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ res = test_utils_rfc8040.portmapping_mc_capa_request("XPDR-A2", "XPDR-mcprofile")
+ self.assertEqual(res['status_code'], requests.codes.ok)
self.assertEqual(res['mc-capabilities'][0]['mc-node-name'], 'XPDR-mcprofile')
- self.assertEqual(res['mc-capabilities'][0]['center-freq-granularity'], 3.125)
- self.assertEqual(res['mc-capabilities'][0]['slot-width-granularity'], 6.25)
+ self.assertEqual(res['mc-capabilities'][0]['center-freq-granularity'], '3.125')
+ self.assertEqual(res['mc-capabilities'][0]['slot-width-granularity'], '6.25')
def test_09_xpdr_device_disconnection(self):
- response = test_utils.unmount_device("XPDR-A2")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("XPDR-A2")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_10_xpdr_device_disconnected(self):
- response = test_utils.get_netconf_oper_request("XPDR-A2")
- self.assertEqual(response.status_code, requests.codes.conflict)
- res = response.json()
+ response = test_utils_rfc8040.check_device_connection("XPDR-A2")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(
- {"error-type": "application", "error-tag": "data-missing",
- "error-message": "Request could not be completed because the "
- "relevant data model content does not exist"},
- res['errors']['error'])
+ {"error-tag": "data-missing",
+ "error-message": "Request could not be completed because the relevant data model content does not exist",
+ "error-type": "protocol"},
+ response['connection-status'])
def test_11_xpdr_device_not_connected(self):
- response = test_utils.portmapping_request("XPDR-A2")
- self.assertEqual(response.status_code, requests.codes.conflict)
- res = response.json()
+ response = test_utils_rfc8040.get_portmapping_node_info("XPDR-A2")
+ self.assertEqual(response['status_code'], requests.codes.conflict)
self.assertIn(
- {"error-type": "application", "error-tag": "data-missing",
- "error-message": "Request could not be completed because the "
- "relevant data model content does not exist"},
- res['errors']['error'])
+ {"error-tag": "data-missing",
+ "error-message": "Request could not be completed because the relevant data model content does not exist",
+ "error-type": "protocol"},
+ response['node-info'])
if __name__ == '__main__':
--- /dev/null
+#!/usr/bin/env python
+
+##############################################################################
+# Copyright (c) 2021 Orange, Inc. and others. All rights reserved.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+# pylint: disable=no-member
+
+import json
+import os
+import sys
+import re
+import signal
+import subprocess
+import time
+
+import psutil
+import requests
+
+import simulators
+
+SIMS = simulators.SIMS
+
+HONEYNODE_OK_START_MSG = "Netconf SSH endpoint started successfully at 0.0.0.0"
+KARAF_OK_START_MSG = re.escape(
+ "Blueprint container for bundle org.opendaylight.netconf.restconf")+".* was successfully created"
+LIGHTY_OK_START_MSG = re.escape("lighty.io and RESTCONF-NETCONF started")
+
+ODL_LOGIN = "admin"
+ODL_PWD = "admin"
+NODES_LOGIN = "admin"
+NODES_PWD = "admin"
+URL_CONFIG_ORDM_TOPO = "{}/data/ietf-network:networks/network=openroadm-topology"
+URL_PORTMAPPING = "{}/data/transportpce-portmapping:network/nodes="
+
+TYPE_APPLICATION_JSON = {'Content-Type': 'application/json', 'Accept': 'application/json'}
+TYPE_APPLICATION_XML = {'Content-Type': 'application/xml', 'Accept': 'application/xml'}
+
+CODE_SHOULD_BE_200 = 'Http status code should be 200'
+CODE_SHOULD_BE_201 = 'Http status code should be 201'
+
+SIM_LOG_DIRECTORY = os.path.join(os.path.dirname(os.path.realpath(__file__)), "log")
+
+process_list = []
+
+
+if "USE_ODL_ALT_RESTCONF_PORT" in os.environ:
+ RESTCONF_BASE_URL = "http://localhost:" + os.environ['USE_ODL_ALT_RESTCONF_PORT'] + "/rests"
+else:
+ RESTCONF_BASE_URL = "http://localhost:8181/rests"
+
+if "USE_ODL_ALT_KARAF_INSTALL_DIR" in os.environ:
+ KARAF_INSTALLDIR = os.environ['USE_ODL_ALT_KARAF_INSTALL_DIR']
+else:
+ KARAF_INSTALLDIR = "karaf"
+
+KARAF_LOG = os.path.join(
+ os.path.dirname(os.path.realpath(__file__)),
+ "..", "..", "..", KARAF_INSTALLDIR, "target", "assembly", "data", "log", "karaf.log")
+
+if "USE_LIGHTY" in os.environ and os.environ['USE_LIGHTY'] == 'True':
+ TPCE_LOG = 'odl-' + str(os.getpid()) + '.log'
+else:
+ TPCE_LOG = KARAF_LOG
+
+#
+# Basic HTTP operations
+#
+
+
+def get_request(url):
+ return requests.request(
+ "GET", url.format(RESTCONF_BASE_URL),
+ headers=TYPE_APPLICATION_JSON,
+ auth=(ODL_LOGIN, ODL_PWD))
+
+
+def put_request(url, data):
+ return requests.request(
+ "PUT", url.format(RESTCONF_BASE_URL),
+ data=json.dumps(data),
+ headers=TYPE_APPLICATION_JSON,
+ auth=(ODL_LOGIN, ODL_PWD))
+
+
+def delete_request(url):
+ return requests.request(
+ "DELETE", url.format(RESTCONF_BASE_URL),
+ headers=TYPE_APPLICATION_JSON,
+ auth=(ODL_LOGIN, ODL_PWD))
+
+#
+# Process management
+#
+
+
+def start_sims(sims_list):
+ for sim in sims_list:
+ print("starting simulator " + sim[0] + " in OpenROADM device version " + sim[1] + "...")
+ log_file = os.path.join(SIM_LOG_DIRECTORY, SIMS[sim]['logfile'])
+ process = start_honeynode(log_file, sim)
+ if wait_until_log_contains(log_file, HONEYNODE_OK_START_MSG, 100):
+ print("simulator for " + sim[0] + " started")
+ else:
+ print("simulator for " + sim[0] + " failed to start")
+ shutdown_process(process)
+ for pid in process_list:
+ shutdown_process(pid)
+ sys.exit(3)
+ process_list.append(process)
+ return process_list
+
+
+def start_tpce():
+ print("starting OpenDaylight...")
+ if "USE_LIGHTY" in os.environ and os.environ['USE_LIGHTY'] == 'True':
+ process = start_lighty()
+ start_msg = LIGHTY_OK_START_MSG
+ else:
+ process = start_karaf()
+ start_msg = KARAF_OK_START_MSG
+ if wait_until_log_contains(TPCE_LOG, start_msg, time_to_wait=300):
+ print("OpenDaylight started !")
+ else:
+ print("OpenDaylight failed to start !")
+ shutdown_process(process)
+ for pid in process_list:
+ shutdown_process(pid)
+ sys.exit(1)
+ process_list.append(process)
+ return process_list
+
+
+def start_karaf():
+ print("starting KARAF TransportPCE build...")
+ executable = os.path.join(
+ os.path.dirname(os.path.realpath(__file__)),
+ "..", "..", "..", KARAF_INSTALLDIR, "target", "assembly", "bin", "karaf")
+ with open('odl.log', 'w') as outfile:
+ return subprocess.Popen(
+ ["sh", executable, "server"], stdout=outfile, stderr=outfile, stdin=None)
+
+
+def start_lighty():
+ print("starting LIGHTY.IO TransportPCE build...")
+ executable = os.path.join(
+ os.path.dirname(os.path.realpath(__file__)),
+ "..", "..", "..", "lighty", "target", "tpce",
+ "clean-start-controller.sh")
+ with open(TPCE_LOG, 'w') as outfile:
+ return subprocess.Popen(
+ ["sh", executable], stdout=outfile, stderr=outfile, stdin=None)
+
+
+def install_karaf_feature(feature_name: str):
+ print("installing feature " + feature_name)
+ executable = os.path.join(
+ os.path.dirname(os.path.realpath(__file__)),
+ "..", "..", "..", KARAF_INSTALLDIR, "target", "assembly", "bin", "client")
+ return subprocess.run([executable],
+ input='feature:install ' + feature_name + '\n feature:list | grep '
+ + feature_name + ' \n logout \n',
+ universal_newlines=True, check=False)
+
+
+def shutdown_process(process):
+ if process is not None:
+ for child in psutil.Process(process.pid).children():
+ child.send_signal(signal.SIGINT)
+ child.wait()
+ process.send_signal(signal.SIGINT)
+
+
+def start_honeynode(log_file: str, sim):
+ executable = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+ "..", "..", "honeynode", sim[1], "honeynode-simulator", "honeycomb-tpce")
+ sample_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)),
+ "..", "..", "sample_configs", "openroadm", sim[1])
+ if os.path.isfile(executable):
+ with open(log_file, 'w') as outfile:
+ return subprocess.Popen(
+ [executable, SIMS[sim]['port'], os.path.join(sample_directory, SIMS[sim]['configfile'])],
+ stdout=outfile, stderr=outfile)
+ return None
+
+
+def wait_until_log_contains(log_file, regexp, time_to_wait=60):
+ # pylint: disable=lost-exception
+ stringfound = False
+ filefound = False
+ line = None
+ try:
+ with TimeOut(seconds=time_to_wait):
+ while not os.path.exists(log_file):
+ time.sleep(0.2)
+ filelogs = open(log_file, 'r')
+ filelogs.seek(0, 2)
+ filefound = True
+ print("Searching for pattern '" + regexp + "' in " + os.path.basename(log_file), end='... ', flush=True)
+ compiled_regexp = re.compile(regexp)
+ while True:
+ line = filelogs.readline()
+ if compiled_regexp.search(line):
+ print("Pattern found!", end=' ')
+ stringfound = True
+ break
+ if not line:
+ time.sleep(0.1)
+ except TimeoutError:
+ print("Pattern not found after " + str(time_to_wait), end=" seconds! ", flush=True)
+ except PermissionError:
+ print("Permission Error when trying to access the log file", end=" ... ", flush=True)
+ finally:
+ if filefound:
+ filelogs.close()
+ else:
+ print("log file does not exist or is not accessible... ", flush=True)
+ return stringfound
+
+
+class TimeOut:
+ def __init__(self, seconds=1, error_message='Timeout'):
+ self.seconds = seconds
+ self.error_message = error_message
+
+ def handle_timeout(self, signum, frame):
+ raise TimeoutError(self.error_message)
+
+ def __enter__(self):
+ signal.signal(signal.SIGALRM, self.handle_timeout)
+ signal.alarm(self.seconds)
+
+ def __exit__(self, type, value, traceback):
+ # pylint: disable=W0622
+ signal.alarm(0)
+
+#
+# Basic NetCONF device operations
+#
+
+
+def mount_device(node_id, sim):
+ url = "{}/data/network-topology:network-topology/topology=topology-netconf/node={}"
+ body = {"node": [{
+ "node-id": node_id,
+ "netconf-node-topology:username": NODES_LOGIN,
+ "netconf-node-topology:password": NODES_PWD,
+ "netconf-node-topology:host": "127.0.0.1",
+ "netconf-node-topology:port": SIMS[sim]['port'],
+ "netconf-node-topology:tcp-only": "false",
+ "netconf-node-topology:pass-through": {}}]}
+ response = put_request(url.format('{}', node_id), body)
+ if wait_until_log_contains(TPCE_LOG, re.escape("Triggering notification stream NETCONF for node " + node_id), 180):
+ print("Node " + node_id + " correctly added to tpce topology", end='... ', flush=True)
+ else:
+ print("Node " + node_id + " still not added to tpce topology", end='... ', flush=True)
+ if response.status_code == requests.codes.ok:
+ print("It was probably loaded at start-up", end='... ', flush=True)
+ # TODO an else-clause to abort test would probably be nice here
+ return response
+
+
+def unmount_device(node_id):
+ url = "{}/data/network-topology:network-topology/topology=topology-netconf/node={}"
+ response = delete_request(url.format('{}', node_id))
+ if wait_until_log_contains(TPCE_LOG, re.escape("onDeviceDisConnected: " + node_id), 180):
+ print("Node " + node_id + " correctly deleted from tpce topology", end='... ', flush=True)
+ else:
+ print("Node " + node_id + " still not deleted from tpce topology", end='... ', flush=True)
+ return response
+
+
+def check_device_connection(node: str):
+ url = "{}/data/network-topology:network-topology/topology=topology-netconf/node={}"
+ response = get_request(url.format('{}', node))
+ res = response.json()
+ key = 'network-topology:node'
+ if key in res.keys():
+ connection_status = res[key][0]['netconf-node-topology:connection-status']
+ else:
+ connection_status = res['errors']['error']
+ return {'status_code': response.status_code,
+ 'connection-status': connection_status}
+
+#
+# Portmapping operations
+#
+
+
+def get_portmapping(node: str):
+ url = "{}/data/transportpce-portmapping:network/nodes={}"
+ response = get_request(url.format('{}', node))
+ res = response.json()
+ print(res)
+ nodes = res['transportpce-portmapping:nodes']
+ return {'status_code': response.status_code,
+ 'nodes': nodes}
+
+
+def get_portmapping_node_info(node: str):
+ url = "{}/data/transportpce-portmapping:network/nodes={}/node-info"
+ response = get_request(url.format('{}', node))
+ res = response.json()
+ key = u'transportpce-portmapping:node-info'
+ if key in res.keys():
+ node_info = res[key]
+ else:
+ node_info = res['errors']['error']
+ return {'status_code': response.status_code,
+ 'node-info': node_info}
+
+
+def portmapping_request(node: str, mapping: str):
+ url = "{}/data/transportpce-portmapping:network/nodes={}/mapping={}"
+ response = get_request(url.format('{}', node, mapping))
+ res = response.json()
+ mapping = res['transportpce-portmapping:mapping']
+ return {'status_code': response.status_code,
+ 'mapping': mapping}
+
+
+def portmapping_mc_capa_request(node: str, mc_capa: str):
+ url = "{}/data/transportpce-portmapping:network/nodes={}/mc-capabilities={}"
+ response = get_request(url.format('{}', node, mc_capa))
+ res = response.json()
+ capabilities = res['transportpce-portmapping:mc-capabilities']
+ return {'status_code': response.status_code,
+ 'mc-capabilities': capabilities}