# pylint: disable=wrong-import-position
# pylint: disable=import-error
import test_utils # nopep8
+import test_utils_rfc8040 # nopep8
class TransportPCEtesting(unittest.TestCase):
@classmethod
def setUpClass(cls):
- cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
- ('roadma', cls.NODE_VERSION),
- ('roadmc', cls.NODE_VERSION),
- ('spdrc', cls.NODE_VERSION)])
+ cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
+ ('roadma', cls.NODE_VERSION),
+ ('roadmc', cls.NODE_VERSION),
+ ('spdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils.shutdown_process(process)
+ test_utils_rfc8040.shutdown_process(process)
print("all processes killed")
def setUp(self):
time.sleep(5)
def test_01_connect_spdrA(self):
- response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
+ response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_02_connect_spdrC(self):
- response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
+ response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
- response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
+ response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
- response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
+ response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
self.assertEqual(response.status_code,
- requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
- response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1",
- "ROADM-A1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully',
- res["output"]["result"])
- time.sleep(2)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
- response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1",
- "ROADM-A1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully',
- res["output"]["result"])
- time.sleep(2)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
- response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1",
- "ROADM-C1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully',
- res["output"]["result"])
- time.sleep(2)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
- response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1",
- "ROADM-C1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully',
- res["output"]["result"])
- time.sleep(2)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_09_add_omsAttributes_ROADMA_ROADMC(self):
# Config ROADMA-ROADMC oms-attributes
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils.add_oms_attr_request(
+ response = test_utils_rfc8040.add_oms_attr_request(
"ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils.add_oms_attr_request(
+ response = test_utils_rfc8040.add_oms_attr_request(
"ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
# Check correct configuration of devices
def test_14_check_interface_och_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR1-NETWORK1-761:768")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-761:768')
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'type': 'org-openroadm-interfaces:opticalChannel',
'supporting-port': 'CP1-CFP0-P1'
- }, **res['interface'][0]),
- res['interface'][0])
-
- self.assertDictEqual(
- {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
- 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
- res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
+ }, **response['interface'][0]),
+ response['interface'][0])
+ self.assertEqual('org-openroadm-common-types:R100G',
+ response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
+ self.assertEqual('dp-qpsk',
+ response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
+ self.assertEqual(196.1,
+ float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
+ self.assertEqual(
+ -5,
+ float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
def test_15_check_interface_OTU4_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'rate': 'org-openroadm-otn-common-types:OTU4',
'fec': 'scfec'
}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
-
- self.assertDictEqual(input_dict_2,
- res['interface'][0]
- ['org-openroadm-otn-otu-interfaces:otu'])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
+ response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_16_check_interface_och_spdrc(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SC1", "interface/XPDR1-NETWORK1-761:768")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-761:768')
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertDictEqual(dict({'name': 'XPDR1-NETWORK1-761:768',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'type': 'org-openroadm-interfaces:opticalChannel',
'supporting-port': 'CP1-CFP0-P1'
- }, **res['interface'][0]),
- res['interface'][0])
-
- self.assertDictEqual(
- {'frequency': 196.1, 'rate': 'org-openroadm-common-types:R100G',
- 'transmit-power': -5, 'modulation-format': 'dp-qpsk'},
- res['interface'][0]['org-openroadm-optical-channel-interfaces:och'])
+ }, **response['interface'][0]),
+ response['interface'][0])
+ self.assertEqual('org-openroadm-common-types:R100G',
+ response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['rate'])
+ self.assertEqual('dp-qpsk',
+ response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['modulation-format'])
+ self.assertEqual(196.1,
+ float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['frequency']))
+ self.assertEqual(
+ -5,
+ float(response['interface'][0]['org-openroadm-optical-channel-interfaces:och']['transmit-power']))
def test_17_check_interface_OTU4_spdrc(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SC1", "interface/XPDR1-NETWORK1-OTU")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-OTU')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-OTU',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'rate': 'org-openroadm-otn-common-types:OTU4',
'fec': 'scfec'
}
-
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
-
- self.assertDictEqual(input_dict_2,
- res['interface'][0]
- ['org-openroadm-otn-otu-interfaces:otu'])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2, **response['interface'][0]['org-openroadm-otn-otu-interfaces:otu']),
+ response['interface'][0]['org-openroadm-otn-otu-interfaces:otu'])
def test_18_check_no_interface_ODU4_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
- self.assertEqual(response.status_code, requests.codes.conflict)
- res = response.json()
- self.assertIn(
- {"error-type": "application", "error-tag": "data-missing",
- "error-message": "Request could not be completed because the relevant data model content does not exist"},
- res['errors']['error'])
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_19_check_openroadm_topo_spdra(self):
response = test_utils.get_ordm_topo_request("node/SPDR-SA1-XPDR1")
time.sleep(2)
def test_25_check_interface_ODU4_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'tx-dapi': 'AMf1n5hK6Xkk',
'tx-sapi': 'H/OelLynehI='}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
- self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
**input_dict_2),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
)
self.assertDictEqual(
{'payload-type': '21', 'exp-payload-type': '21'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_26_check_interface_ODU4_spdrc(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU4')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU4',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'expected-sapi': 'H/OelLynehI=',
'expected-dapi': 'AMf1n5hK6Xkk'
}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
- self.assertDictEqual(dict(res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'],
**input_dict_2),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']
)
self.assertDictEqual(
{'payload-type': '21', 'exp-payload-type': '21'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_27_check_otn_topo_links(self):
response = test_utils.get_otn_topo_request()
time.sleep(2)
def test_31_check_interface_10GE_CLIENT_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-SFP4',
'type': 'org-openroadm-interfaces:ethernetCsmacd',
'supporting-port': 'CP1-SFP4-P1'
}
- self.assertDictEqual(dict(input_dict, **res['interface'][0]),
- res['interface'][0])
- self.assertDictEqual(
- {'speed': 10000},
- res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+ self.assertDictEqual(dict(input_dict, **response['interface'][0]),
+ response['interface'][0])
+ self.assertEqual(
+ 10000,
+ response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
def test_32_check_interface_ODU2E_CLIENT_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-SFP4',
'rate': 'org-openroadm-otn-common-types:ODU2e',
'monitoring-mode': 'terminated'}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(
{'payload-type': '03', 'exp-payload-type': '03'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_33_check_interface_ODU2E_NETWORK_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
'monitoring-mode': 'monitored'}
input_dict_3 = {'trib-port-number': 1}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(dict(input_dict_3,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
- self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
+ self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
['trib-slots'])
def test_34_check_ODU2E_connection_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1",
- "odu-connection/XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
'connection-name':
'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
'direction': 'bidirectional'
}
- self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
- res['odu-connection'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
+ response['odu-connection'][0])
self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
- res['odu-connection'][0]['destination'])
+ response['odu-connection'][0]['destination'])
self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
- res['odu-connection'][0]['source'])
+ response['odu-connection'][0]['source'])
def test_35_check_interface_10GE_CLIENT_spdrc(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SC1", "interface/XPDR1-CLIENT1-ETHERNET10G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR1-CLIENT1-ETHERNET10G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-SFP4',
'type': 'org-openroadm-interfaces:ethernetCsmacd',
'supporting-port': 'CP1-SFP4-P1'
}
- self.assertDictEqual(dict(input_dict, **res['interface'][0]),
- res['interface'][0])
- self.assertDictEqual(
- {'speed': 10000},
- res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+ self.assertDictEqual(dict(input_dict, **response['interface'][0]),
+ response['interface'][0])
+ self.assertEqual(
+ 10000,
+ response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
def test_36_check_interface_ODU2E_CLIENT_spdrc(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SC1", "interface/XPDR1-CLIENT1-ODU2e")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SC1', 'interface', 'XPDR1-CLIENT1-ODU2e')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-CLIENT1-ODU2e',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-SFP4',
'rate': 'org-openroadm-otn-common-types:ODU2e',
'monitoring-mode': 'terminated'}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(
{'payload-type': '03', 'exp-payload-type': '03'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_37_check_interface_ODU2E_NETWORK_spdrc(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SC1", "interface/XPDR1-NETWORK1-ODU2e")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SC1', 'interface', 'XPDR1-NETWORK1-ODU2e')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR1-NETWORK1-ODU2e',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-CFP0',
input_dict_3 = {'trib-port-number': 1}
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(dict(input_dict_3,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation'])
self.assertIn(1,
- res['interface'][0][
+ response['interface'][0][
'org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']['trib-slots'])
def test_38_check_ODU2E_connection_spdrc(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SC1",
- "odu-connection/XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SC1', 'odu-connection', 'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
'connection-name':
'XPDR1-CLIENT1-ODU2e-x-XPDR1-NETWORK1-ODU2e',
'direction': 'bidirectional'
}
- self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
- res['odu-connection'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
+ response['odu-connection'][0])
self.assertDictEqual({'dst-if': 'XPDR1-NETWORK1-ODU2e'},
- res['odu-connection'][0]['destination'])
+ response['odu-connection'][0]['destination'])
self.assertDictEqual({'src-if': 'XPDR1-CLIENT1-ODU2e'},
- res['odu-connection'][0]['source'])
+ response['odu-connection'][0]['source'])
def test_39_check_otn_topo_links(self):
response = test_utils.get_otn_topo_request()
time.sleep(1)
def test_44_check_no_interface_ODU2E_NETWORK_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU2e-service1")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU2e-service1')
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_45_check_no_interface_ODU2E_CLIENT_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR1-CLIENT1-ODU2e-service1")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ODU2e-service1')
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_46_check_no_interface_10GE_CLIENT_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR1-CLIENT1-ETHERNET10G")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR1-CLIENT1-ETHERNET10G')
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_47_check_otn_topo_links(self):
response = test_utils.get_otn_topo_request()
time.sleep(2)
def test_51_check_no_interface_ODU4_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR1-NETWORK1-ODU4")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-ODU4')
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_52_check_otn_topo_links(self):
self.test_22_check_otn_topo_otu4_links()
time.sleep(1)
def test_56_check_no_interface_OTU4_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR1-NETWORK1-OTU")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-OTU')
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_57_check_no_interface_OCH_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR1-NETWORK1-1")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR1-NETWORK1-1')
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_58_getLinks_OtnTopology(self):
response = test_utils.get_otn_topo_request()
time.sleep(3)
def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
- response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "3", "1",
- "ROADM-A1", "1", "SRG1-PP2-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully',
- res["output"]["result"])
- time.sleep(2)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
- response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "3", "1",
- "ROADM-A1", "1", "SRG1-PP2-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully',
- res["output"]["result"])
- time.sleep(2)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '3', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
- response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "3", "1",
- "ROADM-C1", "1", "SRG1-PP2-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully',
- res["output"]["result"])
- time.sleep(2)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
+ 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
- response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "3", "1",
- "ROADM-C1", "1", "SRG1-PP2-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully',
- res["output"]["result"])
- time.sleep(2)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '3', 'network-num': '1',
+ 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
def test_66_create_OCH_OTU4_service_2(self):
# pylint: disable=line-too-long
time.sleep(2)
def test_72_check_interface_1GE_CLIENT_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP1-SFP4',
'type': 'org-openroadm-interfaces:ethernetCsmacd',
'supporting-port': 'CP1-SFP4-P1'
}
- self.assertDictEqual(dict(input_dict, **res['interface'][0]),
- res['interface'][0])
- self.assertDictEqual(
- {'speed': 1000},
- res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+ self.assertDictEqual(dict(input_dict, **response['interface'][0]),
+ response['interface'][0])
+ self.assertEqual(
+ 1000,
+ response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
def test_73_check_interface_ODU0_CLIENT_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP3-SFP1',
'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
'rate': 'org-openroadm-otn-common-types:ODU0',
'monitoring-mode': 'terminated'}
-
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(
{'payload-type': '07', 'exp-payload-type': '07'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_74_check_interface_ODU0_NETWORK_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP3-CFP0',
'rate': 'org-openroadm-otn-common-types:ODU0',
'monitoring-mode': 'monitored'}
input_dict_3 = {'trib-port-number': 1}
-
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(dict(input_dict_3,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
- self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
+ self.assertIn(1, response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
['trib-slots'])
def test_75_check_ODU0_connection_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1",
- "odu-connection/XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
'connection-name':
'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
'direction': 'bidirectional'
}
-
- self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
- res['odu-connection'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
+ response['odu-connection'][0])
self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
- res['odu-connection'][0]['destination'])
+ response['odu-connection'][0]['destination'])
self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
- res['odu-connection'][0]['source'])
+ response['odu-connection'][0]['source'])
def test_76_check_interface_1GE_CLIENT_spdrc(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SC1", "interface/XPDR3-CLIENT1-ETHERNET1G")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP3-SFP1',
'type': 'org-openroadm-interfaces:ethernetCsmacd',
'supporting-port': 'CP3-SFP1-P1'
}
- self.assertDictEqual(dict(input_dict, **res['interface'][0]),
- res['interface'][0])
- self.assertDictEqual(
- {'speed': 1000},
- res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+ self.assertDictEqual(dict(input_dict, **response['interface'][0]),
+ response['interface'][0])
+ self.assertEqual(
+ 1000,
+ response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']['speed'])
def test_77_check_interface_ODU0_CLIENT_spdrc(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SC1", "interface/XPDR3-CLIENT1-ODU0")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SC1', 'interface', 'XPDR3-CLIENT1-ODU0')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP3-SFP1',
'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
'rate': 'org-openroadm-otn-common-types:ODU0',
'monitoring-mode': 'terminated'}
-
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(
{'payload-type': '07', 'exp-payload-type': '07'},
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
def test_78_check_interface_ODU0_NETWORK_spdrc(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SC1", "interface/XPDR3-NETWORK1-ODU0")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SC1', 'interface', 'XPDR3-NETWORK1-ODU0')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0',
'administrative-state': 'inService',
'supporting-circuit-pack-name': 'CP3-CFP0',
'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
'rate': 'org-openroadm-otn-common-types:ODU0',
'monitoring-mode': 'monitored'}
-
input_dict_3 = {'trib-port-number': 1}
-
- self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
- res['interface'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
self.assertDictEqual(dict(input_dict_2,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
self.assertDictEqual(dict(input_dict_3,
- **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']),
- res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation'])
self.assertIn(1,
- res['interface'][0][
+ response['interface'][0][
'org-openroadm-otn-odu-interfaces:odu'][
'parent-odu-allocation']['trib-slots'])
def test_79_check_ODU0_connection_spdrc(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SC1",
- "odu-connection/XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SC1', 'odu-connection', 'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0')
+ self.assertEqual(response['status_code'], requests.codes.ok)
input_dict_1 = {
'connection-name':
'XPDR3-CLIENT1-ODU0-x-XPDR3-NETWORK1-ODU0',
'direction': 'bidirectional'
}
-
- self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
- res['odu-connection'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
+ response['odu-connection'][0])
self.assertDictEqual({'dst-if': 'XPDR3-NETWORK1-ODU0'},
- res['odu-connection'][0]['destination'])
+ response['odu-connection'][0]['destination'])
self.assertDictEqual({'src-if': 'XPDR3-CLIENT1-ODU0'},
- res['odu-connection'][0]['source'])
+ response['odu-connection'][0]['source'])
def test_80_check_otn_topo_links(self):
response = test_utils.get_otn_topo_request()
time.sleep(1)
def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR3-NETWORK1-ODU0-service1')
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ODU0-service1')
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_87_check_no_interface_10GE_CLIENT_spdra(self):
- response = test_utils.check_netconf_node_request(
- "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
- self.assertEqual(response.status_code, requests.codes.conflict)
+ response = test_utils_rfc8040.check_node_attribute_request(
+ 'SPDR-SA1', 'interface', 'XPDR3-CLIENT1-ETHERNET1G')
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_88_check_otn_topo_links(self):
response = test_utils.get_otn_topo_request()
self.assertEqual(18, len(links), 'Topology should contain 18 links')
def test_94_disconnect_spdrA(self):
- response = test_utils.unmount_device("SPDR-SA1")
- self.assertEqual(response.status_code, requests.codes.ok,
- test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("SPDR-SA1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_95_disconnect_spdrC(self):
- response = test_utils.unmount_device("SPDR-SC1")
- self.assertEqual(response.status_code, requests.codes.ok,
- test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("SPDR-SC1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_96_disconnect_roadmA(self):
- response = test_utils.unmount_device("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.ok,
- test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("ROADM-A1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_97_disconnect_roadmC(self):
- response = test_utils.unmount_device("ROADM-C1")
- self.assertEqual(response.status_code, requests.codes.ok,
- test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("ROADM-C1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
if __name__ == "__main__":