##############################################################################
+import unittest
import json
-import os
-import psutil
-import requests
-import signal
-import shutil
-import subprocess
import time
-import unittest
+import requests
from common import test_utils
class TransportPCEFulltesting(unittest.TestCase):
processes = None
- WAITING = 20 # nominal value is 300
-
- @classmethod
- def setUpClass(cls):
- cls.processes = test_utils.start_tpce()
- cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
-
- @classmethod
- def tearDownClass(cls):
- for process in cls.processes:
- test_utils.shutdown_process(process)
- print("all processes killed")
-
- def setUp(self): # instruction executed before each test method
- print("execution of {}".format(self.id().split(".")[-1]))
-
- def test_01_connect_xpdrA(self):
- response = test_utils.mount_device("XPDR-A1", 'xpdra')
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
-
- def test_02_connect_xpdrC(self):
- response = test_utils.mount_device("XPDR-C1", 'xpdrc')
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
-
- def test_03_connect_rdmA(self):
- response = test_utils.mount_device("ROADM-A1", 'roadma')
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
-
- def test_04_connect_rdmC(self):
- response = test_utils.mount_device("ROADM-C1", 'roadmc')
- self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
-
- def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
- url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDR-A1",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "1",
- "networkutils:rdm-node": "ROADM-A1",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP1-TXRX"
- }
- }
- }
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
- time.sleep(2)
-
- def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
- url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDR-A1",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "1",
- "networkutils:rdm-node": "ROADM-A1",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP1-TXRX"
- }
- }
- }
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
- time.sleep(2)
-
- def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
- url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDR-C1",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "1",
- "networkutils:rdm-node": "ROADM-C1",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP1-TXRX"
- }
- }
- }
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
- time.sleep(2)
-
- def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
- url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDR-C1",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "1",
- "networkutils:rdm-node": "ROADM-C1",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP1-TXRX"
- }
- }
- }
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
- self.assertEqual(response.status_code, requests.codes.ok)
- res = response.json()
- self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
- time.sleep(2)
-
- def test_09_add_omsAttributes_ROADMA_ROADMC(self):
- # Config ROADMA-ROADMC oms-attributes
- url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
- "link/ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
- "OMS-attributes/span"
- .format(test_utils.RESTCONF_BASE_URL))
- data = {"span": {
- "auto-spanloss": "true",
- "spanloss-base": 11.4,
- "spanloss-current": 12,
- "engineered-spanloss": 12.2,
- "link-concatenation": [{
- "SRLG-Id": 0,
- "fiber-type": "smf",
- "SRLG-length": 100000,
- "pmd": 0.5}]}}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
- self.assertEqual(response.status_code, requests.codes.created)
-
- def test_10_add_omsAttributes_ROADMC_ROADMA(self):
- # Config ROADMC-ROADMA oms-attributes
- url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
- "link/ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
- "OMS-attributes/span"
- .format(test_utils.RESTCONF_BASE_URL))
- data = {"span": {
- "auto-spanloss": "true",
- "spanloss-base": 11.4,
- "spanloss-current": 12,
- "engineered-spanloss": 12.2,
- "link-concatenation": [{
- "SRLG-Id": 0,
- "fiber-type": "smf",
- "SRLG-length": 100000,
- "pmd": 0.5}]}}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
- self.assertEqual(response.status_code, requests.codes.created)
-
-# test service-create for Eth service from xpdr to xpdr
- def test_11_create_eth_service1(self):
- url = ("{}/operations/org-openroadm-service:service-create"
- .format(test_utils.RESTCONF_BASE_URL))
- data = {"input": {
+ cr_serv_sample_data = {"input": {
"sdnc-request-header": {
"request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
"rpc-action": "service-create",
"operator-contact": "pw1234"
}
}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+
+ WAITING = 20 # nominal value is 300
+
+ @classmethod
+ def setUpClass(cls):
+ cls.processes = test_utils.start_tpce()
+ cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmc', 'xpdrc'])
+
+ @classmethod
+ def tearDownClass(cls):
+ for process in cls.processes:
+ test_utils.shutdown_process(process)
+ print("all processes killed")
+
+ def setUp(self): # instruction executed before each test method
+ print("execution of {}".format(self.id().split(".")[-1]))
+
+ def test_01_connect_xpdrA(self):
+ response = test_utils.mount_device("XPDR-A1", 'xpdra')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+
+ def test_02_connect_xpdrC(self):
+ response = test_utils.mount_device("XPDR-C1", 'xpdrc')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+
+ def test_03_connect_rdmA(self):
+ response = test_utils.mount_device("ROADM-A1", 'roadma')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+
+ def test_04_connect_rdmC(self):
+ response = test_utils.mount_device("ROADM-C1", 'roadmc')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
+
+ def test_05_connect_xprdA_N1_to_roadmA_PP1(self):
+ response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1",
+ "ROADM-A1", "1", "SRG1-PP1-TXRX")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
+ time.sleep(2)
+
+ def test_06_connect_roadmA_PP1_to_xpdrA_N1(self):
+ response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1",
+ "ROADM-A1", "1", "SRG1-PP1-TXRX")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
+ time.sleep(2)
+
+ def test_07_connect_xprdC_N1_to_roadmC_PP1(self):
+ response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "1",
+ "ROADM-C1", "1", "SRG1-PP1-TXRX")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
+ time.sleep(2)
+
+ def test_08_connect_roadmC_PP1_to_xpdrC_N1(self):
+ response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "1",
+ "ROADM-C1", "1", "SRG1-PP1-TXRX")
+ self.assertEqual(response.status_code, requests.codes.ok)
+ res = response.json()
+ self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
+ time.sleep(2)
+
+ def test_09_add_omsAttributes_ROADMA_ROADMC(self):
+ # Config ROADMA-ROADMC oms-attributes
+ data = {"span": {
+ "auto-spanloss": "true",
+ "spanloss-base": 11.4,
+ "spanloss-current": 12,
+ "engineered-spanloss": 12.2,
+ "link-concatenation": [{
+ "SRLG-Id": 0,
+ "fiber-type": "smf",
+ "SRLG-length": 100000,
+ "pmd": 0.5}]}}
+ response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
+ self.assertEqual(response.status_code, requests.codes.created)
+
+ def test_10_add_omsAttributes_ROADMC_ROADMA(self):
+ # Config ROADMC-ROADMA oms-attributes
+ data = {"span": {
+ "auto-spanloss": "true",
+ "spanloss-base": 11.4,
+ "spanloss-current": 12,
+ "engineered-spanloss": 12.2,
+ "link-concatenation": [{
+ "SRLG-Id": 0,
+ "fiber-type": "smf",
+ "SRLG-length": 100000,
+ "pmd": 0.5}]}}
+ response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
+ self.assertEqual(response.status_code, requests.codes.created)
+
+# test service-create for Eth service from xpdr to xpdr
+ def test_11_create_eth_service1(self):
+ self.cr_serv_sample_data["input"]["service-name"] = "service1"
+ response = test_utils.service_create_request(self.cr_serv_sample_data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('PCE calculation in progress',
time.sleep(self.WAITING)
def test_12_get_eth_service1(self):
- url = ("{}/operational/org-openroadm-service:service-list/services/service1"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.get_service_list_request("services/service1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
time.sleep(2)
def test_13_check_xc1_ROADMA(self):
- url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
- "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
- "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
time.sleep(5)
def test_14_check_xc1_ROADMC(self):
- url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
- "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
- "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
time.sleep(5)
def test_15_check_topo_XPDRA(self):
- url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
time.sleep(3)
def test_16_check_topo_ROADMA_SRG1(self):
- url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertNotIn({u'index': 1},
time.sleep(3)
def test_17_check_topo_ROADMA_DEG1(self):
- url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertNotIn({u'index': 1},
time.sleep(3)
def test_18_connect_xprdA_N2_to_roadmA_PP2(self):
- url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDR-A1",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "2",
- "networkutils:rdm-node": "ROADM-A1",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP2-TXRX"
- }
- }
- }
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "2",
+ "ROADM-A1", "1", "SRG1-PP2-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
time.sleep(2)
def test_19_connect_roadmA_PP2_to_xpdrA_N2(self):
- url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDR-A1",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "2",
- "networkutils:rdm-node": "ROADM-A1",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP2-TXRX"
- }
- }
- }
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "2",
+ "ROADM-A1", "1", "SRG1-PP2-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
time.sleep(2)
def test_20_connect_xprdC_N2_to_roadmC_PP2(self):
- url = "{}/operations/transportpce-networkutils:init-xpdr-rdm-links".format(test_utils.RESTCONF_BASE_URL)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDR-C1",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "2",
- "networkutils:rdm-node": "ROADM-C1",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP2-TXRX"
- }
- }
- }
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.connect_xpdr_to_rdm_request("XPDR-C1", "1", "2",
+ "ROADM-C1", "1", "SRG1-PP2-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Xponder Roadm Link created successfully', res["output"]["result"])
time.sleep(2)
def test_21_connect_roadmC_PP2_to_xpdrC_N2(self):
- url = "{}/operations/transportpce-networkutils:init-rdm-xpdr-links".format(test_utils.RESTCONF_BASE_URL)
- data = {
- "networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDR-C1",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "2",
- "networkutils:rdm-node": "ROADM-C1",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP2-TXRX"
- }
- }
- }
- response = requests.request(
- "POST", url, data=json.dumps(data),
- headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.connect_rdm_to_xpdr_request("XPDR-C1", "1", "2",
+ "ROADM-C1", "1", "SRG1-PP2-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
- self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
- time.sleep(2)
-
- def test_22_create_eth_service2(self):
- url = ("{}/operations/org-openroadm-service:service-create"
- .format(test_utils.RESTCONF_BASE_URL))
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-create",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-name": "service2",
- "common-id": "ASATT1234567",
- "connection-type": "service",
- "service-a-end": {
- "service-rate": "100",
- "node-id": "XPDR-A1",
- "service-format": "Ethernet",
- "clli": "SNJSCAMCJP8",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.3",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.4",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "service-z-end": {
- "service-rate": "100",
- "node-id": "XPDR-C1",
- "service-format": "Ethernet",
- "clli": "SNJSCAMCJT4",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.29",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.30",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "due-date": "2016-11-28T00:00:01Z",
- "operator-contact": "pw1234"
- }
- }
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ self.assertIn('Roadm Xponder links created successfully', res["output"]["result"])
+ time.sleep(2)
+
+ def test_22_create_eth_service2(self):
+ self.cr_serv_sample_data["input"]["service-name"] = "service2"
+ response = test_utils.service_create_request(self.cr_serv_sample_data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('PCE calculation in progress',
time.sleep(self.WAITING)
def test_23_get_eth_service2(self):
- url = ("{}/operational/org-openroadm-service:service-list/services/service2"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.get_service_list_request("services/service2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
time.sleep(1)
def test_24_check_xc2_ROADMA(self):
- url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
- "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
- "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-2"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/DEG2-TTP-TXRX-SRG1-PP2-TXRX-2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
res['roadm-connections'][0]['destination'])
def test_25_check_topo_XPDRA(self):
- url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
time.sleep(10)
def test_26_check_topo_ROADMA_SRG1(self):
- url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertNotIn({u'index': 1}, res['node'][0]
time.sleep(10)
def test_27_check_topo_ROADMA_DEG2(self):
- url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertNotIn({u'index': 1}, res['node'][0]
# creation service test on a non-available resource
def test_28_create_eth_service3(self):
- url = ("{}/operations/org-openroadm-service:service-create"
- .format(test_utils.RESTCONF_BASE_URL))
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-create",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-name": "service3",
- "common-id": "ASATT1234567",
- "connection-type": "service",
- "service-a-end": {
- "service-rate": "100",
- "node-id": "XPDR-A1",
- "service-format": "Ethernet",
- "clli": "SNJSCAMCJP8",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.3",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.4",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "service-z-end": {
- "service-rate": "100",
- "node-id": "XPDR-C1",
- "service-format": "Ethernet",
- "clli": "SNJSCAMCJT4",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.29",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.30",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "due-date": "2016-11-28T00:00:01Z",
- "operator-contact": "pw1234"
- }
- }
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ self.cr_serv_sample_data["input"]["service-name"] = "service3"
+ response = test_utils.service_create_request(self.cr_serv_sample_data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('PCE calculation in progress',
# add a test that check the openroadm-service-list still only contains 2 elements
def test_29_delete_eth_service3(self):
- url = ("{}/operations/org-openroadm-service:service-delete"
- .format(test_utils.RESTCONF_BASE_URL))
+ url = "{}/operations/org-openroadm-service:service-delete"
data = {"input": {
"sdnc-request-header": {
"request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
}
}
}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.post_request(url, data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Service \'service3\' does not exist in datastore',
time.sleep(20)
def test_30_delete_eth_service1(self):
- url = ("{}/operations/org-openroadm-service:service-delete"
- .format(test_utils.RESTCONF_BASE_URL))
+ url = "{}/operations/org-openroadm-service:service-delete"
data = {"input": {
"sdnc-request-header": {
"request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
}
}
}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.post_request(url, data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
time.sleep(20)
def test_31_delete_eth_service2(self):
- url = ("{}/operations/org-openroadm-service:service-delete"
- .format(test_utils.RESTCONF_BASE_URL))
+ url = "{}/operations/org-openroadm-service:service-delete"
data = {"input": {
"sdnc-request-header": {
"request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
}
}
}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.post_request(url, data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
time.sleep(20)
def test_32_check_no_xc_ROADMA(self):
- url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
- "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.check_netconf_node_request("ROADM-A1", "")
res = response.json()
self.assertEqual(response.status_code, requests.codes.ok)
self.assertNotIn('roadm-connections', dict.keys(res['org-openroadm-device']))
time.sleep(2)
def test_33_check_topo_XPDRA(self):
- url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/XPDR-A1-XPDR1"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.get_ordm_topo_request("node/XPDR-A1-XPDR1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
liste_tp = res['node'][0]['ietf-network-topology:termination-point']
time.sleep(10)
def test_34_check_topo_ROADMA_SRG1(self):
- url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-SRG1"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.get_ordm_topo_request("node/ROADM-A1-SRG1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn({u'index': 1}, res['node'][0]
time.sleep(10)
def test_35_check_topo_ROADMA_DEG2(self):
- url1 = ("{}/config/ietf-network:networks/network/openroadm-topology/node/ROADM-A1-DEG2"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url1, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.get_ordm_topo_request("node/ROADM-A1-DEG2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn({u'index': 1}, res['node'][0]
# test service-create for Optical Channel (OC) service from srg-pp to srg-pp
def test_36_create_oc_service1(self):
- url = ("{}/operations/org-openroadm-service:service-create"
- .format(test_utils.RESTCONF_BASE_URL))
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-create",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-name": "service1",
- "common-id": "ASATT1234567",
- "connection-type": "roadm-line",
- "service-a-end": {
- "service-rate": "100",
- "node-id": "ROADM-A1",
- "service-format": "OC",
- "clli": "SNJSCAMCJP8",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.3",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.4",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "service-z-end": {
- "service-rate": "100",
- "node-id": "ROADM-C1",
- "service-format": "OC",
- "clli": "SNJSCAMCJT4",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.29",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.30",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "due-date": "2016-11-28T00:00:01Z",
- "operator-contact": "pw1234"
- }
- }
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ self.cr_serv_sample_data["input"]["service-name"] = "service1"
+ self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
+ self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADM-A1"
+ self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
+ self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADM-C1"
+ self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
+ response = test_utils.service_create_request(self.cr_serv_sample_data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('PCE calculation in progress',
time.sleep(self.WAITING)
def test_37_get_oc_service1(self):
- url = ("{}/operational/org-openroadm-service:service-list/services/service1"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.get_service_list_request("services/service1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
time.sleep(1)
def test_38_check_xc1_ROADMA(self):
- url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
- "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
- "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP1-TXRX-DEG2-TTP-TXRX-1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
time.sleep(7)
def test_39_check_xc1_ROADMC(self):
- url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
- "node/ROADM-C1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
- "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.check_netconf_node_request("ROADM-C1", "roadm-connections/SRG1-PP1-TXRX-DEG1-TTP-TXRX-1")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
time.sleep(7)
def test_40_create_oc_service2(self):
- url = ("{}/operations/org-openroadm-service:service-create"
- .format(test_utils.RESTCONF_BASE_URL))
- data = {"input": {
- "sdnc-request-header": {
- "request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
- "rpc-action": "service-create",
- "request-system-id": "appname",
- "notification-url": "http://localhost:8585/NotificationServer/notify"
- },
- "service-name": "service2",
- "common-id": "ASATT1234567",
- "connection-type": "roadm-line",
- "service-a-end": {
- "service-rate": "100",
- "node-id": "ROADM-A1",
- "service-format": "OC",
- "clli": "SNJSCAMCJP8",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.3",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJP8_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-5/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJP8_000000.00_00",
- "lgx-port-name": "LGX Back.4",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "service-z-end": {
- "service-rate": "100",
- "node-id": "ROADM-C1",
- "service-format": "OC",
- "clli": "SNJSCAMCJT4",
- "tx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Tx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.29",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "rx-direction": {
- "port": {
- "port-device-name": "ROUTER_SNJSCAMCJT4_000000.00_00",
- "port-type": "router",
- "port-name": "Gigabit Ethernet_Rx.ge-1/0/0.0",
- "port-rack": "000000.00",
- "port-shelf": "00"
- },
- "lgx": {
- "lgx-device-name": "LGX Panel_SNJSCAMCJT4_000000.00_00",
- "lgx-port-name": "LGX Back.30",
- "lgx-port-rack": "000000.00",
- "lgx-port-shelf": "00"
- }
- },
- "optic-type": "gray"
- },
- "due-date": "2016-11-28T00:00:01Z",
- "operator-contact": "pw1234"
- }
- }
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ self.cr_serv_sample_data["input"]["service-name"] = "service2"
+ self.cr_serv_sample_data["input"]["connection-type"] = "roadm-line"
+ self.cr_serv_sample_data["input"]["service-a-end"]["node-id"] = "ROADM-A1"
+ self.cr_serv_sample_data["input"]["service-a-end"]["service-format"] = "OC"
+ self.cr_serv_sample_data["input"]["service-z-end"]["node-id"] = "ROADM-C1"
+ self.cr_serv_sample_data["input"]["service-z-end"]["service-format"] = "OC"
+ response = test_utils.service_create_request(self.cr_serv_sample_data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('PCE calculation in progress',
time.sleep(self.WAITING)
def test_41_get_oc_service2(self):
- url = ("{}/operational/org-openroadm-service:service-list/services/service2"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.get_service_list_request("services/service2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(
time.sleep(2)
def test_42_check_xc2_ROADMA(self):
- url = ("{}/config/network-topology:network-topology/topology/topology-netconf/"
- "node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
- "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-2"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.check_netconf_node_request("ROADM-A1", "roadm-connections/SRG1-PP2-TXRX-DEG2-TTP-TXRX-2")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# the following statement replaces self.assertDictContainsSubset deprecated in python 3.2
time.sleep(3)
def test_44_delete_oc_service1(self):
- url = ("{}/operations/org-openroadm-service:service-delete"
- .format(test_utils.RESTCONF_BASE_URL))
+ url = "{}/operations/org-openroadm-service:service-delete"
data = {"input": {
"sdnc-request-header": {
"request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
}
}
}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.post_request(url, data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
time.sleep(20)
def test_45_delete_oc_service2(self):
- url = ("{}/operations/org-openroadm-service:service-delete"
- .format(test_utils.RESTCONF_BASE_URL))
+ url = "{}/operations/org-openroadm-service:service-delete"
data = {"input": {
"sdnc-request-header": {
"request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
}
}
}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.post_request(url, data)
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertIn('Renderer service delete in progress',
def test_46_get_no_oc_services(self):
print("start test")
- url = ("{}/operational/org-openroadm-service:service-list"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.get_service_list_request("")
self.assertEqual(response.status_code, requests.codes.not_found)
res = response.json()
self.assertIn(
time.sleep(1)
def test_47_get_no_xc_ROADMA(self):
- url = ("{}/config/network-topology:network-topology/topology/topology-netconf"
- "/node/ROADM-A1/yang-ext:mount/org-openroadm-device:org-openroadm-device/"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request(
- "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.check_netconf_node_request("ROADM-A1", "")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertNotIn(['roadm-connections'][0], res['org-openroadm-device'])
def test_49_loop_create_eth_service(self):
for i in range(1, 6):
- print("trial number {}".format(i))
+ print("iteration number {}".format(i))
print("eth service creation")
self.test_11_create_eth_service1()
print("check xc in ROADM-A1")
self.test_30_delete_eth_service1()
def test_50_loop_create_oc_service(self):
- url = ("{}/operational/org-openroadm-service:service-list/services/service1"
- .format(test_utils.RESTCONF_BASE_URL))
- response = requests.request("GET", url, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ response = test_utils.get_service_list_request("services/service1")
if response.status_code != 404:
- url = ("{}/operations/org-openroadm-service:service-delete"
- .format(test_utils.RESTCONF_BASE_URL))
+ url = "{}/operations/org-openroadm-service:service-delete"
data = {"input": {
"sdnc-request-header": {
"request-id": "e3028bae-a90f-4ddd-a83f-cf224eba0e58",
}
}
}
- requests.request("POST", url, data=json.dumps(data), headers=test_utils.TYPE_APPLICATION_JSON,
- auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+ test_utils.post_request(url, data)
time.sleep(5)
for i in range(1, 6):
- print("trial number {}".format(i))
+ print("iteration number {}".format(i))
print("oc service creation")
self.test_36_create_oc_service1()
print("check xc in ROADM-A1")