Removal of EventBusChangeRecorder
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / listeners / AbstractCommonSubscriber.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
9
10 import io.netty.channel.Channel;
11 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
12 import java.util.HashSet;
13 import java.util.Set;
14 import org.opendaylight.yangtools.concepts.ListenerRegistration;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17
18 /**
19  * Features of subscribing part of both notifications.
20  */
21 abstract class AbstractCommonSubscriber extends AbstractQueryParams implements BaseListenerInterface {
22
23     private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
24
25     private final Set<Channel> subscribers = new HashSet<>();
26     private volatile ListenerRegistration<?> registration;
27
28     @Override
29     public final synchronized boolean hasSubscribers() {
30         return !this.subscribers.isEmpty();
31     }
32
33     @Override
34     public final synchronized Set<Channel> getSubscribers() {
35         return new HashSet<>(this.subscribers);
36     }
37
38     @Override
39     public final synchronized void close() throws Exception {
40         if (this.registration != null) {
41             this.registration.close();
42             this.registration = null;
43         }
44         deleteDataInDS();
45         this.subscribers.clear();
46     }
47
48     @Override
49     public synchronized void addSubscriber(final Channel subscriber) {
50         if (!subscriber.isActive()) {
51             LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress());
52         }
53         subscribers.add(subscriber);
54     }
55
56     @Override
57     public synchronized void removeSubscriber(final Channel subscriber) {
58         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
59         subscribers.remove(subscriber);
60         if (!hasSubscribers()) {
61             ListenersBroker.getInstance().removeAndCloseListener(this);
62         }
63     }
64
65     @Override
66     public void setRegistration(final ListenerRegistration<?> registration) {
67         this.registration = registration;
68     }
69
70     @Override
71     public boolean isListening() {
72         return this.registration != null;
73     }
74
75     /**
76      * Post data to subscribed channels.
77      *
78      * @param data Data of incoming notifications.
79      */
80     synchronized void post(final String data) {
81         for (final Channel subscriber : subscribers) {
82             if (subscriber.isActive()) {
83                 LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
84                 subscriber.writeAndFlush(new TextWebSocketFrame(data));
85             } else {
86                 LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
87                 subscribers.remove(subscriber);
88             }
89         }
90     }
91 }