888a8e91fc228751b85ecd386c41bd49cd2f4537
[transportpce.git] / tests / transportpce_tests / common / test_utils.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2021 Orange, Inc. and others.  All rights reserved.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # pylint: disable=no-member
13
14 import json
15 import os
16 # pylint: disable=wrong-import-order
17 import sys
18 import re
19 import signal
20 import subprocess
21 import time
22
23 import psutil
24 import requests
25 import urllib.parse
26
27 from dict2xml import dict2xml
28 from netconf_client.connect import connect_ssh
29 from netconf_client.ncclient import Manager
30
31 # pylint: disable=import-error
32 import simulators
33
34 SIMS = simulators.SIMS
35
36 HONEYNODE_OK_START_MSG = 'Netconf SSH endpoint started successfully at 0.0.0.0'
37 LIGHTYNODE_OK_START_MSG = 'Data tree change listeners registered'
38 KARAF_OK_START_MSG = "Transportpce controller started"
39 LIGHTY_OK_START_MSG = re.escape("lighty.io and RESTCONF-NETCONF started")
40
41 ODL_LOGIN = 'admin'
42 ODL_PWD = 'admin'
43 NODES_LOGIN = 'admin'
44 NODES_PWD = 'admin'
45
46 TYPE_APPLICATION_JSON = {'Content-Type': 'application/json', 'Accept': 'application/json'}
47 TYPE_APPLICATION_XML = {'Content-Type': 'application/xml', 'Accept': 'application/xml'}
48
49 REQUEST_TIMEOUT = 10
50
51 CODE_SHOULD_BE_200 = 'Http status code should be 200'
52 CODE_SHOULD_BE_201 = 'Http status code should be 201'
53 T100GE = 'Transponder 100GE'
54 T0_MULTILAYER_TOPO = 'T0 - Multi-layer topology'
55 T0_FULL_MULTILAYER_TOPO = 'T0 - Full Multi-layer topology'
56
57 SIM_LOG_DIRECTORY = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'log')
58
59 process_list = []
60
61 if 'USE_ODL_ALT_RESTCONF_PORT' in os.environ:
62     RESTCONF_PORT = os.environ['USE_ODL_ALT_RESTCONF_PORT']
63 else:
64     RESTCONF_PORT = 8181
65
66 RESTCONF_PATH_PREFIX = {'rfc8040': '/rests',
67                         'draft-bierman02': '/restconf'}
68
69 if 'USE_ODL_RESTCONF_VERSION' in os.environ:
70     RESTCONF_VERSION = os.environ['USE_ODL_RESTCONF_VERSION']
71     if RESTCONF_VERSION not in RESTCONF_PATH_PREFIX:
72         print('unsupported RESTCONF version ' + RESTCONF_VERSION)
73         sys.exit(3)
74 else:
75     RESTCONF_VERSION = 'rfc8040'
76
77 RESTCONF_BASE_URL = 'http://localhost:' + str(RESTCONF_PORT) + RESTCONF_PATH_PREFIX[RESTCONF_VERSION]
78
79 if 'USE_ODL_ALT_KARAF_INSTALL_DIR' in os.environ:
80     KARAF_INSTALLDIR = os.environ['USE_ODL_ALT_KARAF_INSTALL_DIR']
81 else:
82     KARAF_INSTALLDIR = 'karaf'
83
84 KARAF_LOG = os.path.join(
85     os.path.dirname(os.path.realpath(__file__)),
86     '..', '..', '..', KARAF_INSTALLDIR, 'target', 'assembly', 'data', 'log', 'karaf.log')
87
88 if 'USE_LIGHTY' in os.environ and os.environ['USE_LIGHTY'] == 'True':
89     TPCE_LOG = 'odl-' + str(os.getpid()) + '.log'
90 else:
91     TPCE_LOG = KARAF_LOG
92
93 if 'USE_SIMS' not in os.environ:
94     SIMS_TO_USE = 'lightynode'
95     SIMS_TYPE = 'lightynode'
96 else:
97     SIMS_TO_USE = os.environ['USE_SIMS']
98     print("Forcing to use SIMS " + SIMS_TO_USE)
99     if SIMS_TO_USE != 'None' or 'SIMS_TYPE' not in os.environ:
100         SIMS_TYPE = SIMS_TO_USE
101     else:
102         SIMS_TYPE = os.environ['SIMS_TYPE']
103         print("Forcing to use SIMS type" + SIMS_TYPE)
104
105
106 #
107 # Basic HTTP operations
108 #
109
110
111 def get_request(url):
112     return requests.request(
113         'GET', url.format(RESTCONF_BASE_URL),
114         headers=TYPE_APPLICATION_JSON,
115         auth=(ODL_LOGIN, ODL_PWD),
116         timeout=REQUEST_TIMEOUT)
117
118
119 def put_request(url, data):
120     return requests.request(
121         'PUT', url.format(RESTCONF_BASE_URL),
122         data=json.dumps(data),
123         headers=TYPE_APPLICATION_JSON,
124         auth=(ODL_LOGIN, ODL_PWD),
125         timeout=REQUEST_TIMEOUT)
126
127
128 def delete_request(url):
129     return requests.request(
130         'DELETE', url.format(RESTCONF_BASE_URL),
131         headers=TYPE_APPLICATION_JSON,
132         auth=(ODL_LOGIN, ODL_PWD),
133         timeout=REQUEST_TIMEOUT)
134
135
136 def post_request(url, data):
137     if data:
138         return requests.request(
139             "POST", url.format(RESTCONF_BASE_URL),
140             data=json.dumps(data),
141             headers=TYPE_APPLICATION_JSON,
142             auth=(ODL_LOGIN, ODL_PWD),
143             timeout=REQUEST_TIMEOUT)
144     return requests.request(
145         "POST", url.format(RESTCONF_BASE_URL),
146         headers=TYPE_APPLICATION_JSON,
147         auth=(ODL_LOGIN, ODL_PWD),
148         timeout=REQUEST_TIMEOUT)
149
150 #
151 # Process management
152 #
153
154
155 def start_honeynode(log_file: str, sim):
156     executable = os.path.join(os.path.dirname(os.path.realpath(__file__)),
157                               '..', '..', 'honeynode', sim[1], 'honeynode-simulator', 'honeycomb-tpce')
158     sample_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)),
159                                     '..', '..', 'sample_configs', 'openroadm', sim[1])
160     if os.path.isfile(executable):
161         with open(log_file, 'w', encoding='utf-8') as outfile:
162             return subprocess.Popen(
163                 [executable, SIMS[sim]['port'], os.path.join(sample_directory, SIMS[sim]['configfile'])],
164                 stdout=outfile, stderr=outfile)
165     return None
166
167
168 def start_lightynode(log_file: str, sim):
169     executable = os.path.join(os.path.dirname(os.path.realpath(__file__)),
170                               '..', '..', 'lightynode', 'lightynode-openroadm-device', 'start-device.sh')
171     sample_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)),
172                                     '..', '..', 'sample_configs', 'openroadm', sim[1])
173     if os.path.isfile(executable):
174         with open(log_file, 'w', encoding='utf-8') as outfile:
175             return subprocess.Popen(
176                 [executable, "-v" + sim[1], "-p" + SIMS[sim]['port'], "-f" + os.path.join(sample_directory,
177                                                                                           SIMS[sim]['configfile'])],
178                 stdout=outfile, stderr=outfile)
179     return None
180
181
182 def start_sims(sims_list):
183     if SIMS_TO_USE == 'None':
184         return None
185     if SIMS_TO_USE == 'honeynode':
186         start_msg = HONEYNODE_OK_START_MSG
187         start_method = start_honeynode
188     else:
189         start_msg = LIGHTYNODE_OK_START_MSG
190         start_method = start_lightynode
191     for sim in sims_list:
192         print('starting simulator ' + sim[0] + ' in OpenROADM device version ' + sim[1] + '...')
193         log_file = os.path.join(SIM_LOG_DIRECTORY, SIMS[sim]['logfile'])
194         process = start_method(log_file, sim)
195         if wait_until_log_contains(log_file, start_msg, 100):
196             print('simulator for ' + sim[0] + ' started')
197         else:
198             print('simulator for ' + sim[0] + ' failed to start')
199             shutdown_process(process)
200             for pid in process_list:
201                 shutdown_process(pid)
202             sys.exit(3)
203         process_list.append(process)
204     return process_list
205
206
207 def start_tpce():
208     if 'NO_ODL_STARTUP' in os.environ:
209         print('No OpenDaylight instance to start!')
210         return []
211     print('starting OpenDaylight...')
212     if 'USE_LIGHTY' in os.environ and os.environ['USE_LIGHTY'] == 'True':
213         process = start_lighty()
214         start_msg = LIGHTY_OK_START_MSG
215     else:
216         process = start_karaf()
217         start_msg = KARAF_OK_START_MSG
218     if wait_until_log_contains(TPCE_LOG, start_msg, time_to_wait=100):
219         print('OpenDaylight started !')
220     else:
221         print('OpenDaylight failed to start !')
222         shutdown_process(process)
223         for pid in process_list:
224             shutdown_process(pid)
225         sys.exit(1)
226     process_list.append(process)
227     return process_list
228
229
230 def start_karaf():
231     print('starting KARAF TransportPCE build...')
232     executable = os.path.join(
233         os.path.dirname(os.path.realpath(__file__)),
234         '..', '..', '..', KARAF_INSTALLDIR, 'target', 'assembly', 'bin', 'karaf')
235     with open('odl.log', 'w', encoding='utf-8') as outfile:
236         return subprocess.Popen(
237             ['sh', executable, 'server'], stdout=outfile, stderr=outfile, stdin=None)
238
239
240 def start_lighty():
241     print('starting LIGHTY.IO TransportPCE build...')
242     executable = os.path.join(
243         os.path.dirname(os.path.realpath(__file__)),
244         '..', '..', '..', 'lighty', 'target', 'tpce',
245         'clean-start-controller.sh')
246     with open(TPCE_LOG, 'w', encoding='utf-8') as outfile:
247         return subprocess.Popen(
248             ['sh', executable], stdout=outfile, stderr=outfile, stdin=None)
249
250
251 def install_karaf_feature(feature_name: str):
252     print('installing feature ' + feature_name)
253     executable = os.path.join(
254         os.path.dirname(os.path.realpath(__file__)),
255         '..', '..', '..', KARAF_INSTALLDIR, 'target', 'assembly', 'bin', 'client')
256 # FIXME: https://jira.opendaylight.org/browse/TRNSPRTPCE-701
257 # -b option needed below because of Karaf client bug reporte in the JIRA ticket mentioned above
258     return subprocess.run([executable, '-b'],
259                           input='feature:install ' + feature_name + '\n feature:list | grep '
260                           + feature_name + ' \n logout \n',
261                           universal_newlines=True, check=False)
262
263
264 def shutdown_process(process):
265     if process is not None:
266         for child in psutil.Process(process.pid).children():
267             child.send_signal(signal.SIGINT)
268             child.wait()
269         process.send_signal(signal.SIGINT)
270
271
272 def wait_until_log_contains(log_file, regexp, time_to_wait=60):
273     # pylint: disable=lost-exception
274     # pylint: disable=consider-using-with
275     stringfound = False
276     line = None
277     try:
278         with TimeOut(seconds=time_to_wait):
279             while not os.path.exists(log_file):
280                 time.sleep(0.2)
281             with open(log_file, 'r', encoding='utf-8') as filelogs:
282                 filelogs.seek(0, 2)
283                 print("Searching for pattern '" + regexp + "' in " + os.path.basename(log_file), end='... ', flush=True)
284                 compiled_regexp = re.compile(regexp)
285                 while True:
286                     line = filelogs.readline()
287                     if compiled_regexp.search(line):
288                         print('Pattern found!', end=' ')
289                         stringfound = True
290                         break
291                     if not line:
292                         time.sleep(0.1)
293         return stringfound
294     except TimeoutError:
295         print('Pattern not found after ' + str(time_to_wait), end=' seconds! ', flush=True)
296         return stringfound
297     except PermissionError:
298         print('Permission Error when trying to access the log file', end=' ... ', flush=True)
299         return stringfound
300
301
302 class TimeOut:
303     def __init__(self, seconds=1, error_message='Timeout'):
304         self.seconds = seconds
305         self.error_message = error_message
306
307     def handle_timeout(self, signum, frame):
308         raise TimeoutError(self.error_message)
309
310     def __enter__(self):
311         signal.signal(signal.SIGALRM, self.handle_timeout)
312         signal.alarm(self.seconds)
313
314     def __exit__(self, type, value, traceback):
315         # pylint: disable=W0622
316         signal.alarm(0)
317
318 #
319 # Basic NetCONF device operations
320 #
321
322
323 def mount_device(node: str, sim: str):
324     url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}',
325            'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}'}
326     body = {'node': [{
327         'node-id': node,
328         'netconf-node-topology:username': NODES_LOGIN,
329         'netconf-node-topology:password': NODES_PWD,
330         'netconf-node-topology:host': '127.0.0.1',
331         'netconf-node-topology:port': SIMS[sim]['port'],
332         'netconf-node-topology:tcp-only': 'false',
333         'netconf-node-topology:pass-through': {}}]}
334     response = put_request(url[RESTCONF_VERSION].format('{}', node), body)
335     if wait_until_log_contains(TPCE_LOG, 'Triggering notification stream NETCONF for node ' + node, 180):
336         print('Node ' + node + ' correctly added to tpce topology', end='... ', flush=True)
337     else:
338         print('Node ' + node + ' still not added to tpce topology', end='... ', flush=True)
339         if response.status_code == requests.codes.ok:
340             print('It was probably loaded at start-up', end='... ', flush=True)
341         # TODO an else-clause to abort test would probably be nice here
342     return response
343
344
345 def unmount_device(node: str):
346     url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}',
347            'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}'}
348     response = delete_request(url[RESTCONF_VERSION].format('{}', node))
349     if wait_until_log_contains(TPCE_LOG, re.escape("onDeviceDisConnected: " + node), 180):
350         print('Node ' + node + ' correctly deleted from tpce topology', end='... ', flush=True)
351     else:
352         print('Node ' + node + ' still not deleted from tpce topology', end='... ', flush=True)
353     return response
354
355
356 def check_device_connection(node: str):
357     url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}?content=nonconfig',
358            'draft-bierman02': '{}/operational/network-topology:network-topology/topology/topology-netconf/node/{}'}
359     response = get_request(url[RESTCONF_VERSION].format('{}', node))
360     res = response.json()
361     return_key = {'rfc8040': 'network-topology:node',
362                   'draft-bierman02': 'node'}
363     if return_key[RESTCONF_VERSION] in res.keys():
364         connection_status = res[return_key[RESTCONF_VERSION]][0]['netconf-node-topology:connection-status']
365     else:
366         connection_status = res['errors']['error'][0]
367     return {'status_code': response.status_code,
368             'connection-status': connection_status}
369
370
371 def check_node_request(node: str):
372     # pylint: disable=line-too-long
373     url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device?content=config',  # nopep8
374            'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device'}  # nopep8
375     response = get_request(url[RESTCONF_VERSION].format('{}', node))
376     res = response.json()
377     return_key = {'rfc8040': 'org-openroadm-device:org-openroadm-device',
378                   'draft-bierman02': 'org-openroadm-device'}
379     if return_key[RESTCONF_VERSION] in res.keys():
380         response_attribute = res[return_key[RESTCONF_VERSION]]
381     else:
382         response_attribute = res['errors']['error'][0]
383     return {'status_code': response.status_code,
384             'org-openroadm-device': response_attribute}
385
386
387 def check_node_attribute_request(node: str, attribute: str, attribute_value: str):
388     # pylint: disable=line-too-long
389     url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}?content=nonconfig',  # nopep8
390            'draft-bierman02': '{}/operational/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}/{}'}  # nopep8
391     response = get_request(url[RESTCONF_VERSION].format('{}', node, attribute, attribute_value))
392     res = response.json()
393     return_key = {'rfc8040': 'org-openroadm-device:' + attribute,
394                   'draft-bierman02': attribute}
395     if return_key[RESTCONF_VERSION] in res.keys():
396         response_attribute = res[return_key[RESTCONF_VERSION]]
397     elif 'errors' in res.keys():
398         response_attribute = res['errors']['error'][0]
399     else:
400         # status code 400 invalid request
401         response_attribute = res['message'] + ' ' + res['url']
402         print(response_attribute)
403     return {'status_code': response.status_code,
404             attribute: response_attribute}
405
406
407 def check_node_attribute2_request(node: str, attribute: str, attribute_value: str, attribute2: str):
408     # pylint: disable=line-too-long
409     url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}/{}?content=config',  # nopep8
410            'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}/{}/{}'}  # nopep8
411     response = get_request(url[RESTCONF_VERSION].format('{}', node, attribute, attribute_value, attribute2))
412     res = response.json()
413     if attribute2 in res.keys():
414         response_attribute = res[attribute2]
415     else:
416         response_attribute = res['errors']['error'][0]
417     return {'status_code': response.status_code,
418             attribute2: response_attribute}
419
420
421 def del_node_attribute_request(node: str, attribute: str, attribute_value: str):
422     # pylint: disable=line-too-long
423     url = {'rfc8040': '{}/data/network-topology:network-topology/topology=topology-netconf/node={}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}={}',  # nopep8
424            'draft-bierman02': '{}/config/network-topology:network-topology/topology/topology-netconf/node/{}/yang-ext:mount/org-openroadm-device:org-openroadm-device/{}/{}'}  # nopep8
425     response = delete_request(url[RESTCONF_VERSION].format('{}', node, attribute, attribute_value))
426     return response
427
428 #
429 # Portmapping operations
430 #
431
432
433 def post_portmapping(payload: str):
434     url = {'rfc8040': '{}/data/transportpce-portmapping:network',
435            'draft-bierman02': '{}/config/transportpce-portmapping:network'}
436     json_payload = json.loads(payload)
437     response = post_request(url[RESTCONF_VERSION].format('{}'), json_payload)
438     return {'status_code': response.status_code}
439
440
441 def del_portmapping():
442     url = {'rfc8040': '{}/data/transportpce-portmapping:network',
443            'draft-bierman02': '{}/config/transportpce-portmapping:network'}
444     response = delete_request(url[RESTCONF_VERSION].format('{}'))
445     return {'status_code': response.status_code}
446
447
448 def get_portmapping_node_attr(node: str, attr: str, value: str):
449     # pylint: disable=consider-using-f-string
450     url = {'rfc8040': '{}/data/transportpce-portmapping:network/nodes={}',
451            'draft-bierman02': '{}/config/transportpce-portmapping:network/nodes/{}'}
452     target_url = url[RESTCONF_VERSION].format('{}', node)
453     if attr is not None:
454         target_url = (target_url + '/{}').format('{}', attr)
455         if value is not None:
456             suffix = {'rfc8040': '={}', 'draft-bierman02': '/{}'}
457             target_url = (target_url + suffix[RESTCONF_VERSION]).format('{}', value)
458     else:
459         attr = 'nodes'
460     response = get_request(target_url)
461     res = response.json()
462     return_key = {'rfc8040': 'transportpce-portmapping:' + attr,
463                   'draft-bierman02': attr}
464     if return_key[RESTCONF_VERSION] in res.keys():
465         return_output = res[return_key[RESTCONF_VERSION]]
466     else:
467         return_output = res['errors']['error'][0]
468     return {'status_code': response.status_code,
469             attr: return_output}
470
471 #
472 # Topology operations
473 #
474
475
476 def get_ietf_network_request(network: str, content: str):
477     url = {'rfc8040': '{}/data/ietf-network:networks/network={}?content={}',
478            'draft-bierman02': '{}/{}/ietf-network:networks/network/{}'}
479     if RESTCONF_VERSION in ('rfc8040'):
480         format_args = ('{}', network, content)
481     elif content == 'config':
482         format_args = ('{}', content, network)
483     else:
484         format_args = ('{}', 'operational', network)
485     response = get_request(url[RESTCONF_VERSION].format(*format_args))
486     if bool(response):
487         res = response.json()
488         return_key = {'rfc8040': 'ietf-network:network',
489                       'draft-bierman02': 'network'}
490         networks = res[return_key[RESTCONF_VERSION]]
491     else:
492         networks = None
493     return {'status_code': response.status_code,
494             'network': networks}
495
496
497 def put_ietf_network(network: str, payload: str):
498     url = {'rfc8040': '{}/data/ietf-network:networks/network={}',
499            'draft-bierman02': '{}/config/ietf-network:networks/network/{}'}
500     json_payload = json.loads(payload)
501     response = put_request(url[RESTCONF_VERSION].format('{}', network), json_payload)
502     return {'status_code': response.status_code}
503
504
505 def del_ietf_network(network: str):
506     url = {'rfc8040': '{}/data/ietf-network:networks/network={}',
507            'draft-bierman02': '{}/config/ietf-network:networks/network/{}'}
508     response = delete_request(url[RESTCONF_VERSION].format('{}', network))
509     return {'status_code': response.status_code}
510
511
512 def get_ietf_network_link_request(network: str, link: str, content: str):
513     url = {'rfc8040': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}?content={}',
514            'draft-bierman02': '{}/{}/ietf-network:networks/network/{}/ietf-network-topology:link/{}'}
515     if RESTCONF_VERSION in ('rfc8040'):
516         format_args = ('{}', network, link, content)
517     elif content == 'config':
518         format_args = ('{}', content, network, link)
519     else:
520         format_args = ('{}', 'operational', network, link)
521     response = get_request(url[RESTCONF_VERSION].format(*format_args))
522     res = response.json()
523     return_key = {'rfc8040': 'ietf-network-topology:link',
524                   'draft-bierman02': 'ietf-network-topology:link'}
525     link = res[return_key[RESTCONF_VERSION]][0]
526     return {'status_code': response.status_code,
527             'link': link}
528
529
530 def del_ietf_network_link_request(network: str, link: str, content: str):
531     url = {'rfc8040': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}?content={}',
532            'draft-bierman02': '{}/{}/ietf-network:networks/network/{}/ietf-network-topology:link/{}'}
533     if RESTCONF_VERSION in ('rfc8040'):
534         format_args = ('{}', network, link, content)
535     elif content == 'config':
536         format_args = ('{}', content, network, link)
537     else:
538         format_args = ('{}', 'operational', network, link)
539     response = delete_request(url[RESTCONF_VERSION].format(*format_args))
540     return response
541
542
543 def add_oms_attr_request(link: str, oms_attr: str):
544     url = {'rfc8040': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}',
545            'draft-bierman02': '{}/config/ietf-network:networks/network/{}/ietf-network-topology:link/{}'}
546     url2 = url[RESTCONF_VERSION] + '/org-openroadm-network-topology:OMS-attributes/span'
547     network = 'openroadm-topology'
548     response = put_request(url2.format('{}', network, link), oms_attr)
549     return response
550
551
552 def del_oms_attr_request(link: str,):
553     url = {'rfc8040': '{}/data/ietf-network:networks/network={}/ietf-network-topology:link={}',
554            'draft-bierman02': '{}/config/ietf-network:networks/network/{}/ietf-network-topology:link/{}'}
555     url2 = url[RESTCONF_VERSION] + '/org-openroadm-network-topology:OMS-attributes/span'
556     network = 'openroadm-topology'
557     response = delete_request(url2.format('{}', network, link))
558     return response
559
560
561 def get_ietf_network_node_request(network: str, node: str, content: str):
562     url = {'rfc8040': '{}/data/ietf-network:networks/network={}/node={}?content={}',
563            'draft-bierman02': '{}/{}/ietf-network:networks/network/{}/node/{}'}
564     if RESTCONF_VERSION in ('rfc8040'):
565         format_args = ('{}', network, node, content)
566     elif content == 'config':
567         format_args = ('{}', content, network, node)
568     else:
569         format_args = ('{}', 'operational', network, node)
570     response = get_request(url[RESTCONF_VERSION].format(*format_args))
571     if bool(response):
572         res = response.json()
573         return_key = {'rfc8040': 'ietf-network:node',
574                       'draft-bierman02': 'node'}
575         node = res[return_key[RESTCONF_VERSION]][0]
576     else:
577         node = None
578     return {'status_code': response.status_code,
579             'node': node}
580
581
582 def del_ietf_network_node_request(network: str, node: str, content: str):
583     url = {'rfc8040': '{}/data/ietf-network:networks/network={}/node={}?content={}',
584            'draft-bierman02': '{}/{}/ietf-network:networks/network/{}/node/{}'}
585     if RESTCONF_VERSION in ('rfc8040'):
586         format_args = ('{}', network, node, content)
587     elif content == 'config':
588         format_args = ('{}', content, network, node)
589     else:
590         format_args = ('{}', 'operational', network, node)
591     response = delete_request(url[RESTCONF_VERSION].format(*format_args))
592     return response
593
594
595 #
596 # Service list operations
597 #
598
599
600 def get_ordm_serv_list_request():
601     url = {'rfc8040': '{}/data/org-openroadm-service:service-list?content=nonconfig',
602            'draft-bierman02': '{}/operational/org-openroadm-service:service-list/'}
603     response = get_request(url[RESTCONF_VERSION])
604     res = response.json()
605     return_key = {'rfc8040': 'org-openroadm-service:service-list',
606                   'draft-bierman02': 'service-list'}
607     if return_key[RESTCONF_VERSION] in res.keys():
608         response_attribute = res[return_key[RESTCONF_VERSION]]
609     else:
610         response_attribute = res['errors']['error'][0]
611     return {'status_code': response.status_code,
612             'service-list': response_attribute}
613
614
615 def get_ordm_serv_list_attr_request(attribute: str, value: str):
616     url = {'rfc8040': '{}/data/org-openroadm-service:service-list/{}={}?content=nonconfig',
617            'draft-bierman02': '{}/operational/org-openroadm-service:service-list/{}/{}'}
618     format_args = ('{}', attribute, value)
619     response = get_request(url[RESTCONF_VERSION].format(*format_args))
620     res = response.json()
621     return_key = {'rfc8040': 'org-openroadm-service:' + attribute,
622                   'draft-bierman02': attribute}
623     if return_key[RESTCONF_VERSION] in res.keys():
624         response_attribute = res[return_key[RESTCONF_VERSION]]
625     else:
626         response_attribute = res['errors']['error'][0]
627     return {'status_code': response.status_code,
628             attribute: response_attribute}
629
630
631 def get_serv_path_list_attr(attribute: str, value: str):
632     url = {'rfc8040': '{}/data/transportpce-service-path:service-path-list/{}={}?content=nonconfig',
633            'draft-bierman02': '{}/operational/transportpce-service-path:service-path-list/{}/{}'}
634     response = get_request(url[RESTCONF_VERSION].format('{}', attribute, value))
635     res = response.json()
636     return_key = {'rfc8040': 'transportpce-service-path:' + attribute,
637                   'draft-bierman02': attribute}
638     if return_key[RESTCONF_VERSION] in res.keys():
639         response_attribute = res[return_key[RESTCONF_VERSION]]
640     else:
641         response_attribute = res['errors']['error'][0]
642     return {'status_code': response.status_code,
643             attribute: response_attribute}
644
645
646 #
647 # TransportPCE internal API RPCs
648 #
649
650
651 def prepend_dict_keys(input_dict: dict, prefix: str):
652     return_dict = {}
653     for key, value in input_dict.items():
654         newkey = prefix + key
655         if isinstance(value, dict):
656             return_dict[newkey] = prepend_dict_keys(value, prefix)
657             # TODO: perhaps some recursion depth limit or another solution has to be considered here
658             # even if recursion depth is given by the input_dict argument
659             # direct (self-)recursive functions may carry unwanted side-effects such as ressource consumptions
660         else:
661             return_dict[newkey] = value
662     return return_dict
663
664
665 def transportpce_api_rpc_request(api_module: str, rpc: str, payload: dict):
666     # pylint: disable=consider-using-f-string
667     url = "{}/operations/{}:{}".format('{}', api_module, rpc)
668     if payload is None:
669         data = None
670     elif RESTCONF_VERSION == 'draft-bierman02':
671         data = prepend_dict_keys({'input': payload}, api_module + ':')
672     else:
673         data = {'input': payload}
674     response = post_request(url, data)
675     if response.status_code == requests.codes.no_content:
676         return_output = None
677     else:
678         res = response.json()
679         return_key = {'rfc8040': api_module + ':output',
680                       'draft-bierman02': 'output'}
681         if response.status_code == requests.codes.internal_server_error:
682             return_output = res
683         else:
684             return_output = res[return_key[RESTCONF_VERSION]]
685     return {'status_code': response.status_code,
686             'output': return_output}
687
688 #
689 # simulators datastore operations
690 #
691
692
693 def sims_update_cp_port(sim: tuple, circuitpack: str, port: str, payload: dict):
694     if SIMS_TYPE == 'lightynode':
695         return sims_update_cp_port_ntcf(sim, circuitpack, payload)
696     if SIMS_TYPE == 'honeynode':
697         return sims_update_cp_port_rest(sim, circuitpack, port, payload)
698     return False
699
700
701 def sims_update_cp_port_rest(sim: tuple, circuitpack: str, port: str, payload: dict):
702     # pylint: disable=consider-using-f-string
703     url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/{}/ports/{}".format(
704         SIMS[sim]['restconf_baseurl'],
705         urllib.parse.quote(circuitpack, safe=''),
706         urllib.parse.quote(port, safe=''))
707     body = {"ports": [payload]}
708     response = requests.request("PUT",
709                                 url,
710                                 data=json.dumps(body),
711                                 headers=TYPE_APPLICATION_JSON,
712                                 auth=(ODL_LOGIN, ODL_PWD),
713                                 timeout=REQUEST_TIMEOUT)
714     return response.status_code == requests.codes.ok
715
716
717 def sims_update_cp_port_ntcf(sim: tuple, circuitpack: str, payload: dict):
718     body = {"circuit-packs": {"circuit-pack-name": circuitpack, "ports": payload}}
719     xml_body = '<config><org-openroadm-device xmlns="http://org/openroadm/device">'
720     xml_body += dict2xml(body, indent="  ")
721     xml_body += '</org-openroadm-device></config>'
722     with connect_ssh(host='127.0.0.1',
723                      port=int(SIMS[sim]['port']),
724                      username=NODES_LOGIN,
725                      password=NODES_PWD) as session:
726         mgr = Manager(session, timeout=120)
727         reply = mgr.edit_config(xml_body, target="candidate", default_operation="merge")
728     if "None" in str(reply):
729         return True
730     return False