+ def test_62_connect_sprdA_3_N1_to_roadmA_PP2(self):
+ response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "3", "1",
+ "ROADM-A1", "1", "SRG1-PP2-TXRX")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('Xponder Roadm Link created successfully',
+ res["output"]["result"])
+ time.sleep(2)
+
+ def test_63_connect_roadmA_PP2_to_spdrA_3_N1(self):
+ response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "3", "1",
+ "ROADM-A1", "1", "SRG1-PP2-TXRX")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('Roadm Xponder links created successfully',
+ res["output"]["result"])
+ time.sleep(2)
+
+ def test_64_connect_sprdC_3_N1_to_roadmC_PP2(self):
+ response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "3", "1",
+ "ROADM-C1", "1", "SRG1-PP2-TXRX")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('Xponder Roadm Link created successfully',
+ res["output"]["result"])
+ time.sleep(2)
+
+ def test_65_connect_roadmC_PP2_to_spdrC_3_N1(self):
+ response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "3", "1",
+ "ROADM-C1", "1", "SRG1-PP2-TXRX")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('Roadm Xponder links created successfully',
+ res["output"]["result"])
+ time.sleep(2)
+
+ def test_66_create_OCH_OTU4_service_2(self):
+ self.cr_serv_sample_data["input"]["service-name"] = "service2-OCH-OTU4"
+ self.cr_serv_sample_data["input"]["connection-type"] = "infrastructure"
+ self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "100"
+ self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OTU"
+ self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
+ self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "100"
+ self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OTU"
+ self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"] = "org-openroadm-otn-common-types:OTU4"
+ response = test_utils.service_create_request(self.cr_serv_sample_data)
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('PCE calculation in progress',
+ res['output']['configuration-response-common']['response-message'])
+ time.sleep(self.WAITING)
+
+ def test_67_get_OCH_OTU4_service2(self):
+ response = test_utils.get_service_list_request(
+ "services/service2-OCH-OTU4")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertEqual(
+ res['services'][0]['administrative-state'], 'inService')
+ self.assertEqual(
+ res['services'][0]['service-name'], 'service2-OCH-OTU4')
+ self.assertEqual(
+ res['services'][0]['connection-type'], 'infrastructure')
+ self.assertEqual(
+ res['services'][0]['lifecycle-state'], 'planned')
+ time.sleep(2)
+
+ def test_68_create_ODU4_service_2(self):
+ self.cr_serv_sample_data["input"]["service-name"] = "service2-ODU4"
+ self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
+ self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
+ del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
+ self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
+ self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
+ self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
+ self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-NETWORK1"
+ del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
+
+ response = test_utils.service_create_request(self.cr_serv_sample_data)
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('PCE calculation in progress',
+ res['output']['configuration-response-common']['response-message'])
+ time.sleep(self.WAITING)
+
+ def test_69_get_ODU4_service2(self):
+ response = test_utils.get_service_list_request(
+ "services/service2-ODU4")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertEqual(
+ res['services'][0]['administrative-state'], 'inService')
+ self.assertEqual(
+ res['services'][0]['service-name'], 'service2-ODU4')
+ self.assertEqual(
+ res['services'][0]['connection-type'], 'infrastructure')
+ self.assertEqual(
+ res['services'][0]['lifecycle-state'], 'planned')
+ time.sleep(2)
+
+ def test_70_create_1GE_service(self):
+ self.cr_serv_sample_data["input"]["service-name"] = "service1-1GE"
+ self.cr_serv_sample_data["input"]["connection-type"] = "service"
+ self.cr_serv_sample_data["input"]["service-a-end"]["service-rate"] = "1"
+ self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "Ethernet"
+ self.cr_serv_sample_data["input"]["service-z-end"]["service-rate"] = "1"
+ self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "Ethernet"
+ del self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"]
+ self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_sample_data["input"]["service-a-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
+ self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SA1-XPDR3"
+ self.cr_serv_sample_data["input"]["service-a-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
+ del self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"]
+ self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_sample_data["input"]["service-z-end"]["tx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
+ self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-device-name"] = "SPDR-SC1-XPDR3"
+ self.cr_serv_sample_data["input"]["service-z-end"]["rx-direction"]["port"]["port-name"] = "XPDR3-CLIENT1"
+ response = test_utils.service_create_request(self.cr_serv_sample_data)
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('PCE calculation in progress',
+ res['output']['configuration-response-common']['response-message'])
+ time.sleep(self.WAITING)
+
+ def test_71_get_1GE_service1(self):
+ response = test_utils.get_service_list_request("services/service1-1GE")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertEqual(
+ res['services'][0]['administrative-state'], 'inService')
+ self.assertEqual(
+ res['services'][0]['service-name'], 'service1-1GE')
+ self.assertEqual(
+ res['services'][0]['connection-type'], 'service')
+ self.assertEqual(
+ res['services'][0]['lifecycle-state'], 'planned')
+ time.sleep(2)
+
+ def test_72_check_interface_1GE_CLIENT_spdra(self):
+ response = test_utils.check_netconf_node_request(
+ "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': 'CP1-SFP4',
+ 'type': 'org-openroadm-interfaces:ethernetCsmacd',
+ 'supporting-port': 'CP1-SFP4-P1'
+ }
+ self.assertDictEqual(dict(input_dict, **res['interface'][0]),
+ res['interface'][0])
+ self.assertDictEqual(
+ {u'speed': 1000},
+ res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+
+ def test_73_check_interface_ODU0_CLIENT_spdra(self):
+ response = test_utils.check_netconf_node_request(
+ "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0-service1-1GE',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': 'CP3-SFP1',
+ 'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'CP3-SFP1-P1'}
+ input_dict_2 = {
+ 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
+ 'rate': 'org-openroadm-otn-common-types:ODU0',
+ 'monitoring-mode': 'terminated'}
+
+ self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
+ res['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(
+ {u'payload-type': u'07', u'exp-payload-type': u'07'},
+ res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+
+ def test_74_check_interface_ODU0_NETWORK_spdra(self):
+ response = test_utils.check_netconf_node_request(
+ "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0-service1-1GE',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': 'CP3-CFP0',
+ 'supporting-interface': 'XPDR3-NETWORK1-ODU4',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'CP3-CFP0-P1'}
+ input_dict_2 = {
+ 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
+ 'rate': 'org-openroadm-otn-common-types:ODU0',
+ 'monitoring-mode': 'monitored'}
+ input_dict_3 = {'trib-port-number': 1}
+
+ self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
+ res['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(dict(input_dict_3,
+ **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ 'parent-odu-allocation']),
+ res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation'])
+ self.assertIn(1, res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['parent-odu-allocation']
+ ['trib-slots'])
+
+ def test_75_check_ODU0_connection_spdra(self):
+ response = test_utils.check_netconf_node_request(
+ "SPDR-SA1",
+ "odu-connection/XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ input_dict_1 = {
+ 'connection-name':
+ 'XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE',
+ 'direction': 'bidirectional'
+ }
+
+ self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
+ res['odu-connection'][0])
+ self.assertDictEqual({u'dst-if': u'XPDR3-NETWORK1-ODU0-service1-1GE'},
+ res['odu-connection'][0]['destination'])
+ self.assertDictEqual({u'src-if': u'XPDR3-CLIENT1-ODU0-service1-1GE'},
+ res['odu-connection'][0]['source'])
+
+ def test_76_check_interface_1GE_CLIENT_spdrc(self):
+ response = test_utils.check_netconf_node_request(
+ "SPDR-SC1", "interface/XPDR3-CLIENT1-ETHERNET1G")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ input_dict = {'name': 'XPDR3-CLIENT1-ETHERNET1G',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': 'CP3-SFP1',
+ 'type': 'org-openroadm-interfaces:ethernetCsmacd',
+ 'supporting-port': 'CP3-SFP1-P1'
+ }
+ self.assertDictEqual(dict(input_dict, **res['interface'][0]),
+ res['interface'][0])
+ self.assertDictEqual(
+ {u'speed': 1000},
+ res['interface'][0]['org-openroadm-ethernet-interfaces:ethernet'])
+
+ def test_77_check_interface_ODU0_CLIENT_spdrc(self):
+ response = test_utils.check_netconf_node_request(
+ "SPDR-SC1", "interface/XPDR3-CLIENT1-ODU0-service1-1GE")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ input_dict_1 = {'name': 'XPDR3-CLIENT1-ODU0-service1-1GE',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': 'CP3-SFP1',
+ 'supporting-interface': 'XPDR3-CLIENT1-ETHERNET1G',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'CP3-SFP1-P1'}
+ input_dict_2 = {
+ 'odu-function': 'org-openroadm-otn-common-types:ODU-TTP-CTP',
+ 'rate': 'org-openroadm-otn-common-types:ODU0',
+ 'monitoring-mode': 'terminated'}
+
+ self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
+ res['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(
+ {u'payload-type': u'07', u'exp-payload-type': u'07'},
+ res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']['opu'])
+
+ def test_78_check_interface_ODU0_NETWORK_spdrc(self):
+ response = test_utils.check_netconf_node_request(
+ "SPDR-SC1", "interface/XPDR3-NETWORK1-ODU0-service1-1GE")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ input_dict_1 = {'name': 'XPDR3-NETWORK1-ODU0-service1-1GE',
+ 'administrative-state': 'inService',
+ 'supporting-circuit-pack-name': 'CP3-CFP0',
+ 'supporting-interface': 'XPDR3-NETWORK1-ODU4',
+ 'type': 'org-openroadm-interfaces:otnOdu',
+ 'supporting-port': 'CP3-CFP0-P1'}
+ input_dict_2 = {
+ 'odu-function': 'org-openroadm-otn-common-types:ODU-CTP',
+ 'rate': 'org-openroadm-otn-common-types:ODU0',
+ 'monitoring-mode': 'monitored'}
+
+ input_dict_3 = {'trib-port-number': 1}
+
+ self.assertDictEqual(dict(input_dict_1, **res['interface'][0]),
+ res['interface'][0])
+ self.assertDictEqual(dict(input_dict_2,
+ **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu']),
+ res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'])
+ self.assertDictEqual(dict(input_dict_3,
+ **res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ 'parent-odu-allocation']),
+ res['interface'][0]['org-openroadm-otn-odu-interfaces:odu'][
+ 'parent-odu-allocation'])
+ self.assertIn(1,
+ res['interface'][0][
+ 'org-openroadm-otn-odu-interfaces:odu'][
+ 'parent-odu-allocation']['trib-slots'])
+
+ def test_79_check_ODU0_connection_spdrc(self):
+ response = test_utils.check_netconf_node_request(
+ "SPDR-SC1",
+ "odu-connection/XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ input_dict_1 = {
+ 'connection-name':
+ 'XPDR3-CLIENT1-ODU0-service1-1GE-x-XPDR3-NETWORK1-ODU0-service1-1GE',
+ 'direction': 'bidirectional'
+ }
+
+ self.assertDictEqual(dict(input_dict_1, **res['odu-connection'][0]),
+ res['odu-connection'][0])
+ self.assertDictEqual({u'dst-if': u'XPDR3-NETWORK1-ODU0-service1-1GE'},
+ res['odu-connection'][0]['destination'])
+ self.assertDictEqual({u'src-if': u'XPDR3-CLIENT1-ODU0-service1-1GE'},
+ res['odu-connection'][0]['source'])
+
+ def test_80_check_otn_topo_links(self):
+ response = test_utils.get_otn_topo_request()
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ nb_links = len(res['network'][0]['ietf-network-topology:link'])
+ self.assertEqual(nb_links, 4)
+ for link in res['network'][0]['ietf-network-topology:link']:
+ linkId = link['link-id']
+ if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
+ 'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
+ self.assertEqual(
+ link['org-openroadm-otn-network-topology:available-bandwidth'], 99000)
+ self.assertEqual(
+ link['org-openroadm-otn-network-topology:used-bandwidth'], 1000)
+
+ def test_81_check_otn_topo_tp(self):
+ response = test_utils.get_otn_topo_request()
+ res = response.json()
+ for node in res['network'][0]['node']:
+ if node['node-id'] == 'SPDR-SA1-XPDR3' or 'SPDR-SC1-XPDR3':
+ tpList = node['ietf-network-topology:termination-point']
+ for tp in tpList:
+ if tp['tp-id'] == 'XPDR3-NETWORK1':
+ xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
+ self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 79)
+ tsPoolList = list(range(1, 2))
+ self.assertNotIn(
+ tsPoolList, xpdrTpPortConAt['ts-pool'])
+ self.assertEqual(
+ len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 79)
+ self.assertNotIn(
+ 1, xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool'])
+
+ def test_82_delete_1GE_service(self):
+ response = test_utils.service_delete_request("service1-1GE")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('Renderer service delete in progress',
+ res['output']['configuration-response-common']['response-message'])
+ time.sleep(self.WAITING)
+
+ def test_83_check_service_list(self):
+ response = test_utils.get_service_list_request("")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertEqual(len(res['service-list']['services']), 2)
+ time.sleep(2)
+
+ def test_84_check_no_ODU0_connection_spdra(self):
+ response = test_utils.check_netconf_node_request("SPDR-SA1", "")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertNotIn(['odu-connection'][0], res['org-openroadm-device'])
+ time.sleep(1)
+
+ def test_85_check_no_interface_ODU0_NETWORK_spdra(self):
+ response = test_utils.check_netconf_node_request(
+ "SPDR-SA1", "interface/XPDR3-NETWORK1-ODU0-service1")
+ self.assertEqual(response.status_code, requests.codes.conflict)
+
+ def test_86_check_no_interface_ODU0_CLIENT_spdra(self):
+ response = test_utils.check_netconf_node_request(
+ "SPDR-SA1", "interface/XPDR3-CLIENT1-ODU0-service1")
+ self.assertEqual(response.status_code, requests.codes.conflict)
+
+ def test_87_check_no_interface_10GE_CLIENT_spdra(self):
+ response = test_utils.check_netconf_node_request(
+ "SPDR-SA1", "interface/XPDR3-CLIENT1-ETHERNET1G")
+ self.assertEqual(response.status_code, requests.codes.conflict)
+
+ def test_88_check_otn_topo_links(self):
+ response = test_utils.get_otn_topo_request()
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ nb_links = len(res['network'][0]['ietf-network-topology:link'])
+ self.assertEqual(nb_links, 4)
+ for link in res['network'][0]['ietf-network-topology:link']:
+ linkId = link['link-id']
+ if (linkId in ('ODU4-SPDR-SA1-XPDR3-XPDR3-NETWORK1toSPDR-SC1-XPDR3-XPDR3-NETWORK1',
+ 'ODU4-SPDR-SC1-XPDR3-XPDR3-NETWORK1toSPDR-SA1-XPDR3-XPDR3-NETWORK1')):
+ self.assertEqual(
+ link['org-openroadm-otn-network-topology:available-bandwidth'], 100000)
+ self.assertEqual(
+ link['org-openroadm-otn-network-topology:used-bandwidth'], 0)
+
+ def test_89_check_otn_topo_tp(self):
+ response = test_utils.get_otn_topo_request()
+ res = response.json()
+ for node in res['network'][0]['node']:
+ if (node['node-id'] == 'SPDR-SA1-XPDR3' or 'SPDR-SC1-XPDR3'):
+ tpList = node['ietf-network-topology:termination-point']
+ for tp in tpList:
+ if tp['tp-id'] == 'XPDR3-NETWORK1':
+ xpdrTpPortConAt = tp['org-openroadm-otn-network-topology:xpdr-tp-port-connection-attributes']
+ self.assertEqual(len(xpdrTpPortConAt['ts-pool']), 80)
+ self.assertEqual(
+ len(xpdrTpPortConAt['odtu-tpn-pool'][0]['tpn-pool']), 80)
+
+ def test_90_delete_ODU4_service(self):
+ response = test_utils.service_delete_request("service2-ODU4")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('Renderer service delete in progress',
+ res['output']['configuration-response-common']['response-message'])
+ time.sleep(self.WAITING)
+
+ def test_91_delete_OCH_OTU4_service(self):
+ response = test_utils.service_delete_request("service2-OCH-OTU4")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('Renderer service delete in progress',
+ res['output']['configuration-response-common']['response-message'])
+ time.sleep(self.WAITING)
+
+ def test_92_disconnect_xponders_from_roadm(self):
+ url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
+ response = test_utils.get_ordm_topo_request("")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ links = res['network'][0]['ietf-network-topology:link']
+ for link in links:
+ if (link["org-openroadm-common-network:link-type"] == "XPONDER-OUTPUT" or
+ link["org-openroadm-common-network:link-type"] == "XPONDER-INPUT"):
+ link_name = link["link-id"]
+ response = test_utils.delete_request(url+link_name)
+ self.assertEqual(response.status_code, requests.codes.ok)
+
+ def test_93_check_openroadm_topology(self):
+ response = test_utils.get_ordm_topo_request("")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ links = res['network'][0]['ietf-network-topology:link']
+ self.assertEqual(18, len(links), 'Topology should contain 18 links')
+
+ def test_94_disconnect_spdrA(self):