3 ##############################################################################
4 # Copyright (c) 2019 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
21 from common import test_utils
24 class TransportPCEPortMappingTesting(unittest.TestCase):
27 restconf_baseurl = "http://localhost:8181/restconf"
31 cls.processes = test_utils.start_tpce()
32 cls.processes = test_utils.start_sims(['xpdra', 'roadma'])
35 def tearDownClass(cls):
36 for process in cls.processes:
37 test_utils.shutdown_process(process)
38 print("all processes killed")
41 print("execution of {}".format(self.id().split(".")[-1]))
44 def test_01_rdm_device_connected(self):
45 url = ("{}/config/network-topology:"
46 "network-topology/topology/topology-netconf/node/ROADM-A1"
47 .format(self.restconf_baseurl))
49 "node-id": "ROADM-A1",
50 "netconf-node-topology:username": "admin",
51 "netconf-node-topology:password": "admin",
52 "netconf-node-topology:host": "127.0.0.1",
53 "netconf-node-topology:port": test_utils.sims['roadma']['port'],
54 "netconf-node-topology:tcp-only": "false",
55 "netconf-node-topology:pass-through": {}}]}
56 headers = {'content-type': 'application/json'}
57 response = requests.request(
58 "PUT", url, data=json.dumps(data), headers=headers,
59 auth=('admin', 'admin'))
60 self.assertEqual(response.status_code, requests.codes.created)
63 def test_02_rdm_device_connected(self):
64 url = ("{}/operational/network-topology:"
65 "network-topology/topology/topology-netconf/node/ROADM-A1"
66 .format(self.restconf_baseurl))
67 headers = {'content-type': 'application/json'}
68 response = requests.request(
69 "GET", url, headers=headers, auth=('admin', 'admin'))
70 self.assertEqual(response.status_code, requests.codes.ok)
73 res['node'][0]['netconf-node-topology:connection-status'],
77 def test_03_rdm_portmapping_info(self):
78 url = ("{}/config/transportpce-portmapping:network/"
79 "nodes/ROADM-A1/node-info"
80 .format(self.restconf_baseurl))
81 headers = {'content-type': 'application/json'}
82 response = requests.request(
83 "GET", url, headers=headers, auth=('admin', 'admin'))
84 self.assertEqual(response.status_code, requests.codes.ok)
87 {u'node-info': {u'node-type': u'rdm',
88 u'node-ip-address': u'127.0.0.11',
89 u'node-clli': u'NodeA',
90 u'openroadm-version': u'2.2.1', u'node-vendor': u'vendorA',
91 u'node-model': u'model2'}},
95 def test_04_rdm_portmapping_DEG1_TTP_TXRX(self):
96 url = ("{}/config/transportpce-portmapping:network/"
97 "nodes/ROADM-A1/mapping/DEG1-TTP-TXRX"
98 .format(self.restconf_baseurl))
99 headers = {'content-type': 'application/json'}
100 response = requests.request(
101 "GET", url, headers=headers, auth=('admin', 'admin'))
102 self.assertEqual(response.status_code, requests.codes.ok)
103 res = response.json()
105 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
106 'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'},
109 def test_05_rdm_portmapping_DEG2_TTP_TXRX_with_ots_oms(self):
110 url = ("{}/config/transportpce-portmapping:network/"
111 "nodes/ROADM-A1/mapping/DEG2-TTP-TXRX"
112 .format(self.restconf_baseurl))
113 headers = {'content-type': 'application/json'}
114 response = requests.request(
115 "GET", url, headers=headers, auth=('admin', 'admin'))
116 self.assertEqual(response.status_code, requests.codes.ok)
117 res = response.json()
119 {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
120 'logical-connection-point': 'DEG2-TTP-TXRX',
121 'supporting-oms': 'OMS-DEG2-TTP-TXRX', 'supporting-ots': 'OTS-DEG2-TTP-TXRX',
122 'port-direction': 'bidirectional'},
125 def test_06_rdm_portmapping_SRG1_PP3_TXRX(self):
126 url = ("{}/config/transportpce-portmapping:network/"
127 "nodes/ROADM-A1/mapping/SRG1-PP3-TXRX"
128 .format(self.restconf_baseurl))
129 headers = {'content-type': 'application/json'}
130 response = requests.request(
131 "GET", url, headers=headers, auth=('admin', 'admin'))
132 self.assertEqual(response.status_code, requests.codes.ok)
133 res = response.json()
135 {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
136 'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional'},
139 def test_07_rdm_portmapping_SRG3_PP1_TXRX(self):
140 url = ("{}/config/transportpce-portmapping:network/"
141 "nodes/ROADM-A1/mapping/SRG3-PP1-TXRX"
142 .format(self.restconf_baseurl))
143 headers = {'content-type': 'application/json'}
144 response = requests.request(
145 "GET", url, headers=headers, auth=('admin', 'admin'))
146 self.assertEqual(response.status_code, requests.codes.ok)
147 res = response.json()
149 {'supporting-port': 'C1', 'supporting-circuit-pack-name': '5/0',
150 'logical-connection-point': 'SRG3-PP1-TXRX', 'port-direction': 'bidirectional'},
153 def test_08_xpdr_device_connected(self):
154 url = ("{}/config/network-topology:"
155 "network-topology/topology/topology-netconf/node/XPDR-A1"
156 .format(self.restconf_baseurl))
158 "node-id": "XPDR-A1",
159 "netconf-node-topology:username": "admin",
160 "netconf-node-topology:password": "admin",
161 "netconf-node-topology:host": "127.0.0.1",
162 "netconf-node-topology:port": test_utils.sims['xpdra']['port'],
163 "netconf-node-topology:tcp-only": "false",
164 "netconf-node-topology:pass-through": {}}]}
165 headers = {'content-type': 'application/json'}
166 response = requests.request(
167 "PUT", url, data=json.dumps(data), headers=headers,
168 auth=('admin', 'admin'))
169 self.assertEqual(response.status_code, requests.codes.created)
172 def test_09_xpdr_device_connected(self):
173 url = ("{}/operational/network-topology:"
174 "network-topology/topology/topology-netconf/node/XPDR-A1"
175 .format(self.restconf_baseurl))
176 headers = {'content-type': 'application/json'}
177 response = requests.request(
178 "GET", url, headers=headers, auth=('admin', 'admin'))
179 self.assertEqual(response.status_code, requests.codes.ok)
180 res = response.json()
182 res['node'][0]['netconf-node-topology:connection-status'],
186 def test_10_xpdr_portmapping_info(self):
187 url = ("{}/config/transportpce-portmapping:network/"
188 "nodes/XPDR-A1/node-info"
189 .format(self.restconf_baseurl))
190 headers = {'content-type': 'application/json'}
191 response = requests.request(
192 "GET", url, headers=headers, auth=('admin', 'admin'))
193 self.assertEqual(response.status_code, requests.codes.ok)
194 res = response.json()
196 {u'node-info': {u'node-type': u'xpdr',
197 u'node-ip-address': u'1.2.3.4',
198 u'node-clli': u'NodeA',
199 u'openroadm-version': u'2.2.1', u'node-vendor': u'vendorA',
200 u'node-model': u'model2'}},
204 def test_11_xpdr_portmapping_NETWORK1(self):
205 url = ("{}/config/transportpce-portmapping:network/"
206 "nodes/XPDR-A1/mapping/XPDR1-NETWORK1"
207 .format(self.restconf_baseurl))
208 headers = {'content-type': 'application/json'}
209 response = requests.request(
210 "GET", url, headers=headers, auth=('admin', 'admin'))
211 self.assertEqual(response.status_code, requests.codes.ok)
212 res = response.json()
214 {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
215 'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
216 'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
217 'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
218 'lcp-hash-val': '8e128ba57560403cfd4ffafae38cd941'},
221 def test_12_xpdr_portmapping_NETWORK2(self):
222 url = ("{}/config/transportpce-portmapping:network/"
223 "nodes/XPDR-A1/mapping/XPDR1-NETWORK2"
224 .format(self.restconf_baseurl))
225 headers = {'content-type': 'application/json'}
226 response = requests.request(
227 "GET", url, headers=headers, auth=('admin', 'admin'))
228 self.assertEqual(response.status_code, requests.codes.ok)
229 res = response.json()
231 {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
232 'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/2-PLUG-NET',
233 'logical-connection-point': 'XPDR1-NETWORK2', 'port-direction': 'bidirectional',
234 'connection-map-lcp': 'XPDR1-CLIENT2', 'port-qual': 'xpdr-network',
235 'lcp-hash-val': '8e128ba57560403cfd4ffafae38cd942'},
238 def test_13_xpdr_portmapping_CLIENT1(self):
239 url = ("{}/config/transportpce-portmapping:network/"
240 "nodes/XPDR-A1/mapping/XPDR1-CLIENT1"
241 .format(self.restconf_baseurl))
242 headers = {'content-type': 'application/json'}
243 response = requests.request(
244 "GET", url, headers=headers, auth=('admin', 'admin'))
245 self.assertEqual(response.status_code, requests.codes.ok)
246 res = response.json()
248 {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
249 'supporting-port': 'C1',
250 'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
251 'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
252 'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
253 'lcp-hash-val': '3ed8ed1336784ac7c2f66c22f2f03d8'},
256 def test_14_xpdr_portmapping_CLIENT2(self):
257 url = ("{}/config/transportpce-portmapping:network/"
258 "nodes/XPDR-A1/mapping/XPDR1-CLIENT2"
259 .format(self.restconf_baseurl))
260 headers = {'content-type': 'application/json'}
261 response = requests.request(
262 "GET", url, headers=headers, auth=('admin', 'admin'))
263 self.assertEqual(response.status_code, requests.codes.ok)
264 res = response.json()
266 {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
267 'supporting-port': 'C1',
268 'supporting-circuit-pack-name': '1/0/2-PLUG-CLIENT',
269 'logical-connection-point': 'XPDR1-CLIENT2', 'port-direction': 'bidirectional',
270 'connection-map-lcp': 'XPDR1-NETWORK2', 'port-qual': 'xpdr-client',
271 'lcp-hash-val': '3ed8ed1336784ac7c2f66c22f2f03db'},
274 def test_15_xpdr_device_disconnected(self):
275 url = ("{}/config/network-topology:"
276 "network-topology/topology/topology-netconf/node/XPDR-A1"
277 .format(self.restconf_baseurl))
278 headers = {'content-type': 'application/json'}
279 response = requests.request(
280 "DELETE", url, headers=headers,
281 auth=('admin', 'admin'))
282 self.assertEqual(response.status_code, requests.codes.ok)
285 def test_16_xpdr_device_disconnected(self):
286 url = ("{}/operational/network-topology:network-topology/topology/"
287 "topology-netconf/node/XPDR-A1".format(self.restconf_baseurl))
288 headers = {'content-type': 'application/json'}
289 response = requests.request(
290 "GET", url, headers=headers, auth=('admin', 'admin'))
291 self.assertEqual(response.status_code, requests.codes.not_found)
292 res = response.json()
294 {"error-type": "application", "error-tag": "data-missing",
295 "error-message": "Request could not be completed because the relevant data model content does not exist"},
296 res['errors']['error'])
298 def test_17_xpdr_device_disconnected(self):
299 url = ("{}/config/transportpce-portmapping:network/nodes/XPDR-A1".format(self.restconf_baseurl))
300 headers = {'content-type': 'application/json'}
301 response = requests.request(
302 "GET", url, headers=headers, auth=('admin', 'admin'))
303 self.assertEqual(response.status_code, requests.codes.not_found)
304 res = response.json()
306 {"error-type": "application", "error-tag": "data-missing",
307 "error-message": "Request could not be completed because the relevant data model content does not exist"},
308 res['errors']['error'])
310 def test_18_rdm_device_disconnected(self):
311 url = ("{}/config/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1"
312 .format(self.restconf_baseurl))
313 headers = {'content-type': 'application/json'}
314 response = requests.request(
315 "DELETE", url, headers=headers,
316 auth=('admin', 'admin'))
317 self.assertEqual(response.status_code, requests.codes.ok)
320 def test_19_rdm_device_disconnected(self):
321 url = ("{}/operational/network-topology:network-topology/topology/topology-netconf/node/ROADM-A1"
322 .format(self.restconf_baseurl))
323 headers = {'content-type': 'application/json'}
324 response = requests.request(
325 "GET", url, headers=headers, auth=('admin', 'admin'))
326 self.assertEqual(response.status_code, requests.codes.not_found)
327 res = response.json()
329 {"error-type": "application", "error-tag": "data-missing",
330 "error-message": "Request could not be completed because the relevant data model content does not exist"},
331 res['errors']['error'])
333 def test_20_rdm_device_disconnected(self):
334 url = ("{}/config/transportpce-portmapping:network/nodes/ROADM-A1".format(self.restconf_baseurl))
335 headers = {'content-type': 'application/json'}
336 response = requests.request(
337 "GET", url, headers=headers, auth=('admin', 'admin'))
338 self.assertEqual(response.status_code, requests.codes.not_found)
339 res = response.json()
341 {"error-type": "application", "error-tag": "data-missing",
342 "error-message": "Request could not be completed because the relevant data model content does not exist"},
343 res['errors']['error'])
346 if __name__ == "__main__":
347 unittest.main(verbosity=2)