class TransportPCEtesting(unittest.TestCase):
    """Functional tests driving the controller REST API for node ROADMA.

    The suite starts a netconf device simulator (when its jar is present)
    and the controller, mounts the simulated device, verifies the port
    mappings published for it, then creates and deletes cross-connections
    through the ``renderer:service-path`` RPC.

    Test methods are numbered because later ones rely on state set up by
    earlier ones (the device must be mounted before it can be queried).
    """

    # Handles on the child processes; they stay None until started.
    testtools_process = None
    odl_process = None

    # Request settings shared by every REST call in this class.
    _AUTH = ('admin', 'admin')
    _HEADERS = {'content-type': 'application/json'}

    # ------------------------------------------------------------------
    # Process management
    # ------------------------------------------------------------------
    @classmethod
    def __start_testtools(cls):
        """Launch the netconf testtool simulator if its jar has been built.

        When the jar is missing nothing is started and
        ``testtools_process`` remains ``None``.
        """
        executable = ("./netconf/netconf/tools/netconf-testtool/target/"
                      "netconf-testtool-1.3.0-SNAPSHOT-executable.jar")
        if not os.path.isfile(executable):
            return
        with open('testtools.log', 'w') as outfile:
            cls.testtools_process = subprocess.Popen(
                ["java", "-jar", executable, "--schemas-dir", "schemas",
                 "--initial-config-xml", "sample-config-ROADM.xml"],
                stdout=outfile)

    @classmethod
    def __start_odl(cls):
        """Launch the karaf controller with its output sent to ``odl.log``."""
        executable = "../karaf/target/assembly/bin/karaf"
        # The child inherits its own copies of both descriptors, so the
        # parent-side handles can be closed as soon as Popen returns.
        with open('odl.log', 'w') as outfile, open(os.devnull) as devnull:
            cls.odl_process = subprocess.Popen(
                ["bash", executable], stdout=outfile, stdin=devnull)

    @staticmethod
    def _pause(seconds):
        """Block the calling test for ``seconds`` seconds."""
        import time
        time.sleep(seconds)

    @classmethod
    def setUpClass(cls):
        """Start the simulator and the controller once for the whole class."""
        cls.__start_testtools()
        cls.__start_odl()
        # NOTE(review): fixed grace period for the controller to boot;
        # the duration is assumed -- confirm for the target environment.
        cls._pause(60)

    @classmethod
    def tearDownClass(cls):
        """Interrupt every process started by ``setUpClass`` and reap it."""
        # The simulator is only started when its jar exists, so there may
        # be nothing to stop; skipping it must not prevent the controller
        # from being shut down below.
        if cls.testtools_process is not None:
            cls.testtools_process.send_signal(signal.SIGINT)
            cls.testtools_process.wait()
        if cls.odl_process is not None:
            # Signal the children of the launcher first, then the launcher.
            for child in psutil.Process(cls.odl_process.pid).children():
                child.send_signal(signal.SIGINT)
                child.wait()
            cls.odl_process.send_signal(signal.SIGINT)
            cls.odl_process.wait()

    def setUp(self):
        """Leave a short gap between consecutive tests."""
        # NOTE(review): per-test delay is assumed -- confirm it is wanted.
        self._pause(1)

    # ------------------------------------------------------------------
    # Shared request / assertion helpers
    # ------------------------------------------------------------------
    def _get(self, url):
        """Issue an authenticated JSON GET and return the response."""
        return requests.request(
            "GET", url, headers=self._HEADERS, auth=self._AUTH)

    def _assert_port_mapping(self, logical_connection_point, port,
                             circuit_pack):
        """Check the mapping entry published for one logical connection point.

        Fetches the mapping of ``logical_connection_point`` on ROADMA and
        asserts that the request succeeds and that the expected
        port / circuit-pack association is among the returned entries.
        """
        url = ("http://127.0.0.1:8181/restconf/config/portmapping:network/"
               "nodes/ROADMA/mapping/" + logical_connection_point)
        response = self._get(url)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        # NOTE(review): entries are assumed to be listed under 'mapping'
        # -- confirm against the portmapping model.
        self.assertIn(
            {'supporting-port': port,
             'supporting-circuit-pack-name': circuit_pack,
             'logical-connection-point': logical_connection_point},
            res['mapping'])

    @staticmethod
    def _service_path_payload(operation, src_tp, dest_tp):
        """Build the ``renderer:service-path`` input for service_32 on ROADMA."""
        # NOTE(review): the 'renderer:nodes' list key is assumed -- confirm
        # against the renderer model.
        return {"renderer:input": {
            "renderer:service-name": "service_32",
            "renderer:wave-number": "32",
            "renderer:operation": operation,
            "renderer:nodes": [
                {"renderer:node-id": "ROADMA",
                 "renderer:src-tp": src_tp,
                 "renderer:dest-tp": dest_tp}]}}

    def _assert_service_path(self, operation, src_tp, dest_tp, result):
        """Invoke the service-path RPC and check its reported result text."""
        url = "http://127.0.0.1:8181/restconf/operations/renderer:service-path"
        data = self._service_path_payload(operation, src_tp, dest_tp)
        response = requests.request(
            "POST", url, data=json.dumps(data),
            headers=self._HEADERS, auth=self._AUTH)
        self.assertEqual(response.status_code, requests.codes.ok)
        self.assertEqual(response.json(), {'output': {'result': result}})

    # ------------------------------------------------------------------
    # Device mount
    # ------------------------------------------------------------------
    def test_01_connect_device(self):
        """Mount the simulated ROADMA device in the netconf topology."""
        url = ("http://127.0.0.1:8181/restconf/config/network-topology:"
               "network-topology/topology/topology-netconf/node/ROADMA")
        # NOTE(review): the enclosing 'node' list and 'node-id' entry are
        # assumed -- confirm against the netconf topology model.
        data = {"node": [{
            "node-id": "ROADMA",
            "netconf-node-topology:username": "admin",
            "netconf-node-topology:password": "admin",
            "netconf-node-topology:host": "127.0.0.1",
            "netconf-node-topology:port": "17830",
            "netconf-node-topology:tcp-only": "false",
            "netconf-node-topology:pass-through": {}}]}
        response = requests.request(
            "PUT", url, data=json.dumps(data), headers=self._HEADERS,
            auth=self._AUTH)
        self.assertEqual(response.status_code, requests.codes.created)
        # NOTE(review): time allowed for the mount to complete is assumed.
        self._pause(20)

    def test_02_device_connected(self):
        """Check that the mounted device is reported in the operational store."""
        url = ("http://127.0.0.1:8181/restconf/operational/network-topology:"
               "network-topology/topology/topology-netconf/node/ROADMA")
        response = self._get(url)
        self.assertEqual(response.status_code, requests.codes.ok)
        res = response.json()
        # NOTE(review): expected status value 'connected' is assumed.
        self.assertEqual(
            res['node'][0]['netconf-node-topology:connection-status'],
            'connected')
        # NOTE(review): settling delay is assumed -- confirm it is wanted.
        self._pause(2)

    # ------------------------------------------------------------------
    # Port mapping
    # ------------------------------------------------------------------
    def test_03_portmapping_SRG1_PP3_TXRX(self):
        self._assert_port_mapping('SRG1-PP3-TXRX', 'C3', '4/0')

    def test_04_portmapping_SRG1_PP6_TXRX(self):
        self._assert_port_mapping('SRG1-PP6-TXRX', 'C6', '4/0')

    def test_05_portmapping_DEG1_TTP_TXRX(self):
        self._assert_port_mapping('DEG1-TTP-TXRX', 'L1', '2/0')

    def test_06_portmapping_SRG1_PP9_TXRX(self):
        self._assert_port_mapping('SRG1-PP9-TXRX', 'C9', '4/0')

    def test_07_portmapping_SRG1_PP16_TXRX(self):
        self._assert_port_mapping('SRG1-PP16-TXRX', 'C16', '4/0')

    def test_08_portmapping_SRG1_PP4_TXRX(self):
        self._assert_port_mapping('SRG1-PP4-TXRX', 'C4', '4/0')

    def test_09_portmapping_SRG1_PP2_TXRX(self):
        self._assert_port_mapping('SRG1-PP2-TXRX', 'C2', '4/0')

    def test_10_portmapping_SRG1_PP14_TXRX(self):
        self._assert_port_mapping('SRG1-PP14-TXRX', 'C14', '4/0')

    def test_11_portmapping_SRG1_PP11_TXRX(self):
        self._assert_port_mapping('SRG1-PP11-TXRX', 'C11', '4/0')

    def test_12_portmapping_SRG1_PP7_TXRX(self):
        self._assert_port_mapping('SRG1-PP7-TXRX', 'C7', '4/0')

    def test_13_portmapping_DEG2_TTP_TXRX(self):
        self._assert_port_mapping('DEG2-TTP-TXRX', 'L1', '3/0')

    def test_14_portmapping_DEG2_TTP_TXRX(self):
        # NOTE(review): same check as test_13; kept so the numbered
        # sequence of test names is unchanged.
        self._assert_port_mapping('DEG2-TTP-TXRX', 'L1', '3/0')

    def test_15_portmapping_SRG1_PP12_TXRX(self):
        self._assert_port_mapping('SRG1-PP12-TXRX', 'C12', '4/0')

    def test_16_portmapping_SRG1_PP8_TXRX(self):
        self._assert_port_mapping('SRG1-PP8-TXRX', 'C8', '4/0')

    def test_17_portmapping_SRG1_PP5_TXRX(self):
        self._assert_port_mapping('SRG1-PP5-TXRX', 'C5', '4/0')

    def test_18_portmapping_SRG1_PP13_TXRX(self):
        self._assert_port_mapping('SRG1-PP13-TXRX', 'C13', '4/0')

    def test_19_portmapping_SRG1_PP15_TXRX(self):
        self._assert_port_mapping('SRG1-PP15-TXRX', 'C15', '4/0')

    def test_20_portmapping_SRG1_PP10_TXRX(self):
        self._assert_port_mapping('SRG1-PP10-TXRX', 'C10', '4/0')

    # ------------------------------------------------------------------
    # Cross-connections
    # ------------------------------------------------------------------
    def test_21_cross_connection_DEG1_TTP_TXRX_SRG1_PP3_TXRX(self):
        self._assert_service_path(
            "create", "DEG1-TTP-TXRX", "SRG1-PP3-TXRX",
            'Roadm-connection successfully created for nodes [ROADMA]')

    def test_22_cross_connection_SRG1_PP3_TXRX_DEG1_TTP_TXRX(self):
        self._assert_service_path(
            "create", "SRG1-PP3-TXRX", "DEG1-TTP-TXRX",
            'Roadm-connection successfully created for nodes [ROADMA]')

    def test_23_delete_DEG1_TTP_TXRX_SRG1_PP3_TXRX(self):
        self._assert_service_path(
            "delete", "DEG1-TTP-TXRX", "SRG1-PP3-TXRX",
            'Request processed')

    def test_24_delete_SRG1_PP3_TXRX_DEG1_TTP_TXRX(self):
        self._assert_service_path(
            "delete", "SRG1-PP3-TXRX", "DEG1-TTP-TXRX",
            'Request processed')
if __name__ == "__main__":
    # Run the suite when executed as a script; verbosity=2 reports each
    # test by name together with its outcome.
    unittest.main(verbosity=2)