# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-
+# pylint: disable=no-member
+# pylint: disable=too-many-public-methods
+import base64
import unittest
-import json
import time
import requests
-from common import test_utils
+import sys
+sys.path.append('transportpce_tests/common/')
+import test_utils
class TransportPCEFulltesting(unittest.TestCase):
processes = None
+ cr_serv_sample_data = {"input": {
+ "sdnc-request-header": {
+ "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
+ "rpc-action": "service-create",
+ "request-system-id": "appname",
+ "notification-url": "http://localhost:8585/NotificationServer/notify"
+ },
+ "service-name": "service1",
+ "common-id": "ASATT1234567",
+ "connection-type": "service",
+ "service-a-end": {
+ "service-rate": "100",
+ "node-id": "XPDR-A1",
+ "service-format": "Ethernet",
+ "clli": "SNJSCAMCJP8",
+ "tx-direction": {
+ "port": {
+ "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
+ "port-type": "router",
+ "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
+ "port-rack": "000000.00",
+ "port-shelf": "00"
+ },
+ "lgx": {
+ "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
+ "lgx-port-name": "LGX Back.3",
+ "lgx-port-rack": "000000.00",
+ "lgx-port-shelf": "00"
+ }
+ },
+ "rx-direction": {
+ "port": {
+ "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
+ "port-type": "router",
+ "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
+ "port-rack": "000000.00",
+ "port-shelf": "00"
+ },
+ "lgx": {
+ "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
+ "lgx-port-name": "LGX Back.4",
+ "lgx-port-rack": "000000.00",
+ "lgx-port-shelf": "00"
+ }
+ },
+ "optic-type": "gray"
+ },
+ "service-z-end": {
+ "service-rate": "100",
+ "node-id": "XPDR-C1",
+ "service-format": "Ethernet",
+ "clli": "SNJSCAMCJT4",
+ "tx-direction": {
+ "port": {
+ "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
+ "port-type": "router",
+ "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
+ "port-rack": "000000.00",
+ "port-shelf": "00"
+ },
+ "lgx": {
+ "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
+ "lgx-port-name": "LGX Back.29",
+ "lgx-port-rack": "000000.00",
+ "lgx-port-shelf": "00"
+ }
+ },
+ "rx-direction": {
+ "port": {
+ "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
+ "port-type": "router",
+ "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
+ "port-rack": "000000.00",
+ "port-shelf": "00"
+ },
+ "lgx": {
+ "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
+ "lgx-port-name": "LGX Back.30",
+ "lgx-port-rack": "000000.00",
+ "lgx-port-shelf": "00"
+ }
+ },
+ "optic-type": "gray"
+ },
+ "due-date": "2016-11-28T00:00:01Z",
+ "operator-contact": "pw1234"
+ }
+ }
+
WAITING = 20 # nominal value is 300
+ NODE_VERSION = '2.2.1'
@classmethod
def setUpClass(cls):
cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
+ cls.processes = test_utils.start_sims([('xpdra', cls.NODE_VERSION),
+ ('roadma', cls.NODE_VERSION),
+ ('roadmc', cls.NODE_VERSION),
+ ('xpdrc', cls.NODE_VERSION)])
@classmethod
def tearDownClass(cls):
+ # pylint: disable=not-an-iterable
for process in cls.processes:
test_utils.shutdown_process(process)
print("all processes killed")
print("execution of {}".format(self.id().split(".")[-1]))
def test_01_connect_xpdrA(self):
- response = test_utils.mount_device("XPDR-A1", 'xpdra')
+ response = test_utils.mount_device("XPDR-A1", ('xpdra', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_connect_xpdrC(self):
- response = test_utils.mount_device("XPDR-C1", 'xpdrc')
+ response = test_utils.mount_device("XPDR-C1", ('xpdrc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_03_connect_rdmA(self):
- response = test_utils.mount_device("ROADM-A1", 'roadma')
+ response = test_utils.mount_device("ROADM-A1", ('roadma', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_04_connect_rdmC(self):
- response = test_utils.mount_device("ROADM-C1", 'roadmc')
+ response = test_utils.mount_device("ROADM-C1", ('roadmc', self.NODE_VERSION))
self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
# test service-create for Eth service from xpdr to xpdr
def test_11_create_eth_service1(self):
- url = "{}/operations/org-openroadm-service:service-create"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-create",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-name": "service1",
- "common-id": "ASATT1234567",
- "connection-type": "service",
- "service-a-end": {
- "service-rate": "100",
- "node-id": "XPDR-A1",
- "service-format": "Ethernet",
- "clli": "SNJSCAMCJP8",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.3",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.4",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "service-z-end": {
- "service-rate": "100",
- "node-id": "XPDR-C1",
- "service-format": "Ethernet",
- "clli": "SNJSCAMCJT4",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.29",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.30",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "due-date": "2016-11-28T00:00:01Z",
- "operator-contact": "pw1234"
- }
- }
- response = test_utils.post_request(url, data)
+ self.cr_serv_sample_data["input"]["service-name"] = "service1"
+ response = test_utils.service_create_request(self.cr_serv_sample_data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('PCE calculation in progress',
time.sleep(self.WAITING)
def test_12_get_eth_service1(self):
- url = "{}/operational/org-openroadm-service:service-list/services/service1"
- response = test_utils.get_request(url)
+ response = test_utils.get_service_list_request("services/service1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
time.sleep(2)
def test_13_check_xc1_ROADMA(self):
- response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
+ response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
dict({
- 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
+ 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
'opticalControlMode': 'gainLoss',
'target-output-power': -3.0
}, **res['roadm-connections'][0]),
res['roadm-connections'][0]
)
self.assertDictEqual(
- {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
+ {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
res['roadm-connections'][0]['source'])
self.assertDictEqual(
- {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
+ {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
res['roadm-connections'][0]['destination'])
time.sleep(5)
def test_14_check_xc1_ROADMC(self):
- response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
+ response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
dict({
- 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
+ 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
'opticalControlMode': 'gainLoss',
'target-output-power': -3.0
}, **res['roadm-connections'][0]),
res['roadm-connections'][0]
)
self.assertDictEqual(
- {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
+ {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
res['roadm-connections'][0]['source'])
self.assertDictEqual(
- {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
+ {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
res['roadm-connections'][0]['destination'])
time.sleep(5)
response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertNotIn({u'index': 1},
- res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
+ freq_map = base64.b64decode(
+ res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
if ele['tp-id'] == 'SRG1-PP1-TXRX':
- self.assertIn({u'index': 1, u'frequency': 196.1,
- u'width': 40},
- ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
+ freq_map = base64.b64decode(
+ ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
if ele['tp-id'] == 'SRG1-PP2-TXRX':
- self.assertNotIn('used-wavelength', dict.keys(ele))
+ self.assertNotIn('avail-freq-maps', dict.keys(ele))
time.sleep(3)
def test_17_check_topo_ROADMA_DEG1(self):
response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertNotIn({u'index': 1},
- res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
+ freq_map = base64.b64decode(
+ res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
if ele['tp-id'] == 'DEG2-CTP-TXRX':
- self.assertIn({u'index': 1, u'frequency': 196.1,
- u'width': 40},
- ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
+ freq_map = base64.b64decode(
+ ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
if ele['tp-id'] == 'DEG2-TTP-TXRX':
- self.assertIn({u'index': 1, u'frequency': 196.1,
- u'width': 40},
- ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
+ freq_map = base64.b64decode(
+ ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 0, "Index 1 should not be available")
time.sleep(3)
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
time.sleep(2)
def test_22_create_eth_service2(self):
- url = "{}/operations/org-openroadm-service:service-create"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-create",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-name": "service2",
- "common-id": "ASATT1234567",
- "connection-type": "service",
- "service-a-end": {
- "service-rate": "100",
- "node-id": "XPDR-A1",
- "service-format": "Ethernet",
- "clli": "SNJSCAMCJP8",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.3",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.4",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "service-z-end": {
- "service-rate": "100",
- "node-id": "XPDR-C1",
- "service-format": "Ethernet",
- "clli": "SNJSCAMCJT4",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.29",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.30",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "due-date": "2016-11-28T00:00:01Z",
- "operator-contact": "pw1234"
- }
- }
- response = test_utils.post_request(url, data)
+ self.cr_serv_sample_data["input"]["service-name"] = "service2"
+ response = test_utils.service_create_request(self.cr_serv_sample_data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('PCE calculation in progress',
time.sleep(self.WAITING)
def test_23_get_eth_service2(self):
- url = "{}/operational/org-openroadm-service:service-list/services/service2"
- response = test_utils.get_request(url)
+ response = test_utils.get_service_list_request("services/service2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
time.sleep(1)
def test_24_check_xc2_ROADMA(self):
- response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-2")
+ response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
dict({
- 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-2',
+ 'connection-name': 'DEG2-TTP-TXRX-SRG1-PP2-TXRX-753:760',
'opticalControlMode': 'power'
}, **res['roadm-connections'][0]),
res['roadm-connections'][0]
)
self.assertDictEqual(
- {'src-if': 'DEG2-TTP-TXRX-nmc-2'},
+ {'src-if': 'DEG2-TTP-TXRX-nmc-753:760'},
res['roadm-connections'][0]['source'])
self.assertDictEqual(
- {'dst-if': 'SRG1-PP2-TXRX-nmc-2'},
+ {'dst-if': 'SRG1-PP2-TXRX-nmc-753:760'},
res['roadm-connections'][0]['destination'])
def test_25_check_topo_XPDRA(self):
response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertNotIn({u'index': 1}, res['node'][0]
- [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
- self.assertNotIn({u'index': 2}, res['node'][0]
- [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
+ freq_map = base64.b64decode(
+ res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
+ self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
if ele['tp-id'] == 'SRG1-PP1-TXRX':
- self.assertIn({u'index': 1, u'frequency': 196.1,
- u'width': 40},
- ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
- self.assertNotIn({u'index': 2, u'frequency': 196.05,
- u'width': 40},
- ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
+ freq_map = base64.b64decode(
+ ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
+ self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
if ele['tp-id'] == 'SRG1-PP2-TXRX':
- self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
- ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
- self.assertNotIn({u'index': 1, u'frequency': 196.1,
- u'width': 40},
- ele['org-openroadm-network-topology:pp-attributes']['used-wavelength'])
+ freq_map = base64.b64decode(
+ ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
+ self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
if ele['tp-id'] == 'SRG1-PP3-TXRX':
self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
time.sleep(10)
response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertNotIn({u'index': 1}, res['node'][0]
- [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
- self.assertNotIn({u'index': 2}, res['node'][0]
- [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
+ freq_map = base64.b64decode(
+ res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
+ self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
if ele['tp-id'] == 'DEG2-CTP-TXRX':
- self.assertIn({u'index': 1, u'frequency': 196.1,
- u'width': 40},
- ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
- self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
- ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
+ freq_map = base64.b64decode(
+ ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
+ self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
if ele['tp-id'] == 'DEG2-TTP-TXRX':
- self.assertIn({u'index': 1, u'frequency': 196.1,
- u'width': 40},
- ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
- self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
- ele['org-openroadm-network-topology:tx-ttp-attributes']['used-wavelengths'])
+ freq_map = base64.b64decode(
+ ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 0, "Lambda 1 should not be available")
+ self.assertEqual(freq_map_array[94], 0, "Lambda 2 should not be available")
time.sleep(10)
# creation service test on a non-available resource
def test_28_create_eth_service3(self):
- url = "{}/operations/org-openroadm-service:service-create"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-create",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-name": "service3",
- "common-id": "ASATT1234567",
- "connection-type": "service",
- "service-a-end": {
- "service-rate": "100",
- "node-id": "XPDR-A1",
- "service-format": "Ethernet",
- "clli": "SNJSCAMCJP8",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.3",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.4",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "service-z-end": {
- "service-rate": "100",
- "node-id": "XPDR-C1",
- "service-format": "Ethernet",
- "clli": "SNJSCAMCJT4",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.29",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.30",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "due-date": "2016-11-28T00:00:01Z",
- "operator-contact": "pw1234"
- }
- }
- response = test_utils.post_request(url, data)
+ self.cr_serv_sample_data["input"]["service-name"] = "service3"
+ response = test_utils.service_create_request(self.cr_serv_sample_data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('PCE calculation in progress',
# add a test that check the openroadm-service-list still only contains 2 elements
def test_29_delete_eth_service3(self):
- url = "{}/operations/org-openroadm-service:service-delete"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-delete",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-delete-req-info": {
- "service-name": "service3",
- "tail-retention": "no"
- }
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_delete_request("service3")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Service \'service3\' does not exist in datastore',
time.sleep(20)
def test_30_delete_eth_service1(self):
- url = "{}/operations/org-openroadm-service:service-delete"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-delete",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-delete-req-info": {
- "service-name": "service1",
- "tail-retention": "no"
- }
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_delete_request("service1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
time.sleep(20)
def test_31_delete_eth_service2(self):
- url = "{}/operations/org-openroadm-service:service-delete"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-delete",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-delete-req-info": {
- "service-name": "service2",
- "tail-retention": "no"
- }
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_delete_request("service2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
for ele in liste_tp:
if ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
- elif (ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK'):
+ elif ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK':
self.assertIn(u'tail-equipment-id',
dict.keys(ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
self.assertNotIn('wavelength', dict.keys(
response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertIn({u'index': 1}, res['node'][0]
- [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
- self.assertIn({u'index': 2}, res['node'][0]
- [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
+ freq_map = base64.b64decode(
+ res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
+ self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
- if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP1-TXRX':
- self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
+ if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP2-TXRX':
+ freq_map = base64.b64decode(
+ ele['org-openroadm-network-topology:pp-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
+ self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
+ elif ele['tp-id'] == 'SRG1-CP-TXRX':
+ freq_map = base64.b64decode(
+ ele['org-openroadm-network-topology:cp-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 255, "Index 1 should be available")
+ self.assertEqual(freq_map_array[94], 255, "Index 2 should be available")
else:
self.assertNotIn('org-openroadm-network-topology:pp-attributes', dict.keys(ele))
time.sleep(10)
response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertIn({u'index': 1}, res['node'][0]
- [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
- self.assertIn({u'index': 2}, res['node'][0]
- [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
+ freq_map = base64.b64decode(
+ res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
+ self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
if ele['tp-id'] == 'DEG2-CTP-TXRX':
- self.assertNotIn('org-openroadm-network-topology:ctp-attributes', dict.keys(ele))
+ freq_map = base64.b64decode(
+ ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
+ self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
if ele['tp-id'] == 'DEG2-TTP-TXRX':
- self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
+ freq_map = base64.b64decode(
+ ele['org-openroadm-network-topology:tx-ttp-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[95], 255, "Lambda 1 should be available")
+ self.assertEqual(freq_map_array[94], 255, "Lambda 2 should be available")
time.sleep(10)
# test service-create for Optical Channel (OC) service from srg-pp to srg-pp
def test_36_create_oc_service1(self):
- url = "{}/operations/org-openroadm-service:service-create"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-create",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-name": "service1",
- "common-id": "ASATT1234567",
- "connection-type": "roadm-line",
- "service-a-end": {
- "service-rate": "100",
- "node-id": "ROADM-A1",
- "service-format": "OC",
- "clli": "SNJSCAMCJP8",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.3",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.4",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "service-z-end": {
- "service-rate": "100",
- "node-id": "ROADM-C1",
- "service-format": "OC",
- "clli": "SNJSCAMCJT4",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.29",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.30",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "due-date": "2016-11-28T00:00:01Z",
- "operator-contact": "pw1234"
- }
- }
- response = test_utils.post_request(url, data)
+ self.cr_serv_sample_data["input"]["service-name"] = "service1"
+ self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
+ self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADM-A1"
+ self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
+ self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADM-C1"
+ self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
+ response = test_utils.service_create_request(self.cr_serv_sample_data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('PCE calculation in progress',
time.sleep(self.WAITING)
def test_37_get_oc_service1(self):
- url = "{}/operational/org-openroadm-service:service-list/services/service1"
- response = test_utils.get_request(url)
+ response = test_utils.get_service_list_request("services/service1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
time.sleep(1)
def test_38_check_xc1_ROADMA(self):
- response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
+ response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
dict({
- 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-1',
+ 'connection-name': 'SRG1-PP1-TXRX-DEG2-TTP-TXRX-761:768',
'opticalControlMode': 'gainLoss',
'target-output-power': -3.0
}, **res['roadm-connections'][0]),
res['roadm-connections'][0]
)
self.assertDictEqual(
- {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
+ {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
res['roadm-connections'][0]['source'])
self.assertDictEqual(
- {'dst-if': 'DEG2-TTP-TXRX-nmc-1'},
+ {'dst-if': 'DEG2-TTP-TXRX-nmc-761:768'},
res['roadm-connections'][0]['destination'])
time.sleep(7)
def test_39_check_xc1_ROADMC(self):
- response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
+ response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
dict({
- 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-1',
+ 'connection-name': 'SRG1-PP1-TXRX-DEG1-TTP-TXRX-761:768',
'opticalControlMode': 'gainLoss',
'target-output-power': -3.0
}, **res['roadm-connections'][0]),
res['roadm-connections'][0]
)
self.assertDictEqual(
- {'src-if': 'SRG1-PP1-TXRX-nmc-1'},
+ {'src-if': 'SRG1-PP1-TXRX-nmc-761:768'},
res['roadm-connections'][0]['source'])
self.assertDictEqual(
- {'dst-if': 'DEG1-TTP-TXRX-nmc-1'},
+ {'dst-if': 'DEG1-TTP-TXRX-nmc-761:768'},
res['roadm-connections'][0]['destination'])
time.sleep(7)
def test_40_create_oc_service2(self):
- url = "{}/operations/org-openroadm-service:service-create"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-create",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-name": "service2",
- "common-id": "ASATT1234567",
- "connection-type": "roadm-line",
- "service-a-end": {
- "service-rate": "100",
- "node-id": "ROADM-A1",
- "service-format": "OC",
- "clli": "SNJSCAMCJP8",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.3",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.4",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "service-z-end": {
- "service-rate": "100",
- "node-id": "ROADM-C1",
- "service-format": "OC",
- "clli": "SNJSCAMCJT4",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.29",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.30",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "due-date": "2016-11-28T00:00:01Z",
- "operator-contact": "pw1234"
- }
- }
- response = test_utils.post_request(url, data)
+ self.cr_serv_sample_data["input"]["service-name"] = "service2"
+ self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
+ self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADM-A1"
+ self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
+ self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADM-C1"
+ self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
+ response = test_utils.service_create_request(self.cr_serv_sample_data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('PCE calculation in progress',
time.sleep(self.WAITING)
def test_41_get_oc_service2(self):
- url = "{}/operational/org-openroadm-service:service-list/services/service2"
- response = test_utils.get_request(url)
+ response = test_utils.get_service_list_request("services/service2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
time.sleep(2)
def test_42_check_xc2_ROADMA(self):
- response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-2")
+ response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
self.assertDictEqual(
dict({
- 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-2',
+ 'connection-name': 'SRG1-PP2-TXRX-DEG2-TTP-TXRX-753:760',
'opticalControlMode': 'gainLoss',
'target-output-power': -3.0
}, **res['roadm-connections'][0]),
res['roadm-connections'][0]
)
self.assertDictEqual(
- {'src-if': 'SRG1-PP2-TXRX-nmc-2'},
+ {'src-if': 'SRG1-PP2-TXRX-nmc-753:760'},
res['roadm-connections'][0]['source'])
self.assertDictEqual(
- {'dst-if': 'DEG2-TTP-TXRX-nmc-2'},
+ {'dst-if': 'DEG2-TTP-TXRX-nmc-753:760'},
res['roadm-connections'][0]['destination'])
time.sleep(2)
time.sleep(3)
def test_44_delete_oc_service1(self):
- url = "{}/operations/org-openroadm-service:service-delete"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-delete",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-delete-req-info": {
- "service-name": "service1",
- "tail-retention": "no"
- }
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_delete_request("service1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
time.sleep(20)
def test_45_delete_oc_service2(self):
- url = "{}/operations/org-openroadm-service:service-delete"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-delete",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-delete-req-info": {
- "service-name": "service2",
- "tail-retention": "no"
- }
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_delete_request("service2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
def test_46_get_no_oc_services(self):
print("start test")
- url = "{}/operational/org-openroadm-service:service-list"
- response = test_utils.get_request(url)
- self.assertEqual(response.status_code, requests.codes.not_found)
+ response = test_utils.get_service_list_request("")
+ self.assertEqual(response.status_code, requests.codes.conflict)
res = response.json()
self.assertIn(
{"error-type": "application", "error-tag": "data-missing",
self.test_30_delete_eth_service1()
def test_50_loop_create_oc_service(self):
- url = "{}/operational/org-openroadm-service:service-list/services/service1"
- response = test_utils.get_request(url)
+ response = test_utils.get_service_list_request("services/service1")
if response.status_code != 404:
- url = "{}/operations/org-openroadm-service:service-delete"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-delete",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-delete-req-info": {
- "service-name": "service1",
- "tail-retention": "no"
- }
- }
- }
- test_utils.post_request(url, data)
+ response = test_utils.service_delete_request("service1")
time.sleep(5)
for i in range(1, 6):