Refactor NBINotification & add ServiceListener tests
[transportpce.git] / tests / transportpce_tests / 2.2.1 / test_nbinotifications.py
index 00bcae7eb135930ae57542970aa3ae431a5b31e4..27ff94b5947f6095c375b830052a7ec9694c4f37 100644 (file)
@@ -12,6 +12,7 @@
 # pylint: disable=too-many-public-methods
 
 import os
+import json
 import sys
 import unittest
 import time
@@ -307,7 +308,74 @@ class TransportNbiNotificationstesting(unittest.TestCase):
         self.assertEqual(res['output']['notification-service'][-1]['message'], 'Service implemented !')
         time.sleep(2)
 
-    def test_17_delete_eth_service1(self):
+    def test_17_get_notifications_alarm_service1(self):
+        data = {
+            "input": {
+                "connection-type": "service",
+                "id-consumer": "consumer",
+                "group-id": "transportpceTest"
+            }
+        }
+        response = test_utils.get_notifications_alarm_service_request(data)
+        self.assertEqual(response.status_code, requests.codes.ok)
+        res = response.json()
+        self.assertEqual(res['output']['notification-alarm-service'][-1]['service-name'], 'service1')
+        self.assertEqual(res['output']['notification-alarm-service'][-1]['connection-type'], 'service')
+        self.assertEqual(res['output']['notification-alarm-service'][-1]['operational-state'], 'inService')
+        self.assertEqual(res['output']['notification-alarm-service'][-1]['message'], 'The service is now inService')
+        time.sleep(2)
+
+    def test_18_change_status_port_roadma_srg(self):
+        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
+        body = {"ports": [{
+            "port-name": "C1",
+            "logical-connection-point": "SRG1-PP1",
+            "port-type": "client",
+            "circuit-id": "SRG1",
+            "administrative-state": "outOfService",
+            "port-qual": "roadm-external"}]}
+        response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
+                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+        self.assertEqual(response.status_code, requests.codes.ok)
+        time.sleep(2)
+
+    def test_19_get_notifications_alarm_service1(self):
+        data = {
+            "input": {
+                "connection-type": "service",
+                "id-consumer": "consumer",
+                "group-id": "transportpceTest"
+            }
+        }
+        response = test_utils.get_notifications_alarm_service_request(data)
+        self.assertEqual(response.status_code, requests.codes.ok)
+        res = response.json()
+        self.assertEqual(res['output']['notification-alarm-service'][-1]['service-name'], 'service1')
+        self.assertEqual(res['output']['notification-alarm-service'][-1]['connection-type'], 'service')
+        self.assertEqual(res['output']['notification-alarm-service'][-1]['operational-state'], 'outOfService')
+        self.assertEqual(res['output']['notification-alarm-service'][-1]['message'], 'The service is now outOfService')
+        time.sleep(2)
+
+    def test_20_restore_status_port_roadma_srg(self):
+        url = "{}/config/org-openroadm-device:org-openroadm-device/circuit-packs/3%2F0/ports/C1"
+        body = {"ports": [{
+            "port-name": "C1",
+            "logical-connection-point": "SRG1-PP1",
+            "port-type": "client",
+            "circuit-id": "SRG1",
+            "administrative-state": "inService",
+            "port-qual": "roadm-external"}]}
+        response = requests.request("PUT", url.format("http://127.0.0.1:8141/restconf"),
+                                    data=json.dumps(body), headers=test_utils.TYPE_APPLICATION_JSON,
+                                    auth=(test_utils.ODL_LOGIN, test_utils.ODL_PWD))
+        self.assertEqual(response.status_code, requests.codes.ok)
+        time.sleep(2)
+
+    def test_21_get_notifications_alarm_service1(self):
+        self.test_17_get_notifications_alarm_service1()
+
+    def test_22_delete_eth_service1(self):
         response = test_utils.service_delete_request("service1")
         self.assertEqual(response.status_code, requests.codes.ok)
         res = response.json()
@@ -315,7 +383,7 @@ class TransportNbiNotificationstesting(unittest.TestCase):
                       res['output']['configuration-response-common']['response-message'])
         time.sleep(20)
 
-    def test_18_get_notifications_service1(self):
+    def test_23_get_notifications_service1(self):
         data = {
             "input": {
                 "connection-type": "service",
@@ -331,19 +399,19 @@ class TransportNbiNotificationstesting(unittest.TestCase):
         self.assertEqual(res['output']['notification-service'][-1]['message'], 'Service deleted !')
         time.sleep(2)
 
-    def test_19_disconnect_XPDRA(self):
+    def test_24_disconnect_XPDRA(self):
         response = test_utils.unmount_device("XPDR-A1")
         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
 
-    def test_20_disconnect_XPDRC(self):
+    def test_25_disconnect_XPDRC(self):
         response = test_utils.unmount_device("XPDR-C1")
         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
 
-    def test_21_disconnect_ROADMA(self):
+    def test_26_disconnect_ROADMA(self):
         response = test_utils.unmount_device("ROADM-A1")
         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)
 
-    def test_22_disconnect_ROADMC(self):
+    def test_27_disconnect_ROADMC(self):
         response = test_utils.unmount_device("ROADM-C1")
         self.assertEqual(response.status_code, requests.codes.ok, test_utils.CODE_SHOULD_BE_200)