3 ##############################################################################
4 # Copyright (c) 2019 Orange, Inc. and others. All rights reserved.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
21 from common import test_utils
class TransportPCEPortMappingTesting(unittest.TestCase):
    """Port-mapping checks for the ROADM-A1 and XPDR-A1 devices over RESTCONF.

    The numbered tests are order-dependent: unittest runs test methods in
    alphabetical order, so the devices are mounted first, their port-mapping
    entries are read back, and finally the devices are unmounted and the
    corresponding data is expected to be gone.
    """

    # RESTCONF paths relative to test_utils.RESTCONF_BASE_URL; "{}" takes the node id.
    _NETCONF_NODE_PATH = ("operational/network-topology:"
                          "network-topology/topology/topology-netconf/node/{}")
    _PORTMAPPING_NODE_PATH = "config/transportpce-portmapping:network/nodes/{}"

    # Error entry the tests expect in the body of a "not found" answer.
    _DATA_MISSING_ERROR = {
        "error-type": "application",
        "error-tag": "data-missing",
        "error-message": "Request could not be completed because the relevant data model content does not exist",
    }

    # NOTE(review): the "def" line of this fixture was not present in the
    # reviewed text. It is restored as the setUpClass counterpart of
    # tearDownClass because its body assigns cls.processes - confirm.
    @classmethod
    def setUpClass(cls):
        """Launch the processes the tests need and remember them for teardown."""
        # NOTE(review): the second assignment replaces the first one, so
        # tearDownClass only shuts down what start_sims() returns. This is
        # fine only if start_sims() returns a list that also holds the
        # process started by start_tpce() - verify against test_utils.
        cls.processes = test_utils.start_tpce()
        cls.processes = test_utils.start_sims(['xpdra', 'roadma'])

    @classmethod
    def tearDownClass(cls):
        """Shut down every process recorded in ``cls.processes``."""
        for process in cls.processes:
            test_utils.shutdown_process(process)
        print("all processes killed")

    # NOTE(review): the "def" line was not present in the reviewed text;
    # restored as the per-test setUp hook because the body uses self.id() - confirm.
    def setUp(self):
        """Print the name of the test method that is about to run."""
        print("execution of {}".format(self.id().split(".")[-1]))

    # ------------------------------------------------------------------
    # Private helpers shared by the numbered tests
    # ------------------------------------------------------------------

    @staticmethod
    def _get(path):
        """Send a GET for ``path`` below the RESTCONF base URL.

        The request carries the JSON headers and the ODL login/password
        taken from ``test_utils``. Returns the ``requests`` response object.
        """
        url = "{}/{}".format(test_utils.RESTCONF_BASE_URL, path)
        return requests.request(
            "GET", url, headers=test_utils.TYPE_APPLICATION_JSON, auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))

    def _check_netconf_connected(self, node_id):
        """Assert that the topology-netconf entry of ``node_id`` is connected."""
        response = self._get(self._NETCONF_NODE_PATH.format(node_id))
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        # NOTE(review): the assertion call and its expected value were cut
        # off in the reviewed text; 'connected' is assumed from the test
        # names (..._device_connected) - confirm.
        self.assertEqual(
            res['node'][0]['netconf-node-topology:connection-status'],
            'connected')

    def _check_node_info(self, node_id, expected):
        """Assert that the node-info of ``node_id`` equals ``expected``.

        ``expected`` is the complete expected body, i.e. a dict keyed by
        ``'node-info'``.
        """
        response = self._get(self._PORTMAPPING_NODE_PATH.format(node_id) + "/node-info")
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        # NOTE(review): the assertion call was cut off in the reviewed text;
        # since the expected dict is wrapped in 'node-info', it is assumed to
        # be compared with the whole response body - confirm.
        self.assertEqual(expected, res)

    def _check_mapping(self, node_id, lcp, expected):
        """Assert that the mapping ``lcp`` of ``node_id`` contains ``expected``."""
        response = self._get(
            "{}/mapping/{}".format(self._PORTMAPPING_NODE_PATH.format(node_id), lcp))
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        # NOTE(review): the assertion call and its second argument were cut
        # off in the reviewed text; assertIn(..., res['mapping']) is assumed
        # by analogy with the res['errors']['error'] checks - confirm.
        self.assertIn(expected, res['mapping'])

    def _check_data_missing(self, path):
        """Assert that a GET on ``path`` is answered with a data-missing error."""
        response = self._get(path)
        self.assertEqual(response.status_code, requests.codes.not_found)
        res = response.json()
        # NOTE(review): the opening line of this assertion was cut off in the
        # reviewed text; assertIn is assumed (presumably 'error' is a list) - confirm.
        self.assertIn(self._DATA_MISSING_ERROR, res['errors']['error'])

    # ------------------------------------------------------------------
    # ROADM-A1
    # ------------------------------------------------------------------

    def test_01_rdm_device_connection(self):
        response = test_utils.mount_device("ROADM-A1", 'roadma')
        self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)

    def test_02_rdm_device_connected(self):
        self._check_netconf_connected("ROADM-A1")

    def test_03_rdm_portmapping_info(self):
        self._check_node_info(
            "ROADM-A1",
            {'node-info': {'node-type': 'rdm',
                           'node-ip-address': '127.0.0.11',
                           'node-clli': 'NodeA',
                           'openroadm-version': '2.2.1', 'node-vendor': 'vendorA',
                           'node-model': 'model2'}})

    def test_04_rdm_portmapping_DEG1_TTP_TXRX(self):
        self._check_mapping(
            "ROADM-A1", "DEG1-TTP-TXRX",
            {'supporting-port': 'L1', 'supporting-circuit-pack-name': '1/0',
             'logical-connection-point': 'DEG1-TTP-TXRX', 'port-direction': 'bidirectional'})

    def test_05_rdm_portmapping_DEG2_TTP_TXRX_with_ots_oms(self):
        self._check_mapping(
            "ROADM-A1", "DEG2-TTP-TXRX",
            {'supporting-port': 'L1', 'supporting-circuit-pack-name': '2/0',
             'logical-connection-point': 'DEG2-TTP-TXRX',
             'supporting-oms': 'OMS-DEG2-TTP-TXRX', 'supporting-ots': 'OTS-DEG2-TTP-TXRX',
             'port-direction': 'bidirectional'})

    def test_06_rdm_portmapping_SRG1_PP3_TXRX(self):
        self._check_mapping(
            "ROADM-A1", "SRG1-PP3-TXRX",
            {'supporting-port': 'C3', 'supporting-circuit-pack-name': '3/0',
             'logical-connection-point': 'SRG1-PP3-TXRX', 'port-direction': 'bidirectional'})

    def test_07_rdm_portmapping_SRG3_PP1_TXRX(self):
        self._check_mapping(
            "ROADM-A1", "SRG3-PP1-TXRX",
            {'supporting-port': 'C1', 'supporting-circuit-pack-name': '5/0',
             'logical-connection-point': 'SRG3-PP1-TXRX', 'port-direction': 'bidirectional'})

    # ------------------------------------------------------------------
    # XPDR-A1
    # ------------------------------------------------------------------

    def test_08_xpdr_device_connection(self):
        response = test_utils.mount_device("XPDR-A1", 'xpdra')
        self.assertEqual(response.status_code, requests.codes.created, test_utils.CODE_SHOULD_BE_201)

    def test_09_xpdr_device_connected(self):
        self._check_netconf_connected("XPDR-A1")

    def test_10_xpdr_portmapping_info(self):
        self._check_node_info(
            "XPDR-A1",
            {'node-info': {'node-type': 'xpdr',
                           'node-ip-address': '1.2.3.4',
                           'node-clli': 'NodeA',
                           'openroadm-version': '2.2.1', 'node-vendor': 'vendorA',
                           'node-model': 'model2'}})

    def test_11_xpdr_portmapping_NETWORK1(self):
        self._check_mapping(
            "XPDR-A1", "XPDR1-NETWORK1",
            {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
             'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/1-PLUG-NET',
             'logical-connection-point': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-network',
             'port-direction': 'bidirectional', 'connection-map-lcp': 'XPDR1-CLIENT1',
             'lcp-hash-val': '8e128ba57560403cfd4ffafae38cd941'})

    def test_12_xpdr_portmapping_NETWORK2(self):
        self._check_mapping(
            "XPDR-A1", "XPDR1-NETWORK2",
            {'supported-interface-capability': ['org-openroadm-port-types:if-OCH'],
             'supporting-port': '1', 'supporting-circuit-pack-name': '1/0/2-PLUG-NET',
             'logical-connection-point': 'XPDR1-NETWORK2', 'port-direction': 'bidirectional',
             'connection-map-lcp': 'XPDR1-CLIENT2', 'port-qual': 'xpdr-network',
             'lcp-hash-val': '8e128ba57560403cfd4ffafae38cd942'})

    def test_13_xpdr_portmapping_CLIENT1(self):
        self._check_mapping(
            "XPDR-A1", "XPDR1-CLIENT1",
            {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
             'supporting-port': 'C1',
             'supporting-circuit-pack-name': '1/0/1-PLUG-CLIENT',
             'logical-connection-point': 'XPDR1-CLIENT1', 'port-direction': 'bidirectional',
             'connection-map-lcp': 'XPDR1-NETWORK1', 'port-qual': 'xpdr-client',
             'lcp-hash-val': '3ed8ed1336784ac7c2f66c22f2f03d8'})

    def test_14_xpdr_portmapping_CLIENT2(self):
        self._check_mapping(
            "XPDR-A1", "XPDR1-CLIENT2",
            {'supported-interface-capability': ['org-openroadm-port-types:if-100GE'],
             'supporting-port': 'C1',
             'supporting-circuit-pack-name': '1/0/2-PLUG-CLIENT',
             'logical-connection-point': 'XPDR1-CLIENT2', 'port-direction': 'bidirectional',
             'connection-map-lcp': 'XPDR1-NETWORK2', 'port-qual': 'xpdr-client',
             'lcp-hash-val': '3ed8ed1336784ac7c2f66c22f2f03db'})

    # ------------------------------------------------------------------
    # Unmounting: both the netconf topology entry and the port-mapping
    # entry of each device are expected to be gone afterwards.
    # ------------------------------------------------------------------

    def test_15_xpdr_device_disconnection(self):
        response = test_utils.unmount_device("XPDR-A1")
        self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)

    def test_16_xpdr_device_disconnected(self):
        self._check_data_missing(self._NETCONF_NODE_PATH.format("XPDR-A1"))

    def test_17_xpdr_device_not_connected(self):
        self._check_data_missing(self._PORTMAPPING_NODE_PATH.format("XPDR-A1"))

    def test_18_rdm_device_disconnection(self):
        response = test_utils.unmount_device("ROADM-A1")
        self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)

    def test_19_rdm_device_disconnected(self):
        self._check_data_missing(self._NETCONF_NODE_PATH.format("ROADM-A1"))

    def test_20_rdm_device_not_connected(self):
        self._check_data_missing(self._PORTMAPPING_NODE_PATH.format("ROADM-A1"))
if __name__ == "__main__":
    # Executed as a script: run every test in this module, printing one
    # line per test (verbosity=2).
    unittest.main(verbosity=2)