import java.net.URISyntaxException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
import org.opendaylight.netconf.sal.streams.listeners.Notificator;
import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode;
import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaNode;
private static final YangInstanceIdentifier.AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER;
+ public static final CharSequence DATA_SUBSCR = "data-change-event-subscription";
+ private static final CharSequence CREATE_DATA_SUBSCR = "create-" + DATA_SUBSCR;
+
+ public static final CharSequence NOTIFICATION_STREAM = "notification-stream";
+ private static final CharSequence CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM;
+
static {
try {
final Date eventSubscriptionAugRevision = new SimpleDateFormat("yyyy-MM-dd").parse("2014-07-08");
response = mountRpcServices.get().invokeRpc(type, payload.getData());
} else {
if (namespace.toString().equals(SAL_REMOTE_NAMESPACE)) {
- response = invokeSalRemoteRpcSubscribeRPC(payload);
+ if (identifier.contains(CREATE_DATA_SUBSCR)) {
+ response = invokeSalRemoteRpcSubscribeRPC(payload);
+ } else if (identifier.contains(CREATE_NOTIFICATION_STREAM)) {
+ response = invokeSalRemoteRpcNotifiStrRPC(payload);
+ } else {
+ final String msg = "Not supported operation";
+ LOG.warn(msg);
+ throw new RestconfDocumentedException(msg, ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED);
+ }
} else {
response = this.broker.invokeRpc(type, payload.getData());
}
}
final YangInstanceIdentifier pathIdentifier = ((YangInstanceIdentifier) pathValue);
- String streamName = null;
+ String streamName = (String) CREATE_DATA_SUBSCR;
if (!pathIdentifier.isEmpty()) {
- final String fullRestconfIdentifier = this.controllerContext.toFullRestconfIdentifier(pathIdentifier, null);
+ final String fullRestconfIdentifier = DATA_SUBSCR
+ + this.controllerContext.toFullRestconfIdentifier(pathIdentifier, null);
LogicalDatastoreType datastore = parseEnumTypeParameter(value, LogicalDatastoreType.class, DATASTORE_PARAM_NAME);
datastore = datastore == null ? DEFAULT_DATASTORE : datastore;
*/
@Override
public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
+ if (identifier.contains(DATA_SUBSCR)) {
+ return dataSubs(identifier, uriInfo);
+ } else if (identifier.contains(NOTIFICATION_STREAM)) {
+ return notifStream(identifier, uriInfo);
+ }
+ final String msg = "Bad type of notification of sal-remote";
+ LOG.warn(msg);
+ throw new RestconfDocumentedException(msg);
+ }
+
+ /**
+ * Register notification listener by stream name
+ *
+ * @param identifier
+ * - stream name
+ * @param uriInfo
+ * - uriInfo
+ * @return {@link Response}
+ */
+ private Response notifStream(final String identifier, final UriInfo uriInfo) {
+ final String streamName = Notificator.createStreamNameFromUri(identifier);
+ if (Strings.isNullOrEmpty(streamName)) {
+ throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+ }
+ final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+ if ((listeners == null) || listeners.isEmpty()) {
+ throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
+ ErrorTag.UNKNOWN_ELEMENT);
+ }
+
+ for (final NotificationListenerAdapter listener : listeners) {
+ this.broker.registerToListenNotification(listener);
+ }
+
+ final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+ int notificationPort = NOTIFICATION_PORT;
+ try {
+ final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance();
+ notificationPort = webSocketServerInstance.getPort();
+ } catch (final NullPointerException e) {
+ WebSocketServer.createInstance(NOTIFICATION_PORT);
+ }
+ final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws");
+ final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build();
+
+ return Response.status(Status.OK).location(uriToWebsocketServer).build();
+ }
+
+ /**
+ * Register data change listener by stream name
+ *
+ * @param identifier
+ * - stream name
+ * @param uriInfo
+ * - uri info
+ * @return {@link Response}
+ */
+ private Response dataSubs(final String identifier, final UriInfo uriInfo) {
final String streamName = Notificator.createStreamNameFromUri(identifier);
if (Strings.isNullOrEmpty(streamName)) {
throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
/**
* Load parameter for subscribing to stream from input composite node
*
- * @param compNode
+ * @param value
* contains value
* @return enum object if its string value is equal to {@code paramName}. In other cases null.
*/
return streamNodeValues.build();
}
+
+ /**
+ * Prepare stream for notification
+ *
+ * @param payload
+ * - contains list of qnames of notifications
+ * @return - checked future object
+ */
+ private CheckedFuture<DOMRpcResult, DOMRpcException> invokeSalRemoteRpcNotifiStrRPC(
+ final NormalizedNodeContext payload) {
+ final ContainerNode data = (ContainerNode) payload.getData();
+ LeafSetNode leafSet = null;
+ String outputType = "XML";
+ for (final DataContainerChild<? extends PathArgument, ?> dataChild : data.getValue()) {
+ if (dataChild instanceof LeafSetNode) {
+ leafSet = (LeafSetNode) dataChild;
+ } else if (dataChild instanceof AugmentationNode) {
+ outputType = (String) (((AugmentationNode) dataChild).getValue()).iterator().next().getValue();
+ }
+ }
+
+ final Collection<LeafSetEntryNode> entryNodes = leafSet.getValue();
+ final List<SchemaPath> paths = new ArrayList<>();
+ String streamName = CREATE_NOTIFICATION_STREAM + "/";
+
+ final Iterator<LeafSetEntryNode> iterator = entryNodes.iterator();
+ while (iterator.hasNext()) {
+ final QName valueQName = QName.create((String) iterator.next().getValue());
+ final Module module = ControllerContext.getInstance()
+ .findModuleByNamespace(valueQName.getModule().getNamespace());
+ Preconditions.checkNotNull(module, "Module for namespace " + valueQName.getModule().getNamespace()
+ + " does not exist");
+ NotificationDefinition notifiDef = null;
+ for (final NotificationDefinition notification : module.getNotifications()) {
+ if (notification.getQName().equals(valueQName)) {
+ notifiDef = notification;
+ break;
+ }
+ }
+ final String moduleName = module.getName();
+ Preconditions.checkNotNull(notifiDef,
+ "Notification " + valueQName + "doesn't exist in module " + moduleName);
+ paths.add(notifiDef.getPath());
+ streamName = streamName + moduleName + ":" + valueQName.getLocalName();
+ if (iterator.hasNext()) {
+ streamName = streamName + ",";
+ }
+ }
+
+ final QName rpcQName = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
+ final QName outputQname = QName.create(rpcQName, "output");
+ final QName streamNameQname = QName.create(rpcQName, "notification-stream-identifier");
+
+ final ContainerNode output = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new NodeIdentifier(outputQname))
+ .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
+
+ if (!Notificator.existNotificationListenerFor(streamName)) {
+ Notificator.createNotificationListener(paths, streamName, outputType);
+ }
+
+ final DOMRpcResult defaultDOMRpcResult = new DefaultDOMRpcResult(output);
+
+ return Futures.immediateCheckedFuture(defaultDOMRpcResult);
+ }
}