ListenerAdapter, AbstractStream et al. have rather unfortunate names.
Fix this by renaming:
- AbstractStream to RestconfStream
- AbstractNotificationListenerAdaptor to AbstractNotificationStream
- DeviceNotificationListenerAdaptor to DeviceNotificatioNStream
- ListenerAdapter to DataTreeChangeStream
- NotificationListenerAdapter to NotificationStream
JIRA: NETCONF-1102
Change-Id: Ib42d89344079c9815a628d9353abea15c67a72d5
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
import org.slf4j.LoggerFactory;
/**
- * Abstract base class for functionality shared between {@link NotificationListenerAdapter} and
- * {@link DeviceNotificationListenerAdaptor}.
+ * Abstract base class for functionality shared between {@link NotificationStream} and
+ * {@link DeviceNotificationStream}.
*/
-abstract class AbstractNotificationListenerAdaptor extends AbstractStream<DOMNotification>
- implements DOMNotificationListener {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationListenerAdaptor.class);
+abstract class AbstractNotificationStream extends RestconfStream<DOMNotification> implements DOMNotificationListener {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractNotificationStream.class);
- AbstractNotificationListenerAdaptor(final ListenersBroker listenersBroker, final String streamName,
+ AbstractNotificationStream(final ListenersBroker listenersBroker, final String name,
final NotificationOutputType outputType) {
- super(listenersBroker, streamName, outputType, getFormatterFactory(outputType));
- }
-
- private static NotificationFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
- return switch (outputType) {
+ super(listenersBroker, name, outputType, switch (outputType) {
case JSON -> JSONNotificationFormatter.FACTORY;
case XML -> XMLNotificationFormatter.FACTORY;
- };
+ });
}
@Override
import static org.opendaylight.restconf.nb.rfc8040.streams.NotificationFormatter.XML_OUTPUT_FACTORY;
import java.io.IOException;
-import java.util.Collection;
+import java.util.List;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
import javax.xml.transform.dom.DOMResult;
/**
* Base formatter for DataTreeCandidates which only handles exporting to a document for filter checking purpose.
*/
-abstract class DataTreeCandidateFormatter extends EventFormatter<Collection<DataTreeCandidate>> {
+abstract class DataTreeCandidateFormatter extends EventFormatter<List<DataTreeCandidate>> {
DataTreeCandidateFormatter(final TextParameters textParams) {
super(textParams);
}
@Override
final void fillDocument(final Document doc, final EffectiveModelContext schemaContext,
- final Collection<DataTreeCandidate> input) throws IOException {
+ final List<DataTreeCandidate> input) throws IOException {
final Element notificationElement = NotificationFormatter.createNotificationElement(doc);
final Element notificationEventElement = doc.createElementNS(
SAL_REMOTE_NAMESPACE, DATA_CHANGED_NOTIFICATION_ELEMENT);
*/
package org.opendaylight.restconf.nb.rfc8040.streams;
-import java.util.Collection;
+import java.util.List;
import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
-abstract class DataTreeCandidateFormatterFactory extends EventFormatterFactory<Collection<DataTreeCandidate>> {
+abstract class DataTreeCandidateFormatterFactory extends EventFormatterFactory<List<DataTreeCandidate>> {
DataTreeCandidateFormatterFactory(final DataTreeCandidateFormatter emptyFormatter) {
super(emptyFormatter);
}
import com.google.common.base.MoreObjects.ToStringHelper;
import java.time.Instant;
-import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.eclipse.jdt.annotation.NonNull;
import org.slf4j.LoggerFactory;
/**
- * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
+ * A {@link RestconfStream} reporting changes on a particular data tree.
*/
-public class ListenerAdapter extends AbstractStream<Collection<DataTreeCandidate>>
+public class DataTreeChangeStream extends RestconfStream<List<DataTreeCandidate>>
implements ClusteredDOMDataTreeChangeListener {
- private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeStream.class);
private final DatabindProvider databindProvider;
private final @NonNull LogicalDatastoreType datastore;
private final @NonNull YangInstanceIdentifier path;
- /**
- * Creates new {@link ListenerAdapter} listener specified by path and stream name and register for subscribing.
- *
- * @param path Path to data in data store.
- * @param streamName The name of the stream.
- * @param outputType Type of output on notification (JSON, XML).
- */
- ListenerAdapter(final ListenersBroker listenersBroker, final String streamName,
+ DataTreeChangeStream(final ListenersBroker listenersBroker, final String name,
final NotificationOutputType outputType, final DatabindProvider databindProvider,
final LogicalDatastoreType datastore, final YangInstanceIdentifier path) {
- super(listenersBroker, streamName, outputType, getFormatterFactory(outputType));
+ super(listenersBroker, name, outputType, switch (outputType) {
+ case JSON -> JSONDataTreeCandidateFormatter.FACTORY;
+ case XML -> XMLDataTreeCandidateFormatter.FACTORY;
+ });
this.databindProvider = requireNonNull(databindProvider);
this.datastore = requireNonNull(datastore);
this.path = requireNonNull(path);
}
- private static DataTreeCandidateFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
- return switch (outputType) {
- case JSON -> JSONDataTreeCandidateFormatter.FACTORY;
- case XML -> XMLDataTreeCandidateFormatter.FACTORY;
- };
- }
-
@Override
public void onInitialData() {
// No-op
import org.opendaylight.mdsal.dom.api.DOMMountPointService;
import org.opendaylight.mdsal.dom.api.DOMNotificationService;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
/**
- * {@link DeviceNotificationListenerAdaptor} is responsible to track events on notifications.
+ * A {@link RestconfStream} reporting YANG notifications coming from a mounted device.
*/
-public final class DeviceNotificationListenerAdaptor extends AbstractNotificationListenerAdaptor
- implements DOMMountPointListener {
+public final class DeviceNotificationStream extends AbstractNotificationStream implements DOMMountPointListener {
private final @NonNull EffectiveModelContext effectiveModel;
private final @NonNull DOMMountPointService mountPointService;
private final @NonNull YangInstanceIdentifier instanceIdentifier;
- private ListenerRegistration<DOMMountPointListener> reg;
+ private Registration reg;
- DeviceNotificationListenerAdaptor(final ListenersBroker listenersBroker, final String streamName,
+ DeviceNotificationStream(final ListenersBroker listenersBroker, final String name,
final NotificationOutputType outputType, final EffectiveModelContext effectiveModel,
final DOMMountPointService mountPointService, final YangInstanceIdentifier instanceIdentifier) {
- super(listenersBroker, streamName, outputType);
+ super(listenersBroker, name, outputType);
this.effectiveModel = requireNonNull(effectiveModel);
this.mountPointService = requireNonNull(mountPointService);
this.instanceIdentifier = requireNonNull(instanceIdentifier);
import java.io.IOException;
import java.io.StringWriter;
import java.time.Instant;
-import java.util.Collection;
+import java.util.List;
import javax.xml.xpath.XPathExpressionException;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.$YangModuleInfoImpl;
@Override
String createText(final TextParameters params, final EffectiveModelContext schemaContext,
- final Collection<DataTreeCandidate> input, final Instant now) throws IOException {
+ final List<DataTreeCandidate> input, final Instant now) throws IOException {
try (var writer = new StringWriter()) {
boolean nonEmpty = false;
try (var jsonWriter = new JsonWriter(writer)) {
import org.slf4j.LoggerFactory;
/**
- * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
- * {@link NotificationListenerAdapter} listeners.
+ * This singleton class is responsible for creation, removal and searching for {@link DataTreeChangeStream} or
+ * {@link NotificationStream} listeners.
*/
// FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
// names. We essentially need a component which deals with allocation of stream names and their lifecycle and
}
/**
- * Factory interface for creating instances of {@link AbstractStream}.
+ * Factory interface for creating instances of {@link RestconfStream}.
*
- * @param <T> {@link AbstractStream} type
+ * @param <T> {@link RestconfStream} type
*/
@FunctionalInterface
- public interface StreamFactory<T extends AbstractStream<?>> {
+ public interface StreamFactory<T extends RestconfStream<?>> {
/**
* Create a stream with the supplied name.
*
* @param name Stream name
- * @return An {@link AbstractStream}
+ * @return An {@link RestconfStream}
*/
@NonNull T createStream(@NonNull String name);
}
// private static final QName LOCATION_QNAME = QName.create(Notifi.QNAME, "location").intern();
// private static final NodeIdentifier LOCATION_NODEID = NodeIdentifier.create(LOCATION_QNAME);
-// private static final String STREAMS_PATH = "ietf-restconf-monitoring:restconf-state/streams";
-// private static final String STREAM_PATH_PART = "/stream=";
-// private static final String STREAM_PATH = STREAMS_PATH + STREAM_PATH_PART;
-// private static final String STREAM_ACCESS_PATH_PART = "/access=";
-// private static final String STREAM_LOCATION_PATH_PART = "/location";
//
// private final ListenersBroker listenersBroker;
// private final HandlersHolder handlersHolder;
private static final NodeIdentifier STREAM_NAME_NODEID =
NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern());
- private final ConcurrentMap<String, AbstractStream<?>> streams = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, RestconfStream<?>> streams = new ConcurrentHashMap<>();
private final DOMDataBroker dataBroker;
private ListenersBroker(final DOMDataBroker dataBroker) {
}
/**
- * Get an {@link AbstractStream} by its name.
+ * Get a {@link RestconfStream} by its name.
*
* @param streamName Stream name.
- * @return An {@link AbstractStream}, or {@code null} if the stream with specified name does not exist.
+ * @return A {@link RestconfStream}, or {@code null} if the stream with specified name does not exist.
* @throws NullPointerException if {@code streamName} is {@code null}
*/
- public final @Nullable AbstractStream<?> getStream(final String streamName) {
+ public final @Nullable RestconfStream<?> getStream(final String streamName) {
return streams.get(streamName);
}
/**
- * Create an {@link AbstractStream} with a unique name. This method will atomically generate a stream name, create
+ * Create a {@link RestconfStream} with a unique name. This method will atomically generate a stream name, create
* the corresponding instance and register it
*
* @param <T> Stream type
* @param factory Factory for creating the actual stream instance
- * @return An {@link AbstractStream} instance
+ * @return A {@link RestconfStream} instance
* @throws NullPointerException if {@code factory} is {@code null}
*/
- public final <T extends AbstractStream<?>> @NonNull T createStream(final StreamFactory<T> factory) {
+ public final <T extends RestconfStream<?>> @NonNull T createStream(final StreamFactory<T> factory) {
String name;
T stream;
do {
*
* @param stream Stream to remove
*/
- final void removeStream(final AbstractStream<?> stream) {
+ final void removeStream(final RestconfStream<?> stream) {
// Defensive check to see if we are still tracking the stream
- final var streamName = stream.getStreamName();
+ final var streamName = stream.name();
if (streams.get(streamName) != stream) {
LOG.warn("Stream {} does not match expected instance {}, skipping datastore update", streamName, stream);
return;
: LogicalDatastoreType.CONFIGURATION;
final var path = preparePath(input);
final var outputType = prepareOutputType(input);
- final var adapter = createStream(name -> new ListenerAdapter(this, name, outputType, databindProvider,
+ final var adapter = createStream(name -> new DataTreeChangeStream(this, name, outputType, databindProvider,
datastore, path));
// building of output
return RestconfFuture.of(Optional.of(Builders.containerBuilder()
.withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
- .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
+ .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.name()))
.build()));
}
// registration of the listener
final var outputType = prepareOutputType(input);
- final var adapter = createStream(name -> new NotificationListenerAdapter(this, name, outputType,
+ final var adapter = createStream(name -> new NotificationStream(this, name, outputType,
databindProvider, qnames));
return RestconfFuture.of(Optional.of(Builders.containerBuilder()
.withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID)
- .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.getStreamName()))
+ .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, adapter.name()))
.build()));
}
final var outputType = prepareOutputType(input);
final var notificationListenerAdapter = createStream(
- streamName -> new DeviceNotificationListenerAdaptor(this, streamName, outputType, mountModelContext,
+ streamName -> new DeviceNotificationStream(this, streamName, outputType, mountModelContext,
mountPointService, mountPoint.getIdentifier()));
notificationListenerAdapter.listen(mountNotifService, notificationPaths);
return RestconfFuture.of(Optional.of(Builders.containerBuilder()
.withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME))
.withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH,
- baseUrl + notificationListenerAdapter.getStreamName()))
+ baseUrl + notificationListenerAdapter.name()))
.build()));
}
import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
/**
- * {@link NotificationListenerAdapter} is responsible to track events on notifications.
+ * A {@link RestconfStream} reporting YANG notifications.
*/
-public final class NotificationListenerAdapter extends AbstractNotificationListenerAdaptor {
+public final class NotificationStream extends AbstractNotificationStream {
private final DatabindProvider databindProvider;
private final ImmutableSet<QName> paths;
- /**
- * Set path of listener and stream name.
- *
- * @param paths Top-level Schema path of YANG notification.
- * @param streamName Name of the stream.
- * @param outputType Type of output on notification (JSON or XML).
- * @param listenersBroker Associated {@link ListenersBroker}
- */
- NotificationListenerAdapter(final ListenersBroker listenersBroker, final String streamName,
+ NotificationStream(final ListenersBroker listenersBroker, final String name,
final NotificationOutputType outputType, final DatabindProvider databindProvider,
final ImmutableSet<QName> paths) {
- super(listenersBroker, streamName, outputType);
+ super(listenersBroker, name, outputType);
this.databindProvider = requireNonNull(databindProvider);
this.paths = requireNonNull(paths);
}
*/
package org.opendaylight.restconf.nb.rfc8040.streams;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
import com.google.common.base.MoreObjects;
/**
* Base superclass for all stream types.
*/
-abstract class AbstractStream<T> {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractStream.class);
+abstract class RestconfStream<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(RestconfStream.class);
- private final EventFormatterFactory<T> formatterFactory;
- private final NotificationOutputType outputType;
- private final String streamName;
- protected final @NonNull ListenersBroker listenersBroker;
+ private final @NonNull ListenersBroker listenersBroker;
+ private final @NonNull String name;
@GuardedBy("this")
private final Set<StreamSessionHandler> subscribers = new HashSet<>();
private Registration registration;
// FIXME: NETCONF-1102: this should be tied to a subscriber
+ private final EventFormatterFactory<T> formatterFactory;
+ private final NotificationOutputType outputType;
private @NonNull EventFormatter<T> formatter;
- AbstractStream(final ListenersBroker listenersBroker, final String streamName,
- final NotificationOutputType outputType, final EventFormatterFactory<T> formatterFactory) {
+ RestconfStream(final ListenersBroker listenersBroker, final String name, final NotificationOutputType outputType,
+ final EventFormatterFactory<T> formatterFactory) {
this.listenersBroker = requireNonNull(listenersBroker);
- this.streamName = requireNonNull(streamName);
- checkArgument(!streamName.isEmpty());
-
+ this.name = requireNonNull(name);
this.outputType = requireNonNull(outputType);
this.formatterFactory = requireNonNull(formatterFactory);
formatter = formatterFactory.emptyFormatter();
*
* @return Stream name.
*/
- public final String getStreamName() {
- return streamName;
+ public final @NonNull String name() {
+ return name;
}
/**
return outputType.getName();
}
- /**
- * Checks if exists at least one {@link StreamSessionHandler} subscriber.
- *
- * @return {@code true} if exist at least one {@link StreamSessionHandler} subscriber, {@code false} otherwise.
- */
- final synchronized boolean hasSubscribers() {
- return !subscribers.isEmpty();
- }
-
/**
* Registers {@link StreamSessionHandler} subscriber.
*
* @param subscriber SSE or WS session handler.
*/
synchronized void addSubscriber(final StreamSessionHandler subscriber) {
- final boolean isConnected = subscriber.isConnected();
- checkState(isConnected);
+ if (!subscriber.isConnected()) {
+ throw new IllegalStateException(subscriber + " is not connected");
+ }
LOG.debug("Subscriber {} is added.", subscriber);
subscribers.add(subscriber);
}
public final void setQueryParams(final ReceiveEventsParams params) {
final var startTime = params.startTime();
if (startTime != null) {
- throw new RestconfDocumentedException("Stream " + streamName + " does not support replay",
+ throw new RestconfDocumentedException("Stream " + name + " does not support replay",
ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
}
}
ToStringHelper addToStringAttributes(final ToStringHelper helper) {
- return helper.add("stream-name", streamName).add("output-type", getOutputType());
+ return helper.add("name", name).add("output-type", getOutputType());
}
}
private final ScheduledExecutorService executorService;
// FIXME: this really should include subscription details like formatter etc.
- private final AbstractStream<?> listener;
+ private final RestconfStream<?> listener;
private final int maximumFragmentLength;
private final int heartbeatInterval;
private final SseEventSink sink;
* session up. Ping control frames are disabled if this parameter is set to 0.
*/
public SSESessionHandler(final ScheduledExecutorService executorService, final SseEventSink sink, final Sse sse,
- final AbstractStream<?> listener, final int maximumFragmentLength, final int heartbeatInterval) {
+ final RestconfStream<?> listener, final int maximumFragmentLength, final int heartbeatInterval) {
this.executorService = executorService;
this.sse = sse;
this.sink = sink;
private final ScheduledExecutorService executorService;
// FIXME: this really should include formatter etc.
- private final AbstractStream<?> listener;
+ private final RestconfStream<?> listener;
private final int maximumFragmentLength;
private final int heartbeatInterval;
* @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint
* to keep session up. Ping control frames are disabled if this parameter is set to 0.
*/
- WebSocketSessionHandler(final ScheduledExecutorService executorService, final AbstractStream<?> listener,
+ WebSocketSessionHandler(final ScheduledExecutorService executorService, final RestconfStream<?> listener,
final int maximumFragmentLength, final int heartbeatInterval) {
this.executorService = executorService;
this.listener = listener;
import java.io.IOException;
import java.io.StringWriter;
import java.time.Instant;
-import java.util.Collection;
+import java.util.List;
import javax.xml.XMLConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.xpath.XPathExpressionException;
@Override
String createText(final TextParameters params, final EffectiveModelContext schemaContext,
- final Collection<DataTreeCandidate> input, final Instant now) throws Exception {
+ final List<DataTreeCandidate> input, final Instant now) throws Exception {
final var writer = new StringWriter();
boolean nonEmpty = false;
try {
import org.slf4j.LoggerFactory;
import org.xmlunit.assertj.XmlAssert;
-public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
- private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapterTest.class);
+public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest {
+ private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeStreamTest.class);
private static final String JSON_NOTIF_LEAVES_CREATE = "/listener-adapter-test/notif-leaves-create.json";
private static final String JSON_NOTIF_LEAVES_UPDATE = "/listener-adapter-test/notif-leaves-update.json";
listenersBroker = new ListenersBroker.ServerSentEvents(domDataBroker);
}
- class ListenerAdapterTester extends ListenerAdapter {
-
- private volatile String lastNotification;
+ class TestDataTreeChangeStream extends DataTreeChangeStream {
private CountDownLatch notificationLatch = new CountDownLatch(1);
+ private volatile String lastNotification;
- ListenerAdapterTester(final YangInstanceIdentifier path, final String streamName,
+ TestDataTreeChangeStream(final YangInstanceIdentifier path, final String streamName,
final NotificationOutputType outputType, final boolean leafNodesOnly,
final boolean skipNotificationData, final boolean changedLeafNodesOnly, final boolean childNodesOnly,
final ListenersBroker listenersBroker) {
@Test
public void testJsonNotifsLeaves() throws Exception {
- var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
+ var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
true, false, false, false, listenersBroker);
final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
@Test
public void testJsonNotifsChangedLeaves() throws Exception {
- var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
+ var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
false, false, true, false, listenersBroker);
final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
@Test
public void testJsonChildNodesOnly() throws Exception {
- final var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey",
+ final var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey",
NotificationOutputType.JSON, false, false, false, true, listenersBroker);
final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
@Test
public void testXmlLeavesOnly() throws Exception {
- var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
+ var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
true, false, false, false, listenersBroker);
final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
@Test
public void testXmlChangedLeavesOnly() throws Exception {
- var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
+ var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
false, false, true, false, listenersBroker);
final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
@Test
public void testXmlChildNodesOnly() throws Exception {
- final var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey",
+ final var adapter = new TestDataTreeChangeStream(PATCH_CONT_YIID, "Casey",
NotificationOutputType.XML, false, false, false, true, listenersBroker);
final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
private void jsonNotifications(final YangInstanceIdentifier pathYiid, final boolean skipData,
final String jsonNotifCreate, final String jsonNotifUpdate, final String jsonNotifDelete) throws Exception {
- final var adapter = new ListenerAdapterTester(pathYiid, "Casey",
+ final var adapter = new TestDataTreeChangeStream(pathYiid, "Casey",
NotificationOutputType.JSON, false, skipData, false, false, listenersBroker);
final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
private void xmlNotifications(final YangInstanceIdentifier pathYiid, final boolean skipData,
final String xmlNotifCreate, final String xmlNotifUpdate, final String xmlNotifDelete) throws Exception {
- final var adapter = new ListenerAdapterTester(pathYiid, "Casey", NotificationOutputType.XML,
+ final var adapter = new TestDataTreeChangeStream(pathYiid, "Casey", NotificationOutputType.XML,
false, skipData, false, false, listenersBroker);
final var changeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
@Mock
private ScheduledExecutorService executorService;
@Mock
- private AbstractStream<?> listener;
+ private RestconfStream<?> listener;
@Mock
private ScheduledFuture<?> pingFuture;
@Mock
listenersBroker = new ListenersBroker.ServerSentEvents(dataBroker);
webSocketFactory = new WebSocketFactory(execService, listenersBroker, 5000, 2000);
- streamName = listenersBroker.createStream(name -> new ListenerAdapter(listenersBroker, name,
+ streamName = listenersBroker.createStream(name -> new DataTreeChangeStream(listenersBroker, name,
NotificationOutputType.JSON, databindProvider, LogicalDatastoreType.CONFIGURATION,
YangInstanceIdentifier.of(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster"))))
- .getStreamName();
+ .name();
}
@Test
public class WebSocketSessionHandlerTest {
private static final class WebSocketTestSessionState {
- private final AbstractStream<?> listener;
+ private final RestconfStream<?> listener;
private final ScheduledExecutorService executorService;
private final WebSocketSessionHandler webSocketSessionHandler;
private final int heartbeatInterval;
private final ScheduledFuture pingFuture;
WebSocketTestSessionState(final int maxFragmentSize, final int heartbeatInterval) {
- listener = mock(AbstractStream.class);
+ listener = mock(RestconfStream.class);
executorService = mock(ScheduledExecutorService.class);
this.heartbeatInterval = heartbeatInterval;
this.maxFragmentSize = maxFragmentSize;