# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-import json
-import os
-import psutil
-import requests
-import signal
-import shutil
-import subprocess
-import time
+# pylint: disable=no-member
+# pylint: disable=too-many-public-methods
+
import unittest
+import time
+import json
import logging
-import test_utils
+import requests
+from common import test_utils
class TransportPCEtesting(unittest.TestCase):
- honeynode_process1 = None
- honeynode_process2 = None
- honeynode_process3 = None
- honeynode_process4 = None
- odl_process = None
- restconf_baseurl = "http://localhost:8181/restconf"
-
-# START_IGNORE_XTESTING
+ processes = None
@classmethod
def setUpClass(cls):
- print("starting honeynode1...")
- cls.honeynode_process1 = test_utils.start_xpdra_honeynode()
- time.sleep(20)
-
- print("starting honeynode2...")
- cls.honeynode_process2 = test_utils.start_roadma_honeynode()
- time.sleep(20)
-
- print("starting honeynode3...")
- cls.honeynode_process3 = test_utils.start_roadmb_honeynode()
- time.sleep(20)
-
- print("starting honeynode4...")
- cls.honeynode_process4 = test_utils.start_roadmc_honeynode()
- time.sleep(20)
- print("all honeynodes started")
-
- print("starting opendaylight...")
- cls.odl_process = test_utils.start_tpce()
- time.sleep(60)
- print("opendaylight started")
+ cls.processes = test_utils.start_tpce()
+ cls.processes = test_utils.start_sims(['xpdra', 'roadma', 'roadmb', 'roadmc'])
@classmethod
def tearDownClass(cls):
- for child in psutil.Process(cls.odl_process.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.odl_process.send_signal(signal.SIGINT)
- cls.odl_process.wait()
- for child in psutil.Process(cls.honeynode_process1.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.honeynode_process1.send_signal(signal.SIGINT)
- cls.honeynode_process1.wait()
- for child in psutil.Process(cls.honeynode_process2.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.honeynode_process2.send_signal(signal.SIGINT)
- cls.honeynode_process2.wait()
- for child in psutil.Process(cls.honeynode_process3.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.honeynode_process3.send_signal(signal.SIGINT)
- cls.honeynode_process3.wait()
- for child in psutil.Process(cls.honeynode_process4.pid).children():
- child.send_signal(signal.SIGINT)
- child.wait()
- cls.honeynode_process4.send_signal(signal.SIGINT)
- cls.honeynode_process4.wait()
+ for process in cls.processes:
+ test_utils.shutdown_process(process)
+ print("all processes killed")
def setUp(self):
time.sleep(5)
-# END_IGNORE_XTESTING
-
def test_01_connect_ROADM_A1(self):
- # Config ROADMA
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADM-A1"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "ROADM-A1",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17841",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("ROADM-A1", 'roadma')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_02_getClliNetwork(self):
- url = ("{}/config/ietf-network:networks/network/clli-network"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_clli_net_request()
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
logging.info(res)
self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
def test_03_getOpenRoadmNetwork(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-network"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_net_request()
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(res['network'][0]['node'][0]['node-id'], 'ROADM-A1')
self.assertEqual(res['network'][0]['node'][0]['org-openroadm-network:model'], 'model2')
def test_04_getLinks_OpenroadmTopology(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_topo_request("")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# Tests related to links
self.assertEqual(len(dropLink), 0)
def test_05_getNodes_OpenRoadmTopology(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_topo_request("")
res = response.json()
# Tests related to nodes
self.assertEqual(response.status_code, requests.codes.ok)
self.assertEqual(len(listNode), 0)
def test_06_connect_XPDRA(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDR-A1"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "XPDR-A1",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17840",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(30)
+ response = test_utils.mount_device("XPDR-A1", 'xpdra')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_07_getClliNetwork(self):
- url = ("{}/config/ietf-network:networks/network/clli-network"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_clli_net_request()
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertEqual(res['network'][0]['node'][0]['node-id'], 'NodeA')
self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
def test_08_getOpenRoadmNetwork(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-network"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_net_request()
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
nbNode = len(res['network'][0]['node'])
self.assertFalse(True)
def test_09_getNodes_OpenRoadmTopology(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_topo_request("")
res = response.json()
# Tests related to nodes
self.assertEqual(response.status_code, requests.codes.ok)
# Connect the tail XPDRA to ROADMA and vice versa
def test_10_connect_tail_xpdr_rdm(self):
# Connect the tail: XPDRA to ROADMA
- url = ("{}/operations/transportpce-networkutils:init-xpdr-rdm-links"
- .format(self.restconf_baseurl))
- data = {"networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDR-A1",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "1",
- "networkutils:rdm-node": "ROADM-A1",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP1-TXRX"
- }
- }
- }
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.connect_xpdr_to_rdm_request("XPDR-A1", "1", "1",
+ "ROADM-A1", "1", "SRG1-PP1-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
def test_11_connect_tail_rdm_xpdr(self):
- # Connect the tail: ROADMA to XPDRA
- url = ("{}/operations/transportpce-networkutils:init-rdm-xpdr-links"
- .format(self.restconf_baseurl))
- data = {"networkutils:input": {
- "networkutils:links-input": {
- "networkutils:xpdr-node": "XPDR-A1",
- "networkutils:xpdr-num": "1",
- "networkutils:network-num": "1",
- "networkutils:rdm-node": "ROADM-A1",
- "networkutils:srg-num": "1",
- "networkutils:termination-point-num": "SRG1-PP1-TXRX"
- }
- }
- }
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "POST", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.connect_rdm_to_xpdr_request("XPDR-A1", "1", "1",
+ "ROADM-A1", "1", "SRG1-PP1-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
def test_12_getLinks_OpenRoadmTopology(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_topo_request("")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# Tests related to links
self.assertEqual(len(XPDR_OUT), 0)
def test_13_connect_ROADMC(self):
- # Config ROADMC
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADM-C1"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "ROADM-C1",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17843",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("ROADM-C1", 'roadmc')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_14_omsAttributes_ROADMA_ROADMC(self):
# Config ROADMA-ROADMC oms-attributes
- url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
- "link/ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
- "OMS-attributes/span"
- .format(self.restconf_baseurl))
data = {"span": {
"auto-spanloss": "true",
"engineered-spanloss": 12.2,
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.add_oms_attr_request("ROADM-A1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
def test_15_omsAttributes_ROADMC_ROADMA(self):
# Config ROADM-C1-ROADM-A1 oms-attributes
- url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
- "link/ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
- "OMS-attributes/span"
- .format(self.restconf_baseurl))
data = {"span": {
"auto-spanloss": "true",
"engineered-spanloss": 12.2,
"SRLG-length": 100000,
"pmd": 0.5}]}}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.add_oms_attr_request("ROADM-C1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
def test_16_getClliNetwork(self):
- url = ("{}/config/ietf-network:networks/network/clli-network"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_clli_net_request()
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
nbNode = len(res['network'][0]['node'])
self.assertEqual(len(listNode), 0)
def test_17_getOpenRoadmNetwork(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-network"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_net_request()
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
nbNode = len(res['network'][0]['node'])
self.assertEqual(len(listNode), 0)
def test_18_getROADMLinkOpenRoadmTopology(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_topo_request("")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# Tests related to links
self.assertEqual(len(XPDR_OUT), 0)
def test_19_getLinkOmsAttributesOpenRoadmTopology(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_topo_request("")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# Tests related to links
self.assertEqual(len(R2RLink), 0)
def test_20_getNodes_OpenRoadmTopology(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_topo_request("")
res = response.json()
# Tests related to nodes
self.assertEqual(response.status_code, requests.codes.ok)
self.assertEqual(len(listNode), 0)
def test_21_connect_ROADMB(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADM-B1"
- .format(self.restconf_baseurl))
- data = {"node": [{
- "node-id": "ROADM-B1",
- "netconf-node-topology:username": "admin",
- "netconf-node-topology:password": "admin",
- "netconf-node-topology:host": "127.0.0.1",
- "netconf-node-topology:port": "17842",
- "netconf-node-topology:tcp-only": "false",
- "netconf-node-topology:pass-through": {}}]}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.created)
- time.sleep(20)
+ response = test_utils.mount_device("ROADM-B1", 'roadmb')
+ self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)
def test_22_omsAttributes_ROADMA_ROADMB(self):
# Config ROADM-A1-ROADM-B1 oms-attributes
- url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
- "link/ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
- "OMS-attributes/span"
- .format(self.restconf_baseurl))
data = {"span": {
"auto-spanloss": "true",
"engineered-spanloss": 12.2,
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.add_oms_attr_request("ROADM-A1-DEG1-DEG1-TTP-TXRXtoROADM-B1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
def test_23_omsAttributes_ROADMB_ROADMA(self):
# Config ROADM-B1-ROADM-A1 oms-attributes
- url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
- "link/ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX/org-openroadm-network-topology:"
- "OMS-attributes/span"
- .format(self.restconf_baseurl))
data = {"span": {
"auto-spanloss": "true",
"engineered-spanloss": 12.2,
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.add_oms_attr_request("ROADM-B1-DEG1-DEG1-TTP-TXRXtoROADM-A1-DEG1-DEG1-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
def test_24_omsAttributes_ROADMB_ROADMC(self):
# Config ROADM-B1-ROADM-C1 oms-attributes
- url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
- "link/ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
- "OMS-attributes/span"
- .format(self.restconf_baseurl))
data = {"span": {
"auto-spanloss": "true",
"engineered-spanloss": 12.2,
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.add_oms_attr_request("ROADM-B1-DEG2-DEG2-TTP-TXRXtoROADM-C1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
def test_25_omsAttributes_ROADMC_ROADMB(self):
# Config ROADM-C1-ROADM-B1 oms-attributes
- url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
- "link/ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX/org-openroadm-network-topology:"
- "OMS-attributes/span"
- .format(self.restconf_baseurl))
data = {"span": {
"auto-spanloss": "true",
"engineered-spanloss": 12.2,
"fiber-type": "smf",
"SRLG-length": 100000,
"pmd": 0.5}]}}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "PUT", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.add_oms_attr_request("ROADM-C1-DEG2-DEG2-TTP-TXRXtoROADM-B1-DEG2-DEG2-TTP-TXRX", data)
self.assertEqual(response.status_code, requests.codes.created)
def test_26_getClliNetwork(self):
- url = ("{}/config/ietf-network:networks/network/clli-network"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_clli_net_request()
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
nbNode = len(res['network'][0]['node'])
self.assertEqual(len(listNode), 0)
def test_27_verifyDegree(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_topo_request("")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# Tests related to links
self.assertEqual(len(listR2RLink), 0)
def test_28_verifyOppositeLinkTopology(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_topo_request("")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
# Tests related to links
link_dest = res['network'][0]['ietf-network-topology:link'][i]['destination']['dest-node']
oppLink_id = res['network'][0]['ietf-network-topology:link'][i]['org-openroadm-common-network:opposite-link']
# Find the opposite link
- url_oppLink = "{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:link/"+oppLink_id
- url = (url_oppLink.format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response_oppLink = requests.request("GET", url, headers=headers, auth=('admin', 'admin'))
+ response_oppLink = test_utils.get_ordm_topo_request("ietf-network-topology:link/"+oppLink_id)
self.assertEqual(response_oppLink.status_code, requests.codes.ok)
res_oppLink = response_oppLink.json()
self.assertEqual(res_oppLink['ietf-network-topology:link'][0]
self.assertEqual(oppLink_type, 'XPONDER-INPUT')
def test_29_getLinkOmsAttributesOpenRoadmTopology(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_topo_request("")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
nbLink = len(res['network'][0]['ietf-network-topology:link'])
def test_30_disconnect_ROADMB(self):
# Delete in the topology-netconf
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADM-B1"
- .format(self.restconf_baseurl))
- data = {}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils.unmount_device("ROADM-B1")
# Delete in the clli-network
- url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeB"
- .format(self.restconf_baseurl))
- data = {}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.del_node_request("NodeB")
self.assertEqual(response.status_code, requests.codes.ok)
def test_31_disconnect_ROADMC(self):
# Delete in the topology-netconf
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADM-C1"
- .format(self.restconf_baseurl))
- data = {}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils.unmount_device("ROADM-C1")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
# Delete in the clli-network
- url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeC"
- .format(self.restconf_baseurl))
- data = {}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.del_node_request("NodeC")
self.assertEqual(response.status_code, requests.codes.ok)
# def test_24_check_roadm2roadm_links_deletion(self):
-# url = ("{}/config/ietf-network:networks/network/openroadm-topology"
-# .format(self.restconf_baseurl))
-# headers = {'content-type': 'application/json'}
-# response = requests.request(
-# "GET", url, headers=headers, auth=('admin', 'admin'))
+# response = test_utils.get_ordm_topo_request("")
# self.assertEqual(response.status_code, requests.codes.ok)
# res = response.json()
# #Write the response in the log
# self.assertNotEqual(res['network'][0]['ietf-network-topology:link'][i]['link-id'],'ROADMC-DEG2-DEG2-CTP-TXRXtoROADMC-DEG1-DEG1-CTP-TXRX')
def test_32_getNodes_OpenRoadmTopology(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_topo_request("")
res = response.json()
# Tests related to nodes
self.assertEqual(response.status_code, requests.codes.ok)
self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADM-C1-DEG2')
def test_33_getOpenRoadmNetwork(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-network"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_net_request()
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
nbNode = len(res['network'][0]['node'])
self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'ROADM-B1')
def test_34_getClliNetwork(self):
- url = ("{}/config/ietf-network:networks/network/clli-network"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_clli_net_request()
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
nbNode = len(res['network'][0]['node'])
self.assertNotEqual(res['network'][0]['node'][1]['org-openroadm-clli-network:clli'], 'NodeC')
def test_35_disconnect_XPDRA(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/XPDR-A1"
- .format(self.restconf_baseurl))
- data = {}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils.unmount_device("XPDR-A1")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
def test_36_getClliNetwork(self):
- url = ("{}/config/ietf-network:networks/network/clli-network"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_clli_net_request()
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
nbNode = len(res['network'][0]['node'])
self.assertEqual(res['network'][0]['node'][0]['org-openroadm-clli-network:clli'], 'NodeA')
def test_37_getOpenRoadmNetwork(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-network"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_net_request()
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
nbNode = len(res['network'][0]['node'])
self.assertNotEqual(res['network'][0]['node'][i]['node-id'], 'XPDR-A1')
def test_38_getNodes_OpenRoadmTopology(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_topo_request("")
res = response.json()
# Tests related to nodes
self.assertEqual(response.status_code, requests.codes.ok)
def test_39_disconnect_ROADM_XPDRA_link(self):
# Link-1
- url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
- "link/XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX"
- .format(self.restconf_baseurl))
- data = {}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.del_link_request("XPDR-A1-XPDR1-XPDR1-NETWORK1toROADM-A1-SRG1-SRG1-PP1-TXRX")
self.assertEqual(response.status_code, requests.codes.ok)
# Link-2
- url = ("{}/config/ietf-network:networks/network/openroadm-topology/ietf-network-topology:"
- "link/ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1"
- .format(self.restconf_baseurl))
- data = {}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.del_link_request("ROADM-A1-SRG1-SRG1-PP1-TXRXtoXPDR-A1-XPDR1-XPDR1-NETWORK1")
self.assertEqual(response.status_code, requests.codes.ok)
def test_40_getLinks_OpenRoadmTopology(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_topo_request("")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
nbLink = len(res['network'][0]['ietf-network-topology:link'])
['org-openroadm-common-network:link-type'], 'XPONDER-INPUT')
def test_41_disconnect_ROADMA(self):
- url = ("{}/config/network-topology:"
- "network-topology/topology/topology-netconf/node/ROADM-A1"
- .format(self.restconf_baseurl))
- data = {}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
- self.assertEqual(response.status_code, requests.codes.ok)
+ response = test_utils.unmount_device("ROADM-A1")
+ self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
# Delete in the clli-network
- url = ("{}/config/ietf-network:networks/network/clli-network/node/NodeA"
- .format(self.restconf_baseurl))
- data = {}
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "DELETE", url, data=json.dumps(data), headers=headers,
- auth=('admin', 'admin'))
+ response = test_utils.del_node_request("NodeA")
self.assertEqual(response.status_code, requests.codes.ok)
def test_42_getClliNetwork(self):
- url = ("{}/config/ietf-network:networks/network/clli-network"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_clli_net_request()
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertNotIn('node', res['network'][0])
def test_43_getOpenRoadmNetwork(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-network"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_net_request()
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
self.assertNotIn('node', res['network'][0])
def test_44_check_roadm2roadm_link_persistence(self):
- url = ("{}/config/ietf-network:networks/network/openroadm-topology"
- .format(self.restconf_baseurl))
- headers = {'content-type': 'application/json'}
- response = requests.request(
- "GET", url, headers=headers, auth=('admin', 'admin'))
+ response = test_utils.get_ordm_topo_request("")
self.assertEqual(response.status_code, requests.codes.ok)
res = response.json()
nbLink = len(res['network'][0]['ietf-network-topology:link'])