+ # test service-create for 100GE service 3 from xpdra2 client1 to xpdrc2 client2
+ def test_055_create_100GE_service_3(self):
+ self.cr_serv_input_data["service-name"] = "service-100GE3"
+ self.cr_serv_input_data["connection-type"] = "service"
+ self.cr_serv_input_data["service-a-end"]["service-rate"] = "100"
+ self.cr_serv_input_data["service-a-end"]["service-format"] = "Ethernet"
+ self.cr_serv_input_data["service-a-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
+ self.cr_serv_input_data["service-a-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT1"
+ self.cr_serv_input_data["service-z-end"]["service-rate"] = "100"
+ self.cr_serv_input_data["service-z-end"]["service-format"] = "Ethernet"
+ self.cr_serv_input_data["service-z-end"]["tx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
+ self.cr_serv_input_data["service-z-end"]["rx-direction"][0]["port"]["port-name"] = "XPDR2-CLIENT2"
+ response = test_utils.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-create',
+ self.cr_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('PCE calculation in progress',
+ response['output']['configuration-response-common']['response-message'])
+ time.sleep(self.WAITING)
+
+ def test_056_get_100GE_service_3(self):
+ response = test_utils.get_ordm_serv_list_attr_request("services", "service-100GE3")
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['services'][0]['administrative-state'], 'inService')
+ self.assertEqual(response['services'][0]['service-name'], 'service-100GE3')
+ self.assertEqual(response['services'][0]['connection-type'], 'service')
+ self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
+ time.sleep(1)
+
+ def test_057_check_service_list(self):
+ response = test_utils.get_ordm_serv_list_request()
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(len(response['service-list']['services']), 3)
+
+ def test_058_check_interface_100GE_CLIENT_xpdra2(self):
+ response = test_utils.check_node_attribute_request(
+ 'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ETHERNET-100G')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-CLIENT1-ETHERNET-100G',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
+ 'type': 'org-openroadm-interfaces:ethernetCsmacd',
+ 'supporting-port': 'C1'
+ }
+ input_dict_2 = {'speed': 100000}
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(
+ dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
+ response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+
+ def test_059_check_interface_ODU4_CLIENT_xpdra2(self):
+ response = test_utils.check_node_attribute_request(
+ 'XPDR-A2', 'interface', 'XPDR2-CLIENT1-ODU4:service-100GE3')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4:service-100GE3',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/1/1-PLUG-CLIENT',
+ 'supporting-interface-list': 'XPDR2-CLIENT1-ETHERNET-100GE3',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'C1'}
+ input_dict_2 = {
+ 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
+ 'rate': 'org-openroadm-otn-common-types:ODU4',
+ 'monitoring-mode': 'terminated'}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(
+ {'payload-type': '07', 'exp-payload-type': '07'},
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+
+ def test_060_check_interface_ODU4_NETWORK_xpdra2(self):
+ response = test_utils.check_node_attribute_request(
+ 'XPDR-A2', 'interface', 'XPDR2-NETWORK1-ODU4:service-100GE3')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4:service-100GE3',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'L1'}
+ input_dict_2 = {
+ 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
+ 'rate': 'org-openroadm-otn-common-types:ODU4',
+ 'monitoring-mode': 'not-terminated'}
+ input_dict_3 = {'trib-port-number': 1}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(dict(input_dict_3,
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ 'parent-odu-allocation']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
+ self.assertIn('1.1', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
+ ['opucn-trib-slots'])
+ self.assertIn('1.20', response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
+ ['opucn-trib-slots'])
+
+ def test_061_check_ODU4_connection_xpdra2(self):
+ response = test_utils.check_node_attribute_request(
+ 'XPDR-A2', 'odu-connection', 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {
+ 'connection-name':
+ 'XPDR2-CLIENT1-ODU4-x-XPDR2-NETWORK1-ODU4',
+ 'direction': 'bidirectional'
+ }
+
+ self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
+ response['odu-connection'][0])
+ self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4:service-100GE3'},
+ response['odu-connection'][0]['destination'])
+ self.assertDictEqual({'src-if': 'XPDR2-CLIENT1-ODU4:service-100GE3'},
+ response['odu-connection'][0]['source'])
+
+ def test_062_check_interface_100GE_CLIENT_xpdrc2(self):
+ response = test_utils.check_node_attribute_request(
+ 'XPDR-C2', 'interface', 'XPDR2-CLIENT2-ETHERNET-100G')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-CLIENT2-ETHERNET-100G',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/1/2-PLUG-CLIENT',
+ 'type': 'org-openroadm-interfaces:ethernetCsmacd',
+ 'supporting-port': 'C1'
+ }
+ input_dict_2 = {'speed': 100000}
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(
+ dict(input_dict_2, **response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet']),
+ response['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+
+ def test_063_check_interface_ODU4_CLIENT_xpdrc2(self):
+ response = test_utils.check_node_attribute_request(
+ 'XPDR-C2', 'interface', 'XPDR2-CLIENT2-ODU4:service-100GE3')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-CLIENT1-ODU4:service-100GE3',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/1/2-PLUG-CLIENT',
+ 'supporting-interface-list': 'XPDR2-CLIENT2-ETHERNET-100GE3',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'C1'}
+ input_dict_2 = {
+ 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
+ 'rate': 'org-openroadm-otn-common-types:ODU4',
+ 'monitoring-mode': 'terminated'}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(
+ {'payload-type': '07', 'exp-payload-type': '07'},
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+
+ def test_064_check_interface_ODU4_NETWORK_xpdrc2(self):
+ response = test_utils.check_node_attribute_request(
+ 'XPDR-C2', 'interface', 'XPDR2-NETWORK1-ODU4:service-100GE3')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {'name': 'XPDR2-NETWORK1-ODU4:service-100GE3',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': '1/2/2-PLUG-NET',
+ 'supporting-interface-list': 'XPDR2-NETWORK1-ODUC4',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'C1'}
+ input_dict_2 = {
+ 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
+ 'rate': 'org-openroadm-otn-common-types:ODU4',
+ 'monitoring-mode': 'not-terminated'}
+
+ input_dict_3 = {'trib-port-number': 1}
+
+ self.assertDictEqual(dict(input_dict_1, **response['interface'][0]),
+ response['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(dict(input_dict_3,
+ **response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ 'parent-odu-allocation']),
+ response['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ 'parent-odu-allocation'])
+ self.assertIn('1.1',
+ response['interface'][0][
+ 'org-openroadm-otn-odu-interfaces:odu'][
+ 'parent-odu-allocation']['opucn-trib-slots'])
+ self.assertIn('1.20',
+ response['interface'][0][
+ 'org-openroadm-otn-odu-interfaces:odu'][
+ 'parent-odu-allocation']['opucn-trib-slots'])
+
+ def test_065_check_ODU4_connection_xpdrc2(self):
+ response = test_utils.check_node_attribute_request(
+ 'XPDR-C2', 'odu-connection', 'XPDR2-CLIENT2-ODU4-x-XPDR2-NETWORK1-ODU4')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ input_dict_1 = {
+ 'connection-name':
+ 'XPDR2-CLIENT2-ODU4-x-XPDR2-NETWORK1-ODU4',
+ 'direction': 'bidirectional'
+ }
+
+ self.assertDictEqual(dict(input_dict_1, **response['odu-connection'][0]),
+ response['odu-connection'][0])
+ self.assertDictEqual({'dst-if': 'XPDR2-NETWORK1-ODU4:service-100GE3'},
+ response['odu-connection'][0]['destination'])
+ self.assertDictEqual({'src-if': 'XPDR2-CLIENT2-ODU4:service-100GE3'},
+ response['odu-connection'][0]['source'])
+
+ def test_066_check_otn_topo_links(self):
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 4)
+ for link in response['network'][0]['ietf-network-topology:link']:
+ linkId = link['link-id']
+ if (linkId in ('ODUC4-XPDR-A2-XPDR2-XPDR2-NETWORK1toXPDR-C2-XPDR2-XPDR2-NETWORK1',
+ 'ODUC4-XPDR-C2-XPDR2-XPDR2-NETWORK1toXPDR-A2-XPDR2-XPDR2-NETWORK1')):
+ self.assertEqual(
+ link['org-openroadm-otn-network-topology:available-bandwidth'], 300000)
+ self.assertEqual(
+ link['org-openroadm-otn-network-topology:used-bandwidth'], 100000)
+
+ def test_067_check_otn_topo_tp(self):
+ response = test_utils.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ for node in response['network'][0]['node']:
+ if node['node-id'] in ('XPDR-A2-XPDR2', 'XPDR-C2-XPDR2'):
+ tpList = node['ietf-network-topology:termination-point']
+ for tp in tpList:
+ if tp['tp-id'] == 'XPDR2-NETWORK1':
+ xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
+ self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 60)
+ tsPoolList = list(range(1, 20))
+ self.assertNotIn(tsPoolList, xpdrTpPortConAt['ts-pool'])
+ self.assertEqual(len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 3)
+ self.assertNotIn(1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
+
+ def test_068_delete_100GE_service_3(self):
+ self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service-100GE3"
+ response = test_utils.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-delete',
+ self.del_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Renderer service delete in progress',
+ response['output']['configuration-response-common']['response-message'])
+ time.sleep(self.WAITING)
+
+ def test_069_delete_ODUC4_service(self):