/**
* Interface for session handler that is responsible for sending of data over established session.
*/
-public interface SessionHandlerInterface {
-
+public interface StreamSessionHandler {
/**
* Identification of created session.
*/
* @param data Message data to be send.
*/
void sendDataMessage(String data);
-
}
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
-import org.opendaylight.restconf.nb.rfc8040.streams.SessionHandlerInterface;
+import org.opendaylight.restconf.nb.rfc8040.streams.StreamSessionHandler;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
- private final Set<SessionHandlerInterface> subscribers = new HashSet<>();
+ private final Set<StreamSessionHandler> subscribers = new HashSet<>();
private volatile ListenerRegistration<?> registration;
@Override
}
@Override
- public final synchronized Set<SessionHandlerInterface> getSubscribers() {
+ public final synchronized Set<StreamSessionHandler> getSubscribers() {
return new HashSet<>(this.subscribers);
}
}
@Override
- public synchronized void addSubscriber(final SessionHandlerInterface subscriber) {
+ public synchronized void addSubscriber(final StreamSessionHandler subscriber) {
final boolean isConnected = subscriber.isConnected();
Preconditions.checkState(isConnected);
LOG.debug("Subscriber {} is added.", subscriber);
}
@Override
- public synchronized void removeSubscriber(final SessionHandlerInterface subscriber) {
+ public synchronized void removeSubscriber(final StreamSessionHandler subscriber) {
final boolean isConnected = subscriber.isConnected();
Preconditions.checkState(isConnected);
LOG.debug("Subscriber {} is removed", subscriber);
* @param data Data of incoming notifications.
*/
synchronized void post(final String data) {
- final Iterator<SessionHandlerInterface> iterator = subscribers.iterator();
+ final Iterator<StreamSessionHandler> iterator = subscribers.iterator();
while (iterator.hasNext()) {
- final SessionHandlerInterface subscriber = iterator.next();
+ final StreamSessionHandler subscriber = iterator.next();
final boolean isConnected = subscriber.isConnected();
if (isConnected) {
subscriber.sendDataMessage(data);
package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
import java.util.Set;
-import org.opendaylight.restconf.nb.rfc8040.streams.SessionHandlerInterface;
+import org.opendaylight.restconf.nb.rfc8040.streams.StreamSessionHandler;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
/**
*
* @return Set of all subscribers.
*/
- Set<SessionHandlerInterface> getSubscribers();
+ Set<StreamSessionHandler> getSubscribers();
/**
- * Checks if exists at least one {@link SessionHandlerInterface} subscriber.
+ * Checks if exists at least one {@link StreamSessionHandler} subscriber.
*
- * @return {@code true} if exist at least one {@link SessionHandlerInterface} subscriber, {@code false} otherwise.
+ * @return {@code true} if exist at least one {@link StreamSessionHandler} subscriber, {@code false} otherwise.
*/
boolean hasSubscribers();
String getOutputType();
/**
- * Registers {@link SessionHandlerInterface} subscriber.
+ * Registers {@link StreamSessionHandler} subscriber.
*
* @param subscriber SSE or WS session handler.
*/
- void addSubscriber(SessionHandlerInterface subscriber);
+ void addSubscriber(StreamSessionHandler subscriber);
/**
- * Removes {@link SessionHandlerInterface} subscriber.
+ * Removes {@link StreamSessionHandler} subscriber.
*
* @param subscriber SSE or WS session handler.
*/
- void removeSubscriber(SessionHandlerInterface subscriber);
+ void removeSubscriber(StreamSessionHandler subscriber);
/**
* Sets {@link ListenerRegistration} registration.
import java.util.concurrent.TimeUnit;
import org.glassfish.jersey.media.sse.EventOutput;
import org.glassfish.jersey.media.sse.OutboundEvent;
-import org.opendaylight.restconf.nb.rfc8040.streams.SessionHandlerInterface;
+import org.opendaylight.restconf.nb.rfc8040.streams.StreamSessionHandler;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* SSE session handler that is responsible for controlling of session, managing subscription to data-change-event or
* notification listener, and sending of data over established SSE session.
*/
-public class SSESessionHandler implements SessionHandlerInterface {
+public class SSESessionHandler implements StreamSessionHandler {
private static final Logger LOG = LoggerFactory.getLogger(SSESessionHandler.class);
private static final String PING_PAYLOAD = "ping";
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.restconf.nb.rfc8040.streams.websockets;
import java.util.Optional;
* Factory that is used for creation of new web-sockets based on HTTP/HTTPS upgrade request.
*/
class WebSocketFactory implements WebSocketCreator {
-
private static final Logger LOG = LoggerFactory.getLogger(WebSocketFactory.class);
private final ScheduledExecutorService executorService;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.restconf.nb.rfc8040.streams.websockets;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
* Web-socket servlet listening on ws or wss schemas for created data-change-event or notification streams.
*/
@Singleton
-@SuppressFBWarnings({"SE_NO_SERIALVERSIONID", "SE_BAD_FIELD"})
public class WebSocketInitializer extends WebSocketServlet {
+ private static final long serialVersionUID = 1L;
+ @SuppressFBWarnings(value = "SE_BAD_FIELD",
+ justification = "Servlet/WebSocket bridge, we need this service for heartbeats")
private final ScheduledExecutorService executorService;
- private final Configuration configuration;
+ private final int maximumFragmentLength;
+ private final int heartbeatInterval;
+ private final int idleTimeoutMillis;
/**
* Creation of the web-socket initializer.
* @param configuration Web-socket configuration holder.
*/
@Inject
- public WebSocketInitializer(final ScheduledThreadPool scheduledThreadPool,
- final Configuration configuration) {
+ public WebSocketInitializer(final ScheduledThreadPool scheduledThreadPool, final Configuration configuration) {
this.executorService = scheduledThreadPool.getExecutor();
- this.configuration = configuration;
+ this.maximumFragmentLength = configuration.getMaximumFragmentLength();
+ this.heartbeatInterval = configuration.getHeartbeatInterval();
+ this.idleTimeoutMillis = configuration.getIdleTimeout();
}
/**
*/
@Override
public void configure(final WebSocketServletFactory factory) {
- factory.getPolicy().setIdleTimeout(configuration.getIdleTimeout());
- factory.setCreator(new WebSocketFactory(executorService, configuration.getMaximumFragmentLength(),
- configuration.getHeartbeatInterval()));
+ factory.getPolicy().setIdleTimeout(idleTimeoutMillis);
+ factory.setCreator(new WebSocketFactory(executorService, maximumFragmentLength, heartbeatInterval));
}
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.restconf.nb.rfc8040.streams.websockets;
import com.google.common.base.Strings;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
-import org.opendaylight.restconf.nb.rfc8040.streams.SessionHandlerInterface;
+import org.opendaylight.restconf.nb.rfc8040.streams.StreamSessionHandler;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.BaseListenerInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* to data-change-event or notification listener, and sending of data over established web-socket session.
*/
@WebSocket
-public class WebSocketSessionHandler implements SessionHandlerInterface {
-
+public class WebSocketSessionHandler implements StreamSessionHandler {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketSessionHandler.class);
private static final byte[] PING_PAYLOAD = "ping".getBytes(Charset.defaultCharset());
*
* @param message Message data to be send over web-socket session.
*/
+ @Override
public synchronized void sendDataMessage(final String message) {
if (Strings.isNullOrEmpty(message)) {
// FIXME: should this be tolerated?