import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfOperationsServiceImpl;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfSchemaServiceImpl;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl;
+import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.SubscribeToStreamUtil;
import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
final DOMMountPointService mountPointService,
final RestconfStreamsSubscriptionService streamSubscription, final DOMDataBroker dataBroker,
final DOMActionService actionService, final DOMNotificationService notificationService,
- final DOMSchemaService domSchemaService, final ListenersBroker listenersBroker,
- final StreamsConfiguration configuration) {
+ final DOMSchemaService domSchemaService, final SubscribeToStreamUtil streamUtils) {
super(databindProvider, List.of(
streamSubscription,
new RestconfDataServiceImpl(databindProvider, server, dataBroker, streamSubscription, actionService),
- new RestconfInvokeOperationsServiceImpl(databindProvider, server, mountPointService, listenersBroker,
- configuration),
+ new RestconfInvokeOperationsServiceImpl(databindProvider, server, mountPointService, streamUtils),
new RestconfOperationsServiceImpl(databindProvider, server),
new RestconfSchemaServiceImpl(domSchemaService, mountPointService),
new RestconfImpl(databindProvider)));
}
+ private RestconfApplication(final DatabindProvider databindProvider, final MdsalRestconfServer server,
+ final DOMMountPointService mountPointService, final DOMDataBroker dataBroker,
+ final DOMActionService actionService, final DOMNotificationService notificationService,
+ final DOMSchemaService domSchemaService, final SubscribeToStreamUtil streamUtils) {
+ this(databindProvider, server, mountPointService,
+ new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider, streamUtils),
+ dataBroker, actionService, notificationService, domSchemaService, streamUtils);
+ }
+
@Inject
public RestconfApplication(final DatabindProvider databindProvider, final MdsalRestconfServer server,
final DOMMountPointService mountPointService, final DOMDataBroker dataBroker,
final DOMRpcService rpcService, final DOMActionService actionService,
final DOMNotificationService notificationService, final DOMSchemaService domSchemaService,
final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
- this(databindProvider, server, mountPointService,
- new RestconfStreamsSubscriptionServiceImpl(dataBroker, notificationService, databindProvider,
- listenersBroker, configuration),
- dataBroker, actionService, notificationService, domSchemaService, listenersBroker, configuration);
+ this(databindProvider, server, mountPointService, dataBroker, actionService, notificationService,
+ domSchemaService, configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents(listenersBroker)
+ : SubscribeToStreamUtil.webSockets(listenersBroker));
}
}
import org.opendaylight.restconf.nb.rfc8040.databind.XmlOperationInputBody;
import org.opendaylight.restconf.nb.rfc8040.legacy.InstanceIdentifierContext;
import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
-import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotification;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStream;
@Deprecated(forRemoval = true)
private final DOMMountPointService mountPointService;
private final SubscribeToStreamUtil streamUtils;
- private final ListenersBroker listenersBroker;
public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider,
final MdsalRestconfServer server, final DOMMountPointService mountPointService,
- final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
+ final SubscribeToStreamUtil streamUtils) {
this.databindProvider = requireNonNull(databindProvider);
this.server = requireNonNull(server);
this.mountPointService = requireNonNull(mountPointService);
- this.listenersBroker = requireNonNull(listenersBroker);
- streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents(listenersBroker)
- : SubscribeToStreamUtil.webSockets(listenersBroker);
+ this.streamUtils = requireNonNull(streamUtils);
}
/**
return CreateStreamUtil.createNotificationStream(streamUtils.listenersBroker(), input,
localDatabind.modelContext());
} else if (SubscribeDeviceNotification.QNAME.equals(type)) {
- return CreateStreamUtil.createDeviceNotificationListener(listenersBroker, input,
+ return CreateStreamUtil.createDeviceNotificationListener(streamUtils.listenersBroker(), input,
streamUtils.prepareUriByStreamName(uriInfo, "").toString(), mountPointService);
}
}
*/
package org.opendaylight.restconf.nb.rfc8040.rests.services.impl;
+import static java.util.Objects.requireNonNull;
+
import java.net.URI;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfStreamsSubscriptionService;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
-import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.yang.gen.v1.subscribe.to.notification.rev161028.Notifi;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
* @param dataBroker {@link DOMDataBroker}
* @param notificationService {@link DOMNotificationService}
* @param databindProvider a {@link DatabindProvider}
- * @param configuration configuration for RESTCONF {@link StreamsConfiguration}}
+ * @param streamUtils a {@link SubscribeToStreamUtil}
*/
public RestconfStreamsSubscriptionServiceImpl(final DOMDataBroker dataBroker,
final DOMNotificationService notificationService, final DatabindProvider databindProvider,
- final ListenersBroker listenersBroker, final StreamsConfiguration configuration) {
+ final SubscribeToStreamUtil streamUtils) {
handlersHolder = new HandlersHolder(dataBroker, notificationService, databindProvider);
- streamUtils = configuration.useSSE() ? SubscribeToStreamUtil.serverSentEvents(listenersBroker)
- : SubscribeToStreamUtil.webSockets(listenersBroker);
+ this.streamUtils = requireNonNull(streamUtils);
}
@Override
/**
* Subscribe to stream util class.
*/
-abstract class SubscribeToStreamUtil {
+public abstract class SubscribeToStreamUtil {
/**
* Implementation of SubscribeToStreamUtil for Server-sent events.
*/
private static final class ServerSentEvents extends SubscribeToStreamUtil {
-
- private ServerSentEvents(final ListenersBroker listenersBroker) {
+ ServerSentEvents(final ListenersBroker listenersBroker) {
super(listenersBroker);
}
* Implementation of SubscribeToStreamUtil for Web sockets.
*/
private static final class WebSockets extends SubscribeToStreamUtil {
-
- private WebSockets(final ListenersBroker listenersBroker) {
+ WebSockets(final ListenersBroker listenersBroker) {
super(listenersBroker);
}
private final @NonNull ListenersBroker listenersBroker;
- SubscribeToStreamUtil(final ListenersBroker listenersBroker) {
+ private SubscribeToStreamUtil(final ListenersBroker listenersBroker) {
this.listenersBroker = requireNonNull(listenersBroker);
}
- static SubscribeToStreamUtil serverSentEvents(final ListenersBroker listenersBroker) {
+ public static @NonNull SubscribeToStreamUtil serverSentEvents(final ListenersBroker listenersBroker) {
return new ServerSentEvents(listenersBroker);
}
- static SubscribeToStreamUtil webSockets(final ListenersBroker listenersBroker) {
+ public static @NonNull SubscribeToStreamUtil webSockets(final ListenersBroker listenersBroker) {
return new WebSockets(listenersBroker);
}
import org.opendaylight.restconf.common.errors.RestconfDocumentedException;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload;
-import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.yangtools.yang.common.ErrorTag;
import org.opendaylight.yangtools.yang.common.ErrorType;
public void setup() {
server = new MdsalRestconfServer(dataBroker, rpcService, mountPointService);
invokeOperationsService = new RestconfInvokeOperationsServiceImpl(() -> CONTEXT, server, mountPointService,
- new ListenersBroker(), new StreamsConfiguration(0, 1, 0, false));
+ SubscribeToStreamUtil.webSockets(new ListenersBroker()));
}
@Test
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.opendaylight.restconf.nb.rfc8040.streams.listeners.AbstractNotificationListenerTest.MODEL_CONTEXT;
import com.google.common.collect.ImmutableClassToInstanceMap;
import java.net.URI;
import org.opendaylight.restconf.nb.rfc8040.URLConstants;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext;
import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider;
-import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.AbstractNotificationListenerTest;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
private DOMNotificationService notificationService;
private final ListenersBroker listenersBroker = new ListenersBroker();
- private StreamsConfiguration configurationWs;
- private StreamsConfiguration configurationSse;
- private DatabindProvider databindProvider;
-
+ private final DatabindProvider databindProvider = () -> DatabindContext.ofModel(MODEL_CONTEXT);
@Before
public void setUp() throws URISyntaxException {
doReturn(new MultivaluedHashMap<>()).when(uriInfo).getQueryParameters();
doReturn(UriBuilder.fromUri("http://localhost:8181")).when(uriInfo).getBaseUriBuilder();
doReturn(new URI("http://127.0.0.1/" + URI)).when(uriInfo).getAbsolutePath();
-
- databindProvider = () -> DatabindContext.ofModel(MODEL_CONTEXT);
- configurationWs = new StreamsConfiguration(0, 100, 10, false);
- configurationSse = new StreamsConfiguration(0, 100, 10, true);
}
@Test
IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT), Scope.ONE,
NotificationOutputType.XML);
final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
- notificationService, databindProvider, listenersBroker, configurationSse);
+ notificationService, databindProvider, SubscribeToStreamUtil.serverSentEvents(listenersBroker));
final var response = streamsSubscriptionService.subscribeToStream(
"data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", uriInfo);
assertEquals("http://localhost:8181/" + URLConstants.BASE_PATH + "/" + URLConstants.SSE_SUBPATH
IdentifierCodec.deserialize("toaster:toaster/toasterStatus", MODEL_CONTEXT), Scope.ONE,
NotificationOutputType.XML);
final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
- notificationService, databindProvider, listenersBroker, configurationWs);
+ notificationService, databindProvider, SubscribeToStreamUtil.webSockets(listenersBroker));
final var response = streamsSubscriptionService.subscribeToStream(
"data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", uriInfo);
assertEquals("ws://localhost:8181/" + URLConstants.BASE_PATH
@Test
public void testSubscribeToStreamMissingDatastoreInPath() {
final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
- notificationService, databindProvider, listenersBroker, configurationWs);
+ notificationService, databindProvider, SubscribeToStreamUtil.webSockets(listenersBroker));
final var errors = assertThrows(RestconfDocumentedException.class,
() -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/scope=ONE", uriInfo))
.getErrors();
@Test
public void testSubscribeToStreamMissingScopeInPath() {
final var streamsSubscriptionService = new RestconfStreamsSubscriptionServiceImpl(dataBroker,
- notificationService, databindProvider, listenersBroker, configurationWs);
+ notificationService, databindProvider, SubscribeToStreamUtil.webSockets(listenersBroker));
final var errors = assertThrows(RestconfDocumentedException.class,
() -> streamsSubscriptionService.subscribeToStream("toaster:toaster/toasterStatus/datastore=OPERATIONAL",
uriInfo)).getErrors();
public abstract class AbstractNotificationListenerTest {
static final QNameModule MODULE = QNameModule.create(XMLNamespace.of("notifi:mod"), Revision.of("2016-11-23"));
- public static final EffectiveModelContext MODEL_CONTEXT =
+ protected static final EffectiveModelContext MODEL_CONTEXT =
YangParserTestUtils.parseYangResourceDirectory("/notifications");
}