43c9ce6669feecdf1f9b7f143ae3a74fddb5b7aa
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / listeners / AbstractCommonSubscriber.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Preconditions;
13 import java.time.Instant;
14 import java.time.format.DateTimeFormatter;
15 import java.time.format.DateTimeFormatterBuilder;
16 import java.time.format.DateTimeParseException;
17 import java.time.temporal.ChronoField;
18 import java.time.temporal.TemporalAccessor;
19 import java.util.HashSet;
20 import java.util.Iterator;
21 import java.util.Set;
22 import java.util.concurrent.ExecutionException;
23 import javax.xml.xpath.XPathExpressionException;
24 import org.checkerframework.checker.lock.qual.GuardedBy;
25 import org.checkerframework.checker.lock.qual.Holding;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.eclipse.jdt.annotation.Nullable;
28 import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
29 import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
30 import org.opendaylight.restconf.nb.rfc8040.streams.StreamSessionHandler;
31 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
32 import org.opendaylight.yangtools.concepts.Registration;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Features of subscribing part of both notifications.
38  */
39 abstract class AbstractCommonSubscriber extends AbstractNotificationsData implements BaseListenerInterface {
40     private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
41     private static final DateTimeFormatter FORMATTER = new DateTimeFormatterBuilder()
42         .appendValue(ChronoField.YEAR, 4).appendLiteral('-')
43         .appendValue(ChronoField.MONTH_OF_YEAR, 2).appendLiteral('-')
44         .appendValue(ChronoField.DAY_OF_MONTH, 2).appendLiteral('T')
45         .appendValue(ChronoField.HOUR_OF_DAY, 2).appendLiteral(':')
46         .appendValue(ChronoField.MINUTE_OF_HOUR, 2).appendLiteral(':')
47         .appendValue(ChronoField.SECOND_OF_MINUTE, 2)
48         .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
49         .appendOffset("+HH:MM", "Z").toFormatter();
50
51     @GuardedBy("this")
52     private final Set<StreamSessionHandler> subscribers = new HashSet<>();
53     @GuardedBy("this")
54     private Registration registration;
55
56     // FIXME: these should be final
57     private Instant start = null;
58     private Instant stop = null;
59     private boolean leafNodesOnly = false;
60     private boolean skipNotificationData = false;
61
62     @Override
63     public final synchronized boolean hasSubscribers() {
64         return !subscribers.isEmpty();
65     }
66
67     @Override
68     public final synchronized Set<StreamSessionHandler> getSubscribers() {
69         return new HashSet<>(subscribers);
70     }
71
72     @Override
73     public final synchronized void close() throws InterruptedException, ExecutionException {
74         if (registration != null) {
75             registration.close();
76             registration = null;
77         }
78         deleteDataInDS().get();
79         subscribers.clear();
80     }
81
82     @Override
83     public synchronized void addSubscriber(final StreamSessionHandler subscriber) {
84         final boolean isConnected = subscriber.isConnected();
85         Preconditions.checkState(isConnected);
86         LOG.debug("Subscriber {} is added.", subscriber);
87         subscribers.add(subscriber);
88     }
89
90     @Override
91     public synchronized void removeSubscriber(final StreamSessionHandler subscriber) {
92         final boolean isConnected = subscriber.isConnected();
93         Preconditions.checkState(isConnected);
94         LOG.debug("Subscriber {} is removed", subscriber);
95         subscribers.remove(subscriber);
96         if (!hasSubscribers()) {
97             ListenersBroker.getInstance().removeAndCloseListener(this);
98         }
99     }
100
101     public final Instant getStart() {
102         return start;
103     }
104
105     /**
106      * Set query parameters for listener.
107      *
108      * @param params     NotificationQueryParams to use.
109      */
110     public final void setQueryParams(final NotificationQueryParams params) {
111         final var startTime = params.startTime();
112         start = startTime == null ? Instant.now() : parseDateAndTime(startTime.value());
113
114         final var stopTime = params.stopTime();
115         stop = stopTime == null ? null : parseDateAndTime(stopTime.value());
116
117         final var leafNodes = params.leafNodesOnly();
118         leafNodesOnly = leafNodes == null ? false : leafNodes.value();
119
120         final var skipData = params.skipNotificationData();
121         skipNotificationData = skipData == null ? false : skipData.value();
122
123         final var filter = params.filter();
124         if (filter != null) {
125             try {
126                 setFilter(filter.paramValue());
127             } catch (XPathExpressionException e) {
128                 throw new IllegalArgumentException("Failed to get filter", e);
129             }
130         }
131     }
132
133     abstract void setFilter(@Nullable String xpathString) throws XPathExpressionException;
134
135     /**
136      * Check whether this query should only notify about leaf node changes.
137      *
138      * @return true if this query should only notify about leaf node changes
139      */
140     final boolean getLeafNodesOnly() {
141         return leafNodesOnly;
142     }
143
144     /**
145      * Check whether this query should notify changes without data.
146      *
147      * @return true if this query should notify about changes with  data
148      */
149     final boolean isSkipNotificationData() {
150         return skipNotificationData;
151     }
152
153     /**
154      * Sets {@link Registration} registration.
155      *
156      * @param registration a listener registration registration.
157      */
158     @Holding("this")
159     final void setRegistration(final Registration registration) {
160         this.registration = requireNonNull(registration);
161     }
162
163     /**
164      * Checks if {@link Registration} registration exists.
165      *
166      * @return {@code true} if exists, {@code false} otherwise.
167      */
168     @Holding("this")
169     final boolean isListening() {
170         return registration != null;
171     }
172
173     /**
174      * Post data to subscribed SSE session handlers.
175      *
176      * @param data Data of incoming notifications.
177      */
178     synchronized void post(final String data) {
179         final Iterator<StreamSessionHandler> iterator = subscribers.iterator();
180         while (iterator.hasNext()) {
181             final StreamSessionHandler subscriber = iterator.next();
182             final boolean isConnected = subscriber.isConnected();
183             if (isConnected) {
184                 subscriber.sendDataMessage(data);
185                 LOG.debug("Data was sent to subscriber {} on connection {}:", this, subscriber);
186             } else {
187                 // removal is probably not necessary, because it will be removed explicitly soon after invocation of
188                 // onWebSocketClosed(..) in handler; but just to be sure ...
189                 iterator.remove();
190                 LOG.debug("Subscriber for {} was removed - web-socket session is not open.", this);
191             }
192         }
193     }
194
195     @SuppressWarnings("checkstyle:IllegalCatch")
196     final boolean checkStartStop(final Instant now) {
197         if (stop != null) {
198             if (start.compareTo(now) < 0 && stop.compareTo(now) > 0) {
199                 return true;
200             }
201             if (stop.compareTo(now) < 0) {
202                 try {
203                     close();
204                 } catch (final Exception e) {
205                     throw new RestconfDocumentedException("Problem with unregister listener." + e);
206                 }
207             }
208         } else if (start != null) {
209             if (start.compareTo(now) < 0) {
210                 start = null;
211                 return true;
212             }
213         } else {
214             return true;
215         }
216         return false;
217     }
218
219     /**
220      * Parse input of query parameters - start-time or stop-time - from {@link DateAndTime} format
221      * to {@link Instant} format.
222      *
223      * @param uriValue Start-time or stop-time as string in {@link DateAndTime} format.
224      * @return Parsed {@link Instant} by entry.
225      */
226     private static @NonNull Instant parseDateAndTime(final DateAndTime dateAndTime) {
227         final TemporalAccessor accessor;
228         try {
229             accessor = FORMATTER.parse(dateAndTime.getValue());
230         } catch (final DateTimeParseException e) {
231             throw new RestconfDocumentedException("Cannot parse of value in date: " + dateAndTime, e);
232         }
233         return Instant.from(accessor);
234     }
235 }