&& identifier.contains(STREAM_LOCATION_PATH_PART)) {
final String value = (String) node.getValue();
final String streamName = value.substring(
- value.indexOf(CREATE_NOTIFICATION_STREAM.toString() + RestconfConstants.SLASH),
- value.length());
+ value.indexOf(CREATE_NOTIFICATION_STREAM + RestconfConstants.SLASH));
this.delegRestconfSubscrService.subscribeToStream(streamName, uriInfo);
}
if (node == null) {
@Override
public NormalizedNodeContext invokeRpc(final String identifier, final NormalizedNodeContext payload,
- final UriInfo uriInfo) {
+ final UriInfo uriInfo) {
final SchemaContextRef refSchemaCtx = new SchemaContextRef(this.schemaContextHandler.get());
final SchemaPath schemaPath = payload.getInstanceIdentifierContext().getSchemaNode().getPath();
final DOMMountPoint mountPoint = payload.getInstanceIdentifierContext().getMountPoint();
SchemaContextRef schemaContextRef;
if (mountPoint == null) {
- if (namespace.toString().equals(RestconfStreamsConstants.SAL_REMOTE_NAMESPACE)) {
- if (identifier.contains(RestconfStreamsConstants.CREATE_DATA_SUBSCR)) {
+ if (namespace.equals(RestconfStreamsConstants.SAL_REMOTE_NAMESPACE.getNamespace())) {
+ if (identifier.contains(RestconfStreamsConstants.CREATE_DATA_SUBSCRIPTION)) {
response = CreateStreamUtil.createDataChangeNotifiStream(payload, refSchemaCtx);
} else {
throw new RestconfDocumentedException("Not supported operation", ErrorType.RPC,
import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfStreamsSubscriptionService;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
import org.opendaylight.restconf.nb.rfc8040.rests.utils.SubscribeToStreamUtil;
-import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeBuilder;
/**
* Implementation of {@link RestconfStreamsSubscriptionService}.
- *
*/
@Path("/")
public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSubscriptionService {
final NotificationQueryParams notificationQueryParams = NotificationQueryParams.fromUriInfo(uriInfo);
URI response = null;
- if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCR)) {
- response = SubscribeToStreamUtil.notifiDataStream(identifier, uriInfo, notificationQueryParams,
+ if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
+ response = SubscribeToStreamUtil.subscribeToDataStream(identifier, uriInfo, notificationQueryParams,
this.handlersHolder);
} else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
- response = SubscribeToStreamUtil.notifYangStream(identifier, uriInfo, notificationQueryParams,
+ response = SubscribeToStreamUtil.subscribeToYangStream(identifier, uriInfo, notificationQueryParams,
this.handlersHolder);
}
SubscribeToStreamUtil.prepareIIDSubsStreamOutput(this.handlersHolder.getSchemaHandler());
final NormalizedNodeBuilder<NodeIdentifier, Object, LeafNode<Object>> builder =
ImmutableLeafNodeBuilder.create().withValue(response.toString());
- builder.withNodeIdentifier(
- NodeIdentifier.create(QName.create("subscribe:to:notification", "2016-10-28", "location")));
+ builder.withNodeIdentifier(NodeIdentifier.create(RestconfStreamsConstants.LOCATION_QNAME));
// prepare new header with location
final Map<String, Object> headers = new HashMap<>();
/**
* Parser and holder of query paramteres from uriInfo for notifications.
- *
*/
public static final class NotificationQueryParams {
return Optional.ofNullable(filter);
}
}
-
-}
+}
\ No newline at end of file
*/
package org.opendaylight.restconf.nb.rfc8040.rests.utils;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.List;
+import static java.util.Objects.requireNonNull;
+
import java.util.Optional;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
import org.opendaylight.restconf.common.util.DataChangeScope;
import org.opendaylight.restconf.nb.rfc8040.references.SchemaContextRef;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.ParserIdentifier;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Util class for streams.
- *
- * <ul>
- * <li>create stream
- * <li>subscribe
- * </ul>
- *
+ * Utility class for creation of data-change-event or YANG notification streams.
*/
public final class CreateStreamUtil {
private static final Logger LOG = LoggerFactory.getLogger(CreateStreamUtil.class);
- private static final String OUTPUT_TYPE_PARAM_NAME = "notification-output-type";
private CreateStreamUtil() {
- throw new UnsupportedOperationException("Util class");
+ throw new UnsupportedOperationException("Utility class");
}
/**
- * Create stream with POST operation via RPC.
- *
- * @param payload
- * input of rpc - example in JSON:
- *
- * <pre>
- * {@code
- * {
- * "input": {
- * "path": "/toaster:toaster/toaster:toasterStatus",
- * "sal-remote-augment:datastore": "OPERATIONAL",
- * "sal-remote-augment:scope": "ONE"
- * }
- * }
- * }
- * </pre>
- *
- * @param refSchemaCtx
- * reference to {@link SchemaContext} -
- * {@link SchemaContextRef}
- * @return {@link DOMRpcResult} - This means output of RPC - example in JSON:
+ * Create data-change-event or notification stream with POST operation via RPC.
*
- * <pre>
- * {@code
+ * @param payload Input of RPC - example in JSON (data-change-event stream):
+ * <pre>
+ * {@code
+ * {
+ * "input": {
+ * "path": "/toaster:toaster/toaster:toasterStatus",
+ * "sal-remote-augment:datastore": "OPERATIONAL",
+ * "sal-remote-augment:scope": "ONE"
+ * }
+ * }
+ * }
+ * </pre>
+ * @param refSchemaCtx Reference to {@link SchemaContext} - {@link SchemaContextRef}.
+ * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
+ * <pre>
+ * {@code
* {
* "output": {
* "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
* }
* }
- * }
- * </pre>
- *
+ * }
+ * </pre>
*/
public static DOMRpcResult createDataChangeNotifiStream(final NormalizedNodeContext payload,
final SchemaContextRef refSchemaCtx) {
- final ContainerNode data = (ContainerNode) payload.getData();
+ // parsing out of container with settings and path
+ final ContainerNode data = (ContainerNode) requireNonNull(payload).getData();
final QName qname = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
final YangInstanceIdentifier path = preparePath(data, qname);
- String streamName = prepareDataChangeNotifiStreamName(path, refSchemaCtx.get(), data);
-
- final QName outputQname = QName.create(qname, "output");
- final QName streamNameQname = QName.create(qname, "stream-name");
+ // building of stream name
+ final StringBuilder streamNameBuilder = new StringBuilder(
+ prepareDataChangeNotifiStreamName(path, requireNonNull(refSchemaCtx).get(), data));
final NotificationOutputType outputType = prepareOutputType(data);
if (outputType.equals(NotificationOutputType.JSON)) {
- streamName = streamName + "/JSON";
+ streamNameBuilder.append('/').append(outputType.getName());
}
+ final String streamName = streamNameBuilder.toString();
- if (!Notificator.existListenerFor(streamName)) {
- Notificator.createListener(path, streamName, outputType);
- }
+ // registration of the listener
+ ListenersBroker.getInstance().registerDataChangeListener(path, streamName, outputType);
+
+ // building of output
+ final QName outputQname = QName.create(qname, RestconfStreamsConstants.OUTPUT_CONTAINER_NAME);
+ final QName streamNameQname = QName.create(qname, RestconfStreamsConstants.OUTPUT_STREAM_NAME);
- final ContainerNode output =
- ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(outputQname))
- .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
+ final ContainerNode output = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new NodeIdentifier(outputQname))
+ .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
return new DefaultDOMRpcResult(output);
}
/**
- * Prepare {@code NotificationOutputType}.
+ * Prepare {@link NotificationOutputType}.
*
- * @param data
- * data of notification
- * @return output type fo notification
+ * @param data Container with stream settings (RPC create-stream).
+ * @return Parsed {@link NotificationOutputType}.
*/
private static NotificationOutputType prepareOutputType(final ContainerNode data) {
- NotificationOutputType outputType = parseEnum(data, NotificationOutputType.class, OUTPUT_TYPE_PARAM_NAME);
+ NotificationOutputType outputType = parseEnum(
+ data, NotificationOutputType.class, RestconfStreamsConstants.OUTPUT_TYPE_PARAM_NAME);
return outputType == null ? NotificationOutputType.XML : outputType;
}
+ /**
+ * Prepare stream name.
+ *
+ * @param path Path of element from which data-change-event notifications are going to be generated.
+ * @param schemaContext Schema context.
+ * @param data Container with stream settings (RPC create-stream).
+ * @return Parsed stream name.
+ */
private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path,
- final SchemaContext schemaContext,
- final ContainerNode data) {
- LogicalDatastoreType ds = parseEnum(data, LogicalDatastoreType.class,
- RestconfStreamsConstants.DATASTORE_PARAM_NAME);
- ds = ds == null ? RestconfStreamsConstants.DEFAULT_DS : ds;
+ final SchemaContext schemaContext, final ContainerNode data) {
+ LogicalDatastoreType datastoreType = parseEnum(
+ data, LogicalDatastoreType.class, RestconfStreamsConstants.DATASTORE_PARAM_NAME);
+ datastoreType = datastoreType == null ? RestconfStreamsConstants.DEFAULT_DS : datastoreType;
DataChangeScope scope = parseEnum(data, DataChangeScope.class, RestconfStreamsConstants.SCOPE_PARAM_NAME);
scope = scope == null ? RestconfStreamsConstants.DEFAULT_SCOPE : scope;
- final String streamName = RestconfStreamsConstants.DATA_SUBSCR + "/"
- + Notificator
- .createStreamNameFromUri(ParserIdentifier.stringFromYangInstanceIdentifier(path, schemaContext)
- + RestconfStreamsConstants.DS_URI + ds + RestconfStreamsConstants.SCOPE_URI + scope);
- return streamName;
+ return RestconfStreamsConstants.DATA_SUBSCRIPTION
+ + "/"
+ + ListenersBroker.createStreamNameFromUri(
+ ParserIdentifier.stringFromYangInstanceIdentifier(path, schemaContext)
+ + RestconfStreamsConstants.DS_URI
+ + datastoreType
+ + RestconfStreamsConstants.SCOPE_URI
+ + scope);
+ }
+
+ /**
+ * Prepare {@link YangInstanceIdentifier} of stream source.
+ *
+ * @param data Container with stream settings (RPC create-stream).
+ * @param qualifiedName QName of the input RPC context (used only in debugging).
+ * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
+ * are going to be generated.
+ */
+ private static YangInstanceIdentifier preparePath(final ContainerNode data, final QName qualifiedName) {
+ final Optional<DataContainerChild<? extends PathArgument, ?>> path = data.getChild(
+ new YangInstanceIdentifier.NodeIdentifier(QName.create(
+ qualifiedName,
+ RestconfStreamsConstants.STREAM_PATH_PARAM_NAME)));
+ Object pathValue = null;
+ if (path.isPresent()) {
+ pathValue = path.get().getValue();
+ }
+ if (!(pathValue instanceof YangInstanceIdentifier)) {
+ LOG.debug("Instance identifier {} was not normalized correctly", qualifiedName);
+ throw new RestconfDocumentedException(
+ "Instance identifier was not normalized correctly",
+ ErrorType.APPLICATION,
+ ErrorTag.OPERATION_FAILED);
+ }
+ return (YangInstanceIdentifier) pathValue;
}
+ /**
+ * Parsing out of enumeration from RPC create-stream body.
+ *
+ * @param data Container with stream settings (RPC create-stream).
+ * @param clazz Enum type to be parsed out from input container.
+ * @param paramName Local name of the enum element.
+ * @return Parsed enumeration.
+ */
private static <T> T parseEnum(final ContainerNode data, final Class<T> clazz, final String paramName) {
final Optional<DataContainerChild<? extends PathArgument, ?>> optAugNode = data.getChild(
- RestconfStreamsConstants.SAL_REMOTE_AUG_IDENTIFIER);
+ RestconfStreamsConstants.SAL_REMOTE_AUG_IDENTIFIER);
if (!optAugNode.isPresent()) {
return null;
}
return null;
}
final Optional<DataContainerChild<? extends PathArgument, ?>> enumNode = ((AugmentationNode) augNode).getChild(
- new NodeIdentifier(QName.create(RestconfStreamsConstants.SAL_REMOTE_AUGMENT, paramName)));
+ new NodeIdentifier(QName.create(RestconfStreamsConstants.SAL_REMOTE_AUGMENT, paramName)));
if (!enumNode.isPresent()) {
return null;
}
return ResolveEnumUtil.resolveEnum(clazz, (String) value);
}
- private static YangInstanceIdentifier preparePath(final ContainerNode data, final QName qualifiedName) {
- final Optional<DataContainerChild<? extends PathArgument, ?>> path = data
- .getChild(new YangInstanceIdentifier.NodeIdentifier(QName.create(qualifiedName, "path")));
- Object pathValue = null;
- if (path.isPresent()) {
- pathValue = path.get().getValue();
- }
- if (!(pathValue instanceof YangInstanceIdentifier)) {
- LOG.debug("Instance identifier {} was not normalized correctly", qualifiedName);
- throw new RestconfDocumentedException("Instance identifier was not normalized correctly",
- ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
- }
- return (YangInstanceIdentifier) pathValue;
- }
-
/**
- * Create stream with POST operation via RPC.
+ * Create YANG notification stream using notification definition in YANG schema.
*
- * @param notificatinoDefinition
- * input of RPC
- * @param refSchemaCtx
- * schemaContext
- * @param outputType
- * output type
- * @return {@link DOMRpcResult}
+ * @param notificationDefinition YANG notification definition.
+ * @param refSchemaCtx Reference to {@link SchemaContext} - {@link SchemaContextRef}.
+ * @param outputType Output type (XML or JSON).
+ * @return {@link NotificationListenerAdapter}
*/
- public static List<NotificationListenerAdapter> createYangNotifiStream(
- final NotificationDefinition notificatinoDefinition, final SchemaContextRef refSchemaCtx,
- final String outputType) {
- final List<SchemaPath> paths = new ArrayList<>();
- final QName notificatinoDefinitionQName = notificatinoDefinition.getQName();
- final Module module =
- refSchemaCtx.findModuleByNamespaceAndRevision(notificatinoDefinitionQName.getModule().getNamespace(),
- notificatinoDefinitionQName.getModule().getRevision());
- Preconditions.checkNotNull(module,
- "Module for namespace " + notificatinoDefinitionQName.getModule().getNamespace() + " does not exist");
- NotificationDefinition notifiDef = null;
- for (final NotificationDefinition notification : module.getNotifications()) {
- if (notification.getQName().equals(notificatinoDefinitionQName)) {
- notifiDef = notification;
- break;
- }
- }
- final String moduleName = module.getName();
- Preconditions.checkNotNull(notifiDef,
- "Notification " + notificatinoDefinitionQName + "doesn't exist in module " + moduleName);
- paths.add(notifiDef.getPath());
- String streamName = RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM + "/";
- streamName = streamName + moduleName + ":" + notificatinoDefinitionQName.getLocalName();
- if (outputType.equals("JSON")) {
- streamName = streamName + "/JSON";
- }
+ public static NotificationListenerAdapter createYangNotifiStream(
+ final NotificationDefinition notificationDefinition, final SchemaContextRef refSchemaCtx,
+ final NotificationOutputType outputType) {
+ final String streamName = parseNotificationStreamName(requireNonNull(notificationDefinition),
+ requireNonNull(refSchemaCtx), requireNonNull(outputType.getName()));
+ final Optional<NotificationListenerAdapter> listenerForStreamName = ListenersBroker.getInstance()
+ .getNotificationListenerFor(streamName);
+ return listenerForStreamName.orElseGet(() -> ListenersBroker.getInstance().registerNotificationListener(
+ notificationDefinition.getPath(), streamName, outputType));
+ }
- if (Notificator.existNotificationListenerFor(streamName)) {
- final List<NotificationListenerAdapter> notificationListenerFor =
- Notificator.getNotificationListenerFor(streamName);
- return SubscribeToStreamUtil.pickSpecificListenerByOutput(notificationListenerFor, outputType);
+ private static String parseNotificationStreamName(final NotificationDefinition notificationDefinition,
+ final SchemaContextRef refSchemaCtx, final String outputType) {
+ final QName notificationDefinitionQName = notificationDefinition.getQName();
+ final Module module = refSchemaCtx.findModuleByNamespaceAndRevision(
+ notificationDefinitionQName.getModule().getNamespace(),
+ notificationDefinitionQName.getModule().getRevision());
+ requireNonNull(module, String.format("Module for namespace %s does not exist.",
+ notificationDefinitionQName.getModule().getNamespace()));
+
+ final StringBuilder streamNameBuilder = new StringBuilder();
+ streamNameBuilder.append(RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM)
+ .append('/')
+ .append(module.getName())
+ .append(':')
+ .append(notificationDefinitionQName.getLocalName());
+ if (outputType.equals(NotificationOutputType.JSON.getName())) {
+ streamNameBuilder.append(NotificationOutputType.JSON.getName());
}
-
- return Notificator.createNotificationListener(paths, streamName, outputType);
+ return streamNameBuilder.toString();
}
-}
+}
\ No newline at end of file
import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.STREAMS_PATH;
import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.STREAM_PATH_PART;
-import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.FluentFuture;
import java.net.URI;
final SchemaContextRef schemaContextRef, final UriInfo uriInfo) {
final SchemaContext schemaContext = schemaContextRef.get();
if (identifier.contains(STREAMS_PATH) && !identifier.contains(STREAM_PATH_PART)) {
- final DOMDataTreeReadWriteTransaction wTx = transactionNode.getTransactionChain().newReadWriteTransaction();
- final boolean exist = SubscribeToStreamUtil.checkExist(schemaContext, wTx);
-
- for (final NotificationDefinition notificationDefinition : schemaContextRef.get().getNotifications()) {
- final List<NotificationListenerAdapter> notifiStreamXML =
- CreateStreamUtil.createYangNotifiStream(notificationDefinition, schemaContextRef,
- NotificationOutputType.XML.getName());
- final List<NotificationListenerAdapter> notifiStreamJSON =
- CreateStreamUtil.createYangNotifiStream(notificationDefinition, schemaContextRef,
- NotificationOutputType.JSON.getName());
- for (final NotificationListenerAdapter listener : Iterables.concat(notifiStreamXML, notifiStreamJSON)) {
- final URI uri = SubscribeToStreamUtil.prepareUriByStreamName(uriInfo, listener.getStreamName());
- final NormalizedNode mapToStreams =
- RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
- listener.getSchemaPath().getLastComponent(), schemaContext.getNotifications(),
- null, listener.getOutputType(), uri,
- SubscribeToStreamUtil.getMonitoringModule(schemaContext), exist);
- SubscribeToStreamUtil.writeDataToDS(schemaContext,
- listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist,
- mapToStreams);
- }
- }
- SubscribeToStreamUtil.submitData(wTx);
+ createAllYangNotificationStreams(transactionNode, schemaContextRef, uriInfo);
}
return readData(content, transactionNode, withDefa, schemaContext);
}
+ private static void createAllYangNotificationStreams(final TransactionVarsWrapper transactionNode,
+ final SchemaContextRef schemaContextRef, final UriInfo uriInfo) {
+ final DOMDataTreeReadWriteTransaction wTx = transactionNode.getTransactionChain().newReadWriteTransaction();
+ final boolean exist = SubscribeToStreamUtil.checkExist(schemaContextRef.get(), wTx);
+
+ for (final NotificationDefinition notificationDefinition : schemaContextRef.get().getNotifications()) {
+ final NotificationListenerAdapter notifiStreamXML =
+ CreateStreamUtil.createYangNotifiStream(notificationDefinition, schemaContextRef,
+ NotificationOutputType.XML);
+ final NotificationListenerAdapter notifiStreamJSON =
+ CreateStreamUtil.createYangNotifiStream(notificationDefinition, schemaContextRef,
+ NotificationOutputType.JSON);
+ writeNotificationStreamToDatastore(schemaContextRef, uriInfo, wTx, exist, notifiStreamXML);
+ writeNotificationStreamToDatastore(schemaContextRef, uriInfo, wTx, exist, notifiStreamJSON);
+ }
+ SubscribeToStreamUtil.submitData(wTx);
+ }
+
+ private static void writeNotificationStreamToDatastore(final SchemaContextRef schemaContextRef,
+ final UriInfo uriInfo, final DOMDataTreeReadWriteTransaction readWriteTransaction, final boolean exist,
+ final NotificationListenerAdapter listener) {
+ final URI uri = SubscribeToStreamUtil.prepareUriByStreamName(uriInfo, listener.getStreamName());
+ final NormalizedNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
+ listener.getSchemaPath().getLastComponent(), schemaContextRef.get().getNotifications(), null,
+ listener.getOutputType(), uri,
+ SubscribeToStreamUtil.getMonitoringModule(schemaContextRef.get()), exist);
+ SubscribeToStreamUtil.writeDataToDS(schemaContextRef.get(),
+ listener.getSchemaPath().getLastComponent().getLocalName(), readWriteTransaction, exist, mapToStreams);
+ }
+
private static NormalizedNode<?, ?> prepareDataByParamWithDef(final NormalizedNode<?, ?> result,
final YangInstanceIdentifier path, final String withDefa, final SchemaContext ctx) {
boolean trim;
/**
* Constants for streams.
- *
*/
public final class RestconfStreamsConstants {
- public static final String SAL_REMOTE_NAMESPACE = "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote";
+ public static final QNameModule SAL_REMOTE_AUGMENT = QNameModule.create(
+ URI.create("urn:sal:restconf:event:subscription"),
+ Revision.of("2014-07-08"));
+ public static final QNameModule SUBSCRIBE_TO_NOTIFICATION = QNameModule.create(
+ URI.create("subscribe:to:notification"),
+ Revision.of("2016-10-28"));
+
+ public static final QName SAL_REMOTE_NAMESPACE = QName.create(
+ "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote",
+ "2014-01-14",
+ "sal-remote");
+
+ public static final String STREAM_PATH_PARAM_NAME = "path";
public static final String DATASTORE_PARAM_NAME = "datastore";
+ public static final String SCOPE_PARAM_NAME = "scope";
+ public static final String OUTPUT_TYPE_PARAM_NAME = "notification-output-type";
+ public static final String OUTPUT_CONTAINER_NAME = "output";
+ public static final String OUTPUT_STREAM_NAME = "stream-name";
- private static final URI NAMESPACE_EVENT_SUBSCRIPTION_AUGMENT = URI.create("urn:sal:restconf:event:subscription");
- public static final QNameModule SAL_REMOTE_AUGMENT = QNameModule.create(NAMESPACE_EVENT_SUBSCRIPTION_AUGMENT,
- Revision.of("2014-07-08")).intern();
+ public static final AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER = new AugmentationIdentifier(ImmutableSet.of(
+ QName.create(SAL_REMOTE_AUGMENT, SCOPE_PARAM_NAME),
+ QName.create(SAL_REMOTE_AUGMENT, DATASTORE_PARAM_NAME),
+ QName.create(SAL_REMOTE_AUGMENT, OUTPUT_TYPE_PARAM_NAME)));
- public static final AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER = new AugmentationIdentifier(
- ImmutableSet.of(QName.create(SAL_REMOTE_AUGMENT, "scope"), QName.create(SAL_REMOTE_AUGMENT, "datastore"),
- QName.create(SAL_REMOTE_AUGMENT, "notification-output-type")));
+ public static final QName LOCATION_QNAME = QName.create(SUBSCRIBE_TO_NOTIFICATION, "location");
+ public static final QName NOTIFI_QNAME = QName.create(SUBSCRIBE_TO_NOTIFICATION, "notifi");
public static final DataChangeScope DEFAULT_SCOPE = DataChangeScope.BASE;
-
public static final LogicalDatastoreType DEFAULT_DS = LogicalDatastoreType.CONFIGURATION;
- public static final String SCOPE_PARAM_NAME = "scope";
-
public static final char EQUAL = ParserBuilderConstants.Deserializer.EQUAL;
-
public static final String DS_URI = RestconfConstants.SLASH + DATASTORE_PARAM_NAME + EQUAL;
-
public static final String SCOPE_URI = RestconfConstants.SLASH + SCOPE_PARAM_NAME + EQUAL;
+ public static final String SCHEMA_SUBSCRIBE_URI = "ws";
+ public static final String SCHEMA_SUBSCRIBE_SECURED_URI = "wss";
+ public static final String SCHEMA_UPGRADE_URI = "http";
+ public static final String SCHEMA_UPGRADE_SECURED_URI = "https";
- public static final int NOTIFICATION_PORT = 8181;
-
- public static final String SCHEMA_SUBSCIBRE_URI = "ws";
-
- public static final CharSequence DATA_SUBSCR = "data-change-event-subscription";
- public static final CharSequence CREATE_DATA_SUBSCR = "create-" + DATA_SUBSCR;
-
- public static final CharSequence NOTIFICATION_STREAM = "notification-stream";
- public static final CharSequence CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM;
+ public static final String DATA_SUBSCRIPTION = "data-change-event-subscription";
+ public static final String CREATE_DATA_SUBSCRIPTION = "create-" + DATA_SUBSCRIPTION;
+ public static final String NOTIFICATION_STREAM = "notification-stream";
+ public static final String CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM;
public static final String STREAMS_PATH = "ietf-restconf-monitoring:restconf-state/streams";
public static final String STREAM_PATH_PART = "/stream=";
private RestconfStreamsConstants() {
throw new UnsupportedOperationException("Util class.");
}
-
-}
+}
\ No newline at end of file
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator;
-import org.opendaylight.restconf.nb.rfc8040.streams.websockets.WebSocketServer;
import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
.appendOffset("+HH:MM", "Z").toFormatter();
private SubscribeToStreamUtil() {
- throw new UnsupportedOperationException("Util class");
+ throw new UnsupportedOperationException("Utility class");
}
/**
- * Register listeners by streamName in identifier to listen to yang
- * notifications, put or delete info about listener to DS according to
- * ietf-restconf-monitoring.
+ * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
+ * about listener to DS according to ietf-restconf-monitoring.
*
- * @param identifier
- * identifier as stream name
- * @param uriInfo
- * for getting base URI information
- * @param notificationQueryParams
- * query parameters of notification
- * @param handlersHolder
- * holder of handlers for notifications
- * @return location for listening
+ * @param identifier Name of the stream.
+ * @param uriInfo URI information.
+ * @param notificationQueryParams Query parameters of notification.
+ * @param handlersHolder Holder of handlers for notifications.
+ * @return Stream location for listening.
*/
@SuppressWarnings("rawtypes")
- public static URI notifYangStream(final String identifier, final UriInfo uriInfo,
+ public static URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
- final String streamName = Notificator.createStreamNameFromUri(identifier);
+ final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
if (Strings.isNullOrEmpty(streamName)) {
throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
}
- List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
- if (identifier.contains(RestconfConstants.SLASH + NotificationOutputType.JSON.getName())) {
- listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.JSON.getName());
- } else {
- listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.XML.getName());
- }
- if (listeners.isEmpty()) {
- throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
+ final Optional<NotificationListenerAdapter> notificationListenerAdapter =
+ ListenersBroker.getInstance().getNotificationListenerFor(streamName);
+
+ if (!notificationListenerAdapter.isPresent()) {
+ throw new RestconfDocumentedException(String.format(
+ "Stream with name %s was not found.", streamName),
+ ErrorType.PROTOCOL,
ErrorTag.UNKNOWN_ELEMENT);
}
- final DOMDataTreeReadWriteTransaction wTx =
- handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
+ final DOMDataTreeReadWriteTransaction writeTransaction = handlersHolder
+ .getTransactionChainHandler()
+ .get()
+ .newReadWriteTransaction();
final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
- final boolean exist = checkExist(schemaContext, wTx);
-
+ final boolean exist = checkExist(schemaContext, writeTransaction);
final URI uri = prepareUriByStreamName(uriInfo, streamName);
- for (final NotificationListenerAdapter listener : listeners) {
- registerToListenNotification(listener, handlersHolder.getNotificationServiceHandler());
- listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
- notificationQueryParams.getFilter(), false);
- listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
- final NormalizedNode mapToStreams = RestconfMappingNodeUtil
- .mapYangNotificationStreamByIetfRestconfMonitoring(listener.getSchemaPath().getLastComponent(),
- schemaContext.getNotifications(), notificationQueryParams.getStart(),
- listener.getOutputType(), uri, getMonitoringModule(schemaContext), exist);
- writeDataToDS(schemaContext, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist,
- mapToStreams);
- }
- submitData(wTx);
+ registerToListenNotification(
+ notificationListenerAdapter.get(), handlersHolder.getNotificationServiceHandler());
+ notificationListenerAdapter.get().setQueryParams(
+ notificationQueryParams.getStart(),
+ notificationQueryParams.getStop().orElse(null),
+ notificationQueryParams.getFilter().orElse(null),
+ false);
+ notificationListenerAdapter.get().setCloseVars(
+ handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+ final NormalizedNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
+ notificationListenerAdapter.get().getSchemaPath().getLastComponent(),
+ schemaContext.getNotifications(), notificationQueryParams.getStart(),
+ notificationListenerAdapter.get().getOutputType(), uri, getMonitoringModule(schemaContext),
+ exist);
+ writeDataToDS(schemaContext,
+ notificationListenerAdapter.get().getSchemaPath().getLastComponent().getLocalName(), writeTransaction,
+ exist, mapToStreams);
+ submitData(writeTransaction);
return uri;
}
- static List<NotificationListenerAdapter>
- pickSpecificListenerByOutput(final List<NotificationListenerAdapter> listeners, final String outputType) {
- for (final NotificationListenerAdapter notificationListenerAdapter : listeners) {
- if (notificationListenerAdapter.getOutputType().equals(outputType)) {
- final List<NotificationListenerAdapter> list = new ArrayList<>();
- list.add(notificationListenerAdapter);
- return list;
- }
- }
- return listeners;
- }
-
/**
* Prepare InstanceIdentifierContext for Location leaf.
*
- * @param schemaHandler
- * schemaContext handler
- * @return InstanceIdentifier of Location leaf
+ * @param schemaHandler Schema context handler.
+ * @return InstanceIdentifier of Location leaf.
*/
public static InstanceIdentifierContext<?> prepareIIDSubsStreamOutput(final SchemaContextHandler schemaHandler) {
- final QName qnameBase = QName.create("subscribe:to:notification", "2016-10-28", "notifi");
- final DataSchemaNode location = ((ContainerSchemaNode) schemaHandler.get()
- .findModule(qnameBase.getModule()).get()
- .getDataChildByName(qnameBase)).getDataChildByName(QName.create(qnameBase, "location"));
- final List<PathArgument> path = new ArrayList<>();
- path.add(NodeIdentifier.create(qnameBase));
- path.add(NodeIdentifier.create(QName.create(qnameBase, "location")));
+ final Optional<Module> module = schemaHandler.get()
+ .findModule(RestconfStreamsConstants.NOTIFI_QNAME.getModule());
+ Preconditions.checkState(module.isPresent());
+ final Optional<DataSchemaNode> notify = module.get()
+ .findDataChildByName(RestconfStreamsConstants.NOTIFI_QNAME);
+ Preconditions.checkState(notify.isPresent());
+ final Optional<DataSchemaNode> location = ((ContainerSchemaNode) notify.get())
+ .findDataChildByName(RestconfStreamsConstants.LOCATION_QNAME);
+ Preconditions.checkState(location.isPresent());
- return new InstanceIdentifierContext<SchemaNode>(YangInstanceIdentifier.create(path), location, null,
- schemaHandler.get());
+ final List<PathArgument> path = new ArrayList<>();
+ path.add(NodeIdentifier.create(RestconfStreamsConstants.NOTIFI_QNAME));
+ path.add(NodeIdentifier.create(RestconfStreamsConstants.LOCATION_QNAME));
+ return new InstanceIdentifierContext<SchemaNode>(YangInstanceIdentifier.create(path), location.get(),
+ null, schemaHandler.get());
}
/**
- * Register listener by streamName in identifier to listen to data change
- * notifications, put or delete info about listener to DS according to
- * ietf-restconf-monitoring.
+ * Register listener by streamName in identifier to listen to data change notifications, and put or delete
+ * information about listener to DS according to ietf-restconf-monitoring.
*
- * @param identifier
- * identifier as stream name
- * @param uriInfo
- * for getting base URI information
- * @param notificationQueryParams
- * query parameters of notification
- * @param handlersHolder
- * holder of handlers for notifications
- * @return location for listening
+ * @param identifier Identifier as stream name.
+ * @param uriInfo Base URI information.
+ * @param notificationQueryParams Query parameters of notification.
+ * @param handlersHolder Holder of handlers for notifications.
+ * @return Location for listening.
*/
@SuppressWarnings("rawtypes")
- public static URI notifiDataStream(final String identifier, final UriInfo uriInfo,
+ public static URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
- final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
-
- final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
+ final Map<String, String> mapOfValues = mapValuesFromUri(identifier);
+ final LogicalDatastoreType datastoreType = parseURIEnum(
+ LogicalDatastoreType.class,
mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
- if (ds == null) {
- final String msg = "Stream name doesn't contains datastore value (pattern /datastore=)";
- LOG.debug(msg);
- throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+ if (datastoreType == null) {
+ final String message = "Stream name doesn't contain datastore value (pattern /datastore=)";
+ LOG.debug(message);
+ throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
}
- final DataChangeScope scope = SubscribeToStreamUtil.parseURIEnum(DataChangeScope.class,
+ final DataChangeScope scope = parseURIEnum(
+ DataChangeScope.class,
mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
if (scope == null) {
- final String msg = "Stream name doesn't contains datastore value (pattern /scope=)";
- LOG.warn(msg);
- throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+ final String message = "Stream name doesn't contains datastore value (pattern /scope=)";
+ LOG.warn(message);
+ throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
}
- final String streamName = Notificator.createStreamNameFromUri(identifier);
-
- final ListenerAdapter listener = Notificator.getListenerFor(streamName);
- Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName);
-
- listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
- notificationQueryParams.getFilter(), false);
- listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+ final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
+ final Optional<ListenerAdapter> listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName);
+ Preconditions.checkArgument(listener.isPresent(), "Listener doesn't exist : " + streamName);
- registration(ds, scope, listener, handlersHolder.getDomDataBrokerHandler().get());
+ listener.get().setQueryParams(
+ notificationQueryParams.getStart(),
+ notificationQueryParams.getStop().orElse(null),
+ notificationQueryParams.getFilter().orElse(null),
+ false);
+ listener.get().setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+ registration(datastoreType, listener.get(), handlersHolder.getDomDataBrokerHandler().get());
final URI uri = prepareUriByStreamName(uriInfo, streamName);
-
- final DOMDataTreeReadWriteTransaction wTx =
- handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
+ final DOMDataTreeReadWriteTransaction writeTransaction
+ = handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
- final boolean exist = checkExist(schemaContext, wTx);
+ final boolean exist = checkExist(schemaContext, writeTransaction);
final NormalizedNode mapToStreams = RestconfMappingNodeUtil
- .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(),
- notificationQueryParams.getStart(), listener.getOutputType(), uri,
+ .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(),
+ notificationQueryParams.getStart(), listener.get().getOutputType(), uri,
getMonitoringModule(schemaContext), exist, schemaContext);
- writeDataToDS(schemaContext, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist,
- mapToStreams);
- submitData(wTx);
+ writeDataToDS(schemaContext, listener.get().getPath().getLastPathArgument().getNodeType().getLocalName(),
+ writeTransaction, exist, mapToStreams);
+ submitData(writeTransaction);
return uri;
}
- public static Module getMonitoringModule(final SchemaContext schemaContext) {
+ static Module getMonitoringModule(final SchemaContext schemaContext) {
return schemaContext.findModule(MonitoringModule.MODULE_QNAME).orElse(null);
}
/**
- * Parse input of query parameters - start-time or stop-time - from
- * {@link DateAndTime} format to {@link Instant} format.
+ * Parse input of query parameters - start-time or stop-time - from {@link DateAndTime} format
+ * to {@link Instant} format.
*
- * @param entry
- * start-time or stop-time as string in {@link DateAndTime}
- * format
- * @return parsed {@link Instant} by entry
+ * @param entry Start-time or stop-time as string in {@link DateAndTime} format.
+ * @return Parsed {@link Instant} by entry.
*/
public static Instant parseDateFromQueryParam(final Entry<String, List<String>> entry) {
final DateAndTime event = new DateAndTime(entry.getValue().iterator().next());
final String value = event.getValue();
- final TemporalAccessor p;
+ final TemporalAccessor accessor;
try {
- p = FORMATTER.parse(value);
+ accessor = FORMATTER.parse(value);
} catch (final DateTimeParseException e) {
throw new RestconfDocumentedException("Cannot parse of value in date: " + value, e);
}
- return Instant.from(p);
-
+ return Instant.from(accessor);
}
@SuppressWarnings("rawtypes")
- static void writeDataToDS(final SchemaContext schemaContext,
- final String name, final DOMDataTreeReadWriteTransaction readWriteTransaction,
- final boolean exist, final NormalizedNode mapToStreams) {
- String pathId = "";
+ static void writeDataToDS(final SchemaContext schemaContext, final String name,
+ final DOMDataTreeReadWriteTransaction readWriteTransaction, final boolean exist,
+ final NormalizedNode mapToStreams) {
+ String pathId;
if (exist) {
pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name;
} else {
pathId = MonitoringModule.PATH_TO_STREAMS;
}
- readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaContext),
- mapToStreams);
+ readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL,
+ IdentifierCodec.deserialize(pathId, schemaContext), mapToStreams);
}
static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) {
}
/**
- * Prepare map of values from URI.
+ * Prepare map of URI parameter-values.
*
- * @param identifier
- * URI
- * @return {@link Map}
+ * @param identifier String identification of URI.
+ * @return Map od URI parameters and values.
*/
- public static Map<String, String> mapValuesFromUri(final String identifier) {
+ private static Map<String, String> mapValuesFromUri(final String identifier) {
final HashMap<String, String> result = new HashMap<>();
for (final String token : RestconfConstants.SLASH_SPLITTER.split(identifier)) {
final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL));
}
static URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
+ final String scheme = uriInfo.getAbsolutePath().getScheme();
final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
-
- prepareNotificationPort(uriInfo.getBaseUri().getPort());
- uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
+ switch (scheme) {
+ case RestconfStreamsConstants.SCHEMA_UPGRADE_SECURED_URI:
+ uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCRIBE_SECURED_URI);
+ break;
+ case RestconfStreamsConstants.SCHEMA_UPGRADE_URI:
+ default:
+ uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCRIBE_URI);
+ }
return uriBuilder.replacePath(streamName).build();
}
/**
- * Register data change listener in dom data broker and set it to listener
- * on stream.
+ * Register data change listener in DOM data broker and set it to listener on stream.
*
- * @param ds
- * {@link LogicalDatastoreType}
- * @param scope
- * {@link DataChangeScope}
- * @param listener
- * listener on specific stream
- * @param domDataBroker
- * data broker for register data change listener
+ * @param datastore {@link LogicalDatastoreType}
+ * @param listener listener on specific stream
+ * @param domDataBroker data broker for register data change listener
*/
- private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
- final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
+ private static void registration(final LogicalDatastoreType datastore, final ListenerAdapter listener,
+ final DOMDataBroker domDataBroker) {
if (listener.isListening()) {
return;
}
throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService");
}
- final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(ds, listener.getPath());
+ final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(datastore, listener.getPath());
final ListenerRegistration<ListenerAdapter> registration =
- changeService.registerDataTreeChangeListener(root, listener);
-
+ changeService.registerDataTreeChangeListener(root, listener);
listener.setRegistration(registration);
}
- /**
- * Get port from web socket server. If doesn't exit, create it.
- *
- * @param port
- * - port
- */
- private static void prepareNotificationPort(final int port) {
- try {
- WebSocketServer.getInstance();
- } catch (final NullPointerException e) {
- WebSocketServer.createInstance(port);
- }
- }
-
static boolean checkExist(final SchemaContext schemaContext,
final DOMDataTreeReadOperations readWriteTransaction) {
boolean exist;
try {
- exist = readWriteTransaction.exists(LogicalDatastoreType.OPERATIONAL,
+ return readWriteTransaction.exists(LogicalDatastoreType.OPERATIONAL,
IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).get();
- } catch (final InterruptedException | ExecutionException e) {
- throw new RestconfDocumentedException("Problem while checking data if exists", e);
+ } catch (final InterruptedException | ExecutionException exception) {
+ throw new RestconfDocumentedException("Problem while checking data if exists", exception);
}
- return exist;
}
private static void registerToListenNotification(final NotificationListenerAdapter listener,
final SchemaPath path = listener.getSchemaPath();
final ListenerRegistration<DOMNotificationListener> registration =
notificationServiceHandler.get().registerNotificationListener(listener, path);
-
listener.setRegistration(registration);
}
/**
- * Parse enum from URI.
+ * Parse out enumeration from URI.
*
- * @param clazz
- * enum type
- * @param value
- * string of enum value
- * @return enum
+ * @param clazz Target enumeration type.
+ * @param value String representation of enumeration value.
+ * @return Parsed enumeration type.
*/
private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
if (value == null || value.equals("")) {
private final Set<Channel> subscribers = ConcurrentHashMap.newKeySet();
private final EventBus eventBus;
- @SuppressWarnings("rawtypes")
private EventBusChangeRecorder eventBusChangeRecorder;
private volatile ListenerRegistration<?> registration;
/**
* Creating {@link EventBus}.
*/
- protected AbstractCommonSubscriber() {
+ AbstractCommonSubscriber() {
this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
}
this.registration.close();
this.registration = null;
}
-
deleteDataInDS();
unregister();
}
- /**
- * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
- * subscriber to the event and post event into event bus.
- *
- * @param subscriber
- * Channel
- */
+ @Override
public void addSubscriber(final Channel subscriber) {
if (!subscriber.isActive()) {
LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress());
this.eventBus.post(event);
}
- /**
- * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
- * subscriber to the event and posts event into event bus.
- *
- * @param subscriber subscriber channel
- */
+ @Override
public void removeSubscriber(final Channel subscriber) {
LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
final Event event = new Event(EventType.DEREGISTER);
this.eventBus.post(event);
}
- /**
- * Sets {@link ListenerRegistration} registration.
- *
- * @param registration
- * DOMDataChangeListener registration
- */
+ @Override
public void setRegistration(final ListenerRegistration<?> registration) {
this.registration = registration;
}
- /**
- * Checks if {@link ListenerRegistration} registration exist.
- *
- * @return True if exist, false otherwise.
- */
+ @Override
public boolean isListening() {
return this.registration != null;
}
* Creating and registering {@link EventBusChangeRecorder} of specific
* listener on {@link EventBus}.
*
- * @param listener
- * specific listener of notifications
+ * @param listener Specific listener of notifications.
*/
- @SuppressWarnings({ "unchecked", "rawtypes" })
- protected <T extends BaseListenerInterface> void register(final T listener) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ <T extends BaseListenerInterface> void register(final T listener) {
this.eventBusChangeRecorder = new EventBusChangeRecorder(listener);
this.eventBus.register(this.eventBusChangeRecorder);
}
/**
* Post event to event bus.
*
- * @param event
- * data of incoming notifications
+ * @param event Data of incoming notifications.
*/
protected void post(final Event event) {
this.eventBus.post(event);
}
/**
- * Removes all subscribers and unregisters event bus change recorder form
- * event bus.
+ * Removes all subscribers and unregisters event bus change recorder form event bus.
*/
- protected void unregister() {
+ private void unregister() {
this.subscribers.clear();
this.eventBus.unregister(this.eventBusChangeRecorder);
}
import com.google.common.base.Preconditions;
import java.io.StringReader;
import java.time.Instant;
-import java.util.Optional;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
/**
* Features of query parameters part of both notifications.
- *
*/
abstract class AbstractQueryParams extends AbstractNotificationsData {
// FIXME: BUG-7956: switch to using UntrustedXML
/**
* Set query parameters for listener.
*
- * @param start
- * start-time of getting notification
- * @param stop
- * stop-time of getting notification
- * @param filter
- * indicate which subset of all possible events are of interest
- * @param leafNodesOnly
- * if true, notifications will contain changes to leaf nodes only
+ * @param start Start-time of getting notification.
+ * @param stop Stop-time of getting notification.
+ * @param filter Indicates which subset of all possible events are of interest.
+ * @param leafNodesOnly If TRUE, notifications will contain changes of leaf nodes only.
*/
@SuppressWarnings("checkstyle:hiddenField")
- public void setQueryParams(final Instant start, final Optional<Instant> stop, final Optional<String> filter,
- final boolean leafNodesOnly) {
+ public void setQueryParams(final Instant start, final Instant stop, final String filter,
+ final boolean leafNodesOnly) {
this.start = Preconditions.checkNotNull(start);
- this.stop = stop.orElse(null);
- this.filter = filter.orElse(null);
+ this.stop = stop;
+ this.filter = filter;
this.leafNodesOnly = leafNodesOnly;
}
*
* @return true if this query should only notify about leaf node changes
*/
- public boolean getLeafNodesOnly() {
+ boolean getLeafNodesOnly() {
return leafNodesOnly;
}
/**
* Check if is filter used and then prepare and post data do client.
*
- * @param xml data of notification
+ * @param xml XML data of notification.
*/
@SuppressWarnings("checkstyle:IllegalCatch")
boolean checkFilter(final String xml) {
if (this.filter == null) {
return true;
}
-
try {
return parseFilterParam(xml);
} catch (final Exception e) {
}
/**
- * Parse and evaluate filter value by xml.
+ * Parse and evaluate filter statement by XML format.
*
- * @return true or false - depends on filter expression and data of
- * notifiaction
- * @throws Exception if operation fails
+ * @return {@code true} or {@code false} depending on filter expression and data of notification.
+ * @throws Exception If operation fails.
*/
private boolean parseFilterParam(final String xml) throws Exception {
final Document docOfXml = DBF.newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
// FIXME: BUG-7956: xPath.setNamespaceContext(nsContext);
return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
}
-}
+}
\ No newline at end of file
import io.netty.channel.Channel;
import java.util.Set;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
/**
- * Base interface for both listeners({@link ListenerAdapter},
- * {@link NotificationListenerAdapter}).
+ * Base interface for both listeners({@link ListenerAdapter}, {@link NotificationListenerAdapter}).
*/
-interface BaseListenerInterface extends AutoCloseable {
+public interface BaseListenerInterface extends AutoCloseable {
/**
* Return all subscribers of listener.
*
- * @return set of subscribers
+ * @return Set of all subscribers.
*/
Set<Channel> getSubscribers();
/**
* Checks if exists at least one {@link Channel} subscriber.
*
- * @return True if exist at least one {@link Channel} subscriber, false
- * otherwise.
+ * @return {@code true} if exist at least one {@link Channel} subscriber, {@code false} otherwise.
*/
boolean hasSubscribers();
/**
* Get name of stream.
*
- * @return stream name
+ * @return Stream name.
*/
String getStreamName();
/**
* Get output type.
*
- * @return outputType
+ * @return Output type (JSON or XML).
*/
String getOutputType();
+
+ /**
+ * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
+ * subscriber to the event and post event into event bus.
+ *
+ * @param subscriber Web-socket channel.
+ */
+ void addSubscriber(Channel subscriber);
+
+ /**
+ * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
+ * subscriber to the event and posts event into event bus.
+ *
+ * @param subscriber Subscriber channel.
+ */
+ void removeSubscriber(Channel subscriber);
+
+ /**
+ * Sets {@link ListenerRegistration} registration.
+ *
+ * @param registration DOMDataChangeListener registration.
+ */
+ void setRegistration(ListenerRegistration<?> registration);
+
+ /**
+ * Checks if {@link ListenerRegistration} registration exists.
+ *
+ * @return {@code true} if exists, {@code false} otherwise.
+ */
+ boolean isListening();
}
/**
* Event bus change recorder of specific listener of notifications.
*
- * @param listener
- * specific listener
+ * @param listener Specific listener.
*/
EventBusChangeRecorder(final T listener) {
this.listener = listener;
public void recordCustomerChange(final Event event) {
if (event.getType() == EventType.REGISTER) {
final Channel subscriber = event.getSubscriber();
- if (!this.listener.getSubscribers().contains(subscriber)) {
- this.listener.getSubscribers().add(subscriber);
- }
+ this.listener.getSubscribers().add(subscriber);
} else if (event.getType() == EventType.DEREGISTER) {
this.listener.getSubscribers().remove(event.getSubscriber());
- Notificator.removeListenerIfNoSubscriberExists(this.listener);
+ if (!this.listener.hasSubscribers()) {
+ ListenersBroker.getInstance().removeAndCloseListener(this.listener);
+ }
} else if (event.getType() == EventType.NOTIFY) {
for (final Channel subscriber : this.listener.getSubscribers()) {
if (subscriber.isActive()) {
}
}
}
-}
+}
\ No newline at end of file
*/
package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.time.Instant;
import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.util.DataSchemaContextNode;
import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.w3c.dom.Node;
/**
- * {@link ListenerAdapter} is responsible to track events, which occurred by
- * changing data in data source.
+ * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
*/
public class ListenerAdapter extends AbstractCommonSubscriber implements ClusteredDOMDataTreeChangeListener {
private final NotificationOutputType outputType;
/**
- * Creates new {@link ListenerAdapter} listener specified by path and stream
- * name and register for subscribing.
+ * Creates new {@link ListenerAdapter} listener specified by path and stream name and register for subscribing.
*
- * @param path
- * Path to data in data store.
- * @param streamName
- * The name of the stream.
- * @param outputType
- * Type of output on notification (JSON, XML)
+ * @param path Path to data in data store.
+ * @param streamName The name of the stream.
+ * @param outputType Type of output on notification (JSON, XML).
*/
ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
final NotificationOutputType outputType) {
/**
* Prepare data of notification and data to client.
*
- * @param xml data
+ * @param xml XML-formatted data.
*/
private void prepareAndPostData(final String xml) {
final Event event = new Event(EventType.NOTIFY);
post(event);
}
- /**
- * Tracks events of data change by customer.
- */
-
/**
* Prepare data in printable form and transform it to String.
*
- * @param dataTreeCandidates the DataTreeCandidates to transform
- *
+ * @param dataTreeCandidates Data-tree candidates to be transformed.
* @return Data in printable form.
*/
private String prepareXml(final Collection<DataTreeCandidate> dataTreeCandidates) {
*/
@SuppressWarnings("checkstyle:hiddenField")
private void addValuesToDataChangedNotificationEventElement(final Document doc,
- final Element dataChangedNotificationEventElement,
- final Collection<DataTreeCandidate> dataTreeCandidates,
+ final Element dataChangedNotificationEventElement, final Collection<DataTreeCandidate> dataTreeCandidates,
final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
for (DataTreeCandidate dataTreeCandidate : dataTreeCandidates) {
final YangInstanceIdentifier parentYiid, final SchemaContext schemaContext,
final DataSchemaContextTree dataSchemaContextTree) {
- Optional<NormalizedNode<?,?>> optionalNormalizedNode = Optional.empty();
+ Optional<NormalizedNode<?, ?>> optionalNormalizedNode = Optional.empty();
switch (candidateNode.getModificationType()) {
case APPEARED:
case SUBTREE_MODIFIED:
return;
}
- NormalizedNode<?,?> normalizedNode = optionalNormalizedNode.get();
+ NormalizedNode<?, ?> normalizedNode = optionalNormalizedNode.get();
YangInstanceIdentifier yiid = YangInstanceIdentifier.builder(parentYiid)
- .append(normalizedNode.getIdentifier()).build();
+ .append(normalizedNode.getIdentifier()).build();
- boolean isNodeMixin = dataSchemaContextTree.getChild(yiid).isMixin();
+ final Optional<DataSchemaContextNode<?>> childrenSchemaNode = dataSchemaContextTree.findChild(yiid);
+ Preconditions.checkState(childrenSchemaNode.isPresent());
+ boolean isNodeMixin = childrenSchemaNode.get().isMixin();
boolean isSkippedNonLeaf = getLeafNodesOnly() && !(normalizedNode instanceof LeafNode);
if (!isNodeMixin && !isSkippedNonLeaf) {
Node node = null;
break;
case DELETE:
case DISAPPEARED:
- node = createDataChangeEventElement(doc, yiid, Operation.DELETED, schemaContext);
+ node = createDataChangeEventElement(doc, yiid, schemaContext);
break;
case UNMODIFIED:
default:
}
for (DataTreeCandidateNode childNode : candidateNode.getChildNodes()) {
- addNodeToDataChangeNotificationEventElement(doc, dataChangedNotificationEventElement, childNode,
- yiid, schemaContext, dataSchemaContextTree);
+ addNodeToDataChangeNotificationEventElement(
+ doc, dataChangedNotificationEventElement, childNode, yiid, schemaContext, dataSchemaContextTree);
}
}
/**
- * Creates changed event element from data.
+ * Creates data-changed event element from data.
*
- * @param doc
- * {@link Document}
- * @param path
- * Path to data in data store.
- * @param operation
- * {@link Operation}
- * @param schemaContext
- * schema context
- * @return {@link Node} node represented by changed event element.
+ * @param doc {@link Document}
+ * @param schemaContext Schema context.
+ * @return {@link Node} represented by changed event element.
*/
private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier eventPath,
- final Operation operation, final SchemaContext schemaContext) {
+ final SchemaContext schemaContext) {
final Element dataChangeEventElement = doc.createElement("data-change-event");
final Element pathElement = doc.createElement("path");
addPathAsValueToElement(eventPath, pathElement, schemaContext);
dataChangeEventElement.appendChild(pathElement);
final Element operationElement = doc.createElement("operation");
- operationElement.setTextContent(operation.value);
+ operationElement.setTextContent(Operation.DELETED.value);
dataChangeEventElement.appendChild(operationElement);
return dataChangeEventElement;
}
- private Node createCreatedChangedDataChangeEventElement(final Document doc,
- final YangInstanceIdentifier eventPath, final NormalizedNode<?, ?> normalized, final Operation operation,
- final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
+ private Node createCreatedChangedDataChangeEventElement(final Document doc, final YangInstanceIdentifier eventPath,
+ final NormalizedNode<?, ?> normalized, final Operation operation, final SchemaContext schemaContext,
+ final DataSchemaContextTree dataSchemaContextTree) {
final Element dataChangeEventElement = doc.createElement("data-change-event");
final Element pathElement = doc.createElement("path");
addPathAsValueToElement(eventPath, pathElement, schemaContext);
try {
SchemaPath nodePath;
+ final Optional<DataSchemaContextNode<?>> childrenSchemaNode = dataSchemaContextTree.findChild(eventPath);
+ Preconditions.checkState(childrenSchemaNode.isPresent());
if (normalized instanceof MapEntryNode || normalized instanceof UnkeyedListEntryNode) {
- nodePath = dataSchemaContextTree.getChild(eventPath).getDataSchemaNode().getPath();
+ nodePath = childrenSchemaNode.get().getDataSchemaNode().getPath();
} else {
- nodePath = dataSchemaContextTree.getChild(eventPath).getDataSchemaNode().getPath().getParent();
+ nodePath = childrenSchemaNode.get().getDataSchemaNode().getPath().getParent();
}
final DOMResult domResult = writeNormalizedNode(normalized, schemaContext, nodePath);
final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
/**
* Adds path as value to element.
*
- * @param eventPath
- * Path to data in data store.
- * @param element
- * {@link Element}
- * @param schemaContext
- * schema context
+ * @param eventPath Path to data in data store.
+ * @param element {@link Element}
+ * @param schemaContext Schema context.
*/
@SuppressWarnings("rawtypes")
private void addPathAsValueToElement(final YangInstanceIdentifier eventPath, final Element element,
continue;
}
textContent.append("/");
- writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), schemaContext);
+ writeIdentifierWithNamespacePrefix(textContent, pathArgument.getNodeType(), schemaContext);
if (pathArgument instanceof NodeIdentifierWithPredicates) {
final Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
for (final Entry<QName, Object> entry : predicates.entrySet()) {
final QName keyValue = entry.getKey();
final String predicateValue = String.valueOf(entry.getValue());
textContent.append("[");
- writeIdentifierWithNamespacePrefix(element, textContent, keyValue, schemaContext);
+ writeIdentifierWithNamespacePrefix(textContent, keyValue, schemaContext);
textContent.append("='");
textContent.append(predicateValue);
textContent.append("'");
/**
* Writes identifier that consists of prefix and QName.
*
- * @param element
- * {@link Element}
- * @param textContent
- * StringBuilder
- * @param qualifiedName
- * QName
- * @param schemaContext
- * schema context
+ * @param textContent Text builder that should be supplemented by QName and its modules name.
+ * @param qualifiedName QName of the element.
+ * @param schemaContext Schema context that holds modules which should contain module specified in QName.
*/
- private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent,
- final QName qualifiedName, final SchemaContext schemaContext) {
- final Module module = schemaContext.findModule(qualifiedName.getModule()).get();
-
- textContent.append(module.getName());
- textContent.append(":");
- textContent.append(qualifiedName.getLocalName());
+ private static void writeIdentifierWithNamespacePrefix(final StringBuilder textContent, final QName qualifiedName,
+ final SchemaContext schemaContext) {
+ final Optional<Module> module = schemaContext.findModule(qualifiedName.getModule());
+ if (module.isPresent()) {
+ textContent.append(module.get().getName());
+ textContent.append(":");
+ textContent.append(qualifiedName.getLocalName());
+ } else {
+ LOG.error("Cannot write identifier with namespace prefix in data-change listener adapter: "
+ + "Cannot find module in schema context for input QName {}.", qualifiedName);
+ throw new IllegalStateException(String.format("Cannot find module in schema context for input QName %s.",
+ qualifiedName));
+ }
}
/**
- * Consists of three types {@link Operation#CREATED},
- * {@link Operation#UPDATED} and {@link Operation#DELETED}.
+ * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}.
*/
private enum Operation {
- CREATED("created"), UPDATED("updated"), DELETED("deleted");
+ CREATED("created"),
+ UPDATED("updated"),
+ DELETED("deleted");
private final String value;
this.value = value;
}
}
-}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("path", path)
+ .add("stream-name", streamName)
+ .add("output-type", outputType)
+ .toString();
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright © 2019 FRINX s.r.o. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.StampedLock;
+import java.util.function.Function;
+import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
+ * {@link NotificationListenerAdapter} listeners.
+ */
+public final class ListenersBroker {
+ private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
+ private static ListenersBroker listenersBroker;
+
+ private final StampedLock dataChangeListenersLock = new StampedLock();
+ private final StampedLock notificationListenersLock = new StampedLock();
+ private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
+ private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
+
+ private ListenersBroker() {
+ }
+
+ /**
+ * Creation of the singleton listeners broker.
+ *
+ * @return Reusable instance of {@link ListenersBroker}.
+ */
+ public static synchronized ListenersBroker getInstance() {
+ if (listenersBroker == null) {
+ listenersBroker = new ListenersBroker();
+ }
+ return listenersBroker;
+ }
+
+ /**
+ * Returns set of all data-change-event streams.
+ */
+ public Set<String> getDataChangeStreams() {
+ final long stamp = dataChangeListenersLock.readLock();
+ try {
+ return ImmutableSet.copyOf(dataChangeListeners.keySet());
+ } finally {
+ dataChangeListenersLock.unlockRead(stamp);
+ }
+ }
+
+ /**
+ * Returns set of all notification streams.
+ */
+ public Set<String> getNotificationStreams() {
+ final long stamp = notificationListenersLock.readLock();
+ try {
+ return ImmutableSet.copyOf(notificationListeners.keySet());
+ } finally {
+ notificationListenersLock.unlockRead(stamp);
+ }
+ }
+
+ /**
+ * Gets {@link ListenerAdapter} specified by stream identification.
+ *
+ * @param streamName Stream name.
+ * @return {@link ListenerAdapter} specified by stream name wrapped in {@link Optional} or {@link Optional#empty()}
+ * if listener with specified stream name doesn't exist.
+ */
+ public Optional<ListenerAdapter> getDataChangeListenerFor(final String streamName) {
+ final long stamp = dataChangeListenersLock.readLock();
+ try {
+ final ListenerAdapter listenerAdapter = dataChangeListeners.get(requireNonNull(streamName));
+ return Optional.ofNullable(listenerAdapter);
+ } finally {
+ dataChangeListenersLock.unlockRead(stamp);
+ }
+ }
+
+ /**
+ * Gets {@link NotificationListenerAdapter} specified by stream name.
+ *
+ * @param streamName Stream name.
+ * @return {@link NotificationListenerAdapter} specified by stream name wrapped in {@link Optional}
+ * or {@link Optional#empty()} if listener with specified stream name doesn't exist.
+ */
+ public Optional<NotificationListenerAdapter> getNotificationListenerFor(final String streamName) {
+ final long stamp = notificationListenersLock.readLock();
+ try {
+ final NotificationListenerAdapter listenerAdapter = notificationListeners.get(requireNonNull(streamName));
+ return Optional.ofNullable(listenerAdapter);
+ } finally {
+ notificationListenersLock.unlockRead(stamp);
+ }
+ }
+
+ /**
+ * Get listener for stream-name.
+ *
+ * @param streamName Stream name.
+ * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
+ * or {@link Optional#empty()} if listener with specified stream name doesn't exist.
+ */
+ public Optional<BaseListenerInterface> getListenerFor(final String streamName) {
+ if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
+ return getNotificationListenerFor(streamName).map(Function.identity());
+ } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
+ return getDataChangeListenerFor(streamName).map(Function.identity());
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
+ * hasn't been created yet.
+ *
+ * @param path Path to data in data repository.
+ * @param streamName Stream name.
+ * @param outputType Specific type of output for notifications - XML or JSON.
+ * @return Created or existing data-change listener adapter.
+ */
+ public ListenerAdapter registerDataChangeListener(final YangInstanceIdentifier path, final String streamName,
+ final NotificationOutputType outputType) {
+ requireNonNull(path);
+ requireNonNull(streamName);
+ requireNonNull(outputType);
+
+ final long stamp = dataChangeListenersLock.writeLock();
+ try {
+ return dataChangeListeners.computeIfAbsent(streamName, stream -> new ListenerAdapter(
+ path, stream, outputType));
+ } finally {
+ dataChangeListenersLock.unlockWrite(stamp);
+ }
+ }
+
+ /**
+ * Creates new {@link NotificationDefinition} listener using input stream name and schema path
+ * if such listener haven't been created yet.
+ *
+ * @param schemaPath Schema path of YANG notification structure.
+ * @param streamName Stream name.
+ * @param outputType Specific type of output for notifications - XML or JSON.
+ * @return Created or existing notification listener adapter.
+ */
+ public NotificationListenerAdapter registerNotificationListener(final SchemaPath schemaPath,
+ final String streamName, final NotificationOutputType outputType) {
+ requireNonNull(schemaPath);
+ requireNonNull(streamName);
+ requireNonNull(outputType);
+
+ final long stamp = notificationListenersLock.writeLock();
+ try {
+ return notificationListeners.computeIfAbsent(streamName, stream -> new NotificationListenerAdapter(
+ schemaPath, stream, outputType.getName()));
+ } finally {
+ notificationListenersLock.unlockWrite(stamp);
+ }
+ }
+
+ /**
+ * Removal and closing of all data-change-event and notification listeners.
+ */
+ public synchronized void removeAndCloseAllListeners() {
+ final long stampNotifications = notificationListenersLock.writeLock();
+ final long stampDataChanges = dataChangeListenersLock.writeLock();
+ try {
+ removeAndCloseAllDataChangeListenersTemplate();
+ removeAndCloseAllNotificationListenersTemplate();
+ } finally {
+ dataChangeListenersLock.unlockWrite(stampDataChanges);
+ notificationListenersLock.unlockWrite(stampNotifications);
+ }
+ }
+
+ /**
+ * Closes and removes all data-change listeners.
+ */
+ public void removeAndCloseAllDataChangeListeners() {
+ final long stamp = dataChangeListenersLock.writeLock();
+ try {
+ removeAndCloseAllDataChangeListenersTemplate();
+ } finally {
+ dataChangeListenersLock.unlockWrite(stamp);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void removeAndCloseAllDataChangeListenersTemplate() {
+ dataChangeListeners.values()
+ .forEach(listenerAdapter -> {
+ try {
+ listenerAdapter.close();
+ } catch (final Exception exception) {
+ LOG.error("Failed to close data-change listener {}.", listenerAdapter, exception);
+ throw new IllegalStateException(String.format("Failed to close data-change listener %s.",
+ listenerAdapter), exception);
+ }
+ });
+ dataChangeListeners.clear();
+ }
+
+ /**
+ * Closes and removes all notification listeners.
+ */
+ public void removeAndCloseAllNotificationListeners() {
+ final long stamp = notificationListenersLock.writeLock();
+ try {
+ removeAndCloseAllNotificationListenersTemplate();
+ } finally {
+ notificationListenersLock.unlockWrite(stamp);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void removeAndCloseAllNotificationListenersTemplate() {
+ notificationListeners.values()
+ .forEach(listenerAdapter -> {
+ try {
+ listenerAdapter.close();
+ } catch (final Exception exception) {
+ LOG.error("Failed to close notification listener {}.", listenerAdapter, exception);
+ throw new IllegalStateException(String.format("Failed to close notification listener %s.",
+ listenerAdapter), exception);
+ }
+ });
+ notificationListeners.clear();
+ }
+
+ /**
+ * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
+ *
+ * @param listener Listener to be closed and removed.
+ */
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
+ final long stamp = dataChangeListenersLock.writeLock();
+ try {
+ removeAndCloseDataChangeListenerTemplate(listener);
+ } catch (final Exception exception) {
+ LOG.error("Data-change listener {} cannot be closed.", listener, exception);
+ } finally {
+ dataChangeListenersLock.unlockWrite(stamp);
+ }
+ }
+
+ /**
+ * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
+ *
+ * @param listener Listener to be closed and removed.
+ */
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
+ final long stamp = dataChangeListenersLock.writeLock();
+ try {
+ requireNonNull(listener).close();
+ if (dataChangeListeners.inverse().remove(listener) == null) {
+ LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
+ }
+ } catch (final Exception exception) {
+ LOG.error("Data-change listener {} cannot be closed.", listener, exception);
+ throw new IllegalStateException(String.format(
+ "Data-change listener %s cannot be closed.",
+ listener), exception);
+ } finally {
+ dataChangeListenersLock.unlockWrite(stamp);
+ }
+ }
+
+ /**
+ * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
+ *
+ * @param listener Listener to be closed and removed.
+ */
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
+ final long stamp = notificationListenersLock.writeLock();
+ try {
+ removeAndCloseNotificationListenerTemplate(listener);
+ } catch (final Exception exception) {
+ LOG.error("Notification listener {} cannot be closed.", listener, exception);
+ } finally {
+ notificationListenersLock.unlockWrite(stamp);
+ }
+ }
+
+ @SuppressWarnings({"checkstyle:IllegalCatch"})
+ private void removeAndCloseNotificationListenerTemplate(NotificationListenerAdapter listener) {
+ try {
+ requireNonNull(listener).close();
+ if (notificationListeners.inverse().remove(listener) == null) {
+ LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
+ }
+ } catch (final Exception exception) {
+ LOG.error("Notification listener {} cannot be closed.", listener, exception);
+ throw new IllegalStateException(String.format(
+ "Notification listener %s cannot be closed.", listener),
+ exception);
+ }
+ }
+
+ /**
+ * Removal and closing of general listener (data-change or notification listener).
+ *
+ * @param listener Listener to be closed and removed from cache.
+ */
+ void removeAndCloseListener(final BaseListenerInterface listener) {
+ requireNonNull(listener);
+ if (listener instanceof ListenerAdapter) {
+ removeAndCloseDataChangeListener((ListenerAdapter) listener);
+ } else if (listener instanceof NotificationListenerAdapter) {
+ removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
+ }
+ }
+
+ /**
+ * Creates string representation of stream name from URI. Removes slash from URI in start and end positions.
+ *
+ * @param uri URI for creation of stream name.
+ * @return String representation of stream name.
+ */
+ public static String createStreamNameFromUri(final String uri) {
+ String result = requireNonNull(uri);
+ if (result.startsWith("/")) {
+ result = result.substring(1);
+ }
+ if (result.endsWith("/")) {
+ result = result.substring(0, result.length() - 1);
+ }
+ return result;
+ }
+
+ @VisibleForTesting
+ public synchronized void setDataChangeListeners(final Map<String, ListenerAdapter> listenerAdapterCollection) {
+ final long stamp = dataChangeListenersLock.writeLock();
+ try {
+ dataChangeListeners.clear();
+ dataChangeListeners.putAll(listenerAdapterCollection);
+ } finally {
+ dataChangeListenersLock.unlockWrite(stamp);
+ }
+ }
+}
\ No newline at end of file
package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.w3c.dom.Node;
/**
- * {@link NotificationListenerAdapter} is responsible to track events on
- * notifications.
- *
+ * {@link NotificationListenerAdapter} is responsible to track events on notifications.
*/
public class NotificationListenerAdapter extends AbstractCommonSubscriber implements DOMNotificationListener {
/**
* Set path of listener and stream name, register event bus.
*
- * @param path
- * path of notification
- * @param streamName
- * stream name of listener
- * @param outputType
- * type of output on notification (JSON, XML)
+ * @param path Schema path of YANG notification.
+ * @param streamName Name of the stream.
+ * @param outputType Type of output on notification (JSON or XML).
*/
NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
register(this);
}
/**
- * Get outputType of listener.
+ * Get output type of this listener.
*
- * @return the outputType
+ * @return The configured output type (JSON or XML).
*/
@Override
public String getOutputType() {
/**
* Get stream name of this listener.
*
- * @return {@link String}
+ * @return The configured stream name.
*/
@Override
public String getStreamName() {
/**
* Get schema path of notification.
*
- * @return {@link SchemaPath}
+ * @return The configured schema path that points to observing YANG notification schema node.
*/
public SchemaPath getSchemaPath() {
return this.path;
/**
* Prepare data of notification and data to client.
*
- * @param data data
+ * @param data JSON or XML data that holds notification data.
*/
private void prepareAndPostData(final String data) {
final Event event = new Event(EventType.NOTIFY);
}
/**
- * Prepare json from notification data.
+ * Creation of JSON from notification data.
*
- * @return json as {@link String}
+ * @return Transformed notification data in JSON format.
*/
@VisibleForTesting
String prepareJson(final SchemaContext schemaContext, final DOMNotification notification) {
private static String writeBodyToString(final SchemaContext schemaContext, final DOMNotification notification) {
final Writer writer = new StringWriter();
final NormalizedNodeStreamWriter jsonStream = JSONNormalizedNodeStreamWriter.createExclusiveWriter(
- JSONCodecFactorySupplier.DRAFT_LHOTKA_NETMOD_YANG_JSON_02.getShared(schemaContext), notification.getType(),
- null, JsonWriterFactory.createJsonWriter(writer));
+ JSONCodecFactorySupplier.DRAFT_LHOTKA_NETMOD_YANG_JSON_02.getShared(schemaContext),
+ notification.getType(), null, JsonWriterFactory.createJsonWriter(writer));
final NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(jsonStream);
try {
nodeWriter.write(notification.getBody());
return writer.toString();
}
+ /**
+ * Creation of XML from notification data.
+ *
+ * @return Transformed notification data in XML format.
+ */
private String prepareXml(final SchemaContext schemaContext, final DOMNotification notification) {
final Document doc = createDocument();
final Element notificationElement = basePartDoc(doc);
private void addValuesToNotificationEventElement(final Document doc, final Element element,
final SchemaContext schemaContext, final DOMNotification notification) {
try {
-
final DOMResult domResult = writeNormalizedNode(notification.getBody(), schemaContext, this.path);
final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
final Element dataElement = doc.createElement("notification");
LOG.error("Error processing stream", e);
}
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("path", path)
+ .add("stream-name", streamName)
+ .add("output-type", outputType)
+ .toString();
+ }
}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link Notificator} is responsible to create, remove and find
- * {@link ListenerAdapter} listener.
- */
-public final class Notificator {
-
- private static Map<String, ListenerAdapter> dataChangeListener = new ConcurrentHashMap<>();
- private static Map<String, List<NotificationListenerAdapter>> notificationListenersByStreamName =
- new ConcurrentHashMap<>();
-
- private static final Logger LOG = LoggerFactory.getLogger(Notificator.class);
- private static final Lock LOCK = new ReentrantLock();
-
- private Notificator() {
- }
-
- /**
- * Returns list of all stream names.
- */
- public static Set<String> getStreamNames() {
- return dataChangeListener.keySet();
- }
-
- /**
- * Gets {@link ListenerAdapter} specified by stream name.
- *
- * @param streamName
- * The name of the stream.
- * @return {@link ListenerAdapter} specified by stream name.
- */
- public static ListenerAdapter getListenerFor(final String streamName) {
- return dataChangeListener.get(streamName);
- }
-
- /**
- * Checks if the listener specified by {@link YangInstanceIdentifier} path exist.
- *
- * @param streamName name of the stream
- * @return True if the listener exist, false otherwise.
- */
- public static boolean existListenerFor(final String streamName) {
- return dataChangeListener.containsKey(streamName);
- }
-
- /**
- * Creates new {@link ListenerAdapter} listener from
- * {@link YangInstanceIdentifier} path and stream name.
- *
- * @param path
- * Path to data in data repository.
- * @param streamName
- * The name of the stream.
- * @param outputType
- * Spcific type of output for notifications - XML or JSON
- * @return New {@link ListenerAdapter} listener from
- * {@link YangInstanceIdentifier} path and stream name.
- */
- public static ListenerAdapter createListener(final YangInstanceIdentifier path, final String streamName,
- final NotificationOutputType outputType) {
- final ListenerAdapter listener = new ListenerAdapter(path, streamName, outputType);
- try {
- LOCK.lock();
- dataChangeListener.put(streamName, listener);
- } finally {
- LOCK.unlock();
- }
- return listener;
- }
-
- /**
- * Looks for listener determined by {@link YangInstanceIdentifier} path and removes it.
- * Creates String representation of stream name from URI. Removes slash from URI in start and end position.
- *
- * @param uri
- * URI for creation stream name.
- * @return String representation of stream name.
- */
- public static String createStreamNameFromUri(final String uri) {
- if (uri == null) {
- return null;
- }
- String result = uri;
- if (result.startsWith("/")) {
- result = result.substring(1);
- }
- if (result.endsWith("/")) {
- result = result.substring(0, result.length() - 1);
- }
- return result;
- }
-
- /**
- * Removes all listeners.
- */
- @SuppressWarnings("checkstyle:IllegalCatch")
- public static void removeAllListeners() {
- for (final ListenerAdapter listener : dataChangeListener.values()) {
- try {
- listener.close();
- } catch (final Exception e) {
- LOG.error("Failed to close listener", e);
- }
- }
- try {
- LOCK.lock();
- dataChangeListener = new ConcurrentHashMap<>();
- } finally {
- LOCK.unlock();
- }
- }
-
- /**
- * Delete {@link ListenerAdapter} listener specified in parameter.
- *
- * @param <T>
- *
- * @param listener
- * ListenerAdapter
- */
- @SuppressWarnings("checkstyle:IllegalCatch")
- private static <T extends BaseListenerInterface> void deleteListener(final T listener) {
- if (listener != null) {
- try {
- listener.close();
- } catch (final Exception e) {
- LOG.error("Failed to close listener", e);
- }
- try {
- LOCK.lock();
- dataChangeListener.remove(listener.getStreamName());
- } finally {
- LOCK.unlock();
- }
- }
- }
-
- /**
- * Check if the listener specified by qnames of request exist.
- *
- * @param streamName
- * name of stream
- * @return True if the listener exist, false otherwise.
- */
- public static boolean existNotificationListenerFor(final String streamName) {
- return notificationListenersByStreamName.containsKey(streamName);
- }
-
- /**
- * Prepare listener for notification ({@link NotificationDefinition}).
- *
- * @param paths
- * paths of notifications
- * @param streamName
- * name of stream (generated by paths)
- * @param outputType
- * type of output for onNotification - XML or JSON
- * @return List of {@link NotificationListenerAdapter} by paths
- */
- public static List<NotificationListenerAdapter> createNotificationListener(final List<SchemaPath> paths,
- final String streamName, final String outputType) {
- final List<NotificationListenerAdapter> listListeners = new ArrayList<>();
- for (final SchemaPath path : paths) {
- final NotificationListenerAdapter listener = new NotificationListenerAdapter(path, streamName, outputType);
- listListeners.add(listener);
- }
- try {
- LOCK.lock();
- notificationListenersByStreamName.put(streamName, listListeners);
- } finally {
- LOCK.unlock();
- }
- return listListeners;
- }
-
- public static <T extends BaseListenerInterface> void removeListenerIfNoSubscriberExists(final T listener) {
- if (!listener.hasSubscribers()) {
- if (listener instanceof NotificationListenerAdapter) {
- deleteNotificationListener(listener);
- } else {
- deleteListener(listener);
- }
- }
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- private static <T extends BaseListenerInterface> void deleteNotificationListener(final T listener) {
- if (listener != null) {
- try {
- listener.close();
- } catch (final Exception e) {
- LOG.error("Failed to close listener", e);
- }
- try {
- LOCK.lock();
- notificationListenersByStreamName.remove(listener.getStreamName());
- } finally {
- LOCK.unlock();
- }
- }
- }
-
- public static List<NotificationListenerAdapter> getNotificationListenerFor(final String streamName) {
- return notificationListenersByStreamName.get(streamName);
- }
-}
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * {@link WebSocketServer} is the singleton responsible for starting and stopping the
- * web socket server.
+ * {@link WebSocketServer} is the class that is responsible for starting and stopping of web-socket server with
+ * specified listening TCP port.
*/
public final class WebSocketServer implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);
-
private static WebSocketServer instance = null;
private final int port;
/**
* Create singleton instance of {@link WebSocketServer}.
*
- * @param port TCP port used for this server
- * @return instance of {@link WebSocketServer}
+ * @param port TCP port used for this server.
*/
public static WebSocketServer createInstance(final int port) {
Preconditions.checkState(instance == null, "createInstance() has already been called");
}
/**
- * Get the websocket of TCP port.
+ * Get the TCP port of websocket server.
*
- * @return websocket TCP port
+ * @return TCP port number.
*/
public int getPort() {
return port;
/**
* Get instance of {@link WebSocketServer} created by {@link #createInstance(int)}.
*
- * @return instance of {@link WebSocketServer}
+ * @return Instance of {@link WebSocketServer}.
*/
public static WebSocketServer getInstance() {
Preconditions.checkNotNull(instance, "createInstance() must be called prior to getInstance()");
channel.closeFuture().sync();
} catch (final InterruptedException e) {
- LOG.error("Web socket server encountered an error during startup attempt on port {}", port, e);
+ LOG.error("Web socket server encountered an error during startup attempt on port {}.", port, e);
} catch (Throwable throwable) {
// sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
- LOG.error("Error while binding to port {}", port, throwable);
+ LOG.error("Error while binding to port {}.", port, throwable);
throw throwable;
} finally {
stop();
* Stops the web socket server and removes all listeners.
*/
private void stop() {
- LOG.debug("Stopping the web socket server instance on port {}", port);
- Notificator.removeAllListeners();
+ LOG.debug("Stopping the web socket server instance on port {}.", port);
+ ListenersBroker.getInstance().removeAndCloseAllListeners();
if (bossGroup != null) {
bossGroup.shutdownGracefully();
bossGroup = null;
package org.opendaylight.restconf.nb.rfc8040.streams.websockets;
-import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
-import static io.netty.handler.codec.http.HttpMethod.GET;
-import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
-import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
-import static io.netty.handler.codec.http.HttpUtil.setContentLength;
-import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
-import java.util.List;
+import java.util.Optional;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator;
import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private void handleHttpRequest(final ChannelHandlerContext ctx, final FullHttpRequest req) {
// Handle a bad request.
if (!req.decoderResult().isSuccess()) {
- sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
+ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
// Allow only GET methods.
- if (req.method() != GET) {
- sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
+ if (req.method() != HttpMethod.GET) {
+ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN));
return;
}
- final String streamName = Notificator.createStreamNameFromUri(req.uri());
+ final String streamName = ListenersBroker.createStreamNameFromUri(req.uri());
if (streamName.contains(RestconfConstants.DATA_SUBSCR)) {
- final ListenerAdapter listener = Notificator.getListenerFor(streamName);
- if (listener != null) {
- listener.addSubscriber(ctx.channel());
+ final Optional<ListenerAdapter> listener =
+ ListenersBroker.getInstance().getDataChangeListenerFor(streamName);
+ if (listener.isPresent()) {
+ listener.get().addSubscriber(ctx.channel());
LOG.debug("Subscriber successfully registered.");
} else {
LOG.error("Listener for stream with name '{}' was not found.", streamName);
- sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
+ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR));
}
} else if (streamName.contains(RestconfConstants.NOTIFICATION_STREAM)) {
- final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
- if (listeners != null && !listeners.isEmpty()) {
- for (final NotificationListenerAdapter listener : listeners) {
- listener.addSubscriber(ctx.channel());
- LOG.debug("Subscriber successfully registered.");
- }
+ final Optional<NotificationListenerAdapter> listener =
+ ListenersBroker.getInstance().getNotificationListenerFor(streamName);
+ if (listener.isPresent()) {
+ listener.get().addSubscriber(ctx.channel());
+ LOG.debug("Subscriber successfully registered.");
} else {
LOG.error("Listener for stream with name '{}' was not found.", streamName);
- sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
+ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
+ HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR));
}
}
// Handshake
final WebSocketServerHandshakerFactory wsFactory =
new WebSocketServerHandshakerFactory(getWebSocketLocation(req),
- null, false);
+ null, false);
this.handshaker = wsFactory.newHandshaker(req);
if (this.handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
* @param res FullHttpResponse
*/
private static void sendHttpResponse(final ChannelHandlerContext ctx, final HttpRequest req,
- final FullHttpResponse res) {
+ final FullHttpResponse res) {
// Generate an error page if response getStatus code is not OK (200).
- final boolean notOkay = !OK.equals(res.status());
+ final boolean notOkay = !HttpResponseStatus.OK.equals(res.status());
if (notOkay) {
res.content().writeCharSequence(res.status().toString(), CharsetUtil.UTF_8);
- setContentLength(res, res.content().readableBytes());
+ HttpUtil.setContentLength(res, res.content().readableBytes());
}
// Send the response and close the connection if necessary.
final ChannelFuture f = ctx.channel().writeAndFlush(res);
- if (notOkay || !isKeepAlive(req)) {
+ if (notOkay || !HttpUtil.isKeepAlive(req)) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
private void handleWebSocketFrame(final ChannelHandlerContext ctx, final WebSocketFrame frame) {
if (frame instanceof CloseWebSocketFrame) {
this.handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
- final String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText());
+ final String streamName = ListenersBroker.createStreamNameFromUri(
+ ((CloseWebSocketFrame) frame).reasonText());
if (streamName.contains(RestconfConstants.DATA_SUBSCR)) {
- final ListenerAdapter listener = Notificator.getListenerFor(streamName);
- if (listener != null) {
- listener.removeSubscriber(ctx.channel());
- LOG.debug("Subscriber successfully registered.");
- Notificator.removeListenerIfNoSubscriberExists(listener);
+ final Optional<ListenerAdapter> listener = ListenersBroker.getInstance()
+ .getDataChangeListenerFor(streamName);
+ if (listener.isPresent()) {
+ listener.get().removeSubscriber(ctx.channel());
+ LOG.debug("Subscriber successfully removed.");
+ if (!listener.get().hasSubscribers()) {
+ ListenersBroker.getInstance().removeAndCloseDataChangeListener(listener.get());
+ }
}
} else if (streamName.contains(RestconfConstants.NOTIFICATION_STREAM)) {
- final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
- if (listeners != null && !listeners.isEmpty()) {
- for (final NotificationListenerAdapter listener : listeners) {
- listener.removeSubscriber(ctx.channel());
+ final Optional<NotificationListenerAdapter> listener
+ = ListenersBroker.getInstance().getNotificationListenerFor(streamName);
+ if (listener.isPresent()) {
+ listener.get().removeSubscriber(ctx.channel());
+ LOG.debug("Subscriber successfully removed.");
+ if (!listener.get().hasSubscribers()) {
+ ListenersBroker.getInstance().removeAndCloseNotificationListener(listener.get());
}
}
}
- return;
} else if (frame instanceof PingWebSocketFrame) {
ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
- return;
}
}
/**
* Get web socket location from HTTP request.
*
- * @param req HTTP request from which the location will be returned
+ * @param req HTTP request from which the location will be returned.
* @return String representation of web socket location.
*/
private static String getWebSocketLocation(final HttpRequest req) {
- return "ws://" + req.headers().get(HOST) + req.uri();
+ return "ws://" + req.headers().get(HttpHeaderNames.HOST) + req.uri();
}
-}
+}
\ No newline at end of file
import io.netty.handler.codec.http.HttpServerCodec;
/**
- * {@link WebSocketServerInitializer} is used to setup the {@link ChannelPipeline} of a {@link io.netty.channel.Channel}
- * .
+ * {@link WebSocketServerInitializer} is used to setup the {@link ChannelPipeline} of
+ * a {@link io.netty.channel.Channel}.
*/
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
- protected void initChannel(final SocketChannel ch) {
- ChannelPipeline pipeline = ch.pipeline();
+ protected void initChannel(final SocketChannel channel) {
+ ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("codec-http", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("handler", new WebSocketServerHandler());
}
-
-}
+}
\ No newline at end of file
import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateTrueFluentFuture;
import com.google.common.collect.ImmutableClassToInstanceMap;
-import java.lang.reflect.Field;
+import java.io.FileNotFoundException;
import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.mdsal.common.api.CommitInfo;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.restconf.nb.rfc8040.handlers.SchemaContextHandler;
import org.opendaylight.restconf.nb.rfc8040.handlers.TransactionChainHandler;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils;
public class RestconfStreamsSubscriptionServiceImplTest {
private static final String URI = "/restconf/18/data/ietf-restconf-monitoring:restconf-state/streams/stream/"
+ "toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
- private static Field listenersByStreamName;
@Mock
private DOMDataBrokerHandler dataBrokerHandler;
@SuppressWarnings("unchecked")
@Before
- public void setUp() throws Exception {
+ public void setUp() throws FileNotFoundException, URISyntaxException {
MockitoAnnotations.initMocks(this);
final DOMTransactionChain domTx = mock(DOMTransactionChain.class);
final UriBuilder baseUriBuilder = new LocalUriInfo().getBaseUriBuilder();
when(uriInfo.getBaseUri()).thenReturn(baseUriBuilder.build());
when(uriInfo.getBaseUriBuilder()).thenReturn(baseUriBuilder);
+ final URI uri = new URI("http://127.0.0.1/" + URI);
+ when(uriInfo.getAbsolutePath()).thenReturn(uri);
this.schemaHandler.onGlobalContextUpdated(
YangParserTestUtils.parseYangFiles(TestRestconfUtils.loadFiles("/notifications")));
}
}
@BeforeClass
- public static void setUpBeforeTest() throws Exception {
+ public static void setUpBeforeTest() {
final Map<String, ListenerAdapter> listenersByStreamNameSetter = new HashMap<>();
final ListenerAdapter adapter = mock(ListenerAdapter.class);
+ final YangInstanceIdentifier yiid = mock(YangInstanceIdentifier.class);
+ final YangInstanceIdentifier.PathArgument lastPathArgument = mock(YangInstanceIdentifier.PathArgument.class);
+ final QName qname = QName.create("toaster", "2009-11-20", "toasterStatus");
+ Mockito.when(adapter.getPath()).thenReturn(yiid);
+ Mockito.when(adapter.getOutputType()).thenReturn("JSON");
+ Mockito.when(yiid.getLastPathArgument()).thenReturn(lastPathArgument);
+ Mockito.when(lastPathArgument.getNodeType()).thenReturn(qname);
listenersByStreamNameSetter.put(
"data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
adapter);
- listenersByStreamName = Notificator.class.getDeclaredField("dataChangeListener");
-
- listenersByStreamName.setAccessible(true);
- listenersByStreamName.set(Notificator.class, listenersByStreamNameSetter);
+ ListenersBroker.getInstance().setDataChangeListeners(listenersByStreamNameSetter);
}
@AfterClass
- public static void setUpAfterTest() throws Exception {
- listenersByStreamName.set(Notificator.class, null);
- listenersByStreamName.set(Notificator.class, new ConcurrentHashMap<>());
- listenersByStreamName.setAccessible(false);
+ public static void setUpAfterTest() {
+ ListenersBroker.getInstance().setDataChangeListeners(Collections.emptyMap());
}
@Test
- public void testSubscribeToStream() throws Exception {
+ public void testSubscribeToStream() {
final UriBuilder uriBuilder = UriBuilder.fromUri(URI);
- Notificator.createListener(
+ ListenersBroker.getInstance().registerDataChangeListener(
IdentifierCodec.deserialize("toaster:toaster/toasterStatus", this.schemaHandler.get()),
"data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
NotificationOutputType.XML);
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.json.JSONObject;
final NotificationOutputTypeGrouping.NotificationOutputType outputType,
final boolean leafNodesOnly) {
super(path, streamName, outputType);
- setQueryParams(EPOCH, Optional.empty(), Optional.empty(), leafNodesOnly);
+ setQueryParams(EPOCH, null, null, leafNodesOnly);
}
@Override
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.junit.Before;
return child;
}
- private String prepareJson(final DOMNotification notificationData, final SchemaPath schemaPathNotifi)
- throws Exception {
- final List<SchemaPath> paths = new ArrayList<>();
- paths.add(schemaPathNotifi);
- final List<NotificationListenerAdapter> listNotifi =
- Notificator.createNotificationListener(paths, "stream-name", NotificationOutputType.JSON.toString());
- final NotificationListenerAdapter notifi = listNotifi.get(0);
- final String result = notifi.prepareJson(schmeaCtx, notificationData);
+ private String prepareJson(final DOMNotification notificationData, final SchemaPath schemaPathNotifi) {
+ final NotificationListenerAdapter notifiAdapter = ListenersBroker.getInstance().registerNotificationListener(
+ schemaPathNotifi, "stream-name", NotificationOutputType.JSON);
+ final String result = notifiAdapter.prepareJson(schmeaCtx, notificationData);
return Preconditions.checkNotNull(result);
}
}