import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadOperations;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
import org.opendaylight.restconf.common.context.InstanceIdentifierContext;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator;
-import org.opendaylight.restconf.nb.rfc8040.streams.websockets.WebSocketServer;
import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil;
import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
.appendOffset("+HH:MM", "Z").toFormatter();
private SubscribeToStreamUtil() {
- throw new UnsupportedOperationException("Util class");
+ throw new UnsupportedOperationException("Utility class");
}
/**
- * Register listeners by streamName in identifier to listen to yang
- * notifications, put or delete info about listener to DS according to
- * ietf-restconf-monitoring.
+ * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
+ * about listener to DS according to ietf-restconf-monitoring.
*
- * @param identifier
- * identifier as stream name
- * @param uriInfo
- * for getting base URI information
- * @param notificationQueryParams
- * query parameters of notification
- * @param handlersHolder
- * holder of handlers for notifications
- * @return location for listening
+ * @param identifier Name of the stream.
+ * @param uriInfo URI information.
+ * @param notificationQueryParams Query parameters of notification.
+ * @param handlersHolder Holder of handlers for notifications.
+ * @return Stream location for listening.
*/
@SuppressWarnings("rawtypes")
- public static URI notifYangStream(final String identifier, final UriInfo uriInfo,
+ public static URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
- final String streamName = Notificator.createStreamNameFromUri(identifier);
+ final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
if (Strings.isNullOrEmpty(streamName)) {
throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
}
- List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
- if (identifier.contains(RestconfConstants.SLASH + NotificationOutputType.JSON.getName())) {
- listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.JSON.getName());
- } else {
- listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.XML.getName());
- }
- if (listeners.isEmpty()) {
- throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
+ final Optional<NotificationListenerAdapter> notificationListenerAdapter =
+ ListenersBroker.getInstance().getNotificationListenerFor(streamName);
+
+ if (!notificationListenerAdapter.isPresent()) {
+ throw new RestconfDocumentedException(String.format(
+ "Stream with name %s was not found.", streamName),
+ ErrorType.PROTOCOL,
ErrorTag.UNKNOWN_ELEMENT);
}
- final DOMDataTreeReadWriteTransaction wTx =
- handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
+ final DOMDataTreeReadWriteTransaction writeTransaction = handlersHolder
+ .getTransactionChainHandler()
+ .get()
+ .newReadWriteTransaction();
final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
- final boolean exist = checkExist(schemaContext, wTx);
-
+ final boolean exist = checkExist(schemaContext, writeTransaction);
final URI uri = prepareUriByStreamName(uriInfo, streamName);
- for (final NotificationListenerAdapter listener : listeners) {
- registerToListenNotification(listener, handlersHolder.getNotificationServiceHandler());
- listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
- notificationQueryParams.getFilter(), false);
- listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
- final NormalizedNode mapToStreams = RestconfMappingNodeUtil
- .mapYangNotificationStreamByIetfRestconfMonitoring(listener.getSchemaPath().getLastComponent(),
- schemaContext.getNotifications(), notificationQueryParams.getStart(),
- listener.getOutputType(), uri, getMonitoringModule(schemaContext), exist);
- writeDataToDS(schemaContext, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist,
- mapToStreams);
- }
- submitData(wTx);
+ registerToListenNotification(
+ notificationListenerAdapter.get(), handlersHolder.getNotificationServiceHandler());
+ notificationListenerAdapter.get().setQueryParams(
+ notificationQueryParams.getStart(),
+ notificationQueryParams.getStop().orElse(null),
+ notificationQueryParams.getFilter().orElse(null),
+ false);
+ notificationListenerAdapter.get().setCloseVars(
+ handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+ final NormalizedNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
+ notificationListenerAdapter.get().getSchemaPath().getLastComponent(),
+ schemaContext.getNotifications(), notificationQueryParams.getStart(),
+ notificationListenerAdapter.get().getOutputType(), uri, getMonitoringModule(schemaContext),
+ exist);
+ writeDataToDS(schemaContext,
+ notificationListenerAdapter.get().getSchemaPath().getLastComponent().getLocalName(), writeTransaction,
+ exist, mapToStreams);
+ submitData(writeTransaction);
return uri;
}
- static List<NotificationListenerAdapter>
- pickSpecificListenerByOutput(final List<NotificationListenerAdapter> listeners, final String outputType) {
- for (final NotificationListenerAdapter notificationListenerAdapter : listeners) {
- if (notificationListenerAdapter.getOutputType().equals(outputType)) {
- final List<NotificationListenerAdapter> list = new ArrayList<>();
- list.add(notificationListenerAdapter);
- return list;
- }
- }
- return listeners;
- }
-
/**
* Prepare InstanceIdentifierContext for Location leaf.
*
- * @param schemaHandler
- * schemaContext handler
- * @return InstanceIdentifier of Location leaf
+ * @param schemaHandler Schema context handler.
+ * @return InstanceIdentifier of Location leaf.
*/
public static InstanceIdentifierContext<?> prepareIIDSubsStreamOutput(final SchemaContextHandler schemaHandler) {
- final QName qnameBase = QName.create("subscribe:to:notification", "2016-10-28", "notifi");
- final DataSchemaNode location = ((ContainerSchemaNode) schemaHandler.get()
- .findModule(qnameBase.getModule()).get()
- .getDataChildByName(qnameBase)).getDataChildByName(QName.create(qnameBase, "location"));
- final List<PathArgument> path = new ArrayList<>();
- path.add(NodeIdentifier.create(qnameBase));
- path.add(NodeIdentifier.create(QName.create(qnameBase, "location")));
+ final Optional<Module> module = schemaHandler.get()
+ .findModule(RestconfStreamsConstants.NOTIFI_QNAME.getModule());
+ Preconditions.checkState(module.isPresent());
+ final Optional<DataSchemaNode> notify = module.get()
+ .findDataChildByName(RestconfStreamsConstants.NOTIFI_QNAME);
+ Preconditions.checkState(notify.isPresent());
+ final Optional<DataSchemaNode> location = ((ContainerSchemaNode) notify.get())
+ .findDataChildByName(RestconfStreamsConstants.LOCATION_QNAME);
+ Preconditions.checkState(location.isPresent());
- return new InstanceIdentifierContext<SchemaNode>(YangInstanceIdentifier.create(path), location, null,
- schemaHandler.get());
+ final List<PathArgument> path = new ArrayList<>();
+ path.add(NodeIdentifier.create(RestconfStreamsConstants.NOTIFI_QNAME));
+ path.add(NodeIdentifier.create(RestconfStreamsConstants.LOCATION_QNAME));
+ return new InstanceIdentifierContext<SchemaNode>(YangInstanceIdentifier.create(path), location.get(),
+ null, schemaHandler.get());
}
/**
- * Register listener by streamName in identifier to listen to data change
- * notifications, put or delete info about listener to DS according to
- * ietf-restconf-monitoring.
+ * Register listener by streamName in identifier to listen to data change notifications, and put or delete
+ * information about listener to DS according to ietf-restconf-monitoring.
*
- * @param identifier
- * identifier as stream name
- * @param uriInfo
- * for getting base URI information
- * @param notificationQueryParams
- * query parameters of notification
- * @param handlersHolder
- * holder of handlers for notifications
- * @return location for listening
+ * @param identifier Identifier as stream name.
+ * @param uriInfo Base URI information.
+ * @param notificationQueryParams Query parameters of notification.
+ * @param handlersHolder Holder of handlers for notifications.
+ * @return Location for listening.
*/
@SuppressWarnings("rawtypes")
- public static URI notifiDataStream(final String identifier, final UriInfo uriInfo,
+ public static URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
- final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
-
- final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
+ final Map<String, String> mapOfValues = mapValuesFromUri(identifier);
+ final LogicalDatastoreType datastoreType = parseURIEnum(
+ LogicalDatastoreType.class,
mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
- if (ds == null) {
- final String msg = "Stream name doesn't contains datastore value (pattern /datastore=)";
- LOG.debug(msg);
- throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+ if (datastoreType == null) {
+ final String message = "Stream name doesn't contain datastore value (pattern /datastore=)";
+ LOG.debug(message);
+ throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
}
- final DataChangeScope scope = SubscribeToStreamUtil.parseURIEnum(DataChangeScope.class,
+ final DataChangeScope scope = parseURIEnum(
+ DataChangeScope.class,
mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
if (scope == null) {
- final String msg = "Stream name doesn't contains datastore value (pattern /scope=)";
- LOG.warn(msg);
- throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+ final String message = "Stream name doesn't contains datastore value (pattern /scope=)";
+ LOG.warn(message);
+ throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
}
- final String streamName = Notificator.createStreamNameFromUri(identifier);
-
- final ListenerAdapter listener = Notificator.getListenerFor(streamName);
- Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName);
-
- listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
- notificationQueryParams.getFilter(), false);
- listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+ final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
+ final Optional<ListenerAdapter> listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName);
+ Preconditions.checkArgument(listener.isPresent(), "Listener doesn't exist : " + streamName);
- registration(ds, scope, listener, handlersHolder.getDomDataBrokerHandler().get());
+ listener.get().setQueryParams(
+ notificationQueryParams.getStart(),
+ notificationQueryParams.getStop().orElse(null),
+ notificationQueryParams.getFilter().orElse(null),
+ false);
+ listener.get().setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+ registration(datastoreType, listener.get(), handlersHolder.getDomDataBrokerHandler().get());
final URI uri = prepareUriByStreamName(uriInfo, streamName);
-
- final DOMDataTreeReadWriteTransaction wTx =
- handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
+ final DOMDataTreeReadWriteTransaction writeTransaction
+ = handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
- final boolean exist = checkExist(schemaContext, wTx);
+ final boolean exist = checkExist(schemaContext, writeTransaction);
final NormalizedNode mapToStreams = RestconfMappingNodeUtil
- .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(),
- notificationQueryParams.getStart(), listener.getOutputType(), uri,
+ .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(),
+ notificationQueryParams.getStart(), listener.get().getOutputType(), uri,
getMonitoringModule(schemaContext), exist, schemaContext);
- writeDataToDS(schemaContext, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist,
- mapToStreams);
- submitData(wTx);
+ writeDataToDS(schemaContext, listener.get().getPath().getLastPathArgument().getNodeType().getLocalName(),
+ writeTransaction, exist, mapToStreams);
+ submitData(writeTransaction);
return uri;
}
- public static Module getMonitoringModule(final SchemaContext schemaContext) {
+ static Module getMonitoringModule(final SchemaContext schemaContext) {
return schemaContext.findModule(MonitoringModule.MODULE_QNAME).orElse(null);
}
/**
- * Parse input of query parameters - start-time or stop-time - from
- * {@link DateAndTime} format to {@link Instant} format.
+ * Parse input of query parameters - start-time or stop-time - from {@link DateAndTime} format
+ * to {@link Instant} format.
*
- * @param entry
- * start-time or stop-time as string in {@link DateAndTime}
- * format
- * @return parsed {@link Instant} by entry
+ * @param entry Start-time or stop-time as string in {@link DateAndTime} format.
+ * @return Parsed {@link Instant} by entry.
*/
public static Instant parseDateFromQueryParam(final Entry<String, List<String>> entry) {
final DateAndTime event = new DateAndTime(entry.getValue().iterator().next());
final String value = event.getValue();
- final TemporalAccessor p;
+ final TemporalAccessor accessor;
try {
- p = FORMATTER.parse(value);
+ accessor = FORMATTER.parse(value);
} catch (final DateTimeParseException e) {
throw new RestconfDocumentedException("Cannot parse of value in date: " + value, e);
}
- return Instant.from(p);
-
+ return Instant.from(accessor);
}
@SuppressWarnings("rawtypes")
- static void writeDataToDS(final SchemaContext schemaContext,
- final String name, final DOMDataTreeReadWriteTransaction readWriteTransaction,
- final boolean exist, final NormalizedNode mapToStreams) {
- String pathId = "";
+ static void writeDataToDS(final SchemaContext schemaContext, final String name,
+ final DOMDataTreeReadWriteTransaction readWriteTransaction, final boolean exist,
+ final NormalizedNode mapToStreams) {
+ String pathId;
if (exist) {
pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name;
} else {
pathId = MonitoringModule.PATH_TO_STREAMS;
}
- readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaContext),
- mapToStreams);
+ readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL,
+ IdentifierCodec.deserialize(pathId, schemaContext), mapToStreams);
}
static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) {
}
/**
- * Prepare map of values from URI.
+ * Prepare map of URI parameter-values.
*
- * @param identifier
- * URI
- * @return {@link Map}
+ * @param identifier String identification of URI.
+ * @return Map od URI parameters and values.
*/
- public static Map<String, String> mapValuesFromUri(final String identifier) {
+ private static Map<String, String> mapValuesFromUri(final String identifier) {
final HashMap<String, String> result = new HashMap<>();
for (final String token : RestconfConstants.SLASH_SPLITTER.split(identifier)) {
final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL));
}
static URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
+ final String scheme = uriInfo.getAbsolutePath().getScheme();
final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
-
- prepareNotificationPort(uriInfo.getBaseUri().getPort());
- uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
+ switch (scheme) {
+ case RestconfStreamsConstants.SCHEMA_UPGRADE_SECURED_URI:
+ uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCRIBE_SECURED_URI);
+ break;
+ case RestconfStreamsConstants.SCHEMA_UPGRADE_URI:
+ default:
+ uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCRIBE_URI);
+ }
return uriBuilder.replacePath(streamName).build();
}
/**
- * Register data change listener in dom data broker and set it to listener
- * on stream.
+ * Register data change listener in DOM data broker and set it to listener on stream.
*
- * @param ds
- * {@link LogicalDatastoreType}
- * @param scope
- * {@link DataChangeScope}
- * @param listener
- * listener on specific stream
- * @param domDataBroker
- * data broker for register data change listener
+ * @param datastore {@link LogicalDatastoreType}
+ * @param listener listener on specific stream
+ * @param domDataBroker data broker for register data change listener
*/
- private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
- final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
+ private static void registration(final LogicalDatastoreType datastore, final ListenerAdapter listener,
+ final DOMDataBroker domDataBroker) {
if (listener.isListening()) {
return;
}
throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService");
}
- final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(ds, listener.getPath());
+ final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(datastore, listener.getPath());
final ListenerRegistration<ListenerAdapter> registration =
- changeService.registerDataTreeChangeListener(root, listener);
-
+ changeService.registerDataTreeChangeListener(root, listener);
listener.setRegistration(registration);
}
- /**
- * Get port from web socket server. If doesn't exit, create it.
- *
- * @param port
- * - port
- */
- private static void prepareNotificationPort(final int port) {
- try {
- WebSocketServer.getInstance();
- } catch (final NullPointerException e) {
- WebSocketServer.createInstance(port);
- }
- }
-
static boolean checkExist(final SchemaContext schemaContext,
- final DOMDataTreeReadTransaction readWriteTransaction) {
+ final DOMDataTreeReadOperations readWriteTransaction) {
boolean exist;
try {
- exist = readWriteTransaction.exists(LogicalDatastoreType.OPERATIONAL,
+ return readWriteTransaction.exists(LogicalDatastoreType.OPERATIONAL,
IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).get();
- } catch (final InterruptedException | ExecutionException e) {
- throw new RestconfDocumentedException("Problem while checking data if exists", e);
+ } catch (final InterruptedException | ExecutionException exception) {
+ throw new RestconfDocumentedException("Problem while checking data if exists", exception);
}
- return exist;
}
private static void registerToListenNotification(final NotificationListenerAdapter listener,
final SchemaPath path = listener.getSchemaPath();
final ListenerRegistration<DOMNotificationListener> registration =
notificationServiceHandler.get().registerNotificationListener(listener, path);
-
listener.setRegistration(registration);
}
/**
- * Parse enum from URI.
+ * Parse out enumeration from URI.
*
- * @param clazz
- * enum type
- * @param value
- * string of enum value
- * @return enum
+ * @param clazz Target enumeration type.
+ * @param value String representation of enumeration value.
+ * @return Parsed enumeration type.
*/
private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
if (value == null || value.equals("")) {