From 8616c9655db145125c23ca8e66b5f27745126458 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 8 Nov 2023 07:44:48 +0100 Subject: [PATCH] Introduce restconf.server.{api,spi,mdsal} This patch refactors restconf-nb wiring by introducing: - RestconfServer, capable of invokeRpc() - RestconfStreamRegistry, being the public interface to ListenersBroker - RpcImplementation, being a native RestconfServer RPC invocation interface This allows us to split out RPCs and logic for individual RestconfStream.Sources -- which now become normal components injected into MdsalRestconfServer. This necessitates dealing with the fact that ListenerBroker requires configuration. We take this on by splitting Config Admin interactions into OSGiNorthbound, which then instantiates components based on configuration. After the dust settles, we end up ditching a controller dependency and have a pretty crisp model of what gets done where and how. JIRA: NETCONF-773 Change-Id: I3ea139dd811b487d676353adf075dcb102abc0c5 Signed-off-by: Robert Varga --- features/odl-restconf-nb/pom.xml | 6 - .../src/main/feature/feature.xml | 1 - restconf/restconf-nb/pom.xml | 21 +- .../restconf/nb/rfc8040/JaxRsNorthbound.java | 67 +-- .../restconf/nb/rfc8040/OSGiNorthbound.java | 120 +++++ .../nb/rfc8040/RestconfApplication.java | 7 +- .../rfc8040/ServerSentEventsApplication.java | 32 -- .../services/impl/MdsalRestconfServer.java | 69 ++- .../impl/RestconfDataServiceImpl.java | 46 +- .../RestconfInvokeOperationsServiceImpl.java | 78 +--- .../impl/RestconfOperationsServiceImpl.java | 17 +- .../transactions/MdsalRestconfStrategy.java | 12 +- .../transactions/NetconfRestconfStrategy.java | 3 +- .../rests/transactions/RestconfStrategy.java | 79 ++-- .../rfc8040/streams/DefaultPingExecutor.java | 75 +++ .../DefaultRestconfStreamServletFactory.java | 84 ++++ .../nb/rfc8040/streams/ListenersBroker.java | 433 ------------------ .../nb/rfc8040/streams/PingExecutor.java | 18 + .../streams/RestconfStreamServletFactory.java | 20 + .../nb/rfc8040/streams/SSEApplication.java | 29 ++ ...{SSESessionHandler.java => SSESender.java} | 42 +- .../SSEStreamService.java} | 30 +- .../rfc8040/streams/StreamSessionHandler.java | 25 - .../rfc8040/streams/StreamsConfiguration.java | 3 +- .../nb/rfc8040/streams/WebSocketFactory.java | 16 +- .../rfc8040/streams/WebSocketInitializer.java | 20 +- ...ssionHandler.java => WebSocketSender.java} | 35 +- .../restconf/server/api/RestconfServer.java | 25 + .../restconf/server/api/package-info.java | 11 + .../mdsal}/CapabilitiesWriter.java | 2 +- .../mdsal/MdsalRestconfStreamRegistry.java | 81 ++++ .../devnotif}/DeviceNotificationSource.java | 12 +- .../SubscribeDeviceNotificationRpc.java | 92 ++++ .../mdsal/streams/devnotif/package-info.java | 11 + .../CreateDataChangeEventSubscriptionRpc.java | 121 +++++ .../dtcl}/DataTreeCandidateFormatter.java | 6 +- .../DataTreeCandidateFormatterFactory.java | 3 +- .../dtcl/DataTreeCandidateSerializer.java} | 11 +- .../streams/dtcl}/DataTreeChangeSource.java | 21 +- .../dtcl}/JSONDataTreeCandidateFormatter.java | 9 +- .../JSONDataTreeCandidateSerializer.java} | 6 +- .../dtcl}/XMLDataTreeCandidateFormatter.java | 11 +- .../dtcl/XMLDataTreeCandidateSerializer.java} | 6 +- .../mdsal/streams/dtcl/package-info.java | 11 + .../notif}/AbstractNotificationSource.java | 19 +- .../notif/CreateNotificationStreamRpc.java | 109 +++++ .../notif}/JSONNotificationFormatter.java | 5 +- .../streams/notif}/NotificationFormatter.java | 8 +- .../notif}/NotificationFormatterFactory.java | 4 +- .../streams/notif}/NotificationSource.java | 17 +- .../notif}/XMLNotificationFormatter.java | 5 +- .../mdsal/streams/notif/package-info.java | 11 + .../spi/AbstractRestconfStreamRegistry.java | 191 ++++++++ .../spi}/EventFormatter.java | 31 +- .../spi}/EventFormatterFactory.java | 2 +- .../restconf/server/spi/OperationInput.java | 40 ++ .../restconf/server/spi/OperationOutput.java | 33 ++ .../spi}/RestconfStream.java | 62 ++- .../server/spi/RpcImplementation.java | 70 +++ .../streams => server/spi}/Subscriber.java | 13 +- .../streams => server/spi}/Subscribers.java | 12 +- .../spi}/TextParameters.java | 13 +- .../restconf/server/spi/package-info.java | 11 + .../nb/rfc8040/CapabilitiesWriterTest.java | 37 -- .../impl/MdsalRestconfServerTest.java | 12 +- .../rests/services/impl/Netconf799Test.java | 6 +- .../impl/RestconfDataServiceImplTest.java | 6 +- ...stconfInvokeOperationsServiceImplTest.java | 42 +- .../RestconfOperationsServiceImplTest.java | 6 +- .../AbstractNotificationListenerTest.java | 4 +- .../streams/DataTreeChangeStreamTest.java | 24 +- .../streams/SSESessionHandlerTest.java | 36 +- .../rfc8040/streams/WebSocketFactoryTest.java | 26 +- .../streams/WebSocketSessionHandlerTest.java | 41 +- .../server/mdsal/CapabilitiesWriterTest.java | 37 ++ .../CreateNotificationStreamRpcTest.java} | 113 ++--- .../notif}/JSONNotificationFormatterTest.java | 3 +- .../notif}/XMLNotificationFormatterTest.java | 3 +- .../AbstractRestconfStreamRegistryTest.java} | 24 +- 79 files changed, 1780 insertions(+), 1123 deletions(-) create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/OSGiNorthbound.java delete mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/ServerSentEventsApplication.java create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DefaultPingExecutor.java create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DefaultRestconfStreamServletFactory.java delete mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBroker.java create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/PingExecutor.java create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStreamServletFactory.java create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSEApplication.java rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/{SSESessionHandler.java => SSESender.java} (80%) rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/{rests/services/impl/RestconfDataStreamServiceImpl.java => streams/SSEStreamService.java} (73%) delete mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/StreamSessionHandler.java rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/{WebSocketSessionHandler.java => WebSocketSender.java} (89%) create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/api/RestconfServer.java create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/api/package-info.java rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040 => server/mdsal}/CapabilitiesWriter.java (99%) create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/MdsalRestconfStreamRegistry.java rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/mdsal/streams/devnotif}/DeviceNotificationSource.java (90%) create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/devnotif/SubscribeDeviceNotificationRpc.java create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/devnotif/package-info.java create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/CreateDataChangeEventSubscriptionRpc.java rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/mdsal/streams/dtcl}/DataTreeCandidateFormatter.java (91%) rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/mdsal/streams/dtcl}/DataTreeCandidateFormatterFactory.java (83%) rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams/AbstractWebsocketSerializer.java => server/mdsal/streams/dtcl/DataTreeCandidateSerializer.java} (95%) rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/mdsal/streams/dtcl}/DataTreeChangeSource.java (80%) rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/mdsal/streams/dtcl}/JSONDataTreeCandidateFormatter.java (90%) rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams/JsonDataTreeCandidateSerializer.java => server/mdsal/streams/dtcl/JSONDataTreeCandidateSerializer.java} (94%) rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/mdsal/streams/dtcl}/XMLDataTreeCandidateFormatter.java (85%) rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams/XmlDataTreeCandidateSerializer.java => server/mdsal/streams/dtcl/XMLDataTreeCandidateSerializer.java} (94%) create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/package-info.java rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/mdsal/streams/notif}/AbstractNotificationSource.java (68%) create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/CreateNotificationStreamRpc.java rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/mdsal/streams/notif}/JSONNotificationFormatter.java (92%) rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/mdsal/streams/notif}/NotificationFormatter.java (88%) rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/mdsal/streams/notif}/NotificationFormatterFactory.java (75%) rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/mdsal/streams/notif}/NotificationSource.java (79%) rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/mdsal/streams/notif}/XMLNotificationFormatter.java (92%) create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/package-info.java create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/AbstractRestconfStreamRegistry.java rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/spi}/EventFormatter.java (82%) rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/spi}/EventFormatterFactory.java (95%) create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/OperationInput.java create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/OperationOutput.java rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/spi}/RestconfStream.java (83%) create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/RpcImplementation.java rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/spi}/Subscriber.java (73%) rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/spi}/Subscribers.java (95%) rename restconf/restconf-nb/src/main/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/spi}/TextParameters.java (69%) create mode 100644 restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/package-info.java delete mode 100644 restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/CapabilitiesWriterTest.java create mode 100644 restconf/restconf-nb/src/test/java/org/opendaylight/restconf/server/mdsal/CapabilitiesWriterTest.java rename restconf/restconf-nb/src/test/java/org/opendaylight/restconf/{nb/rfc8040/streams/ListenersBrokerTest.java => server/mdsal/streams/dtcl/CreateNotificationStreamRpcTest.java} (66%) rename restconf/restconf-nb/src/test/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/mdsal/streams/notif}/JSONNotificationFormatterTest.java (97%) rename restconf/restconf-nb/src/test/java/org/opendaylight/restconf/{nb/rfc8040/streams => server/mdsal/streams/notif}/XMLNotificationFormatterTest.java (97%) rename restconf/restconf-nb/src/test/java/org/opendaylight/restconf/{nb/rfc8040/streams/RestconfStateStreamsTest.java => server/spi/AbstractRestconfStreamRegistryTest.java} (86%) diff --git a/features/odl-restconf-nb/pom.xml b/features/odl-restconf-nb/pom.xml index 4f997a375d..b372bdab5e 100644 --- a/features/odl-restconf-nb/pom.xml +++ b/features/odl-restconf-nb/pom.xml @@ -27,12 +27,6 @@ xml features - - org.opendaylight.controller - odl-controller-exp-netty-config - xml - features - org.opendaylight.netconf odl-restconf-common diff --git a/features/odl-restconf-nb/src/main/feature/feature.xml b/features/odl-restconf-nb/src/main/feature/feature.xml index dc53f8a52c..b99ff8a868 100644 --- a/features/odl-restconf-nb/src/main/feature/feature.xml +++ b/features/odl-restconf-nb/src/main/feature/feature.xml @@ -9,7 +9,6 @@ odl-mdsal-model-rfc8072 - odl-controller-exp-netty-config mvn:org.opendaylight.netconf/restconf-nb/${project.version}/cfg/config diff --git a/restconf/restconf-nb/pom.xml b/restconf/restconf-nb/pom.xml index 316973929e..9e75308014 100644 --- a/restconf/restconf-nb/pom.xml +++ b/restconf/restconf-nb/pom.xml @@ -59,6 +59,18 @@ true provided + + org.eclipse.jdt + org.eclipse.jdt.annotation + + + org.osgi + org.osgi.framework + + + org.osgi + org.osgi.service.component + org.osgi org.osgi.service.component.annotations @@ -195,15 +207,6 @@ rfc8040-ietf-restconf-monitoring - - org.opendaylight.controller - threadpool-config-api - - - org.opendaylight.controller - threadpool-config-impl - - net.java.dev.stax-utils stax-utils diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/JaxRsNorthbound.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/JaxRsNorthbound.java index 5bce185ad7..363a425767 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/JaxRsNorthbound.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/JaxRsNorthbound.java @@ -9,7 +9,6 @@ package org.opendaylight.restconf.nb.rfc8040; import com.google.common.annotations.Beta; import javax.servlet.ServletException; -import javax.servlet.http.HttpServlet; import org.opendaylight.aaa.filterchain.configuration.CustomFilterAdapterConfiguration; import org.opendaylight.aaa.filterchain.filters.CustomFilterAdapter; import org.opendaylight.aaa.web.FilterDetails; @@ -18,8 +17,6 @@ import org.opendaylight.aaa.web.WebContext; import org.opendaylight.aaa.web.WebContextSecurer; import org.opendaylight.aaa.web.WebServer; import org.opendaylight.aaa.web.servlet.ServletSupport; -import org.opendaylight.controller.config.threadpool.util.NamingThreadPoolFactory; -import org.opendaylight.controller.config.threadpool.util.ScheduledThreadPoolWrapper; import org.opendaylight.mdsal.dom.api.DOMActionService; import org.opendaylight.mdsal.dom.api.DOMDataBroker; import org.opendaylight.mdsal.dom.api.DOMMountPointService; @@ -28,43 +25,19 @@ import org.opendaylight.mdsal.dom.api.DOMRpcService; import org.opendaylight.mdsal.dom.api.DOMSchemaService; import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.MdsalRestconfServer; -import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker; -import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration; -import org.opendaylight.restconf.nb.rfc8040.streams.WebSocketInitializer; +import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStreamServletFactory; import org.opendaylight.yangtools.concepts.Registration; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; import org.osgi.service.component.annotations.Reference; -import org.osgi.service.metatype.annotations.AttributeDefinition; -import org.osgi.service.metatype.annotations.Designate; -import org.osgi.service.metatype.annotations.ObjectClassDefinition; /** * Main entrypoint into RFC8040 northbound. Take care of wiring up all applications activating them through JAX-RS. */ @Beta -@Component(service = { }, configurationPid = "org.opendaylight.restconf.nb.rfc8040") -@Designate(ocd = JaxRsNorthbound.Configuration.class) +@Component(service = { }) public final class JaxRsNorthbound implements AutoCloseable { - @ObjectClassDefinition - public @interface Configuration { - @AttributeDefinition(min = "0", max = "" + StreamsConfiguration.MAXIMUM_FRAGMENT_LENGTH_LIMIT) - int maximum$_$fragment$_$length() default 0; - @AttributeDefinition(min = "0") - int heartbeat$_$interval() default 10000; - @AttributeDefinition(min = "1") - int idle$_$timeout() default 30000; - @AttributeDefinition(min = "1") - String ping$_$executor$_$name$_$prefix() default "ping-executor"; - // FIXME: this is a misnomer: it specifies the core pool size, i.e. minimum thread count, the maximum is set to - // Integer.MAX_VALUE, which is not what we want - @AttributeDefinition(min = "0") - int max$_$thread$_$count() default 1; - @AttributeDefinition - boolean use$_$sse() default true; - } - private final Registration discoveryReg; private final Registration restconfReg; @@ -76,36 +49,8 @@ public final class JaxRsNorthbound implements AutoCloseable { @Reference final DOMMountPointService mountPointService, @Reference final DOMNotificationService notificationService, @Reference final DOMRpcService rpcService, @Reference final DOMSchemaService schemaService, @Reference final DatabindProvider databindProvider, - @Reference final MdsalRestconfServer server, final Configuration configuration) throws ServletException { - this(webServer, webContextSecurer, servletSupport, filterAdapterConfiguration, actionService, dataBroker, - mountPointService, notificationService, rpcService, schemaService, databindProvider, server, - configuration.ping$_$executor$_$name$_$prefix(), configuration.max$_$thread$_$count(), - new StreamsConfiguration(configuration.maximum$_$fragment$_$length(), - configuration.idle$_$timeout(), configuration.heartbeat$_$interval(), configuration.use$_$sse())); - } - - public JaxRsNorthbound(final WebServer webServer, final WebContextSecurer webContextSecurer, - final ServletSupport servletSupport, final CustomFilterAdapterConfiguration filterAdapterConfiguration, - final DOMActionService actionService, final DOMDataBroker dataBroker, - final DOMMountPointService mountPointService, final DOMNotificationService notificationService, - final DOMRpcService rpcService, final DOMSchemaService schemaService, - final DatabindProvider databindProvider, final MdsalRestconfServer server, final String pingNamePrefix, - final int pingMaxThreadCount, final StreamsConfiguration streamsConfiguration) throws ServletException { - final var scheduledThreadPool = new ScheduledThreadPoolWrapper(pingMaxThreadCount, - new NamingThreadPoolFactory(pingNamePrefix)); - - final ListenersBroker listenersBroker; - final HttpServlet streamServlet; - if (streamsConfiguration.useSSE()) { - listenersBroker = new ListenersBroker.ServerSentEvents(dataBroker, notificationService, mountPointService); - streamServlet = servletSupport.createHttpServletBuilder( - new ServerSentEventsApplication(scheduledThreadPool, listenersBroker, streamsConfiguration)) - .build(); - } else { - listenersBroker = new ListenersBroker.WebSockets(dataBroker, notificationService, mountPointService); - streamServlet = new WebSocketInitializer(scheduledThreadPool, listenersBroker, streamsConfiguration); - } - + @Reference final MdsalRestconfServer server, @Reference final RestconfStreamServletFactory servletFactory) + throws ServletException { final var restconfBuilder = WebContext.builder() .name("RFC8040 RESTCONF") .contextPath("/" + URLConstants.BASE_PATH) @@ -114,13 +59,13 @@ public final class JaxRsNorthbound implements AutoCloseable { .addUrlPattern("/*") .servlet(servletSupport.createHttpServletBuilder( new RestconfApplication(databindProvider, server, mountPointService, dataBroker, actionService, - notificationService, schemaService, listenersBroker)) + notificationService, schemaService)) .build()) .asyncSupported(true) .build()) .addServlet(ServletDetails.builder() .addUrlPattern("/" + URLConstants.STREAMS_SUBPATH + "/*") - .servlet(streamServlet) + .servlet(servletFactory.newStreamServlet()) .name("notificationServlet") .asyncSupported(true) .build()) diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/OSGiNorthbound.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/OSGiNorthbound.java new file mode 100644 index 0000000000..a4e4223ba1 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/OSGiNorthbound.java @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.nb.rfc8040; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; +import org.opendaylight.restconf.nb.rfc8040.streams.DefaultPingExecutor; +import org.opendaylight.restconf.nb.rfc8040.streams.DefaultRestconfStreamServletFactory; +import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration; +import org.opendaylight.restconf.server.mdsal.MdsalRestconfStreamRegistry; +import org.osgi.framework.FrameworkUtil; +import org.osgi.service.component.ComponentFactory; +import org.osgi.service.component.ComponentInstance; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Modified; +import org.osgi.service.component.annotations.Reference; +import org.osgi.service.metatype.annotations.AttributeDefinition; +import org.osgi.service.metatype.annotations.Designate; +import org.osgi.service.metatype.annotations.ObjectClassDefinition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Component managing global RESTCONF northbound configuration. + */ +@Component(service = { }, configurationPid = "org.opendaylight.restconf.nb.rfc8040") +@Designate(ocd = OSGiNorthbound.Configuration.class) +public final class OSGiNorthbound { + @ObjectClassDefinition + public @interface Configuration { + @AttributeDefinition(min = "0", max = "" + StreamsConfiguration.MAXIMUM_FRAGMENT_LENGTH_LIMIT) + int maximum$_$fragment$_$length() default 0; + @AttributeDefinition(min = "0") + int heartbeat$_$interval() default 10000; + @AttributeDefinition(min = "1") + int idle$_$timeout() default 30000; + @AttributeDefinition(min = "1") + String ping$_$executor$_$name$_$prefix() default DefaultPingExecutor.DEFAULT_NAME_PREFIX; + // FIXME: this is a misnomer: it specifies the core pool size, i.e. minimum thread count, the maximum is set to + // Integer.MAX_VALUE, which is not what we want + @AttributeDefinition(min = "0") + int max$_$thread$_$count() default DefaultPingExecutor.DEFAULT_CORE_POOL_SIZE; + @AttributeDefinition + boolean use$_$sse() default true; + } + + private static final Logger LOG = LoggerFactory.getLogger(OSGiNorthbound.class); + + private final ComponentFactory registryFactory; + private final ComponentFactory servletFactoryFactory; + + private ComponentInstance registry; + private boolean useSSE; + + private ComponentInstance servletFactory; + private Map servletProps; + + @Activate + public OSGiNorthbound( + @Reference(target = "(component.factory=" + DefaultRestconfStreamServletFactory.FACTORY_NAME + ")") + final ComponentFactory servletFactoryFactory, + @Reference(target = "(component.factory=" + MdsalRestconfStreamRegistry.FACTORY_NAME + ")") + final ComponentFactory registryFactory, final Configuration configuration) { + this.registryFactory = requireNonNull(registryFactory); + this.servletFactoryFactory = requireNonNull(servletFactoryFactory); + + useSSE = configuration.use$_$sse(); + registry = registryFactory.newInstance(FrameworkUtil.asDictionary(MdsalRestconfStreamRegistry.props(useSSE))); + + servletProps = DefaultRestconfStreamServletFactory.props(registry.getInstance(), useSSE, + new StreamsConfiguration(configuration.maximum$_$fragment$_$length(), + configuration.idle$_$timeout(), configuration.heartbeat$_$interval()), + configuration.ping$_$executor$_$name$_$prefix(), configuration.max$_$thread$_$count()); + servletFactory = servletFactoryFactory.newInstance(FrameworkUtil.asDictionary(servletProps)); + + LOG.info("Global RESTCONF northbound pools started"); + } + + @Modified + void modified(final Configuration configuration) { + final var newUseSSE = configuration.use$_$sse(); + if (newUseSSE != useSSE) { + useSSE = newUseSSE; + registry.dispose(); + registry = registryFactory.newInstance(FrameworkUtil.asDictionary( + MdsalRestconfStreamRegistry.props(useSSE))); + LOG.debug("ListenersBroker restarted with {}", newUseSSE ? "SSE" : "Websockets"); + } + + final var newServletProps = DefaultRestconfStreamServletFactory.props(registry.getInstance(), useSSE, + new StreamsConfiguration(configuration.maximum$_$fragment$_$length(), + configuration.idle$_$timeout(), configuration.heartbeat$_$interval()), + configuration.ping$_$executor$_$name$_$prefix(), configuration.max$_$thread$_$count()); + if (!newServletProps.equals(servletProps)) { + servletProps = newServletProps; + servletFactory.dispose(); + servletFactory = servletFactoryFactory.newInstance(FrameworkUtil.asDictionary(servletProps)); + LOG.debug("RestconfStreamServletFactory restarted with {}", servletProps); + } + + LOG.debug("Applied {}", configuration); + } + + @Deactivate + void deactivate() { + servletFactory.dispose(); + servletFactory = null; + registry.dispose(); + registry = null; + LOG.info("Global RESTCONF northbound pools stopped"); + } +} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/RestconfApplication.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/RestconfApplication.java index 5756d4de47..584e93e705 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/RestconfApplication.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/RestconfApplication.java @@ -28,7 +28,6 @@ import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfImpl; import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfInvokeOperationsServiceImpl; import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfOperationsServiceImpl; import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfSchemaServiceImpl; -import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker; final class RestconfApplication extends Application { private final Set singletons; @@ -36,12 +35,12 @@ final class RestconfApplication extends Application { RestconfApplication(final DatabindProvider databindProvider, final MdsalRestconfServer server, final DOMMountPointService mountPointService, final DOMDataBroker dataBroker, final DOMActionService actionService, final DOMNotificationService notificationService, - final DOMSchemaService domSchemaService, final ListenersBroker listenersBroker) { + final DOMSchemaService domSchemaService) { singletons = Set.of( new RestconfDocumentedExceptionMapper(databindProvider), new RestconfDataServiceImpl(databindProvider, server, actionService), - new RestconfInvokeOperationsServiceImpl(databindProvider, server, listenersBroker), - new RestconfOperationsServiceImpl(databindProvider, server), + new RestconfInvokeOperationsServiceImpl(server), + new RestconfOperationsServiceImpl(server), new RestconfSchemaServiceImpl(domSchemaService, mountPointService), new RestconfImpl(databindProvider)); } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/ServerSentEventsApplication.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/ServerSentEventsApplication.java deleted file mode 100644 index 21880382f1..0000000000 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/ServerSentEventsApplication.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.restconf.nb.rfc8040; - -import java.util.Set; -import javax.ws.rs.core.Application; -import org.opendaylight.controller.config.threadpool.ScheduledThreadPool; -import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfDataStreamServiceImpl; -import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker; -import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration; - -/** - * JAX-RS binding for Server-Sent Events. - */ -final class ServerSentEventsApplication extends Application { - private final RestconfDataStreamServiceImpl singleton; - - ServerSentEventsApplication(final ScheduledThreadPool scheduledThreadPool, final ListenersBroker listenersBroker, - final StreamsConfiguration configuration) { - singleton = new RestconfDataStreamServiceImpl(scheduledThreadPool, listenersBroker, configuration); - } - - @Override - public Set getSingletons() { - return Set.of(singleton); - } -} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/MdsalRestconfServer.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/MdsalRestconfServer.java index efa0b9bfbe..cb068fc1ae 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/MdsalRestconfServer.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/MdsalRestconfServer.java @@ -11,8 +11,13 @@ import static com.google.common.base.Verify.verifyNotNull; import static java.util.Objects.requireNonNull; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import java.io.IOException; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; +import java.net.URI; +import java.util.List; import javax.inject.Inject; import javax.inject.Singleton; import org.eclipse.jdt.annotation.NonNull; @@ -22,11 +27,22 @@ import org.opendaylight.mdsal.dom.api.DOMMountPoint; import org.opendaylight.mdsal.dom.api.DOMMountPointService; import org.opendaylight.mdsal.dom.api.DOMRpcService; import org.opendaylight.restconf.common.errors.RestconfDocumentedException; +import org.opendaylight.restconf.common.errors.RestconfFuture; import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext; +import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; +import org.opendaylight.restconf.nb.rfc8040.databind.OperationInputBody; import org.opendaylight.restconf.nb.rfc8040.legacy.InstanceIdentifierContext; import org.opendaylight.restconf.nb.rfc8040.rests.transactions.MdsalRestconfStrategy; import org.opendaylight.restconf.nb.rfc8040.rests.transactions.RestconfStrategy; import org.opendaylight.restconf.nb.rfc8040.utils.parser.ParserIdentifier; +import org.opendaylight.restconf.server.api.RestconfServer; +import org.opendaylight.restconf.server.spi.OperationInput; +import org.opendaylight.restconf.server.spi.OperationOutput; +import org.opendaylight.restconf.server.spi.RpcImplementation; +import org.opendaylight.yangtools.yang.common.ErrorTag; +import org.opendaylight.yangtools.yang.common.ErrorType; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; @@ -37,11 +53,10 @@ import org.slf4j.LoggerFactory; /** * A RESTCONF server implemented on top of MD-SAL. */ -// FIXME: factor out the 'RestconfServer' interface once we're ready // FIXME: this should live in 'org.opendaylight.restconf.server.mdsal' package @Singleton -@Component(service = MdsalRestconfServer.class) -public final class MdsalRestconfServer { +@Component(service = { MdsalRestconfServer.class, RestconfServer.class }) +public final class MdsalRestconfServer implements RestconfServer { private static final Logger LOG = LoggerFactory.getLogger(MdsalRestconfServer.class); private static final VarHandle LOCAL_STRATEGY; @@ -54,7 +69,9 @@ public final class MdsalRestconfServer { } } + private final @NonNull ImmutableMap localRpcs; private final @NonNull DOMMountPointService mountPointService; + private final @NonNull DatabindProvider databindProvider; private final @NonNull DOMDataBroker dataBroker; private final @Nullable DOMRpcService rpcService; @@ -63,13 +80,28 @@ public final class MdsalRestconfServer { @Inject @Activate - public MdsalRestconfServer(@Reference final DOMDataBroker dataBroker, @Reference final DOMRpcService rpcService, - @Reference final DOMMountPointService mountPointService) { + public MdsalRestconfServer(@Reference final DatabindProvider databindProvider, + @Reference final DOMDataBroker dataBroker, @Reference final DOMRpcService rpcService, + @Reference final DOMMountPointService mountPointService, + @Reference final List localRpcs) { + this.databindProvider = requireNonNull(databindProvider); this.dataBroker = requireNonNull(dataBroker); this.rpcService = requireNonNull(rpcService); this.mountPointService = requireNonNull(mountPointService); + this.localRpcs = Maps.uniqueIndex(localRpcs, RpcImplementation::qname); } + public MdsalRestconfServer(final DatabindProvider databind, final DOMDataBroker dataBroker, + final DOMRpcService rpcService, final DOMMountPointService mountPointService, + final RpcImplementation... localRpcs) { + this(databind, dataBroker, rpcService, mountPointService, List.of(localRpcs)); + } + + @NonNull InstanceIdentifierContext bindRequestPath(final String identifier) { + return bindRequestPath(databindProvider.currentContext(), identifier); + } + + @Deprecated @NonNull InstanceIdentifierContext bindRequestPath(final DatabindContext databind, final String identifier) { // FIXME: go through ApiPath first. That part should eventually live in callers // FIXME: DatabindContext looks like it should be internal @@ -77,9 +109,28 @@ public final class MdsalRestconfServer { mountPointService)); } - @SuppressWarnings("static-method") - @NonNull InstanceIdentifierContext bindRequestRoot(final DatabindContext databind) { - return InstanceIdentifierContext.ofLocalRoot(databind.modelContext()); + @Override + public RestconfFuture invokeRpc(final URI restconfURI, final String apiPath, + final OperationInputBody body) { + final var currentContext = databindProvider.currentContext(); + final var reqPath = bindRequestPath(currentContext, apiPath); + final var inference = reqPath.inference(); + final ContainerNode input; + try { + input = body.toContainerNode(inference); + } catch (IOException e) { + LOG.debug("Error reading input", e); + return RestconfFuture.failed(new RestconfDocumentedException("Error parsing input: " + e.getMessage(), + ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE, e)); + } + + return getRestconfStrategy(reqPath.getSchemaContext(), reqPath.getMountPoint()) + .invokeRpc(restconfURI, reqPath.getSchemaNode().getQName(), + new OperationInput(currentContext, inference, input)); + } + + @NonNull InstanceIdentifierContext bindRequestRoot() { + return InstanceIdentifierContext.ofLocalRoot(databindProvider.currentContext().modelContext()); } @VisibleForTesting @@ -105,7 +156,7 @@ public final class MdsalRestconfServer { return local; } - final var created = new MdsalRestconfStrategy(modelContext, dataBroker, rpcService); + final var created = new MdsalRestconfStrategy(modelContext, dataBroker, rpcService, localRpcs); LOCAL_STRATEGY.setRelease(this, created); return created; } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImpl.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImpl.java index ef20881549..22e986cfd3 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImpl.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImpl.java @@ -130,7 +130,7 @@ public final class RestconfDataServiceImpl { }) public Response readData(@Context final UriInfo uriInfo) { final var readParams = QueryParams.newReadDataParams(uriInfo); - return readData(server.bindRequestRoot(databindProvider.currentContext()), readParams); + return readData(server.bindRequestRoot(), readParams); } /** @@ -152,7 +152,7 @@ public final class RestconfDataServiceImpl { public Response readData(@Encoded @PathParam("identifier") final String identifier, @Context final UriInfo uriInfo) { final var readParams = QueryParams.newReadDataParams(uriInfo); - return readData(server.bindRequestPath(databindProvider.currentContext(), identifier), readParams); + return readData(server.bindRequestPath(identifier), readParams); } private Response readData(final InstanceIdentifierContext reqPath, final ReadDataParams readParams) { @@ -169,8 +169,8 @@ public final class RestconfDataServiceImpl { } if (node == null) { throw new RestconfDocumentedException( - "Request could not be completed because the relevant data model content does not exist", - ErrorType.PROTOCOL, ErrorTag.DATA_MISSING); + "Request could not be completed because the relevant data model content does not exist", + ErrorType.PROTOCOL, ErrorTag.DATA_MISSING); } return switch (readParams.content()) { @@ -273,7 +273,7 @@ public final class RestconfDataServiceImpl { private void putData(final @Nullable String identifier, final UriInfo uriInfo, final ResourceBody body, final AsyncResponse ar) { - final var reqPath = server.bindRequestPath(databindProvider.currentContext(), identifier); + final var reqPath = server.bindRequestPath(identifier); final var insert = QueryParams.parseInsert(reqPath.getSchemaContext(), uriInfo); final var req = bindResourceRequest(reqPath, body); @@ -325,7 +325,7 @@ public final class RestconfDataServiceImpl { }) public void postDataJSON(@Encoded @PathParam("identifier") final String identifier, final InputStream body, @Context final UriInfo uriInfo, @Suspended final AsyncResponse ar) { - final var reqPath = server.bindRequestPath(databindProvider.currentContext(), identifier); + final var reqPath = server.bindRequestPath(identifier); if (reqPath.getSchemaNode() instanceof ActionDefinition) { try (var jsonBody = new JsonOperationInputBody(body)) { invokeAction(reqPath, jsonBody, ar); @@ -375,7 +375,7 @@ public final class RestconfDataServiceImpl { }) public void postDataXML(@Encoded @PathParam("identifier") final String identifier, final InputStream body, @Context final UriInfo uriInfo, @Suspended final AsyncResponse ar) { - final var reqPath = server.bindRequestPath(databindProvider.currentContext(), identifier); + final var reqPath = server.bindRequestPath(identifier); if (reqPath.getSchemaNode() instanceof ActionDefinition) { try (var xmlBody = new XmlOperationInputBody(body)) { invokeAction(reqPath, xmlBody, ar); @@ -449,7 +449,7 @@ public final class RestconfDataServiceImpl { @Path("/data/{identifier:.+}") public void deleteData(@Encoded @PathParam("identifier") final String identifier, @Suspended final AsyncResponse ar) { - final var reqPath = server.bindRequestPath(databindProvider.currentContext(), identifier); + final var reqPath = server.bindRequestPath(identifier); final var strategy = server.getRestconfStrategy(reqPath.getSchemaContext(), reqPath.getMountPoint()); strategy.delete(reqPath.getInstanceIdentifier()).addCallback(new JaxRsRestconfCallback<>(ar) { @@ -550,7 +550,7 @@ public final class RestconfDataServiceImpl { * @param ar {@link AsyncResponse} which needs to be completed */ private void plainPatchData(final ResourceBody body, final AsyncResponse ar) { - plainPatchData(server.bindRequestRoot(databindProvider.currentContext()), body, ar); + plainPatchData(server.bindRequestRoot(), body, ar); } /** @@ -562,7 +562,7 @@ public final class RestconfDataServiceImpl { * @param ar {@link AsyncResponse} which needs to be completed */ private void plainPatchData(final String identifier, final ResourceBody body, final AsyncResponse ar) { - plainPatchData(server.bindRequestPath(databindProvider.currentContext(), identifier), body, ar); + plainPatchData(server.bindRequestPath(identifier), body, ar); } /** @@ -680,13 +680,13 @@ public final class RestconfDataServiceImpl { } private void yangPatchData(final @NonNull PatchBody body, final AsyncResponse ar) { - final var context = databindProvider.currentContext().modelContext(); + final var context = server.bindRequestRoot().getSchemaContext(); yangPatchData(context, parsePatchBody(context, YangInstanceIdentifier.of(), body), null, ar); } private void yangPatchData(final String identifier, final @NonNull PatchBody body, final AsyncResponse ar) { - final var reqPath = server.bindRequestPath(databindProvider.currentContext(), identifier); + final var reqPath = server.bindRequestPath(identifier); final var modelContext = reqPath.getSchemaContext(); yangPatchData(modelContext, parsePatchBody(modelContext, reqPath.getInstanceIdentifier(), body), reqPath.getMountPoint(), ar); @@ -707,19 +707,17 @@ public final class RestconfDataServiceImpl { private static Status getStatusCode(final PatchStatusContext result) { if (result.ok()) { return Status.OK; + } else if (result.globalErrors() == null || result.globalErrors().isEmpty()) { + return result.editCollection().stream() + .filter(patchStatus -> !patchStatus.isOk() && !patchStatus.getEditErrors().isEmpty()) + .findFirst() + .map(PatchStatusEntity::getEditErrors) + .flatMap(errors -> errors.stream().findFirst()) + .map(error -> ErrorTags.statusOf(error.getErrorTag())) + .orElse(Status.INTERNAL_SERVER_ERROR); } else { - if (result.globalErrors() == null || result.globalErrors().isEmpty()) { - return result.editCollection().stream() - .filter(patchStatus -> !patchStatus.isOk() && !patchStatus.getEditErrors().isEmpty()) - .findFirst() - .map(PatchStatusEntity::getEditErrors) - .flatMap(errors -> errors.stream().findFirst()) - .map(error -> ErrorTags.statusOf(error.getErrorTag())) - .orElse(Status.INTERNAL_SERVER_ERROR); - } else { - final var error = result.globalErrors().iterator().next(); - return ErrorTags.statusOf(error.getErrorTag()); - } + final var error = result.globalErrors().iterator().next(); + return ErrorTags.statusOf(error.getErrorTag()); } } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java index 4e8741d695..9819b1b675 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java @@ -9,9 +9,7 @@ package org.opendaylight.restconf.nb.rfc8040.rests.services.impl; import static java.util.Objects.requireNonNull; -import java.io.IOException; import java.io.InputStream; -import java.util.Optional; import javax.ws.rs.Consumes; import javax.ws.rs.Encoded; import javax.ws.rs.POST; @@ -24,25 +22,12 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriInfo; -import org.opendaylight.restconf.common.errors.RestconfDocumentedException; -import org.opendaylight.restconf.common.errors.RestconfFuture; import org.opendaylight.restconf.nb.rfc8040.MediaTypes; -import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext; -import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; import org.opendaylight.restconf.nb.rfc8040.databind.JsonOperationInputBody; import org.opendaylight.restconf.nb.rfc8040.databind.OperationInputBody; import org.opendaylight.restconf.nb.rfc8040.databind.XmlOperationInputBody; -import org.opendaylight.restconf.nb.rfc8040.legacy.InstanceIdentifierContext; import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload; -import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker; -import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotification; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStream; -import org.opendaylight.yangtools.yang.common.ErrorTag; -import org.opendaylight.yangtools.yang.common.ErrorType; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.opendaylight.restconf.server.spi.OperationOutput; /** * An operation resource represents a protocol operation defined with the YANG {@code rpc} statement. It is invoked @@ -50,17 +35,10 @@ import org.slf4j.LoggerFactory; */ @Path("/") public final class RestconfInvokeOperationsServiceImpl { - private static final Logger LOG = LoggerFactory.getLogger(RestconfInvokeOperationsServiceImpl.class); - - private final DatabindProvider databindProvider; private final MdsalRestconfServer server; - private final ListenersBroker listenersBroker; - public RestconfInvokeOperationsServiceImpl(final DatabindProvider databindProvider, - final MdsalRestconfServer server, final ListenersBroker listenersBroker) { - this.databindProvider = requireNonNull(databindProvider); + public RestconfInvokeOperationsServiceImpl(final MdsalRestconfServer server) { this.server = requireNonNull(server); - this.listenersBroker = requireNonNull(listenersBroker); } /** @@ -124,48 +102,14 @@ public final class RestconfInvokeOperationsServiceImpl { private void invokeRpc(final String identifier, final UriInfo uriInfo, final AsyncResponse ar, final OperationInputBody body) { - final var databind = databindProvider.currentContext(); - final var reqPath = server.bindRequestPath(databind, identifier); - - final ContainerNode input; - try { - input = body.toContainerNode(reqPath.inference()); - } catch (IOException e) { - LOG.debug("Error reading input", e); - throw new RestconfDocumentedException("Error parsing input: " + e.getMessage(), ErrorType.PROTOCOL, - ErrorTag.MALFORMED_MESSAGE, e); - } - - hackInvokeRpc(databind, reqPath, uriInfo, input).addCallback(new JaxRsRestconfCallback<>(ar) { - @Override - Response transform(final Optional result) { - return result - .filter(output -> !output.isEmpty()) - .map(output -> Response.ok().entity(new NormalizedNodePayload(reqPath.inference(), output)).build()) - .orElseGet(() -> Response.noContent().build()); - } - }); - } - - private RestconfFuture> hackInvokeRpc(final DatabindContext localDatabind, - final InstanceIdentifierContext reqPath, final UriInfo uriInfo, final ContainerNode input) { - // RPC type - final var type = reqPath.getSchemaNode().getQName(); - final var mountPoint = reqPath.getMountPoint(); - if (mountPoint == null) { - final var baseURI = uriInfo.getBaseUri(); - // Hacked-up integration of streams - if (CreateDataChangeEventSubscription.QNAME.equals(type)) { - return listenersBroker.createDataChangeNotifiStream(databindProvider, baseURI, input, - localDatabind.modelContext()); - } else if (CreateNotificationStream.QNAME.equals(type)) { - return listenersBroker.createNotificationStream(databindProvider, baseURI, input, - localDatabind.modelContext()); - } else if (SubscribeDeviceNotification.QNAME.equals(type)) { - return listenersBroker.createDeviceNotificationStream(baseURI, input, localDatabind.modelContext()); - } - } - - return server.getRestconfStrategy(reqPath.getSchemaContext(), mountPoint).invokeRpc(type, input); + server.invokeRpc(uriInfo.getBaseUri(), identifier, body) + .addCallback(new JaxRsRestconfCallback(ar) { + @Override + Response transform(final OperationOutput result) { + final var body = result.output(); + return body == null ? Response.noContent().build() + : Response.ok().entity(new NormalizedNodePayload(result.operation(), body)).build(); + } + }); } } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfOperationsServiceImpl.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfOperationsServiceImpl.java index 7368688f14..ba3c1b52e2 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfOperationsServiceImpl.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfOperationsServiceImpl.java @@ -23,17 +23,14 @@ import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; */ @Path("/") public final class RestconfOperationsServiceImpl { - private final DatabindProvider databindProvider; private final MdsalRestconfServer server; /** * Set {@link DatabindProvider} for getting actual {@link EffectiveModelContext}. * - * @param databindProvider a {@link DatabindProvider} * @param server a {@link MdsalRestconfServer} */ - public RestconfOperationsServiceImpl(final DatabindProvider databindProvider, final MdsalRestconfServer server) { - this.databindProvider = requireNonNull(databindProvider); + public RestconfOperationsServiceImpl(final MdsalRestconfServer server) { this.server = requireNonNull(server); } @@ -46,8 +43,7 @@ public final class RestconfOperationsServiceImpl { @Path("/operations") @Produces({ MediaTypes.APPLICATION_YANG_DATA_JSON, MediaType.APPLICATION_JSON }) public String getOperationsJSON() { - return OperationsContent.JSON.bodyFor( - server.bindRequestRoot(databindProvider.currentContext()).inference()); + return OperationsContent.JSON.bodyFor(server.bindRequestRoot().inference()); } /** @@ -60,8 +56,7 @@ public final class RestconfOperationsServiceImpl { @Path("/operations/{identifier:.+}") @Produces({ MediaTypes.APPLICATION_YANG_DATA_JSON, MediaType.APPLICATION_JSON }) public String getOperationJSON(@PathParam("identifier") final String identifier) { - return OperationsContent.JSON.bodyFor( - server.bindRequestPath(databindProvider.currentContext(), identifier).inference()); + return OperationsContent.JSON.bodyFor(server.bindRequestPath(identifier).inference()); } /** @@ -73,8 +68,7 @@ public final class RestconfOperationsServiceImpl { @Path("/operations") @Produces({ MediaTypes.APPLICATION_YANG_DATA_XML, MediaType.APPLICATION_XML, MediaType.TEXT_XML }) public String getOperationsXML() { - return OperationsContent.XML.bodyFor( - server.bindRequestRoot(databindProvider.currentContext()).inference()); + return OperationsContent.XML.bodyFor(server.bindRequestRoot().inference()); } /** @@ -87,7 +81,6 @@ public final class RestconfOperationsServiceImpl { @Path("/operations/{identifier:.+}") @Produces({ MediaTypes.APPLICATION_YANG_DATA_XML, MediaType.APPLICATION_XML, MediaType.TEXT_XML }) public String getOperationXML(@PathParam("identifier") final String identifier) { - return OperationsContent.XML.bodyFor( - server.bindRequestPath(databindProvider.currentContext(), identifier).inference()); + return OperationsContent.XML.bodyFor(server.bindRequestPath(identifier).inference()); } } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/MdsalRestconfStrategy.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/MdsalRestconfStrategy.java index 5400c52b36..e30c101f56 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/MdsalRestconfStrategy.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/MdsalRestconfStrategy.java @@ -10,6 +10,7 @@ package org.opendaylight.restconf.nb.rfc8040.rests.transactions; import static java.util.Objects.requireNonNull; import static org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -25,9 +26,11 @@ import org.opendaylight.mdsal.dom.api.DOMRpcService; import org.opendaylight.mdsal.dom.api.DOMTransactionChain; import org.opendaylight.restconf.common.errors.RestconfDocumentedException; import org.opendaylight.restconf.common.errors.SettableRestconfFuture; +import org.opendaylight.restconf.server.spi.RpcImplementation; import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.common.ErrorTag; import org.opendaylight.yangtools.yang.common.ErrorType; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; @@ -42,11 +45,16 @@ public final class MdsalRestconfStrategy extends RestconfStrategy { private final DOMDataBroker dataBroker; public MdsalRestconfStrategy(final EffectiveModelContext modelContext, final DOMDataBroker dataBroker, - final @Nullable DOMRpcService rpcService) { - super(modelContext, rpcService); + final @Nullable DOMRpcService rpcService, final ImmutableMap localRpcs) { + super(modelContext, localRpcs, rpcService); this.dataBroker = requireNonNull(dataBroker); } + public MdsalRestconfStrategy(final EffectiveModelContext modelContext, final DOMDataBroker dataBroker, + final @Nullable DOMRpcService rpcService) { + this(modelContext, dataBroker, rpcService, ImmutableMap.of()); + } + @Override RestconfTransaction prepareWriteExecution() { return new MdsalRestconfTransaction(modelContext(), dataBroker); diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/NetconfRestconfStrategy.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/NetconfRestconfStrategy.java index 9f3c3c963c..e141aea600 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/NetconfRestconfStrategy.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/NetconfRestconfStrategy.java @@ -9,6 +9,7 @@ package org.opendaylight.restconf.nb.rfc8040.rests.transactions; import static java.util.Objects.requireNonNull; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -38,7 +39,7 @@ public final class NetconfRestconfStrategy extends RestconfStrategy { public NetconfRestconfStrategy(final EffectiveModelContext modelContext, final NetconfDataTreeService netconfService, final @Nullable DOMRpcService rpcService) { - super(modelContext, rpcService); + super(modelContext, ImmutableMap.of(), rpcService); this.netconfService = requireNonNull(netconfService); } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/RestconfStrategy.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/RestconfStrategy.java index e9c5da5fd6..a755b139e4 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/RestconfStrategy.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/transactions/RestconfStrategy.java @@ -10,10 +10,12 @@ package org.opendaylight.restconf.nb.rfc8040.rests.transactions; import static com.google.common.base.Verify.verifyNotNull; import static java.util.Objects.requireNonNull; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -42,6 +44,9 @@ import org.opendaylight.restconf.common.patch.PatchContext; import org.opendaylight.restconf.common.patch.PatchStatusContext; import org.opendaylight.restconf.common.patch.PatchStatusEntity; import org.opendaylight.restconf.nb.rfc8040.Insert; +import org.opendaylight.restconf.server.spi.OperationInput; +import org.opendaylight.restconf.server.spi.OperationOutput; +import org.opendaylight.restconf.server.spi.RpcImplementation; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.with.defaults.rev110601.WithDefaultsMode; import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.common.ErrorTag; @@ -111,10 +116,13 @@ public abstract class RestconfStrategy { private static final Logger LOG = LoggerFactory.getLogger(RestconfStrategy.class); private final @NonNull EffectiveModelContext modelContext; - private final @Nullable DOMRpcService rpcService; + private final @NonNull ImmutableMap localRpcs; + private final DOMRpcService rpcService; - RestconfStrategy(final EffectiveModelContext modelContext, final @Nullable DOMRpcService rpcService) { + RestconfStrategy(final EffectiveModelContext modelContext, final ImmutableMap localRpcs, + final @Nullable DOMRpcService rpcService) { this.modelContext = requireNonNull(modelContext); + this.localRpcs = requireNonNull(localRpcs); this.rpcService = rpcService; } @@ -1038,42 +1046,45 @@ public abstract class RestconfStrategy { y -> builder.addChild((T) prepareData(y.getValue(), stateMap.get(y.getKey())))); } - public @NonNull RestconfFuture> invokeRpc(final QName type, final ContainerNode input) { - final var ret = new SettableRestconfFuture>(); - - final var local = rpcService; + public @NonNull RestconfFuture invokeRpc(final URI restconfURI, final QName type, + final OperationInput input) { + final var local = localRpcs.get(type); if (local != null) { - Futures.addCallback(local.invokeRpc(requireNonNull(type), requireNonNull(input)), - new FutureCallback() { - @Override - public void onSuccess(final DOMRpcResult response) { - final var errors = response.errors(); - if (errors.isEmpty()) { - ret.set(Optional.ofNullable(response.value())); - } else { - LOG.debug("RPC invocation reported {}", response.errors()); - ret.setFailure(new RestconfDocumentedException("RPC implementation reported errors", null, - response.errors())); - } - } - - @Override - public void onFailure(final Throwable cause) { - LOG.debug("RPC invocation failed, cause"); - if (cause instanceof RestconfDocumentedException ex) { - ret.setFailure(ex); - } else { - // TODO: YangNetconfErrorAware if we ever get into a broader invocation scope - ret.setFailure(new RestconfDocumentedException(cause, - new RestconfError(ErrorType.RPC, ErrorTag.OPERATION_FAILED, cause.getMessage()))); - } - } - }, MoreExecutors.directExecutor()); - } else { + return local.invoke(restconfURI, input); + } + if (rpcService == null) { LOG.debug("RPC invocation is not available"); - ret.setFailure(new RestconfDocumentedException("RPC invocation is not available", + return RestconfFuture.failed(new RestconfDocumentedException("RPC invocation is not available", ErrorType.PROTOCOL, ErrorTag.OPERATION_NOT_SUPPORTED)); } + + final var ret = new SettableRestconfFuture(); + Futures.addCallback(rpcService.invokeRpc(requireNonNull(type), input.input()), + new FutureCallback() { + @Override + public void onSuccess(final DOMRpcResult response) { + final var errors = response.errors(); + if (errors.isEmpty()) { + ret.set(input.newOperationOutput(response.value())); + } else { + LOG.debug("RPC invocation reported {}", response.errors()); + ret.setFailure(new RestconfDocumentedException("RPC implementation reported errors", null, + response.errors())); + } + } + + @Override + public void onFailure(final Throwable cause) { + LOG.debug("RPC invocation failed, cause"); + if (cause instanceof RestconfDocumentedException ex) { + ret.setFailure(ex); + } else { + // TODO: YangNetconfErrorAware if we ever get into a broader invocation scope + ret.setFailure(new RestconfDocumentedException(cause, + new RestconfError(ErrorType.RPC, ErrorTag.OPERATION_FAILED, cause.getMessage()))); + } + } + }, MoreExecutors.directExecutor()); return ret; } } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DefaultPingExecutor.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DefaultPingExecutor.java new file mode 100644 index 0000000000..1b3a74e185 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DefaultPingExecutor.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.nb.rfc8040.streams; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.opendaylight.yangtools.concepts.AbstractRegistration; +import org.opendaylight.yangtools.concepts.Registration; + +@Singleton +public final class DefaultPingExecutor implements PingExecutor, AutoCloseable { + private static final class Process extends AbstractRegistration implements Runnable { + private final Runnable task; + private final ScheduledFuture future; + + Process(final Runnable task, final ScheduledThreadPoolExecutor threadPool, final long delay, + final TimeUnit timeUnit) { + this.task = requireNonNull(task); + future = threadPool.scheduleWithFixedDelay(task, delay, delay, timeUnit); + } + + @Override + protected void removeRegistration() { + future.cancel(false); + } + + @Override + public void run() { + if (notClosed()) { + task.run(); + } + } + } + + public static final String DEFAULT_NAME_PREFIX = "ping-executor"; + public static final int DEFAULT_CORE_POOL_SIZE = 1; + + // FIXME: Java 21: just use thread-per-task executor with virtual threads + private final ScheduledThreadPoolExecutor threadPool; + + public DefaultPingExecutor(final String namePrefix, final int corePoolSize) { + final var counter = new AtomicLong(); + final var group = new ThreadGroup(requireNonNull(namePrefix)); + threadPool = new ScheduledThreadPoolExecutor(corePoolSize, + target -> new Thread(group, target, namePrefix + '-' + counter.incrementAndGet())); + } + + @Inject + public DefaultPingExecutor() { + this(DEFAULT_NAME_PREFIX, DEFAULT_CORE_POOL_SIZE); + } + + @Override + public Registration startPingProcess(final Runnable task, final long delay, final TimeUnit timeUnit) { + return new Process(task, threadPool, delay, timeUnit); + } + + @Override + @PreDestroy + public void close() { + threadPool.shutdown(); + } +} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DefaultRestconfStreamServletFactory.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DefaultRestconfStreamServletFactory.java new file mode 100644 index 0000000000..fc8a53e4a7 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DefaultRestconfStreamServletFactory.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.nb.rfc8040.streams; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; +import javax.servlet.http.HttpServlet; +import org.opendaylight.aaa.web.servlet.ServletSupport; +import org.opendaylight.restconf.server.spi.RestconfStream; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Deactivate; +import org.osgi.service.component.annotations.Reference; + +/** + * Auxiliary interface for instantiating JAX-RS streams. + */ +@Component(factory = DefaultRestconfStreamServletFactory.FACTORY_NAME, service = RestconfStreamServletFactory.class) +public final class DefaultRestconfStreamServletFactory implements RestconfStreamServletFactory, AutoCloseable { + public static final String FACTORY_NAME = + "org.opendaylight.restconf.nb.rfc8040.streams.RestconfStreamServletFactory"; + + private static final String PROP_STREAM_REGISTRY = ".streamRegistry"; + private static final String PROP_NAME_PREFIX = ".namePrefix"; + private static final String PROP_CORE_POOL_SIZE = ".corePoolSize"; + private static final String PROP_USE_WEBSOCKETS = ".useWebsockets"; + private static final String PROP_STREAMS_CONFIGURATION = ".streamsConfiguration"; + + private final RestconfStream.Registry streamRegistry; + private final ServletSupport servletSupport; + + private final DefaultPingExecutor pingExecutor; + private final StreamsConfiguration streamsConfiguration; + private final boolean useWebsockets; + + public DefaultRestconfStreamServletFactory(final ServletSupport servletSupport, + final RestconfStream.Registry streamRegistry, final StreamsConfiguration streamsConfiguration, + final String namePrefix, final int corePoolSize, final boolean useWebsockets) { + this.servletSupport = requireNonNull(servletSupport); + this.streamRegistry = requireNonNull(streamRegistry); + this.streamsConfiguration = requireNonNull(streamsConfiguration); + this.useWebsockets = useWebsockets; + pingExecutor = new DefaultPingExecutor(namePrefix, corePoolSize); + } + + @Activate + public DefaultRestconfStreamServletFactory(@Reference final ServletSupport servletSupport, + final Map props) { + this(servletSupport, (RestconfStream.Registry) props.get(PROP_STREAM_REGISTRY), + (StreamsConfiguration) props.get(PROP_STREAMS_CONFIGURATION), + (String) props.get(PROP_NAME_PREFIX), (int) requireNonNull(props.get(PROP_CORE_POOL_SIZE)), + (boolean) requireNonNull(props.get(PROP_USE_WEBSOCKETS))); + } + + @Override + public HttpServlet newStreamServlet() { + return useWebsockets ? new WebSocketInitializer(streamRegistry, pingExecutor, streamsConfiguration) + : servletSupport.createHttpServletBuilder( + new SSEApplication(streamRegistry, pingExecutor, streamsConfiguration)) + .build(); + } + + @Override + @Deactivate + public void close() { + pingExecutor.close(); + } + + public static Map props(final RestconfStream.Registry streamRegistry, final boolean useSSE, + final StreamsConfiguration streamsConfiguration, final String namePrefix, final int corePoolSize) { + return Map.of( + PROP_STREAM_REGISTRY, streamRegistry, + PROP_USE_WEBSOCKETS, !useSSE, + PROP_STREAMS_CONFIGURATION, streamsConfiguration, + PROP_NAME_PREFIX, namePrefix, + PROP_CORE_POOL_SIZE, corePoolSize); + } +} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBroker.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBroker.java deleted file mode 100644 index ad90bcd748..0000000000 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBroker.java +++ /dev/null @@ -1,433 +0,0 @@ -/* - * Copyright © 2019 FRINX s.r.o. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.restconf.nb.rfc8040.streams; - -import static java.util.Objects.requireNonNull; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.MoreExecutors; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import org.eclipse.jdt.annotation.NonNull; -import org.eclipse.jdt.annotation.Nullable; -import org.opendaylight.mdsal.common.api.CommitInfo; -import org.opendaylight.mdsal.common.api.LogicalDatastoreType; -import org.opendaylight.mdsal.dom.api.DOMDataBroker; -import org.opendaylight.mdsal.dom.api.DOMMountPointService; -import org.opendaylight.mdsal.dom.api.DOMNotificationService; -import org.opendaylight.mdsal.dom.api.DOMRpcResult; -import org.opendaylight.restconf.common.errors.RestconfDocumentedException; -import org.opendaylight.restconf.common.errors.RestconfFuture; -import org.opendaylight.restconf.common.errors.SettableRestconfFuture; -import org.opendaylight.restconf.nb.rfc8040.URLConstants; -import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Source; -import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.RestconfState; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.Streams; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.Stream; -import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access; -import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput; -import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.CreateDataChangeEventSubscriptionInput1; -import org.opendaylight.yangtools.yang.common.ErrorTag; -import org.opendaylight.yangtools.yang.common.ErrorType; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; -import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; -import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; -import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode; -import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; -import org.opendaylight.yangtools.yang.data.impl.schema.Builders; -import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; -import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This singleton class is responsible for creation, removal and searching for {@link DataTreeChangeSource} or - * {@link NotificationSource} listeners. - */ -// FIXME: furthermore, this should be tied to ietf-restconf-monitoring, as the Strings used in its maps are stream -// names. We essentially need a component which deals with allocation of stream names and their lifecycle and -// the contents of /restconf-state/streams. -public abstract sealed class ListenersBroker { - /** - * A ListenersBroker working with Server-Sent Events. - */ - public static final class ServerSentEvents extends ListenersBroker { - public ServerSentEvents(final DOMDataBroker dataBroker, final DOMNotificationService notificationService, - final DOMMountPointService mountPointService) { - super(dataBroker, notificationService, mountPointService); - } - } - - /** - * A ListenersBroker working with WebSockets. - */ - public static final class WebSockets extends ListenersBroker { - public WebSockets(final DOMDataBroker dataBroker, final DOMNotificationService notificationService, - final DOMMountPointService mountPointService) { - super(dataBroker, notificationService, mountPointService); - } - - @Override - String streamsScheme(final URI baseURI) { - return switch (super.streamsScheme(baseURI)) { - // Secured HTTP goes to Secured WebSockets - case "https" -> "wss"; - // Unsecured HTTP and others go to unsecured WebSockets - default -> "ws"; - }; - } - } - - private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class); - private static final YangInstanceIdentifier RESTCONF_STATE_STREAMS = YangInstanceIdentifier.of( - NodeIdentifier.create(RestconfState.QNAME), - NodeIdentifier.create(Streams.QNAME), - NodeIdentifier.create(Stream.QNAME)); - - @VisibleForTesting - static final QName NAME_QNAME = QName.create(Stream.QNAME, "name").intern(); - @VisibleForTesting - static final QName DESCRIPTION_QNAME = QName.create(Stream.QNAME, "description").intern(); - @VisibleForTesting - static final QName ENCODING_QNAME = QName.create(Stream.QNAME, "encoding").intern(); - @VisibleForTesting - static final QName LOCATION_QNAME = QName.create(Stream.QNAME, "location").intern(); - - private static final NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create( - QName.create(CreateDataChangeEventSubscriptionInput1.QNAME, "datastore").intern()); - private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID = - NodeIdentifier.create(QName.create(SubscribeDeviceNotificationInput.QNAME, "path").intern()); - private static final NodeIdentifier DEVICE_NOTIFICATION_STREAM_PATH_NODEID = - NodeIdentifier.create(QName.create(SubscribeDeviceNotificationInput.QNAME, "stream-path").intern()); - - private static final NodeIdentifier SAL_REMOTE_OUTPUT_NODEID = - NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME); - private static final NodeIdentifier NOTIFICATIONS = - NodeIdentifier.create(QName.create(CreateNotificationStreamInput.QNAME, "notifications").intern()); - private static final NodeIdentifier PATH_NODEID = - NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionInput.QNAME, "path").intern()); - private static final NodeIdentifier STREAM_NAME_NODEID = - NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern()); - - private final ConcurrentMap> streams = new ConcurrentHashMap<>(); - private final DOMDataBroker dataBroker; - @Deprecated(forRemoval = true) - private final DOMMountPointService mountPointService; - @Deprecated(forRemoval = true) - private final DOMNotificationService notificationService; - - private ListenersBroker(final DOMDataBroker dataBroker, final DOMNotificationService notificationService, - final DOMMountPointService mountPointService) { - this.dataBroker = requireNonNull(dataBroker); - this.notificationService = requireNonNull(notificationService); - this.mountPointService = requireNonNull(mountPointService); - } - - /** - * Get a {@link RestconfStream} by its name. - * - * @param streamName Stream name. - * @return A {@link RestconfStream}, or {@code null} if the stream with specified name does not exist. - * @throws NullPointerException if {@code streamName} is {@code null} - */ - public final @Nullable RestconfStream getStream(final String streamName) { - return streams.get(streamName); - } - - /** - * Create a {@link RestconfStream} with a unique name. This method will atomically generate a stream name, create - * the corresponding instance and register it. - * - * @param Stream type - * @param baseStreamLocation base streams location - * @param factory Factory for creating the actual stream instance - * @return A {@link RestconfStream} instance - * @throws NullPointerException if {@code factory} is {@code null} - */ - final @NonNull RestconfFuture> createStream(final String description, - final String baseStreamLocation, final Source source) { - final var stream = allocateStream(source); - final var name = stream.name(); - - // Now issue a put operation - final var ret = new SettableRestconfFuture>(); - final var tx = dataBroker.newWriteOnlyTransaction(); - tx.put(LogicalDatastoreType.OPERATIONAL, restconfStateStreamPath(name), - streamEntry(name, description, baseStreamLocation, stream.encodings())); - tx.commit().addCallback(new FutureCallback() { - @Override - public void onSuccess(final CommitInfo result) { - LOG.debug("Stream {} added", name); - ret.set(stream); - } - - @Override - public void onFailure(final Throwable cause) { - LOG.debug("Failed to add stream {}", name, cause); - streams.remove(name, stream); - ret.setFailure(new RestconfDocumentedException("Failed to allocate stream " + name, cause)); - } - }, MoreExecutors.directExecutor()); - return ret; - } - - private @NonNull RestconfStream allocateStream(final Source source) { - String name; - RestconfStream stream; - do { - // Use Type 4 (random) UUID. While we could just use it as a plain string, be nice to observers and anchor - // it into UUID URN namespace as defined by RFC4122 - name = "urn:uuid:" + UUID.randomUUID().toString(); - stream = new RestconfStream<>(this, source, name); - } while (streams.putIfAbsent(name, stream) != null); - - return stream; - } - - /** - * Remove a particular stream and remove its entry from operational datastore. - * - * @param stream Stream to remove - */ - final void removeStream(final RestconfStream stream) { - // Defensive check to see if we are still tracking the stream - final var streamName = stream.name(); - if (streams.get(streamName) != stream) { - LOG.warn("Stream {} does not match expected instance {}, skipping datastore update", streamName, stream); - return; - } - - // Now issue a delete operation while the name is still protected by being associated in the map. - final var tx = dataBroker.newWriteOnlyTransaction(); - tx.delete(LogicalDatastoreType.OPERATIONAL, restconfStateStreamPath(streamName)); - tx.commit().addCallback(new FutureCallback() { - @Override - public void onSuccess(final CommitInfo result) { - LOG.debug("Stream {} removed", streamName); - streams.remove(streamName, stream); - } - - @Override - public void onFailure(final Throwable cause) { - LOG.warn("Failed to remove stream {}, operational datastore may be inconsistent", streamName, cause); - streams.remove(streamName, stream); - } - }, MoreExecutors.directExecutor()); - } - - private static @NonNull YangInstanceIdentifier restconfStateStreamPath(final String streamName) { - return RESTCONF_STATE_STREAMS.node(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, streamName)); - } - - /** - * Return the base location URL of the streams service based on request URI. - * - * @param baseURI request base URI - * @throws IllegalArgumentException if the result would have been malformed - */ - public final @NonNull String baseStreamLocation(final URI baseURI) { - try { - return new URI(streamsScheme(baseURI), baseURI.getRawUserInfo(), baseURI.getHost(), baseURI.getPort(), - URLConstants.BASE_PATH + '/' + URLConstants.STREAMS_SUBPATH, null, null) - .toString(); - } catch (URISyntaxException e) { - throw new IllegalArgumentException("Cannot derive streams location", e); - } - } - - String streamsScheme(final URI baseURI) { - return baseURI.getScheme(); - } - - /** - * Create data-change-event stream with POST operation via RPC. - * - * @param input Input of RPC - example in JSON (data-change-event stream): - *
-     *              {@code
-     *                  {
-     *                      "input": {
-     *                          "path": "/toaster:toaster/toaster:toasterStatus",
-     *                          "sal-remote-augment:datastore": "OPERATIONAL",
-     *                      }
-     *                  }
-     *              }
-     *              
- * @param modelContext Reference to {@link EffectiveModelContext}. - * @return {@link DOMRpcResult} - Output of RPC - example in JSON: - *
-     *     {@code
-     *         {
-     *             "output": {
-     *                 "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
-     *             }
-     *         }
-     *     }
-     *     
- */ - // FIXME: this really should be a normal RPC implementation - public final RestconfFuture> createDataChangeNotifiStream( - final DatabindProvider databindProvider, final URI baseURI, final ContainerNode input, - final EffectiveModelContext modelContext) { - final var datastoreName = extractStringLeaf(input, DATASTORE_NODEID); - final var datastore = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName) - : LogicalDatastoreType.CONFIGURATION; - final var path = preparePath(input); - - return createStream( - "Events occuring in " + datastore + " datastore under /" + IdentifierCodec.serialize(path, modelContext), - baseStreamLocation(baseURI), new DataTreeChangeSource(databindProvider, dataBroker, datastore, path)) - .transform(stream -> Optional.of(Builders.containerBuilder() - .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID) - .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, stream.name())) - .build())); - } - - // FIXME: this really should be a normal RPC implementation - public final RestconfFuture> createNotificationStream( - final DatabindProvider databindProvider, final URI baseURI, final ContainerNode input, - final EffectiveModelContext modelContext) { - final var qnames = ((LeafSetNode) input.getChildByArg(NOTIFICATIONS)).body().stream() - .map(LeafSetEntryNode::body) - .map(QName::create) - .sorted() - .collect(ImmutableSet.toImmutableSet()); - - final var description = new StringBuilder("YANG notifications matching any of {"); - var haveFirst = false; - for (var qname : qnames) { - final var module = modelContext.findModuleStatement(qname.getModule()) - .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module", - ErrorType.APPLICATION, ErrorTag.INVALID_VALUE)); - final var stmt = module.findSchemaTreeNode(qname) - .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown notification", - ErrorType.APPLICATION, ErrorTag.INVALID_VALUE)); - if (!(stmt instanceof NotificationEffectiveStatement)) { - throw new RestconfDocumentedException(qname + " refers to a non-notification", - ErrorType.APPLICATION, ErrorTag.INVALID_VALUE); - } - - if (haveFirst) { - description.append(",\n"); - } else { - haveFirst = true; - } - description.append("\n ") - .append(module.argument().getLocalName()).append(':').append(qname.getLocalName()); - } - description.append("\n}"); - - return createStream(description.toString(), baseStreamLocation(baseURI), - new NotificationSource(databindProvider, notificationService, qnames)) - .transform(stream -> Optional.of(Builders.containerBuilder() - .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID) - .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, stream.name())) - .build())); - } - - /** - * Create device notification stream. - * - * @param input RPC input - * @return {@link DOMRpcResult} - Output of RPC - example in JSON - */ - // FIXME: this should be an RPC invocation - public final RestconfFuture> createDeviceNotificationStream(final URI baseURI, - final ContainerNode input, final EffectiveModelContext modelContext) { - // parsing out of container with settings and path - // FIXME: ugly cast - final var path = (YangInstanceIdentifier) input.findChildByArg(DEVICE_NOTIFICATION_PATH_NODEID) - .map(DataContainerChild::body) - .orElseThrow(() -> new RestconfDocumentedException("No path specified", ErrorType.APPLICATION, - ErrorTag.DATA_MISSING)); - - if (!(path.getLastPathArgument() instanceof NodeIdentifierWithPredicates listId)) { - throw new RestconfDocumentedException("Path does not refer to a list item", ErrorType.APPLICATION, - ErrorTag.INVALID_VALUE); - } - if (listId.size() != 1) { - throw new RestconfDocumentedException("Target list uses multiple keys", ErrorType.APPLICATION, - ErrorTag.INVALID_VALUE); - } - - final var baseStreamsUri = baseStreamLocation(baseURI); - return createStream( - "All YANG notifications occuring on mount point /" + IdentifierCodec.serialize(path, modelContext), - baseStreamsUri, - new DeviceNotificationSource(mountPointService, path)) - .transform(stream -> Optional.of(Builders.containerBuilder() - .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME)) - .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH_NODEID, - baseStreamsUri + '/' + stream.name())) - .build())); - } - - /** - * Prepare {@link YangInstanceIdentifier} of stream source. - * - * @param data Container with stream settings (RPC create-stream). - * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications - * are going to be generated. - */ - private static YangInstanceIdentifier preparePath(final ContainerNode data) { - final var pathLeaf = data.childByArg(PATH_NODEID); - if (pathLeaf != null && pathLeaf.body() instanceof YangInstanceIdentifier pathValue) { - return pathValue; - } - - throw new RestconfDocumentedException("Instance identifier was not normalized correctly", - ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED); - } - - private static @Nullable String extractStringLeaf(final ContainerNode data, final NodeIdentifier childName) { - return data.childByArg(childName) instanceof LeafNode leafNode && leafNode.body() instanceof String str - ? str : null; - } - - @VisibleForTesting - static @NonNull MapEntryNode streamEntry(final String name, final String description, - final String baseStreamLocation, final Set encodings) { - final var accessBuilder = Builders.mapBuilder().withNodeIdentifier(new NodeIdentifier(Access.QNAME)); - for (var encoding : encodings) { - final var encodingName = encoding.name(); - accessBuilder.withChild(Builders.mapEntryBuilder() - .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, ENCODING_QNAME, encodingName)) - .withChild(ImmutableNodes.leafNode(ENCODING_QNAME, encodingName)) - .withChild(ImmutableNodes.leafNode(LOCATION_QNAME, - baseStreamLocation + '/' + encodingName + '/' + name)) - .build()); - } - - return Builders.mapEntryBuilder() - .withNodeIdentifier(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, name)) - .withChild(ImmutableNodes.leafNode(NAME_QNAME, name)) - .withChild(ImmutableNodes.leafNode(DESCRIPTION_QNAME, description)) - .withChild(accessBuilder.build()) - .build(); - } -} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/PingExecutor.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/PingExecutor.java new file mode 100644 index 0000000000..864430ac75 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/PingExecutor.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.nb.rfc8040.streams; + +import java.util.concurrent.TimeUnit; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.opendaylight.yangtools.concepts.Registration; + +@NonNullByDefault +public interface PingExecutor { + + Registration startPingProcess(Runnable task, long delay, TimeUnit timeUnit); +} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStreamServletFactory.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStreamServletFactory.java new file mode 100644 index 0000000000..20f8f179b9 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStreamServletFactory.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.nb.rfc8040.streams; + +import javax.servlet.http.HttpServlet; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.restconf.server.spi.RestconfStream; + +/** + * A helper for creating {@link HttpServlet}s which provide bridge between JAX-RS and {@link RestconfStream.Registry}. + */ +public interface RestconfStreamServletFactory { + + @NonNull HttpServlet newStreamServlet(); +} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSEApplication.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSEApplication.java new file mode 100644 index 0000000000..5e8380e5eb --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSEApplication.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.nb.rfc8040.streams; + +import java.util.Set; +import javax.ws.rs.core.Application; +import org.opendaylight.restconf.server.spi.RestconfStream; + +/** + * JAX-RS binding for Server-Sent Events. + */ +final class SSEApplication extends Application { + private final SSEStreamService singleton; + + SSEApplication(final RestconfStream.Registry streamRegistry, final PingExecutor pingExecutor, + final StreamsConfiguration configuration) { + singleton = new SSEStreamService(streamRegistry, pingExecutor, configuration); + } + + @Override + public Set getSingletons() { + return Set.of(singleton); + } +} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESessionHandler.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESender.java similarity index 80% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESessionHandler.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESender.java index b5e96939f9..3aed886699 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESessionHandler.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESender.java @@ -13,14 +13,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.CharMatcher; import com.google.common.base.Strings; import java.io.UnsupportedEncodingException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import javax.ws.rs.sse.Sse; import javax.ws.rs.sse.SseEventSink; import javax.xml.xpath.XPathExpressionException; import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName; +import org.opendaylight.restconf.server.spi.RestconfStream; +import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName; +import org.opendaylight.restconf.server.spi.RestconfStream.Sender; import org.opendaylight.yangtools.concepts.Registration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,26 +29,26 @@ import org.slf4j.LoggerFactory; * SSE session handler that is responsible for controlling of session, managing subscription to data-change-event or * notification listener, and sending of data over established SSE session. */ -public final class SSESessionHandler implements StreamSessionHandler { - private static final Logger LOG = LoggerFactory.getLogger(SSESessionHandler.class); +final class SSESender implements Sender { + private static final Logger LOG = LoggerFactory.getLogger(SSESender.class); private static final CharMatcher CR_OR_LF = CharMatcher.anyOf("\r\n"); - private final ScheduledExecutorService executorService; + private final PingExecutor pingExecutor; private final RestconfStream stream; private final EncodingName encoding; private final ReceiveEventsParams params; private final SseEventSink sink; private final Sse sse; private final int maximumFragmentLength; - private final int heartbeatInterval; + private final long heartbeatMillis; - private ScheduledFuture pingProcess; + private Registration pingProcess; private Registration subscriber; /** * Creation of the new server-sent events session handler. * - * @param executorService Executor that is used for periodical sending of SSE ping messages to keep session up even + * @param pingExecutor Executor that is used for periodical sending of SSE ping messages to keep session up even * if the notifications doesn't flow from server to clients or clients don't implement ping-pong * service. * @param stream YANG notification or data-change event listener to which client on this SSE session subscribes to. @@ -57,20 +57,20 @@ public final class SSESessionHandler implements StreamSessionHandler { * (exceeded notification length ends in error). If the parameter is set to non-zero positive value, * messages longer than this parameter are fragmented into multiple SSE messages sent in one * transaction. - * @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint to keep + * @param heartbeatMillis Interval in milliseconds of sending of ping control frames to remote endpoint to keep * session up. Ping control frames are disabled if this parameter is set to 0. */ - public SSESessionHandler(final ScheduledExecutorService executorService, final SseEventSink sink, final Sse sse, - final RestconfStream stream, final EncodingName encoding, final ReceiveEventsParams params, - final int maximumFragmentLength, final int heartbeatInterval) { - this.executorService = requireNonNull(executorService); + SSESender(final PingExecutor pingExecutor, final SseEventSink sink, final Sse sse, final RestconfStream stream, + final EncodingName encoding, final ReceiveEventsParams params, final int maximumFragmentLength, + final long heartbeatMillis) { + this.pingExecutor = requireNonNull(pingExecutor); this.sse = requireNonNull(sse); this.sink = requireNonNull(sink); this.stream = requireNonNull(stream); this.encoding = requireNonNull(encoding); this.params = requireNonNull(params); this.maximumFragmentLength = maximumFragmentLength; - this.heartbeatInterval = heartbeatInterval; + this.heartbeatMillis = heartbeatMillis; } /** @@ -88,9 +88,8 @@ public final class SSESessionHandler implements StreamSessionHandler { } subscriber = local; - if (heartbeatInterval != 0) { - pingProcess = executorService.scheduleWithFixedDelay(this::sendPingMessage, heartbeatInterval, - heartbeatInterval, TimeUnit.MILLISECONDS); + if (heartbeatMillis != 0) { + pingProcess = pingExecutor.startPingProcess(this::sendPing, heartbeatMillis, TimeUnit.MILLISECONDS); } return true; } @@ -153,7 +152,7 @@ public final class SSESessionHandler implements StreamSessionHandler { return outputMessage.toString(); } - private synchronized void sendPingMessage() { + private synchronized void sendPing() { if (!sink.isClosed()) { LOG.debug("sending PING"); sink.send(sse.newEventBuilder().comment("ping").build()); @@ -163,8 +162,9 @@ public final class SSESessionHandler implements StreamSessionHandler { } private void stopPingProcess() { - if (pingProcess != null && !pingProcess.isDone() && !pingProcess.isCancelled()) { - pingProcess.cancel(true); + if (pingProcess != null) { + pingProcess.close(); + pingProcess = null; } } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSEStreamService.java similarity index 73% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSEStreamService.java index e54b419c59..59f52a586b 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataStreamServiceImpl.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/SSEStreamService.java @@ -5,13 +5,12 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.rests.services.impl; +package org.opendaylight.restconf.nb.rfc8040.streams; import static java.util.Objects.requireNonNull; import com.google.common.collect.ImmutableMap; import java.io.UnsupportedEncodingException; -import java.util.concurrent.ScheduledExecutorService; import javax.ws.rs.BadRequestException; import javax.ws.rs.GET; import javax.ws.rs.NotFoundException; @@ -24,12 +23,9 @@ import javax.ws.rs.core.UriInfo; import javax.ws.rs.sse.Sse; import javax.ws.rs.sse.SseEventSink; import javax.xml.xpath.XPathExpressionException; -import org.opendaylight.controller.config.threadpool.ScheduledThreadPool; import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams; -import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName; -import org.opendaylight.restconf.nb.rfc8040.streams.SSESessionHandler; -import org.opendaylight.restconf.nb.rfc8040.streams.StreamsConfiguration; +import org.opendaylight.restconf.server.spi.RestconfStream; +import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,18 +33,18 @@ import org.slf4j.LoggerFactory; * Access to notification streams via Server-Sent Events. */ @Path("/") -public final class RestconfDataStreamServiceImpl { - private static final Logger LOG = LoggerFactory.getLogger(RestconfDataStreamServiceImpl.class); +final class SSEStreamService { + private static final Logger LOG = LoggerFactory.getLogger(SSEStreamService.class); - private final ListenersBroker listenersBroker; - private final ScheduledExecutorService executorService; + private final RestconfStream.Registry streamRegistry; + private final PingExecutor pingExecutor; private final int maximumFragmentLength; private final int heartbeatInterval; - public RestconfDataStreamServiceImpl(final ScheduledThreadPool scheduledThreadPool, - final ListenersBroker listenersBroker, final StreamsConfiguration configuration) { - executorService = scheduledThreadPool.getExecutor(); - this.listenersBroker = requireNonNull(listenersBroker); + SSEStreamService(final RestconfStream.Registry streamRegistry, final PingExecutor pingExecutor, + final StreamsConfiguration configuration) { + this.streamRegistry = requireNonNull(streamRegistry); + this.pingExecutor = requireNonNull(pingExecutor); heartbeatInterval = configuration.heartbeatInterval(); maximumFragmentLength = configuration.maximumFragmentLength(); } @@ -64,7 +60,7 @@ public final class RestconfDataStreamServiceImpl { public void getSSE(@PathParam("encodingName") final EncodingName encodingName, @PathParam("streamName") final String streamName, @Context final UriInfo uriInfo, @Context final SseEventSink sink, @Context final Sse sse) { - final var stream = listenersBroker.getStream(streamName); + final var stream = streamRegistry.lookupStream(streamName); if (stream == null) { LOG.debug("Listener for stream with name {} was not found.", streamName); throw new NotFoundException("No such stream: " + streamName); @@ -96,7 +92,7 @@ public final class RestconfDataStreamServiceImpl { LOG.debug("Listener for stream with name {} has been found, SSE session handler will be created.", streamName); // FIXME: invert control here: we should call 'listener.addSession()', which in turn should call // handler.init()/handler.close() - final var handler = new SSESessionHandler(executorService, sink, sse, stream, encodingName, params, + final var handler = new SSESender(pingExecutor, sink, sse, stream, encodingName, params, maximumFragmentLength, heartbeatInterval); try { diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/StreamSessionHandler.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/StreamSessionHandler.java deleted file mode 100644 index 6b478276eb..0000000000 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/StreamSessionHandler.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2020 Lumina Networks, Inc. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.restconf.nb.rfc8040.streams; - -/** - * Interface for session handler that is responsible for sending of data over established session. - */ -public interface StreamSessionHandler { - /** - * Interface for sending String message through one of implementation. - * - * @param data Message data to be send. - */ - void sendDataMessage(String data); - - /** - * Called when the stream has reached its end. The handler should close all underlying resources. - */ - void endOfStream(); -} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/StreamsConfiguration.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/StreamsConfiguration.java index 95af24a110..d89d888b5a 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/StreamsConfiguration.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/StreamsConfiguration.java @@ -16,9 +16,8 @@ import static com.google.common.base.Preconditions.checkArgument; * (exceeded message length leads to fragmentation of messages). * @param idleTimeout Maximum idle time of web-socket session before the session is closed (milliseconds). * @param heartbeatInterval Interval in milliseconds between sending of ping control frames. - * @param useSSE when is {@code true} use SSE else use WS */ -public record StreamsConfiguration(int maximumFragmentLength, int idleTimeout, int heartbeatInterval, boolean useSSE) { +public record StreamsConfiguration(int maximumFragmentLength, int idleTimeout, int heartbeatInterval) { // FIXME: can this be 64KiB exactly? if so, maximumFragmentLength should become a Uint16 and validation should be // pushed out to users public static final int MAXIMUM_FRAGMENT_LENGTH_LIMIT = 65534; diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactory.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactory.java index 08a0f9572d..21a1bccad8 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactory.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactory.java @@ -9,13 +9,13 @@ package org.opendaylight.restconf.nb.rfc8040.streams; import static java.util.Objects.requireNonNull; -import java.util.concurrent.ScheduledExecutorService; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; import org.eclipse.jetty.websocket.servlet.WebSocketCreator; import org.opendaylight.restconf.nb.rfc8040.URLConstants; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName; +import org.opendaylight.restconf.server.spi.RestconfStream; +import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory; * @param heartbeatInterval Interval in milliseconds between sending of ping control frames. */ record WebSocketFactory( - ScheduledExecutorService executorService, - ListenersBroker listenersBroker, + RestconfStream.Registry streamRegistry, + PingExecutor pingExecutor, int maximumFragmentLength, int heartbeatInterval) implements WebSocketCreator { private static final Logger LOG = LoggerFactory.getLogger(WebSocketFactory.class); @@ -37,8 +37,8 @@ record WebSocketFactory( "/" + URLConstants.BASE_PATH + "/" + URLConstants.STREAMS_SUBPATH + "/"; WebSocketFactory { - requireNonNull(executorService); - requireNonNull(listenersBroker); + requireNonNull(pingExecutor); + requireNonNull(streamRegistry); } /** @@ -70,7 +70,7 @@ record WebSocketFactory( return notFound(resp); } final var streamName = stripped.substring(slash + 1); - final var stream = listenersBroker.getStream(streamName); + final var stream = streamRegistry.lookupStream(streamName); if (stream == null) { LOG.debug("Listener for stream with name {} was not found.", streamName); return notFound(resp); @@ -82,7 +82,7 @@ record WebSocketFactory( resp.setStatusCode(HttpServletResponse.SC_SWITCHING_PROTOCOLS); // note: every web-socket manages PING process individually because this approach scales better than // sending PING frames at once over all web-socket sessions - return new WebSocketSessionHandler(executorService, stream, new EncodingName(stripped.substring(0, slash)), + return new WebSocketSender(pingExecutor, stream, new EncodingName(stripped.substring(0, slash)), null, maximumFragmentLength, heartbeatInterval); } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketInitializer.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketInitializer.java index a0f483b883..83f9b74752 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketInitializer.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketInitializer.java @@ -11,33 +11,23 @@ import java.io.IOException; import java.io.NotSerializableException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import javax.inject.Inject; -import javax.inject.Singleton; import org.eclipse.jetty.websocket.servlet.WebSocketServlet; import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; -import org.opendaylight.controller.config.threadpool.ScheduledThreadPool; +import org.opendaylight.restconf.server.spi.RestconfStream; /** * Web-socket servlet listening on ws or wss schemas for created data-change-event or notification streams. */ -@Singleton -public final class WebSocketInitializer extends WebSocketServlet { +final class WebSocketInitializer extends WebSocketServlet { @java.io.Serial private static final long serialVersionUID = 1L; private final transient WebSocketFactory creator; private final int idleTimeoutMillis; - /** - * Creation of the web-socket initializer. - * - * @param scheduledThreadPool ODL thread pool used for fetching of scheduled executors. - * @param configuration Web-socket configuration holder. - */ - @Inject - public WebSocketInitializer(final ScheduledThreadPool scheduledThreadPool, - final ListenersBroker listenersBroker, final StreamsConfiguration configuration) { - creator = new WebSocketFactory(scheduledThreadPool.getExecutor(), listenersBroker, + WebSocketInitializer(final RestconfStream.Registry streamRegistry, final PingExecutor pingExecutor, + final StreamsConfiguration configuration) { + creator = new WebSocketFactory(streamRegistry, pingExecutor, configuration.maximumFragmentLength(), configuration.heartbeatInterval()); idleTimeoutMillis = configuration.idleTimeout(); } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSessionHandler.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSender.java similarity index 89% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSessionHandler.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSender.java index 0352834ddb..ff7075bb01 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSessionHandler.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSender.java @@ -17,8 +17,6 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.xml.xpath.XPathExpressionException; @@ -31,7 +29,9 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; import org.eclipse.jetty.websocket.api.annotations.WebSocket; import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName; +import org.opendaylight.restconf.server.spi.RestconfStream; +import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName; +import org.opendaylight.restconf.server.spi.RestconfStream.Sender; import org.opendaylight.yangtools.concepts.Registration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,25 +41,25 @@ import org.slf4j.LoggerFactory; * to data-change-event or notification listener, and sending of data over established web-socket session. */ @WebSocket -public final class WebSocketSessionHandler implements StreamSessionHandler { - private static final Logger LOG = LoggerFactory.getLogger(WebSocketSessionHandler.class); +final class WebSocketSender implements Sender { + private static final Logger LOG = LoggerFactory.getLogger(WebSocketSender.class); private static final byte[] PING_PAYLOAD = "ping".getBytes(Charset.defaultCharset()); - private final ScheduledExecutorService executorService; + private final PingExecutor pingExecutor; private final RestconfStream stream; private final EncodingName encodingName; private final ReceiveEventsParams params; private final int maximumFragmentLength; - private final int heartbeatInterval; + private final long heartbeatInterval; private Session session; private Registration subscriber; - private ScheduledFuture pingProcess; + private Registration pingProcess; /** * Creation of the new web-socket session handler. * - * @param executorService Executor that is used for periodical sending of web-socket ping messages to keep + * @param pingExecutor Executor that is used for periodical sending of web-socket ping messages to keep * session up even if the notifications doesn't flow from server to clients or clients * don't implement ping-pong service. * @param stream YANG notification or data-change event listener to which client on this web-socket @@ -72,10 +72,9 @@ public final class WebSocketSessionHandler implements StreamSessionHandler { * @param heartbeatInterval Interval in milliseconds of sending of ping control frames to remote endpoint * to keep session up. Ping control frames are disabled if this parameter is set to 0. */ - WebSocketSessionHandler(final ScheduledExecutorService executorService, final RestconfStream stream, - final EncodingName encodingName, final @Nullable ReceiveEventsParams params, - final int maximumFragmentLength, final int heartbeatInterval) { - this.executorService = requireNonNull(executorService); + WebSocketSender(final PingExecutor pingExecutor, final RestconfStream stream, final EncodingName encodingName, + final @Nullable ReceiveEventsParams params, final int maximumFragmentLength, final long heartbeatInterval) { + this.pingExecutor = requireNonNull(pingExecutor); this.stream = requireNonNull(stream); this.encodingName = requireNonNull(encodingName); // FIXME: NETCONF-1102: require params @@ -109,8 +108,7 @@ public final class WebSocketSessionHandler implements StreamSessionHandler { if (heartbeatInterval != 0) { // sending of PING frame can be long if there is an error on web-socket - from this reason // the fixed-rate should not be used - pingProcess = executorService.scheduleWithFixedDelay(this::sendPingMessage, heartbeatInterval, - heartbeatInterval, TimeUnit.MILLISECONDS); + pingProcess = pingExecutor.startPingProcess(this::sendPing, heartbeatInterval, TimeUnit.MILLISECONDS); } } } @@ -167,8 +165,9 @@ public final class WebSocketSessionHandler implements StreamSessionHandler { } private void stopPingProcess() { - if (pingProcess != null && !pingProcess.isDone() && !pingProcess.isCancelled()) { - pingProcess.cancel(true); + if (pingProcess != null) { + pingProcess.close(); + pingProcess = null; } } @@ -233,7 +232,7 @@ public final class WebSocketSessionHandler implements StreamSessionHandler { } } - private synchronized void sendPingMessage() { + private synchronized void sendPing() { try { Objects.requireNonNull(session).getRemote().sendPing(ByteBuffer.wrap(PING_PAYLOAD)); } catch (IOException e) { diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/api/RestconfServer.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/api/RestconfServer.java new file mode 100644 index 0000000000..fff6dc23d9 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/api/RestconfServer.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.server.api; + +import java.net.URI; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.opendaylight.restconf.common.errors.RestconfFuture; +import org.opendaylight.restconf.nb.rfc8040.databind.OperationInputBody; +import org.opendaylight.restconf.server.spi.OperationOutput; + +/** + * An implementation of a RESTCONF server, implementing the + * RESTCONF API Resource. + */ +@NonNullByDefault +public interface RestconfServer { + + // FIXME: use ApiPath instead of String + RestconfFuture invokeRpc(URI restconfURI, String apiPath, OperationInputBody body); +} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/api/package-info.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/api/package-info.java new file mode 100644 index 0000000000..d31d477278 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/api/package-info.java @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +/** + * Interface to a RESTCONF server instance. The primary entry point is {@link RestconfServer}. + */ +package org.opendaylight.restconf.server.api; \ No newline at end of file diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/CapabilitiesWriter.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/CapabilitiesWriter.java similarity index 99% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/CapabilitiesWriter.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/CapabilitiesWriter.java index 72a0a2a5d5..be041f7619 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/CapabilitiesWriter.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/CapabilitiesWriter.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040; +package org.opendaylight.restconf.server.mdsal; import static java.util.Objects.requireNonNull; import static org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.$YangModuleInfoImpl.qnameOf; diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/MdsalRestconfStreamRegistry.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/MdsalRestconfStreamRegistry.java new file mode 100644 index 0000000000..89da78fc13 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/MdsalRestconfStreamRegistry.java @@ -0,0 +1,81 @@ +/* + * Copyright © 2019 FRINX s.r.o. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.server.mdsal; + +import static java.util.Objects.requireNonNull; + +import com.google.common.util.concurrent.ListenableFuture; +import java.util.Map; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataBroker; +import org.opendaylight.restconf.server.spi.AbstractRestconfStreamRegistry; +import org.opendaylight.restconf.server.spi.RestconfStream; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.RestconfState; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.Streams; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.Stream; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Reference; + +/** + * This singleton class is responsible for creation, removal and searching for {@link RestconfStream}s. + */ +@Singleton +@Component(factory = MdsalRestconfStreamRegistry.FACTORY_NAME, service = RestconfStream.Registry.class) +public final class MdsalRestconfStreamRegistry extends AbstractRestconfStreamRegistry { + public static final String FACTORY_NAME = "org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker"; + + private static final YangInstanceIdentifier RESTCONF_STATE_STREAMS = YangInstanceIdentifier.of( + NodeIdentifier.create(RestconfState.QNAME), + NodeIdentifier.create(Streams.QNAME), + NodeIdentifier.create(Stream.QNAME)); + private static final String USE_WEBSOCKETS_PROP = ".useWebsockets"; + + private final DOMDataBroker dataBroker; + + public MdsalRestconfStreamRegistry(final DOMDataBroker dataBroker, final boolean useWebsockets) { + super(useWebsockets); + this.dataBroker = requireNonNull(dataBroker); + } + + @Inject + public MdsalRestconfStreamRegistry(final DOMDataBroker dataBroker) { + this(dataBroker, false); + } + + @Activate + public MdsalRestconfStreamRegistry(@Reference final DOMDataBroker dataBroker, final Map props) { + this(dataBroker, (boolean) requireNonNull(props.get(USE_WEBSOCKETS_PROP))); + } + + public static Map props(final boolean useSSE) { + return Map.of(USE_WEBSOCKETS_PROP, !useSSE); + } + + @Override + protected ListenableFuture putStream(final MapEntryNode stream) { + // Now issue a put operation + final var tx = dataBroker.newWriteOnlyTransaction(); + tx.put(LogicalDatastoreType.OPERATIONAL, RESTCONF_STATE_STREAMS.node(stream.name()), stream); + return tx.commit(); + } + + @Override + protected ListenableFuture deleteStream(final NodeIdentifierWithPredicates streamName) { + // Now issue a delete operation while the name is still protected by being associated in the map. + final var tx = dataBroker.newWriteOnlyTransaction(); + tx.delete(LogicalDatastoreType.OPERATIONAL, RESTCONF_STATE_STREAMS.node(streamName)); + return tx.commit(); + } +} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DeviceNotificationSource.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/devnotif/DeviceNotificationSource.java similarity index 90% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DeviceNotificationSource.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/devnotif/DeviceNotificationSource.java index 356546117a..ca95521b68 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DeviceNotificationSource.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/devnotif/DeviceNotificationSource.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.devnotif; import static java.util.Objects.requireNonNull; @@ -17,7 +17,9 @@ import org.opendaylight.mdsal.dom.api.DOMMountPointService; import org.opendaylight.mdsal.dom.api.DOMNotification; import org.opendaylight.mdsal.dom.api.DOMNotificationService; import org.opendaylight.mdsal.dom.api.DOMSchemaService; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Sink; +import org.opendaylight.restconf.server.mdsal.streams.notif.AbstractNotificationSource; +import org.opendaylight.restconf.server.spi.RestconfStream; +import org.opendaylight.restconf.server.spi.RestconfStream.Sink; import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement; @@ -28,7 +30,7 @@ import org.slf4j.LoggerFactory; /** * A {@link RestconfStream} reporting YANG notifications coming from a mounted device. */ -public final class DeviceNotificationSource extends AbstractNotificationSource implements DOMMountPointListener { +final class DeviceNotificationSource extends AbstractNotificationSource implements DOMMountPointListener { private static final Logger LOG = LoggerFactory.getLogger(DeviceNotificationSource.class); private final AtomicReference onRemoved = new AtomicReference<>(); @@ -85,8 +87,8 @@ public final class DeviceNotificationSource extends AbstractNotificationSource i return endOfStream(sink); } - final var notifReg = optNotification.orElseThrow().registerNotificationListener( - new Listener(sink, () -> modelContext), paths); + final var notifReg = optNotification.orElseThrow() + .registerNotificationListener(new Listener(sink, () -> modelContext), paths); // Notifications are running now. // If we get removed we need to close those. But since we are running lockless and we need to set up diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/devnotif/SubscribeDeviceNotificationRpc.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/devnotif/SubscribeDeviceNotificationRpc.java new file mode 100644 index 0000000000..377216c6f6 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/devnotif/SubscribeDeviceNotificationRpc.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.server.mdsal.streams.devnotif; + +import static java.util.Objects.requireNonNull; + +import java.net.URI; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.opendaylight.mdsal.dom.api.DOMMountPointService; +import org.opendaylight.restconf.common.errors.RestconfDocumentedException; +import org.opendaylight.restconf.common.errors.RestconfFuture; +import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec; +import org.opendaylight.restconf.server.spi.OperationInput; +import org.opendaylight.restconf.server.spi.OperationOutput; +import org.opendaylight.restconf.server.spi.RestconfStream; +import org.opendaylight.restconf.server.spi.RpcImplementation; +import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotification; +import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.device.notification.rev221106.SubscribeDeviceNotificationOutput; +import org.opendaylight.yangtools.yang.common.ErrorTag; +import org.opendaylight.yangtools.yang.common.ErrorType; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Reference; + +/** + * RESTCONF implementation of {@link SubscribeDeviceNotification}. + */ +@Singleton +@Component +public final class SubscribeDeviceNotificationRpc extends RpcImplementation { + private static final NodeIdentifier DEVICE_NOTIFICATION_PATH_NODEID = + NodeIdentifier.create(QName.create(SubscribeDeviceNotificationInput.QNAME, "path").intern()); + // FIXME: NETCONF-1102: this should be 'stream-name' + private static final NodeIdentifier DEVICE_NOTIFICATION_STREAM_PATH_NODEID = + NodeIdentifier.create(QName.create(SubscribeDeviceNotificationInput.QNAME, "stream-path").intern()); + + private final DOMMountPointService mountPointService; + private final RestconfStream.Registry streamRegistry; + + @Inject + @Activate + public SubscribeDeviceNotificationRpc(@Reference final RestconfStream.Registry streamRegistry, + @Reference final DOMMountPointService mountPointService) { + super(SubscribeDeviceNotification.QNAME); + this.mountPointService = requireNonNull(mountPointService); + this.streamRegistry = requireNonNull(streamRegistry); + } + + @Override + public RestconfFuture invoke(final URI restconfURI, final OperationInput input) { + final var body = input.input(); + final var pathLeaf = body.childByArg(DEVICE_NOTIFICATION_PATH_NODEID); + if (pathLeaf == null) { + return RestconfFuture.failed(new RestconfDocumentedException("No path specified", ErrorType.APPLICATION, + ErrorTag.MISSING_ELEMENT)); + } + final var pathLeafBody = pathLeaf.body(); + if (!(pathLeafBody instanceof YangInstanceIdentifier path)) { + return RestconfFuture.failed(new RestconfDocumentedException("Unexpected path " + pathLeafBody, + ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT)); + } + if (!(path.getLastPathArgument() instanceof NodeIdentifierWithPredicates listId)) { + return RestconfFuture.failed(new RestconfDocumentedException(path + " does not refer to a list item", + ErrorType.APPLICATION, ErrorTag.BAD_ELEMENT)); + } + if (listId.size() != 1) { + return RestconfFuture.failed(new RestconfDocumentedException(path + " uses multiple keys", + ErrorType.APPLICATION, ErrorTag.INVALID_VALUE)); + } + + return streamRegistry.createStream(restconfURI, new DeviceNotificationSource(mountPointService, path), + "All YANG notifications occuring on mount point /" + + IdentifierCodec.serialize(path, input.currentContext().modelContext())) + .transform(stream -> input.newOperationOutput(Builders.containerBuilder() + .withNodeIdentifier(new NodeIdentifier(SubscribeDeviceNotificationOutput.QNAME)) + .withChild(ImmutableNodes.leafNode(DEVICE_NOTIFICATION_STREAM_PATH_NODEID, stream.name())) + .build())); + } +} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/devnotif/package-info.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/devnotif/package-info.java new file mode 100644 index 0000000000..9ad9582afe --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/devnotif/package-info.java @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +/** + * Support for streams of YANG 1.0 notifications coming from a mounted device. + */ +package org.opendaylight.restconf.server.mdsal.streams.devnotif; \ No newline at end of file diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/CreateDataChangeEventSubscriptionRpc.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/CreateDataChangeEventSubscriptionRpc.java new file mode 100644 index 0000000000..129d143108 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/CreateDataChangeEventSubscriptionRpc.java @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.server.mdsal.streams.dtcl; + +import static java.util.Objects.requireNonNull; + +import java.net.URI; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.dom.api.DOMDataBroker; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService; +import org.opendaylight.restconf.common.errors.RestconfDocumentedException; +import org.opendaylight.restconf.common.errors.RestconfFuture; +import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; +import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec; +import org.opendaylight.restconf.server.spi.OperationInput; +import org.opendaylight.restconf.server.spi.OperationOutput; +import org.opendaylight.restconf.server.spi.RestconfStream; +import org.opendaylight.restconf.server.spi.RpcImplementation; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput; +import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev231103.CreateDataChangeEventSubscriptionInput1; +import org.opendaylight.yangtools.yang.common.ErrorTag; +import org.opendaylight.yangtools.yang.common.ErrorType; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Reference; + +/** + * RESTCONF implementation of {@link CreateDataChangeEventSubscription}. + */ +@Singleton +@Component +public final class CreateDataChangeEventSubscriptionRpc extends RpcImplementation { + private static final @NonNull NodeIdentifier DATASTORE_NODEID = NodeIdentifier.create( + QName.create(CreateDataChangeEventSubscriptionInput1.QNAME, "datastore").intern()); + private static final @NonNull NodeIdentifier STREAM_NAME_NODEID = + NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern()); + private static final @NonNull NodeIdentifier PATH_NODEID = + NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionInput.QNAME, "path").intern()); + private static final @NonNull NodeIdentifier OUTPUT_NODEID = + NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME); + + private final DatabindProvider databindProvider; + private final DOMDataTreeChangeService changeService; + private final RestconfStream.Registry streamRegistry; + + @Inject + @Activate + public CreateDataChangeEventSubscriptionRpc(@Reference final RestconfStream.Registry streamRegistry, + @Reference final DatabindProvider databindProvider, @Reference final DOMDataBroker dataBroker) { + super(CreateDataChangeEventSubscription.QNAME); + this.databindProvider = requireNonNull(databindProvider); + changeService = dataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class); + if (changeService == null) { + throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService"); + } + this.streamRegistry = requireNonNull(streamRegistry); + } + + /** + * Create data-change-event stream with POST operation via RPC. + * + * @param input Input of RPC - example in JSON (data-change-event stream): + *
+     *              {@code
+     *                  {
+     *                      "input": {
+     *                          "path": "/toaster:toaster/toaster:toasterStatus",
+     *                          "sal-remote-augment:datastore": "OPERATIONAL",
+     *                      }
+     *                  }
+     *              }
+     *              
+ * @return Future output of RPC - example in JSON: + *
+     *     {@code
+     *         {
+     *             "output": {
+     *                 "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
+     *             }
+     *         }
+     *     }
+     *     
+ */ + @Override + public RestconfFuture invoke(final URI restconfURI, final OperationInput input) { + final var body = input.input(); + final var datastoreName = leaf(body, DATASTORE_NODEID, String.class); + final var datastore = datastoreName != null ? LogicalDatastoreType.valueOf(datastoreName) + : LogicalDatastoreType.CONFIGURATION; + + final var path = leaf(body, PATH_NODEID, YangInstanceIdentifier.class); + if (path == null) { + return RestconfFuture.failed( + new RestconfDocumentedException("missing path", ErrorType.APPLICATION, ErrorTag.MISSING_ELEMENT)); + } + + return streamRegistry.createStream(restconfURI, + new DataTreeChangeSource(databindProvider, changeService, datastore, path), + "Events occuring in " + datastore + " datastore under /" + + IdentifierCodec.serialize(path, input.currentContext().modelContext())) + .transform(stream -> input.newOperationOutput(Builders.containerBuilder() + .withNodeIdentifier(OUTPUT_NODEID) + .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, stream.name())) + .build())); + } +} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeCandidateFormatter.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/DataTreeCandidateFormatter.java similarity index 91% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeCandidateFormatter.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/DataTreeCandidateFormatter.java index ba85c10753..5502e79151 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeCandidateFormatter.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/DataTreeCandidateFormatter.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.dtcl; import java.io.IOException; import java.time.Instant; @@ -13,6 +13,8 @@ import java.util.List; import javax.xml.stream.XMLStreamException; import javax.xml.transform.dom.DOMResult; import javax.xml.xpath.XPathExpressionException; +import org.opendaylight.restconf.server.spi.EventFormatter; +import org.opendaylight.restconf.server.spi.TextParameters; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.DataChangedNotification; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.data.changed.notification.DataChangeEvent; import org.opendaylight.yangtools.yang.data.codec.xml.XMLStreamNormalizedNodeStreamWriter; @@ -38,7 +40,7 @@ abstract class DataTreeCandidateFormatter extends EventFormatter input) throws IOException { final var notificationElement = createNotificationElement(doc, Instant.now()); final var notificationEventElement = doc.createElementNS(DATA_CHANGED_NOTIFICATION_NS, diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeCandidateFormatterFactory.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/DataTreeCandidateFormatterFactory.java similarity index 83% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeCandidateFormatterFactory.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/DataTreeCandidateFormatterFactory.java index 451a2e4f83..0f9849cb65 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeCandidateFormatterFactory.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/DataTreeCandidateFormatterFactory.java @@ -5,9 +5,10 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.dtcl; import java.util.List; +import org.opendaylight.restconf.server.spi.EventFormatterFactory; import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; abstract class DataTreeCandidateFormatterFactory extends EventFormatterFactory> { diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractWebsocketSerializer.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/DataTreeCandidateSerializer.java similarity index 95% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractWebsocketSerializer.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/DataTreeCandidateSerializer.java index 3d0858de87..aefaac686e 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractWebsocketSerializer.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/DataTreeCandidateSerializer.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.dtcl; import static com.google.common.base.Verify.verifyNotNull; import static java.util.Objects.requireNonNull; @@ -18,6 +18,7 @@ import java.util.Comparator; import java.util.Deque; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.restconf.server.spi.TextParameters; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.data.changed.notification.DataChangeEvent; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.data.changed.notification.DataChangeEvent.Operation; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.data.changed.notification.data.change.event.Data; @@ -41,8 +42,8 @@ import org.opendaylight.yangtools.yang.model.util.SchemaInferenceStack.Inference import org.slf4j.Logger; import org.slf4j.LoggerFactory; -abstract class AbstractWebsocketSerializer { - private static final Logger LOG = LoggerFactory.getLogger(AbstractWebsocketSerializer.class); +abstract class DataTreeCandidateSerializer { + private static final Logger LOG = LoggerFactory.getLogger(DataTreeCandidateSerializer.class); static final @NonNull QName PATH_QNAME = QName.create(DataChangeEvent.QNAME, "path").intern(); static final @NonNull NodeIdentifier PATH_NID = NodeIdentifier.create(PATH_QNAME); static final @NonNull QName OPERATION_QNAME = QName.create(DataChangeEvent.QNAME, "operation").intern(); @@ -51,11 +52,11 @@ abstract class AbstractWebsocketSerializer { private final EffectiveModelContext context; - AbstractWebsocketSerializer(final EffectiveModelContext context) { + DataTreeCandidateSerializer(final EffectiveModelContext context) { this.context = requireNonNull(context); } - public final boolean serialize(final DataTreeCandidate candidate, final TextParameters params) throws T { + final boolean serialize(final DataTreeCandidate candidate, final TextParameters params) throws T { final var skipData = params.skipData(); final var changedLeafNodesOnly = params.changedLeafNodesOnly(); if (changedLeafNodesOnly || params.leafNodesOnly()) { diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeChangeSource.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/DataTreeChangeSource.java similarity index 80% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeChangeSource.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/DataTreeChangeSource.java index 3e3d37520a..377d02f6fc 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeChangeSource.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/DataTreeChangeSource.java @@ -5,10 +5,11 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.dtcl; import static java.util.Objects.requireNonNull; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.collect.ImmutableMap; import java.time.Instant; @@ -16,13 +17,13 @@ import java.util.List; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener; -import org.opendaylight.mdsal.dom.api.DOMDataBroker; import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Sink; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Source; +import org.opendaylight.restconf.server.spi.RestconfStream; +import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName; +import org.opendaylight.restconf.server.spi.RestconfStream.Sink; +import org.opendaylight.restconf.server.spi.RestconfStream.Source; import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; @@ -30,6 +31,7 @@ import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; /** * A {@link RestconfStream} reporting changes on a particular data tree. */ +@VisibleForTesting public final class DataTreeChangeSource extends Source> { private static final ImmutableMap ENCODINGS = ImmutableMap.of( EncodingName.RFC8040_JSON, JSONDataTreeCandidateFormatter.FACTORY, @@ -40,18 +42,13 @@ public final class DataTreeChangeSource extends Source> private final @NonNull LogicalDatastoreType datastore; private final @NonNull YangInstanceIdentifier path; - DataTreeChangeSource(final DatabindProvider databindProvider, final DOMDataBroker dataBroker, + public DataTreeChangeSource(final DatabindProvider databindProvider, final DOMDataTreeChangeService changeService, final LogicalDatastoreType datastore, final YangInstanceIdentifier path) { super(ENCODINGS); this.databindProvider = requireNonNull(databindProvider); + this.changeService = requireNonNull(changeService); this.datastore = requireNonNull(datastore); this.path = requireNonNull(path); - - final var dtcs = dataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class); - if (dtcs == null) { - throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService"); - } - changeService = dtcs; } @Override diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/JSONDataTreeCandidateFormatter.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/JSONDataTreeCandidateFormatter.java similarity index 90% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/JSONDataTreeCandidateFormatter.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/JSONDataTreeCandidateFormatter.java index b7ed34d15c..489979bc63 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/JSONDataTreeCandidateFormatter.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/JSONDataTreeCandidateFormatter.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.dtcl; import com.google.gson.stream.JsonWriter; import java.io.IOException; @@ -14,13 +14,14 @@ import java.time.Instant; import java.util.List; import javax.xml.xpath.XPathExpressionException; import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.restconf.server.spi.TextParameters; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.$YangModuleInfoImpl; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.DataChangedNotification; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.data.changed.notification.DataChangeEvent; import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; -public final class JSONDataTreeCandidateFormatter extends DataTreeCandidateFormatter { +final class JSONDataTreeCandidateFormatter extends DataTreeCandidateFormatter { private static final @NonNull String DATA_CHANGED_EVENT_NAME = DataChangeEvent.QNAME.getLocalName(); private static final @NonNull String DATA_CHANGED_NOTIFICATION_NAME = $YangModuleInfoImpl.getInstance().getName().getLocalName() + ":" + DataChangedNotification.QNAME.getLocalName(); @@ -50,7 +51,7 @@ public final class JSONDataTreeCandidateFormatter extends DataTreeCandidateForma } @Override - String createText(final TextParameters params, final EffectiveModelContext schemaContext, + protected String createText(final TextParameters params, final EffectiveModelContext schemaContext, final List input, final Instant now) throws IOException { try (var writer = new StringWriter()) { boolean nonEmpty = false; @@ -61,7 +62,7 @@ public final class JSONDataTreeCandidateFormatter extends DataTreeCandidateForma .name(DATA_CHANGED_NOTIFICATION_NAME).beginObject() .name(DATA_CHANGED_EVENT_NAME).beginArray(); - final var serializer = new JsonDataTreeCandidateSerializer(schemaContext, jsonWriter); + final var serializer = new JSONDataTreeCandidateSerializer(schemaContext, jsonWriter); for (var candidate : input) { nonEmpty |= serializer.serialize(candidate, params); } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/JsonDataTreeCandidateSerializer.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/JSONDataTreeCandidateSerializer.java similarity index 94% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/JsonDataTreeCandidateSerializer.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/JSONDataTreeCandidateSerializer.java index 412836a3f8..c494ad8649 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/JsonDataTreeCandidateSerializer.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/JSONDataTreeCandidateSerializer.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.dtcl; import static java.util.Objects.requireNonNull; import static org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter.createNestedWriter; @@ -26,13 +26,13 @@ import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute; import org.opendaylight.yangtools.yang.model.util.SchemaInferenceStack.Inference; -final class JsonDataTreeCandidateSerializer extends AbstractWebsocketSerializer { +final class JSONDataTreeCandidateSerializer extends DataTreeCandidateSerializer { private static final XMLNamespace SAL_REMOTE_NS = DataChangedNotification.QNAME.getNamespace(); private static final Absolute DATA_CHANGE_EVENT = Absolute.of(DataChangedNotification.QNAME, DataChangeEvent.QNAME); private final JsonWriter jsonWriter; - JsonDataTreeCandidateSerializer(final EffectiveModelContext context, final JsonWriter jsonWriter) { + JSONDataTreeCandidateSerializer(final EffectiveModelContext context, final JsonWriter jsonWriter) { super(context); this.jsonWriter = requireNonNull(jsonWriter); } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/XMLDataTreeCandidateFormatter.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/XMLDataTreeCandidateFormatter.java similarity index 85% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/XMLDataTreeCandidateFormatter.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/XMLDataTreeCandidateFormatter.java index 984ef812b5..57d03ac32a 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/XMLDataTreeCandidateFormatter.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/XMLDataTreeCandidateFormatter.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.dtcl; import java.io.IOException; import java.io.StringWriter; @@ -14,10 +14,11 @@ import java.util.List; import javax.xml.XMLConstants; import javax.xml.stream.XMLStreamException; import javax.xml.xpath.XPathExpressionException; +import org.opendaylight.restconf.server.spi.TextParameters; import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; -public final class XMLDataTreeCandidateFormatter extends DataTreeCandidateFormatter { +final class XMLDataTreeCandidateFormatter extends DataTreeCandidateFormatter { private static final XMLDataTreeCandidateFormatter EMPTY = new XMLDataTreeCandidateFormatter(TextParameters.EMPTY); static final DataTreeCandidateFormatterFactory FACTORY = new DataTreeCandidateFormatterFactory(EMPTY) { @@ -43,17 +44,17 @@ public final class XMLDataTreeCandidateFormatter extends DataTreeCandidateFormat } @Override - String createText(final TextParameters params, final EffectiveModelContext schemaContext, + protected String createText(final TextParameters params, final EffectiveModelContext schemaContext, final List input, final Instant now) throws Exception { final var writer = new StringWriter(); boolean nonEmpty = false; try { - final var xmlStreamWriter = NotificationFormatter.createStreamWriterWithNotification(writer, now); + final var xmlStreamWriter = createStreamWriterWithNotification(writer, now); xmlStreamWriter.writeStartElement(XMLConstants.DEFAULT_NS_PREFIX, DATA_CHANGED_NOTIFICATION_ELEMENT, DATA_CHANGED_NOTIFICATION_NS); xmlStreamWriter.writeDefaultNamespace(DATA_CHANGED_NOTIFICATION_NS); - final var serializer = new XmlDataTreeCandidateSerializer(schemaContext, xmlStreamWriter); + final var serializer = new XMLDataTreeCandidateSerializer(schemaContext, xmlStreamWriter); for (var candidate : input) { nonEmpty |= serializer.serialize(candidate, params); } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/XmlDataTreeCandidateSerializer.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/XMLDataTreeCandidateSerializer.java similarity index 94% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/XmlDataTreeCandidateSerializer.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/XMLDataTreeCandidateSerializer.java index e100846cbf..7e14708f09 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/XmlDataTreeCandidateSerializer.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/XMLDataTreeCandidateSerializer.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.dtcl; import static java.util.Objects.requireNonNull; @@ -26,12 +26,12 @@ import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.opendaylight.yangtools.yang.model.util.SchemaInferenceStack; import org.opendaylight.yangtools.yang.model.util.SchemaInferenceStack.Inference; -final class XmlDataTreeCandidateSerializer extends AbstractWebsocketSerializer { +final class XMLDataTreeCandidateSerializer extends DataTreeCandidateSerializer { private static final @NonNull NodeIdentifier DATA_CHANGE_EVENT_NID = NodeIdentifier.create(DataChangeEvent.QNAME); private final XMLStreamWriter xmlWriter; - XmlDataTreeCandidateSerializer(final EffectiveModelContext context, final XMLStreamWriter xmlWriter) { + XMLDataTreeCandidateSerializer(final EffectiveModelContext context, final XMLStreamWriter xmlWriter) { super(context); this.xmlWriter = requireNonNull(xmlWriter); } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/package-info.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/package-info.java new file mode 100644 index 0000000000..acf93e34a9 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/package-info.java @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +/** + * Support for data tree change streams. + */ +package org.opendaylight.restconf.server.mdsal.streams.dtcl; \ No newline at end of file diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractNotificationSource.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/AbstractNotificationSource.java similarity index 68% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractNotificationSource.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/AbstractNotificationSource.java index b23e828215..8eec4f7074 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractNotificationSource.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/AbstractNotificationSource.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.notif; import static java.util.Objects.requireNonNull; @@ -14,21 +14,20 @@ import java.time.Instant; import org.opendaylight.mdsal.dom.api.DOMEvent; import org.opendaylight.mdsal.dom.api.DOMNotification; import org.opendaylight.mdsal.dom.api.DOMNotificationListener; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Sink; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Source; +import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName; +import org.opendaylight.restconf.server.spi.RestconfStream.Sink; +import org.opendaylight.restconf.server.spi.RestconfStream.Source; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContextProvider; /** - * Abstract base class for functionality shared between {@link NotificationSource} and - * {@link DeviceNotificationSource}. + * Abstract base class for functionality shared between {@link DOMNotification}-based sources. */ -abstract class AbstractNotificationSource extends Source { - static final class Listener implements DOMNotificationListener { +public abstract class AbstractNotificationSource extends Source { + protected static final class Listener implements DOMNotificationListener { private final Sink sink; private final EffectiveModelContextProvider modelContext; - Listener(final Sink sink, final EffectiveModelContextProvider modelContext) { + public Listener(final Sink sink, final EffectiveModelContextProvider modelContext) { this.sink = requireNonNull(sink); this.modelContext = requireNonNull(modelContext); } @@ -44,7 +43,7 @@ abstract class AbstractNotificationSource extends Source { EncodingName.RFC8040_JSON, JSONNotificationFormatter.FACTORY, EncodingName.RFC8040_XML, XMLNotificationFormatter.FACTORY); - AbstractNotificationSource() { + protected AbstractNotificationSource() { super(ENCODINGS); } } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/CreateNotificationStreamRpc.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/CreateNotificationStreamRpc.java new file mode 100644 index 0000000000..7c5d7e972d --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/CreateNotificationStreamRpc.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.server.mdsal.streams.notif; + +import static java.util.Objects.requireNonNull; + +import com.google.common.collect.ImmutableSet; +import java.net.URI; +import javax.inject.Inject; +import javax.inject.Singleton; +import org.opendaylight.mdsal.dom.api.DOMNotificationService; +import org.opendaylight.restconf.common.errors.RestconfDocumentedException; +import org.opendaylight.restconf.common.errors.RestconfFuture; +import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; +import org.opendaylight.restconf.server.spi.OperationInput; +import org.opendaylight.restconf.server.spi.OperationOutput; +import org.opendaylight.restconf.server.spi.RestconfStream; +import org.opendaylight.restconf.server.spi.RpcImplementation; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStream; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStreamInput; +import org.opendaylight.yangtools.yang.common.ErrorTag; +import org.opendaylight.yangtools.yang.common.ErrorType; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.stmt.NotificationEffectiveStatement; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Reference; + +/** + * RESTCONF implementation of {@link CreateNotificationStream}. + */ +@Singleton +@Component +public final class CreateNotificationStreamRpc extends RpcImplementation { + private static final NodeIdentifier SAL_REMOTE_OUTPUT_NODEID = + NodeIdentifier.create(CreateDataChangeEventSubscriptionOutput.QNAME); + private static final NodeIdentifier NOTIFICATIONS = + NodeIdentifier.create(QName.create(CreateNotificationStreamInput.QNAME, "notifications").intern()); + private static final NodeIdentifier STREAM_NAME_NODEID = + NodeIdentifier.create(QName.create(CreateDataChangeEventSubscriptionOutput.QNAME, "stream-name").intern()); + + private final DatabindProvider databindProvider; + private final DOMNotificationService notificationService; + private final RestconfStream.Registry streamRegistry; + + @Inject + @Activate + public CreateNotificationStreamRpc(@Reference final RestconfStream.Registry streamRegistry, + @Reference final DatabindProvider databindProvider, + @Reference final DOMNotificationService notificationService) { + super(CreateNotificationStream.QNAME); + this.databindProvider = requireNonNull(databindProvider); + this.notificationService = requireNonNull(notificationService); + this.streamRegistry = requireNonNull(streamRegistry); + } + + @Override + public RestconfFuture invoke(final URI restconfURI, final OperationInput input) { + final var body = input.input(); + final var qnames = ((LeafSetNode) body.getChildByArg(NOTIFICATIONS)).body().stream() + .map(LeafSetEntryNode::body) + .map(QName::create) + .sorted() + .collect(ImmutableSet.toImmutableSet()); + + final var modelContext = input.currentContext().modelContext(); + final var description = new StringBuilder("YANG notifications matching any of {"); + var haveFirst = false; + for (var qname : qnames) { + final var module = modelContext.findModuleStatement(qname.getModule()) + .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown module", + ErrorType.APPLICATION, ErrorTag.INVALID_VALUE)); + final var stmt = module.findSchemaTreeNode(qname) + .orElseThrow(() -> new RestconfDocumentedException(qname + " refers to an unknown notification", + ErrorType.APPLICATION, ErrorTag.INVALID_VALUE)); + if (!(stmt instanceof NotificationEffectiveStatement)) { + throw new RestconfDocumentedException(qname + " refers to a non-notification", + ErrorType.APPLICATION, ErrorTag.INVALID_VALUE); + } + + if (haveFirst) { + description.append(",\n"); + } else { + haveFirst = true; + } + description.append("\n ") + .append(module.argument().getLocalName()).append(':').append(qname.getLocalName()); + } + description.append("\n}"); + + return streamRegistry.createStream(restconfURI, + new NotificationSource(databindProvider, notificationService, qnames), description.toString()) + .transform(stream -> input.newOperationOutput(Builders.containerBuilder() + .withNodeIdentifier(SAL_REMOTE_OUTPUT_NODEID) + .withChild(ImmutableNodes.leafNode(STREAM_NAME_NODEID, stream.name())) + .build())); + } +} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/JSONNotificationFormatter.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/JSONNotificationFormatter.java similarity index 92% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/JSONNotificationFormatter.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/JSONNotificationFormatter.java index 9ad4aa8325..e62fda8b8a 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/JSONNotificationFormatter.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/JSONNotificationFormatter.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.notif; import com.google.common.annotations.VisibleForTesting; import com.google.gson.stream.JsonWriter; @@ -15,6 +15,7 @@ import java.time.Instant; import javax.xml.xpath.XPathExpressionException; import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.mdsal.dom.api.DOMNotification; +import org.opendaylight.restconf.server.spi.TextParameters; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.rev170126.$YangModuleInfoImpl; import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier; import org.opendaylight.yangtools.yang.data.codec.gson.JSONNormalizedNodeStreamWriter; @@ -49,7 +50,7 @@ final class JSONNotificationFormatter extends NotificationFormatter { } @Override - String createText(final TextParameters params, final EffectiveModelContext schemaContext, + protected String createText(final TextParameters params, final EffectiveModelContext schemaContext, final DOMNotification input, final Instant now) throws IOException { try (var writer = new StringWriter()) { try (var jsonWriter = new JsonWriter(writer)) { diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/NotificationFormatter.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/NotificationFormatter.java similarity index 88% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/NotificationFormatter.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/NotificationFormatter.java index c009fd7f39..7e4b5ec09c 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/NotificationFormatter.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/NotificationFormatter.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.notif; import java.io.IOException; import java.time.Instant; @@ -14,6 +14,8 @@ import javax.xml.transform.dom.DOMResult; import javax.xml.xpath.XPathExpressionException; import org.opendaylight.mdsal.dom.api.DOMEvent; import org.opendaylight.mdsal.dom.api.DOMNotification; +import org.opendaylight.restconf.server.spi.EventFormatter; +import org.opendaylight.restconf.server.spi.TextParameters; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateNotificationStream; import org.opendaylight.yangtools.yang.data.codec.xml.XMLStreamNormalizedNodeStreamWriter; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; @@ -33,8 +35,8 @@ abstract class NotificationFormatter extends EventFormatter { } @Override - final void fillDocument(final Document doc, final EffectiveModelContext schemaContext, final DOMNotification input) - throws IOException { + protected final void fillDocument(final Document doc, final EffectiveModelContext schemaContext, + final DOMNotification input) throws IOException { final var notificationElement = createNotificationElement(doc, input instanceof DOMEvent domEvent ? domEvent.getEventInstant() : Instant.now()); // FIXME: what is this really?! diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/NotificationFormatterFactory.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/NotificationFormatterFactory.java similarity index 75% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/NotificationFormatterFactory.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/NotificationFormatterFactory.java index bd58037b72..2266f88c85 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/NotificationFormatterFactory.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/NotificationFormatterFactory.java @@ -5,9 +5,11 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.notif; import org.opendaylight.mdsal.dom.api.DOMNotification; +import org.opendaylight.restconf.server.spi.EventFormatter; +import org.opendaylight.restconf.server.spi.EventFormatterFactory; abstract class NotificationFormatterFactory extends EventFormatterFactory { NotificationFormatterFactory(final EventFormatter emptyFormatter) { diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/NotificationSource.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/NotificationSource.java similarity index 79% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/NotificationSource.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/NotificationSource.java index c12b33779f..15ed3a05b9 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/NotificationSource.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/NotificationSource.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.notif; import static java.util.Objects.requireNonNull; @@ -14,8 +14,8 @@ import com.google.common.collect.ImmutableSet; import org.opendaylight.mdsal.dom.api.DOMNotification; import org.opendaylight.mdsal.dom.api.DOMNotificationService; import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Sink; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.Source; +import org.opendaylight.restconf.server.spi.RestconfStream.Sink; +import org.opendaylight.restconf.server.spi.RestconfStream.Source; import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute; @@ -23,7 +23,7 @@ import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absol /** * A {@link Source} reporting YANG notifications. */ -public final class NotificationSource extends AbstractNotificationSource { +final class NotificationSource extends AbstractNotificationSource { private final DatabindProvider databindProvider; private final DOMNotificationService notificationService; private final ImmutableSet qnames; @@ -35,15 +35,6 @@ public final class NotificationSource extends AbstractNotificationSource { this.qnames = requireNonNull(qnames); } - /** - * Return notification QNames. - * - * @return The YANG notification {@link QName}s this listener is bound to - */ - public ImmutableSet qnames() { - return qnames; - } - @Override protected Registration start(final Sink sink) { return notificationService.registerNotificationListener( diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/XMLNotificationFormatter.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/XMLNotificationFormatter.java similarity index 92% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/XMLNotificationFormatter.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/XMLNotificationFormatter.java index 5970b5b140..090bdf4d91 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/XMLNotificationFormatter.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/XMLNotificationFormatter.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.notif; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; @@ -14,6 +14,7 @@ import java.time.Instant; import javax.xml.stream.XMLStreamException; import javax.xml.xpath.XPathExpressionException; import org.opendaylight.mdsal.dom.api.DOMNotification; +import org.opendaylight.restconf.server.spi.TextParameters; import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter; import org.opendaylight.yangtools.yang.data.codec.xml.XMLStreamNormalizedNodeStreamWriter; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; @@ -44,7 +45,7 @@ final class XMLNotificationFormatter extends NotificationFormatter { } @Override - String createText(final TextParameters params, final EffectiveModelContext schemaContext, + protected String createText(final TextParameters params, final EffectiveModelContext schemaContext, final DOMNotification input, final Instant now) throws IOException { final var writer = new StringWriter(); diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/package-info.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/package-info.java new file mode 100644 index 0000000000..678beff917 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/mdsal/streams/notif/package-info.java @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +/** + * Support for YANG 1.0 notification streams. + */ +package org.opendaylight.restconf.server.mdsal.streams.notif; \ No newline at end of file diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/AbstractRestconfStreamRegistry.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/AbstractRestconfStreamRegistry.java new file mode 100644 index 0000000000..6b628975b6 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/AbstractRestconfStreamRegistry.java @@ -0,0 +1,191 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.server.spi; + +import static java.util.Objects.requireNonNull; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.restconf.common.errors.RestconfDocumentedException; +import org.opendaylight.restconf.common.errors.RestconfFuture; +import org.opendaylight.restconf.common.errors.SettableRestconfFuture; +import org.opendaylight.restconf.nb.rfc8040.URLConstants; +import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName; +import org.opendaylight.restconf.server.spi.RestconfStream.Source; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.Stream; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Reference base class for {@link RestconfStream.Registry} implementations. + */ +public abstract class AbstractRestconfStreamRegistry implements RestconfStream.Registry { + private static final Logger LOG = LoggerFactory.getLogger(AbstractRestconfStreamRegistry.class); + + @VisibleForTesting + public static final QName NAME_QNAME = QName.create(Stream.QNAME, "name").intern(); + @VisibleForTesting + public static final QName DESCRIPTION_QNAME = QName.create(Stream.QNAME, "description").intern(); + @VisibleForTesting + public static final QName ENCODING_QNAME = QName.create(Stream.QNAME, "encoding").intern(); + @VisibleForTesting + public static final QName LOCATION_QNAME = QName.create(Stream.QNAME, "location").intern(); + + private final ConcurrentMap> streams = new ConcurrentHashMap<>(); + private final boolean useWebsockets; + + protected AbstractRestconfStreamRegistry(final boolean useWebsockets) { + this.useWebsockets = useWebsockets; + } + + @Override + public final @Nullable RestconfStream lookupStream(final String name) { + return streams.get(requireNonNull(name)); + } + + @Override + public final RestconfFuture> createStream(final URI restconfURI, final Source source, + final String description) { + final var baseStreamLocation = baseStreamLocation(restconfURI); + final var stream = allocateStream(source); + final var name = stream.name(); + if (description.isBlank()) { + throw new IllegalArgumentException("Description must be descriptive"); + } + + final var ret = new SettableRestconfFuture>(); + Futures.addCallback(putStream(streamEntry(name, description, baseStreamLocation, stream.encodings())), + new FutureCallback() { + @Override + public void onSuccess(final Object result) { + LOG.debug("Stream {} added", name); + ret.set(stream); + } + + @Override + public void onFailure(final Throwable cause) { + LOG.debug("Failed to add stream {}", name, cause); + streams.remove(name, stream); + ret.setFailure(new RestconfDocumentedException("Failed to allocate stream " + name, cause)); + } + }, MoreExecutors.directExecutor()); + return ret; + } + + private RestconfStream allocateStream(final Source source) { + String name; + RestconfStream stream; + do { + // Use Type 4 (random) UUID. While we could just use it as a plain string, be nice to observers and anchor + // it into UUID URN namespace as defined by RFC4122 + name = "urn:uuid:" + UUID.randomUUID().toString(); + stream = new RestconfStream<>(this, source, name); + } while (streams.putIfAbsent(name, stream) != null); + + return stream; + } + + protected abstract @NonNull ListenableFuture putStream(@NonNull MapEntryNode stream); + + /** + * Remove a particular stream and remove its entry from operational datastore. + * + * @param stream Stream to remove + */ + final void removeStream(final RestconfStream stream) { + // Defensive check to see if we are still tracking the stream + final var name = stream.name(); + if (streams.get(name) != stream) { + LOG.warn("Stream {} does not match expected instance {}, skipping datastore update", name, stream); + return; + } + + Futures.addCallback(deleteStream(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, name)), + new FutureCallback() { + @Override + public void onSuccess(final Object result) { + LOG.debug("Stream {} removed", name); + streams.remove(name, stream); + } + + @Override + public void onFailure(final Throwable cause) { + LOG.warn("Failed to remove stream {}, operational datastore may be inconsistent", name, cause); + streams.remove(name, stream); + } + }, MoreExecutors.directExecutor()); + } + + protected abstract @NonNull ListenableFuture deleteStream(@NonNull NodeIdentifierWithPredicates streamName); + + /** + * Return the base location URL of the streams service based on request URI. + * + * @param restconfURI request base URI + * @throws IllegalArgumentException if the result would have been malformed + */ + protected final @NonNull String baseStreamLocation(final URI restconfURI) { + var scheme = restconfURI.getScheme(); + if (useWebsockets) { + scheme = switch (scheme) { + // Secured HTTP goes to Secured WebSockets + case "https" -> "wss"; + // Unsecured HTTP and others go to unsecured WebSockets + default -> "ws"; + }; + } + + try { + return new URI(scheme, restconfURI.getRawUserInfo(), restconfURI.getHost(), restconfURI.getPort(), + restconfURI.getPath() + '/' + URLConstants.STREAMS_SUBPATH, null, null) + .toString(); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Cannot derive streams location", e); + } + } + + @VisibleForTesting + public static final @NonNull MapEntryNode streamEntry(final String name, final String description, + final String baseStreamLocation, final Set encodings) { + final var accessBuilder = Builders.mapBuilder().withNodeIdentifier(new NodeIdentifier(Access.QNAME)); + for (var encoding : encodings) { + final var encodingName = encoding.name(); + accessBuilder.withChild(Builders.mapEntryBuilder() + .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, ENCODING_QNAME, encodingName)) + .withChild(ImmutableNodes.leafNode(ENCODING_QNAME, encodingName)) + .withChild(ImmutableNodes.leafNode(LOCATION_QNAME, + baseStreamLocation + '/' + encodingName + '/' + name)) + .build()); + } + + return Builders.mapEntryBuilder() + .withNodeIdentifier(NodeIdentifierWithPredicates.of(Stream.QNAME, NAME_QNAME, name)) + .withChild(ImmutableNodes.leafNode(NAME_QNAME, name)) + .withChild(ImmutableNodes.leafNode(DESCRIPTION_QNAME, description)) + .withChild(accessBuilder.build()) + .build(); + } +} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/EventFormatter.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/EventFormatter.java similarity index 82% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/EventFormatter.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/EventFormatter.java index f1645736ac..872eec8736 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/EventFormatter.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/EventFormatter.java @@ -5,10 +5,11 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.spi; import static java.util.Objects.requireNonNull; +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.Writer; import java.time.Instant; @@ -63,17 +64,17 @@ public abstract class EventFormatter implements Immutable { DBF = f; } - static final XMLOutputFactory XML_OUTPUT_FACTORY = XMLOutputFactory.newFactory(); + protected static final XMLOutputFactory XML_OUTPUT_FACTORY = XMLOutputFactory.newFactory(); private final TextParameters textParams; private final XPathExpression filter; - EventFormatter(final TextParameters textParams) { + protected EventFormatter(final TextParameters textParams) { this.textParams = requireNonNull(textParams); filter = null; } - EventFormatter(final TextParameters params, final String xpathFilter) throws XPathExpressionException { + protected EventFormatter(final TextParameters params, final String xpathFilter) throws XPathExpressionException { textParams = requireNonNull(params); final XPath xpath; @@ -84,8 +85,9 @@ public abstract class EventFormatter implements Immutable { filter = xpath.compile(xpathFilter); } - final @Nullable String eventData(final EffectiveModelContext schemaContext, final T input, final Instant now) - throws Exception { + @VisibleForTesting + public final @Nullable String eventData(final EffectiveModelContext schemaContext, final T input, + final Instant now) throws Exception { return filterMatches(schemaContext, input, now) ? createText(textParams, schemaContext, input, now) : null; } @@ -97,7 +99,7 @@ public abstract class EventFormatter implements Immutable { * @param input data to export * @throws IOException if any IOException occurs during export to the document */ - abstract void fillDocument(Document doc, EffectiveModelContext schemaContext, T input) throws IOException; + protected abstract void fillDocument(Document doc, EffectiveModelContext schemaContext, T input) throws IOException; /** * Format the input data into string representation of the data provided. @@ -109,8 +111,8 @@ public abstract class EventFormatter implements Immutable { * @return String representation of the formatted data * @throws Exception if the underlying formatters fail to export the data to the requested format */ - abstract String createText(TextParameters params, EffectiveModelContext schemaContext, T input, Instant now) - throws Exception; + protected abstract String createText(TextParameters params, EffectiveModelContext schemaContext, T input, + Instant now) throws Exception; private boolean filterMatches(final EffectiveModelContext schemaContext, final T input, final Instant now) throws IOException { @@ -142,11 +144,11 @@ public abstract class EventFormatter implements Immutable { * @param now time stamp * @return Data specified by RFC3339. */ - static final String toRFC3339(final Instant now) { + protected static final String toRFC3339(final Instant now) { return DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(OffsetDateTime.ofInstant(now, ZoneId.systemDefault())); } - static final @NonNull Element createNotificationElement(final Document doc, final Instant now) { + protected static final @NonNull Element createNotificationElement(final Document doc, final Instant now) { final var notificationElement = doc.createElementNS(NamespaceURN.NOTIFICATION, "notification"); final var eventTimeElement = doc.createElement("eventTime"); eventTimeElement.setTextContent(toRFC3339(now)); @@ -154,8 +156,8 @@ public abstract class EventFormatter implements Immutable { return notificationElement; } - static final @NonNull XMLStreamWriter createStreamWriterWithNotification(final Writer writer, final Instant now) - throws XMLStreamException { + protected static final @NonNull XMLStreamWriter createStreamWriterWithNotification(final Writer writer, + final Instant now) throws XMLStreamException { final var xmlStreamWriter = XML_OUTPUT_FACTORY.createXMLStreamWriter(writer); xmlStreamWriter.setDefaultNamespace(NamespaceURN.NOTIFICATION); @@ -168,7 +170,8 @@ public abstract class EventFormatter implements Immutable { return xmlStreamWriter; } - static final void writeBody(final NormalizedNodeStreamWriter writer, final NormalizedNode body) throws IOException { + protected static final void writeBody(final NormalizedNodeStreamWriter writer, final NormalizedNode body) + throws IOException { try (var nodeWriter = NormalizedNodeWriter.forStreamWriter(writer)) { nodeWriter.write(body); } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/EventFormatterFactory.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/EventFormatterFactory.java similarity index 95% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/EventFormatterFactory.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/EventFormatterFactory.java index 46edecaa9e..c51fc46197 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/EventFormatterFactory.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/EventFormatterFactory.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.spi; import static java.util.Objects.requireNonNull; diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/OperationInput.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/OperationInput.java new file mode 100644 index 0000000000..75e7e732d1 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/OperationInput.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.server.spi; + +import static java.util.Objects.requireNonNull; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext; +import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.model.util.SchemaInferenceStack.Inference; + +/** + * Input to an operation invocation. + */ +@NonNullByDefault +public record OperationInput(DatabindContext currentContext, Inference operation, ContainerNode input) + implements DatabindProvider { + public OperationInput { + requireNonNull(currentContext); + requireNonNull(operation); + requireNonNull(input); + } + + /** + * Create an {@link OperationOutput} with equal {@link #currentContext()} and {@link #operation()}. + * + * @param output Output payload + * @return An {@link OperationOutput} + */ + public OperationOutput newOperationOutput(final @Nullable ContainerNode output) { + return new OperationOutput(currentContext, operation, output); + } +} \ No newline at end of file diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/OperationOutput.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/OperationOutput.java new file mode 100644 index 0000000000..4352f93162 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/OperationOutput.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.server.spi; + +import static java.util.Objects.requireNonNull; + +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext; +import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.model.util.SchemaInferenceStack.Inference; + +/** + * Output of {@link RpcImplementation#invoke(java.net.URI, OperationInput)}. + */ +public record OperationOutput( + @NonNull DatabindContext currentContext, + @NonNull Inference operation, + @Nullable ContainerNode output) implements DatabindProvider { + public OperationOutput { + requireNonNull(currentContext); + requireNonNull(operation); + if (output != null && output.isEmpty()) { + output = null; + } + } +} \ No newline at end of file diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStream.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/RestconfStream.java similarity index 83% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStream.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/RestconfStream.java index c13317e15d..fc01ab7880 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStream.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/RestconfStream.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.spi; import static java.util.Objects.requireNonNull; @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableMap; import java.io.UnsupportedEncodingException; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; +import java.net.URI; import java.time.Instant; import java.util.Set; import java.util.regex.Pattern; @@ -22,6 +23,7 @@ import javax.xml.xpath.XPathExpressionException; import org.checkerframework.checker.lock.qual.GuardedBy; import org.eclipse.jdt.annotation.NonNull; import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.restconf.common.errors.RestconfFuture; import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access; import org.opendaylight.yangtools.concepts.Registration; @@ -107,6 +109,51 @@ public final class RestconfStream { } } + /** + * Interface for session handler that is responsible for sending of data over established session. + */ + public interface Sender { + /** + * Interface for sending String message through one of implementation. + * + * @param data Message data to be send. + */ + void sendDataMessage(String data); + + /** + * Called when the stream has reached its end. The handler should close all underlying resources. + */ + void endOfStream(); + } + + /** + * An entity managing allocation and lookup of {@link RestconfStream}s. + */ + public interface Registry { + /** + * Get a {@link RestconfStream} by its name. + * + * @param name Stream name. + * @return A {@link RestconfStream}, or {@code null} if the stream with specified name does not exist. + * @throws NullPointerException if {@code name} is {@code null} + */ + @Nullable RestconfStream lookupStream(String name); + + /** + * Create a {@link RestconfStream} with a unique name. This method will atomically generate a stream name, + * create the corresponding instance and register it. + * + * @param Stream type + * @param restconfURI resolved {@code {+restconf}} resource name + * @param source Stream instance + * @param description Stream descriptiion + * @return A future {@link RestconfStream} instance + * @throws NullPointerException if any argument is {@code null} + */ + @NonNull RestconfFuture> createStream(URI restconfURI, Source source, + String description); + } + private static final Logger LOG = LoggerFactory.getLogger(RestconfStream.class); private static final VarHandle SUBSCRIBERS; @@ -139,7 +186,7 @@ public final class RestconfStream { } } }; - private final @NonNull ListenersBroker listenersBroker; + private final @NonNull AbstractRestconfStreamRegistry registry; private final @NonNull Source source; private final @NonNull String name; @@ -150,8 +197,8 @@ public final class RestconfStream { @GuardedBy("this") private Registration registration; - RestconfStream(final ListenersBroker listenersBroker, final Source source, final String name) { - this.listenersBroker = requireNonNull(listenersBroker); + RestconfStream(final AbstractRestconfStreamRegistry registry, final Source source, final String name) { + this.registry = requireNonNull(registry); this.source = requireNonNull(source); this.name = requireNonNull(name); } @@ -177,7 +224,7 @@ public final class RestconfStream { } /** - * Registers {@link StreamSessionHandler} subscriber. + * Registers {@link Sender} subscriber. * * @param handler SSE or WS session handler. * @param encoding Requested event stream encoding @@ -186,9 +233,8 @@ public final class RestconfStream { * @throws NullPointerException if any argument is {@code null} * @throws UnsupportedEncodingException if {@code encoding} is not supported * @throws XPathExpressionException if requested filter is not valid - * @throws InvalidArgumentException if the parameters are not supported */ - @Nullable Registration addSubscriber(final StreamSessionHandler handler, final EncodingName encoding, + public @Nullable Registration addSubscriber(final Sender handler, final EncodingName encoding, final ReceiveEventsParams params) throws UnsupportedEncodingException, XPathExpressionException { final var factory = source.encodings.get(requireNonNull(encoding)); if (factory == null) { @@ -292,7 +338,7 @@ public final class RestconfStream { registration = null; } } - listenersBroker.removeStream(this); + registry.removeStream(this); } @Override diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/RpcImplementation.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/RpcImplementation.java new file mode 100644 index 0000000000..b3d5688bce --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/RpcImplementation.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.server.spi; + +import static java.util.Objects.requireNonNull; + +import com.google.common.base.MoreObjects; +import java.net.URI; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.restconf.common.errors.RestconfFuture; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; + +/** + * An implementation of a YANG-defined RPC. + */ +@NonNullByDefault +public abstract class RpcImplementation { + private final QName qname; + + protected RpcImplementation(final QName qname) { + this.qname = requireNonNull(qname); + } + + /** + * Return the RPC name, as defined by {@code rpc} statement's argument. + * + * @return The RPC name + */ + public final QName qname() { + return qname; + } + + /** + * Asynchronously invoke this implementation. Implementations are expected to report all results via the returned + * future, e.g. not throw exceptions. + * + * @param restconfURI Request URI trimmed to the root RESTCONF endpoint, resolved {@code {+restconf}} resource name + * @param input RPC input + * @return Future RPC output + */ + public abstract RestconfFuture invoke(URI restconfURI, OperationInput input); + + @Override + public final String toString() { + return MoreObjects.toStringHelper(this).add("qname", qname).toString(); + } + + protected static final @Nullable T leaf(final ContainerNode parent, final NodeIdentifier arg, + final Class type) { + final var child = parent.childByArg(arg); + if (child instanceof LeafNode leafNode) { + final var body = leafNode.body(); + try { + return type.cast(body); + } catch (ClassCastException e) { + throw new IllegalArgumentException("Bad child " + child.prettyTree(), e); + } + } + return null; + } +} diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/Subscriber.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/Subscriber.java similarity index 73% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/Subscriber.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/Subscriber.java index eb380350cb..18d9a2e4fb 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/Subscriber.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/Subscriber.java @@ -5,11 +5,12 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.spi; import static java.util.Objects.requireNonNull; import org.eclipse.jdt.annotation.NonNull; +import org.opendaylight.restconf.server.spi.RestconfStream.Sender; import org.opendaylight.yangtools.concepts.AbstractRegistration; /** @@ -17,12 +18,12 @@ import org.opendaylight.yangtools.concepts.AbstractRegistration; */ final class Subscriber extends AbstractRegistration { private final @NonNull RestconfStream stream; - private final @NonNull StreamSessionHandler handler; + private final @NonNull Sender sender; private final @NonNull EventFormatter formatter; - Subscriber(final RestconfStream stream, final StreamSessionHandler handler, final EventFormatter formatter) { + Subscriber(final RestconfStream stream, final Sender sender, final EventFormatter formatter) { this.stream = requireNonNull(stream); - this.handler = requireNonNull(handler); + this.sender = requireNonNull(sender); this.formatter = requireNonNull(formatter); } @@ -30,8 +31,8 @@ final class Subscriber extends AbstractRegistration { return formatter; } - @NonNull StreamSessionHandler handler() { - return handler; + @NonNull Sender sender() { + return sender; } @Override diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/Subscribers.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/Subscribers.java similarity index 95% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/Subscribers.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/Subscribers.java index db13ca7e4a..2be633eff6 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/Subscribers.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/Subscribers.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.spi; import static java.util.Objects.requireNonNull; @@ -76,14 +76,14 @@ abstract sealed class Subscribers { @Override void endOfStream() { - subscriber.handler().endOfStream(); + subscriber.sender().endOfStream(); } @Override void publish(final EffectiveModelContext modelContext, final T input, final Instant now) { final var formatted = format(subscriber.formatter(), modelContext, input, now); if (formatted != null) { - subscriber.handler().sendDataMessage(formatted); + subscriber.sender().sendDataMessage(formatted); } } } @@ -114,7 +114,7 @@ abstract sealed class Subscribers { @Override void endOfStream() { - subscribers.forEach((formatter, subscriber) -> subscriber.handler().endOfStream()); + subscribers.forEach((formatter, subscriber) -> subscriber.sender().endOfStream()); } @Override @@ -123,7 +123,7 @@ abstract sealed class Subscribers { final var formatted = format(entry.getKey(), modelContext, input, now); if (formatted != null) { for (var subscriber : entry.getValue()) { - subscriber.handler().sendDataMessage(formatted); + subscriber.sender().sendDataMessage(formatted); } } } @@ -144,7 +144,7 @@ abstract sealed class Subscribers { * @return An empty {@link Subscribers} file */ @SuppressWarnings("unchecked") - static @NonNull Subscribers empty() { + static org.opendaylight.restconf.server.spi.Subscribers empty() { return (Subscribers) Empty.INSTANCE; } diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/TextParameters.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/TextParameters.java similarity index 69% rename from restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/TextParameters.java rename to restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/TextParameters.java index 922911fccf..df55cfca99 100644 --- a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/TextParameters.java +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/TextParameters.java @@ -5,9 +5,9 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.spi; -import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.NonNull; /** * Text formatting parameters. @@ -18,7 +18,10 @@ import org.eclipse.jdt.annotation.NonNullByDefault; * changed nodes * @param childNodesOnly {@code true} if this query should only notify about child node changes */ -@NonNullByDefault -record TextParameters(boolean leafNodesOnly, boolean skipData, boolean changedLeafNodesOnly, boolean childNodesOnly) { - static final TextParameters EMPTY = new TextParameters(false, false, false, false); +public record TextParameters( + boolean leafNodesOnly, + boolean skipData, + boolean changedLeafNodesOnly, + boolean childNodesOnly) { + public static final @NonNull TextParameters EMPTY = new TextParameters(false, false, false, false); } \ No newline at end of file diff --git a/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/package-info.java b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/package-info.java new file mode 100644 index 0000000000..dfc97df219 --- /dev/null +++ b/restconf/restconf-nb/src/main/java/org/opendaylight/restconf/server/spi/package-info.java @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2023 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +/** + * Interface towards RestconfServer implementations. + */ +package org.opendaylight.restconf.server.spi; \ No newline at end of file diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/CapabilitiesWriterTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/CapabilitiesWriterTest.java deleted file mode 100644 index a17795dd7f..0000000000 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/CapabilitiesWriterTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.restconf.nb.rfc8040; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; - -import org.junit.Test; -import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode; - -public class CapabilitiesWriterTest { - @Test - public void restconfStateCapabilitiesTest() { - final var capability = CapabilitiesWriter.mapCapabilities(); - assertEquals(CapabilitiesWriter.CAPABILITY, capability.name()); - - assertThat(capability.body().stream().map(LeafSetEntryNode::body).toList(), - containsInAnyOrder( - equalTo("urn:ietf:params:restconf:capability:depth:1.0"), - equalTo("urn:ietf:params:restconf:capability:fields:1.0"), - equalTo("urn:ietf:params:restconf:capability:filter:1.0"), - equalTo("urn:ietf:params:restconf:capability:replay:1.0"), - equalTo("urn:ietf:params:restconf:capability:with-defaults:1.0"), - equalTo("urn:opendaylight:params:restconf:capability:pretty-print:1.0"), - equalTo("urn:opendaylight:params:restconf:capability:leaf-nodes-only:1.0"), - equalTo("urn:opendaylight:params:restconf:capability:changed-leaf-nodes-only:1.0"), - equalTo("urn:opendaylight:params:restconf:capability:skip-notification-data:1.0"), - equalTo("urn:opendaylight:params:restconf:capability:child-nodes-only:1.0"))); - } -} diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/MdsalRestconfServerTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/MdsalRestconfServerTest.java index 1b3f633bf6..2552b2c9e0 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/MdsalRestconfServerTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/MdsalRestconfServerTest.java @@ -15,6 +15,7 @@ import static org.mockito.Mockito.doReturn; import java.util.Optional; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -26,6 +27,8 @@ import org.opendaylight.mdsal.dom.api.DOMRpcService; import org.opendaylight.netconf.dom.api.NetconfDataTreeService; import org.opendaylight.restconf.common.errors.RestconfDocumentedException; import org.opendaylight.restconf.nb.rfc8040.AbstractJukeboxTest; +import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext; +import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; import org.opendaylight.restconf.nb.rfc8040.rests.transactions.MdsalRestconfStrategy; import org.opendaylight.restconf.nb.rfc8040.rests.transactions.NetconfRestconfStrategy; import org.opendaylight.yangtools.yang.common.ErrorTag; @@ -33,6 +36,8 @@ import org.opendaylight.yangtools.yang.common.ErrorType; @RunWith(MockitoJUnitRunner.StrictStubs.class) public class MdsalRestconfServerTest extends AbstractJukeboxTest { + private static DatabindProvider DATABIND_PROVIDER; + @Mock private DOMMountPointService mountPointService; @Mock @@ -46,9 +51,14 @@ public class MdsalRestconfServerTest extends AbstractJukeboxTest { private MdsalRestconfServer server; + @BeforeClass + public static void setupDatabind() { + DATABIND_PROVIDER = () -> DatabindContext.ofModel(JUKEBOX_SCHEMA); + } + @Before public void before() { - server = new MdsalRestconfServer(dataBroker, rpcService, mountPointService); + server = new MdsalRestconfServer(DATABIND_PROVIDER, dataBroker, rpcService, mountPointService); doReturn(Optional.of(rpcService)).when(mountPoint).getService(DOMRpcService.class); } diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/Netconf799Test.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/Netconf799Test.java index cf4f366816..e2bb2d6c90 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/Netconf799Test.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/Netconf799Test.java @@ -29,6 +29,7 @@ import org.opendaylight.mdsal.dom.api.DOMRpcService; import org.opendaylight.mdsal.dom.spi.SimpleDOMActionResult; import org.opendaylight.restconf.nb.rfc8040.AbstractInstanceIdentifierTest; import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext; +import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.impl.schema.Builders; @@ -57,8 +58,9 @@ public class Netconf799Test extends AbstractInstanceIdentifierTest { Builders.containerBuilder().withNodeIdentifier(NodeIdentifier.create(OUTPUT_QNAME)).build()))) .when(actionService).invokeAction(eq(Absolute.of(CONT_QNAME, CONT1_QNAME, RESET_QNAME)), any(), any()); - final var dataService = new RestconfDataServiceImpl(() -> DatabindContext.ofModel(IID_SCHEMA), - new MdsalRestconfServer(dataBroker, rpcService, mountPointService), actionService); + final DatabindProvider databindProvider = () -> DatabindContext.ofModel(IID_SCHEMA); + final var dataService = new RestconfDataServiceImpl(databindProvider, + new MdsalRestconfServer(databindProvider, dataBroker, rpcService, mountPointService), actionService); doReturn(true).when(asyncResponse).resume(captor.capture()); dataService.postDataJSON("instance-identifier-module:cont/cont1/reset", diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImplTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImplTest.java index 8c0516387e..be6b7e861a 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImplTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImplTest.java @@ -61,6 +61,7 @@ import org.opendaylight.restconf.common.patch.PatchEntity; import org.opendaylight.restconf.common.patch.PatchStatusContext; import org.opendaylight.restconf.nb.rfc8040.AbstractJukeboxTest; import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext; +import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.patch.rev170222.yang.patch.yang.patch.Edit.Operation; import org.opendaylight.yangtools.yang.common.ErrorTag; @@ -129,8 +130,9 @@ public class RestconfDataServiceImplTest extends AbstractJukeboxTest { doReturn(read).when(dataBroker).newReadOnlyTransaction(); doReturn(readWrite).when(dataBroker).newReadWriteTransaction(); - dataService = new RestconfDataServiceImpl(() -> DatabindContext.ofModel(JUKEBOX_SCHEMA), - new MdsalRestconfServer(dataBroker, rpcService, mountPointService), actionService); + final DatabindProvider databindProvider = () -> DatabindContext.ofModel(JUKEBOX_SCHEMA); + dataService = new RestconfDataServiceImpl(databindProvider, + new MdsalRestconfServer(databindProvider, dataBroker, rpcService, mountPointService), actionService); doReturn(Optional.of(mountPoint)).when(mountPointService) .getMountPoint(any(YangInstanceIdentifier.class)); doReturn(Optional.of(FixedDOMSchemaService.of(JUKEBOX_SCHEMA))).when(mountPoint) diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImplTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImplTest.java index b9604e2233..878fb9ebb4 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImplTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImplTest.java @@ -19,6 +19,7 @@ import static org.mockito.Mockito.verify; import com.google.common.util.concurrent.Futures; import java.io.ByteArrayInputStream; +import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Optional; @@ -44,7 +45,7 @@ import org.opendaylight.netconf.dom.api.NetconfDataTreeService; import org.opendaylight.restconf.common.errors.RestconfDocumentedException; import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext; import org.opendaylight.restconf.nb.rfc8040.legacy.NormalizedNodePayload; -import org.opendaylight.restconf.nb.rfc8040.streams.ListenersBroker; +import org.opendaylight.restconf.server.spi.OperationInput; import org.opendaylight.yangtools.yang.common.ErrorTag; import org.opendaylight.yangtools.yang.common.ErrorType; import org.opendaylight.yangtools.yang.common.QName; @@ -52,11 +53,14 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdent import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.impl.schema.Builders; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute; +import org.opendaylight.yangtools.yang.model.util.SchemaInferenceStack; import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils; @RunWith(MockitoJUnitRunner.StrictStubs.class) public class RestconfInvokeOperationsServiceImplTest { - private static final QName RPC = QName.create("ns", "2015-02-28", "test-rpc"); + private static final URI RESTCONF_URI = URI.create("/restconf"); + private static final QName RPC = QName.create("invoke:rpc:module", "2013-12-03", "rpc-test"); private static final ContainerNode INPUT = Builders.containerBuilder() .withNodeIdentifier(new NodeIdentifier(QName.create(RPC, "input"))) .withChild(ImmutableNodes.leafNode(QName.create(RPC, "content"), "test")) @@ -65,9 +69,10 @@ public class RestconfInvokeOperationsServiceImplTest { .withNodeIdentifier(new NodeIdentifier(QName.create(RPC, "output"))) .withChild(ImmutableNodes.leafNode(QName.create(RPC, "content"), "operation result")) .build(); - private static final DatabindContext CONTEXT = DatabindContext.ofModel(YangParserTestUtils.parseYangResourceDirectory("/invoke-rpc")); + private static final OperationInput OPER_INPUT = new OperationInput(CONTEXT, + SchemaInferenceStack.of(CONTEXT.modelContext(), Absolute.of(RPC)).toInference(), INPUT); @Mock private DOMDataBroker dataBroker; @@ -85,9 +90,8 @@ public class RestconfInvokeOperationsServiceImplTest { @Before public void setup() { - server = new MdsalRestconfServer(dataBroker, rpcService, mountPointService); - invokeOperationsService = new RestconfInvokeOperationsServiceImpl(() -> CONTEXT, server, - new ListenersBroker.WebSockets(dataBroker, notificationService, mountPointService)); + server = new MdsalRestconfServer(() -> CONTEXT, dataBroker, rpcService, mountPointService); + invokeOperationsService = new RestconfInvokeOperationsServiceImpl(server); } @Test @@ -132,8 +136,10 @@ public class RestconfInvokeOperationsServiceImplTest { public void invokeRpcTest() throws Exception { doReturn(Futures.immediateFuture(new DefaultDOMRpcResult(OUTPUT, List.of()))).when(rpcService) .invokeRpc(RPC, INPUT); - assertEquals(Optional.of(OUTPUT), Futures.getDone(server.getRestconfStrategy(CONTEXT.modelContext(), null) - .invokeRpc(RPC, INPUT))); + assertEquals(OUTPUT, + Futures.getDone( + server.getRestconfStrategy(CONTEXT.modelContext(), null).invokeRpc(RESTCONF_URI, RPC, OPER_INPUT)) + .output()); } @Test @@ -143,8 +149,9 @@ public class RestconfInvokeOperationsServiceImplTest { "No implementation of RPC " + errorRpc + " available."); doReturn(Futures.immediateFailedFuture(exception)).when(rpcService).invokeRpc(errorRpc, INPUT); final var ex = assertInstanceOf(RestconfDocumentedException.class, - assertThrows(ExecutionException.class, () -> Futures.getDone( - server.getRestconfStrategy(CONTEXT.modelContext(), null).invokeRpc(errorRpc, INPUT))).getCause()); + assertThrows(ExecutionException.class, + () -> Futures.getDone(server.getRestconfStrategy(CONTEXT.modelContext(), null) + .invokeRpc(RESTCONF_URI, errorRpc, OPER_INPUT))).getCause()); final var errorList = ex.getErrors(); assertEquals(1, errorList.size()); final var actual = errorList.iterator().next(); @@ -160,8 +167,10 @@ public class RestconfInvokeOperationsServiceImplTest { doReturn(Optional.of(dataBroker)).when(mountPoint).getService(DOMDataBroker.class); doReturn(Futures.immediateFuture(new DefaultDOMRpcResult(OUTPUT, List.of()))).when(rpcService) .invokeRpc(RPC, INPUT); - assertEquals(Optional.of(OUTPUT), Futures.getDone( - server.getRestconfStrategy(CONTEXT.modelContext(), mountPoint).invokeRpc(RPC, INPUT))); + assertEquals(OUTPUT, + Futures.getDone( + server.getRestconfStrategy(CONTEXT.modelContext(), mountPoint).invokeRpc(RESTCONF_URI, RPC, OPER_INPUT)) + .output()); } @Test @@ -171,7 +180,8 @@ public class RestconfInvokeOperationsServiceImplTest { doReturn(Optional.of(dataBroker)).when(mountPoint).getService(DOMDataBroker.class); final var strategy = server.getRestconfStrategy(CONTEXT.modelContext(), mountPoint); final var ex = assertInstanceOf(RestconfDocumentedException.class, - assertThrows(ExecutionException.class, () -> Futures.getDone(strategy.invokeRpc(RPC, INPUT))).getCause()); + assertThrows(ExecutionException.class, + () -> Futures.getDone(strategy.invokeRpc(RESTCONF_URI, RPC, OPER_INPUT))).getCause()); final var errors = ex.getErrors(); assertEquals(1, errors.size()); final var error = errors.get(0); @@ -184,8 +194,10 @@ public class RestconfInvokeOperationsServiceImplTest { public void checkResponseTest() throws Exception { doReturn(Futures.immediateFuture(new DefaultDOMRpcResult(OUTPUT, List.of()))) .when(rpcService).invokeRpc(RPC, INPUT); - assertEquals(Optional.of(OUTPUT), Futures.getDone( - server.getRestconfStrategy(CONTEXT.modelContext(), null).invokeRpc(RPC, INPUT))); + assertEquals(OUTPUT, + Futures.getDone(server.getRestconfStrategy(CONTEXT.modelContext(), null) + .invokeRpc(RESTCONF_URI, RPC, OPER_INPUT)) + .output()); } private void prepNNC(final ContainerNode result) { diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfOperationsServiceImplTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfOperationsServiceImplTest.java index 79768ec77e..29b2e36ad9 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfOperationsServiceImplTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfOperationsServiceImplTest.java @@ -25,6 +25,7 @@ import org.opendaylight.mdsal.dom.api.DOMMountPointService; import org.opendaylight.mdsal.dom.api.DOMRpcService; import org.opendaylight.mdsal.dom.api.DOMSchemaService; import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext; +import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; import org.opendaylight.yang.gen.v1.module._1.rev140101.Module1Data; import org.opendaylight.yang.gen.v1.module._2.rev140102.Module2Data; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology; @@ -82,8 +83,9 @@ public class RestconfOperationsServiceImplTest { doReturn(Optional.of(schemaService)).when(mountPoint).getService(DOMSchemaService.class); doReturn(Optional.of(mountPoint)).when(mountPointService).getMountPoint(any()); - opService = new RestconfOperationsServiceImpl(() -> DatabindContext.ofModel(SCHEMA), - new MdsalRestconfServer(dataBroker, rpcService, mountPointService)); + final DatabindProvider databindProvider = () -> DatabindContext.ofModel(SCHEMA); + opService = new RestconfOperationsServiceImpl( + new MdsalRestconfServer(databindProvider, dataBroker, rpcService, mountPointService)); } @Test diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractNotificationListenerTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractNotificationListenerTest.java index 5080f5289c..03e6eceb63 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractNotificationListenerTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/AbstractNotificationListenerTest.java @@ -14,8 +14,8 @@ import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils; public abstract class AbstractNotificationListenerTest { - static final QNameModule MODULE = QNameModule.create(XMLNamespace.of("notifi:mod"), Revision.of("2016-11-23")); - + protected static final QNameModule MODULE = + QNameModule.create(XMLNamespace.of("notifi:mod"), Revision.of("2016-11-23")); protected static final EffectiveModelContext MODEL_CONTEXT = YangParserTestUtils.parseYangResourceDirectory("/notifications"); } diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeChangeStreamTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeChangeStreamTest.java index c4c78f88da..4e328cb507 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeChangeStreamTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/DataTreeChangeStreamTest.java @@ -11,10 +11,10 @@ import static org.hamcrest.CoreMatchers.allOf; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; +import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Files; import java.nio.file.Paths; @@ -30,8 +30,7 @@ import org.opendaylight.mdsal.binding.api.DataBroker; import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractConcurrentDataBrokerTest; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataBroker; -import org.opendaylight.mdsal.dom.api.DOMMountPointService; -import org.opendaylight.mdsal.dom.api.DOMNotificationService; +import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService; import org.opendaylight.restconf.api.query.ChangedLeafNodesOnlyParam; import org.opendaylight.restconf.api.query.ChildNodesOnlyParam; import org.opendaylight.restconf.api.query.LeafNodesOnlyParam; @@ -39,7 +38,11 @@ import org.opendaylight.restconf.api.query.SkipNotificationDataParam; import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams; import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext; import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName; +import org.opendaylight.restconf.server.mdsal.MdsalRestconfStreamRegistry; +import org.opendaylight.restconf.server.mdsal.streams.dtcl.DataTreeChangeSource; +import org.opendaylight.restconf.server.spi.RestconfStream; +import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName; +import org.opendaylight.restconf.server.spi.RestconfStream.Sender; import org.opendaylight.yang.gen.v1.augment.instance.identifier.patch.module.rev220218.PatchCont1Builder; import org.opendaylight.yang.gen.v1.augment.instance.identifier.patch.module.rev220218.patch.cont.patch.choice1.PatchCase1Builder; import org.opendaylight.yang.gen.v1.augment.instance.identifier.patch.module.rev220218.patch.cont.patch.choice2.PatchCase11Builder; @@ -63,7 +66,7 @@ import org.slf4j.LoggerFactory; import org.xmlunit.assertj.XmlAssert; public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest { - private static final class TestHandler implements StreamSessionHandler { + private static final class TestHandler implements Sender { private CountDownLatch notificationLatch = new CountDownLatch(1); private volatile String lastNotification; @@ -203,7 +206,7 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest { private DataBroker dataBroker; private DOMDataBroker domDataBroker; private DatabindProvider databindProvider; - private ListenersBroker listenersBroker; + private RestconfStream.Registry streamRegistry; @BeforeClass public static void beforeClass() { @@ -220,15 +223,16 @@ public class DataTreeChangeStreamTest extends AbstractConcurrentDataBrokerTest { dataBroker = getDataBroker(); domDataBroker = getDomBroker(); databindProvider = () -> DatabindContext.ofModel(SCHEMA_CONTEXT); - listenersBroker = new ListenersBroker.ServerSentEvents(domDataBroker, mock(DOMNotificationService.class), - mock(DOMMountPointService.class)); + streamRegistry = new MdsalRestconfStreamRegistry(domDataBroker); } TestHandler createHandler(final YangInstanceIdentifier path, final String streamName, final NotificationOutputType outputType, final boolean leafNodesOnly, final boolean skipNotificationData, final boolean changedLeafNodesOnly, final boolean childNodesOnly) throws Exception { - final var stream = listenersBroker.createStream("test", "baseURI", - new DataTreeChangeSource(databindProvider, domDataBroker, LogicalDatastoreType.CONFIGURATION, path)) + final var stream = streamRegistry.createStream(URI.create("baseURI"), + new DataTreeChangeSource(databindProvider, + domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class), + LogicalDatastoreType.CONFIGURATION, path), "test") .getOrThrow(); final var handler = new TestHandler(); stream.addSubscriber(handler, diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESessionHandlerTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESessionHandlerTest.java index 71c2357d3f..dbda937232 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESessionHandlerTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/SSESessionHandlerTest.java @@ -10,17 +10,14 @@ package org.opendaylight.restconf.nb.rfc8040.streams; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import javax.ws.rs.sse.Sse; import javax.ws.rs.sse.SseEventSink; @@ -31,17 +28,18 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opendaylight.restconf.nb.rfc8040.ReceiveEventsParams; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName; +import org.opendaylight.restconf.server.spi.RestconfStream; +import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName; import org.opendaylight.yangtools.concepts.Registration; @ExtendWith(MockitoExtension.class) class SSESessionHandlerTest { @Mock - private ScheduledExecutorService executorService; + private PingExecutor pingExecutor; @Mock private RestconfStream stream; @Mock - private ScheduledFuture pingFuture; + private Registration pingRegistration; @Mock private SseEventSink eventSink; @Mock @@ -49,8 +47,8 @@ class SSESessionHandlerTest { @Mock private Registration reg; - private SSESessionHandler setup(final int maxFragmentSize, final int heartbeatInterval) throws Exception { - final var sseSessionHandler = new SSESessionHandler(executorService, eventSink, sse, stream, + private SSESender setup(final int maxFragmentSize, final long heartbeatInterval) throws Exception { + final var sseSessionHandler = new SSESender(pingExecutor, eventSink, sse, stream, EncodingName.RFC8040_XML, new ReceiveEventsParams(null, null, null, null, null, null, null), maxFragmentSize, heartbeatInterval); doReturn(reg).when(stream).addSubscriber(eq(sseSessionHandler), any(), any()); @@ -63,19 +61,17 @@ class SSESessionHandlerTest { } private void setupPing(final long maxFragmentSize, final long heartbeatInterval) { - doReturn(pingFuture).when(executorService) - .scheduleWithFixedDelay(any(Runnable.class), eq(heartbeatInterval), eq(heartbeatInterval), - eq(TimeUnit.MILLISECONDS)); + doReturn(pingRegistration).when(pingExecutor) + .startPingProcess(any(Runnable.class), eq(heartbeatInterval), eq(TimeUnit.MILLISECONDS)); } @Test void onSSEConnectedWithEnabledPing() throws Exception { - final int heartbeatInterval = 1000; + final var heartbeatInterval = 1000L; final var sseSessionHandler = setup(1000, heartbeatInterval); sseSessionHandler.init(); - verify(executorService).scheduleWithFixedDelay(any(Runnable.class), eq((long) heartbeatInterval), - eq((long) heartbeatInterval), eq(TimeUnit.MILLISECONDS)); + verify(pingExecutor).startPingProcess(any(Runnable.class), eq(heartbeatInterval), eq(TimeUnit.MILLISECONDS)); } @Test @@ -84,7 +80,7 @@ class SSESessionHandlerTest { final var sseSessionHandler = setup(1000, heartbeatInterval); sseSessionHandler.init(); - verifyNoMoreInteractions(executorService); + verifyNoMoreInteractions(pingExecutor); } @Test @@ -102,12 +98,10 @@ class SSESessionHandlerTest { final var sseSessionHandler = setup(150, 8000); setupPing(150, 8000); sseSessionHandler.init(); - doReturn(false).when(pingFuture).isCancelled(); - doReturn(false).when(pingFuture).isDone(); + doNothing().when(pingRegistration).close(); sseSessionHandler.close(); verify(reg).close(); - verify(pingFuture).cancel(anyBoolean()); } @Test @@ -116,9 +110,9 @@ class SSESessionHandlerTest { setupPing(150, 8000); sseSessionHandler.init(); + doNothing().when(pingRegistration).close(); sseSessionHandler.close(); verify(reg).close(); - verify(pingFuture).cancel(anyBoolean()); } @Test @@ -128,7 +122,7 @@ class SSESessionHandlerTest { sseSessionHandler.close(); verify(reg).close(); - verify(pingFuture, never()).cancel(anyBoolean()); + verifyNoMoreInteractions(pingRegistration); } @Test diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactoryTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactoryTest.java index ddb6a224f7..3763689f8d 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactoryTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketFactoryTest.java @@ -12,9 +12,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.verify; -import com.google.common.collect.ImmutableClassToInstanceMap; import java.net.URI; -import java.util.concurrent.ScheduledExecutorService; import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; import org.junit.jupiter.api.BeforeEach; @@ -30,6 +28,8 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; import org.opendaylight.mdsal.dom.api.DOMMountPointService; import org.opendaylight.mdsal.dom.api.DOMNotificationService; import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; +import org.opendaylight.restconf.server.mdsal.MdsalRestconfStreamRegistry; +import org.opendaylight.restconf.server.mdsal.streams.dtcl.DataTreeChangeSource; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -38,7 +38,7 @@ class WebSocketFactoryTest extends AbstractNotificationListenerTest { private static final QName TOASTER = QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster"); @Mock - private ScheduledExecutorService execService; + private PingExecutor pingExecutor; @Mock private ServletUpgradeRequest upgradeRequest; @Mock @@ -46,33 +46,31 @@ class WebSocketFactoryTest extends AbstractNotificationListenerTest { @Mock private DOMDataBroker dataBroker; @Mock - private DOMDataTreeChangeService changeService; - @Mock private DOMDataTreeWriteTransaction tx; @Mock + private DOMDataTreeChangeService changeService; + @Mock private DatabindProvider databindProvider; @Mock private DOMMountPointService mountPointService; @Mock private DOMNotificationService notificationService; - private ListenersBroker listenersBroker; private WebSocketFactory webSocketFactory; private String streamName; @BeforeEach void prepareListenersBroker() { - doReturn(ImmutableClassToInstanceMap.of(DOMDataTreeChangeService.class, changeService)).when(dataBroker) - .getExtensions(); doReturn(tx).when(dataBroker).newWriteOnlyTransaction(); doReturn(CommitInfo.emptyFluentFuture()).when(tx).commit(); - listenersBroker = new ListenersBroker.ServerSentEvents(dataBroker, notificationService, mountPointService); - webSocketFactory = new WebSocketFactory(execService, listenersBroker, 5000, 2000); + final var streamRegistry = new MdsalRestconfStreamRegistry(dataBroker); + webSocketFactory = new WebSocketFactory(streamRegistry, pingExecutor, 5000, 2000); - streamName = listenersBroker.createStream("description", "streams", - new DataTreeChangeSource(databindProvider, dataBroker, LogicalDatastoreType.CONFIGURATION, - YangInstanceIdentifier.of(TOASTER))) + streamName = streamRegistry.createStream(URI.create("https://localhost:8181/rests"), + new DataTreeChangeSource(databindProvider, changeService, LogicalDatastoreType.CONFIGURATION, + YangInstanceIdentifier.of(TOASTER)), + "description") .getOrThrow() .name(); } @@ -82,7 +80,7 @@ class WebSocketFactoryTest extends AbstractNotificationListenerTest { doReturn(URI.create("https://localhost:8181/rests/streams/xml/" + streamName)) .when(upgradeRequest).getRequestURI(); - assertInstanceOf(WebSocketSessionHandler.class, + assertInstanceOf(WebSocketSender.class, webSocketFactory.createWebSocket(upgradeRequest, upgradeResponse)); verify(upgradeResponse).setSuccess(true); verify(upgradeResponse).setStatusCode(101); diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSessionHandlerTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSessionHandlerTest.java index 6e165a2620..cc385d9fe0 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSessionHandlerTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/WebSocketSessionHandlerTest.java @@ -22,8 +22,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.websocket.api.RemoteEndpoint; import org.eclipse.jetty.websocket.api.Session; @@ -32,25 +30,26 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName; +import org.opendaylight.restconf.server.spi.RestconfStream; +import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName; import org.opendaylight.yangtools.concepts.Registration; @ExtendWith(MockitoExtension.class) class WebSocketSessionHandlerTest { private final class WebSocketTestSessionState { - private final WebSocketSessionHandler webSocketSessionHandler; - private final int heartbeatInterval; + private final WebSocketSender webSocketSessionHandler; + private final long heartbeatInterval; private final int maxFragmentSize; - WebSocketTestSessionState(final int maxFragmentSize, final int heartbeatInterval) { + WebSocketTestSessionState(final int maxFragmentSize, final long heartbeatInterval) { this.heartbeatInterval = heartbeatInterval; this.maxFragmentSize = maxFragmentSize; - webSocketSessionHandler = new WebSocketSessionHandler(executorService, stream, - ENCODING, null, maxFragmentSize, heartbeatInterval); + webSocketSessionHandler = new WebSocketSender(pingExecutor, stream, ENCODING, null, maxFragmentSize, + heartbeatInterval); if (heartbeatInterval != 0) { - doReturn(pingFuture).when(executorService).scheduleWithFixedDelay(any(Runnable.class), - eq((long) heartbeatInterval), eq((long) heartbeatInterval), eq(TimeUnit.MILLISECONDS)); + doReturn(pingRegistration).when(pingExecutor).startPingProcess(any(Runnable.class), + eq(heartbeatInterval), eq(TimeUnit.MILLISECONDS)); } } } @@ -60,9 +59,9 @@ class WebSocketSessionHandlerTest { @Mock private RestconfStream stream; @Mock - private ScheduledExecutorService executorService; + private PingExecutor pingExecutor; @Mock - private ScheduledFuture pingFuture; + private Registration pingRegistration; @Mock private Session session; @@ -73,9 +72,8 @@ class WebSocketSessionHandlerTest { webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session); verify(stream).addSubscriber(webSocketTestSessionState.webSocketSessionHandler, ENCODING, null); - verify(executorService).scheduleWithFixedDelay(any(Runnable.class), - eq((long) webSocketTestSessionState.heartbeatInterval), - eq((long) webSocketTestSessionState.heartbeatInterval), eq(TimeUnit.MILLISECONDS)); + verify(pingExecutor).startPingProcess(any(Runnable.class), eq(webSocketTestSessionState.heartbeatInterval), + eq(TimeUnit.MILLISECONDS)); } @Test @@ -85,7 +83,7 @@ class WebSocketSessionHandlerTest { webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session); verify(stream).addSubscriber(webSocketTestSessionState.webSocketSessionHandler, ENCODING, null); - verifyNoMoreInteractions(executorService); + verifyNoMoreInteractions(pingExecutor); } @Test @@ -126,15 +124,12 @@ class WebSocketSessionHandlerTest { when(stream.addSubscriber(webSocketTestSessionState.webSocketSessionHandler, ENCODING, null)) .thenReturn(reg); webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session); - when(pingFuture.isCancelled()).thenReturn(false); - when(pingFuture.isDone()).thenReturn(false); final var sampleError = new IllegalStateException("Simulated error"); doNothing().when(reg).close(); + doNothing().when(pingRegistration).close(); webSocketTestSessionState.webSocketSessionHandler.onWebSocketError(sampleError); - verify(reg).close(); verify(session).close(); - verify(pingFuture).cancel(anyBoolean()); } @Test @@ -148,10 +143,10 @@ class WebSocketSessionHandlerTest { webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session); final var sampleError = new IllegalStateException("Simulated error"); + doNothing().when(reg).close(); + doNothing().when(pingRegistration).close(); webSocketTestSessionState.webSocketSessionHandler.onWebSocketError(sampleError); - verify(reg).close(); verify(session, never()).close(); - verify(pingFuture).cancel(anyBoolean()); } @Test @@ -163,13 +158,11 @@ class WebSocketSessionHandlerTest { when(stream.addSubscriber(webSocketTestSessionState.webSocketSessionHandler, ENCODING, null)) .thenReturn(reg); webSocketTestSessionState.webSocketSessionHandler.onWebSocketConnected(session); - when(pingFuture.isDone()).thenReturn(true); final var sampleError = new IllegalStateException("Simulated error"); webSocketTestSessionState.webSocketSessionHandler.onWebSocketError(sampleError); verify(reg).close(); verify(session, never()).close(); - verify(pingFuture, never()).cancel(anyBoolean()); } @Test diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/server/mdsal/CapabilitiesWriterTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/server/mdsal/CapabilitiesWriterTest.java new file mode 100644 index 0000000000..c1a919bd64 --- /dev/null +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/server/mdsal/CapabilitiesWriterTest.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2022 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.restconf.server.mdsal; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.Set; +import org.junit.jupiter.api.Test; +import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode; + +class CapabilitiesWriterTest { + @Test + void restconfStateCapabilitiesTest() { + final var capability = CapabilitiesWriter.mapCapabilities(); + assertEquals(CapabilitiesWriter.CAPABILITY, capability.name()); + + final var entries = capability.body().stream().map(LeafSetEntryNode::body).toList(); + final var unique = Set.copyOf(entries); + assertEquals(Set.of( + "urn:ietf:params:restconf:capability:depth:1.0", + "urn:ietf:params:restconf:capability:fields:1.0", + "urn:ietf:params:restconf:capability:filter:1.0", + "urn:ietf:params:restconf:capability:replay:1.0", + "urn:ietf:params:restconf:capability:with-defaults:1.0", + "urn:opendaylight:params:restconf:capability:pretty-print:1.0", + "urn:opendaylight:params:restconf:capability:leaf-nodes-only:1.0", + "urn:opendaylight:params:restconf:capability:changed-leaf-nodes-only:1.0", + "urn:opendaylight:params:restconf:capability:skip-notification-data:1.0", + "urn:opendaylight:params:restconf:capability:child-nodes-only:1.0"), unique); + assertEquals(unique.size(), entries.size()); + } +} diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBrokerTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/CreateNotificationStreamRpcTest.java similarity index 66% rename from restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBrokerTest.java rename to restconf/restconf-nb/src/test/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/CreateNotificationStreamRpcTest.java index 6c035585dc..b5a5ea23e0 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/ListenersBrokerTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/server/mdsal/streams/dtcl/CreateNotificationStreamRpcTest.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.dtcl; import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.MatcherAssert.assertThat; @@ -20,6 +20,7 @@ import static org.mockito.Mockito.doReturn; import com.google.common.collect.ImmutableClassToInstanceMap; import java.net.URI; import java.util.UUID; +import org.eclipse.jdt.annotation.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -32,12 +33,13 @@ import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataBroker; import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService; import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; -import org.opendaylight.mdsal.dom.api.DOMMountPointService; -import org.opendaylight.mdsal.dom.api.DOMNotificationService; import org.opendaylight.restconf.common.errors.RestconfDocumentedException; import org.opendaylight.restconf.nb.rfc8040.databind.DatabindContext; import org.opendaylight.restconf.nb.rfc8040.databind.DatabindProvider; +import org.opendaylight.restconf.server.mdsal.MdsalRestconfStreamRegistry; +import org.opendaylight.restconf.server.spi.OperationInput; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.restconf.monitoring.rev170126.restconf.state.streams.stream.Access; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscription; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.remote.rev140114.CreateDataChangeEventSubscriptionOutput; import org.opendaylight.yangtools.yang.common.ErrorTag; import org.opendaylight.yangtools.yang.common.ErrorType; @@ -50,55 +52,51 @@ import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.Builders; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.model.api.ContainerLike; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; -import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode; -import org.opendaylight.yangtools.yang.model.api.RpcDefinition; +import org.opendaylight.yangtools.yang.model.api.stmt.RpcEffectiveStatement; +import org.opendaylight.yangtools.yang.model.util.SchemaInferenceStack; import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils; @ExtendWith(MockitoExtension.class) -class ListenersBrokerTest { +class CreateNotificationStreamRpcTest { private static final EffectiveModelContext SCHEMA_CTX = YangParserTestUtils.parseYangResourceDirectory("/streams"); - private static final URI BASE_URI = URI.create("baseURI"); + private static final URI RESTCONF_URI = URI.create("/rests"); + private static final YangInstanceIdentifier TOASTER = YangInstanceIdentifier.of( + QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toaster")); @Mock private DOMDataBroker dataBroker; @Mock private DOMDataTreeChangeService treeChange; @Mock - private DOMMountPointService mountPointService; - @Mock - private DOMNotificationService notificationService; - @Mock private DOMDataTreeWriteTransaction tx; @Captor private ArgumentCaptor pathCaptor; @Captor private ArgumentCaptor dataCaptor; - private ListenersBroker listenersBroker; private DatabindProvider databindProvider; + private CreateDataChangeEventSubscriptionRpc rpc; + @BeforeEach public void before() { - listenersBroker = new ListenersBroker.ServerSentEvents(dataBroker, notificationService, mountPointService); databindProvider = () -> DatabindContext.ofModel(SCHEMA_CTX); - } - @Test - void createStreamTest() { doReturn(ImmutableClassToInstanceMap.of(DOMDataTreeChangeService.class, treeChange)) .when(dataBroker).getExtensions(); + rpc = new CreateDataChangeEventSubscriptionRpc(new MdsalRestconfStreamRegistry(dataBroker), databindProvider, + dataBroker); + } + @Test + void createStreamTest() { doReturn(tx).when(dataBroker).newWriteOnlyTransaction(); doNothing().when(tx).put(eq(LogicalDatastoreType.OPERATIONAL), pathCaptor.capture(), dataCaptor.capture()); doReturn(CommitInfo.emptyFluentFuture()).when(tx).commit(); - final var output = assertInstanceOf(ContainerNode.class, - listenersBroker.createDataChangeNotifiStream(databindProvider, BASE_URI, - prepareDomPayload("create-data-change-event-subscription", "toaster", "path"), SCHEMA_CTX) - .getOrThrow() - .orElse(null)); + final var output = assertInstanceOf(ContainerNode.class, rpc.invoke(RESTCONF_URI, createInput("path", TOASTER)) + .getOrThrow().output()); assertEquals(new NodeIdentifier(CreateDataChangeEventSubscriptionOutput.QNAME), output.name()); assertEquals(1, output.size()); @@ -133,12 +131,12 @@ class ListenersBrokerTest { .withChild(Builders.mapEntryBuilder() .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, rcEncoding, "json")) .withChild(ImmutableNodes.leafNode(rcEncoding, "json")) - .withChild(ImmutableNodes.leafNode(rcLocation, "rests/streams/json/" + name)) + .withChild(ImmutableNodes.leafNode(rcLocation, "/rests/streams/json/" + name)) .build()) .withChild(Builders.mapEntryBuilder() .withNodeIdentifier(NodeIdentifierWithPredicates.of(Access.QNAME, rcEncoding, "xml")) .withChild(ImmutableNodes.leafNode(rcEncoding, "xml")) - .withChild(ImmutableNodes.leafNode(rcLocation, "rests/streams/xml/" + name)) + .withChild(ImmutableNodes.leafNode(rcLocation, "/rests/streams/xml/" + name)) .build()) .build()) .build().prettyTree().toString(), dataCaptor.getValue().prettyTree().toString()); @@ -146,58 +144,39 @@ class ListenersBrokerTest { @Test void createStreamWrongValueTest() { - final var payload = prepareDomPayload("create-data-change-event-subscription", "String value", "path"); - final var errors = assertThrows(RestconfDocumentedException.class, - () -> listenersBroker.createDataChangeNotifiStream(databindProvider, BASE_URI, payload, SCHEMA_CTX)) - .getErrors(); - assertEquals(1, errors.size()); - final var error = errors.get(0); - assertEquals(ErrorType.APPLICATION, error.getErrorType()); - assertEquals(ErrorTag.OPERATION_FAILED, error.getErrorTag()); - assertEquals("Instance identifier was not normalized correctly", error.getErrorMessage()); + final var payload = createInput("path", "String value"); + final var ex = assertThrows(IllegalArgumentException.class, () -> rpc.invoke(RESTCONF_URI, payload)); + assertEquals(""" + Bad child leafNode (urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote@2014-01-14)path = \ + "String value"\ + """, ex.getMessage()); } @Test void createStreamWrongInputRpcTest() { - final var payload = prepareDomPayload("create-data-change-event-subscription2", "toaster", "path2"); - final var errors = assertThrows(RestconfDocumentedException.class, - () -> listenersBroker.createDataChangeNotifiStream(databindProvider, BASE_URI, payload, SCHEMA_CTX)) - .getErrors(); + final var future = rpc.invoke(RESTCONF_URI, createInput(null, null)); + final var errors = assertThrows(RestconfDocumentedException.class, future::getOrThrow).getErrors(); assertEquals(1, errors.size()); final var error = errors.get(0); assertEquals(ErrorType.APPLICATION, error.getErrorType()); - assertEquals(ErrorTag.OPERATION_FAILED, error.getErrorTag()); - assertEquals("Instance identifier was not normalized correctly", error.getErrorMessage()); + assertEquals(ErrorTag.MISSING_ELEMENT, error.getErrorTag()); + assertEquals("missing path", error.getErrorMessage()); } - private static ContainerNode prepareDomPayload(final String rpcName, final String toasterValue, - final String inputOutputName) { - final var rpcModule = SCHEMA_CTX.findModules("sal-remote").iterator().next(); - final QName rpcQName = QName.create(rpcModule.getQNameModule(), rpcName); - - ContainerLike containerSchemaNode = null; - for (final RpcDefinition rpc : rpcModule.getRpcs()) { - if (rpcQName.isEqualWithoutRevision(rpc.getQName())) { - containerSchemaNode = rpc.getInput(); - break; - } + private OperationInput createInput(final @Nullable String leafName, final Object leafValue) { + final var stack = SchemaInferenceStack.of(SCHEMA_CTX); + final var rpcStmt = assertInstanceOf(RpcEffectiveStatement.class, + stack.enterSchemaTree(CreateDataChangeEventSubscription.QNAME)); + final var inference = stack.toInference(); + + final var builder = Builders.containerBuilder() + .withNodeIdentifier(new NodeIdentifier(rpcStmt.input().argument())); + if (leafName != null) { + final var lfQName = QName.create(rpcStmt.argument(), leafName); + stack.enterDataTree(rpcStmt.input().argument()); + stack.enterDataTree(lfQName); + builder.withChild(ImmutableNodes.leafNode(lfQName, leafValue)); } - assertNotNull(containerSchemaNode); - - final QName lfQName = QName.create(rpcModule.getQNameModule(), inputOutputName); - assertInstanceOf(LeafSchemaNode.class, containerSchemaNode.dataChildByName(lfQName)); - - final Object o; - if ("toaster".equals(toasterValue)) { - final QName rpcQname = QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", toasterValue); - o = YangInstanceIdentifier.of(rpcQname); - } else { - o = toasterValue; - } - - return Builders.containerBuilder() - .withNodeIdentifier(new NodeIdentifier(containerSchemaNode.getQName())) - .withChild(ImmutableNodes.leafNode(lfQName, o)) - .build(); + return new OperationInput(databindProvider.currentContext(), inference, builder.build()); } } diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/JSONNotificationFormatterTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/server/mdsal/streams/notif/JSONNotificationFormatterTest.java similarity index 97% rename from restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/JSONNotificationFormatterTest.java rename to restconf/restconf-nb/src/test/java/org/opendaylight/restconf/server/mdsal/streams/notif/JSONNotificationFormatterTest.java index ee9f638277..0f6167d1ab 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/JSONNotificationFormatterTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/server/mdsal/streams/notif/JSONNotificationFormatterTest.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.notif; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -17,6 +17,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opendaylight.mdsal.dom.api.DOMNotification; +import org.opendaylight.restconf.nb.rfc8040.streams.AbstractNotificationListenerTest; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/XMLNotificationFormatterTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/server/mdsal/streams/notif/XMLNotificationFormatterTest.java similarity index 97% rename from restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/XMLNotificationFormatterTest.java rename to restconf/restconf-nb/src/test/java/org/opendaylight/restconf/server/mdsal/streams/notif/XMLNotificationFormatterTest.java index fd8d548682..7e61f69d37 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/XMLNotificationFormatterTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/server/mdsal/streams/notif/XMLNotificationFormatterTest.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.mdsal.streams.notif; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.Mockito.when; @@ -16,6 +16,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opendaylight.mdsal.dom.api.DOMNotification; +import org.opendaylight.restconf.nb.rfc8040.streams.AbstractNotificationListenerTest; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; diff --git a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStateStreamsTest.java b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/server/spi/AbstractRestconfStreamRegistryTest.java similarity index 86% rename from restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStateStreamsTest.java rename to restconf/restconf-nb/src/test/java/org/opendaylight/restconf/server/spi/AbstractRestconfStreamRegistryTest.java index 4ba30ae3ed..034a6815cd 100644 --- a/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/RestconfStateStreamsTest.java +++ b/restconf/restconf-nb/src/test/java/org/opendaylight/restconf/server/spi/AbstractRestconfStreamRegistryTest.java @@ -5,7 +5,7 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ -package org.opendaylight.restconf.nb.rfc8040.streams; +package org.opendaylight.restconf.server.spi; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -15,7 +15,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import org.junit.jupiter.api.Test; -import org.opendaylight.restconf.nb.rfc8040.streams.RestconfStream.EncodingName; +import org.opendaylight.restconf.server.spi.RestconfStream.EncodingName; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.library.rev190104.module.list.Module; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.library.rev190104.module.list.module.Deviation; import org.opendaylight.yangtools.yang.common.QName; @@ -30,10 +30,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Unit tests for {@link RestconfStateStreams}. + * Unit tests for {@link AbstractRestconfStreamRegistry}. */ -class RestconfStateStreamsTest { - private static final Logger LOG = LoggerFactory.getLogger(RestconfStateStreamsTest.class); +class AbstractRestconfStreamRegistryTest { + private static final Logger LOG = LoggerFactory.getLogger(AbstractRestconfStreamRegistryTest.class); private static final EffectiveModelContext CONTEXT = // TODO: assemble these from dependencies? YangParserTestUtils.parseYangResourceDirectory("/modules/restconf-module-testing"); @@ -45,7 +45,8 @@ class RestconfStateStreamsTest { final var streamName = "/nested-module:depth1-cont/depth2-leaf1"; assertMappedData(prepareMap(streamName, uri, outputType), - ListenersBroker.streamEntry(streamName, "description", "location", Set.of(new EncodingName(outputType)))); + AbstractRestconfStreamRegistry.streamEntry(streamName, "description", "location", + Set.of(new EncodingName(outputType)))); } @Test @@ -54,15 +55,16 @@ class RestconfStateStreamsTest { final var uri = "uri"; assertMappedData(prepareMap("notifi", uri, outputType), - ListenersBroker.streamEntry("notifi", "description", "location", Set.of(new EncodingName(outputType)))); + AbstractRestconfStreamRegistry.streamEntry("notifi", "description", "location", + Set.of(new EncodingName(outputType)))); } private static Map prepareMap(final String name, final String uri, final String outputType) { return Map.of( - ListenersBroker.NAME_QNAME, name, - ListenersBroker.LOCATION_QNAME, uri, - ListenersBroker.ENCODING_QNAME, outputType, - ListenersBroker.DESCRIPTION_QNAME, "description"); + AbstractRestconfStreamRegistry.NAME_QNAME, name, + AbstractRestconfStreamRegistry.LOCATION_QNAME, uri, + AbstractRestconfStreamRegistry.ENCODING_QNAME, outputType, + AbstractRestconfStreamRegistry.DESCRIPTION_QNAME, "description"); } private static void assertMappedData(final Map map, final MapEntryNode mappedData) { -- 2.36.6