/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.base.Preconditions;
13 import java.time.Instant;
14 import java.time.format.DateTimeFormatter;
15 import java.time.format.DateTimeFormatterBuilder;
16 import java.time.format.DateTimeParseException;
17 import java.time.temporal.ChronoField;
18 import java.time.temporal.TemporalAccessor;
19 import java.util.HashSet;
20 import java.util.Iterator;
22 import java.util.concurrent.ExecutionException;
23 import javax.xml.xpath.XPathExpressionException;
24 import org.checkerframework.checker.lock.qual.GuardedBy;
25 import org.checkerframework.checker.lock.qual.Holding;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
29 import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
30 import org.opendaylight.restconf.nb.rfc8040.streams.StreamSessionHandler;
31 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
32 import org.opendaylight.yangtools.concepts.Registration;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
/**
 * Subscriber-handling features shared by both kinds of notifications.
 */
39 abstract class AbstractCommonSubscriber extends AbstractNotificationsData implements BaseListenerInterface {
40 private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
41 private static final DateTimeFormatter FORMATTER = new DateTimeFormatterBuilder()
42 .appendValue(ChronoField.YEAR, 4).appendLiteral('-')
43 .appendValue(ChronoField.MONTH_OF_YEAR, 2).appendLiteral('-')
44 .appendValue(ChronoField.DAY_OF_MONTH, 2).appendLiteral('T')
45 .appendValue(ChronoField.HOUR_OF_DAY, 2).appendLiteral(':')
46 .appendValue(ChronoField.MINUTE_OF_HOUR, 2).appendLiteral(':')
47 .appendValue(ChronoField.SECOND_OF_MINUTE, 2)
48 .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
49 .appendOffset("+HH:MM", "Z").toFormatter();
52 private final Set<StreamSessionHandler> subscribers = new HashSet<>();
54 private Registration registration;
56 // FIXME: these should be final
57 private Instant start = null;
58 private Instant stop = null;
59 private boolean leafNodesOnly = false;
60 private boolean skipNotificationData = false;
63 public final synchronized boolean hasSubscribers() {
64 return !subscribers.isEmpty();
68 public final synchronized Set<StreamSessionHandler> getSubscribers() {
69 return new HashSet<>(subscribers);
// Closes this listener while holding this object's monitor; InterruptedException and ExecutionException
// are declared so that failures of the blocking get() below propagate to the caller.
// NOTE(review): the statements inside the registration guard, and anything following the
// deleteDataInDS() wait, are not visible here - confirm against the complete method.
73 public final synchronized void close() throws InterruptedException, ExecutionException {
// Registration handling applies only when a registration has been set.
74 if (registration != null) {
// Blocks on the result of deleteDataInDS() (presumably a Future, given the declared exceptions - confirm).
78 deleteDataInDS().get();
83 public synchronized void addSubscriber(final StreamSessionHandler subscriber) {
84 final boolean isConnected = subscriber.isConnected();
85 Preconditions.checkState(isConnected);
86 LOG.debug("Subscriber {} is added.", subscriber);
87 subscribers.add(subscriber);
91 public synchronized void removeSubscriber(final StreamSessionHandler subscriber) {
92 final boolean isConnected = subscriber.isConnected();
93 Preconditions.checkState(isConnected);
94 LOG.debug("Subscriber {} is removed", subscriber);
95 subscribers.remove(subscriber);
96 if (!hasSubscribers()) {
97 ListenersBroker.getInstance().removeAndCloseListener(this);
// Accessor for the start instant of this listener.
// NOTE(review): only the signature is visible here; presumably the body returns the start field - confirm.
101 public final Instant getStart() {
106 * Set query parameters for listener.
108 * @param params NotificationQueryParams to use.
110 public final void setQueryParams(final NotificationQueryParams params) {
111 final var startTime = params.startTime();
112 start = startTime == null ? Instant.now() : parseDateAndTime(startTime.value());
114 final var stopTime = params.stopTime();
115 stop = stopTime == null ? null : parseDateAndTime(stopTime.value());
117 final var leafNodes = params.leafNodesOnly();
118 leafNodesOnly = leafNodes == null ? false : leafNodes.value();
120 final var skipData = params.skipNotificationData();
121 skipNotificationData = skipData == null ? false : skipData.value();
123 final var filter = params.filter();
124 if (filter != null) {
126 setFilter(filter.paramValue());
127 } catch (XPathExpressionException e) {
128 throw new IllegalArgumentException("Failed to get filter", e);
133 abstract void setFilter(@Nullable String xpathString) throws XPathExpressionException;
136 * Check whether this query should only notify about leaf node changes.
138 * @return true if this query should only notify about leaf node changes
140 final boolean getLeafNodesOnly() {
141 return leafNodesOnly;
145 * Check whether this query should notify changes without data.
147 * @return true if this query should notify about changes with data
149 final boolean isSkipNotificationData() {
150 return skipNotificationData;
154 * Sets {@link Registration} registration.
156 * @param registration a listener registration registration.
159 final void setRegistration(final Registration registration) {
160 this.registration = requireNonNull(registration);
164 * Checks if {@link Registration} registration exists.
166 * @return {@code true} if exists, {@code false} otherwise.
169 final boolean isListening() {
170 return registration != null;
/**
 * Post data to subscribed SSE session handlers.
 *
 * @param data Data of incoming notifications.
 */
// Walks the subscriber set while holding this object's monitor and delivers the data to subscribers.
// NOTE(review): the branch on isConnected and the removal statement that the comment further down
// refers to are not visible here - confirm against the complete method.
178 synchronized void post(final String data) {
// An explicit iterator is obtained for the walk over the subscriber set.
179 final Iterator<StreamSessionHandler> iterator = subscribers.iterator();
180 while (iterator.hasNext()) {
181 final StreamSessionHandler subscriber = iterator.next();
182 final boolean isConnected = subscriber.isConnected();
184 subscriber.sendDataMessage(data);
// NOTE(review): the arguments are (this, subscriber), so the first placeholder prints the listener
// and the second prints the subscriber - the message wording suggests the opposite order.
185 LOG.debug("Data was sent to subscriber {} on connection {}:", this, subscriber);
187 // removal is probably not necessary, because it will be removed explicitly soon after invocation of
188 // onWebSocketClosed(..) in handler; but just to be sure ...
190 LOG.debug("Subscriber for {} was removed - web-socket session is not open.", this);
// Evaluates the supplied instant against the configured start and stop instants.
// NOTE(review): several statements of this method, including every return, are not visible here;
// the exact meaning of the boolean result must be confirmed against the complete method.
195 @SuppressWarnings("checkstyle:IllegalCatch")
196 final boolean checkStartStop(final Instant now) {
// True when start is strictly before now and stop is strictly after now.
198 if (start.compareTo(now) < 0 && stop.compareTo(now) > 0) {
// True when stop is strictly before now.
201 if (stop.compareTo(now) < 0) {
204 } catch (final Exception e) {
// NOTE(review): the caught exception is concatenated into the message rather than passed as the cause,
// so its stack trace is lost; parseDateAndTime() uses the (message, cause) constructor instead.
205 throw new RestconfDocumentedException("Problem with unregister listener." + e);
// Alternative branch of a condition that is not visible here; entered only when a start instant is set.
208 } else if (start != null) {
// True when start is strictly before now.
209 if (start.compareTo(now) < 0) {
220 * Parse input of query parameters - start-time or stop-time - from {@link DateAndTime} format
221 * to {@link Instant} format.
223 * @param uriValue Start-time or stop-time as string in {@link DateAndTime} format.
224 * @return Parsed {@link Instant} by entry.
226 private static @NonNull Instant parseDateAndTime(final DateAndTime dateAndTime) {
227 final TemporalAccessor accessor;
229 accessor = FORMATTER.parse(dateAndTime.getValue());
230 } catch (final DateTimeParseException e) {
231 throw new RestconfDocumentedException("Cannot parse of value in date: " + dateAndTime, e);
233 return Instant.from(accessor);