import requests import time from threading import Thread from functools import wraps # from multiprocessing import Process __all__ = ['configure_flows', 'wait_until', 'deconfigure_flows'] # class KeyWord(Process): class KeyWord(Thread): def __init__(self, *args, **kwargs): super(KeyWord, self).__init__(*args, **kwargs) self._stop = False self._kw_result = None def stop(self): self._stop = True def result(self): return self._kw_result def async_task(func): """Taken from http://code.activestate.com/recipes/576684-simple-threading-decorator/ and modified """ @wraps(func) def async_func(*args, **kwargs): func_hl = KeyWord(target=func, args=args, kwargs=kwargs) func_hl._Thread__args = (func_hl,) + func_hl._Thread__args # func_hl._args = (func_hl,) + func_hl._args func_hl.start() return func_hl return async_func def wait_until(*tasks, **kwargs): tstart = time.time() timeout = 30 if 'timeout' in kwargs: timeout = int(kwargs['timeout']) cnt = len(tasks) while time.time() < (timeout + tstart): tfinished = 0 for t in tasks: if t.is_alive() is False: tfinished += 1 continue t.join(timeout=0.2) if tfinished == cnt: return (time.time()-tstart) for t in tasks: if t.is_alive() is True: t.stop() # t.terminate() t.join() return (time.time()-tstart) @async_task def Example_of_robot_keyword(self, a, b, c): """be carefull, when calling this kw from robot, do not count on self, it is a thread object itself injected by decorator. The purpose is to make possibility to exit from thread on demand by wait until keywork which makes thread.stop() if needed. In your fw you should use self._stop variable. robot sample: ${thread}= Example Of Robot Keyword a b c """ while True: if self._stop is True: break @async_task def configure_flows(self, host, port, switchid, tableid, minid, maxid): flow_template = ''' false 0 0 {} {} 255 false 2048 10.0.1.0/24 1 FooXf{} {} false ''' self._kw_result = 0 ses = requests.Session() for i in range(int(minid), int(maxid) + 1): if self._stop is True: break fid = str(i) flow = flow_template.format(tableid, fid, fid, fid) url = 'http://{}:{}/restconf/config/opendaylight-inventory:nodes/node/openflow:{}/table/{}/flow/{}'.format( host, port, switchid, tableid, fid) try: rsp = ses.put(url, headers={'Content-Type': 'application/xml'}, data=flow, timeout=3) if rsp.status_code == 200: self._kw_result += 1 except Exception: pass @async_task def deconfigure_flows(self, host, port, switchid, tableid, minid, maxid): """Result will be the number of status code 200 returned""" self._kw_result = 0 ses = requests.Session() for fid in range(int(minid), int(maxid)): if self._stop is True: break url = 'http://{}:{}/restconf/config/opendaylight-inventory:nodes/node/openflow:{}/table/{}/flow/{}'.format( host, port, switchid, tableid, fid) try: rsp = ses.delete(url, headers={'Content-Type': 'application/xml'}, timeout=3) if rsp.status_code == 200: self._kw_result += 1 except Exception: pass