import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.netconf.sal.rest.api.RestConnector;
import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
import org.opendaylight.restconf.handlers.DOMMountPointServiceHandler;
import org.opendaylight.restconf.handlers.SchemaContextHandler;
import org.opendaylight.restconf.handlers.TransactionChainHandler;
this.transactionChain = this.dataBroker.createTransactionChain(this.transactionListener);
final TransactionChainHandler transactionChainHandler = new TransactionChainHandler(this.transactionChain);
+ final DOMDataBrokerHandler brokerHandler = new DOMDataBrokerHandler(this.dataBroker);
+
wrapperServices.setHandlers(schemaCtxHandler, domMountPointServiceHandler);
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.handlers;
+
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+
+/**
+ * Implementation of {@link DOMDataBrokerHandler}
+ *
+ */
+public class DOMDataBrokerHandler implements Handler<DOMDataBroker> {
+
+ private final DOMDataBroker broker;
+
+ public DOMDataBrokerHandler(final DOMDataBroker broker) {
+ this.broker = broker;
+ }
+
+ @Override
+ public DOMDataBroker get() {
+ return this.broker;
+ }
+
+}
*/
package org.opendaylight.restconf.restful.services.impl;
+import com.google.common.base.Strings;
+import java.net.URI;
+import java.util.Map;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
+import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.Notificator;
+import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
import org.opendaylight.restconf.restful.services.api.RestconfStreamsSubscriptionService;
+import org.opendaylight.restconf.restful.utils.RestconfStreamsConstants;
+import org.opendaylight.restconf.restful.utils.SubscribeToStreamUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Implementation of {@link RestconfStreamsSubscriptionService}
+ *
+ */
public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSubscriptionService {
+ private static final Logger LOG = LoggerFactory.getLogger(RestconfStreamsSubscriptionServiceImpl.class);
+
+ private DOMDataBrokerHandler domDataBrokerHandler;
+
@Override
public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
- throw new UnsupportedOperationException("Not yet implemented");
- }
+ final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
+
+ final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
+ mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
+ if (ds == null) {
+ final String msg = "Stream name doesn't contains datastore value (pattern /datastore=)";
+ LOG.debug(msg);
+ throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+ }
+ final DataChangeScope scope = SubscribeToStreamUtil.parseURIEnum(DataChangeScope.class,
+ mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
+ if (scope == null) {
+ final String msg = "Stream name doesn't contains datastore value (pattern /scope=)";
+ LOG.warn(msg);
+ throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+ }
+
+ final String streamName = Notificator.createStreamNameFromUri(identifier);
+ if (Strings.isNullOrEmpty(streamName)) {
+ final String msg = "Stream name is empty.";
+ LOG.warn(msg);
+ throw new RestconfDocumentedException(msg, ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+ }
+
+ final ListenerAdapter listener = Notificator.getListenerFor(streamName);
+ SubscribeToStreamUtil.registration(ds, scope, listener, this.domDataBrokerHandler.get());
+
+ final int port = SubscribeToStreamUtil.prepareNotificationPort();
+
+ final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+ final UriBuilder uriToWebSocketServer = uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
+ final URI uri = uriToWebSocketServer.replacePath(streamName).build();
+
+ return Response.status(Status.OK).location(uri).build();
+ }
}
return null;
}
- return resolveEnum(clazz, (String) value);
- }
-
- private static <T> T resolveEnum(final Class<T> clazz, final String value) {
- for (final T t : clazz.getEnumConstants()) {
- if (((Enum<?>) t).name().equals(value)) {
- return t;
- }
- }
- return null;
+ return StreamUtil.resolveEnum(clazz, (String) value);
}
private static YangInstanceIdentifier preparePath(final NormalizedNodeContext payload, final ContainerNode data,
public static final String SCOPE_PARAM_NAME = "scope";
- public static final String DS_URI = RestconfConstants.SLASH + DATASTORE_PARAM_NAME
- + ParserBuilderConstants.Deserializer.EQUAL;
+ public static final char EQUAL = ParserBuilderConstants.Deserializer.EQUAL;
- public static final String SCOPE_URI = RestconfConstants.SLASH + SCOPE_PARAM_NAME
- + ParserBuilderConstants.Deserializer.EQUAL;
+ public static final String DS_URI = RestconfConstants.SLASH + DATASTORE_PARAM_NAME + EQUAL;
+
+ public static final String SCOPE_URI = RestconfConstants.SLASH + SCOPE_PARAM_NAME + EQUAL;
+
+ public static final int NOTIFICATION_PORT = 8181;
+
+ public static final String SCHEMA_SUBSCIBRE_URI = "ws";
static {
Date eventSubscriptionAugRevision;
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.restful.utils;
+
+/**
+ * Common util class for stream
+ * <ul>
+ * <li>{@link SubscribeToStreamUtil}
+ * <li>{@link CreateStreamUtil}
+ * </ul>
+ *
+ */
+public final class StreamUtil {
+
+ private StreamUtil() {
+ throw new UnsupportedOperationException("Util class");
+ }
+
+ /**
+ * Resolve specific type of enum by value.
+ *
+ * @param clazz
+ * - enum
+ * @param value
+ * - string of enum
+ * @return - enum
+ */
+ public static <T> T resolveEnum(final Class<T> clazz, final String value) {
+ for (final T t : clazz.getEnumConstants()) {
+ if (((Enum<?>) t).name().equals(value)) {
+ return t;
+ }
+ }
+ return null;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.restful.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
+import org.opendaylight.restconf.utils.RestconfConstants;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Subscribe to stream util class
+ *
+ */
+public final class SubscribeToStreamUtil {
+
+ private SubscribeToStreamUtil() {
+ throw new UnsupportedOperationException("Util class");
+ }
+
+ /**
+ * Parse enum from URI
+ *
+ * @param clazz
+ * - enum type
+ * @param value
+ * - string of enum value
+ * @return enum
+ */
+ public static <T> T parseURIEnum(final Class<T> clazz, final String value) {
+ if ((value == null) || value.equals("")) {
+ return null;
+ }
+ return StreamUtil.resolveEnum(clazz, value);
+ }
+
+ /**
+ * Prepare map of values from URI
+ *
+ * @param identifier
+ * - URI
+ * @return {@link Map}
+ */
+ public static Map<String, String> mapValuesFromUri(final String identifier) {
+ final HashMap<String, String> result = new HashMap<>();
+ final String[] tokens = identifier.split(String.valueOf(RestconfConstants.SLASH));
+ for (final String token : tokens) {
+ final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL));
+ if (paramToken.length == 2) {
+ result.put(paramToken[0], paramToken[1]);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Register data change listener in dom data broker and set it to listener
+ * on stream
+ *
+ * @param ds
+ * - {@link LogicalDatastoreType}
+ * @param scope
+ * - {@link DataChangeScope}
+ * @param listener
+ * - listener on specific stream
+ * @param domDataBroker
+ * - data broker for register data change listener
+ */
+ public static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
+ final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
+ if (listener.isListening()) {
+ return;
+ }
+
+ final YangInstanceIdentifier path = listener.getPath();
+ final ListenerRegistration<DOMDataChangeListener> registration = domDataBroker.registerDataChangeListener(ds,
+ path, listener, scope);
+
+ listener.setRegistration(registration);
+ }
+
+ /**
+ * Get port from web socket server. If doesn't exit, create it.
+ *
+ * @return port
+ */
+ public static int prepareNotificationPort() {
+ int port = RestconfStreamsConstants.NOTIFICATION_PORT;
+ try {
+ final WebSocketServer webSocketServer = WebSocketServer.getInstance();
+ port = webSocketServer.getPort();
+ } catch (final NullPointerException e) {
+ WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
+ }
+ return port;
+ }
+
+}