Initial tapi notification implementation
[transportpce.git] / tapi / src / main / java / org / opendaylight / transportpce / tapi / listeners / TapiRendererListenerImpl.java
index 24d5e30975a166e9a45e1c3492b8ffe4a879e26d..0080c77c1e8e44b778cc2f074bfe32a87f147d8a 100644 (file)
@@ -8,22 +8,33 @@
 package org.opendaylight.transportpce.tapi.listeners;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.NotificationPublishService;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.transportpce.common.network.NetworkTransactionImpl;
 import org.opendaylight.transportpce.common.network.NetworkTransactionService;
 import org.opendaylight.transportpce.common.network.RequestProcessor;
 import org.opendaylight.yang.gen.v1.http.org.opendaylight.transportpce.renderer.rev210915.RendererRpcResultSp;
 import org.opendaylight.yang.gen.v1.http.org.opendaylight.transportpce.renderer.rev210915.TransportpceRendererListener;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.PublishTapiNotificationService;
+import org.opendaylight.yang.gen.v1.nbi.notifications.rev211013.PublishTapiNotificationServiceBuilder;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.AdministrativeState;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.Context;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.DateAndTime;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.LifecycleState;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.OperationalState;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.Uuid;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.global._class.Name;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.common.rev181210.global._class.NameKey;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.connectivity.rev181210.Context1;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.connectivity.rev181210.connectivity.context.ConnectionBuilder;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.connectivity.rev181210.connectivity.context.ConnectionKey;
@@ -31,6 +42,14 @@ import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.connectivity.rev18121
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.connectivity.rev181210.connectivity.context.ConnectivityServiceBuilder;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.connectivity.rev181210.connectivity.context.ConnectivityServiceKey;
 import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.connectivity.rev181210.connectivity.service.Connection;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.NotificationType;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.ObjectType;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.ChangedAttributes;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.ChangedAttributesBuilder;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.ChangedAttributesKey;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.TargetObjectName;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.TargetObjectNameBuilder;
+import org.opendaylight.yang.gen.v1.urn.onf.otcc.yang.tapi.notification.rev181210.notification.TargetObjectNameKey;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,10 +61,12 @@ public class TapiRendererListenerImpl implements TransportpceRendererListener {
     private Uuid serviceUuid;
     private RendererRpcResultSp serviceRpcResultSp;
     private final NetworkTransactionService networkTransactionService;
+    private final NotificationPublishService notificationPublishService;
 
-    public TapiRendererListenerImpl(DataBroker dataBroker) {
+    public TapiRendererListenerImpl(DataBroker dataBroker, NotificationPublishService notificationPublishService) {
         this.dataBroker = dataBroker;
         this.networkTransactionService = new NetworkTransactionImpl(new RequestProcessor(this.dataBroker));
+        this.notificationPublishService = notificationPublishService;
     }
 
     @Override
@@ -98,7 +119,7 @@ public class TapiRendererListenerImpl implements TransportpceRendererListener {
             LOG.error("Couldnt retrieve service from datastore");
             return;
         }
-        LOG.info("Connectivity service = {}", connectivityService.toString());
+        LOG.info("Connectivity service = {}", connectivityService);
         // TODO --> this throws error because the renderer goes really fast. Is this normal??
         ConnectivityService updtConnServ = new ConnectivityServiceBuilder(connectivityService)
             .setAdministrativeState(AdministrativeState.UNLOCKED)
@@ -109,6 +130,8 @@ public class TapiRendererListenerImpl implements TransportpceRendererListener {
             updateConnectionState(connection.getConnectionUuid());
         }
         updateConnectivityService(updtConnServ);
+        // TODO: need to send notification to kafka in case the topic exists!!
+        sendNbiNotification(createNbiNotification(updtConnServ));
     }
 
     /**
@@ -118,7 +141,7 @@ public class TapiRendererListenerImpl implements TransportpceRendererListener {
     private void onFailedServiceImplementation(String serviceName) {
         LOG.error("Renderer implementation failed !");
         LOG.info("PCE cancel resource done OK !");
-        Uuid suuid = new Uuid(UUID.nameUUIDFromBytes(serviceName.getBytes(Charset.forName("UTF-8")))
+        Uuid suuid = new Uuid(UUID.nameUUIDFromBytes(serviceName.getBytes(StandardCharsets.UTF_8))
                 .toString());
         // get connections of connectivity service and remove them from tapi context and then remove
         //  service from context. The CEPs are maintained as they could be reused by another service
@@ -278,6 +301,66 @@ public class TapiRendererListenerImpl implements TransportpceRendererListener {
         }
     }
 
+    private void sendNbiNotification(PublishTapiNotificationService service) {
+        try {
+            this.notificationPublishService.putNotification(service);
+        } catch (InterruptedException e) {
+            LOG.warn("Cannot send notification to nbi", e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    private PublishTapiNotificationService createNbiNotification(ConnectivityService connService) {
+        if (connService == null) {
+            LOG.error("ConnService is null");
+            return null;
+        }
+        /*
+        Map<ChangedAttributesKey, ChangedAttributes> changedStates = changedAttributesMap.entrySet()
+                .stream()
+                .filter(e -> e.getKey().getValueName().equals("administrative")
+                        || e.getKey().getValueName().equals("operational"))
+                .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+
+         */
+        Map<ChangedAttributesKey, ChangedAttributes> changedStates = new HashMap<>();
+        changedStates.put(new ChangedAttributesKey("administrativeState"),
+            new ChangedAttributesBuilder()
+                .setNewValue(connService.getAdministrativeState().getName())
+                .setOldValue(AdministrativeState.LOCKED.getName())
+                .setValueName("administrativeState").build());
+        changedStates.put(new ChangedAttributesKey("operationalState"),
+            new ChangedAttributesBuilder()
+                .setNewValue(connService.getOperationalState().getName())
+                .setOldValue(OperationalState.DISABLED.getName())
+                .setValueName("operationalState").build());
+        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssxxx");
+        OffsetDateTime offsetDateTime = OffsetDateTime.now(ZoneOffset.UTC);
+        DateAndTime datetime = new DateAndTime(dtf.format(offsetDateTime));
+        Map<TargetObjectNameKey, TargetObjectName> targetObjectNames = new HashMap<>();
+        if (connService.getName() != null) {
+            for (Map.Entry<NameKey, Name> entry : connService.getName().entrySet()) {
+                targetObjectNames.put(new TargetObjectNameKey(entry.getKey().getValueName()),
+                    new TargetObjectNameBuilder()
+                        .setValueName(entry.getValue().getValueName())
+                        .setValue(entry.getValue().getValue())
+                        .build());
+            }
+        }
+
+        return new PublishTapiNotificationServiceBuilder()
+            .setUuid(new Uuid(UUID.randomUUID().toString()))
+            .setTopic(connService.getUuid().getValue())
+            .setTargetObjectIdentifier(connService.getUuid())
+            .setNotificationType(NotificationType.ATTRIBUTEVALUECHANGE)
+            .setChangedAttributes(changedStates)
+            .setEventTimeStamp(datetime)
+            .setTargetObjectName(targetObjectNames)
+            .setTargetObjectType(ObjectType.CONNECTIVITYSERVICE)
+            .setLayerProtocolName(connService.getServiceLayer())
+            .build();
+    }
+
     public void setServiceUuid(Uuid serviceUuid) {
         this.serviceUuid = serviceUuid;
     }