Added resource /streams/stream/<streamName>
[controller.git] / opendaylight / md-sal / sal-rest-connector / src / main / java / org / opendaylight / controller / sal / streams / listeners / Notificator.java
diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/Notificator.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/Notificator.java
new file mode 100644 (file)
index 0000000..b0140ec
--- /dev/null
@@ -0,0 +1,91 @@
+package org.opendaylight.controller.sal.streams.listeners;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+public class Notificator {
+
+    private static Map<String, ListenerAdapter> listenersByStreamName = new ConcurrentHashMap<>();
+    private static Map<InstanceIdentifier, ListenerAdapter> listenersByInstanceIdentifier = new ConcurrentHashMap<>();
+    private static final Lock lock = new ReentrantLock();
+
+    private Notificator() {
+    }
+
+    public static ListenerAdapter getListenerFor(String streamName) {
+        return listenersByStreamName.get(streamName);
+    }
+
+    public static ListenerAdapter getListenerFor(InstanceIdentifier path) {
+        return listenersByInstanceIdentifier.get(path);
+    }
+
+    public static boolean existListenerFor(InstanceIdentifier path) {
+        return listenersByInstanceIdentifier.containsKey(path);
+    }
+
+    public static ListenerAdapter createListener(InstanceIdentifier path, String streamName) {
+        ListenerAdapter listener = new ListenerAdapter(path, streamName);
+        lock.lock();
+        listenersByInstanceIdentifier.put(path, listener);
+        listenersByStreamName.put(streamName, listener);
+        lock.unlock();
+        return listener;
+    }
+
+    public static void removeListener(InstanceIdentifier path) {
+        ListenerAdapter listener = listenersByInstanceIdentifier.get(path);
+        deleteListener(listener);
+    }
+
+    public static String createStreamNameFromUri(String uri) {
+        if (uri == null) {
+            return null;
+        }
+        String result = uri;
+        if (result.startsWith("/")) {
+            result = result.substring(1);
+        }
+        if (result.endsWith("/")) {
+            result = result.substring(0, result.length());
+        }
+        return result;
+    }
+
+    public static void removeAllListeners() {
+        for (ListenerAdapter listener : listenersByInstanceIdentifier.values()) {
+            try {
+                listener.close();
+            } catch (Exception e) {
+            }
+        }
+        lock.lock();
+        listenersByStreamName = new ConcurrentHashMap<>();
+        listenersByInstanceIdentifier = new ConcurrentHashMap<>();
+        lock.unlock();
+    }
+
+    public static void removeListenerIfNoSubscriberExists(ListenerAdapter listener) {
+        if (!listener.hasSubscribers()) {
+            deleteListener(listener);
+        }
+    }
+
+    private static void deleteListener(ListenerAdapter listener) {
+        if (listener != null) {
+            try {
+                listener.close();
+            } catch (Exception e) {
+            }
+            lock.lock();
+            listenersByInstanceIdentifier.remove(listener.getPath());
+            listenersByStreamName.remove(listener).getStreamName();
+            lock.unlock();
+        }
+    }
+
+}
\ No newline at end of file