# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-
+# pylint: disable=no-member
+# pylint: disable=too-many-public-methods
+import base64
import unittest
-import json
import time
import requests
from common import test_utils
+from common.test_utils import INDEX_1_USED_FREQ_MAP, INDEX_1_2_USED_FREQ_MAP, AVAILABLE_FREQ_MAP
class TransportPCEFulltesting(unittest.TestCase):
@classmethod
def tearDownClass(cls):
+ # pylint: disable=not-an-iterable
for process in cls.processes:
test_utils.shutdown_process(process)
print("all processes killed")
response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertNotIn({u'index': 1},
- res['node'][0][u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
+ freq_map = base64.b64decode(
+ res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
if ele['tp-id'] == 'SRG1-PP1-TXRX':
response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertNotIn({u'index': 1},
- res['node'][0][u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
+ freq_map = base64.b64decode(
+ res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
if ele['tp-id'] == 'DEG2-CTP-TXRX':
- self.assertIn({u'index': 1, u'frequency': 196.1,
- u'width': 40},
- ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
+ self.assertIn({u'map-name': 'cband', u'freq-map-granularity': 6.25, u'start-edge-freq': 191.325,
+ u'effective-bits': 8, u'freq-map': INDEX_1_USED_FREQ_MAP},
+ ele['org-openroadm-network-topology:'
+ 'ctp-attributes'][
+ 'avail-freq-maps'])
if ele['tp-id'] == 'DEG2-TTP-TXRX':
self.assertIn({u'index': 1, u'frequency': 196.1,
u'width': 40},
response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertNotIn({u'index': 1}, res['node'][0]
- [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
- self.assertNotIn({u'index': 2}, res['node'][0]
- [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
+ freq_map = base64.b64decode(
+ res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
+ self.assertEqual(freq_map_array[1], 0, "Index 2 should not be available")
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
if ele['tp-id'] == 'SRG1-PP1-TXRX':
response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertNotIn({u'index': 1}, res['node'][0]
- [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
- self.assertNotIn({u'index': 2}, res['node'][0]
- [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
+ freq_map = base64.b64decode(
+ res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[0], 0, "Index 1 should not be available")
+ self.assertEqual(freq_map_array[1], 0, "Index 2 should not be available")
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
if ele['tp-id'] == 'DEG2-CTP-TXRX':
- self.assertIn({u'index': 1, u'frequency': 196.1,
- u'width': 40},
- ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
- self.assertIn({u'index': 2, u'frequency': 196.05, u'width': 40},
- ele['org-openroadm-network-topology:ctp-attributes']['used-wavelengths'])
+ self.assertIn({u'map-name': 'cband', u'freq-map-granularity': 6.25, u'start-edge-freq': 191.325,
+ u'effective-bits': 8, u'freq-map': INDEX_1_2_USED_FREQ_MAP},
+ ele['org-openroadm-network-topology:'
+ 'ctp-attributes'][
+ 'avail-freq-maps'])
if ele['tp-id'] == 'DEG2-TTP-TXRX':
self.assertIn({u'index': 1, u'frequency': 196.1,
u'width': 40},
# add a test that check the openroadm-service-list still only contains 2 elements
def test_29_delete_eth_service3(self):
- url = "{}/operations/org-openroadm-service:service-delete"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-delete",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-delete-req-info": {
- "service-name": "service3",
- "tail-retention": "no"
- }
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_delete_request("service3")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Service \'service3\' does not exist in datastore',
time.sleep(20)
def test_30_delete_eth_service1(self):
- url = "{}/operations/org-openroadm-service:service-delete"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-delete",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-delete-req-info": {
- "service-name": "service1",
- "tail-retention": "no"
- }
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_delete_request("service1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
time.sleep(20)
def test_31_delete_eth_service2(self):
- url = "{}/operations/org-openroadm-service:service-delete"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-delete",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-delete-req-info": {
- "service-name": "service2",
- "tail-retention": "no"
- }
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_delete_request("service2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
for ele in liste_tp:
if ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-CLIENT':
self.assertNotIn('org-openroadm-network-topology:xpdr-client-attributes', dict.keys(ele))
- elif (ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK'):
+ elif ele[u'org-openroadm-common-network:tp-type'] == 'XPONDER-NETWORK':
self.assertIn(u'tail-equipment-id',
dict.keys(ele[u'org-openroadm-network-topology:xpdr-network-attributes']))
self.assertNotIn('wavelength', dict.keys(
response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertIn({u'index': 1}, res['node'][0]
- [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
- self.assertIn({u'index': 2}, res['node'][0]
- [u'org-openroadm-network-topology:srg-attributes']['available-wavelengths'])
+ freq_map = base64.b64decode(
+ res['node'][0]['org-openroadm-network-topology:srg-attributes']['avail-freq-maps'][0]['freq-map'])
+ freq_map_array = [int(x) for x in freq_map]
+ self.assertEqual(freq_map_array[0], 255, "Index 1 should be available")
+ self.assertEqual(freq_map_array[1], 255, "Index 2 should be available")
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
if ele['tp-id'] == 'SRG1-PP1-TXRX' or ele['tp-id'] == 'SRG1-PP1-TXRX':
response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertIn({u'index': 1}, res['node'][0]
- [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
- self.assertIn({u'index': 2}, res['node'][0]
- [u'org-openroadm-network-topology:degree-attributes']['available-wavelengths'])
+ freq_map = base64.b64decode(
+ res['node'][0]['org-openroadm-network-topology:degree-attributes']['avail-freq-maps'][0]['freq-map'])
+ self.assertTrue(test_utils.check_freq_map(freq_map), "Index 1 and 2 should be available")
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
for ele in liste_tp:
if ele['tp-id'] == 'DEG2-CTP-TXRX':
- self.assertNotIn('org-openroadm-network-topology:ctp-attributes', dict.keys(ele))
+ self.assertEqual(ele['org-openroadm-network-topology:ctp-attributes']['avail-freq-maps'][0]['freq-map'],
+ AVAILABLE_FREQ_MAP)
if ele['tp-id'] == 'DEG2-TTP-TXRX':
self.assertNotIn('org-openroadm-network-topology:tx-ttp-attributes', dict.keys(ele))
time.sleep(10)
time.sleep(3)
def test_44_delete_oc_service1(self):
- url = "{}/operations/org-openroadm-service:service-delete"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-delete",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-delete-req-info": {
- "service-name": "service1",
- "tail-retention": "no"
- }
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_delete_request("service1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
time.sleep(20)
def test_45_delete_oc_service2(self):
- url = "{}/operations/org-openroadm-service:service-delete"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-delete",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-delete-req-info": {
- "service-name": "service2",
- "tail-retention": "no"
- }
- }
- }
- response = test_utils.post_request(url, data)
+ response = test_utils.service_delete_request("service2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
def test_46_get_no_oc_services(self):
print("start test")
response = test_utils.get_service_list_request("")
- self.assertEqual(response.status_code, requests.codes.not_found)
+ self.assertEqual(response.status_code, requests.codes.conflict)
res = response.json()
self.assertIn(
{"error-type": "application", "error-tag": "data-missing",
def test_50_loop_create_oc_service(self):
response = test_utils.get_service_list_request("services/service1")
if response.status_code != 404:
- url = "{}/operations/org-openroadm-service:service-delete"
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-delete",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-delete-req-info": {
- "service-name": "service1",
- "tail-retention": "no"
- }
- }
- }
- test_utils.post_request(url, data)
+ response = test_utils.service_delete_request("service1")
time.sleep(5)
for i in range(1, 6):