private final Set<Channel> subscribers = ConcurrentHashMap.newKeySet();
private final EventBus eventBus;
- @SuppressWarnings("rawtypes")
private EventBusChangeRecorder eventBusChangeRecorder;
private volatile ListenerRegistration<?> registration;
/**
* Creating {@link EventBus}.
*/
- protected AbstractCommonSubscriber() {
+ AbstractCommonSubscriber() {
this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
}
this.registration.close();
this.registration = null;
}
-
deleteDataInDS();
unregister();
}
- /**
- * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
- * subscriber to the event and post event into event bus.
- *
- * @param subscriber
- * Channel
- */
+ @Override
public void addSubscriber(final Channel subscriber) {
if (!subscriber.isActive()) {
LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress());
this.eventBus.post(event);
}
- /**
- * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
- * subscriber to the event and posts event into event bus.
- *
- * @param subscriber subscriber channel
- */
+ @Override
public void removeSubscriber(final Channel subscriber) {
LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
final Event event = new Event(EventType.DEREGISTER);
this.eventBus.post(event);
}
- /**
- * Sets {@link ListenerRegistration} registration.
- *
- * @param registration
- * DOMDataChangeListener registration
- */
+ @Override
public void setRegistration(final ListenerRegistration<?> registration) {
this.registration = registration;
}
- /**
- * Checks if {@link ListenerRegistration} registration exist.
- *
- * @return True if exist, false otherwise.
- */
+ @Override
public boolean isListening() {
return this.registration != null;
}
* Creating and registering {@link EventBusChangeRecorder} of specific
* listener on {@link EventBus}.
*
- * @param listener
- * specific listener of notifications
+ * @param listener Specific listener of notifications.
*/
- @SuppressWarnings({ "unchecked", "rawtypes" })
- protected <T extends BaseListenerInterface> void register(final T listener) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ <T extends BaseListenerInterface> void register(final T listener) {
this.eventBusChangeRecorder = new EventBusChangeRecorder(listener);
this.eventBus.register(this.eventBusChangeRecorder);
}
/**
* Post event to event bus.
*
- * @param event
- * data of incoming notifications
+ * @param event Data of incoming notifications.
*/
protected void post(final Event event) {
this.eventBus.post(event);
}
/**
- * Removes all subscribers and unregisters event bus change recorder form
- * event bus.
+ * Removes all subscribers and unregisters event bus change recorder form event bus.
*/
- protected void unregister() {
+ private void unregister() {
this.subscribers.clear();
this.eventBus.unregister(this.eventBusChangeRecorder);
}