CODE_SHOULD_BE_200 = 'Http status code should be 200'
CODE_SHOULD_BE_201 = 'Http status code should be 201'
+T100GE = 'Transponder 100GE'
+T0_MULTILAYER_TOPO = 'T0 - Multi-layer topology'
+T0_FULL_MULTILAYER_TOPO = 'T0 - Full Multi-layer topology'
SIM_LOG_DIRECTORY = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'log')
res = response.json()
return_key = {'rfc8040': api_module + ':output',
'draft-bierman02': 'output'}
- return_output = res[return_key[RESTCONF_VERSION]]
+ if response.status_code == requests.codes.internal_server_error:
+ return_output = res
+ else:
+ return_output = res[return_key[RESTCONF_VERSION]]
return {'status_code': response.status_code,
'output': return_output}
sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils # nopep8
+import test_utils_rfc8040 # nopep8
CREATED_SUCCESSFULLY = 'Result message should contain Xponder Roadm Link created successfully'
processes = None
WAITING = 20
NODE_VERSION = '2.2.1'
- cr_serv_sample_data = {"input": {
+ cr_serv_input_data = {
"sdnc-request-header": {
"request-id": "request-1",
"rpc-action": "service-create",
"due-date": "2018-06-15T00:00:01Z",
"operator-contact": "pw1234"
}
+
+ del_serv_input_data = {
+ "sdnc-request-header": {
+ "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
+ "rpc-action": "service-delete",
+ "request-system-id": "appname",
+ "notification-url": "http://localhost:8585/NotificationServer/notify"},
+ "service-delete-req-info": {
+ "service-name": "TBD",
+ "tail-retention": "no"}
}
+ tapi_topo = {"topology-id-or-name": "TBD"}
+
@classmethod
def setUpClass(cls):
cls.init_failed = False
os.environ['JAVA_MIN_MEM'] = '1024M'
os.environ['JAVA_MAX_MEM'] = '4096M'
- cls.processes = test_utils.start_tpce()
+ cls.processes = test_utils_rfc8040.start_tpce()
# TAPI feature is not installed by default in Karaf
if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
print("installing tapi feature...")
- result = test_utils.install_karaf_feature("odl-transportpce-tapi")
+ result = test_utils_rfc8040.install_karaf_feature("odl-transportpce-tapi")
if result.returncode != 0:
cls.init_failed = True
print("Restarting OpenDaylight...")
- test_utils.shutdown_process(cls.processes[0])
- cls.processes[0] = test_utils.start_karaf()
- test_utils.process_list[0] = cls.processes[0]
- cls.init_failed = not test_utils.wait_until_log_contains(
- test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
+ test_utils_rfc8040.shutdown_process(cls.processes[0])
+ cls.processes[0] = test_utils_rfc8040.start_karaf()
+ test_utils_rfc8040.process_list[0] = cls.processes[0]
+ cls.init_failed = not test_utils_rfc8040.wait_until_log_contains(
+ test_utils_rfc8040.KARAF_LOG, test_utils_rfc8040.KARAF_OK_START_MSG, time_to_wait=60)
if cls.init_failed:
print("tapi installation feature failed...")
- test_utils.shutdown_process(cls.processes[0])
+ test_utils_rfc8040.shutdown_process(cls.processes[0])
sys.exit(2)
- cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
- ('roadma', cls.NODE_VERSION),
- ('roadmb', cls.NODE_VERSION),
- ('roadmc', cls.NODE_VERSION),
- ('xpdrc', cls.NODE_VERSION),
- ('spdra', cls.NODE_VERSION),
- ('spdrc', cls.NODE_VERSION)])
+ cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION),
+ ('roadma', cls.NODE_VERSION),
+ ('roadmb', cls.NODE_VERSION),
+ ('roadmc', cls.NODE_VERSION),
+ ('xpdrc', cls.NODE_VERSION),
+ ('spdra', cls.NODE_VERSION),
+ ('spdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils.shutdown_process(process)
+ test_utils_rfc8040.shutdown_process(process)
print("all processes killed")
def setUp(self): # instruction executed before each test method
print("execution of {}".format(self.id().split(".")[-1]))
def test_01_get_tapi_topology_T100G(self):
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "Transponder 100GE"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertEqual(len(res["output"]["topology"]["node"]), 1, 'Topology should contain 1 node')
- self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link')
- self.assertNotIn("owned-node-edge-point", res["output"]["topology"]["node"][0],
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node')
+ self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
+ self.assertNotIn("owned-node-edge-point", response["output"]["topology"]["node"][0],
'Node should contain no owned-node-edge-points')
- self.assertEqual("Tpdr100g over WDM node", res["output"]["topology"]["node"][0]["name"][0]["value"],
+ self.assertEqual("Tpdr100g over WDM node", response["output"]["topology"]["node"][0]["name"][0]["value"],
'node name should be: Tpdr100g over WDM node')
- self.assertIn("ETH", res["output"]["topology"]["node"][0]["layer-protocol-name"],
+ self.assertIn("ETH", response["output"]["topology"]["node"][0]["layer-protocol-name"],
'Node layer protocol should contain ETH')
- self.assertEqual(1, len(res["output"]["topology"]["node"][0]["node-rule-group"]),
+ self.assertEqual(1, len(response["output"]["topology"]["node"][0]["node-rule-group"]),
'node should contain 1 node rule group')
def test_02_get_tapi_topology_T0(self):
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertNotIn("node", res["output"]["topology"], 'Topology should contain no node')
- self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link')
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertNotIn("node", response["output"]["topology"], 'Topology should contain no node')
+ self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
def test_03_connect_rdmb(self):
- response = test_utils.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
- time.sleep(10)
+ response = test_utils_rfc8040.mount_device("ROADM-B1", ('roadmb', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_04_check_tapi_topos(self):
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "Transponder 100GE"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertEqual(len(res["output"]["topology"]["node"]), 1, 'Topology should contain 1 node')
- self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link')
-
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertEqual(len(res["output"]["topology"]["node"]), 1, 'Topology should contain 1 node')
- self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link')
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node')
+ self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
+
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node')
+ self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
def test_05_disconnect_roadmb(self):
- response = test_utils.unmount_device("ROADM-B1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- time.sleep(5)
+ response = test_utils_rfc8040.unmount_device("ROADM-B1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_06_connect_xpdra(self):
- response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
- time.sleep(10)
+ response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_07_check_tapi_topos(self):
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertNotIn("node", res["output"]["topology"], 'Topology should contain no node')
- self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link')
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertNotIn("node", response["output"]["topology"], 'Topology should contain no node')
+ self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
def test_08_connect_rdma(self):
- response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
- time.sleep(10)
+ response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_09_connect_rdmc(self):
- response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
- time.sleep(10)
+ response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_10_check_tapi_topos(self):
self.test_01_get_tapi_topology_T100G()
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertEqual(1, len(res["output"]["topology"]["node"]), 'Topology should contain 1 node')
- self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link')
- self.assertEqual("ROADM-infra", res["output"]["topology"]["node"][0]["name"][0]["value"],
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(1, len(response["output"]["topology"]["node"]), 'Topology should contain 1 node')
+ self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
+ self.assertEqual("ROADM-infra", response["output"]["topology"]["node"][0]["name"][0]["value"],
'node name should be: ROADM-infra')
- self.assertIn("PHOTONIC_MEDIA", res["output"]["topology"]["node"][0]["layer-protocol-name"],
+ self.assertIn("PHOTONIC_MEDIA", response["output"]["topology"]["node"][0]["layer-protocol-name"],
'Node layer protocol should contain PHOTONIC_MEDIA')
- self.assertEqual(1, len(res["output"]["topology"]["node"][0]["node-rule-group"]),
+ self.assertEqual(1, len(response["output"]["topology"]["node"][0]["node-rule-group"]),
'node should contain 1 node rule group')
def test_11_connect_xprda_n1_to_roadma_pp1(self):
- response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1",
- "ROADM-A1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"],
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"],
CREATED_SUCCESSFULLY)
time.sleep(2)
def test_12_connect_roadma_pp1_to_xpdra_n1(self):
- response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1",
- "ROADM-A1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully', res["output"]["result"],
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Roadm Xponder links created successfully', response["output"]["result"],
CREATED_SUCCESSFULLY)
time.sleep(2)
def test_13_check_tapi_topology_T100G(self):
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "Transponder 100GE"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertEqual(1, len(res["output"]["topology"]["node"][0]["owned-node-edge-point"]),
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(1, len(response["output"]["topology"]["node"][0]["owned-node-edge-point"]),
'Node should contain 1 owned-node-edge-points')
self.assertEqual("XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1",
- res["output"]["topology"]["node"][0]["owned-node-edge-point"][0]["name"][0]["value"],
+ response["output"]["topology"]["node"][0]["owned-node-edge-point"][0]["name"][0]["value"],
'name of owned-node-edge-points should be XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1')
def test_14_check_tapi_topology_T0(self):
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- nodes = res["output"]["topology"]["node"]
- links = res["output"]["topology"]["link"]
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ nodes = response["output"]["topology"]["node"]
+ links = response["output"]["topology"]["link"]
self.assertEqual(3, len(nodes), 'Topology should contain 3 nodes')
self.assertEqual(2, len(links), 'Topology should contain 2 links')
self.assertEqual(2, count_object_with_double_key(nodes, "name", "value-name", "otsi node name"),
'Topology should contain 1 oms link')
def test_15_connect_xpdrc(self):
- response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
- time.sleep(10)
+ response = test_utils_rfc8040.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_16_connect_xprdc_n1_to_roadmc_pp1(self):
- response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
- "ROADM-C1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"],
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"],
CREATED_SUCCESSFULLY)
time.sleep(2)
def test_17_connect_roadmc_pp1_to_xpdrc_n1(self):
- response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
- "ROADM-C1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully', res["output"]["result"],
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Roadm Xponder links created successfully', response["output"]["result"],
CREATED_SUCCESSFULLY)
time.sleep(2)
def test_18_check_tapi_topology_T100G(self):
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "Transponder 100GE"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertEqual(2, len(res["output"]["topology"]["node"][0]["owned-node-edge-point"]),
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(2, len(response["output"]["topology"]["node"][0]["owned-node-edge-point"]),
'Node should contain 2 owned-node-edge-points')
self.assertEqual("XPDR-C1-XPDR1+DSR+XPDR1-CLIENT1",
- res["output"]["topology"]["node"][0]["owned-node-edge-point"][0]["name"][0]["value"],
+ response["output"]["topology"]["node"][0]["owned-node-edge-point"][0]["name"][0]["value"],
'name of owned-node-edge-points should be XPDR-C1-XPDR1+DSR+XPDR1-CLIENT1')
self.assertEqual("XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1",
- res["output"]["topology"]["node"][0]["owned-node-edge-point"][1]["name"][0]["value"],
+ response["output"]["topology"]["node"][0]["owned-node-edge-point"][1]["name"][0]["value"],
'name of owned-node-edge-points should be XPDR-A1-XPDR1+DSR+XPDR1-CLIENT1')
def test_19_check_tapi_topology_T0(self):
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- nodes = res["output"]["topology"]["node"]
- links = res["output"]["topology"]["link"]
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ nodes = response["output"]["topology"]["node"]
+ links = response["output"]["topology"]["link"]
self.assertEqual(5, len(nodes), 'Topology should contain 5 nodes')
self.assertEqual(4, len(links), 'Topology should contain 4 links')
self.assertEqual(3, count_object_with_double_key(nodes, "name", "value-name", "otsi node name"),
'Topology should contain 2 oms links')
def test_20_connect_spdr_sa1(self):
- response = test_utils.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
- time.sleep(10)
- # TODO replace connect and disconnect timers with test_utils.wait_until_log_contains
+ response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
+ self.assertEqual(response.status_code,
+ requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_21_connect_spdr_sc1(self):
- response = test_utils.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
- time.sleep(10)
- # TODO replace connect and disconnect timers with test_utils.wait_until_log_contains
+ response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
+ self.assertEqual(response.status_code,
+ requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_22_check_tapi_topology_T100G(self):
self.test_18_check_tapi_topology_T100G()
self.test_19_check_tapi_topology_T0()
def test_24_connect_sprda_n1_to_roadma_pp2(self):
- response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1",
- "ROADM-A1", "1", "SRG1-PP2-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"],
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"],
CREATED_SUCCESSFULLY)
time.sleep(2)
def test_25_connect_roadma_pp2_to_spdra_n1(self):
- response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1",
- "ROADM-A1", "1", "SRG1-PP2-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully', res["output"]["result"],
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Roadm Xponder links created successfully', response["output"]["result"],
CREATED_SUCCESSFULLY)
time.sleep(2)
def test_26_connect_sprdc_n1_to_roadmc_pp2(self):
- response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1",
- "ROADM-C1", "1", "SRG1-PP2-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"],
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"],
CREATED_SUCCESSFULLY)
time.sleep(2)
def test_27_connect_roadmc_pp2_to_spdrc_n1(self):
- response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1",
- "ROADM-C1", "1", "SRG1-PP2-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully', res["output"]["result"],
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP2-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Roadm Xponder links created successfully', response["output"]["result"],
CREATED_SUCCESSFULLY)
time.sleep(2)
self.test_18_check_tapi_topology_T100G()
def test_29_check_tapi_topology_T0(self):
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- nodes = res["output"]["topology"]["node"]
- links = res["output"]["topology"]["link"]
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ nodes = response["output"]["topology"]["node"]
+ links = response["output"]["topology"]["link"]
self.assertEqual(9, len(nodes), 'Topology should contain 9 nodes')
self.assertEqual(8, len(links), 'Topology should contain 8 links')
self.assertEqual(5, count_object_with_double_key(nodes, "name", "value-name", "otsi node name"),
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
+ response = test_utils_rfc8040.add_oms_attr_request(
+ "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
# Config ROADMC-ROADMA oms-attributes
data = {"span": {
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
+ response = test_utils_rfc8040.add_oms_attr_request(
+ "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
def test_31_create_OCH_OTU4_service(self):
- response = test_utils.service_create_request(self.cr_serv_sample_data)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-create',
+ self.cr_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('PCE calculation in progress',
- res['output']['configuration-response-common']['response-message'])
+ response['output']['configuration-response-common']['response-message'])
time.sleep(self.WAITING)
def test_32_check_tapi_topology_T0(self):
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- nodes = res["output"]["topology"]["node"]
- links = res["output"]["topology"]["link"]
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ nodes = response["output"]["topology"]["node"]
+ links = response["output"]["topology"]["link"]
self.assertEqual(9, len(nodes), 'Topology should contain 9 nodes')
self.assertEqual(9, len(links), 'Topology should contain 9 links')
self.assertEqual(4, count_object_with_double_key(links, "name", "value-name", "transitional link name"),
'Topology should contain 1 otn link')
for link in links:
if link["name"][0]["value"] == "OTU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1":
- self.assertEqual(100000, link["available-capacity"]["total-size"]["value"],
+ self.assertEqual(100000, int(link["available-capacity"]["total-size"]["value"]),
'OTU4 link should have an available capacity of 100 000 Mbps')
elif link["name"][0]["value-name"] == "transitional link name":
- self.assertEqual(100, link["available-capacity"]["total-size"]["value"],
+ self.assertEqual(100, int(link["available-capacity"]["total-size"]["value"]),
'link should have an available capacity of 100 Gbps')
self.assertEqual(2, len(link["node-edge-point"]), 'link should have 2 neps')
def test_33_create_ODU4_service(self):
- self.cr_serv_sample_data["input"]["service-name"] = "service1-ODU4"
- self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "ODU"
- del self.cr_serv_sample_data["input"]["service-a-end"]["otu-service-rate"]
- self.cr_serv_sample_data["input"]["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
- self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "ODU"
- del self.cr_serv_sample_data["input"]["service-z-end"]["otu-service-rate"]
- self.cr_serv_sample_data["input"]["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
-
- response = test_utils.service_create_request(self.cr_serv_sample_data)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ self.cr_serv_input_data["service-name"] = "service1-ODU4"
+ self.cr_serv_input_data["service-a-end"]["service-format"] = "ODU"
+ del self.cr_serv_input_data["service-a-end"]["otu-service-rate"]
+ self.cr_serv_input_data["service-a-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
+ self.cr_serv_input_data["service-z-end"]["service-format"] = "ODU"
+ del self.cr_serv_input_data["service-z-end"]["otu-service-rate"]
+ self.cr_serv_input_data["service-z-end"]["odu-service-rate"] = "org-openroadm-otn-common-types:ODU4"
+
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-create',
+ self.cr_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('PCE calculation in progress',
- res['output']['configuration-response-common']['response-message'])
+ response['output']['configuration-response-common']['response-message'])
time.sleep(self.WAITING)
def test_34_check_tapi_topology_T0(self):
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- nodes = res["output"]["topology"]["node"]
- links = res["output"]["topology"]["link"]
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ nodes = response["output"]["topology"]["node"]
+ links = response["output"]["topology"]["link"]
self.assertEqual(9, len(nodes), 'Topology should contain 9 nodes')
self.assertEqual(10, len(links), 'Topology should contain 10 links')
self.assertEqual(4, count_object_with_double_key(links, "name", "value-name", "transitional link name"),
self.assertEqual(0, link["available-capacity"]["total-size"]["value"],
'OTU4 link should have an available capacity of 0 Mbps')
elif link["name"][0]["value"] == "ODU4-SPDR-SA1-XPDR1-XPDR1-NETWORK1toSPDR-SC1-XPDR1-XPDR1-NETWORK1":
- self.assertEqual(100000, link["available-capacity"]["total-size"]["value"],
+ self.assertEqual(100000, int(link["available-capacity"]["total-size"]["value"]),
'ODU4 link should have an available capacity of 100 000 Mbps')
elif link["name"][0]["value-name"] == "transitional link name":
- self.assertEqual(100, link["available-capacity"]["total-size"]["value"],
+ self.assertEqual(100, int(link["available-capacity"]["total-size"]["value"]),
'link should have an available capacity of 100 Gbps')
self.assertEqual(2, len(link["node-edge-point"]), 'link should have 2 neps')
def test_35_connect_sprda_2_n2_to_roadma_pp3(self):
- response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "2", "2",
- "ROADM-A1", "1", "SRG1-PP3-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"],
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '2', 'network-num': '2',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"],
CREATED_SUCCESSFULLY)
time.sleep(2)
def test_36_connect_roadma_pp3_to_spdra_2_n2(self):
- response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "2", "2",
- "ROADM-A1", "1", "SRG1-PP3-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully', res["output"]["result"],
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '2', 'network-num': '2',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP3-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Roadm Xponder links created successfully', response["output"]["result"],
CREATED_SUCCESSFULLY)
time.sleep(2)
def test_37_check_tapi_topology_T0(self):
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- nodes = res["output"]["topology"]["node"]
- links = res["output"]["topology"]["link"]
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ nodes = response["output"]["topology"]["node"]
+ links = response["output"]["topology"]["link"]
self.assertEqual(11, len(nodes), 'Topology should contain 11 nodes')
self.assertEqual(12, len(links), 'Topology should contain 12 links')
self.assertEqual(6, count_object_with_double_key(nodes, "name", "value-name", "otsi node name"),
'Topology should contain 2 otn links')
def test_38_delete_ODU4_service(self):
- response = test_utils.service_delete_request("service1-ODU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-ODU4"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-delete',
+ self.del_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Renderer service delete in progress',
- res['output']['configuration-response-common']['response-message'])
- time.sleep(20)
+ response['output']['configuration-response-common']['response-message'])
+ time.sleep(self.WAITING)
def test_39_delete_OCH_OTU4_service(self):
- response = test_utils.service_delete_request("service1-OCH-OTU4")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
+ self.del_serv_input_data["service-delete-req-info"]["service-name"] = "service1-OCH-OTU4"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'org-openroadm-service', 'service-delete',
+ self.del_serv_input_data)
+ self.assertEqual(response['status_code'], requests.codes.ok)
self.assertIn('Renderer service delete in progress',
- res['output']['configuration-response-common']['response-message'])
- time.sleep(20)
+ response['output']['configuration-response-common']['response-message'])
+ time.sleep(self.WAITING)
def test_40_check_tapi_topology_T0(self):
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- nodes = res["output"]["topology"]["node"]
- links = res["output"]["topology"]["link"]
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ nodes = response["output"]["topology"]["node"]
+ links = response["output"]["topology"]["link"]
self.assertEqual(11, len(nodes), 'Topology should contain 11 nodes')
self.assertEqual(10, len(links), 'Topology should contain 10 links')
self.assertEqual(0, count_object_with_double_key(links, "name", "value-name", "otn link name"),
'Topology should contain 0 otn link')
def test_41_disconnect_xponders_from_roadm(self):
- url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
- response = test_utils.get_ordm_topo_request("")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- links = res['network'][0]['ietf-network-topology:link']
+ response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ links = response['network'][0]['ietf-network-topology:link']
for link in links:
if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
- link_name = link["link-id"]
- response = test_utils.delete_request(url+link_name)
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.del_ietf_network_link_request(
+ 'openroadm-topology', link['link-id'], 'config')
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_42_check_tapi_topology_T0(self):
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "T0 - Multi-layer topology"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- nodes = res["output"]["topology"]["node"]
- self.assertEqual(1, len(nodes), 'Topology should contain 1 node')
- self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link')
- self.assertEqual("ROADM-infra", res["output"]["topology"]["node"][0]["name"][0]["value"],
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(1, len(response["output"]["topology"]["node"]), 'Topology should contain 1 node')
+ self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
+ self.assertEqual("ROADM-infra", response["output"]["topology"]["node"][0]["name"][0]["value"],
'node name should be: ROADM-infra')
def test_43_get_tapi_topology_T100G(self):
- url = "{}/operations/tapi-topology:get-topology-details"
- data = {
- "tapi-topology:input": {
- "tapi-topology:topology-id-or-name": "Transponder 100GE"
- }
- }
- response = test_utils.post_request(url, data)
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- res = response.json()
- self.assertEqual(len(res["output"]["topology"]["node"]), 1, 'Topology should contain 1 node')
- self.assertNotIn("link", res["output"]["topology"], 'Topology should contain no link')
- self.assertNotIn("owned-node-edge-point", res["output"]["topology"]["node"][0],
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T100GE
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(len(response["output"]["topology"]["node"]), 1, 'Topology should contain 1 node')
+ self.assertNotIn("link", response["output"]["topology"], 'Topology should contain no link')
+ self.assertNotIn("owned-node-edge-point", response["output"]["topology"]["node"][0],
'Node should contain no owned-node-edge-points')
def test_44_disconnect_roadma(self):
- response = test_utils.unmount_device("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- time.sleep(5)
+ response = test_utils_rfc8040.unmount_device("ROADM-A1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_45_disconnect_roadmc(self):
- response = test_utils.unmount_device("ROADM-C1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- time.sleep(5)
+ response = test_utils_rfc8040.unmount_device("ROADM-C1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_46_check_tapi_topos(self):
self.test_01_get_tapi_topology_T100G()
self.test_02_get_tapi_topology_T0()
def test_47_disconnect_xpdra(self):
- response = test_utils.unmount_device("XPDR-A1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- time.sleep(5)
+ response = test_utils_rfc8040.unmount_device("XPDR-A1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_48_disconnect_xpdrc(self):
- response = test_utils.unmount_device("XPDR-C1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- time.sleep(5)
+ response = test_utils_rfc8040.unmount_device("XPDR-C1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_49_disconnect_spdr_sa1(self):
- response = test_utils.unmount_device("SPDR-SA1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
- time.sleep(5)
+ response = test_utils_rfc8040.unmount_device("SPDR-SA1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_50_disconnect_spdr_sc1(self):
- response = test_utils.unmount_device("SPDR-SC1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
-
-
-def find_object_with_key(list_dicts, key, value):
- for dict_ in list_dicts:
- if dict_[key] == value:
- return dict_
- return None
+ response = test_utils_rfc8040.unmount_device("SPDR-SC1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def count_object_with_double_key(list_dicts, key1, key2, value):
sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils # nopep8
+import test_utils_rfc8040 # nopep8
# pylint: disable=too-few-public-methods
NODE_VERSION = '2.2.1'
uuid_services = UuidServices()
- cr_serv_sample_data = {
- "input": {
- "end-point": [
- {
- "layer-protocol-name": "PHOTONIC_MEDIA",
- "service-interface-point": {
- "service-interface-point-uuid": "b1a0d883-32b8-3b0b-93d6-7ed074f6f107"
- },
- "administrative-state": "UNLOCKED",
- "operational-state": "ENABLED",
- "direction": "BIDIRECTIONAL",
- "role": "SYMMETRIC",
- "protection-role": "WORK",
- "local-id": "SPDR-SA1-XPDR1",
- "name": [
+ cr_serv_input_data = {
+ "end-point": [
+ {
+ "layer-protocol-name": "PHOTONIC_MEDIA",
+ "service-interface-point": {
+ "service-interface-point-uuid": "b1a0d883-32b8-3b0b-93d6-7ed074f6f107"
+ },
+ "administrative-state": "UNLOCKED",
+ "operational-state": "ENABLED",
+ "direction": "BIDIRECTIONAL",
+ "role": "SYMMETRIC",
+ "protection-role": "WORK",
+ "local-id": "SPDR-SA1-XPDR1",
+ "name": [
{
"value-name": "OpenROADM node id",
"value": "SPDR-SA1-XPDR1"
}
- ]
+ ]
+ },
+ {
+ "layer-protocol-name": "PHOTONIC_MEDIA",
+ "service-interface-point": {
+ "service-interface-point-uuid": "d1d6305e-179b-346f-b02d-8260aebe1ce8"
},
- {
- "layer-protocol-name": "PHOTONIC_MEDIA",
- "service-interface-point": {
- "service-interface-point-uuid": "d1d6305e-179b-346f-b02d-8260aebe1ce8"
- },
- "administrative-state": "UNLOCKED",
- "operational-state": "ENABLED",
- "direction": "BIDIRECTIONAL",
- "role": "SYMMETRIC",
- "protection-role": "WORK",
- "local-id": "SPDR-SC1-XPDR1",
- "name": [
+ "administrative-state": "UNLOCKED",
+ "operational-state": "ENABLED",
+ "direction": "BIDIRECTIONAL",
+ "role": "SYMMETRIC",
+ "protection-role": "WORK",
+ "local-id": "SPDR-SC1-XPDR1",
+ "name": [
{
"value-name": "OpenROADM node id",
"value": "SPDR-SC1-XPDR1"
}
- ]
- }
- ],
- "connectivity-constraint": {
- "service-layer": "PHOTONIC_MEDIA",
- "service-type": "POINT_TO_POINT_CONNECTIVITY",
- "service-level": "Some service-level",
- "requested-capacity": {
- "total-size": {
- "value": "100",
- "unit": "GB"
- }
+ ]
+ }
+ ],
+ "connectivity-constraint": {
+ "service-layer": "PHOTONIC_MEDIA",
+ "service-type": "POINT_TO_POINT_CONNECTIVITY",
+ "service-level": "Some service-level",
+ "requested-capacity": {
+ "total-size": {
+ "value": "100",
+ "unit": "GB"
}
- },
- "state": "Some state"}}
+ }
+ },
+ "state": "Some state"}
+
+ del_serv_input_data = {"service-id-or-name": "TBD"}
+
+ tapi_topo = {"topology-id-or-name": "TBD"}
@classmethod
def setUpClass(cls):
cls.init_failed = False
os.environ['JAVA_MIN_MEM'] = '1024M'
os.environ['JAVA_MAX_MEM'] = '4096M'
- cls.processes = test_utils.start_tpce()
+ cls.processes = test_utils_rfc8040.start_tpce()
# TAPI feature is not installed by default in Karaf
if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
print("installing tapi feature...")
- result = test_utils.install_karaf_feature("odl-transportpce-tapi")
+ result = test_utils_rfc8040.install_karaf_feature("odl-transportpce-tapi")
if result.returncode != 0:
cls.init_failed = True
print("Restarting OpenDaylight...")
- test_utils.shutdown_process(cls.processes[0])
- cls.processes[0] = test_utils.start_karaf()
- test_utils.process_list[0] = cls.processes[0]
- cls.init_failed = not test_utils.wait_until_log_contains(
- test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
+ test_utils_rfc8040.shutdown_process(cls.processes[0])
+ cls.processes[0] = test_utils_rfc8040.start_karaf()
+ test_utils_rfc8040.process_list[0] = cls.processes[0]
+ cls.init_failed = not test_utils_rfc8040.wait_until_log_contains(
+ test_utils_rfc8040.KARAF_LOG, test_utils_rfc8040.KARAF_OK_START_MSG, time_to_wait=60)
if cls.init_failed:
print("tapi installation feature failed...")
- test_utils.shutdown_process(cls.processes[0])
+ test_utils_rfc8040.shutdown_process(cls.processes[0])
sys.exit(2)
- cls.processes = test_utils.start_sims([('spdra', cls.NODE_VERSION),
- ('roadma', cls.NODE_VERSION),
- ('roadmc', cls.NODE_VERSION),
- ('spdrc', cls.NODE_VERSION)])
+ cls.processes = test_utils_rfc8040.start_sims([('spdra', cls.NODE_VERSION),
+ ('roadma', cls.NODE_VERSION),
+ ('roadmc', cls.NODE_VERSION),
+ ('spdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils.shutdown_process(process)
+ test_utils_rfc8040.shutdown_process(process)
print("all processes killed")
def setUp(self):
def test_01_connect_spdrA(self):
print("Connecting SPDRA")
- response = test_utils.mount_tapi_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
- self.assertEqual(response.status_code,
- requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("SPDR-SA1", ('spdra', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
time.sleep(2)
def test_02_connect_spdrC(self):
print("Connecting SPDRC")
- response = test_utils.mount_tapi_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
- self.assertEqual(response.status_code,
- requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("SPDR-SC1", ('spdrc', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
time.sleep(2)
def test_03_connect_rdmA(self):
print("Connecting ROADMA")
- response = test_utils.mount_tapi_device("ROADM-A1", ('roadma', self.NODE_VERSION))
- self.assertEqual(response.status_code,
- requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
time.sleep(2)
def test_04_connect_rdmC(self):
print("Connecting ROADMC")
- response = test_utils.mount_tapi_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
- self.assertEqual(response.status_code,
- requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
time.sleep(2)
def test_05_connect_sprdA_1_N1_to_roadmA_PP1(self):
- response = test_utils.connect_xpdr_to_rdm_request("SPDR-SA1", "1", "1",
- "ROADM-A1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully',
- res["output"]["result"])
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
time.sleep(2)
def test_06_connect_roadmA_PP1_to_spdrA_1_N1(self):
- response = test_utils.connect_rdm_to_xpdr_request("SPDR-SA1", "1", "1",
- "ROADM-A1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully',
- res["output"]["result"])
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'SPDR-SA1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
time.sleep(2)
def test_07_connect_sprdC_1_N1_to_roadmC_PP1(self):
- response = test_utils.connect_xpdr_to_rdm_request("SPDR-SC1", "1", "1",
- "ROADM-C1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully',
- res["output"]["result"])
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
time.sleep(2)
def test_08_connect_roadmC_PP1_to_spdrC_1_N1(self):
- response = test_utils.connect_rdm_to_xpdr_request("SPDR-SC1", "1", "1",
- "ROADM-C1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully',
- res["output"]["result"])
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'SPDR-SC1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
time.sleep(2)
def test_09_add_omsAttributes_ROADMA_ROADMC(self):
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils.add_oms_attr_request(
+ response = test_utils_rfc8040.add_oms_attr_request(
"ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(2)
def test_10_add_omsAttributes_ROADMC_ROADMA(self):
# Config ROADMC-ROADMA oms-attributes
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils.add_oms_attr_request(
+ response = test_utils_rfc8040.add_oms_attr_request(
"ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(2)
def test_11_check_otn_topology(self):
- response = test_utils.get_otn_topo_request()
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nbNode = len(res['network'][0]['node'])
- self.assertEqual(nbNode, 6, 'There should be 6 otn nodes')
- self.assertNotIn('ietf-network-topology:link', res['network'][0])
- time.sleep(2)
+ response = test_utils_rfc8040.get_ietf_network_request('otn-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(len(response['network'][0]['node']), 6, 'There should be 6 otn nodes')
+ self.assertNotIn('ietf-network-topology:link', response['network'][0])
def test_12_check_openroadm_topology(self):
- response = test_utils.get_ordm_topo_request("")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nbNode = len(res['network'][0]['node'])
- nbLink = len(res['network'][0]['ietf-network-topology:link'])
- self.assertEqual(nbNode, 13, 'There should be 13 openroadm nodes')
- self.assertEqual(nbLink, 22, 'There should be 22 openroadm links')
- time.sleep(2)
+ response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(len(response['network'][0]['node']), 13, 'There should be 13 openroadm nodes')
+ self.assertEqual(len(response['network'][0]['ietf-network-topology:link']), 22,
+ 'There should be 22 openroadm links')
def test_13_get_tapi_topology_details(self):
- response = test_utils.tapi_get_topology_details_request(
- "T0 - Full Multi-layer topology")
- time.sleep(2)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nbNode = len(res['output']['topology']['node'])
- nbLink = len(res['output']['topology']['link'])
- self.assertEqual(nbNode, 14, 'There should be 14 TAPI nodes')
- self.assertEqual(nbLink, 15, 'There should be 15 TAPI links')
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
time.sleep(2)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(len(response['output']['topology']['node']), 14, 'There should be 14 TAPI nodes')
+ self.assertEqual(len(response['output']['topology']['link']), 15, 'There should be 15 TAPI links')
def test_14_check_sip_details(self):
- response = test_utils.tapi_get_sip_details_request()
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nbSip = len(res['output']['sip'])
- self.assertEqual(nbSip, 60, 'There should be 60 service interface point')
- time.sleep(2)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-common', 'get-service-interface-point-list', None)
+ self.assertEqual(len(response['output']['sip']), 60, 'There should be 60 service interface point')
# test create connectivity service from spdrA to spdrC for Photonic_media
def test_15_create_connectivity_service_PhotonicMedia(self):
- response = test_utils.tapi_create_connectivity_request(self.cr_serv_sample_data)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
time.sleep(self.WAITING)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.uuid_services.pm = res['output']['service']['uuid']
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.uuid_services.pm = response['output']['service']['uuid']
# pylint: disable=consider-using-f-string
print("photonic media service uuid : {}".format(self.uuid_services.pm))
input_dict_3 = {'value-name': 'OpenROADM node id',
'value': 'SPDR-SA1-XPDR1'}
- self.assertDictEqual(dict(input_dict_1, **res['output']['service']),
- res['output']['service'])
- self.assertDictEqual(dict(input_dict_2, **res['output']['service']['end-point'][0]['name'][0]),
- res['output']['service']['end-point'][0]['name'][0])
- self.assertDictEqual(dict(input_dict_3, **res['output']['service']['end-point'][1]['name'][0]),
- res['output']['service']['end-point'][1]['name'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['output']['service']),
+ response['output']['service'])
+ self.assertDictEqual(dict(input_dict_2, **response['output']['service']['end-point'][0]['name'][0]),
+ response['output']['service']['end-point'][0]['name'][0])
+ self.assertDictEqual(dict(input_dict_3, **response['output']['service']['end-point'][1]['name'][0]),
+ response['output']['service']['end-point'][1]['name'][0])
# If the gate fails is because of the waiting time not being enough
- time.sleep(self.WAITING)
+# time.sleep(self.WAITING)
def test_16_get_service_PhotonicMedia(self):
- response = test_utils.get_service_list_request(
- "services/" + str(self.uuid_services.pm))
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['services'][0]['administrative-state'], 'inService')
- self.assertEqual(
- res['services'][0]['service-name'], self.uuid_services.pm)
- self.assertEqual(
- res['services'][0]['connection-type'], 'infrastructure')
- self.assertEqual(
- res['services'][0]['lifecycle-state'], 'planned')
- time.sleep(2)
+ response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.pm))
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['services'][0]['administrative-state'], 'inService')
+ self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.pm))
+ self.assertEqual(response['services'][0]['connection-type'], 'infrastructure')
+ self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
+ time.sleep(1)
# test create connectivity service from spdrA to spdrC for odu
def test_17_create_connectivity_service_ODU(self):
# pylint: disable=line-too-long
- self.cr_serv_sample_data["input"]["end-point"][0]["layer-protocol-name"] = "ODU"
- self.cr_serv_sample_data["input"]["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = "5efda776-f8de-3e0b-9bbd-2c702e210946"
- self.cr_serv_sample_data["input"]["end-point"][1]["layer-protocol-name"] = "ODU"
- self.cr_serv_sample_data["input"]["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = "8116d0af-39fa-3df5-bed2-dd2cd5e8217d"
- self.cr_serv_sample_data["input"]["connectivity-constraint"]["service-layer"] = "ODU"
- self.cr_serv_sample_data["input"]["connectivity-constraint"]["service-level"] = self.uuid_services.pm
-
- response = test_utils.tapi_create_connectivity_request(self.cr_serv_sample_data)
+ self.cr_serv_input_data["end-point"][0]["layer-protocol-name"] = "ODU"
+ self.cr_serv_input_data["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = "5efda776-f8de-3e0b-9bbd-2c702e210946"
+ self.cr_serv_input_data["end-point"][1]["layer-protocol-name"] = "ODU"
+ self.cr_serv_input_data["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = "8116d0af-39fa-3df5-bed2-dd2cd5e8217d"
+ self.cr_serv_input_data["connectivity-constraint"]["service-layer"] = "ODU"
+ self.cr_serv_input_data["connectivity-constraint"]["service-level"] = self.uuid_services.pm
+
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
time.sleep(self.WAITING)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.uuid_services.odu = res['output']['service']['uuid']
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.uuid_services.odu = response['output']['service']['uuid']
# pylint: disable=consider-using-f-string
print("odu service uuid : {}".format(self.uuid_services.odu))
input_dict_3 = {'value-name': 'OpenROADM node id',
'value': 'SPDR-SA1-XPDR1'}
- self.assertDictEqual(dict(input_dict_1, **res['output']['service']),
- res['output']['service'])
- self.assertDictEqual(dict(input_dict_2, **res['output']['service']['end-point'][0]['name'][0]),
- res['output']['service']['end-point'][0]['name'][0])
- self.assertDictEqual(dict(input_dict_3, **res['output']['service']['end-point'][1]['name'][0]),
- res['output']['service']['end-point'][1]['name'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['output']['service']),
+ response['output']['service'])
+ self.assertDictEqual(dict(input_dict_2, **response['output']['service']['end-point'][0]['name'][0]),
+ response['output']['service']['end-point'][0]['name'][0])
+ self.assertDictEqual(dict(input_dict_3, **response['output']['service']['end-point'][1]['name'][0]),
+ response['output']['service']['end-point'][1]['name'][0])
# If the gate fails is because of the waiting time not being enough
- time.sleep(self.WAITING)
+# time.sleep(self.WAITING)
def test_18_get_service_ODU(self):
- response = test_utils.get_service_list_request(
- "services/" + str(self.uuid_services.odu))
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['services'][0]['administrative-state'], 'inService')
- self.assertEqual(
- res['services'][0]['service-name'], self.uuid_services.odu)
- self.assertEqual(
- res['services'][0]['connection-type'], 'infrastructure')
- self.assertEqual(
- res['services'][0]['lifecycle-state'], 'planned')
- time.sleep(2)
+ response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.odu))
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['services'][0]['administrative-state'], 'inService')
+ self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.odu))
+ self.assertEqual(response['services'][0]['connection-type'], 'infrastructure')
+ self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
+ time.sleep(1)
# test create connectivity service from spdrA to spdrC for dsr
def test_19_create_connectivity_service_DSR(self):
# pylint: disable=line-too-long
- self.cr_serv_sample_data["input"]["end-point"][0]["layer-protocol-name"] = "DSR"
- self.cr_serv_sample_data["input"]["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = "c14797a0-adcc-3875-a1fe-df8949d1a2d7"
- self.cr_serv_sample_data["input"]["end-point"][1]["layer-protocol-name"] = "DSR"
- self.cr_serv_sample_data["input"]["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = "25812ef2-625d-3bf8-af55-5e93946d1c22"
- self.cr_serv_sample_data["input"]["connectivity-constraint"]["service-layer"] = "DSR"
- self.cr_serv_sample_data["input"]["connectivity-constraint"]["requested-capacity"]["total-size"]["value"] = "10"
- self.cr_serv_sample_data["input"]["connectivity-constraint"]["service-level"] = self.uuid_services.odu
-
- response = test_utils.tapi_create_connectivity_request(self.cr_serv_sample_data)
+ self.cr_serv_input_data["end-point"][0]["layer-protocol-name"] = "DSR"
+ self.cr_serv_input_data["end-point"][0]["service-interface-point"]["service-interface-point-uuid"] = "c14797a0-adcc-3875-a1fe-df8949d1a2d7"
+ self.cr_serv_input_data["end-point"][1]["layer-protocol-name"] = "DSR"
+ self.cr_serv_input_data["end-point"][1]["service-interface-point"]["service-interface-point-uuid"] = "25812ef2-625d-3bf8-af55-5e93946d1c22"
+ self.cr_serv_input_data["connectivity-constraint"]["service-layer"] = "DSR"
+ self.cr_serv_input_data["connectivity-constraint"]["requested-capacity"]["total-size"]["value"] = "10"
+ self.cr_serv_input_data["connectivity-constraint"]["service-level"] = self.uuid_services.odu
+
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
time.sleep(self.WAITING)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.uuid_services.dsr = res['output']['service']['uuid']
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.uuid_services.dsr = response['output']['service']['uuid']
# pylint: disable=consider-using-f-string
print("dsr service uuid : {}".format(self.uuid_services.dsr))
'value': 'SPDR-SA1-XPDR1'}
self.assertDictEqual(dict(input_dict_1,
- **res['output']['service']),
- res['output']['service'])
+ **response['output']['service']),
+ response['output']['service'])
self.assertDictEqual(dict(input_dict_2,
- **res['output']['service']['end-point'][0]['name'][0]),
- res['output']['service']['end-point'][0]['name'][0])
+ **response['output']['service']['end-point'][0]['name'][0]),
+ response['output']['service']['end-point'][0]['name'][0])
self.assertDictEqual(dict(input_dict_3,
- **res['output']['service']['end-point'][1]['name'][0]),
- res['output']['service']['end-point'][1]['name'][0])
+ **response['output']['service']['end-point'][1]['name'][0]),
+ response['output']['service']['end-point'][1]['name'][0])
# The sleep here is okey as the DSR service creation is very fast
- time.sleep(self.WAITING)
+# time.sleep(self.WAITING)
def test_20_get_service_DSR(self):
- response = test_utils.get_service_list_request(
- "services/" + str(self.uuid_services.dsr))
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['services'][0]['administrative-state'], 'inService')
- self.assertEqual(
- res['services'][0]['service-name'], self.uuid_services.dsr)
- self.assertEqual(
- res['services'][0]['connection-type'], 'service')
- self.assertEqual(
- res['services'][0]['lifecycle-state'], 'planned')
- time.sleep(2)
+ response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.dsr))
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['services'][0]['administrative-state'], 'inService')
+ self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.dsr))
+ self.assertEqual(response['services'][0]['connection-type'], 'service')
+ self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
+ time.sleep(1)
def test_21_get_connectivity_service_list(self):
- response = test_utils.tapi_get_service_list_request()
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- liste_service = res['output']['service']
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-connectivity', 'get-connectivity-service-list', None)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ liste_service = response['output']['service']
for ele in liste_service:
if ele['uuid'] == self.uuid_services.pm:
self.assertEqual(ele['operational-state'], 'ENABLED')
time.sleep(2)
def test_22_delete_connectivity_service_DSR(self):
- response = test_utils.tapi_delete_connectivity_request(self.uuid_services.dsr)
- self.assertEqual(response.status_code, requests.codes.no_content)
+ self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.dsr)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
+ self.assertIn(response["status_code"], (requests.codes.ok, requests.codes.no_content))
time.sleep(self.WAITING)
def test_23_delete_connectivity_service_ODU(self):
- response = test_utils.tapi_delete_connectivity_request(self.uuid_services.odu)
- self.assertEqual(response.status_code, requests.codes.no_content)
+ self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.odu)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
+ self.assertIn(response["status_code"], (requests.codes.ok, requests.codes.no_content))
time.sleep(self.WAITING)
def test_24_delete_connectivity_service_PhotonicMedia(self):
- response = test_utils.tapi_delete_connectivity_request(self.uuid_services.pm)
- self.assertEqual(response.status_code, requests.codes.no_content)
+ self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.pm)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
+ self.assertIn(response["status_code"], (requests.codes.ok, requests.codes.no_content))
time.sleep(self.WAITING)
def test_25_get_no_tapi_services(self):
- response = test_utils.tapi_get_service_list_request()
- res = response.json()
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-connectivity', 'get-connectivity-service-list', None)
+ self.assertEqual(response['status_code'], requests.codes.internal_server_error)
self.assertIn(
{"error-type": "rpc", "error-tag": "operation-failed",
"error-message": "No services exist in datastore",
"error-info": "<severity>error</severity>"},
- res['errors']['error'])
- time.sleep(2)
+ response['output']['errors']['error'])
def test_26_get_no_openroadm_services(self):
- response = test_utils.get_service_list_request("")
- self.assertEqual(response.status_code, requests.codes.conflict)
- res = response.json()
- self.assertIn(
- {"error-type": "application", "error-tag": "data-missing",
- "error-message": "Request could not be completed because the relevant data model content does not exist"},
- res['errors']['error'])
- time.sleep(2)
+ response = test_utils_rfc8040.get_ordm_serv_list_request()
+ self.assertEqual(response['status_code'], requests.codes.conflict)
def test_27_disconnect_spdrA(self):
- response = test_utils.unmount_device("SPDR-SA1")
- self.assertEqual(response.status_code, requests.codes.ok,
- test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("SPDR-SA1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_28_disconnect_spdrC(self):
- response = test_utils.unmount_device("SPDR-SC1")
- self.assertEqual(response.status_code, requests.codes.ok,
- test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("SPDR-SC1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_29_disconnect_roadmA(self):
- response = test_utils.unmount_device("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.ok,
- test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("ROADM-A1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_30_disconnect_roadmC(self):
- response = test_utils.unmount_device("ROADM-C1")
- self.assertEqual(response.status_code, requests.codes.ok,
- test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("ROADM-C1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
if __name__ == "__main__":
sys.path.append('transportpce_tests/common/')
# pylint: disable=wrong-import-position
# pylint: disable=import-error
-import test_utils # nopep8
+import test_utils_rfc8040 # nopep8
# pylint: disable=too-few-public-methods
class TransportPCEFulltesting(unittest.TestCase):
- cr_serv_sample_data = {
- "input": {
- "end-point": [
- {
- "layer-protocol-name": "DSR",
- "service-interface-point": {
- "service-interface-point-uuid": "b1f4bd3b-7fa9-367b-a8ab-6e80293238df"
- },
- "administrative-state": "UNLOCKED",
- "operational-state": "ENABLED",
- "direction": "BIDIRECTIONAL",
- "role": "SYMMETRIC",
- "protection-role": "WORK",
- "local-id": "XPDR-C1-XPDR1",
- "name": [
+ cr_serv_input_data = {
+ "end-point": [
+ {
+ "layer-protocol-name": "DSR",
+ "service-interface-point": {
+ "service-interface-point-uuid": "b1f4bd3b-7fa9-367b-a8ab-6e80293238df"
+ },
+ "administrative-state": "UNLOCKED",
+ "operational-state": "ENABLED",
+ "direction": "BIDIRECTIONAL",
+ "role": "SYMMETRIC",
+ "protection-role": "WORK",
+ "local-id": "XPDR-C1-XPDR1",
+ "name": [
{
"value-name": "OpenROADM node id",
"value": "XPDR-C1-XPDR1"
}
- ]
+ ]
+ },
+ {
+ "layer-protocol-name": "DSR",
+ "service-interface-point": {
+ "service-interface-point-uuid": "b5964ce9-274c-3f68-b4d1-83c0b61bc74e"
},
- {
- "layer-protocol-name": "DSR",
- "service-interface-point": {
- "service-interface-point-uuid": "b5964ce9-274c-3f68-b4d1-83c0b61bc74e"
- },
- "administrative-state": "UNLOCKED",
- "operational-state": "ENABLED",
- "direction": "BIDIRECTIONAL",
- "role": "SYMMETRIC",
- "protection-role": "WORK",
- "local-id": "XPDR-A1-XPDR1",
- "name": [
+ "administrative-state": "UNLOCKED",
+ "operational-state": "ENABLED",
+ "direction": "BIDIRECTIONAL",
+ "role": "SYMMETRIC",
+ "protection-role": "WORK",
+ "local-id": "XPDR-A1-XPDR1",
+ "name": [
{
"value-name": "OpenROADM node id",
"value": "XPDR-A1-XPDR1"
}
- ]
+ ]
+ }
+ ],
+ "connectivity-constraint": {
+ "service-layer": "ETH",
+ "service-type": "POINT_TO_POINT_CONNECTIVITY",
+ "service-level": "Some service-level",
+ "requested-capacity": {
+ "total-size": {
+ "value": "100",
+ "unit": "GB"
}
- ],
- "connectivity-constraint": {
- "service-layer": "ETH",
- "service-type": "POINT_TO_POINT_CONNECTIVITY",
- "service-level": "Some service-level",
- "requested-capacity": {
- "total-size": {
- "value": "100",
- "unit": "GB"
- }
- }
- },
- "state": "Some state"
- }
+ }
+ },
+ "state": "Some state"
}
+ del_serv_input_data = {"service-id-or-name": "TBD"}
+
+ tapi_topo = {"topology-id-or-name": "TBD"}
+
+ node_details = {
+ "topology-id-or-name": "TBD",
+ "node-id-or-name": "TBD"
+ }
+
+ tapi_serv_details = {"service-id-or-name": "TBD"}
+
processes = []
uuid_services = UuidServices()
WAITING = 25 # nominal value is 300
cls.init_failed = False
os.environ['JAVA_MIN_MEM'] = '1024M'
os.environ['JAVA_MAX_MEM'] = '4096M'
- cls.processes = test_utils.start_tpce()
+ cls.processes = test_utils_rfc8040.start_tpce()
# TAPI feature is not installed by default in Karaf
if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
print("installing tapi feature...")
- result = test_utils.install_karaf_feature("odl-transportpce-tapi")
+ result = test_utils_rfc8040.install_karaf_feature("odl-transportpce-tapi")
if result.returncode != 0:
cls.init_failed = True
print("Restarting OpenDaylight...")
- test_utils.shutdown_process(cls.processes[0])
- cls.processes[0] = test_utils.start_karaf()
- test_utils.process_list[0] = cls.processes[0]
- cls.init_failed = not test_utils.wait_until_log_contains(
- test_utils.KARAF_LOG, test_utils.KARAF_OK_START_MSG, time_to_wait=60)
+ test_utils_rfc8040.shutdown_process(cls.processes[0])
+ cls.processes[0] = test_utils_rfc8040.start_karaf()
+ test_utils_rfc8040.process_list[0] = cls.processes[0]
+ cls.init_failed = not test_utils_rfc8040.wait_until_log_contains(
+ test_utils_rfc8040.KARAF_LOG, test_utils_rfc8040.KARAF_OK_START_MSG, time_to_wait=60)
if cls.init_failed:
print("tapi installation feature failed...")
- test_utils.shutdown_process(cls.processes[0])
+ test_utils_rfc8040.shutdown_process(cls.processes[0])
sys.exit(2)
- cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION_221),
- ('roadma', cls.NODE_VERSION_221),
- ('roadmc', cls.NODE_VERSION_221),
- ('xpdrc', cls.NODE_VERSION_221)])
+ cls.processes = test_utils_rfc8040.start_tpce()
+ cls.processes = test_utils_rfc8040.start_sims([('xpdra', cls.NODE_VERSION_221),
+ ('roadma', cls.NODE_VERSION_221),
+ ('roadmc', cls.NODE_VERSION_221),
+ ('xpdrc', cls.NODE_VERSION_221)])
@classmethod
def tearDownClass(cls):
# pylint: disable=not-an-iterable
for process in cls.processes:
- test_utils.shutdown_process(process)
+ test_utils_rfc8040.shutdown_process(process)
print("all processes killed")
- time.sleep(10)
def setUp(self): # instruction executed before each test method
# pylint: disable=consider-using-f-string
print("execution of {}".format(self.id().split(".")[-1]))
+ time.sleep(1)
def test_01_connect_xpdrA(self):
- response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION_221))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION_221))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
- response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_221))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION_221))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
- response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION_221))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
- response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+ response = test_utils_rfc8040.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION_221))
+ self.assertEqual(response.status_code, requests.codes.created, test_utils_rfc8040.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
- response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1",
- "ROADM-A1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
time.sleep(2)
def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
- response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1",
- "ROADM-A1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-A1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
time.sleep(2)
def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
- response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
- "ROADM-C1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-xpdr-rdm-links',
+ {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Xponder Roadm Link created successfully', response["output"]["result"])
time.sleep(2)
def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
- response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
- "ROADM-C1", "1", "SRG1-PP1-TXRX")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'transportpce-networkutils', 'init-rdm-xpdr-links',
+ {'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
+ 'rdm-node': 'ROADM-C1', 'srg-num': '1', 'termination-point-num': 'SRG1-PP1-TXRX'}})
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
time.sleep(2)
def test_09_add_omsAttributes_ROADMA_ROADMC(self):
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
+ response = test_utils_rfc8040.add_oms_attr_request(
+ "ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
def test_10_add_omsAttributes_ROADMC_ROADMA(self):
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
+ response = test_utils_rfc8040.add_oms_attr_request(
+ "ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
# test service-create for Eth service from xpdr to xpdr
def test_11_create_connectivity_service_Ethernet(self):
- response = test_utils.tapi_create_connectivity_request(self.cr_serv_sample_data)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-connectivity', 'create-connectivity-service', self.cr_serv_input_data)
time.sleep(self.WAITING)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.uuid_services.eth = res['output']['service']['uuid']
+ self.uuid_services.eth = response['output']['service']['uuid']
# pylint: disable=consider-using-f-string
input_dict_1 = {'administrative-state': 'LOCKED',
input_dict_3 = {'value-name': 'OpenROADM node id',
'value': 'XPDR-A1-XPDR1'}
- self.assertDictEqual(dict(input_dict_1, **res['output']['service']),
- res['output']['service'])
- self.assertDictEqual(dict(input_dict_2, **res['output']['service']['end-point'][0]['name'][0]),
- res['output']['service']['end-point'][0]['name'][0])
- self.assertDictEqual(dict(input_dict_3, **res['output']['service']['end-point'][1]['name'][0]),
- res['output']['service']['end-point'][1]['name'][0])
+ self.assertDictEqual(dict(input_dict_1, **response['output']['service']),
+ response['output']['service'])
+ self.assertDictEqual(dict(input_dict_2, **response['output']['service']['end-point'][0]['name'][0]),
+ response['output']['service']['end-point'][0]['name'][0])
+ self.assertDictEqual(dict(input_dict_3, **response['output']['service']['end-point'][1]['name'][0]),
+ response['output']['service']['end-point'][1]['name'][0])
# If the gate fails is because of the waiting time not being enough
- time.sleep(self.WAITING)
+# time.sleep(self.WAITING)
def test_12_get_service_Ethernet(self):
- response = test_utils.get_service_list_request("services/" + str(self.uuid_services.eth))
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['services'][0]['administrative-state'], 'inService')
- self.assertEqual(
- res['services'][0]['service-name'], self.uuid_services.eth)
- self.assertEqual(
- res['services'][0]['connection-type'], 'service')
- self.assertEqual(
- res['services'][0]['lifecycle-state'], 'planned')
- time.sleep(2)
+ response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.eth))
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['services'][0]['administrative-state'], 'inService')
+ self.assertEqual(response['services'][0]['service-name'], str(self.uuid_services.eth))
+ self.assertEqual(response['services'][0]['connection-type'], 'service')
+ self.assertEqual(response['services'][0]['lifecycle-state'], 'planned')
+ time.sleep(1)
def test_13_get_connectivity_service_Ethernet(self):
- response = test_utils.tapi_get_connectivity_request(str(self.uuid_services.eth))
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['output']['service']['operational-state'], 'ENABLED')
- self.assertEqual(
- res['output']['service']['name'][0]['value'], self.uuid_services.eth)
- self.assertEqual(
- res['output']['service']['administrative-state'], 'UNLOCKED')
- self.assertEqual(
- res['output']['service']['lifecycle-state'], 'INSTALLED')
- time.sleep(2)
+ self.tapi_serv_details["service-id-or-name"] = str(self.uuid_services.eth)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-connectivity', 'get-connectivity-service-details', self.tapi_serv_details)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['output']['service']['operational-state'], 'ENABLED')
+ self.assertEqual(response['output']['service']['name'][0]['value'], self.uuid_services.eth)
+ self.assertEqual(response['output']['service']['administrative-state'], 'UNLOCKED')
+ self.assertEqual(response['output']['service']['lifecycle-state'], 'INSTALLED')
def test_14_change_status_line_port_xpdrc(self):
url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/1%2F0%2F1-PLUG-NET/ports/1"
"administrative-state": "outOfService",
"port-qual": "xpdr-network"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8144/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_15_check_update_portmapping(self):
- response = test_utils.portmapping_request("XPDR-C1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- mapping_list = res['nodes'][0]['mapping']
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C1", None, None)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
if mapping['logical-connection-point'] == 'XPDR1-NETWORK1':
self.assertEqual(mapping['port-oper-state'], 'OutOfService',
time.sleep(1)
def test_16_check_update_openroadm_topo(self):
- url = test_utils.URL_CONFIG_ORDM_TOPO
- response = test_utils.get_request(url)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- node_list = res['network'][0]['node']
+ response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ node_list = response['network'][0]['node']
nb_updated_tp = 0
for node in node_list:
self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
- link_list = res['network'][0]['ietf-network-topology:link']
+ link_list = response['network'][0]['ietf-network-topology:link']
updated_links = ['XPDR-C1-XPDR1-XPDR1-NETWORK1toROADM-C1-SRG1-SRG1-PP1-TXRX',
'ROADM-C1-SRG1-SRG1-PP1-TXRXtoXPDR-C1-XPDR1-XPDR1-NETWORK1']
nb_updated_link = 0
time.sleep(1)
def test_17_check_update_tapi_neps(self):
- response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "XPDR-C1-XPDR1+OTSi")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nep_list = res['output']['node']['owned-node-edge-point']
+ self.node_details["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO
+ self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+OTSi"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-node-details', self.node_details)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ nep_list = response['output']['node']['owned-node-edge-point']
nb_updated_neps = 0
for nep in nep_list:
if 'XPDR1-NETWORK1' in nep['name'][0]['value']:
"Operational State should be 'ENABLED'")
self.assertEqual(nep['administrative-state'], 'UNLOCKED',
"Administrative State should be 'UNLOCKED'")
- response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "XPDR-C1-XPDR1+DSR")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nep_list = res['output']['node']['owned-node-edge-point']
+ self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+DSR"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-node-details', self.node_details)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ nep_list = response['output']['node']['owned-node-edge-point']
for nep in nep_list:
if 'XPDR1-NETWORK1' in nep['name'][0]['value']:
self.assertEqual(nep['operational-state'], 'DISABLED',
time.sleep(1)
def test_18_check_update_tapi_links(self):
- response = test_utils.tapi_get_topology_details_request("T0 - Full Multi-layer topology")
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
time.sleep(2)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- link_list = res['output']['topology']['link']
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ link_list = response['output']['topology']['link']
nb_updated_link = 0
for link in link_list:
if all(x in link['name'][0]['value'] for x in ['XPDR-C1-XPDR1', 'XPDR1-NETWORK1']):
time.sleep(1)
def test_19_check_update_service_Ethernet(self):
- response = test_utils.get_service_list_request(
- "services/" + str(self.uuid_services.eth))
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(res['services'][0]['operational-state'], 'outOfService')
- self.assertEqual(res['services'][0]['administrative-state'], 'inService')
- time.sleep(1)
+ response = test_utils_rfc8040.get_ordm_serv_list_attr_request("services", str(self.uuid_services.eth))
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['services'][0]['operational-state'], 'outOfService')
+ self.assertEqual(response['services'][0]['administrative-state'], 'inService')
def test_20_check_update_connectivity_service_Ethernet(self):
- response = test_utils.tapi_get_connectivity_request(str(self.uuid_services.eth))
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertEqual(
- res['output']['service']['operational-state'], 'DISABLED')
- self.assertEqual(
- res['output']['service']['administrative-state'], 'LOCKED')
+ self.tapi_serv_details["service-id-or-name"] = str(self.uuid_services.eth)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-connectivity', 'get-connectivity-service-details', self.tapi_serv_details)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ self.assertEqual(response['output']['service']['operational-state'], 'DISABLED')
+ self.assertEqual(response['output']['service']['administrative-state'], 'LOCKED')
time.sleep(1)
def test_21_restore_status_line_port_xpdrc(self):
"administrative-state": "inService",
"port-qual": "xpdr-network"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8144/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_22_check_update_portmapping_ok(self):
- response = test_utils.portmapping_request("XPDR-C1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- mapping_list = res['nodes'][0]['mapping']
+ response = test_utils_rfc8040.get_portmapping_node_attr("XPDR-C1", None, None)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
self.assertEqual(mapping['port-oper-state'], 'InService',
"Operational State should be 'InService'")
time.sleep(1)
def test_23_check_update_openroadm_topo_ok(self):
- url = test_utils.URL_CONFIG_ORDM_TOPO
- response = test_utils.get_request(url)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- node_list = res['network'][0]['node']
+ response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ node_list = response['network'][0]['node']
for node in node_list:
self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
self.assertEqual(node['org-openroadm-common-network:administrative-state'], 'inService')
self.assertEqual(tp['org-openroadm-common-network:operational-state'], 'inService')
self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
- link_list = res['network'][0]['ietf-network-topology:link']
+ link_list = response['network'][0]['ietf-network-topology:link']
for link in link_list:
self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
self.assertEqual(link['org-openroadm-common-network:administrative-state'], 'inService')
time.sleep(1)
def test_24_check_update_tapi_neps_ok(self):
- response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "XPDR-C1-XPDR1+OTSi")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nep_list = res['output']['node']['owned-node-edge-point']
+ self.node_details["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO
+ self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+OTSi"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-node-details', self.node_details)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ nep_list = response['output']['node']['owned-node-edge-point']
for nep in nep_list:
self.assertEqual(nep['operational-state'], 'ENABLED',
"Operational State should be 'ENABLED'")
self.assertEqual(nep['administrative-state'], 'UNLOCKED',
"Administrative State should be 'UNLOCKED'")
- response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "XPDR-C1-XPDR1+DSR")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nep_list = res['output']['node']['owned-node-edge-point']
+ self.node_details["node-id-or-name"] = "XPDR-C1-XPDR1+DSR"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-node-details', self.node_details)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ nep_list = response['output']['node']['owned-node-edge-point']
for nep in nep_list:
self.assertEqual(nep['operational-state'], 'ENABLED',
"Operational State should be 'ENABLED'")
time.sleep(1)
def test_25_check_update_tapi_links_ok(self):
- response = test_utils.tapi_get_topology_details_request(
- "T0 - Full Multi-layer topology")
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
time.sleep(2)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- link_list = res['output']['topology']['link']
+ link_list = response['output']['topology']['link']
for link in link_list:
self.assertEqual(link['operational-state'], 'ENABLED')
self.assertEqual(link['administrative-state'], 'UNLOCKED')
-
time.sleep(1)
def test_26_check_update_service1_ok(self):
"administrative-state": "outOfService",
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_29_check_update_portmapping(self):
- response = test_utils.portmapping_request("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- mapping_list = res['nodes'][0]['mapping']
+ response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
if mapping['logical-connection-point'] == 'SRG1-PP1-TXRX':
self.assertEqual(mapping['port-oper-state'], 'OutOfService',
time.sleep(1)
def test_30_check_update_openroadm_topo(self):
- url = test_utils.URL_CONFIG_ORDM_TOPO
- response = test_utils.get_request(url)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- node_list = res['network'][0]['node']
+ response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ node_list = response['network'][0]['node']
nb_updated_tp = 0
for node in node_list:
self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
- link_list = res['network'][0]['ietf-network-topology:link']
+ link_list = response['network'][0]['ietf-network-topology:link']
updated_links = ['XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX',
'ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1']
nb_updated_link = 0
time.sleep(1)
def test_31_check_update_tapi_neps(self):
- response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "ROADM-A1+PHOTONIC_MEDIA")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nep_list = res['output']['node']['owned-node-edge-point']
+ self.node_details["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO
+ self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-node-details', self.node_details)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ nep_list = response['output']['node']['owned-node-edge-point']
nb_updated_neps = 0
for nep in nep_list:
if 'SRG1-PP1-TXRX' in nep['name'][0]['value']:
time.sleep(1)
def test_32_check_update_tapi_links(self):
- response = test_utils.tapi_get_topology_details_request(
- "T0 - Full Multi-layer topology")
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
time.sleep(2)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- link_list = res['output']['topology']['link']
+ link_list = response['output']['topology']['link']
nb_updated_link = 0
for link in link_list:
if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'SRG1-PP1-TXRX']):
"administrative-state": "inService",
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_36_check_update_portmapping_ok(self):
- response = test_utils.portmapping_request("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- mapping_list = res['nodes'][0]['mapping']
+ response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
self.assertEqual(mapping['port-oper-state'], 'InService',
"Operational State should be 'InService'")
self.test_23_check_update_openroadm_topo_ok()
def test_38_check_update_tapi_neps_ok(self):
- response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "ROADM-A1+PHOTONIC_MEDIA")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nep_list = res['output']['node']['owned-node-edge-point']
+ self.node_details["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO
+ self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-node-details', self.node_details)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ nep_list = response['output']['node']['owned-node-edge-point']
for nep in nep_list:
self.assertEqual(nep['operational-state'], 'ENABLED',
"Operational State should be 'ENABLED'")
"administrative-state": "outOfService",
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_43_check_update_portmapping(self):
- response = test_utils.portmapping_request("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- mapping_list = res['nodes'][0]['mapping']
+ response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
if mapping['logical-connection-point'] == 'DEG2-TTP-TXRX':
self.assertEqual(mapping['port-oper-state'], 'OutOfService',
time.sleep(1)
def test_44_check_update_openroadm_topo(self):
- url = test_utils.URL_CONFIG_ORDM_TOPO
- response = test_utils.get_request(url)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- node_list = res['network'][0]['node']
+ response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ node_list = response['network'][0]['node']
nb_updated_tp = 0
for node in node_list:
self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
- link_list = res['network'][0]['ietf-network-topology:link']
+ link_list = response['network'][0]['ietf-network-topology:link']
updated_links = ['ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX',
'ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX']
nb_updated_link = 0
time.sleep(1)
def test_45_check_update_tapi_neps(self):
- response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "ROADM-A1+PHOTONIC_MEDIA")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nep_list = res['output']['node']['owned-node-edge-point']
+ self.node_details["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO
+ self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-node-details', self.node_details)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ nep_list = response['output']['node']['owned-node-edge-point']
nb_updated_neps = 0
for nep in nep_list:
if 'DEG2-TTP-TXRX' in nep['name'][0]['value']:
time.sleep(1)
def test_46_check_update_tapi_links(self):
- response = test_utils.tapi_get_topology_details_request(
- "T0 - Full Multi-layer topology")
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
time.sleep(2)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- link_list = res['output']['topology']['link']
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ link_list = response['output']['topology']['link']
nb_updated_link = 0
for link in link_list:
if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'DEG2-TTP-TXRX']):
"administrative-state": "inService",
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
"administrative-state": "outOfService",
"port-qual": "roadm-external"}]}
response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
+ data=json.dumps(body), headers=test_utils_rfc8040.TYPE_APPLICATION_JSON,
+ auth=(test_utils_rfc8040.ODL_LOGIN, test_utils_rfc8040.ODL_PWD),
+ timeout=test_utils_rfc8040.REQUEST_TIMEOUT)
self.assertEqual(response.status_code, requests.codes.ok)
time.sleep(2)
def test_57_check_update_portmapping(self):
- response = test_utils.portmapping_request("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- mapping_list = res['nodes'][0]['mapping']
+ response = test_utils_rfc8040.get_portmapping_node_attr("ROADM-A1", None, None)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ mapping_list = response['nodes'][0]['mapping']
for mapping in mapping_list:
if mapping['logical-connection-point'] == 'SRG1-PP2-TXRX':
self.assertEqual(mapping['port-oper-state'], 'OutOfService',
time.sleep(1)
def test_58_check_update_openroadm_topo(self):
- url = test_utils.URL_CONFIG_ORDM_TOPO
- response = test_utils.get_request(url)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- node_list = res['network'][0]['node']
+ response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ node_list = response['network'][0]['node']
nb_updated_tp = 0
for node in node_list:
self.assertEqual(node['org-openroadm-common-network:operational-state'], 'inService')
self.assertEqual(tp['org-openroadm-common-network:administrative-state'], 'inService')
self.assertEqual(nb_updated_tp, 1, "Only one termination-point should have been modified")
- link_list = res['network'][0]['ietf-network-topology:link']
+ link_list = response['network'][0]['ietf-network-topology:link']
nb_updated_link = 0
for link in link_list:
self.assertEqual(link['org-openroadm-common-network:operational-state'], 'inService')
time.sleep(1)
def test_59_check_update_tapi_neps(self):
- response = test_utils.tapi_get_node_details_request("T0 - Full Multi-layer topology", "ROADM-A1+PHOTONIC_MEDIA")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- nep_list = res['output']['node']['owned-node-edge-point']
+ self.node_details["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO
+ self.node_details["node-id-or-name"] = "ROADM-A1+PHOTONIC_MEDIA"
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-node-details', self.node_details)
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ nep_list = response['output']['node']['owned-node-edge-point']
nb_updated_neps = 0
for nep in nep_list:
if 'SRG1-PP2-TXRX' in nep['name'][0]['value']:
time.sleep(1)
def test_60_check_update_tapi_links(self):
- response = test_utils.tapi_get_topology_details_request(
- "T0 - Full Multi-layer topology")
+ self.tapi_topo["topology-id-or-name"] = test_utils_rfc8040.T0_FULL_MULTILAYER_TOPO
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-topology', 'get-topology-details', self.tapi_topo)
time.sleep(2)
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- link_list = res['output']['topology']['link']
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ link_list = response['output']['topology']['link']
nb_updated_link = 0
for link in link_list:
if all(x in link['name'][0]['value'] for x in ['ROADM-A1', 'SRG1-PP2-TXRX']):
self.test_13_get_connectivity_service_Ethernet()
def test_63_delete_connectivity_service_Ethernet(self):
- response = test_utils.tapi_delete_connectivity_request(str(self.uuid_services.eth))
- self.assertEqual(response.status_code, requests.codes.no_content)
+ self.del_serv_input_data["service-id-or-name"] = str(self.uuid_services.eth)
+ response = test_utils_rfc8040.transportpce_api_rpc_request(
+ 'tapi-connectivity', 'delete-connectivity-service', self.del_serv_input_data)
+ self.assertIn(response['status_code'], (requests.codes.ok, requests.codes.no_content))
time.sleep(self.WAITING)
def test_64_disconnect_xponders_from_roadm(self):
- url = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"
- response = test_utils.get_ordm_topo_request("")
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- links = res['network'][0]['ietf-network-topology:link']
+ response = test_utils_rfc8040.get_ietf_network_request('openroadm-topology', 'config')
+ self.assertEqual(response['status_code'], requests.codes.ok)
+ links = response['network'][0]['ietf-network-topology:link']
for link in links:
if link["org-openroadm-common-network:link-type"] in ('XPONDER-OUTPUT', 'XPONDER-INPUT'):
- link_name = link["link-id"]
- response = test_utils.delete_request(url+link_name)
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils_rfc8040.del_ietf_network_link_request(
+ 'openroadm-topology', link['link-id'], 'config')
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_65_disconnect_XPDRA(self):
- response = test_utils.unmount_device("XPDR-A1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("XPDR-A1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_66_disconnect_XPDRC(self):
- response = test_utils.unmount_device("XPDR-C1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("XPDR-C1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_67_disconnect_ROADMA(self):
- response = test_utils.unmount_device("ROADM-A1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("ROADM-A1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
def test_68_disconnect_ROADMC(self):
- response = test_utils.unmount_device("ROADM-C1")
- self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
+ response = test_utils_rfc8040.unmount_device("ROADM-C1")
+ self.assertIn(response.status_code, (requests.codes.ok, requests.codes.no_content))
if __name__ == "__main__":