* Abstract base class for functionality shared between {@link NotificationListenerAdapter} and
* {@link DeviceNotificationListenerAdaptor}.
*/
-abstract class AbstractNotificationListenerAdaptor extends AbstractCommonSubscriber<DOMNotification>
+abstract class AbstractNotificationListenerAdaptor extends AbstractStream<DOMNotification>
implements DOMNotificationListener {
private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationListenerAdaptor.class);
private static final NotificationFormatterFactory JSON_FORMATTER_FACTORY =
import org.slf4j.LoggerFactory;
/**
- * Features of subscribing part of both notifications.
+ * Base superclass for all stream types.
*/
-abstract class AbstractCommonSubscriber<T> implements BaseListenerInterface {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
+abstract class AbstractStream<T> implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractStream.class);
private final EventFormatterFactory<T> formatterFactory;
private final NotificationOutputType outputType;
protected DatabindProvider databindProvider;
private DOMDataBroker dataBroker;
- AbstractCommonSubscriber(final String streamName, final NotificationOutputType outputType,
+ AbstractStream(final String streamName, final NotificationOutputType outputType,
final EventFormatterFactory<T> formatterFactory, final ListenersBroker listenersBroker) {
this.streamName = requireNonNull(streamName);
checkArgument(!streamName.isEmpty());
formatter = formatterFactory.emptyFormatter();
}
- @Override
+ /**
+ * Get name of stream.
+ *
+ * @return Stream name.
+ */
public final String getStreamName() {
return streamName;
}
- @Override
- public final String getOutputType() {
+ /**
+ * Get output type.
+ *
+ * @return Output type (JSON or XML).
+ */
+ final String getOutputType() {
return outputType.getName();
}
- @Override
- public final synchronized boolean hasSubscribers() {
+ /**
+ * Checks if exists at least one {@link StreamSessionHandler} subscriber.
+ *
+ * @return {@code true} if exist at least one {@link StreamSessionHandler} subscriber, {@code false} otherwise.
+ */
+ final synchronized boolean hasSubscribers() {
return !subscribers.isEmpty();
}
- @Override
- public final synchronized Set<StreamSessionHandler> getSubscribers() {
+ /**
+ * Return all subscribers of listener.
+ *
+ * @return Set of all subscribers.
+ */
+ final synchronized Set<StreamSessionHandler> getSubscribers() {
return new HashSet<>(subscribers);
}
subscribers.clear();
}
- @Override
- public synchronized void addSubscriber(final StreamSessionHandler subscriber) {
+ /**
+ * Registers {@link StreamSessionHandler} subscriber.
+ *
+ * @param subscriber SSE or WS session handler.
+ */
+ synchronized void addSubscriber(final StreamSessionHandler subscriber) {
final boolean isConnected = subscriber.isConnected();
checkState(isConnected);
LOG.debug("Subscriber {} is added.", subscriber);
subscribers.add(subscriber);
}
- @Override
- public synchronized void removeSubscriber(final StreamSessionHandler subscriber) {
+ /**
+ * Removes {@link StreamSessionHandler} subscriber.
+ *
+ * @param subscriber SSE or WS session handler.
+ */
+ synchronized void removeSubscriber(final StreamSessionHandler subscriber) {
subscribers.remove(subscriber);
LOG.debug("Subscriber {} is removed", subscriber);
if (!hasSubscribers()) {
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.nb.rfc8040.streams;
-
-import java.util.Set;
-
-/**
- * Base interface for both listeners({@link ListenerAdapter}, {@link NotificationListenerAdapter}).
- */
-public interface BaseListenerInterface extends AutoCloseable {
- /**
- * Return all subscribers of listener.
- *
- * @return Set of all subscribers.
- */
- Set<StreamSessionHandler> getSubscribers();
-
- /**
- * Checks if exists at least one {@link StreamSessionHandler} subscriber.
- *
- * @return {@code true} if exist at least one {@link StreamSessionHandler} subscriber, {@code false} otherwise.
- */
- boolean hasSubscribers();
-
- /**
- * Get name of stream.
- *
- * @return Stream name.
- */
- String getStreamName();
-
- /**
- * Get output type.
- *
- * @return Output type (JSON or XML).
- */
- String getOutputType();
-
- /**
- * Registers {@link StreamSessionHandler} subscriber.
- *
- * @param subscriber SSE or WS session handler.
- */
- void addSubscriber(StreamSessionHandler subscriber);
-
- /**
- * Removes {@link StreamSessionHandler} subscriber.
- *
- * @param subscriber SSE or WS session handler.
- */
- void removeSubscriber(StreamSessionHandler subscriber);
-}
/**
* {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
*/
-public class ListenerAdapter extends AbstractCommonSubscriber<Collection<DataTreeCandidate>>
+public class ListenerAdapter extends AbstractStream<Collection<DataTreeCandidate>>
implements ClusteredDOMDataTreeChangeListener {
private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
private static final DataTreeCandidateFormatterFactory JSON_FORMATTER_FACTORY =
* @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
* or {@link Optional#empty()} if listener with specified stream name doesn't exist.
*/
- public final @Nullable BaseListenerInterface listenerFor(final String streamName) {
+ public final @Nullable AbstractStream<?> listenerFor(final String streamName) {
if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
return notificationListenerFor(streamName);
} else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
/**
* Removal and closing of general listener (data-change or notification listener).
*
- * @param listener Listener to be closed and removed from cache.
+ * @param stream Stream to be closed and removed from cache.
*/
- final void removeAndCloseListener(final BaseListenerInterface listener) {
- requireNonNull(listener);
- if (listener instanceof ListenerAdapter) {
- removeAndCloseDataChangeListener((ListenerAdapter) listener);
- } else if (listener instanceof NotificationListenerAdapter) {
- removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
+ final void removeAndCloseListener(final AbstractStream<?> stream) {
+ requireNonNull(stream);
+ if (stream instanceof ListenerAdapter dataChange) {
+ removeAndCloseDataChangeListener(dataChange);
+ } else if (stream instanceof NotificationListenerAdapter notification) {
+ removeAndCloseNotificationListener(notification);
}
}
*/
public final class SSESessionHandler implements StreamSessionHandler {
private static final Logger LOG = LoggerFactory.getLogger(SSESessionHandler.class);
- private static final String PING_PAYLOAD = "ping";
-
private static final CharMatcher CR_OR_LF = CharMatcher.anyOf("\r\n");
private final ScheduledExecutorService executorService;
- private final BaseListenerInterface listener;
+ // FIXME: this really should include subscription details like formatter etc.
+ private final AbstractStream<?> listener;
private final int maximumFragmentLength;
private final int heartbeatInterval;
private final SseEventSink sink;
* session up. Ping control frames are disabled if this parameter is set to 0.
*/
public SSESessionHandler(final ScheduledExecutorService executorService, final SseEventSink sink, final Sse sse,
- final BaseListenerInterface listener, final int maximumFragmentLength, final int heartbeatInterval) {
+ final AbstractStream<?> listener, final int maximumFragmentLength, final int heartbeatInterval) {
this.executorService = executorService;
this.sse = sse;
this.sink = sink;
private synchronized void sendPingMessage() {
if (!sink.isClosed()) {
- LOG.debug("sending PING:{}", PING_PAYLOAD);
- sink.send(sse.newEventBuilder().comment(PING_PAYLOAD).build());
+ LOG.debug("sending PING");
+ sink.send(sse.newEventBuilder().comment("ping").build());
} else {
close();
}
private static final byte[] PING_PAYLOAD = "ping".getBytes(Charset.defaultCharset());
private final ScheduledExecutorService executorService;
- private final BaseListenerInterface listener;
+ // FIXME: this really should include formatter etc.
+ private final AbstractStream<?> listener;
private final int maximumFragmentLength;
private final int heartbeatInterval;
* @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint
* to keep session up. Ping control frames are disabled if this parameter is set to 0.
*/
- WebSocketSessionHandler(final ScheduledExecutorService executorService, final BaseListenerInterface listener,
+ WebSocketSessionHandler(final ScheduledExecutorService executorService, final AbstractStream<?> listener,
final int maximumFragmentLength, final int heartbeatInterval) {
this.executorService = executorService;
this.listener = listener;
@Mock
private ScheduledExecutorService executorService;
@Mock
- private BaseListenerInterface listener;
+ private AbstractStream<?> listener;
@Mock
private ScheduledFuture<?> pingFuture;
@Mock
import org.mockito.ArgumentCaptor;
public class WebSocketSessionHandlerTest {
-
private static final class WebSocketTestSessionState {
- private final BaseListenerInterface listener;
+ private final AbstractStream<?> listener;
private final ScheduledExecutorService executorService;
private final WebSocketSessionHandler webSocketSessionHandler;
private final int heartbeatInterval;
private final int maxFragmentSize;
private final ScheduledFuture pingFuture;
- private WebSocketTestSessionState(final int maxFragmentSize, final int heartbeatInterval) {
- listener = mock(BaseListenerInterface.class);
+ WebSocketTestSessionState(final int maxFragmentSize, final int heartbeatInterval) {
+ listener = mock(AbstractStream.class);
executorService = mock(ScheduledExecutorService.class);
this.heartbeatInterval = heartbeatInterval;
this.maxFragmentSize = maxFragmentSize;