From: Jaroslav Tóth Date: Sun, 17 Mar 2019 19:21:10 +0000 (+0100) Subject: Refactoring of web-sockets in RESTCONF RFC-8040 X-Git-Tag: release/sodium~68^2~1 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;ds=sidebyside;h=56bcd8a77160847c2b25201fe3a02f60096129ef;hp=7410e4d2ae2c9c9efa81de0a7560c5d677b94993;p=netconf.git Refactoring of web-sockets in RESTCONF RFC-8040 - Notificator is removed and replaced by ListenersBroker; mods.: A. One stream-name can be associated only with one YANG notification listener, because it is sufficient (XML/JSON format is distinguished directly in stream name). B. YANG notification streams are divided from data-change event streams; refactored method signatures. C. Added synchronization (R/W locking that also differs between access to data-change and YANG notification listeners). - Exposing of public subsriber methods to BaseListenerInterface. - Refactoring of CreateStream utilities and RPC processors. - Refactoring of SubsribeToStream utilities and RPC processors. Change-Id: I521f3b893a5cfa53d59a438e871924c54d6f4d7d Signed-off-by: Jaroslav Tóth --- diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImpl.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImpl.java index 4cf01c5992..5e21c2c3cf 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImpl.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImpl.java @@ -153,8 +153,7 @@ public class RestconfDataServiceImpl implements RestconfDataService { && identifier.contains(STREAM_LOCATION_PATH_PART)) { final String value = (String) node.getValue(); final String streamName = value.substring( - value.indexOf(CREATE_NOTIFICATION_STREAM.toString() + RestconfConstants.SLASH), - value.length()); + value.indexOf(CREATE_NOTIFICATION_STREAM + RestconfConstants.SLASH)); this.delegRestconfSubscrService.subscribeToStream(streamName, uriInfo); } if (node == null) { diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java index 55c6be3d8d..58942fbca1 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java @@ -60,7 +60,7 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat @Override public NormalizedNodeContext invokeRpc(final String identifier, final NormalizedNodeContext payload, - final UriInfo uriInfo) { + final UriInfo uriInfo) { final SchemaContextRef refSchemaCtx = new SchemaContextRef(this.schemaContextHandler.get()); final SchemaPath schemaPath = payload.getInstanceIdentifierContext().getSchemaNode().getPath(); final DOMMountPoint mountPoint = payload.getInstanceIdentifierContext().getMountPoint(); @@ -70,8 +70,8 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat SchemaContextRef schemaContextRef; if (mountPoint == null) { - if (namespace.toString().equals(RestconfStreamsConstants.SAL_REMOTE_NAMESPACE)) { - if (identifier.contains(RestconfStreamsConstants.CREATE_DATA_SUBSCR)) { + if (namespace.equals(RestconfStreamsConstants.SAL_REMOTE_NAMESPACE.getNamespace())) { + if (identifier.contains(RestconfStreamsConstants.CREATE_DATA_SUBSCRIPTION)) { response = CreateStreamUtil.createDataChangeNotifiStream(payload, refSchemaCtx); } else { throw new RestconfDocumentedException("Not supported operation", ErrorType.RPC, diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImpl.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImpl.java index 9d4309e584..d9a3b6773f 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImpl.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImpl.java @@ -30,7 +30,6 @@ import org.opendaylight.restconf.nb.rfc8040.handlers.TransactionChainHandler; import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfStreamsSubscriptionService; import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants; import org.opendaylight.restconf.nb.rfc8040.rests.utils.SubscribeToStreamUtil; -import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeBuilder; @@ -41,7 +40,6 @@ import org.slf4j.LoggerFactory; /** * Implementation of {@link RestconfStreamsSubscriptionService}. - * */ @Path("/") public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSubscriptionService { @@ -83,11 +81,11 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu final NotificationQueryParams notificationQueryParams = NotificationQueryParams.fromUriInfo(uriInfo); URI response = null; - if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCR)) { - response = SubscribeToStreamUtil.notifiDataStream(identifier, uriInfo, notificationQueryParams, + if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCRIPTION)) { + response = SubscribeToStreamUtil.subscribeToDataStream(identifier, uriInfo, notificationQueryParams, this.handlersHolder); } else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) { - response = SubscribeToStreamUtil.notifYangStream(identifier, uriInfo, notificationQueryParams, + response = SubscribeToStreamUtil.subscribeToYangStream(identifier, uriInfo, notificationQueryParams, this.handlersHolder); } @@ -97,8 +95,7 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu SubscribeToStreamUtil.prepareIIDSubsStreamOutput(this.handlersHolder.getSchemaHandler()); final NormalizedNodeBuilder> builder = ImmutableLeafNodeBuilder.create().withValue(response.toString()); - builder.withNodeIdentifier( - NodeIdentifier.create(QName.create("subscribe:to:notification", "2016-10-28", "location"))); + builder.withNodeIdentifier(NodeIdentifier.create(RestconfStreamsConstants.LOCATION_QNAME)); // prepare new header with location final Map headers = new HashMap<>(); @@ -170,7 +167,6 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu /** * Parser and holder of query paramteres from uriInfo for notifications. - * */ public static final class NotificationQueryParams { @@ -255,5 +251,4 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu return Optional.ofNullable(filter); } } - -} +} \ No newline at end of file diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/CreateStreamUtil.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/CreateStreamUtil.java index 97cfb02d79..e6cab92554 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/CreateStreamUtil.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/CreateStreamUtil.java @@ -7,9 +7,8 @@ */ package org.opendaylight.restconf.nb.rfc8040.rests.utils; -import com.google.common.base.Preconditions; -import java.util.ArrayList; -import java.util.List; +import static java.util.Objects.requireNonNull; + import java.util.Optional; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMRpcResult; @@ -20,8 +19,8 @@ import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag; import org.opendaylight.restconf.common.errors.RestconfError.ErrorType; import org.opendaylight.restconf.common.util.DataChangeScope; import org.opendaylight.restconf.nb.rfc8040.references.SchemaContextRef; +import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter; -import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator; import org.opendaylight.restconf.nb.rfc8040.utils.parser.ParserIdentifier; import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; import org.opendaylight.yangtools.yang.common.QName; @@ -36,119 +35,153 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableCo import org.opendaylight.yangtools.yang.model.api.Module; import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import org.opendaylight.yangtools.yang.model.api.SchemaPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Util class for streams. - * - *
    - *
  • create stream - *
  • subscribe - *
- * + * Utility class for creation of data-change-event or YANG notification streams. */ public final class CreateStreamUtil { private static final Logger LOG = LoggerFactory.getLogger(CreateStreamUtil.class); - private static final String OUTPUT_TYPE_PARAM_NAME = "notification-output-type"; private CreateStreamUtil() { - throw new UnsupportedOperationException("Util class"); + throw new UnsupportedOperationException("Utility class"); } /** - * Create stream with POST operation via RPC. - * - * @param payload - * input of rpc - example in JSON: - * - *
-     *            {@code
-     *            {
-     *                "input": {
-     *                    "path": "/toaster:toaster/toaster:toasterStatus",
-     *                    "sal-remote-augment:datastore": "OPERATIONAL",
-     *                    "sal-remote-augment:scope": "ONE"
-     *                }
-     *            }
-     *            }
-     *            
- * - * @param refSchemaCtx - * reference to {@link SchemaContext} - - * {@link SchemaContextRef} - * @return {@link DOMRpcResult} - This means output of RPC - example in JSON: + * Create data-change-event or notification stream with POST operation via RPC. * - *
-     *         {@code
+     * @param payload      Input of RPC - example in JSON (data-change-event stream):
+     *                     
+     *                     {@code
+     *                         {
+     *                             "input": {
+     *                                 "path": "/toaster:toaster/toaster:toasterStatus",
+     *                                 "sal-remote-augment:datastore": "OPERATIONAL",
+     *                                 "sal-remote-augment:scope": "ONE"
+     *                             }
+     *                         }
+     *                     }
+     *                     
+ * @param refSchemaCtx Reference to {@link SchemaContext} - {@link SchemaContextRef}. + * @return {@link DOMRpcResult} - Output of RPC - example in JSON: + *
+     *     {@code
      *         {
      *             "output": {
      *                 "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
      *             }
      *         }
-     *         }
-     *         
- * + * } + *
*/ public static DOMRpcResult createDataChangeNotifiStream(final NormalizedNodeContext payload, final SchemaContextRef refSchemaCtx) { - final ContainerNode data = (ContainerNode) payload.getData(); + // parsing out of container with settings and path + final ContainerNode data = (ContainerNode) requireNonNull(payload).getData(); final QName qname = payload.getInstanceIdentifierContext().getSchemaNode().getQName(); final YangInstanceIdentifier path = preparePath(data, qname); - String streamName = prepareDataChangeNotifiStreamName(path, refSchemaCtx.get(), data); - - final QName outputQname = QName.create(qname, "output"); - final QName streamNameQname = QName.create(qname, "stream-name"); + // building of stream name + final StringBuilder streamNameBuilder = new StringBuilder( + prepareDataChangeNotifiStreamName(path, requireNonNull(refSchemaCtx).get(), data)); final NotificationOutputType outputType = prepareOutputType(data); if (outputType.equals(NotificationOutputType.JSON)) { - streamName = streamName + "/JSON"; + streamNameBuilder.append('/').append(outputType.getName()); } + final String streamName = streamNameBuilder.toString(); - if (!Notificator.existListenerFor(streamName)) { - Notificator.createListener(path, streamName, outputType); - } + // registration of the listener + ListenersBroker.getInstance().registerDataChangeListener(path, streamName, outputType); + + // building of output + final QName outputQname = QName.create(qname, RestconfStreamsConstants.OUTPUT_CONTAINER_NAME); + final QName streamNameQname = QName.create(qname, RestconfStreamsConstants.OUTPUT_STREAM_NAME); - final ContainerNode output = - ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(outputQname)) - .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build(); + final ContainerNode output = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(outputQname)) + .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build(); return new DefaultDOMRpcResult(output); } /** - * Prepare {@code NotificationOutputType}. + * Prepare {@link NotificationOutputType}. * - * @param data - * data of notification - * @return output type fo notification + * @param data Container with stream settings (RPC create-stream). + * @return Parsed {@link NotificationOutputType}. */ private static NotificationOutputType prepareOutputType(final ContainerNode data) { - NotificationOutputType outputType = parseEnum(data, NotificationOutputType.class, OUTPUT_TYPE_PARAM_NAME); + NotificationOutputType outputType = parseEnum( + data, NotificationOutputType.class, RestconfStreamsConstants.OUTPUT_TYPE_PARAM_NAME); return outputType == null ? NotificationOutputType.XML : outputType; } + /** + * Prepare stream name. + * + * @param path Path of element from which data-change-event notifications are going to be generated. + * @param schemaContext Schema context. + * @param data Container with stream settings (RPC create-stream). + * @return Parsed stream name. + */ private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path, - final SchemaContext schemaContext, - final ContainerNode data) { - LogicalDatastoreType ds = parseEnum(data, LogicalDatastoreType.class, - RestconfStreamsConstants.DATASTORE_PARAM_NAME); - ds = ds == null ? RestconfStreamsConstants.DEFAULT_DS : ds; + final SchemaContext schemaContext, final ContainerNode data) { + LogicalDatastoreType datastoreType = parseEnum( + data, LogicalDatastoreType.class, RestconfStreamsConstants.DATASTORE_PARAM_NAME); + datastoreType = datastoreType == null ? RestconfStreamsConstants.DEFAULT_DS : datastoreType; DataChangeScope scope = parseEnum(data, DataChangeScope.class, RestconfStreamsConstants.SCOPE_PARAM_NAME); scope = scope == null ? RestconfStreamsConstants.DEFAULT_SCOPE : scope; - final String streamName = RestconfStreamsConstants.DATA_SUBSCR + "/" - + Notificator - .createStreamNameFromUri(ParserIdentifier.stringFromYangInstanceIdentifier(path, schemaContext) - + RestconfStreamsConstants.DS_URI + ds + RestconfStreamsConstants.SCOPE_URI + scope); - return streamName; + return RestconfStreamsConstants.DATA_SUBSCRIPTION + + "/" + + ListenersBroker.createStreamNameFromUri( + ParserIdentifier.stringFromYangInstanceIdentifier(path, schemaContext) + + RestconfStreamsConstants.DS_URI + + datastoreType + + RestconfStreamsConstants.SCOPE_URI + + scope); + } + + /** + * Prepare {@link YangInstanceIdentifier} of stream source. + * + * @param data Container with stream settings (RPC create-stream). + * @param qualifiedName QName of the input RPC context (used only in debugging). + * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications + * are going to be generated. + */ + private static YangInstanceIdentifier preparePath(final ContainerNode data, final QName qualifiedName) { + final Optional> path = data.getChild( + new YangInstanceIdentifier.NodeIdentifier(QName.create( + qualifiedName, + RestconfStreamsConstants.STREAM_PATH_PARAM_NAME))); + Object pathValue = null; + if (path.isPresent()) { + pathValue = path.get().getValue(); + } + if (!(pathValue instanceof YangInstanceIdentifier)) { + LOG.debug("Instance identifier {} was not normalized correctly", qualifiedName); + throw new RestconfDocumentedException( + "Instance identifier was not normalized correctly", + ErrorType.APPLICATION, + ErrorTag.OPERATION_FAILED); + } + return (YangInstanceIdentifier) pathValue; } + /** + * Parsing out of enumeration from RPC create-stream body. + * + * @param data Container with stream settings (RPC create-stream). + * @param clazz Enum type to be parsed out from input container. + * @param paramName Local name of the enum element. + * @return Parsed enumeration. + */ private static T parseEnum(final ContainerNode data, final Class clazz, final String paramName) { final Optional> optAugNode = data.getChild( - RestconfStreamsConstants.SAL_REMOTE_AUG_IDENTIFIER); + RestconfStreamsConstants.SAL_REMOTE_AUG_IDENTIFIER); if (!optAugNode.isPresent()) { return null; } @@ -157,7 +190,7 @@ public final class CreateStreamUtil { return null; } final Optional> enumNode = ((AugmentationNode) augNode).getChild( - new NodeIdentifier(QName.create(RestconfStreamsConstants.SAL_REMOTE_AUGMENT, paramName))); + new NodeIdentifier(QName.create(RestconfStreamsConstants.SAL_REMOTE_AUGMENT, paramName))); if (!enumNode.isPresent()) { return null; } @@ -169,65 +202,43 @@ public final class CreateStreamUtil { return ResolveEnumUtil.resolveEnum(clazz, (String) value); } - private static YangInstanceIdentifier preparePath(final ContainerNode data, final QName qualifiedName) { - final Optional> path = data - .getChild(new YangInstanceIdentifier.NodeIdentifier(QName.create(qualifiedName, "path"))); - Object pathValue = null; - if (path.isPresent()) { - pathValue = path.get().getValue(); - } - if (!(pathValue instanceof YangInstanceIdentifier)) { - LOG.debug("Instance identifier {} was not normalized correctly", qualifiedName); - throw new RestconfDocumentedException("Instance identifier was not normalized correctly", - ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED); - } - return (YangInstanceIdentifier) pathValue; - } - /** - * Create stream with POST operation via RPC. + * Create YANG notification stream using notification definition in YANG schema. * - * @param notificatinoDefinition - * input of RPC - * @param refSchemaCtx - * schemaContext - * @param outputType - * output type - * @return {@link DOMRpcResult} + * @param notificationDefinition YANG notification definition. + * @param refSchemaCtx Reference to {@link SchemaContext} - {@link SchemaContextRef}. + * @param outputType Output type (XML or JSON). + * @return {@link NotificationListenerAdapter} */ - public static List createYangNotifiStream( - final NotificationDefinition notificatinoDefinition, final SchemaContextRef refSchemaCtx, - final String outputType) { - final List paths = new ArrayList<>(); - final QName notificatinoDefinitionQName = notificatinoDefinition.getQName(); - final Module module = - refSchemaCtx.findModuleByNamespaceAndRevision(notificatinoDefinitionQName.getModule().getNamespace(), - notificatinoDefinitionQName.getModule().getRevision()); - Preconditions.checkNotNull(module, - "Module for namespace " + notificatinoDefinitionQName.getModule().getNamespace() + " does not exist"); - NotificationDefinition notifiDef = null; - for (final NotificationDefinition notification : module.getNotifications()) { - if (notification.getQName().equals(notificatinoDefinitionQName)) { - notifiDef = notification; - break; - } - } - final String moduleName = module.getName(); - Preconditions.checkNotNull(notifiDef, - "Notification " + notificatinoDefinitionQName + "doesn't exist in module " + moduleName); - paths.add(notifiDef.getPath()); - String streamName = RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM + "/"; - streamName = streamName + moduleName + ":" + notificatinoDefinitionQName.getLocalName(); - if (outputType.equals("JSON")) { - streamName = streamName + "/JSON"; - } + public static NotificationListenerAdapter createYangNotifiStream( + final NotificationDefinition notificationDefinition, final SchemaContextRef refSchemaCtx, + final NotificationOutputType outputType) { + final String streamName = parseNotificationStreamName(requireNonNull(notificationDefinition), + requireNonNull(refSchemaCtx), requireNonNull(outputType.getName())); + final Optional listenerForStreamName = ListenersBroker.getInstance() + .getNotificationListenerFor(streamName); + return listenerForStreamName.orElseGet(() -> ListenersBroker.getInstance().registerNotificationListener( + notificationDefinition.getPath(), streamName, outputType)); + } - if (Notificator.existNotificationListenerFor(streamName)) { - final List notificationListenerFor = - Notificator.getNotificationListenerFor(streamName); - return SubscribeToStreamUtil.pickSpecificListenerByOutput(notificationListenerFor, outputType); + private static String parseNotificationStreamName(final NotificationDefinition notificationDefinition, + final SchemaContextRef refSchemaCtx, final String outputType) { + final QName notificationDefinitionQName = notificationDefinition.getQName(); + final Module module = refSchemaCtx.findModuleByNamespaceAndRevision( + notificationDefinitionQName.getModule().getNamespace(), + notificationDefinitionQName.getModule().getRevision()); + requireNonNull(module, String.format("Module for namespace %s does not exist.", + notificationDefinitionQName.getModule().getNamespace())); + + final StringBuilder streamNameBuilder = new StringBuilder(); + streamNameBuilder.append(RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM) + .append('/') + .append(module.getName()) + .append(':') + .append(notificationDefinitionQName.getLocalName()); + if (outputType.equals(NotificationOutputType.JSON.getName())) { + streamNameBuilder.append(NotificationOutputType.JSON.getName()); } - - return Notificator.createNotificationListener(paths, streamName, outputType); + return streamNameBuilder.toString(); } -} +} \ No newline at end of file diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/ReadDataTransactionUtil.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/ReadDataTransactionUtil.java index 8d265d33f3..a2f2c55e7b 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/ReadDataTransactionUtil.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/ReadDataTransactionUtil.java @@ -10,7 +10,6 @@ package org.opendaylight.restconf.nb.rfc8040.rests.utils; import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.STREAMS_PATH; import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.STREAM_PATH_PART; -import com.google.common.collect.Iterables; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.FluentFuture; import java.net.URI; @@ -272,33 +271,41 @@ public final class ReadDataTransactionUtil { final SchemaContextRef schemaContextRef, final UriInfo uriInfo) { final SchemaContext schemaContext = schemaContextRef.get(); if (identifier.contains(STREAMS_PATH) && !identifier.contains(STREAM_PATH_PART)) { - final DOMDataTreeReadWriteTransaction wTx = transactionNode.getTransactionChain().newReadWriteTransaction(); - final boolean exist = SubscribeToStreamUtil.checkExist(schemaContext, wTx); - - for (final NotificationDefinition notificationDefinition : schemaContextRef.get().getNotifications()) { - final List notifiStreamXML = - CreateStreamUtil.createYangNotifiStream(notificationDefinition, schemaContextRef, - NotificationOutputType.XML.getName()); - final List notifiStreamJSON = - CreateStreamUtil.createYangNotifiStream(notificationDefinition, schemaContextRef, - NotificationOutputType.JSON.getName()); - for (final NotificationListenerAdapter listener : Iterables.concat(notifiStreamXML, notifiStreamJSON)) { - final URI uri = SubscribeToStreamUtil.prepareUriByStreamName(uriInfo, listener.getStreamName()); - final NormalizedNode mapToStreams = - RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring( - listener.getSchemaPath().getLastComponent(), schemaContext.getNotifications(), - null, listener.getOutputType(), uri, - SubscribeToStreamUtil.getMonitoringModule(schemaContext), exist); - SubscribeToStreamUtil.writeDataToDS(schemaContext, - listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist, - mapToStreams); - } - } - SubscribeToStreamUtil.submitData(wTx); + createAllYangNotificationStreams(transactionNode, schemaContextRef, uriInfo); } return readData(content, transactionNode, withDefa, schemaContext); } + private static void createAllYangNotificationStreams(final TransactionVarsWrapper transactionNode, + final SchemaContextRef schemaContextRef, final UriInfo uriInfo) { + final DOMDataTreeReadWriteTransaction wTx = transactionNode.getTransactionChain().newReadWriteTransaction(); + final boolean exist = SubscribeToStreamUtil.checkExist(schemaContextRef.get(), wTx); + + for (final NotificationDefinition notificationDefinition : schemaContextRef.get().getNotifications()) { + final NotificationListenerAdapter notifiStreamXML = + CreateStreamUtil.createYangNotifiStream(notificationDefinition, schemaContextRef, + NotificationOutputType.XML); + final NotificationListenerAdapter notifiStreamJSON = + CreateStreamUtil.createYangNotifiStream(notificationDefinition, schemaContextRef, + NotificationOutputType.JSON); + writeNotificationStreamToDatastore(schemaContextRef, uriInfo, wTx, exist, notifiStreamXML); + writeNotificationStreamToDatastore(schemaContextRef, uriInfo, wTx, exist, notifiStreamJSON); + } + SubscribeToStreamUtil.submitData(wTx); + } + + private static void writeNotificationStreamToDatastore(final SchemaContextRef schemaContextRef, + final UriInfo uriInfo, final DOMDataTreeReadWriteTransaction readWriteTransaction, final boolean exist, + final NotificationListenerAdapter listener) { + final URI uri = SubscribeToStreamUtil.prepareUriByStreamName(uriInfo, listener.getStreamName()); + final NormalizedNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring( + listener.getSchemaPath().getLastComponent(), schemaContextRef.get().getNotifications(), null, + listener.getOutputType(), uri, + SubscribeToStreamUtil.getMonitoringModule(schemaContextRef.get()), exist); + SubscribeToStreamUtil.writeDataToDS(schemaContextRef.get(), + listener.getSchemaPath().getLastComponent().getLocalName(), readWriteTransaction, exist, mapToStreams); + } + private static NormalizedNode prepareDataByParamWithDef(final NormalizedNode result, final YangInstanceIdentifier path, final String withDefa, final SchemaContext ctx) { boolean trim; diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/RestconfStreamsConstants.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/RestconfStreamsConstants.java index 94aab8f84f..0b4d14ec34 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/RestconfStreamsConstants.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/RestconfStreamsConstants.java @@ -20,43 +20,52 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.Augmentat /** * Constants for streams. - * */ public final class RestconfStreamsConstants { - public static final String SAL_REMOTE_NAMESPACE = "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote"; + public static final QNameModule SAL_REMOTE_AUGMENT = QNameModule.create( + URI.create("urn:sal:restconf:event:subscription"), + Revision.of("2014-07-08")); + public static final QNameModule SUBSCRIBE_TO_NOTIFICATION = QNameModule.create( + URI.create("subscribe:to:notification"), + Revision.of("2016-10-28")); + + public static final QName SAL_REMOTE_NAMESPACE = QName.create( + "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", + "2014-01-14", + "sal-remote"); + + public static final String STREAM_PATH_PARAM_NAME = "path"; public static final String DATASTORE_PARAM_NAME = "datastore"; + public static final String SCOPE_PARAM_NAME = "scope"; + public static final String OUTPUT_TYPE_PARAM_NAME = "notification-output-type"; + public static final String OUTPUT_CONTAINER_NAME = "output"; + public static final String OUTPUT_STREAM_NAME = "stream-name"; - private static final URI NAMESPACE_EVENT_SUBSCRIPTION_AUGMENT = URI.create("urn:sal:restconf:event:subscription"); - public static final QNameModule SAL_REMOTE_AUGMENT = QNameModule.create(NAMESPACE_EVENT_SUBSCRIPTION_AUGMENT, - Revision.of("2014-07-08")).intern(); + public static final AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER = new AugmentationIdentifier(ImmutableSet.of( + QName.create(SAL_REMOTE_AUGMENT, SCOPE_PARAM_NAME), + QName.create(SAL_REMOTE_AUGMENT, DATASTORE_PARAM_NAME), + QName.create(SAL_REMOTE_AUGMENT, OUTPUT_TYPE_PARAM_NAME))); - public static final AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER = new AugmentationIdentifier( - ImmutableSet.of(QName.create(SAL_REMOTE_AUGMENT, "scope"), QName.create(SAL_REMOTE_AUGMENT, "datastore"), - QName.create(SAL_REMOTE_AUGMENT, "notification-output-type"))); + public static final QName LOCATION_QNAME = QName.create(SUBSCRIBE_TO_NOTIFICATION, "location"); + public static final QName NOTIFI_QNAME = QName.create(SUBSCRIBE_TO_NOTIFICATION, "notifi"); public static final DataChangeScope DEFAULT_SCOPE = DataChangeScope.BASE; - public static final LogicalDatastoreType DEFAULT_DS = LogicalDatastoreType.CONFIGURATION; - public static final String SCOPE_PARAM_NAME = "scope"; - public static final char EQUAL = ParserBuilderConstants.Deserializer.EQUAL; - public static final String DS_URI = RestconfConstants.SLASH + DATASTORE_PARAM_NAME + EQUAL; - public static final String SCOPE_URI = RestconfConstants.SLASH + SCOPE_PARAM_NAME + EQUAL; + public static final String SCHEMA_SUBSCRIBE_URI = "ws"; + public static final String SCHEMA_SUBSCRIBE_SECURED_URI = "wss"; + public static final String SCHEMA_UPGRADE_URI = "http"; + public static final String SCHEMA_UPGRADE_SECURED_URI = "https"; - public static final int NOTIFICATION_PORT = 8181; - - public static final String SCHEMA_SUBSCIBRE_URI = "ws"; - - public static final CharSequence DATA_SUBSCR = "data-change-event-subscription"; - public static final CharSequence CREATE_DATA_SUBSCR = "create-" + DATA_SUBSCR; - - public static final CharSequence NOTIFICATION_STREAM = "notification-stream"; - public static final CharSequence CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM; + public static final String DATA_SUBSCRIPTION = "data-change-event-subscription"; + public static final String CREATE_DATA_SUBSCRIPTION = "create-" + DATA_SUBSCRIPTION; + public static final String NOTIFICATION_STREAM = "notification-stream"; + public static final String CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM; public static final String STREAMS_PATH = "ietf-restconf-monitoring:restconf-state/streams"; public static final String STREAM_PATH_PART = "/stream="; @@ -67,5 +76,4 @@ public final class RestconfStreamsConstants { private RestconfStreamsConstants() { throw new UnsupportedOperationException("Util class."); } - -} +} \ No newline at end of file diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/SubscribeToStreamUtil.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/SubscribeToStreamUtil.java index 28fe9458fa..eee8e432ac 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/SubscribeToStreamUtil.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/SubscribeToStreamUtil.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.concurrent.ExecutionException; import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriInfo; @@ -42,16 +43,13 @@ import org.opendaylight.restconf.nb.rfc8040.handlers.SchemaContextHandler; import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder; import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter; +import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter; -import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator; -import org.opendaylight.restconf.nb.rfc8040.streams.websockets.WebSocketServer; import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants; import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil; import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime; -import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; import org.opendaylight.yangtools.concepts.ListenerRegistration; -import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; @@ -82,199 +80,184 @@ public final class SubscribeToStreamUtil { .appendOffset("+HH:MM", "Z").toFormatter(); private SubscribeToStreamUtil() { - throw new UnsupportedOperationException("Util class"); + throw new UnsupportedOperationException("Utility class"); } /** - * Register listeners by streamName in identifier to listen to yang - * notifications, put or delete info about listener to DS according to - * ietf-restconf-monitoring. + * Register listener by streamName in identifier to listen to yang notifications, and put or delete information + * about listener to DS according to ietf-restconf-monitoring. * - * @param identifier - * identifier as stream name - * @param uriInfo - * for getting base URI information - * @param notificationQueryParams - * query parameters of notification - * @param handlersHolder - * holder of handlers for notifications - * @return location for listening + * @param identifier Name of the stream. + * @param uriInfo URI information. + * @param notificationQueryParams Query parameters of notification. + * @param handlersHolder Holder of handlers for notifications. + * @return Stream location for listening. */ @SuppressWarnings("rawtypes") - public static URI notifYangStream(final String identifier, final UriInfo uriInfo, + public static URI subscribeToYangStream(final String identifier, final UriInfo uriInfo, final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) { - final String streamName = Notificator.createStreamNameFromUri(identifier); + final String streamName = ListenersBroker.createStreamNameFromUri(identifier); if (Strings.isNullOrEmpty(streamName)) { throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE); } - List listeners = Notificator.getNotificationListenerFor(streamName); - if (identifier.contains(RestconfConstants.SLASH + NotificationOutputType.JSON.getName())) { - listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.JSON.getName()); - } else { - listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.XML.getName()); - } - if (listeners.isEmpty()) { - throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL, + final Optional notificationListenerAdapter = + ListenersBroker.getInstance().getNotificationListenerFor(streamName); + + if (!notificationListenerAdapter.isPresent()) { + throw new RestconfDocumentedException(String.format( + "Stream with name %s was not found.", streamName), + ErrorType.PROTOCOL, ErrorTag.UNKNOWN_ELEMENT); } - final DOMDataTreeReadWriteTransaction wTx = - handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction(); + final DOMDataTreeReadWriteTransaction writeTransaction = handlersHolder + .getTransactionChainHandler() + .get() + .newReadWriteTransaction(); final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get(); - final boolean exist = checkExist(schemaContext, wTx); - + final boolean exist = checkExist(schemaContext, writeTransaction); final URI uri = prepareUriByStreamName(uriInfo, streamName); - for (final NotificationListenerAdapter listener : listeners) { - registerToListenNotification(listener, handlersHolder.getNotificationServiceHandler()); - listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(), - notificationQueryParams.getFilter(), false); - listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler()); - final NormalizedNode mapToStreams = RestconfMappingNodeUtil - .mapYangNotificationStreamByIetfRestconfMonitoring(listener.getSchemaPath().getLastComponent(), - schemaContext.getNotifications(), notificationQueryParams.getStart(), - listener.getOutputType(), uri, getMonitoringModule(schemaContext), exist); - writeDataToDS(schemaContext, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist, - mapToStreams); - } - submitData(wTx); + registerToListenNotification( + notificationListenerAdapter.get(), handlersHolder.getNotificationServiceHandler()); + notificationListenerAdapter.get().setQueryParams( + notificationQueryParams.getStart(), + notificationQueryParams.getStop().orElse(null), + notificationQueryParams.getFilter().orElse(null), + false); + notificationListenerAdapter.get().setCloseVars( + handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler()); + final NormalizedNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring( + notificationListenerAdapter.get().getSchemaPath().getLastComponent(), + schemaContext.getNotifications(), notificationQueryParams.getStart(), + notificationListenerAdapter.get().getOutputType(), uri, getMonitoringModule(schemaContext), + exist); + writeDataToDS(schemaContext, + notificationListenerAdapter.get().getSchemaPath().getLastComponent().getLocalName(), writeTransaction, + exist, mapToStreams); + submitData(writeTransaction); return uri; } - static List - pickSpecificListenerByOutput(final List listeners, final String outputType) { - for (final NotificationListenerAdapter notificationListenerAdapter : listeners) { - if (notificationListenerAdapter.getOutputType().equals(outputType)) { - final List list = new ArrayList<>(); - list.add(notificationListenerAdapter); - return list; - } - } - return listeners; - } - /** * Prepare InstanceIdentifierContext for Location leaf. * - * @param schemaHandler - * schemaContext handler - * @return InstanceIdentifier of Location leaf + * @param schemaHandler Schema context handler. + * @return InstanceIdentifier of Location leaf. */ public static InstanceIdentifierContext prepareIIDSubsStreamOutput(final SchemaContextHandler schemaHandler) { - final QName qnameBase = QName.create("subscribe:to:notification", "2016-10-28", "notifi"); - final DataSchemaNode location = ((ContainerSchemaNode) schemaHandler.get() - .findModule(qnameBase.getModule()).get() - .getDataChildByName(qnameBase)).getDataChildByName(QName.create(qnameBase, "location")); - final List path = new ArrayList<>(); - path.add(NodeIdentifier.create(qnameBase)); - path.add(NodeIdentifier.create(QName.create(qnameBase, "location"))); + final Optional module = schemaHandler.get() + .findModule(RestconfStreamsConstants.NOTIFI_QNAME.getModule()); + Preconditions.checkState(module.isPresent()); + final Optional notify = module.get() + .findDataChildByName(RestconfStreamsConstants.NOTIFI_QNAME); + Preconditions.checkState(notify.isPresent()); + final Optional location = ((ContainerSchemaNode) notify.get()) + .findDataChildByName(RestconfStreamsConstants.LOCATION_QNAME); + Preconditions.checkState(location.isPresent()); - return new InstanceIdentifierContext(YangInstanceIdentifier.create(path), location, null, - schemaHandler.get()); + final List path = new ArrayList<>(); + path.add(NodeIdentifier.create(RestconfStreamsConstants.NOTIFI_QNAME)); + path.add(NodeIdentifier.create(RestconfStreamsConstants.LOCATION_QNAME)); + return new InstanceIdentifierContext(YangInstanceIdentifier.create(path), location.get(), + null, schemaHandler.get()); } /** - * Register listener by streamName in identifier to listen to data change - * notifications, put or delete info about listener to DS according to - * ietf-restconf-monitoring. + * Register listener by streamName in identifier to listen to data change notifications, and put or delete + * information about listener to DS according to ietf-restconf-monitoring. * - * @param identifier - * identifier as stream name - * @param uriInfo - * for getting base URI information - * @param notificationQueryParams - * query parameters of notification - * @param handlersHolder - * holder of handlers for notifications - * @return location for listening + * @param identifier Identifier as stream name. + * @param uriInfo Base URI information. + * @param notificationQueryParams Query parameters of notification. + * @param handlersHolder Holder of handlers for notifications. + * @return Location for listening. */ @SuppressWarnings("rawtypes") - public static URI notifiDataStream(final String identifier, final UriInfo uriInfo, + public static URI subscribeToDataStream(final String identifier, final UriInfo uriInfo, final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) { - final Map mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier); - - final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class, + final Map mapOfValues = mapValuesFromUri(identifier); + final LogicalDatastoreType datastoreType = parseURIEnum( + LogicalDatastoreType.class, mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME)); - if (ds == null) { - final String msg = "Stream name doesn't contains datastore value (pattern /datastore=)"; - LOG.debug(msg); - throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE); + if (datastoreType == null) { + final String message = "Stream name doesn't contain datastore value (pattern /datastore=)"; + LOG.debug(message); + throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE); } - final DataChangeScope scope = SubscribeToStreamUtil.parseURIEnum(DataChangeScope.class, + final DataChangeScope scope = parseURIEnum( + DataChangeScope.class, mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME)); if (scope == null) { - final String msg = "Stream name doesn't contains datastore value (pattern /scope=)"; - LOG.warn(msg); - throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE); + final String message = "Stream name doesn't contains datastore value (pattern /scope=)"; + LOG.warn(message); + throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE); } - final String streamName = Notificator.createStreamNameFromUri(identifier); - - final ListenerAdapter listener = Notificator.getListenerFor(streamName); - Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName); - - listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(), - notificationQueryParams.getFilter(), false); - listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler()); + final String streamName = ListenersBroker.createStreamNameFromUri(identifier); + final Optional listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName); + Preconditions.checkArgument(listener.isPresent(), "Listener doesn't exist : " + streamName); - registration(ds, scope, listener, handlersHolder.getDomDataBrokerHandler().get()); + listener.get().setQueryParams( + notificationQueryParams.getStart(), + notificationQueryParams.getStop().orElse(null), + notificationQueryParams.getFilter().orElse(null), + false); + listener.get().setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler()); + registration(datastoreType, listener.get(), handlersHolder.getDomDataBrokerHandler().get()); final URI uri = prepareUriByStreamName(uriInfo, streamName); - - final DOMDataTreeReadWriteTransaction wTx = - handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction(); + final DOMDataTreeReadWriteTransaction writeTransaction + = handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction(); final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get(); - final boolean exist = checkExist(schemaContext, wTx); + final boolean exist = checkExist(schemaContext, writeTransaction); final NormalizedNode mapToStreams = RestconfMappingNodeUtil - .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(), - notificationQueryParams.getStart(), listener.getOutputType(), uri, + .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(), + notificationQueryParams.getStart(), listener.get().getOutputType(), uri, getMonitoringModule(schemaContext), exist, schemaContext); - writeDataToDS(schemaContext, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist, - mapToStreams); - submitData(wTx); + writeDataToDS(schemaContext, listener.get().getPath().getLastPathArgument().getNodeType().getLocalName(), + writeTransaction, exist, mapToStreams); + submitData(writeTransaction); return uri; } - public static Module getMonitoringModule(final SchemaContext schemaContext) { + static Module getMonitoringModule(final SchemaContext schemaContext) { return schemaContext.findModule(MonitoringModule.MODULE_QNAME).orElse(null); } /** - * Parse input of query parameters - start-time or stop-time - from - * {@link DateAndTime} format to {@link Instant} format. + * Parse input of query parameters - start-time or stop-time - from {@link DateAndTime} format + * to {@link Instant} format. * - * @param entry - * start-time or stop-time as string in {@link DateAndTime} - * format - * @return parsed {@link Instant} by entry + * @param entry Start-time or stop-time as string in {@link DateAndTime} format. + * @return Parsed {@link Instant} by entry. */ public static Instant parseDateFromQueryParam(final Entry> entry) { final DateAndTime event = new DateAndTime(entry.getValue().iterator().next()); final String value = event.getValue(); - final TemporalAccessor p; + final TemporalAccessor accessor; try { - p = FORMATTER.parse(value); + accessor = FORMATTER.parse(value); } catch (final DateTimeParseException e) { throw new RestconfDocumentedException("Cannot parse of value in date: " + value, e); } - return Instant.from(p); - + return Instant.from(accessor); } @SuppressWarnings("rawtypes") - static void writeDataToDS(final SchemaContext schemaContext, - final String name, final DOMDataTreeReadWriteTransaction readWriteTransaction, - final boolean exist, final NormalizedNode mapToStreams) { - String pathId = ""; + static void writeDataToDS(final SchemaContext schemaContext, final String name, + final DOMDataTreeReadWriteTransaction readWriteTransaction, final boolean exist, + final NormalizedNode mapToStreams) { + String pathId; if (exist) { pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name; } else { pathId = MonitoringModule.PATH_TO_STREAMS; } - readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaContext), - mapToStreams); + readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL, + IdentifierCodec.deserialize(pathId, schemaContext), mapToStreams); } static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) { @@ -286,13 +269,12 @@ public final class SubscribeToStreamUtil { } /** - * Prepare map of values from URI. + * Prepare map of URI parameter-values. * - * @param identifier - * URI - * @return {@link Map} + * @param identifier String identification of URI. + * @return Map od URI parameters and values. */ - public static Map mapValuesFromUri(final String identifier) { + private static Map mapValuesFromUri(final String identifier) { final HashMap result = new HashMap<>(); for (final String token : RestconfConstants.SLASH_SPLITTER.split(identifier)) { final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL)); @@ -304,28 +286,28 @@ public final class SubscribeToStreamUtil { } static URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) { + final String scheme = uriInfo.getAbsolutePath().getScheme(); final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder(); - - prepareNotificationPort(uriInfo.getBaseUri().getPort()); - uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI); + switch (scheme) { + case RestconfStreamsConstants.SCHEMA_UPGRADE_SECURED_URI: + uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCRIBE_SECURED_URI); + break; + case RestconfStreamsConstants.SCHEMA_UPGRADE_URI: + default: + uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCRIBE_URI); + } return uriBuilder.replacePath(streamName).build(); } /** - * Register data change listener in dom data broker and set it to listener - * on stream. + * Register data change listener in DOM data broker and set it to listener on stream. * - * @param ds - * {@link LogicalDatastoreType} - * @param scope - * {@link DataChangeScope} - * @param listener - * listener on specific stream - * @param domDataBroker - * data broker for register data change listener + * @param datastore {@link LogicalDatastoreType} + * @param listener listener on specific stream + * @param domDataBroker data broker for register data change listener */ - private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope, - final ListenerAdapter listener, final DOMDataBroker domDataBroker) { + private static void registration(final LogicalDatastoreType datastore, final ListenerAdapter listener, + final DOMDataBroker domDataBroker) { if (listener.isListening()) { return; } @@ -336,37 +318,21 @@ public final class SubscribeToStreamUtil { throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService"); } - final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(ds, listener.getPath()); + final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(datastore, listener.getPath()); final ListenerRegistration registration = - changeService.registerDataTreeChangeListener(root, listener); - + changeService.registerDataTreeChangeListener(root, listener); listener.setRegistration(registration); } - /** - * Get port from web socket server. If doesn't exit, create it. - * - * @param port - * - port - */ - private static void prepareNotificationPort(final int port) { - try { - WebSocketServer.getInstance(); - } catch (final NullPointerException e) { - WebSocketServer.createInstance(port); - } - } - static boolean checkExist(final SchemaContext schemaContext, final DOMDataTreeReadOperations readWriteTransaction) { boolean exist; try { - exist = readWriteTransaction.exists(LogicalDatastoreType.OPERATIONAL, + return readWriteTransaction.exists(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).get(); - } catch (final InterruptedException | ExecutionException e) { - throw new RestconfDocumentedException("Problem while checking data if exists", e); + } catch (final InterruptedException | ExecutionException exception) { + throw new RestconfDocumentedException("Problem while checking data if exists", exception); } - return exist; } private static void registerToListenNotification(final NotificationListenerAdapter listener, @@ -378,18 +344,15 @@ public final class SubscribeToStreamUtil { final SchemaPath path = listener.getSchemaPath(); final ListenerRegistration registration = notificationServiceHandler.get().registerNotificationListener(listener, path); - listener.setRegistration(registration); } /** - * Parse enum from URI. + * Parse out enumeration from URI. * - * @param clazz - * enum type - * @param value - * string of enum value - * @return enum + * @param clazz Target enumeration type. + * @param value String representation of enumeration value. + * @return Parsed enumeration type. */ private static T parseURIEnum(final Class clazz, final String value) { if (value == null || value.equals("")) { diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java index 06dec11304..7c65f5ca4f 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java @@ -27,7 +27,6 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B private final Set subscribers = ConcurrentHashMap.newKeySet(); private final EventBus eventBus; - @SuppressWarnings("rawtypes") private EventBusChangeRecorder eventBusChangeRecorder; private volatile ListenerRegistration registration; @@ -35,7 +34,7 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B /** * Creating {@link EventBus}. */ - protected AbstractCommonSubscriber() { + AbstractCommonSubscriber() { this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor()); } @@ -55,18 +54,11 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B this.registration.close(); this.registration = null; } - deleteDataInDS(); unregister(); } - /** - * Creates event of type {@link EventType#REGISTER}, set {@link Channel} - * subscriber to the event and post event into event bus. - * - * @param subscriber - * Channel - */ + @Override public void addSubscriber(final Channel subscriber) { if (!subscriber.isActive()) { LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress()); @@ -76,12 +68,7 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B this.eventBus.post(event); } - /** - * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} - * subscriber to the event and posts event into event bus. - * - * @param subscriber subscriber channel - */ + @Override public void removeSubscriber(final Channel subscriber) { LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress()); final Event event = new Event(EventType.DEREGISTER); @@ -89,21 +76,12 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B this.eventBus.post(event); } - /** - * Sets {@link ListenerRegistration} registration. - * - * @param registration - * DOMDataChangeListener registration - */ + @Override public void setRegistration(final ListenerRegistration registration) { this.registration = registration; } - /** - * Checks if {@link ListenerRegistration} registration exist. - * - * @return True if exist, false otherwise. - */ + @Override public boolean isListening() { return this.registration != null; } @@ -112,11 +90,10 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B * Creating and registering {@link EventBusChangeRecorder} of specific * listener on {@link EventBus}. * - * @param listener - * specific listener of notifications + * @param listener Specific listener of notifications. */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - protected void register(final T listener) { + @SuppressWarnings({"unchecked", "rawtypes"}) + void register(final T listener) { this.eventBusChangeRecorder = new EventBusChangeRecorder(listener); this.eventBus.register(this.eventBusChangeRecorder); } @@ -124,18 +101,16 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B /** * Post event to event bus. * - * @param event - * data of incoming notifications + * @param event Data of incoming notifications. */ protected void post(final Event event) { this.eventBus.post(event); } /** - * Removes all subscribers and unregisters event bus change recorder form - * event bus. + * Removes all subscribers and unregisters event bus change recorder form event bus. */ - protected void unregister() { + private void unregister() { this.subscribers.clear(); this.eventBus.unregister(this.eventBusChangeRecorder); } diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractQueryParams.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractQueryParams.java index 1854a1dfc5..feb60142b4 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractQueryParams.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractQueryParams.java @@ -11,7 +11,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.StringReader; import java.time.Instant; -import java.util.Optional; import javax.xml.XMLConstants; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; @@ -24,7 +23,6 @@ import org.xml.sax.InputSource; /** * Features of query parameters part of both notifications. - * */ abstract class AbstractQueryParams extends AbstractNotificationsData { // FIXME: BUG-7956: switch to using UntrustedXML @@ -62,21 +60,17 @@ abstract class AbstractQueryParams extends AbstractNotificationsData { /** * Set query parameters for listener. * - * @param start - * start-time of getting notification - * @param stop - * stop-time of getting notification - * @param filter - * indicate which subset of all possible events are of interest - * @param leafNodesOnly - * if true, notifications will contain changes to leaf nodes only + * @param start Start-time of getting notification. + * @param stop Stop-time of getting notification. + * @param filter Indicates which subset of all possible events are of interest. + * @param leafNodesOnly If TRUE, notifications will contain changes of leaf nodes only. */ @SuppressWarnings("checkstyle:hiddenField") - public void setQueryParams(final Instant start, final Optional stop, final Optional filter, - final boolean leafNodesOnly) { + public void setQueryParams(final Instant start, final Instant stop, final String filter, + final boolean leafNodesOnly) { this.start = Preconditions.checkNotNull(start); - this.stop = stop.orElse(null); - this.filter = filter.orElse(null); + this.stop = stop; + this.filter = filter; this.leafNodesOnly = leafNodesOnly; } @@ -85,7 +79,7 @@ abstract class AbstractQueryParams extends AbstractNotificationsData { * * @return true if this query should only notify about leaf node changes */ - public boolean getLeafNodesOnly() { + boolean getLeafNodesOnly() { return leafNodesOnly; } @@ -116,14 +110,13 @@ abstract class AbstractQueryParams extends AbstractNotificationsData { /** * Check if is filter used and then prepare and post data do client. * - * @param xml data of notification + * @param xml XML data of notification. */ @SuppressWarnings("checkstyle:IllegalCatch") boolean checkFilter(final String xml) { if (this.filter == null) { return true; } - try { return parseFilterParam(xml); } catch (final Exception e) { @@ -132,11 +125,10 @@ abstract class AbstractQueryParams extends AbstractNotificationsData { } /** - * Parse and evaluate filter value by xml. + * Parse and evaluate filter statement by XML format. * - * @return true or false - depends on filter expression and data of - * notifiaction - * @throws Exception if operation fails + * @return {@code true} or {@code false} depending on filter expression and data of notification. + * @throws Exception If operation fails. */ private boolean parseFilterParam(final String xml) throws Exception { final Document docOfXml = DBF.newDocumentBuilder().parse(new InputSource(new StringReader(xml))); @@ -144,4 +136,4 @@ abstract class AbstractQueryParams extends AbstractNotificationsData { // FIXME: BUG-7956: xPath.setNamespaceContext(nsContext); return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN); } -} +} \ No newline at end of file diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/BaseListenerInterface.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/BaseListenerInterface.java index fdecd7dc61..fda095b9a1 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/BaseListenerInterface.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/BaseListenerInterface.java @@ -9,39 +9,68 @@ package org.opendaylight.restconf.nb.rfc8040.streams.listeners; import io.netty.channel.Channel; import java.util.Set; +import org.opendaylight.yangtools.concepts.ListenerRegistration; /** - * Base interface for both listeners({@link ListenerAdapter}, - * {@link NotificationListenerAdapter}). + * Base interface for both listeners({@link ListenerAdapter}, {@link NotificationListenerAdapter}). */ -interface BaseListenerInterface extends AutoCloseable { +public interface BaseListenerInterface extends AutoCloseable { /** * Return all subscribers of listener. * - * @return set of subscribers + * @return Set of all subscribers. */ Set getSubscribers(); /** * Checks if exists at least one {@link Channel} subscriber. * - * @return True if exist at least one {@link Channel} subscriber, false - * otherwise. + * @return {@code true} if exist at least one {@link Channel} subscriber, {@code false} otherwise. */ boolean hasSubscribers(); /** * Get name of stream. * - * @return stream name + * @return Stream name. */ String getStreamName(); /** * Get output type. * - * @return outputType + * @return Output type (JSON or XML). */ String getOutputType(); + + /** + * Creates event of type {@link EventType#REGISTER}, set {@link Channel} + * subscriber to the event and post event into event bus. + * + * @param subscriber Web-socket channel. + */ + void addSubscriber(Channel subscriber); + + /** + * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} + * subscriber to the event and posts event into event bus. + * + * @param subscriber Subscriber channel. + */ + void removeSubscriber(Channel subscriber); + + /** + * Sets {@link ListenerRegistration} registration. + * + * @param registration DOMDataChangeListener registration. + */ + void setRegistration(ListenerRegistration registration); + + /** + * Checks if {@link ListenerRegistration} registration exists. + * + * @return {@code true} if exists, {@code false} otherwise. + */ + boolean isListening(); } diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/EventBusChangeRecorder.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/EventBusChangeRecorder.java index c43acae9e9..944631e3fa 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/EventBusChangeRecorder.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/EventBusChangeRecorder.java @@ -21,8 +21,7 @@ class EventBusChangeRecorder { /** * Event bus change recorder of specific listener of notifications. * - * @param listener - * specific listener + * @param listener Specific listener. */ EventBusChangeRecorder(final T listener) { this.listener = listener; @@ -32,12 +31,12 @@ class EventBusChangeRecorder { public void recordCustomerChange(final Event event) { if (event.getType() == EventType.REGISTER) { final Channel subscriber = event.getSubscriber(); - if (!this.listener.getSubscribers().contains(subscriber)) { - this.listener.getSubscribers().add(subscriber); - } + this.listener.getSubscribers().add(subscriber); } else if (event.getType() == EventType.DEREGISTER) { this.listener.getSubscribers().remove(event.getSubscriber()); - Notificator.removeListenerIfNoSubscriberExists(this.listener); + if (!this.listener.hasSubscribers()) { + ListenersBroker.getInstance().removeAndCloseListener(this.listener); + } } else if (event.getType() == EventType.NOTIFY) { for (final Channel subscriber : this.listener.getSubscribers()) { if (subscriber.isActive()) { @@ -50,4 +49,4 @@ class EventBusChangeRecorder { } } } -} +} \ No newline at end of file diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java index 386f4d07af..12a6ae4904 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java @@ -7,6 +7,7 @@ */ package org.opendaylight.restconf.nb.rfc8040.streams.listeners; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import java.io.IOException; import java.time.Instant; @@ -30,6 +31,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.util.DataSchemaContextNode; import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree; import org.opendaylight.yangtools.yang.model.api.Module; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -41,8 +43,7 @@ import org.w3c.dom.Element; import org.w3c.dom.Node; /** - * {@link ListenerAdapter} is responsible to track events, which occurred by - * changing data in data source. + * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source. */ public class ListenerAdapter extends AbstractCommonSubscriber implements ClusteredDOMDataTreeChangeListener { @@ -53,15 +54,11 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster private final NotificationOutputType outputType; /** - * Creates new {@link ListenerAdapter} listener specified by path and stream - * name and register for subscribing. + * Creates new {@link ListenerAdapter} listener specified by path and stream name and register for subscribing. * - * @param path - * Path to data in data store. - * @param streamName - * The name of the stream. - * @param outputType - * Type of output on notification (JSON, XML) + * @param path Path to data in data store. + * @param streamName The name of the stream. + * @param outputType Type of output on notification (JSON, XML). */ ListenerAdapter(final YangInstanceIdentifier path, final String streamName, final NotificationOutputType outputType) { @@ -114,7 +111,7 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster /** * Prepare data of notification and data to client. * - * @param xml data + * @param xml XML-formatted data. */ private void prepareAndPostData(final String xml) { final Event event = new Event(EventType.NOTIFY); @@ -126,15 +123,10 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster post(event); } - /** - * Tracks events of data change by customer. - */ - /** * Prepare data in printable form and transform it to String. * - * @param dataTreeCandidates the DataTreeCandidates to transform - * + * @param dataTreeCandidates Data-tree candidates to be transformed. * @return Data in printable form. */ private String prepareXml(final Collection dataTreeCandidates) { @@ -157,8 +149,7 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster */ @SuppressWarnings("checkstyle:hiddenField") private void addValuesToDataChangedNotificationEventElement(final Document doc, - final Element dataChangedNotificationEventElement, - final Collection dataTreeCandidates, + final Element dataChangedNotificationEventElement, final Collection dataTreeCandidates, final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) { for (DataTreeCandidate dataTreeCandidate : dataTreeCandidates) { @@ -177,7 +168,7 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster final YangInstanceIdentifier parentYiid, final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) { - Optional> optionalNormalizedNode = Optional.empty(); + Optional> optionalNormalizedNode = Optional.empty(); switch (candidateNode.getModificationType()) { case APPEARED: case SUBTREE_MODIFIED: @@ -198,11 +189,13 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster return; } - NormalizedNode normalizedNode = optionalNormalizedNode.get(); + NormalizedNode normalizedNode = optionalNormalizedNode.get(); YangInstanceIdentifier yiid = YangInstanceIdentifier.builder(parentYiid) - .append(normalizedNode.getIdentifier()).build(); + .append(normalizedNode.getIdentifier()).build(); - boolean isNodeMixin = dataSchemaContextTree.getChild(yiid).isMixin(); + final Optional> childrenSchemaNode = dataSchemaContextTree.findChild(yiid); + Preconditions.checkState(childrenSchemaNode.isPresent()); + boolean isNodeMixin = childrenSchemaNode.get().isMixin(); boolean isSkippedNonLeaf = getLeafNodesOnly() && !(normalizedNode instanceof LeafNode); if (!isNodeMixin && !isSkippedNonLeaf) { Node node = null; @@ -216,7 +209,7 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster break; case DELETE: case DISAPPEARED: - node = createDataChangeEventElement(doc, yiid, Operation.DELETED, schemaContext); + node = createDataChangeEventElement(doc, yiid, schemaContext); break; case UNMODIFIED: default: @@ -228,41 +221,35 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster } for (DataTreeCandidateNode childNode : candidateNode.getChildNodes()) { - addNodeToDataChangeNotificationEventElement(doc, dataChangedNotificationEventElement, childNode, - yiid, schemaContext, dataSchemaContextTree); + addNodeToDataChangeNotificationEventElement( + doc, dataChangedNotificationEventElement, childNode, yiid, schemaContext, dataSchemaContextTree); } } /** - * Creates changed event element from data. + * Creates data-changed event element from data. * - * @param doc - * {@link Document} - * @param path - * Path to data in data store. - * @param operation - * {@link Operation} - * @param schemaContext - * schema context - * @return {@link Node} node represented by changed event element. + * @param doc {@link Document} + * @param schemaContext Schema context. + * @return {@link Node} represented by changed event element. */ private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier eventPath, - final Operation operation, final SchemaContext schemaContext) { + final SchemaContext schemaContext) { final Element dataChangeEventElement = doc.createElement("data-change-event"); final Element pathElement = doc.createElement("path"); addPathAsValueToElement(eventPath, pathElement, schemaContext); dataChangeEventElement.appendChild(pathElement); final Element operationElement = doc.createElement("operation"); - operationElement.setTextContent(operation.value); + operationElement.setTextContent(Operation.DELETED.value); dataChangeEventElement.appendChild(operationElement); return dataChangeEventElement; } - private Node createCreatedChangedDataChangeEventElement(final Document doc, - final YangInstanceIdentifier eventPath, final NormalizedNode normalized, final Operation operation, - final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) { + private Node createCreatedChangedDataChangeEventElement(final Document doc, final YangInstanceIdentifier eventPath, + final NormalizedNode normalized, final Operation operation, final SchemaContext schemaContext, + final DataSchemaContextTree dataSchemaContextTree) { final Element dataChangeEventElement = doc.createElement("data-change-event"); final Element pathElement = doc.createElement("path"); addPathAsValueToElement(eventPath, pathElement, schemaContext); @@ -274,10 +261,12 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster try { SchemaPath nodePath; + final Optional> childrenSchemaNode = dataSchemaContextTree.findChild(eventPath); + Preconditions.checkState(childrenSchemaNode.isPresent()); if (normalized instanceof MapEntryNode || normalized instanceof UnkeyedListEntryNode) { - nodePath = dataSchemaContextTree.getChild(eventPath).getDataSchemaNode().getPath(); + nodePath = childrenSchemaNode.get().getDataSchemaNode().getPath(); } else { - nodePath = dataSchemaContextTree.getChild(eventPath).getDataSchemaNode().getPath().getParent(); + nodePath = childrenSchemaNode.get().getDataSchemaNode().getPath().getParent(); } final DOMResult domResult = writeNormalizedNode(normalized, schemaContext, nodePath); final Node result = doc.importNode(domResult.getNode().getFirstChild(), true); @@ -296,12 +285,9 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster /** * Adds path as value to element. * - * @param eventPath - * Path to data in data store. - * @param element - * {@link Element} - * @param schemaContext - * schema context + * @param eventPath Path to data in data store. + * @param element {@link Element} + * @param schemaContext Schema context. */ @SuppressWarnings("rawtypes") private void addPathAsValueToElement(final YangInstanceIdentifier eventPath, final Element element, @@ -313,14 +299,14 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster continue; } textContent.append("/"); - writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), schemaContext); + writeIdentifierWithNamespacePrefix(textContent, pathArgument.getNodeType(), schemaContext); if (pathArgument instanceof NodeIdentifierWithPredicates) { final Map predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues(); for (final Entry entry : predicates.entrySet()) { final QName keyValue = entry.getKey(); final String predicateValue = String.valueOf(entry.getValue()); textContent.append("["); - writeIdentifierWithNamespacePrefix(element, textContent, keyValue, schemaContext); + writeIdentifierWithNamespacePrefix(textContent, keyValue, schemaContext); textContent.append("='"); textContent.append(predicateValue); textContent.append("'"); @@ -339,30 +325,32 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster /** * Writes identifier that consists of prefix and QName. * - * @param element - * {@link Element} - * @param textContent - * StringBuilder - * @param qualifiedName - * QName - * @param schemaContext - * schema context + * @param textContent Text builder that should be supplemented by QName and its modules name. + * @param qualifiedName QName of the element. + * @param schemaContext Schema context that holds modules which should contain module specified in QName. */ - private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent, - final QName qualifiedName, final SchemaContext schemaContext) { - final Module module = schemaContext.findModule(qualifiedName.getModule()).get(); - - textContent.append(module.getName()); - textContent.append(":"); - textContent.append(qualifiedName.getLocalName()); + private static void writeIdentifierWithNamespacePrefix(final StringBuilder textContent, final QName qualifiedName, + final SchemaContext schemaContext) { + final Optional module = schemaContext.findModule(qualifiedName.getModule()); + if (module.isPresent()) { + textContent.append(module.get().getName()); + textContent.append(":"); + textContent.append(qualifiedName.getLocalName()); + } else { + LOG.error("Cannot write identifier with namespace prefix in data-change listener adapter: " + + "Cannot find module in schema context for input QName {}.", qualifiedName); + throw new IllegalStateException(String.format("Cannot find module in schema context for input QName %s.", + qualifiedName)); + } } /** - * Consists of three types {@link Operation#CREATED}, - * {@link Operation#UPDATED} and {@link Operation#DELETED}. + * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}. */ private enum Operation { - CREATED("created"), UPDATED("updated"), DELETED("deleted"); + CREATED("created"), + UPDATED("updated"), + DELETED("deleted"); private final String value; @@ -370,4 +358,13 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster this.value = value; } } -} + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("path", path) + .add("stream-name", streamName) + .add("output-type", outputType) + .toString(); + } +} \ No newline at end of file diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java new file mode 100644 index 0000000000..85a6569100 --- /dev/null +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java @@ -0,0 +1,363 @@ +/* + * Copyright © 2019 FRINX s.r.o. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.restconf.nb.rfc8040.streams.listeners; + +import static java.util.Objects.requireNonNull; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.ImmutableSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.locks.StampedLock; +import java.util.function.Function; +import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants; +import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or + * {@link NotificationListenerAdapter} listeners. + */ +public final class ListenersBroker { + private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class); + private static ListenersBroker listenersBroker; + + private final StampedLock dataChangeListenersLock = new StampedLock(); + private final StampedLock notificationListenersLock = new StampedLock(); + private final BiMap dataChangeListeners = HashBiMap.create(); + private final BiMap notificationListeners = HashBiMap.create(); + + private ListenersBroker() { + } + + /** + * Creation of the singleton listeners broker. + * + * @return Reusable instance of {@link ListenersBroker}. + */ + public static synchronized ListenersBroker getInstance() { + if (listenersBroker == null) { + listenersBroker = new ListenersBroker(); + } + return listenersBroker; + } + + /** + * Returns set of all data-change-event streams. + */ + public Set getDataChangeStreams() { + final long stamp = dataChangeListenersLock.readLock(); + try { + return ImmutableSet.copyOf(dataChangeListeners.keySet()); + } finally { + dataChangeListenersLock.unlockRead(stamp); + } + } + + /** + * Returns set of all notification streams. + */ + public Set getNotificationStreams() { + final long stamp = notificationListenersLock.readLock(); + try { + return ImmutableSet.copyOf(notificationListeners.keySet()); + } finally { + notificationListenersLock.unlockRead(stamp); + } + } + + /** + * Gets {@link ListenerAdapter} specified by stream identification. + * + * @param streamName Stream name. + * @return {@link ListenerAdapter} specified by stream name wrapped in {@link Optional} or {@link Optional#empty()} + * if listener with specified stream name doesn't exist. + */ + public Optional getDataChangeListenerFor(final String streamName) { + final long stamp = dataChangeListenersLock.readLock(); + try { + final ListenerAdapter listenerAdapter = dataChangeListeners.get(requireNonNull(streamName)); + return Optional.ofNullable(listenerAdapter); + } finally { + dataChangeListenersLock.unlockRead(stamp); + } + } + + /** + * Gets {@link NotificationListenerAdapter} specified by stream name. + * + * @param streamName Stream name. + * @return {@link NotificationListenerAdapter} specified by stream name wrapped in {@link Optional} + * or {@link Optional#empty()} if listener with specified stream name doesn't exist. + */ + public Optional getNotificationListenerFor(final String streamName) { + final long stamp = notificationListenersLock.readLock(); + try { + final NotificationListenerAdapter listenerAdapter = notificationListeners.get(requireNonNull(streamName)); + return Optional.ofNullable(listenerAdapter); + } finally { + notificationListenersLock.unlockRead(stamp); + } + } + + /** + * Get listener for stream-name. + * + * @param streamName Stream name. + * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional} + * or {@link Optional#empty()} if listener with specified stream name doesn't exist. + */ + public Optional getListenerFor(final String streamName) { + if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) { + return getNotificationListenerFor(streamName).map(Function.identity()); + } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) { + return getDataChangeListenerFor(streamName).map(Function.identity()); + } else { + return Optional.empty(); + } + } + + /** + * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener + * hasn't been created yet. + * + * @param path Path to data in data repository. + * @param streamName Stream name. + * @param outputType Specific type of output for notifications - XML or JSON. + * @return Created or existing data-change listener adapter. + */ + public ListenerAdapter registerDataChangeListener(final YangInstanceIdentifier path, final String streamName, + final NotificationOutputType outputType) { + requireNonNull(path); + requireNonNull(streamName); + requireNonNull(outputType); + + final long stamp = dataChangeListenersLock.writeLock(); + try { + return dataChangeListeners.computeIfAbsent(streamName, stream -> new ListenerAdapter( + path, stream, outputType)); + } finally { + dataChangeListenersLock.unlockWrite(stamp); + } + } + + /** + * Creates new {@link NotificationDefinition} listener using input stream name and schema path + * if such listener haven't been created yet. + * + * @param schemaPath Schema path of YANG notification structure. + * @param streamName Stream name. + * @param outputType Specific type of output for notifications - XML or JSON. + * @return Created or existing notification listener adapter. + */ + public NotificationListenerAdapter registerNotificationListener(final SchemaPath schemaPath, + final String streamName, final NotificationOutputType outputType) { + requireNonNull(schemaPath); + requireNonNull(streamName); + requireNonNull(outputType); + + final long stamp = notificationListenersLock.writeLock(); + try { + return notificationListeners.computeIfAbsent(streamName, stream -> new NotificationListenerAdapter( + schemaPath, stream, outputType.getName())); + } finally { + notificationListenersLock.unlockWrite(stamp); + } + } + + /** + * Removal and closing of all data-change-event and notification listeners. + */ + public synchronized void removeAndCloseAllListeners() { + final long stampNotifications = notificationListenersLock.writeLock(); + final long stampDataChanges = dataChangeListenersLock.writeLock(); + try { + removeAndCloseAllDataChangeListenersTemplate(); + removeAndCloseAllNotificationListenersTemplate(); + } finally { + dataChangeListenersLock.unlockWrite(stampDataChanges); + notificationListenersLock.unlockWrite(stampNotifications); + } + } + + /** + * Closes and removes all data-change listeners. + */ + public void removeAndCloseAllDataChangeListeners() { + final long stamp = dataChangeListenersLock.writeLock(); + try { + removeAndCloseAllDataChangeListenersTemplate(); + } finally { + dataChangeListenersLock.unlockWrite(stamp); + } + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private void removeAndCloseAllDataChangeListenersTemplate() { + dataChangeListeners.values() + .forEach(listenerAdapter -> { + try { + listenerAdapter.close(); + } catch (final Exception exception) { + LOG.error("Failed to close data-change listener {}.", listenerAdapter, exception); + throw new IllegalStateException(String.format("Failed to close data-change listener %s.", + listenerAdapter), exception); + } + }); + dataChangeListeners.clear(); + } + + /** + * Closes and removes all notification listeners. + */ + public void removeAndCloseAllNotificationListeners() { + final long stamp = notificationListenersLock.writeLock(); + try { + removeAndCloseAllNotificationListenersTemplate(); + } finally { + notificationListenersLock.unlockWrite(stamp); + } + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private void removeAndCloseAllNotificationListenersTemplate() { + notificationListeners.values() + .forEach(listenerAdapter -> { + try { + listenerAdapter.close(); + } catch (final Exception exception) { + LOG.error("Failed to close notification listener {}.", listenerAdapter, exception); + throw new IllegalStateException(String.format("Failed to close notification listener %s.", + listenerAdapter), exception); + } + }); + notificationListeners.clear(); + } + + /** + * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter. + * + * @param listener Listener to be closed and removed. + */ + @SuppressWarnings("checkstyle:IllegalCatch") + public void removeAndCloseDataChangeListener(final ListenerAdapter listener) { + final long stamp = dataChangeListenersLock.writeLock(); + try { + removeAndCloseDataChangeListenerTemplate(listener); + } catch (final Exception exception) { + LOG.error("Data-change listener {} cannot be closed.", listener, exception); + } finally { + dataChangeListenersLock.unlockWrite(stamp); + } + } + + /** + * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter. + * + * @param listener Listener to be closed and removed. + */ + @SuppressWarnings("checkstyle:IllegalCatch") + private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) { + final long stamp = dataChangeListenersLock.writeLock(); + try { + requireNonNull(listener).close(); + if (dataChangeListeners.inverse().remove(listener) == null) { + LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener); + } + } catch (final Exception exception) { + LOG.error("Data-change listener {} cannot be closed.", listener, exception); + throw new IllegalStateException(String.format( + "Data-change listener %s cannot be closed.", + listener), exception); + } finally { + dataChangeListenersLock.unlockWrite(stamp); + } + } + + /** + * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter. + * + * @param listener Listener to be closed and removed. + */ + @SuppressWarnings("checkstyle:IllegalCatch") + public void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) { + final long stamp = notificationListenersLock.writeLock(); + try { + removeAndCloseNotificationListenerTemplate(listener); + } catch (final Exception exception) { + LOG.error("Notification listener {} cannot be closed.", listener, exception); + } finally { + notificationListenersLock.unlockWrite(stamp); + } + } + + @SuppressWarnings({"checkstyle:IllegalCatch"}) + private void removeAndCloseNotificationListenerTemplate(NotificationListenerAdapter listener) { + try { + requireNonNull(listener).close(); + if (notificationListeners.inverse().remove(listener) == null) { + LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener); + } + } catch (final Exception exception) { + LOG.error("Notification listener {} cannot be closed.", listener, exception); + throw new IllegalStateException(String.format( + "Notification listener %s cannot be closed.", listener), + exception); + } + } + + /** + * Removal and closing of general listener (data-change or notification listener). + * + * @param listener Listener to be closed and removed from cache. + */ + void removeAndCloseListener(final BaseListenerInterface listener) { + requireNonNull(listener); + if (listener instanceof ListenerAdapter) { + removeAndCloseDataChangeListener((ListenerAdapter) listener); + } else if (listener instanceof NotificationListenerAdapter) { + removeAndCloseNotificationListener((NotificationListenerAdapter) listener); + } + } + + /** + * Creates string representation of stream name from URI. Removes slash from URI in start and end positions. + * + * @param uri URI for creation of stream name. + * @return String representation of stream name. + */ + public static String createStreamNameFromUri(final String uri) { + String result = requireNonNull(uri); + if (result.startsWith("/")) { + result = result.substring(1); + } + if (result.endsWith("/")) { + result = result.substring(0, result.length() - 1); + } + return result; + } + + @VisibleForTesting + public synchronized void setDataChangeListeners(final Map listenerAdapterCollection) { + final long stamp = dataChangeListenersLock.writeLock(); + try { + dataChangeListeners.clear(); + dataChangeListeners.putAll(listenerAdapterCollection); + } finally { + dataChangeListenersLock.unlockWrite(stamp); + } + } +} \ No newline at end of file diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerAdapter.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerAdapter.java index 1814f07113..32a8c66eeb 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerAdapter.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerAdapter.java @@ -8,6 +8,7 @@ package org.opendaylight.restconf.nb.rfc8040.streams.listeners; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.gson.JsonObject; import com.google.gson.JsonParser; @@ -34,9 +35,7 @@ import org.w3c.dom.Element; import org.w3c.dom.Node; /** - * {@link NotificationListenerAdapter} is responsible to track events on - * notifications. - * + * {@link NotificationListenerAdapter} is responsible to track events on notifications. */ public class NotificationListenerAdapter extends AbstractCommonSubscriber implements DOMNotificationListener { @@ -49,12 +48,9 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem /** * Set path of listener and stream name, register event bus. * - * @param path - * path of notification - * @param streamName - * stream name of listener - * @param outputType - * type of output on notification (JSON, XML) + * @param path Schema path of YANG notification. + * @param streamName Name of the stream. + * @param outputType Type of output on notification (JSON or XML). */ NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) { register(this); @@ -67,9 +63,9 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem } /** - * Get outputType of listener. + * Get output type of this listener. * - * @return the outputType + * @return The configured output type (JSON or XML). */ @Override public String getOutputType() { @@ -93,7 +89,7 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem /** * Get stream name of this listener. * - * @return {@link String} + * @return The configured stream name. */ @Override public String getStreamName() { @@ -103,7 +99,7 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem /** * Get schema path of notification. * - * @return {@link SchemaPath} + * @return The configured schema path that points to observing YANG notification schema node. */ public SchemaPath getSchemaPath() { return this.path; @@ -112,7 +108,7 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem /** * Prepare data of notification and data to client. * - * @param data data + * @param data JSON or XML data that holds notification data. */ private void prepareAndPostData(final String data) { final Event event = new Event(EventType.NOTIFY); @@ -121,9 +117,9 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem } /** - * Prepare json from notification data. + * Creation of JSON from notification data. * - * @return json as {@link String} + * @return Transformed notification data in JSON format. */ @VisibleForTesting String prepareJson(final SchemaContext schemaContext, final DOMNotification notification) { @@ -137,8 +133,8 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem private static String writeBodyToString(final SchemaContext schemaContext, final DOMNotification notification) { final Writer writer = new StringWriter(); final NormalizedNodeStreamWriter jsonStream = JSONNormalizedNodeStreamWriter.createExclusiveWriter( - JSONCodecFactorySupplier.DRAFT_LHOTKA_NETMOD_YANG_JSON_02.getShared(schemaContext), notification.getType(), - null, JsonWriterFactory.createJsonWriter(writer)); + JSONCodecFactorySupplier.DRAFT_LHOTKA_NETMOD_YANG_JSON_02.getShared(schemaContext), + notification.getType(), null, JsonWriterFactory.createJsonWriter(writer)); final NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(jsonStream); try { nodeWriter.write(notification.getBody()); @@ -149,6 +145,11 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem return writer.toString(); } + /** + * Creation of XML from notification data. + * + * @return Transformed notification data in XML format. + */ private String prepareXml(final SchemaContext schemaContext, final DOMNotification notification) { final Document doc = createDocument(); final Element notificationElement = basePartDoc(doc); @@ -164,7 +165,6 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem private void addValuesToNotificationEventElement(final Document doc, final Element element, final SchemaContext schemaContext, final DOMNotification notification) { try { - final DOMResult domResult = writeNormalizedNode(notification.getBody(), schemaContext, this.path); final Node result = doc.importNode(domResult.getNode().getFirstChild(), true); final Element dataElement = doc.createElement("notification"); @@ -176,4 +176,13 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem LOG.error("Error processing stream", e); } } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("path", path) + .add("stream-name", streamName) + .add("output-type", outputType) + .toString(); + } } diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/Notificator.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/Notificator.java deleted file mode 100644 index d26fca4ed2..0000000000 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/Notificator.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.restconf.nb.rfc8040.streams.listeners; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; -import org.opendaylight.yangtools.yang.model.api.SchemaPath; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link Notificator} is responsible to create, remove and find - * {@link ListenerAdapter} listener. - */ -public final class Notificator { - - private static Map dataChangeListener = new ConcurrentHashMap<>(); - private static Map> notificationListenersByStreamName = - new ConcurrentHashMap<>(); - - private static final Logger LOG = LoggerFactory.getLogger(Notificator.class); - private static final Lock LOCK = new ReentrantLock(); - - private Notificator() { - } - - /** - * Returns list of all stream names. - */ - public static Set getStreamNames() { - return dataChangeListener.keySet(); - } - - /** - * Gets {@link ListenerAdapter} specified by stream name. - * - * @param streamName - * The name of the stream. - * @return {@link ListenerAdapter} specified by stream name. - */ - public static ListenerAdapter getListenerFor(final String streamName) { - return dataChangeListener.get(streamName); - } - - /** - * Checks if the listener specified by {@link YangInstanceIdentifier} path exist. - * - * @param streamName name of the stream - * @return True if the listener exist, false otherwise. - */ - public static boolean existListenerFor(final String streamName) { - return dataChangeListener.containsKey(streamName); - } - - /** - * Creates new {@link ListenerAdapter} listener from - * {@link YangInstanceIdentifier} path and stream name. - * - * @param path - * Path to data in data repository. - * @param streamName - * The name of the stream. - * @param outputType - * Spcific type of output for notifications - XML or JSON - * @return New {@link ListenerAdapter} listener from - * {@link YangInstanceIdentifier} path and stream name. - */ - public static ListenerAdapter createListener(final YangInstanceIdentifier path, final String streamName, - final NotificationOutputType outputType) { - final ListenerAdapter listener = new ListenerAdapter(path, streamName, outputType); - try { - LOCK.lock(); - dataChangeListener.put(streamName, listener); - } finally { - LOCK.unlock(); - } - return listener; - } - - /** - * Looks for listener determined by {@link YangInstanceIdentifier} path and removes it. - * Creates String representation of stream name from URI. Removes slash from URI in start and end position. - * - * @param uri - * URI for creation stream name. - * @return String representation of stream name. - */ - public static String createStreamNameFromUri(final String uri) { - if (uri == null) { - return null; - } - String result = uri; - if (result.startsWith("/")) { - result = result.substring(1); - } - if (result.endsWith("/")) { - result = result.substring(0, result.length() - 1); - } - return result; - } - - /** - * Removes all listeners. - */ - @SuppressWarnings("checkstyle:IllegalCatch") - public static void removeAllListeners() { - for (final ListenerAdapter listener : dataChangeListener.values()) { - try { - listener.close(); - } catch (final Exception e) { - LOG.error("Failed to close listener", e); - } - } - try { - LOCK.lock(); - dataChangeListener = new ConcurrentHashMap<>(); - } finally { - LOCK.unlock(); - } - } - - /** - * Delete {@link ListenerAdapter} listener specified in parameter. - * - * @param - * - * @param listener - * ListenerAdapter - */ - @SuppressWarnings("checkstyle:IllegalCatch") - private static void deleteListener(final T listener) { - if (listener != null) { - try { - listener.close(); - } catch (final Exception e) { - LOG.error("Failed to close listener", e); - } - try { - LOCK.lock(); - dataChangeListener.remove(listener.getStreamName()); - } finally { - LOCK.unlock(); - } - } - } - - /** - * Check if the listener specified by qnames of request exist. - * - * @param streamName - * name of stream - * @return True if the listener exist, false otherwise. - */ - public static boolean existNotificationListenerFor(final String streamName) { - return notificationListenersByStreamName.containsKey(streamName); - } - - /** - * Prepare listener for notification ({@link NotificationDefinition}). - * - * @param paths - * paths of notifications - * @param streamName - * name of stream (generated by paths) - * @param outputType - * type of output for onNotification - XML or JSON - * @return List of {@link NotificationListenerAdapter} by paths - */ - public static List createNotificationListener(final List paths, - final String streamName, final String outputType) { - final List listListeners = new ArrayList<>(); - for (final SchemaPath path : paths) { - final NotificationListenerAdapter listener = new NotificationListenerAdapter(path, streamName, outputType); - listListeners.add(listener); - } - try { - LOCK.lock(); - notificationListenersByStreamName.put(streamName, listListeners); - } finally { - LOCK.unlock(); - } - return listListeners; - } - - public static void removeListenerIfNoSubscriberExists(final T listener) { - if (!listener.hasSubscribers()) { - if (listener instanceof NotificationListenerAdapter) { - deleteNotificationListener(listener); - } else { - deleteListener(listener); - } - } - } - - @SuppressWarnings("checkstyle:IllegalCatch") - private static void deleteNotificationListener(final T listener) { - if (listener != null) { - try { - listener.close(); - } catch (final Exception e) { - LOG.error("Failed to close listener", e); - } - try { - LOCK.lock(); - notificationListenersByStreamName.remove(listener.getStreamName()); - } finally { - LOCK.unlock(); - } - } - } - - public static List getNotificationListenerFor(final String streamName) { - return notificationListenersByStreamName.get(streamName); - } -} diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/websockets/WebSocketServer.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/websockets/WebSocketServer.java index 54b11e040e..c13d2f591c 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/websockets/WebSocketServer.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/websockets/WebSocketServer.java @@ -14,18 +14,17 @@ import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; -import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator; +import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * {@link WebSocketServer} is the singleton responsible for starting and stopping the - * web socket server. + * {@link WebSocketServer} is the class that is responsible for starting and stopping of web-socket server with + * specified listening TCP port. */ public final class WebSocketServer implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class); - private static WebSocketServer instance = null; private final int port; @@ -41,8 +40,7 @@ public final class WebSocketServer implements Runnable { /** * Create singleton instance of {@link WebSocketServer}. * - * @param port TCP port used for this server - * @return instance of {@link WebSocketServer} + * @param port TCP port used for this server. */ public static WebSocketServer createInstance(final int port) { Preconditions.checkState(instance == null, "createInstance() has already been called"); @@ -53,9 +51,9 @@ public final class WebSocketServer implements Runnable { } /** - * Get the websocket of TCP port. + * Get the TCP port of websocket server. * - * @return websocket TCP port + * @return TCP port number. */ public int getPort() { return port; @@ -64,7 +62,7 @@ public final class WebSocketServer implements Runnable { /** * Get instance of {@link WebSocketServer} created by {@link #createInstance(int)}. * - * @return instance of {@link WebSocketServer} + * @return Instance of {@link WebSocketServer}. */ public static WebSocketServer getInstance() { Preconditions.checkNotNull(instance, "createInstance() must be called prior to getInstance()"); @@ -96,10 +94,10 @@ public final class WebSocketServer implements Runnable { channel.closeFuture().sync(); } catch (final InterruptedException e) { - LOG.error("Web socket server encountered an error during startup attempt on port {}", port, e); + LOG.error("Web socket server encountered an error during startup attempt on port {}.", port, e); } catch (Throwable throwable) { // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them - LOG.error("Error while binding to port {}", port, throwable); + LOG.error("Error while binding to port {}.", port, throwable); throw throwable; } finally { stop(); @@ -110,8 +108,8 @@ public final class WebSocketServer implements Runnable { * Stops the web socket server and removes all listeners. */ private void stop() { - LOG.debug("Stopping the web socket server instance on port {}", port); - Notificator.removeAllListeners(); + LOG.debug("Stopping the web socket server instance on port {}.", port); + ListenersBroker.getInstance().removeAndCloseAllListeners(); if (bossGroup != null) { bossGroup.shutdownGracefully(); bossGroup = null; diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/websockets/WebSocketServerHandler.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/websockets/WebSocketServerHandler.java index 903b0dbf56..880c59a526 100755 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/websockets/WebSocketServerHandler.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/websockets/WebSocketServerHandler.java @@ -8,16 +8,6 @@ package org.opendaylight.restconf.nb.rfc8040.streams.websockets; -import static io.netty.handler.codec.http.HttpHeaderNames.HOST; -import static io.netty.handler.codec.http.HttpMethod.GET; -import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; -import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; -import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; -import static io.netty.handler.codec.http.HttpResponseStatus.OK; -import static io.netty.handler.codec.http.HttpUtil.isKeepAlive; -import static io.netty.handler.codec.http.HttpUtil.setContentLength; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; - import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -25,7 +15,12 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; @@ -33,10 +28,10 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.util.CharsetUtil; -import java.util.List; +import java.util.Optional; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter; +import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter; -import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator; import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,43 +63,47 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler private void handleHttpRequest(final ChannelHandlerContext ctx, final FullHttpRequest req) { // Handle a bad request. if (!req.decoderResult().isSuccess()) { - sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)); + sendHttpResponse(ctx, req, new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } // Allow only GET methods. - if (req.method() != GET) { - sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN)); + if (req.method() != HttpMethod.GET) { + sendHttpResponse(ctx, req, new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN)); return; } - final String streamName = Notificator.createStreamNameFromUri(req.uri()); + final String streamName = ListenersBroker.createStreamNameFromUri(req.uri()); if (streamName.contains(RestconfConstants.DATA_SUBSCR)) { - final ListenerAdapter listener = Notificator.getListenerFor(streamName); - if (listener != null) { - listener.addSubscriber(ctx.channel()); + final Optional listener = + ListenersBroker.getInstance().getDataChangeListenerFor(streamName); + if (listener.isPresent()) { + listener.get().addSubscriber(ctx.channel()); LOG.debug("Subscriber successfully registered."); } else { LOG.error("Listener for stream with name '{}' was not found.", streamName); - sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR)); + sendHttpResponse(ctx, req, new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR)); } } else if (streamName.contains(RestconfConstants.NOTIFICATION_STREAM)) { - final List listeners = Notificator.getNotificationListenerFor(streamName); - if (listeners != null && !listeners.isEmpty()) { - for (final NotificationListenerAdapter listener : listeners) { - listener.addSubscriber(ctx.channel()); - LOG.debug("Subscriber successfully registered."); - } + final Optional listener = + ListenersBroker.getInstance().getNotificationListenerFor(streamName); + if (listener.isPresent()) { + listener.get().addSubscriber(ctx.channel()); + LOG.debug("Subscriber successfully registered."); } else { LOG.error("Listener for stream with name '{}' was not found.", streamName); - sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR)); + sendHttpResponse(ctx, req, new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR)); } } // Handshake final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req), - null, false); + null, false); this.handshaker = wsFactory.newHandshaker(req); if (this.handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); @@ -122,17 +121,17 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler * @param res FullHttpResponse */ private static void sendHttpResponse(final ChannelHandlerContext ctx, final HttpRequest req, - final FullHttpResponse res) { + final FullHttpResponse res) { // Generate an error page if response getStatus code is not OK (200). - final boolean notOkay = !OK.equals(res.status()); + final boolean notOkay = !HttpResponseStatus.OK.equals(res.status()); if (notOkay) { res.content().writeCharSequence(res.status().toString(), CharsetUtil.UTF_8); - setContentLength(res, res.content().readableBytes()); + HttpUtil.setContentLength(res, res.content().readableBytes()); } // Send the response and close the connection if necessary. final ChannelFuture f = ctx.channel().writeAndFlush(res); - if (notOkay || !isKeepAlive(req)) { + if (notOkay || !HttpUtil.isKeepAlive(req)) { f.addListener(ChannelFutureListener.CLOSE); } } @@ -146,26 +145,31 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler private void handleWebSocketFrame(final ChannelHandlerContext ctx, final WebSocketFrame frame) { if (frame instanceof CloseWebSocketFrame) { this.handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); - final String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText()); + final String streamName = ListenersBroker.createStreamNameFromUri( + ((CloseWebSocketFrame) frame).reasonText()); if (streamName.contains(RestconfConstants.DATA_SUBSCR)) { - final ListenerAdapter listener = Notificator.getListenerFor(streamName); - if (listener != null) { - listener.removeSubscriber(ctx.channel()); - LOG.debug("Subscriber successfully registered."); - Notificator.removeListenerIfNoSubscriberExists(listener); + final Optional listener = ListenersBroker.getInstance() + .getDataChangeListenerFor(streamName); + if (listener.isPresent()) { + listener.get().removeSubscriber(ctx.channel()); + LOG.debug("Subscriber successfully removed."); + if (!listener.get().hasSubscribers()) { + ListenersBroker.getInstance().removeAndCloseDataChangeListener(listener.get()); + } } } else if (streamName.contains(RestconfConstants.NOTIFICATION_STREAM)) { - final List listeners = Notificator.getNotificationListenerFor(streamName); - if (listeners != null && !listeners.isEmpty()) { - for (final NotificationListenerAdapter listener : listeners) { - listener.removeSubscriber(ctx.channel()); + final Optional listener + = ListenersBroker.getInstance().getNotificationListenerFor(streamName); + if (listener.isPresent()) { + listener.get().removeSubscriber(ctx.channel()); + LOG.debug("Subscriber successfully removed."); + if (!listener.get().hasSubscribers()) { + ListenersBroker.getInstance().removeAndCloseNotificationListener(listener.get()); } } } - return; } else if (frame instanceof PingWebSocketFrame) { ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain())); - return; } } @@ -177,10 +181,10 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler /** * Get web socket location from HTTP request. * - * @param req HTTP request from which the location will be returned + * @param req HTTP request from which the location will be returned. * @return String representation of web socket location. */ private static String getWebSocketLocation(final HttpRequest req) { - return "ws://" + req.headers().get(HOST) + req.uri(); + return "ws://" + req.headers().get(HttpHeaderNames.HOST) + req.uri(); } -} +} \ No newline at end of file diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/websockets/WebSocketServerInitializer.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/websockets/WebSocketServerInitializer.java index 5ab8639b21..de8aaf6521 100644 --- a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/websockets/WebSocketServerInitializer.java +++ b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/websockets/WebSocketServerInitializer.java @@ -15,17 +15,16 @@ import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; /** - * {@link WebSocketServerInitializer} is used to setup the {@link ChannelPipeline} of a {@link io.netty.channel.Channel} - * . + * {@link WebSocketServerInitializer} is used to setup the {@link ChannelPipeline} of + * a {@link io.netty.channel.Channel}. */ public class WebSocketServerInitializer extends ChannelInitializer { @Override - protected void initChannel(final SocketChannel ch) { - ChannelPipeline pipeline = ch.pipeline(); + protected void initChannel(final SocketChannel channel) { + ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("codec-http", new HttpServerCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); pipeline.addLast("handler", new WebSocketServerHandler()); } - -} +} \ No newline at end of file diff --git a/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImplTest.java b/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImplTest.java index f28981dde1..64751f82d5 100644 --- a/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImplTest.java +++ b/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImplTest.java @@ -15,15 +15,16 @@ import static org.mockito.Mockito.when; import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateTrueFluentFuture; import com.google.common.collect.ImmutableClassToInstanceMap; -import java.lang.reflect.Field; +import java.io.FileNotFoundException; import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriInfo; @@ -32,6 +33,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.opendaylight.mdsal.common.api.CommitInfo; import org.opendaylight.mdsal.dom.api.DOMDataBroker; @@ -49,17 +51,18 @@ import org.opendaylight.restconf.nb.rfc8040.handlers.NotificationServiceHandler; import org.opendaylight.restconf.nb.rfc8040.handlers.SchemaContextHandler; import org.opendaylight.restconf.nb.rfc8040.handlers.TransactionChainHandler; import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter; -import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator; +import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker; import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec; import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils; public class RestconfStreamsSubscriptionServiceImplTest { private static final String URI = "/restconf/18/data/ietf-restconf-monitoring:restconf-state/streams/stream/" + "toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE"; - private static Field listenersByStreamName; @Mock private DOMDataBrokerHandler dataBrokerHandler; @@ -73,7 +76,7 @@ public class RestconfStreamsSubscriptionServiceImplTest { @SuppressWarnings("unchecked") @Before - public void setUp() throws Exception { + public void setUp() throws FileNotFoundException, URISyntaxException { MockitoAnnotations.initMocks(this); final DOMTransactionChain domTx = mock(DOMTransactionChain.class); @@ -107,6 +110,8 @@ public class RestconfStreamsSubscriptionServiceImplTest { final UriBuilder baseUriBuilder = new LocalUriInfo().getBaseUriBuilder(); when(uriInfo.getBaseUri()).thenReturn(baseUriBuilder.build()); when(uriInfo.getBaseUriBuilder()).thenReturn(baseUriBuilder); + final URI uri = new URI("http://127.0.0.1/" + URI); + when(uriInfo.getAbsolutePath()).thenReturn(uri); this.schemaHandler.onGlobalContextUpdated( YangParserTestUtils.parseYangFiles(TestRestconfUtils.loadFiles("/notifications"))); } @@ -124,29 +129,31 @@ public class RestconfStreamsSubscriptionServiceImplTest { } @BeforeClass - public static void setUpBeforeTest() throws Exception { + public static void setUpBeforeTest() { final Map listenersByStreamNameSetter = new HashMap<>(); final ListenerAdapter adapter = mock(ListenerAdapter.class); + final YangInstanceIdentifier yiid = mock(YangInstanceIdentifier.class); + final YangInstanceIdentifier.PathArgument lastPathArgument = mock(YangInstanceIdentifier.PathArgument.class); + final QName qname = QName.create("toaster", "2009-11-20", "toasterStatus"); + Mockito.when(adapter.getPath()).thenReturn(yiid); + Mockito.when(adapter.getOutputType()).thenReturn("JSON"); + Mockito.when(yiid.getLastPathArgument()).thenReturn(lastPathArgument); + Mockito.when(lastPathArgument.getNodeType()).thenReturn(qname); listenersByStreamNameSetter.put( "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", adapter); - listenersByStreamName = Notificator.class.getDeclaredField("dataChangeListener"); - - listenersByStreamName.setAccessible(true); - listenersByStreamName.set(Notificator.class, listenersByStreamNameSetter); + ListenersBroker.getInstance().setDataChangeListeners(listenersByStreamNameSetter); } @AfterClass - public static void setUpAfterTest() throws Exception { - listenersByStreamName.set(Notificator.class, null); - listenersByStreamName.set(Notificator.class, new ConcurrentHashMap<>()); - listenersByStreamName.setAccessible(false); + public static void setUpAfterTest() { + ListenersBroker.getInstance().setDataChangeListeners(Collections.emptyMap()); } @Test - public void testSubscribeToStream() throws Exception { + public void testSubscribeToStream() { final UriBuilder uriBuilder = UriBuilder.fromUri(URI); - Notificator.createListener( + ListenersBroker.getInstance().registerDataChangeListener( IdentifierCodec.deserialize("toaster:toaster/toasterStatus", this.schemaHandler.get()), "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE", NotificationOutputType.XML); diff --git a/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapterTest.java b/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapterTest.java index 4021be2c8d..473d6af1ce 100644 --- a/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapterTest.java +++ b/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapterTest.java @@ -17,7 +17,6 @@ import java.net.URL; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.json.JSONObject; @@ -87,7 +86,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest { final NotificationOutputTypeGrouping.NotificationOutputType outputType, final boolean leafNodesOnly) { super(path, streamName, outputType); - setQueryParams(EPOCH, Optional.empty(), Optional.empty(), leafNodesOnly); + setQueryParams(EPOCH, null, null, leafNodesOnly); } @Override diff --git a/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerTest.java b/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerTest.java index 33328bc2d1..8b1ede4116 100644 --- a/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerTest.java +++ b/restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerTest.java @@ -18,7 +18,6 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; import org.junit.Before; @@ -215,14 +214,10 @@ public class NotificationListenerTest { return child; } - private String prepareJson(final DOMNotification notificationData, final SchemaPath schemaPathNotifi) - throws Exception { - final List paths = new ArrayList<>(); - paths.add(schemaPathNotifi); - final List listNotifi = - Notificator.createNotificationListener(paths, "stream-name", NotificationOutputType.JSON.toString()); - final NotificationListenerAdapter notifi = listNotifi.get(0); - final String result = notifi.prepareJson(schmeaCtx, notificationData); + private String prepareJson(final DOMNotification notificationData, final SchemaPath schemaPathNotifi) { + final NotificationListenerAdapter notifiAdapter = ListenersBroker.getInstance().registerNotificationListener( + schemaPathNotifi, "stream-name", NotificationOutputType.JSON); + final String result = notifiAdapter.prepareJson(schmeaCtx, notificationData); return Preconditions.checkNotNull(result); } }