# pylint: disable=too-many-public-methods
import os
-import json
# pylint: disable=wrong-import-order
import sys
import unittest
os.environ['JAVA_MAX_MEM'] = '4096M'
cls.processes = test_utils.start_tpce()
# NBI notification feature is not installed by default in Karaf
- if "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
+ if "NO_ODL_STARTUP" not in os.environ or "USE_LIGHTY" not in os.environ or os.environ['USE_LIGHTY'] != 'True':
print("installing NBI notification feature...")
result = test_utils.install_karaf_feature("odl-transportpce-nbinotifications")
if result.returncode != 0:
self.assertEqual(response.status_code,
requests.codes.created, test_utils.CODE_SHOULD_BE_201)
- def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
+ def test_05_connect_xpdrA_N1_to_roadmA_PP1(self):
response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDR-A1', 'xpdr-num': '1', 'network-num': '1',
self.assertIn('Roadm Xponder links created successfully', response["output"]["result"])
time.sleep(2)
- def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
+ def test_07_connect_xpdrC_N1_to_roadmC_PP1(self):
response = test_utils.transportpce_api_rpc_request(
'transportpce-networkutils', 'init-xpdr-rdm-links',
{'links-input': {'xpdr-node': 'XPDR-C1', 'xpdr-num': '1', 'network-num': '1',
response['output']['service']['end-point'][0]['name'][0])
self.assertDictEqual(dict(input_dict_3, **response['output']['service']['end-point'][1]['name'][0]),
response['output']['service']['end-point'][1]['name'][0])
- # If the gate fails is because of the waiting time not being enough
- time.sleep(self.WAITING)
def test_12_get_service_Ethernet(self):
response = test_utils.get_ordm_serv_list_attr_request("services", str(self.uuid_services.eth))
time.sleep(2)
def test_15_change_status_port_roadma_srg(self):
- url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
- body = {"ports": [{
+ self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '3/0', 'C1',
+ {
"port-name": "C1",
"logical-connection-point": "SRG1-PP1",
"port-type": "client",
"circuit-id": "SRG1",
"administrative-state": "outOfService",
- "port-qual": "roadm-external"}]}
- response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
- self.assertEqual(response.status_code, requests.codes.ok)
- # If the gate fails is because of the waiting time not being enough
- time.sleep(2)
+ "port-qual": "roadm-external"
+ }))
def test_16_get_tapi_notifications_connectivity_service_Ethernet(self):
self.cr_get_notif_list_input_data["subscription-id-or-name"] = str(self.uuid_subscriptions.eth)
time.sleep(2)
def test_17_restore_status_port_roadma_srg(self):
- url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
- body = {"ports": [{
+ self.assertTrue(test_utils.sims_update_cp_port(('roadma', self.NODE_VERSION_221), '3/0', 'C1',
+ {
"port-name": "C1",
"logical-connection-point": "SRG1-PP1",
"port-type": "client",
"circuit-id": "SRG1",
"administrative-state": "inService",
- "port-qual": "roadm-external"}]}
- response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
- data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD),
- timeout=test_utils.REQUEST_TIMEOUT)
- self.assertEqual(response.status_code, requests.codes.ok)
- # If the gate fails is because of the waiting time not being enough
- time.sleep(2)
+ "port-qual": "roadm-external"
+ }))
def test_18_get_tapi_notifications_connectivity_service_Ethernet(self):
self.cr_get_notif_list_input_data["subscription-id-or-name"] = str(self.uuid_subscriptions.eth)
response = test_utils.transportpce_api_rpc_request(
'tapi-notification', 'get-notification-list', self.cr_get_notif_list_input_data)
self.assertEqual(response['status_code'], requests.codes.ok)
- self.assertEqual(response['output']['notification'][1]['target-object-identifier'], str(self.uuid_services.eth))
- self.assertEqual(response['output']['notification'][1]['target-object-type'], 'CONNECTIVITY_SERVICE')
- self.assertEqual(response['output']['notification'][1]['changed-attributes'][0]['new-value'], 'UNLOCKED')
- self.assertEqual(response['output']['notification'][1]['changed-attributes'][1]['new-value'], 'ENABLED')
+ self.assertEqual(response['output']['notification'][0]['target-object-identifier'], str(self.uuid_services.eth))
+ self.assertEqual(response['output']['notification'][0]['target-object-type'], 'CONNECTIVITY_SERVICE')
+ self.assertEqual(response['output']['notification'][0]['changed-attributes'][0]['new-value'], 'UNLOCKED')
+ self.assertEqual(response['output']['notification'][0]['changed-attributes'][1]['new-value'], 'ENABLED')
time.sleep(2)
def test_19_delete_connectivity_service_Ethernet(self):