3 ##############################################################################
4 # Copyright (c) 2019 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
15 from common import test_utils
# Test case (unittest.TestCase subclass) whose visible tests mount the
# ROADM-A1 and XPDR-A1 nodes, read their entries under
# transportpce-portmapping:network over RESTCONF, and finally unmount them.
18 class TransportPCEPortMappingTesting(unittest.TestCase):
# NOTE(review): the embedded numbering jumps from 18 to 24, so the method
# header enclosing the two statements below is not visible in this view.
# Both statements assign cls.processes, so the second assignment replaces the
# first. NOTE(review): confirm that the value returned by start_sims() also
# covers whatever start_tpce() returned; otherwise the first result never
# reaches the loop in tearDownClass.
24 cls.processes = test_utils.start_tpce()
25 cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
def tearDownClass(cls):
    """Shut down every entry of ``cls.processes`` and report completion.

    Each element of ``cls.processes`` is handed, in order, to
    ``test_utils.shutdown_process``; a confirmation line is printed once the
    loop has finished.
    """
    for running in cls.processes:
        test_utils.shutdown_process(running)
    print("all processes killed")
# Prints "execution of <name>", where <name> is the last "."-separated
# component of self.id().
# NOTE(review): the enclosing method header is not visible here (embedded
# numbering jumps from 31 to 34); presumably a per-test setUp hook - confirm.
34 print("execution of {}".format(self.id().split(".")[-1]))
def test_01_rdm_device_connection(self):
    """Mount node ROADM-A1 and require a 'created' HTTP status.

    The node is mounted through ``test_utils.mount_device`` with 'roadma' as
    second argument; the assertion fails with
    ``test_utils.CODE_SHOULD_BE_201`` as its message when the status code
    differs from ``requests.codes.created``.
    """
    mount_reply = test_utils.mount_device("ROADM-A1", 'roadma')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
