11e5656d344ddeb6465ffbab8863ce6a94282008
[netconf.git] / restconf / restconf-nb-bierman02 / src / main / java / org / opendaylight / netconf / sal / streams / listeners / EventBusChangeRecorder.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.sal.streams.listeners;
9
10 import com.google.common.eventbus.Subscribe;
11 import io.netty.channel.Channel;
12 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15
16 class EventBusChangeRecorder<T extends BaseListenerInterface> {
17
18     private static final Logger LOG = LoggerFactory.getLogger(EventBusChangeRecorder.class);
19     private final T listener;
20
21     /**
22      * Event bus change recorder of specific listener of notifications.
23      *
24      * @param listener
25      *             specific listener
26      */
27     EventBusChangeRecorder(final T listener) {
28         this.listener = listener;
29     }
30
31     @Subscribe
32     public void recordCustomerChange(final Event event) {
33         if (event.getType() == EventType.REGISTER) {
34             final Channel subscriber = event.getSubscriber();
35             if (!this.listener.getSubscribers().contains(subscriber)) {
36                 this.listener.getSubscribers().add(subscriber);
37             }
38         } else if (event.getType() == EventType.DEREGISTER) {
39             this.listener.getSubscribers().remove(event.getSubscriber());
40             Notificator.removeListenerIfNoSubscriberExists(this.listener);
41         } else if (event.getType() == EventType.NOTIFY) {
42             for (final Channel subscriber : this.listener.getSubscribers()) {
43                 if (subscriber.isActive()) {
44                     LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
45                     subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
46                 } else {
47                     LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
48                     this.listener.getSubscribers().remove(subscriber);
49                 }
50             }
51         }
52     }
53 }