*/
package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
-import com.google.common.eventbus.AsyncEventBus;
-import com.google.common.eventbus.EventBus;
-import io.netty.channel.Channel;
-import io.netty.util.internal.ConcurrentSet;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.ChronoField;
+import java.time.temporal.TemporalAccessor;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.Set;
-import java.util.concurrent.Executors;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import java.util.concurrent.ExecutionException;
+import javax.xml.xpath.XPathExpressionException;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
+import org.opendaylight.restconf.nb.rfc8040.streams.StreamSessionHandler;
+import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.common.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Features of subscribing part of both notifications.
*/
-abstract class AbstractCommonSubscriber extends AbstractQueryParams implements BaseListenerInterface {
-
+abstract class AbstractCommonSubscriber<P, T> extends AbstractNotificationsData implements BaseListenerInterface {
private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
+ private static final DateTimeFormatter FORMATTER = new DateTimeFormatterBuilder()
+ .appendValue(ChronoField.YEAR, 4).appendLiteral('-')
+ .appendValue(ChronoField.MONTH_OF_YEAR, 2).appendLiteral('-')
+ .appendValue(ChronoField.DAY_OF_MONTH, 2).appendLiteral('T')
+ .appendValue(ChronoField.HOUR_OF_DAY, 2).appendLiteral(':')
+ .appendValue(ChronoField.MINUTE_OF_HOUR, 2).appendLiteral(':')
+ .appendValue(ChronoField.SECOND_OF_MINUTE, 2)
+ .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+ .appendOffset("+HH:MM", "Z").toFormatter();
- private final Set<Channel> subscribers = new ConcurrentSet<>();
- private final EventBus eventBus;
+ private final EventFormatterFactory<T> formatterFactory;
+ private final NotificationOutputType outputType;
+ private final String streamName;
+ private final P path;
- @SuppressWarnings("rawtypes")
- private EventBusChangeRecorder eventBusChangeRecorder;
+ @GuardedBy("this")
+ private final Set<StreamSessionHandler> subscribers = new HashSet<>();
+ @GuardedBy("this")
+ private Registration registration;
- private volatile ListenerRegistration<?> registration;
+ // FIXME: these should be final
+ private Instant start = null;
+ private Instant stop = null;
+ private boolean leafNodesOnly = false;
+ private boolean skipNotificationData = false;
+ private EventFormatter<T> formatter;
- /**
- * Creating {@link EventBus}.
- */
- protected AbstractCommonSubscriber() {
- this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
+ AbstractCommonSubscriber(final QName lastQName, final String streamName, final P path,
+ final NotificationOutputType outputType, final EventFormatterFactory<T> formatterFactory) {
+ super(lastQName);
+ this.streamName = requireNonNull(streamName);
+ checkArgument(!streamName.isEmpty());
+ this.path = requireNonNull(path);
+
+ this.outputType = requireNonNull(outputType);
+ this.formatterFactory = requireNonNull(formatterFactory);
+ formatter = formatterFactory.getFormatter();
+ }
+
+ @Override
+ public final String getStreamName() {
+ return streamName;
+ }
+
+ @Override
+ public final String getOutputType() {
+ return outputType.getName();
+ }
+
+ @Override
+ public final synchronized boolean hasSubscribers() {
+ return !subscribers.isEmpty();
}
@Override
- public final boolean hasSubscribers() {
- return !this.subscribers.isEmpty();
+ public final synchronized Set<StreamSessionHandler> getSubscribers() {
+ return new HashSet<>(subscribers);
+ }
+
+ @Override
+ public final synchronized void close() throws InterruptedException, ExecutionException {
+ if (registration != null) {
+ registration.close();
+ registration = null;
+ }
+ deleteDataInDS().get();
+ subscribers.clear();
}
@Override
- public final Set<Channel> getSubscribers() {
- return this.subscribers;
+ public synchronized void addSubscriber(final StreamSessionHandler subscriber) {
+ final boolean isConnected = subscriber.isConnected();
+ checkState(isConnected);
+ LOG.debug("Subscriber {} is added.", subscriber);
+ subscribers.add(subscriber);
}
@Override
- public final void close() throws Exception {
- if (this.registration != null) {
- this.registration.close();
- this.registration = null;
+ public synchronized void removeSubscriber(final StreamSessionHandler subscriber) {
+ final boolean isConnected = subscriber.isConnected();
+ checkState(isConnected);
+ LOG.debug("Subscriber {} is removed", subscriber);
+ subscribers.remove(subscriber);
+ if (!hasSubscribers()) {
+ ListenersBroker.getInstance().removeAndCloseListener(this);
}
+ }
- deleteDataInDS();
- unregister();
+ public final Instant getStart() {
+ return start;
}
/**
- * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
- * subscriber to the event and post event into event bus.
+ * Set query parameters for listener.
*
- * @param subscriber
- * Channel
+ * @param params NotificationQueryParams to use.
*/
- public void addSubscriber(final Channel subscriber) {
- if (!subscriber.isActive()) {
- LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress());
+ public final void setQueryParams(final NotificationQueryParams params) {
+ final var startTime = params.startTime();
+ start = startTime == null ? Instant.now() : parseDateAndTime(startTime.value());
+
+ final var stopTime = params.stopTime();
+ stop = stopTime == null ? null : parseDateAndTime(stopTime.value());
+
+ final var leafNodes = params.leafNodesOnly();
+ leafNodesOnly = leafNodes == null ? false : leafNodes.value();
+
+ final var skipData = params.skipNotificationData();
+ skipNotificationData = skipData == null ? false : skipData.value();
+
+ final var filter = params.filter();
+ final String filterValue = filter == null ? null : filter.paramValue();
+ if (filterValue != null && !filterValue.isEmpty()) {
+ try {
+ formatter = formatterFactory.getFormatter(filterValue);
+ } catch (XPathExpressionException e) {
+ throw new IllegalArgumentException("Failed to get filter", e);
+ }
+ } else {
+ formatter = formatterFactory.getFormatter();
}
- final Event event = new Event(EventType.REGISTER);
- event.setSubscriber(subscriber);
- this.eventBus.post(event);
+ }
+
+ final P path() {
+ return path;
}
/**
- * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
- * subscriber to the event and posts event into event bus.
+ * Check whether this query should only notify about leaf node changes.
*
- * @param subscriber subscriber channel
+ * @return true if this query should only notify about leaf node changes
*/
- public void removeSubscriber(final Channel subscriber) {
- LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
- final Event event = new Event(EventType.DEREGISTER);
- event.setSubscriber(subscriber);
- this.eventBus.post(event);
+ final boolean getLeafNodesOnly() {
+ return leafNodesOnly;
}
/**
- * Sets {@link ListenerRegistration} registration.
+ * Check whether this query should notify changes without data.
*
- * @param registration
- * DOMDataChangeListener registration
+ * @return true if this query should notify about changes with data
*/
- public void setRegistration(final ListenerRegistration<?> registration) {
- this.registration = registration;
+ final boolean isSkipNotificationData() {
+ return skipNotificationData;
+ }
+
+ final EventFormatter<T> formatter() {
+ return formatter;
}
/**
- * Checks if {@link ListenerRegistration} registration exist.
+ * Sets {@link Registration} registration.
*
- * @return True if exist, false otherwise.
+ * @param registration a listener registration registration.
*/
- public boolean isListening() {
- return this.registration != null;
+ @Holding("this")
+ final void setRegistration(final Registration registration) {
+ this.registration = requireNonNull(registration);
}
/**
- * Creating and registering {@link EventBusChangeRecorder} of specific
- * listener on {@link EventBus}.
+ * Checks if {@link Registration} registration exists.
*
- * @param listener
- * specific listener of notifications
+ * @return {@code true} if exists, {@code false} otherwise.
*/
- @SuppressWarnings({ "unchecked", "rawtypes" })
- protected <T extends BaseListenerInterface> void register(final T listener) {
- this.eventBusChangeRecorder = new EventBusChangeRecorder(listener);
- this.eventBus.register(this.eventBusChangeRecorder);
+ @Holding("this")
+ final boolean isListening() {
+ return registration != null;
}
/**
- * Post event to event bus.
+ * Post data to subscribed SSE session handlers.
*
- * @param event
- * data of incoming notifications
+ * @param data Data of incoming notifications.
*/
- protected void post(final Event event) {
- this.eventBus.post(event);
+ synchronized void post(final String data) {
+ final Iterator<StreamSessionHandler> iterator = subscribers.iterator();
+ while (iterator.hasNext()) {
+ final StreamSessionHandler subscriber = iterator.next();
+ final boolean isConnected = subscriber.isConnected();
+ if (isConnected) {
+ subscriber.sendDataMessage(data);
+ LOG.debug("Data was sent to subscriber {} on connection {}:", this, subscriber);
+ } else {
+ // removal is probably not necessary, because it will be removed explicitly soon after invocation of
+ // onWebSocketClosed(..) in handler; but just to be sure ...
+ iterator.remove();
+ LOG.debug("Subscriber for {} was removed - web-socket session is not open.", this);
+ }
+ }
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ final boolean checkStartStop(final Instant now) {
+ if (stop != null) {
+ if (start.compareTo(now) < 0 && stop.compareTo(now) > 0) {
+ return true;
+ }
+ if (stop.compareTo(now) < 0) {
+ try {
+ close();
+ } catch (final Exception e) {
+ throw new RestconfDocumentedException("Problem with unregister listener." + e);
+ }
+ }
+ } else if (start != null) {
+ if (start.compareTo(now) < 0) {
+ start = null;
+ return true;
+ }
+ } else {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public final String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("path", path)
+ .add("stream-name", streamName)
+ .add("output-type", getOutputType())
+ .toString();
}
/**
- * Removes all subscribers and unregisters event bus change recorder form
- * event bus.
+ * Parse input of query parameters - start-time or stop-time - from {@link DateAndTime} format
+ * to {@link Instant} format.
+ *
+ * @param uriValue Start-time or stop-time as string in {@link DateAndTime} format.
+ * @return Parsed {@link Instant} by entry.
*/
- protected void unregister() {
- this.subscribers.clear();
- this.eventBus.unregister(this.eventBusChangeRecorder);
+ private static @NonNull Instant parseDateAndTime(final DateAndTime dateAndTime) {
+ final TemporalAccessor accessor;
+ try {
+ accessor = FORMATTER.parse(dateAndTime.getValue());
+ } catch (final DateTimeParseException e) {
+ throw new RestconfDocumentedException("Cannot parse of value in date: " + dateAndTime, e);
+ }
+ return Instant.from(accessor);
}
}