From 7750700d429c729a86ff3e3e49b002d0db0e9b18 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Jaroslav=20T=C3=B3th?= Date: Wed, 10 Apr 2019 08:40:44 +0200 Subject: [PATCH] Removal of EventBusChangeRecorder MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit - Reasons to remove : 1) AbstractCommonSubsriber notifies event bus and then event bus performs operations on AbstractCommonSubsriber - what is the reason for this indirection? 2) AbstractCommonSubsriber does nearly nothing; only brokes input events to subscribers based on EventType. 3) There are only two possible types of subsribers that have common parent in which all logic is performed. - Correction of synchronization in AbstractCommonSubscriber - concurrent set is not enough, because in postData(..) method this set is accessed twice --> explicit sync using monitor or lock is required. Change-Id: Iac70127b7ebbbbe0c0ff849120818ddf71278a94 Signed-off-by: Jaroslav Tóth --- .../listeners/AbstractCommonSubscriber.java | 80 +++++++------------ .../listeners/BaseListenerInterface.java | 6 +- .../nb/rfc8040/streams/listeners/Event.java | 77 ------------------ .../listeners/EventBusChangeRecorder.java | 52 ------------ .../rfc8040/streams/listeners/EventType.java | 15 ---- .../streams/listeners/ListenerAdapter.java | 7 +- .../NotificationListenerAdapter.java | 16 +--- .../listeners/ListenerAdapterTest.java | 4 +- 8 files changed, 35 insertions(+), 222 deletions(-) delete mode 100644 restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/Event.java delete mode 100644 restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/EventBusChangeRecorder.java delete mode 100644 restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/EventType.java diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java index 7c65f5ca4f..a7449565aa 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java @@ -7,12 +7,10 @@ */ package org.opendaylight.restconf.nb.rfc8040.streams.listeners; -import com.google.common.eventbus.AsyncEventBus; -import com.google.common.eventbus.EventBus; import io.netty.channel.Channel; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import java.util.HashSet; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,56 +22,44 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class); - private final Set subscribers = ConcurrentHashMap.newKeySet(); - private final EventBus eventBus; - - private EventBusChangeRecorder eventBusChangeRecorder; - + private final Set subscribers = new HashSet<>(); private volatile ListenerRegistration registration; - /** - * Creating {@link EventBus}. - */ - AbstractCommonSubscriber() { - this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor()); - } - @Override - public final boolean hasSubscribers() { + public final synchronized boolean hasSubscribers() { return !this.subscribers.isEmpty(); } @Override - public final Set getSubscribers() { - return this.subscribers; + public final synchronized Set getSubscribers() { + return new HashSet<>(this.subscribers); } @Override - public final void close() throws Exception { + public final synchronized void close() throws Exception { if (this.registration != null) { this.registration.close(); this.registration = null; } deleteDataInDS(); - unregister(); + this.subscribers.clear(); } @Override - public void addSubscriber(final Channel subscriber) { + public synchronized void addSubscriber(final Channel subscriber) { if (!subscriber.isActive()) { LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress()); } - final Event event = new Event(EventType.REGISTER); - event.setSubscriber(subscriber); - this.eventBus.post(event); + subscribers.add(subscriber); } @Override - public void removeSubscriber(final Channel subscriber) { + public synchronized void removeSubscriber(final Channel subscriber) { LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress()); - final Event event = new Event(EventType.DEREGISTER); - event.setSubscriber(subscriber); - this.eventBus.post(event); + subscribers.remove(subscriber); + if (!hasSubscribers()) { + ListenersBroker.getInstance().removeAndCloseListener(this); + } } @Override @@ -87,31 +73,19 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B } /** - * Creating and registering {@link EventBusChangeRecorder} of specific - * listener on {@link EventBus}. + * Post data to subscribed channels. * - * @param listener Specific listener of notifications. + * @param data Data of incoming notifications. */ - @SuppressWarnings({"unchecked", "rawtypes"}) - void register(final T listener) { - this.eventBusChangeRecorder = new EventBusChangeRecorder(listener); - this.eventBus.register(this.eventBusChangeRecorder); - } - - /** - * Post event to event bus. - * - * @param event Data of incoming notifications. - */ - protected void post(final Event event) { - this.eventBus.post(event); - } - - /** - * Removes all subscribers and unregisters event bus change recorder form event bus. - */ - private void unregister() { - this.subscribers.clear(); - this.eventBus.unregister(this.eventBusChangeRecorder); + synchronized void post(final String data) { + for (final Channel subscriber : subscribers) { + if (subscriber.isActive()) { + LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress()); + subscriber.writeAndFlush(new TextWebSocketFrame(data)); + } else { + LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress()); + subscribers.remove(subscriber); + } + } } } diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/BaseListenerInterface.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/BaseListenerInterface.java index fda095b9a1..f1893f5034 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/BaseListenerInterface.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/BaseListenerInterface.java @@ -45,16 +45,14 @@ public interface BaseListenerInterface extends AutoCloseable { String getOutputType(); /** - * Creates event of type {@link EventType#REGISTER}, set {@link Channel} - * subscriber to the event and post event into event bus. + * Registers {@link Channel} subscriber. * * @param subscriber Web-socket channel. */ void addSubscriber(Channel subscriber); /** - * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} - * subscriber to the event and posts event into event bus. + * Removes {@link Channel} subscriber. * * @param subscriber Subscriber channel. */ diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/Event.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/Event.java deleted file mode 100644 index 1e771746be..0000000000 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/Event.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.restconf.nb.rfc8040.streams.listeners; - -import io.netty.channel.Channel; - -/** - * Represents event of specific {@link EventType} type, holds data and - * {@link Channel} subscriber. - */ -class Event { - private final EventType type; - private Channel subscriber; - private String data; - - /** - * Creates new event specified by {@link EventType} type. - * - * @param type - * EventType - */ - Event(final EventType type) { - this.type = type; - } - - /** - * Gets the {@link Channel} subscriber. - * - * @return Channel - */ - public Channel getSubscriber() { - return this.subscriber; - } - - /** - * Sets subscriber for event. - * - * @param subscriber - * Channel - */ - public void setSubscriber(final Channel subscriber) { - this.subscriber = subscriber; - } - - /** - * Gets event String. - * - * @return String representation of event data. - */ - public String getData() { - return this.data; - } - - /** - * Sets event data. - * - * @param data - * String. - */ - public void setData(final String data) { - this.data = data; - } - - /** - * Gets event type. - * - * @return The type of the event. - */ - public EventType getType() { - return this.type; - } -} diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/EventBusChangeRecorder.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/EventBusChangeRecorder.java deleted file mode 100644 index 944631e3fa..0000000000 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/EventBusChangeRecorder.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.restconf.nb.rfc8040.streams.listeners; - -import com.google.common.eventbus.Subscribe; -import io.netty.channel.Channel; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class EventBusChangeRecorder { - - private static final Logger LOG = LoggerFactory.getLogger(EventBusChangeRecorder.class); - private final T listener; - - /** - * Event bus change recorder of specific listener of notifications. - * - * @param listener Specific listener. - */ - EventBusChangeRecorder(final T listener) { - this.listener = listener; - } - - @Subscribe - public void recordCustomerChange(final Event event) { - if (event.getType() == EventType.REGISTER) { - final Channel subscriber = event.getSubscriber(); - this.listener.getSubscribers().add(subscriber); - } else if (event.getType() == EventType.DEREGISTER) { - this.listener.getSubscribers().remove(event.getSubscriber()); - if (!this.listener.hasSubscribers()) { - ListenersBroker.getInstance().removeAndCloseListener(this.listener); - } - } else if (event.getType() == EventType.NOTIFY) { - for (final Channel subscriber : this.listener.getSubscribers()) { - if (subscriber.isActive()) { - LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress()); - subscriber.writeAndFlush(new TextWebSocketFrame(event.getData())); - } else { - LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress()); - this.listener.getSubscribers().remove(subscriber); - } - } - } - } -} \ No newline at end of file diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/EventType.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/EventType.java deleted file mode 100644 index 70b866186b..0000000000 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/EventType.java +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.restconf.nb.rfc8040.streams.listeners; - -/** - * Type of the event. - */ -enum EventType { - REGISTER, DEREGISTER, NOTIFY -} diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java index 12a6ae4904..2645383b88 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java @@ -62,7 +62,6 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster */ ListenerAdapter(final YangInstanceIdentifier path, final String streamName, final NotificationOutputType outputType) { - register(this); setLocalNameOfPath(path.getLastPathArgument().getNodeType().getLocalName()); this.outputType = Preconditions.checkNotNull(outputType); @@ -114,13 +113,11 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster * @param xml XML-formatted data. */ private void prepareAndPostData(final String xml) { - final Event event = new Event(EventType.NOTIFY); if (this.outputType.equals(NotificationOutputType.JSON)) { - event.setData(XML.toJSONObject(xml).toString()); + post(XML.toJSONObject(xml).toString()); } else { - event.setData(xml); + post(xml); } - post(event); } /** diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerAdapter.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerAdapter.java index 32a8c66eeb..bbfad3127c 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerAdapter.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerAdapter.java @@ -46,14 +46,13 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem private final String outputType; /** - * Set path of listener and stream name, register event bus. + * Set path of listener and stream name. * * @param path Schema path of YANG notification. * @param streamName Name of the stream. * @param outputType Type of output on notification (JSON or XML). */ NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) { - register(this); setLocalNameOfPath(path.getLastComponent().getLocalName()); this.outputType = Preconditions.checkNotNull(outputType); @@ -82,7 +81,7 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem final SchemaContext schemaContext = schemaHandler.get(); final String xml = prepareXml(schemaContext, notification); if (checkFilter(xml)) { - prepareAndPostData(outputType.equals("JSON") ? prepareJson(schemaContext, notification) : xml); + post(outputType.equals("JSON") ? prepareJson(schemaContext, notification) : xml); } } @@ -105,17 +104,6 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem return this.path; } - /** - * Prepare data of notification and data to client. - * - * @param data JSON or XML data that holds notification data. - */ - private void prepareAndPostData(final String data) { - final Event event = new Event(EventType.NOTIFY); - event.setData(data); - post(event); - } - /** * Creation of JSON from notification data. * diff --git a/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapterTest.java b/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapterTest.java index 473d6af1ce..34116da78c 100644 --- a/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapterTest.java +++ b/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapterTest.java @@ -90,8 +90,8 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest { } @Override - protected void post(final Event event) { - this.lastNotification = event.getData(); + protected void post(final String data) { + this.lastNotification = data; notificationLatch.countDown(); } -- 2.36.6