import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfDataStreamServiceImpl;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
import org.opendaylight.restconf.nb.rfc8040.streams.WebSocketInitializer;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.yangtools.concepts.Registration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@Reference final DOMMountPointService mountPointService,
@Reference final DOMNotificationService notificationService, @Reference final DOMRpcService rpcService,
@Reference final DOMSchemaService schemaService, @Reference final DatabindProvider databindProvider,
- @Reference final MdsalRestconfServer server, final Configuration configuration) throws ServletException {
+ @Reference final MdsalRestconfServer server, @Reference final ListenersBroker listenersBroker,
+ final Configuration configuration) throws ServletException {
this(webServer, webContextSecurer, servletSupport, filterAdapterConfiguration, actionService, dataBroker,
mountPointService, notificationService, rpcService, schemaService, databindProvider, server,
- configuration.ping$_$executor$_$name$_$prefix(), configuration.max$_$thread$_$count(),
- new StreamsConfiguration(configuration.maximum$_$fragment$_$length(), configuration.idle$_$timeout(),
- configuration.heartbeat$_$interval(), configuration.use$_$sse()));
+ listenersBroker, configuration.ping$_$executor$_$name$_$prefix(), configuration.max$_$thread$_$count(),
+ new StreamsConfiguration(configuration.maximum$_$fragment$_$length(),
+ configuration.idle$_$timeout(), configuration.heartbeat$_$interval(), configuration.use$_$sse()));
}
public JaxRsNorthbound(final WebServer webServer, final WebContextSecurer webContextSecurer,
final DOMMountPointService mountPointService, final DOMNotificationService notificationService,
final DOMRpcService rpcService, final DOMSchemaService schemaService,
final DatabindProvider databindProvider, final MdsalRestconfServer server,
- final String pingNamePrefix, final int pingMaxThreadCount,
+ final ListenersBroker listenersBroker, final String pingNamePrefix, final int pingMaxThreadCount,
final StreamsConfiguration streamsConfiguration) throws ServletException {
final var scheduledThreadPool = new ScheduledThreadPoolWrapper(pingMaxThreadCount,
new NamingThreadPoolFactory(pingNamePrefix));
.addUrlPattern("/*")
.servlet(servletSupport.createHttpServletBuilder(
new RestconfApplication(databindProvider, server, mountPointService, dataBroker, rpcService,
- actionService, notificationService, schemaService, streamsConfiguration))
+ actionService, notificationService, schemaService, listenersBroker, streamsConfiguration))
.build())
.asyncSupported(true)
.build())
.addUrlPattern("/" + SSE_SUBPATH + "/*")
.servlet(servletSupport.createHttpServletBuilder(
new DataStreamApplication(databindProvider,
- new RestconfDataStreamServiceImpl(scheduledThreadPool, streamsConfiguration)))
+ new RestconfDataStreamServiceImpl(scheduledThreadPool, listenersBroker, streamsConfiguration)))
.build())
.name("notificationServlet")
.asyncSupported(true)
.addServlet(ServletDetails.builder()
.addUrlPattern("/" + DATA_SUBSCRIPTION + "/*")
.addUrlPattern("/" + NOTIFICATION_STREAM + "/*")
- .servlet(new WebSocketInitializer(scheduledThreadPool, streamsConfiguration))
+ .servlet(new WebSocketInitializer(scheduledThreadPool, listenersBroker, streamsConfiguration))
.build())
// Allows user to add javax.servlet.Filter(s) in front of REST services
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfSchemaServiceImpl;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
@Singleton
public class RestconfApplication extends AbstractRestconfApplication {
final DOMMountPointService mountPointService,
final RestconfStreamsSubscriptionService streamSubscription, final DOMDataBroker dataBroker,
final DOMActionService actionService, final DOMNotificationService notificationService,
- final DOMSchemaService domSchemaService, final StreamsConfiguration configuration) {
+ final DOMSchemaService domSchemaService, final ListenersBroker listenersBroker,
+ final StreamsConfiguration configuration) {
super(databindProvider, List.of(
streamSubscription,
new RestconfDataServiceImpl(databindProvider, server, dataBroker, streamSubscription, actionService,
+ listenersBroker, configuration),
+ new RestconfInvokeOperationsServiceImpl(databindProvider, server, mountPointService, listenersBroker,
configuration),
- new RestconfInvokeOperationsServiceImpl(databindProvider, server, mountPointService, configuration),
new RestconfOperationsServiceImpl(databindProvider, server),
new RestconfSchemaServiceImpl(domSchemaService, mountPointService),
new RestconfImpl(databindProvider)));
final DOMMountPointService mountPointService, final DOMDataBroker dataBroker,
final DOMRpcService rpcService, final DOMActionService actionService,
final DOMNotificationService notificationService, final DOMSchemaService domSchemaService,
- final StreamsConfiguration configuration) {
+ final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
this(databindProvider, server, mountPointService,
new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
- configuration),
- dataBroker, actionService, notificationService, domSchemaService, configuration);
+ listenersBroker, configuration),
+ dataBroker, actionService, notificationService, domSchemaService, listenersBroker, configuration);
}
}
*
* @param baseUrl base Url
* @param input RPC input
- * @param streamUtil stream utility
* @param mountPointService dom mount point service
+ * @param listenersBroker {@link ListenersBroker}
* @return {@link DOMRpcResult} - Output of RPC - example in JSON
*/
// FIXME: this should be an RPC invocation
static ContainerNode createDeviceNotificationListener(final String baseUrl, final ContainerNode input,
- final SubscribeToStreamUtil streamUtil, final DOMMountPointService mountPointService) {
+ final SubscribeToStreamUtil streamUtil, final DOMMountPointService mountPointService,
+ final ListenersBroker listenersBroker) {
// parsing out of container with settings and path
// FIXME: ugly cast
final YangInstanceIdentifier path =
ErrorTag.OPERATION_FAILED);
}
- final DeviceNotificationListenerAdaptor notificationListenerAdapter = streamUtil.listenersBroker()
+ final DeviceNotificationListenerAdaptor notificationListenerAdapter = listenersBroker
.registerDeviceNotificationListener(deviceName, prepareOutputType(input), mountModelContext,
mountPointService, mountPoint.getIdentifier());
notificationListenerAdapter.listen(mountNotifService, notificationPaths);
private final MdsalRestconfServer server;
@Deprecated(forRemoval = true)
private final DOMDataBroker dataBroker;
- private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+ private final ListenersBroker listenersBroker;
public RestconfDataServiceImpl(final DatabindProvider databindProvider, final MdsalRestconfServer server,
final DOMDataBroker dataBroker, final RestconfStreamsSubscriptionService delegRestconfSubscrService,
- final DOMActionService actionService, final StreamsConfiguration configuration) {
+ final DOMActionService actionService, final ListenersBroker listenersBroker,
+ final StreamsConfiguration configuration) {
this.databindProvider = requireNonNull(databindProvider);
this.server = requireNonNull(server);
this.dataBroker = requireNonNull(dataBroker);
this.delegRestconfSubscrService = requireNonNull(delegRestconfSubscrService);
this.actionService = requireNonNull(actionService);
- streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents()
- : SubscribeToStreamUtil.webSockets();
+ this.listenersBroker = requireNonNull(listenersBroker);
+ streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents(listenersBroker)
+ : SubscribeToStreamUtil.webSockets(listenersBroker);
}
/**
final var notifName = notification.argument();
writeNotificationStreamToDatastore(schemaContext, uriInfo, transaction,
- createYangNotifiStream(listenersBroker, moduleName, notifName, NotificationOutputType.XML));
+ createYangNotifiStream(moduleName, notifName, NotificationOutputType.XML));
writeNotificationStreamToDatastore(schemaContext, uriInfo, transaction,
- createYangNotifiStream(listenersBroker, moduleName, notifName, NotificationOutputType.JSON));
+ createYangNotifiStream(moduleName, notifName, NotificationOutputType.JSON));
});
}
}
}
- private static NotificationListenerAdapter createYangNotifiStream(final ListenersBroker listenersBroker,
- final String moduleName, final QName notifName, final NotificationOutputType outputType) {
+ private NotificationListenerAdapter createYangNotifiStream(final String moduleName, final QName notifName,
+ final NotificationOutputType outputType) {
final var streamName = createNotificationStreamName(moduleName, notifName.getLocalName(), outputType);
final var existing = listenersBroker.notificationListenerFor(streamName);
*/
package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
+import static java.util.Objects.requireNonNull;
+
import java.util.concurrent.ScheduledExecutorService;
import javax.inject.Inject;
import javax.inject.Singleton;
public final class RestconfDataStreamServiceImpl {
private static final Logger LOG = LoggerFactory.getLogger(RestconfDataStreamServiceImpl.class);
- private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+ private final ListenersBroker listenersBroker;
private final ScheduledExecutorService executorService;
private final int maximumFragmentLength;
private final int heartbeatInterval;
@Inject
public RestconfDataStreamServiceImpl(final ScheduledThreadPool scheduledThreadPool,
- final StreamsConfiguration configuration) {
+ final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
executorService = scheduledThreadPool.getExecutor();
+ this.listenersBroker = requireNonNull(listenersBroker);
heartbeatInterval = configuration.heartbeatInterval();
maximumFragmentLength = configuration.maximumFragmentLength();
}
import org.opendaylight.restconf.nb.rfc8040.legacy.InstanceIdentifierContext;
import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotification;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription;
import org.opendaylight.yangtools.yang.common.ErrorTag;
@Deprecated(forRemoval = true)
private final DOMMountPointService mountPointService;
private final SubscribeToStreamUtil streamUtils;
+ private final ListenersBroker listenersBroker;
public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider,
final MdsalRestconfServer server, final DOMMountPointService mountPointService,
- final StreamsConfiguration configuration) {
+ final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
this.databindProvider = requireNonNull(databindProvider);
this.server = requireNonNull(server);
this.mountPointService = requireNonNull(mountPointService);
- streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents()
- : SubscribeToStreamUtil.webSockets();
+ this.listenersBroker = requireNonNull(listenersBroker);
+ streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents(listenersBroker)
+ : SubscribeToStreamUtil.webSockets(listenersBroker);
}
/**
} else if (SubscribeDeviceNotification.QNAME.equals(type)) {
final var baseUrl = streamUtils.prepareUriByStreamName(uriInfo, "").toString();
return RestconfFuture.of(Optional.of(CreateStreamUtil.createDeviceNotificationListener(baseUrl, input,
- streamUtils, mountPointService)));
+ streamUtils, mountPointService, listenersBroker)));
}
}
import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfStreamsSubscriptionService;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.yang.gen.v1.subscribe.to.notification.rev161028.Notifi;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
*/
public RestconfStreamsSubscriptionServiceImpl(final DOMDataBroker dataBroker,
final DOMNotificationService notificationService, final DatabindProvider databindProvider,
- final StreamsConfiguration configuration) {
+ final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
handlersHolder = new HandlersHolder(dataBroker, notificationService, databindProvider);
- streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents()
- : SubscribeToStreamUtil.webSockets();
+ streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents(listenersBroker)
+ : SubscribeToStreamUtil.webSockets(listenersBroker);
}
@Override
* Implementation of SubscribeToStreamUtil for Server-sent events.
*/
private static final class ServerSentEvents extends SubscribeToStreamUtil {
- static final ServerSentEvents INSTANCE = new ServerSentEvents(ListenersBroker.getInstance());
private ServerSentEvents(final ListenersBroker listenersBroker) {
super(listenersBroker);
* Implementation of SubscribeToStreamUtil for Web sockets.
*/
private static final class WebSockets extends SubscribeToStreamUtil {
- static final WebSockets INSTANCE = new WebSockets(ListenersBroker.getInstance());
private WebSockets(final ListenersBroker listenersBroker) {
super(listenersBroker);
this.listenersBroker = requireNonNull(listenersBroker);
}
- static SubscribeToStreamUtil serverSentEvents() {
- return ServerSentEvents.INSTANCE;
+ static SubscribeToStreamUtil serverSentEvents(final ListenersBroker listenersBroker) {
+ return new ServerSentEvents(listenersBroker);
}
- static SubscribeToStreamUtil webSockets() {
- return WebSockets.INSTANCE;
+ static SubscribeToStreamUtil webSockets(final ListenersBroker listenersBroker) {
+ return new WebSockets(listenersBroker);
}
public final @NonNull ListenersBroker listenersBroker() {
*/
package org.opendaylight.restconf.nb.rfc8040.streams;
+import static java.util.Objects.requireNonNull;
+
import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serial;
private final int maximumFragmentLength;
private final int heartbeatInterval;
private final int idleTimeoutMillis;
+ @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Required for session mgmt")
+ private final ListenersBroker listenersBroker;
/**
* Creation of the web-socket initializer.
*/
@Inject
public WebSocketInitializer(final ScheduledThreadPool scheduledThreadPool,
- final StreamsConfiguration configuration) {
+ final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
executorService = scheduledThreadPool.getExecutor();
+ this.listenersBroker = requireNonNull(listenersBroker);
maximumFragmentLength = configuration.maximumFragmentLength();
heartbeatInterval = configuration.heartbeatInterval();
idleTimeoutMillis = configuration.idleTimeout();
@Override
public void configure(final WebSocketServletFactory factory) {
factory.getPolicy().setIdleTimeout(idleTimeoutMillis);
- factory.setCreator(new WebSocketFactory(executorService, maximumFragmentLength, heartbeatInterval));
+ factory.setCreator(new WebSocketFactory(executorService, listenersBroker, maximumFragmentLength,
+ heartbeatInterval));
}
/**
private static final Logger LOG = LoggerFactory.getLogger(WebSocketFactory.class);
private final ScheduledExecutorService executorService;
- // FIXME: inject this reference
- private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+ private final ListenersBroker listenersBroker;
private final int maximumFragmentLength;
private final int heartbeatInterval;
* (exceeded message length leads to fragmentation of messages).
* @param heartbeatInterval Interval in milliseconds between sending of ping control frames.
*/
- WebSocketFactory(final ScheduledExecutorService executorService, final int maximumFragmentLength,
- final int heartbeatInterval) {
+ WebSocketFactory(final ScheduledExecutorService executorService, final ListenersBroker listenersBroker,
+ final int maximumFragmentLength, final int heartbeatInterval) {
this.executorService = executorService;
+ this.listenersBroker = listenersBroker;
this.maximumFragmentLength = maximumFragmentLength;
this.heartbeatInterval = heartbeatInterval;
}
private final EventFormatterFactory<T> formatterFactory;
private final NotificationOutputType outputType;
private final String streamName;
+ private final ListenersBroker listenersBroker;
@GuardedBy("this")
private final Set<StreamSessionHandler> subscribers = new HashSet<>();
private Instant stop = null;
AbstractCommonSubscriber(final String streamName, final NotificationOutputType outputType,
- final EventFormatterFactory<T> formatterFactory) {
+ final EventFormatterFactory<T> formatterFactory, final ListenersBroker listenersBroker) {
this.streamName = requireNonNull(streamName);
checkArgument(!streamName.isEmpty());
this.outputType = requireNonNull(outputType);
this.formatterFactory = requireNonNull(formatterFactory);
+ this.listenersBroker = requireNonNull(listenersBroker);
formatter = formatterFactory.emptyFormatter();
}
subscribers.remove(subscriber);
LOG.debug("Subscriber {} is removed", subscriber);
if (!hasSubscribers()) {
- ListenersBroker.getInstance().removeAndCloseListener(this);
+ listenersBroker.removeAndCloseListener(this);
}
}
private static final NotificationFormatterFactory JSON_FORMATTER_FACTORY =
JSONNotificationFormatter.createFactory(JSONCodecFactorySupplier.RFC7951);
- AbstractNotificationListenerAdaptor(final String streamName, final NotificationOutputType outputType) {
- super(streamName, outputType, getFormatterFactory(outputType));
+ AbstractNotificationListenerAdaptor(final String streamName, final NotificationOutputType outputType,
+ final ListenersBroker listenersBroker) {
+ super(streamName, outputType, getFormatterFactory(outputType), listenersBroker);
}
private static NotificationFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
private final @NonNull EffectiveModelContext effectiveModel;
private final @NonNull DOMMountPointService mountPointService;
private final @NonNull YangInstanceIdentifier instanceIdentifier;
- private final @NonNull ListenersBroker listenersBroker = ListenersBroker.getInstance();
+ private final @NonNull ListenersBroker listenersBroker;
private ListenerRegistration<DOMMountPointListener> reg;
public DeviceNotificationListenerAdaptor(final String streamName, final NotificationOutputType outputType,
final EffectiveModelContext effectiveModel, final DOMMountPointService mountPointService,
- final YangInstanceIdentifier path) {
- super(streamName, outputType);
+ final YangInstanceIdentifier path, final ListenersBroker listenersBroker) {
+ super(streamName, outputType, listenersBroker);
this.effectiveModel = requireNonNull(effectiveModel);
this.mountPointService = requireNonNull(mountPointService);
instanceIdentifier = requireNonNull(path);
+ this.listenersBroker = requireNonNull(listenersBroker);
}
public synchronized void listen(final DOMNotificationService notificationService, final Set<Absolute> paths) {
*/
@VisibleForTesting
public ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
- final NotificationOutputType outputType) {
- super(streamName, outputType, getFormatterFactory(outputType));
+ final NotificationOutputType outputType, final ListenersBroker listenersBroker) {
+ super(streamName, outputType, getFormatterFactory(outputType), listenersBroker);
this.path = requireNonNull(path);
}
import static java.util.Objects.requireNonNull;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
-import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.StampedLock;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.mdsal.dom.api.DOMMountPointService;
import org.opendaylight.restconf.nb.rfc8040.URLConstants;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
* {@link NotificationListenerAdapter} listeners.
*/
-// FIXME: NETCONF-1104: this should be a component
// FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream
// names. We essentially need a component which deals with allocation of stream names and their lifecycle and
// the contents of /restconf-state/streams.
+@Singleton
+@Component(service = ListenersBroker.class, immediate = true)
public final class ListenersBroker {
- // FIXME: NETCONF-1104: remove this class
- @Deprecated(since = "7.0.0")
- private static final class Holder {
- static final ListenersBroker INSTANCE = new ListenersBroker();
- }
-
private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
private final StampedLock dataChangeListenersLock = new StampedLock();
private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
private final BiMap<String, DeviceNotificationListenerAdaptor> deviceNotificationListeners = HashBiMap.create();
- private ListenersBroker() {
- // FIXME: NETCONF-1104: this constructor should be a public thing
- }
+ @Inject
+ @Activate
+ public ListenersBroker() {
- /**
- * Creation of the singleton listeners broker.
- *
- * @return Reusable instance of {@link ListenersBroker}.
- */
- // FIXME: NETCONF-1104: remove this method
- @Deprecated(since = "7.0.0")
- public static ListenersBroker getInstance() {
- return Holder.INSTANCE;
}
/**
final long stamp = dataChangeListenersLock.writeLock();
try {
return dataChangeListeners.computeIfAbsent(streamName,
- stream -> new ListenerAdapter(path, stream, outputType));
+ stream -> new ListenerAdapter(path, stream, outputType, this));
} finally {
dataChangeListenersLock.unlockWrite(stamp);
}
final long stamp = notificationListenersLock.writeLock();
try {
return notificationListeners.computeIfAbsent(streamName,
- stream -> new NotificationListenerAdapter(schemaPath, stream, outputType));
+ stream -> new NotificationListenerAdapter(schemaPath, stream, outputType, this));
} finally {
notificationListenersLock.unlockWrite(stamp);
}
try {
return deviceNotificationListeners.computeIfAbsent(streamName,
stream -> new DeviceNotificationListenerAdaptor(streamName, outputType, refSchemaCtx,
- mountPointService, path));
+ mountPointService, path, this));
} finally {
deviceNotificationListenersLock.unlockWrite(stamp);
}
}
return result;
}
-
- @VisibleForTesting
- public synchronized void setDataChangeListeners(final Map<String, ListenerAdapter> listenerAdapterCollection) {
- final long stamp = dataChangeListenersLock.writeLock();
- try {
- dataChangeListeners.clear();
- dataChangeListeners.putAll(listenerAdapterCollection);
- } finally {
- dataChangeListenersLock.unlockWrite(stamp);
- }
- }
}
\ No newline at end of file
* @param streamName Name of the stream.
* @param outputType Type of output on notification (JSON or XML).
*/
- NotificationListenerAdapter(final Absolute path, final String streamName, final NotificationOutputType outputType) {
- super(streamName, outputType);
+ NotificationListenerAdapter(final Absolute path, final String streamName, final NotificationOutputType outputType,
+ final ListenersBroker listenersBroker) {
+ super(streamName, outputType, listenersBroker);
this.path = requireNonNull(path);
}
public class CreateStreamUtilTest {
private static EffectiveModelContext SCHEMA_CTX;
- private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+ private final ListenersBroker listenersBroker = new ListenersBroker();
@BeforeClass
public static void setUp() {
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfStreamsSubscriptionService;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
final var dataService = new RestconfDataServiceImpl(() -> DatabindContext.ofModel(IID_SCHEMA),
new MdsalRestconfServer(dataBroker, rpcService, mountPointService), dataBroker, restconfStreamSubService,
- actionService, new StreamsConfiguration(0, 1, 0, false));
+ actionService, new ListenersBroker(), new StreamsConfiguration(0, 1, 0, false));
final var response = dataService.postDataJSON("instance-identifier-module:cont/cont1/reset",
stringInputStream("""
import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfStreamsSubscriptionService;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.patch.rev170222.yang.patch.yang.patch.Edit.Operation;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
.withChild(Builders.mapBuilder().withNodeIdentifier(PLAYLIST_NID).build())
.build();
- private RestconfDataServiceImpl dataService;
-
@Mock
private UriInfo uriInfo;
@Mock
@Mock
private AsyncResponse asyncResponse;
+ private RestconfDataServiceImpl dataService;
+
@Before
public void setUp() throws Exception {
doReturn(Set.of()).when(queryParamenters).entrySet();
dataService = new RestconfDataServiceImpl(() -> DatabindContext.ofModel(JUKEBOX_SCHEMA),
new MdsalRestconfServer(dataBroker, rpcService, mountPointService), dataBroker, delegRestconfSubscrService,
- actionService, new StreamsConfiguration(0, 1, 0, false));
+ actionService, new ListenersBroker(), new StreamsConfiguration(0, 1, 0, false));
doReturn(Optional.of(mountPoint)).when(mountPointService)
.getMountPoint(any(YangInstanceIdentifier.class));
doReturn(Optional.of(FixedDOMSchemaService.of(JUKEBOX_SCHEMA))).when(mountPoint)
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
import org.opendaylight.yangtools.yang.common.QName;
public void setup() {
server = new MdsalRestconfServer(dataBroker, rpcService, mountPointService);
invokeOperationsService = new RestconfInvokeOperationsServiceImpl(() -> CONTEXT, server, mountPointService,
- new StreamsConfiguration(0, 1, 0, false));
+ new ListenersBroker(), new StreamsConfiguration(0, 1, 0, false));
}
@Test
import com.google.common.collect.ImmutableClassToInstanceMap;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Map;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
-import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+ "toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
private static EffectiveModelContext MODEL_CONTEXT;
- // FIXME: NETCONF-1104: this should be non-static and set up for each test separately
- private static ListenersBroker LISTENERS_BROKER;
@Mock
private DOMDataBroker dataBroker;
@Mock
private DOMNotificationService notificationService;
+ private final ListenersBroker listenersBroker = new ListenersBroker();
private StreamsConfiguration configurationWs;
private StreamsConfiguration configurationSse;
-
private DatabindProvider databindProvider;
@BeforeClass
public static void beforeClass() {
MODEL_CONTEXT = YangParserTestUtils.parseYangResourceDirectory("/notifications");
-
- final String name =
- "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
- final ListenerAdapter adapter = new ListenerAdapter(YangInstanceIdentifier.of(
- QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")),
- name, NotificationOutputType.JSON);
- LISTENERS_BROKER = ListenersBroker.getInstance();
- LISTENERS_BROKER.setDataChangeListeners(Map.of(name, adapter));
- }
-
- @AfterClass
- public static void afterClass() {
- if (LISTENERS_BROKER != null) {
- LISTENERS_BROKER.setDataChangeListeners(Map.of());
- LISTENERS_BROKER = null;
- }
}
@Before
public void setUp() throws URISyntaxException {
+ final var name = "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
+ listenersBroker.registerDataChangeListener(
+ YangInstanceIdentifier.of(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")),
+ name, NotificationOutputType.JSON);
final DOMDataTreeWriteTransaction wTx = mock(DOMDataTreeWriteTransaction.class);
doReturn(wTx).when(dataBroker).newWriteOnlyTransaction();
doReturn(CommitInfo.emptyFluentFuture()).when(wTx).commit();
@Test
public void testSubscribeToStreamSSE() {
- LISTENERS_BROKER.registerDataChangeListener(
+ listenersBroker.registerDataChangeListener(
IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT),
"data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
NotificationOutputType.XML);
final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
- notificationService, databindProvider, configurationSse);
+ notificationService, databindProvider, listenersBroker, configurationSse);
final var response = streamsSubscriptionService.subscribeToStream(
"data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", uriInfo);
assertEquals("http://localhost:8181/" + URLConstants.BASE_PATH + "/" + URLConstants.SSE_SUBPATH
@Test
public void testSubscribeToStreamWS() {
- LISTENERS_BROKER.registerDataChangeListener(
+ listenersBroker.registerDataChangeListener(
IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT),
"data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
NotificationOutputType.XML);
final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
- notificationService, databindProvider, configurationWs);
+ notificationService, databindProvider, listenersBroker, configurationWs);
final var response = streamsSubscriptionService.subscribeToStream(
"data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", uriInfo);
assertEquals("ws://localhost:8181/" + URLConstants.BASE_PATH
public void testSubscribeToStreamMissingDatastoreInPath() {
final RestconfStreamsSubscriptionServiceImpl streamsSubscriptionService =
new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
- configurationWs);
+ listenersBroker, configurationWs);
final var errors = assertThrows(RestconfDocumentedException.class,
() -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/scope=ONE", uriInfo))
.getErrors();
public void testSubscribeToStreamMissingScopeInPath() {
final RestconfStreamsSubscriptionServiceImpl streamsSubscriptionService =
new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
- configurationWs);
+ listenersBroker, configurationWs);
final var errors = assertThrows(RestconfDocumentedException.class,
() -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/datastore=OPERATIONAL",
uriInfo)).getErrors();
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
import org.opendaylight.restconf.nb.rfc8040.streams.WebSocketInitializer.WebSocketFactory;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
public class WebSocketFactoryTest {
-
private static final String REGISTERED_STREAM_NAME = "data-change-event-subscription/"
+ "toaster:toaster/datastore=CONFIGURATION/scope=SUBTREE";
private static final YangInstanceIdentifier TOASTER_YIID = YangInstanceIdentifier.builder()
.node(QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster"))
.build();
+ private final ListenersBroker listenersBroker = new ListenersBroker();
+
private final WebSocketFactory webSocketFactory = new WebSocketFactory(mock(ScheduledExecutorService.class),
- 5000, 2000);
+ listenersBroker, 5000, 2000);
- @BeforeClass
- public static void prepareListenersBroker() {
- ListenersBroker.getInstance().registerDataChangeListener(TOASTER_YIID, REGISTERED_STREAM_NAME,
+ @Before
+ public void prepareListenersBroker() {
+ listenersBroker.registerDataChangeListener(TOASTER_YIID, REGISTERED_STREAM_NAME,
NotificationOutputTypeGrouping.NotificationOutputType.JSON);
}
public class JsonNotificationListenerTest extends AbstractNotificationListenerTest {
private static final Logger LOG = LoggerFactory.getLogger(JsonNotificationListenerTest.class);
- private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+ private final ListenersBroker listenersBroker = new ListenersBroker();
@Test
public void notifi_leafTest() throws Exception {
private static EffectiveModelContext SCHEMA_CONTEXT;
+ private final ListenersBroker listenersBroker = new ListenersBroker();
private DataBroker dataBroker;
private DOMDataBroker domDataBroker;
private DatabindProvider databindProvider;
ListenerAdapterTester(final YangInstanceIdentifier path, final String streamName,
final NotificationOutputType outputType, final boolean leafNodesOnly,
- final boolean skipNotificationData, final boolean changedLeafNodesOnly, final boolean childNodesOnly) {
- super(path, streamName, outputType);
+ final boolean skipNotificationData, final boolean changedLeafNodesOnly, final boolean childNodesOnly,
+ final ListenersBroker listenersBroker) {
+ super(path, streamName, outputType, listenersBroker);
setQueryParams(NotificationQueryParams.of(StartTimeParam.forUriValue("1970-01-01T00:00:00Z"), null, null,
leafNodesOnly ? LeafNodesOnlyParam.of(true) : null,
skipNotificationData ? SkipNotificationDataParam.of(true) : null,
@Test
public void testJsonNotifsLeaves() throws Exception {
ListenerAdapterTester adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
- true, false, false, false);
+ true, false, false, false, listenersBroker);
adapter.setCloseVars(domDataBroker, databindProvider);
final DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
@Test
public void testJsonNotifsChangedLeaves() throws Exception {
ListenerAdapterTester adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.JSON,
- false, false, true, false);
+ false, false, true, false, listenersBroker);
adapter.setCloseVars(domDataBroker, databindProvider);
final DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
@Test
public void testJsonChildNodesOnly() throws Exception {
final var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey",
- NotificationOutputType.JSON, false, false, false, true);
+ NotificationOutputType.JSON, false, false, false, true, listenersBroker);
adapter.setCloseVars(domDataBroker, databindProvider);
final var changeService = domDataBroker.getExtensions()
@Test
public void testXmlLeavesOnly() throws Exception {
ListenerAdapterTester adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
- true, false, false, false);
+ true, false, false, false, listenersBroker);
adapter.setCloseVars(domDataBroker, databindProvider);
DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
@Test
public void testXmlChangedLeavesOnly() throws Exception {
ListenerAdapterTester adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey", NotificationOutputType.XML,
- false, false, true, false);
+ false, false, true, false, listenersBroker);
adapter.setCloseVars(domDataBroker, databindProvider);
DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
@Test
public void testXmlChildNodesOnly() throws Exception {
final var adapter = new ListenerAdapterTester(PATCH_CONT_YIID, "Casey",
- NotificationOutputType.XML, false, false, false, true);
+ NotificationOutputType.XML, false, false, false, true, listenersBroker);
adapter.setCloseVars(domDataBroker, databindProvider);
final var changeService = domDataBroker.getExtensions()
private void jsonNotifications(final YangInstanceIdentifier pathYiid, final boolean skipData,
final String jsonNotifCreate, final String jsonNotifUpdate, final String jsonNotifDelete) throws Exception {
final var adapter = new ListenerAdapterTester(pathYiid, "Casey",
- NotificationOutputType.JSON, false, skipData, false, false);
+ NotificationOutputType.JSON, false, skipData, false, false, listenersBroker);
adapter.setCloseVars(domDataBroker, databindProvider);
final var changeService = domDataBroker.getExtensions()
private void xmlNotifications(final YangInstanceIdentifier pathYiid, final boolean skipData,
final String xmlNotifCreate, final String xmlNotifUpdate, final String xmlNotifDelete) throws Exception {
final var adapter = new ListenerAdapterTester(pathYiid, "Casey", NotificationOutputType.XML,
- false, skipData, false, false);
+ false, skipData, false, false, listenersBroker);
adapter.setCloseVars(domDataBroker, databindProvider);
final var changeService = domDataBroker.getExtensions()
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class XmlNotificationListenerTest extends AbstractNotificationListenerTest {
- private final ListenersBroker listenersBroker = ListenersBroker.getInstance();
+ private final ListenersBroker listenersBroker = new ListenersBroker();
@Test
public void notifi_leafTest() throws Exception {