Add DOMNotificationBroker Test Suite
[integration/test.git] / csit / libraries / MdsalLowlevelPy.py
1 """
2 Python invocation of several parallel publish-notifications RPCs.
3 """
4 import Queue
5 import requests
6 import string
7 import threading
8
9
10 def publish_notifications(host, grprefix, duration, rate, nrpairs=1):
11     """Invoke publish notification rpcs and verify the response.
12
13     :param host: ip address of odl node
14     :type host: string
15     :param grprefix: prefix identifier for publisher/listener pair
16     :type grprefix: string
17     :param duration: publishing notification duration in seconds
18     :type duration: int
19     :param rate: events rate per second
20     :type rate: int
21     :param nrpairs: number of publisher/listener pairs, id suffix is counted from it
22     :type nrpairs: int
23     """
24     def _publ_notifications(rqueue, url, grid, duration, rate):
25         dtmpl = string.Template('''<input xmlns="tag:opendaylight.org,2017:controller:yang:lowlevel:control">
26   <id>$ID</id>
27   <seconds>$DURATION</seconds>
28   <notifications-per-second>$RATE</notifications-per-second>
29 </input>''')
30         data = dtmpl.substitute({'ID': grid, 'DURATION': duration, 'RATE': rate})
31         try:
32             resp = requests.post(url=url, headers={'Content-Type': 'application/xml'},
33                                  data=data, auth=('admin', 'admin'), timeout=int(duration)+60)
34         except Exception as exc:
35             resp = exc
36         rqueue.put(resp)
37
38     resqueue = Queue.Queue()
39     lthreads = []
40     url = 'http://{}:8181/restconf/operations/odl-mdsal-lowlevel-control:publish-notifications'.format(host)
41     for i in range(nrpairs):
42         t = threading.Thread(target=_publ_notifications,
43                              args=(resqueue, url, '{}{}'.format(grprefix, i+1), duration, rate))
44         t.daemon = True
45         t.start()
46         lthreads.append(t)
47
48     for t in lthreads:
49         t.join()
50
51     for i in range(nrpairs):
52         resp = resqueue.get()
53         assert resp.status_code == 200