# Queries node ROADM-A1 of topology-netconf under the "operational" RESTCONF
# path: sends a GET to the URL built from test_utils.RESTCONF_BASE_URL,
# authenticating with ODL_LOGIN/ODL_PWD, and requires requests.codes.ok.
# NOTE(review): embedded numbering skips 48-49 and 51-53, so the lines that
# bind `res` and that open/close the assertion around the connection-status
# lookup are not visible; presumably the status is compared with a
# "connected" value - confirm against the full file.
41 def test_02_rdm_device_connected(self):
42 url = ("{}/operational/network-topology:"
43 "network-topology/topology/topology-netconf/node/ROADM-A1"
44 .format(test_utils.RESTCONF_BASE_URL))
45 response = requests.request(
46 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
47 self.assertEqual(response.status_code, requests.codes.ok)
50 res['node'][0]['netconf-node-topology:connection-status'],
# Reads the node-info entry of ROADM-A1 from
# .../config/transportpce-portmapping:network/nodes/ROADM-A1/node-info with
# an authenticated GET and requires requests.codes.ok.
# The dict literal that follows describes a node-info payload (node-type
# 'rdm', openroadm-version '2.2.1', ...); presumably it is the expected value
# of a comparison with the decoded response.
# NOTE(review): embedded numbering skips 61-62 and 68-70, so the statement
# that opens the comparison and its other operand are not visible here.
54 def test_03_rdm_portmapping_info(self):
55 url = ("{}/config/transportpce-portmapping:network/"
56 "nodes/ROADM-A1/node-info"
57 .format(test_utils.RESTCONF_BASE_URL))
58 response = requests.request(
59 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
60 self.assertEqual(response.status_code, requests.codes.ok)
63 {u'node-info': {u'node-type': u'rdm',
64 u'node-ip-address': u'127.0.0.11',
65 u'node-clli': u'NodeA',
66 u'openroadm-version': u'2.2.1', u'node-vendor': u'vendorA',
67 u'node-model': u'model2'}},
# Reads the DEG1-TTP-TXRX mapping of ROADM-A1 from the portmapping data with
# an authenticated GET and requires requests.codes.ok.
# The dict literal that follows lists the mapping attributes involved
# (supporting port 'L1' on circuit pack '1/0', bidirectional).
# NOTE(review): embedded numbering skips 78-79 and 82-83, so the lines that
# decode the response, open the assertion and supply its other operand are
# not visible here.
71 def test_04_rdm_portmapping_DEG1_TTP_TXRX(self):
72 url = ("{}/config/transportpce-portmapping:network/"
73 "nodes/ROADM-A1/mapping/DEG1-TTP-TXRX"
74 .format(test_utils.RESTCONF_BASE_URL))
75 response = requests.request(
76 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
77 self.assertEqual(response.status_code, requests.codes.ok)
80 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
81 'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
# Reads the DEG2-TTP-TXRX mapping of ROADM-A1 from the portmapping data with
# an authenticated GET and requires requests.codes.ok.
# Unlike the DEG1 case, the dict literal that follows also carries
# 'supporting-oms' and 'supporting-ots' entries.
# NOTE(review): embedded numbering skips 91-92 and 97-98, so the lines that
# decode the response, open the assertion and supply its other operand are
# not visible here.
84 def test_05_rdm_portmapping_DEG2_TTP_TXRX_with_ots_oms(self):
85 url = ("{}/config/transportpce-portmapping:network/"
86 "nodes/ROADM-A1/mapping/DEG2-TTP-TXRX"
87 .format(test_utils.RESTCONF_BASE_URL))
88 response = requests.request(
89 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
90 self.assertEqual(response.status_code, requests.codes.ok)
93 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
94 'logical-connection-point': 'DEG2-TTP-TXRX',
95 'supporting-oms': 'OMS-DEG2-TTP-TXRX', 'supporting-ots': 'OTS-DEG2-TTP-TXRX',
96 'port-direction': 'bidirectional'},
# Reads the SRG1-PP3-TXRX mapping of ROADM-A1 from the portmapping data with
# an authenticated GET, requires requests.codes.ok and decodes the body as
# JSON into `res`.
# The dict literal that follows lists the mapping attributes involved
# (supporting port 'C3' on circuit pack '3/0', bidirectional).
# NOTE(review): embedded numbering skips 107 and 110-111, so the line that
# opens the assertion and the operand taken from `res` are not visible here.
99 def test_06_rdm_portmapping_SRG1_PP3_TXRX(self):
100 url = ("{}/config/transportpce-portmapping:network/"
101 "nodes/ROADM-A1/mapping/SRG1-PP3-TXRX"
102 .format(test_utils.RESTCONF_BASE_URL))
103 response = requests.request(
104 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
105 self.assertEqual(response.status_code, requests.codes.ok)
106 res = response.json()
108 {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
109 'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional'},
# Reads the SRG3-PP1-TXRX mapping of ROADM-A1 from the portmapping data with
# an authenticated GET, requires requests.codes.ok and decodes the body as
# JSON into `res`.
# The dict literal that follows lists the mapping attributes involved
# (supporting port 'C1' on circuit pack '5/0', bidirectional).
# NOTE(review): embedded numbering skips 120 and 123-124, so the line that
# opens the assertion and the operand taken from `res` are not visible here.
112 def test_07_rdm_portmapping_SRG3_PP1_TXRX(self):
113 url = ("{}/config/transportpce-portmapping:network/"
114 "nodes/ROADM-A1/mapping/SRG3-PP1-TXRX"
115 .format(test_utils.RESTCONF_BASE_URL))
116 response = requests.request(
117 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
118 self.assertEqual(response.status_code, requests.codes.ok)
119 res = response.json()
121 {'supporting-port': 'C1', 'supporting-circuit-pack-name': '5/0',
122 'logical-connection-point': 'SRG3-PP1-TXRX', 'port-direction': 'bidirectional'},
def test_08_xpdr_device_connection(self):
    """Mount node XPDR-A1 and require a 'created' HTTP status.

    The node is mounted through ``test_utils.mount_device`` with 'xpdra' as
    second argument; the assertion fails with
    ``test_utils.CODE_SHOULD_BE_201`` as its message when the status code
    differs from ``requests.codes.created``.
    """
    mount_reply = test_utils.mount_device("XPDR-A1", 'xpdra')
    self.assertEqual(
        mount_reply.status_code,
        requests.codes.created,
        msg=test_utils.CODE_SHOULD_BE_201,
    )
# Queries node XPDR-A1 of topology-netconf under the "operational" RESTCONF
# path with an authenticated GET, requires requests.codes.ok and decodes the
# body as JSON into `res`.
# NOTE(review): embedded numbering skips 137 and 139-141, so the lines that
# open/close the assertion around the connection-status lookup are not
# visible; presumably the status is compared with a "connected" value -
# confirm against the full file.
129 def test_09_xpdr_device_connected(self):
130 url = ("{}/operational/network-topology:"
131 "network-topology/topology/topology-netconf/node/XPDR-A1"
132 .format(test_utils.RESTCONF_BASE_URL))
133 response = requests.request(
134 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
135 self.assertEqual(response.status_code, requests.codes.ok)
136 res = response.json()
138 res['node'][0]['netconf-node-topology:connection-status'],
# Reads the node-info entry of XPDR-A1 from
# .../config/transportpce-portmapping:network/nodes/XPDR-A1/node-info with an
# authenticated GET, requires requests.codes.ok and decodes the body as JSON
# into `res`.
# The dict literal that follows describes a node-info payload (node-type
# 'xpdr', node-ip-address '1.2.3.4', openroadm-version '2.2.1', ...);
# presumably it is the expected value compared with `res`.
# NOTE(review): embedded numbering skips 150 and 156-158, so the line that
# opens the comparison and its other operand are not visible here.
142 def test_10_xpdr_portmapping_info(self):
143 url = ("{}/config/transportpce-portmapping:network/"
144 "nodes/XPDR-A1/node-info"
145 .format(test_utils.RESTCONF_BASE_URL))
146 response = requests.request(
147 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
148 self.assertEqual(response.status_code, requests.codes.ok)
149 res = response.json()
151 {u'node-info': {u'node-type': u'xpdr',
152 u'node-ip-address': u'1.2.3.4',
153 u'node-clli': u'NodeA',
154 u'openroadm-version': u'2.2.1', u'node-vendor': u'vendorA',
155 u'node-model': u'model2'}},
# Reads the XPDR1-NETWORK1 mapping of XPDR-A1 from the portmapping data with
# an authenticated GET, requires requests.codes.ok and decodes the body as
# JSON into `res`.
# The dict literal that follows lists the mapping attributes involved: an
# if-OCH interface capability, port-qual 'xpdr-network', a connection-map
# entry pointing at XPDR1-CLIENT1 and an 'lcp-hash-val' string.
# NOTE(review): embedded numbering skips 167 and 173-174, so the line that
# opens the assertion and the operand taken from `res` are not visible here.
159 def test_11_xpdr_portmapping_NETWORK1(self):
160 url = ("{}/config/transportpce-portmapping:network/"
161 "nodes/XPDR-A1/mapping/XPDR1-NETWORK1"
162 .format(test_utils.RESTCONF_BASE_URL))
163 response = requests.request(
164 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
165 self.assertEqual(response.status_code, requests.codes.ok)
166 res = response.json()
168 {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
169 'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
170 'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
171 'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
172 'lcp-hash-val': 'e54EtOovlcE='},
# Reads the XPDR1-NETWORK2 mapping of XPDR-A1 from the portmapping data with
# an authenticated GET, requires requests.codes.ok and decodes the body as
# JSON into `res`.
# The dict literal that follows lists the mapping attributes involved: an
# if-OCH interface capability, port-qual 'xpdr-network', a connection-map
# entry pointing at XPDR1-CLIENT2 and an 'lcp-hash-val' string.
# NOTE(review): embedded numbering skips 183 and 189-190, so the line that
# opens the assertion and the operand taken from `res` are not visible here.
175 def test_12_xpdr_portmapping_NETWORK2(self):
176 url = ("{}/config/transportpce-portmapping:network/"
177 "nodes/XPDR-A1/mapping/XPDR1-NETWORK2"
178 .format(test_utils.RESTCONF_BASE_URL))
179 response = requests.request(
180 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
181 self.assertEqual(response.status_code, requests.codes.ok)
182 res = response.json()
184 {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
185 'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/2-PLUG-NET',
186 'logical-connection-point': 'XPDR1-NETWORK2', 'port-direction': 'bidirectional',
187 'connection-map-lcp': 'XPDR1-CLIENT2', 'port-qual': 'xpdr-network',
188 'lcp-hash-val': 'e54EtOovlcI='},
# Reads the XPDR1-CLIENT1 mapping of XPDR-A1 from the portmapping data with
# an authenticated GET, requires requests.codes.ok and decodes the body as
# JSON into `res`.
# The dict literal that follows lists the mapping attributes involved: an
# if-100GE interface capability, port-qual 'xpdr-client', a connection-map
# entry pointing at XPDR1-NETWORK1 and an 'lcp-hash-val' string.
# NOTE(review): embedded numbering skips 199 and 206-207, so the line that
# opens the assertion and the operand taken from `res` are not visible here.
191 def test_13_xpdr_portmapping_CLIENT1(self):
192 url = ("{}/config/transportpce-portmapping:network/"
193 "nodes/XPDR-A1/mapping/XPDR1-CLIENT1"
194 .format(test_utils.RESTCONF_BASE_URL))
195 response = requests.request(
196 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
197 self.assertEqual(response.status_code, requests.codes.ok)
198 res = response.json()
200 {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
201 'supporting-port': 'C1',
202 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
203 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
204 'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
205 'lcp-hash-val': 'BIyxYXnFEFA='},
# Reads the XPDR1-CLIENT2 mapping of XPDR-A1 from the portmapping data with
# an authenticated GET, requires requests.codes.ok and decodes the body as
# JSON into `res`.
# The dict literal that follows lists the mapping attributes involved: an
# if-100GE interface capability, port-qual 'xpdr-client', a connection-map
# entry pointing at XPDR1-NETWORK2 and an 'lcp-hash-val' string.
# NOTE(review): embedded numbering skips 216 and 223-224, so the line that
# opens the assertion and the operand taken from `res` are not visible here.
208 def test_14_xpdr_portmapping_CLIENT2(self):
209 url = ("{}/config/transportpce-portmapping:network/"
210 "nodes/XPDR-A1/mapping/XPDR1-CLIENT2"
211 .format(test_utils.RESTCONF_BASE_URL))
212 response = requests.request(
213 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
214 self.assertEqual(response.status_code, requests.codes.ok)
215 res = response.json()
217 {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
218 'supporting-port': 'C1',
219 'supporting-circuit-pack-name': '1/0/2-PLUG-CLIENT',
220 'logical-connection-point': 'XPDR1-CLIENT2', 'port-direction': 'bidirectional',
221 'connection-map-lcp': 'XPDR1-NETWORK2', 'port-qual': 'xpdr-client',
222 'lcp-hash-val': 'BIyxYXnFEFM='},
def test_15_xpdr_device_disconnection(self):
    """Unmount node XPDR-A1 and require an 'ok' HTTP status.

    The node is unmounted through ``test_utils.unmount_device``; the
    assertion fails with ``test_utils.CODE_SHOULD_BE_200`` as its message
    when the status code differs from ``requests.codes.ok``.
    """
    unmount_reply = test_utils.unmount_device("XPDR-A1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
# Queries node XPDR-A1 of topology-netconf under the "operational" RESTCONF
# path with an authenticated GET and requires requests.codes.not_found, then
# decodes the body as JSON into `res`.
# The dict literal that follows is a "data-missing" application error entry;
# it is paired with res['errors']['error'] as the two arguments of one call.
# NOTE(review): embedded numbering skips 236, so the line that opens that
# call is not visible; presumably a membership assertion - confirm.
229 def test_16_xpdr_device_disconnected(self):
230 url = ("{}/operational/network-topology:network-topology/topology/"
231 "topology-netconf/node/XPDR-A1".format(test_utils.RESTCONF_BASE_URL))
232 response = requests.request(
233 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
234 self.assertEqual(response.status_code, requests.codes.not_found)
235 res = response.json()
237 {"error-type": "application", "error-tag": "data-missing",
238 "error-message": "Request could not be completed because the relevant data model content does not exist"},
239 res['errors']['error'])
# Requests the XPDR-A1 entry of the portmapping data
# (.../config/transportpce-portmapping:network/nodes/XPDR-A1) with an
# authenticated GET and requires requests.codes.not_found, then decodes the
# body as JSON into `res`.
# The dict literal that follows is a "data-missing" application error entry;
# it is paired with res['errors']['error'] as the two arguments of one call.
# NOTE(review): embedded numbering skips 247, so the line that opens that
# call is not visible; presumably a membership assertion - confirm.
241 def test_17_xpdr_device_not_connected(self):
242 url = ("{}/config/transportpce-portmapping:network/nodes/XPDR-A1".format(test_utils.RESTCONF_BASE_URL))
243 response = requests.request(
244 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
245 self.assertEqual(response.status_code, requests.codes.not_found)
246 res = response.json()
248 {"error-type": "application", "error-tag": "data-missing",
249 "error-message": "Request could not be completed because the relevant data model content does not exist"},
250 res['errors']['error'])
def test_18_rdm_device_disconnection(self):
    """Unmount node ROADM-A1 and require an 'ok' HTTP status.

    The node is unmounted through ``test_utils.unmount_device``; the
    assertion fails with ``test_utils.CODE_SHOULD_BE_200`` as its message
    when the status code differs from ``requests.codes.ok``.
    """
    unmount_reply = test_utils.unmount_device("ROADM-A1")
    self.assertEqual(
        unmount_reply.status_code,
        requests.codes.ok,
        msg=test_utils.CODE_SHOULD_BE_200,
    )
# Queries node ROADM-A1 of topology-netconf under the "operational" RESTCONF
# path with an authenticated GET and requires requests.codes.not_found, then
# decodes the body as JSON into `res`.
# The dict literal that follows is a "data-missing" application error entry;
# it is paired with res['errors']['error'] as the two arguments of one call.
# NOTE(review): embedded numbering skips 263, so the line that opens that
# call is not visible; presumably a membership assertion - confirm.
256 def test_19_rdm_device_disconnected(self):
257 url = ("{}/operational/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1"
258 .format(test_utils.RESTCONF_BASE_URL))
259 response = requests.request(
260 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
261 self.assertEqual(response.status_code, requests.codes.not_found)
262 res = response.json()
264 {"error-type": "application", "error-tag": "data-missing",
265 "error-message": "Request could not be completed because the relevant data model content does not exist"},
266 res['errors']['error'])
# Requests the ROADM-A1 entry of the portmapping data
# (.../config/transportpce-portmapping:network/nodes/ROADM-A1) with an
# authenticated GET and requires requests.codes.not_found, then decodes the
# body as JSON into `res`.
# The dict literal that follows is a "data-missing" application error entry;
# it is paired with res['errors']['error'] as the two arguments of one call.
# NOTE(review): embedded numbering skips 274, so the line that opens that
# call is not visible; presumably a membership assertion - confirm.
268 def test_20_rdm_device_not_connected(self):
269 url = ("{}/config/transportpce-portmapping:network/nodes/ROADM-A1".format(test_utils.RESTCONF_BASE_URL))
270 response = requests.request(
271 "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
272 self.assertEqual(response.status_code, requests.codes.not_found)
273 res = response.json()
275 {"error-type": "application", "error-tag": "data-missing",
276 "error-message": "Request could not be completed because the relevant data model content does not exist"},
277 res['errors']['error'])
# Script entry point: when this file is executed directly (not imported),
# hand control to unittest's command-line runner with verbosity level 2.
if __name__ == "__main__":
    unittest.main(verbosity=2)