*/
package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
-import com.google.common.base.Preconditions;
+import com.google.common.base.MoreObjects;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.checkerframework.checker.lock.qual.Holding;
import org.eclipse.jdt.annotation.NonNull;
-import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
+import org.opendaylight.restconf.common.formatters.EventFormatter;
+import org.opendaylight.restconf.common.formatters.EventFormatterFactory;
import org.opendaylight.restconf.nb.rfc8040.NotificationQueryParams;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamSessionHandler;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.common.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Features of subscribing part of both notifications.
*/
-abstract class AbstractCommonSubscriber extends AbstractNotificationsData implements BaseListenerInterface {
+abstract class AbstractCommonSubscriber<P, T> extends AbstractNotificationsData implements BaseListenerInterface {
private static final Logger LOG = LoggerFactory.getLogger(AbstractCommonSubscriber.class);
private static final DateTimeFormatter FORMATTER = new DateTimeFormatterBuilder()
.appendValue(ChronoField.YEAR, 4).appendLiteral('-')
.appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
.appendOffset("+HH:MM", "Z").toFormatter();
+ private final EventFormatterFactory<T> formatterFactory;
+ private final NotificationOutputType outputType;
+ private final String streamName;
+ private final P path;
+
@GuardedBy("this")
private final Set<StreamSessionHandler> subscribers = new HashSet<>();
@GuardedBy("this")
private Instant stop = null;
private boolean leafNodesOnly = false;
private boolean skipNotificationData = false;
+ private EventFormatter<T> formatter;
+
+ AbstractCommonSubscriber(final QName lastQName, final String streamName, final P path,
+ final NotificationOutputType outputType, final EventFormatterFactory<T> formatterFactory) {
+ super(lastQName);
+ this.streamName = requireNonNull(streamName);
+ checkArgument(!streamName.isEmpty());
+ this.path = requireNonNull(path);
+
+ this.outputType = requireNonNull(outputType);
+ this.formatterFactory = requireNonNull(formatterFactory);
+ formatter = formatterFactory.getFormatter();
+ }
+
+ @Override
+ public final String getStreamName() {
+ return streamName;
+ }
+
+ @Override
+ public final String getOutputType() {
+ return outputType.getName();
+ }
@Override
public final synchronized boolean hasSubscribers() {
@Override
public synchronized void addSubscriber(final StreamSessionHandler subscriber) {
final boolean isConnected = subscriber.isConnected();
- Preconditions.checkState(isConnected);
+ checkState(isConnected);
LOG.debug("Subscriber {} is added.", subscriber);
subscribers.add(subscriber);
}
@Override
public synchronized void removeSubscriber(final StreamSessionHandler subscriber) {
final boolean isConnected = subscriber.isConnected();
- Preconditions.checkState(isConnected);
+ checkState(isConnected);
LOG.debug("Subscriber {} is removed", subscriber);
subscribers.remove(subscriber);
if (!hasSubscribers()) {
/**
* Set query parameters for listener.
*
- * @param params NotificationQueryParams to use.
+ * @param params NotificationQueryParams to use.
*/
public final void setQueryParams(final NotificationQueryParams params) {
final var startTime = params.startTime();
skipNotificationData = skipData == null ? false : skipData.value();
final var filter = params.filter();
- if (filter != null) {
+ final String filterValue = filter == null ? null : filter.paramValue();
+ if (filterValue != null && !filterValue.isEmpty()) {
try {
- setFilter(filter.paramValue());
+ formatter = formatterFactory.getFormatter(filterValue);
} catch (XPathExpressionException e) {
throw new IllegalArgumentException("Failed to get filter", e);
}
+ } else {
+ formatter = formatterFactory.getFormatter();
}
}
- abstract void setFilter(@Nullable String xpathString) throws XPathExpressionException;
+ final P path() {
+ return path;
+ }
/**
* Check whether this query should only notify about leaf node changes.
return skipNotificationData;
}
+ final EventFormatter<T> formatter() {
+ return formatter;
+ }
+
/**
* Sets {@link Registration} registration.
*
return false;
}
+ @Override
+ public final String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("path", path)
+ .add("stream-name", streamName)
+ .add("output-type", getOutputType())
+ .toString();
+ }
+
/**
* Parse input of query parameters - start-time or stop-time - from {@link DateAndTime} format
* to {@link Instant} format.