From: Jakub Toth Date: Fri, 8 Jul 2016 13:44:52 +0000 (+0200) Subject: Bug 3959 - support netconf notification X-Git-Tag: release/boron~31 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F39%2F41639%2F18;p=netconf.git Bug 3959 - support netconf notification *added dependency for converting JSON to XML *created new notification listener adapter extended by DOMNotificationListener *added augment for sal-remote to add new param for input of create-notification-stream rpc to make a choice between JSON or XML output of notification (not mandatory - default is XML) *added unit tests Change-Id: I7f5909208dce71e2280fd1bc6dbe49cb7533523a Signed-off-by: Jakub Toth --- diff --git a/restconf/sal-rest-connector/pom.xml b/restconf/sal-rest-connector/pom.xml index 84e5e8e8c4..806ee55e93 100644 --- a/restconf/sal-rest-connector/pom.xml +++ b/restconf/sal-rest-connector/pom.xml @@ -145,6 +145,11 @@ 1.2 + + org.json + json + + org.glassfish.jersey.test-framework.providers diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/BrokerFacade.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/BrokerFacade.java index e2097310f2..37c90515ab 100644 --- a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/BrokerFacade.java +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/BrokerFacade.java @@ -9,6 +9,7 @@ package org.opendaylight.netconf.sal.restconf.impl; import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION; import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL; + import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -28,6 +29,8 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; @@ -35,6 +38,7 @@ import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession; import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag; import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType; import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter; +import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; @@ -54,12 +58,17 @@ public class BrokerFacade { private volatile DOMRpcService rpcService; private volatile ConsumerSession context; private DOMDataBroker domDataBroker; + private DOMNotificationService domNotification; private BrokerFacade() { } public void setRpcService(final DOMRpcService router) { - rpcService = router; + this.rpcService = router; + } + + public void setDomNotificationService(final DOMNotificationService domNotification) { + this.domNotification = domNotification; } public void setContext(final ConsumerSession context) { @@ -71,7 +80,7 @@ public class BrokerFacade { } private void checkPreconditions() { - if (context == null || domDataBroker == null) { + if ((this.context == null) || (this.domDataBroker == null)) { throw new RestconfDocumentedException(Status.SERVICE_UNAVAILABLE); } } @@ -79,7 +88,7 @@ public class BrokerFacade { // READ configuration public NormalizedNode readConfigurationData(final YangInstanceIdentifier path) { checkPreconditions(); - return readDataViaTransaction(domDataBroker.newReadOnlyTransaction(), CONFIGURATION, path); + return readDataViaTransaction(this.domDataBroker.newReadOnlyTransaction(), CONFIGURATION, path); } public NormalizedNode readConfigurationData(final DOMMountPoint mountPoint, final YangInstanceIdentifier path) { @@ -95,7 +104,7 @@ public class BrokerFacade { // READ operational public NormalizedNode readOperationalData(final YangInstanceIdentifier path) { checkPreconditions(); - return readDataViaTransaction(domDataBroker.newReadOnlyTransaction(), OPERATIONAL, path); + return readDataViaTransaction(this.domDataBroker.newReadOnlyTransaction(), OPERATIONAL, path); } public NormalizedNode readOperationalData(final DOMMountPoint mountPoint, final YangInstanceIdentifier path) { @@ -112,7 +121,7 @@ public class BrokerFacade { public CheckedFuture commitConfigurationDataPut( final SchemaContext globalSchema, final YangInstanceIdentifier path, final NormalizedNode payload) { checkPreconditions(); - return putDataViaTransaction(domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, globalSchema); + return putDataViaTransaction(this.domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, globalSchema); } public CheckedFuture commitConfigurationDataPut( @@ -129,13 +138,13 @@ public class BrokerFacade { public PATCHStatusContext patchConfigurationDataWithinTransaction(final PATCHContext context, final SchemaContext globalSchema) { - final DOMDataReadWriteTransaction patchTransaction = domDataBroker.newReadWriteTransaction(); - List editCollection = new ArrayList<>(); + final DOMDataReadWriteTransaction patchTransaction = this.domDataBroker.newReadWriteTransaction(); + final List editCollection = new ArrayList<>(); List editErrors; - List globalErrors = null; + final List globalErrors = null; int errorCounter = 0; - for (PATCHEntity patchEntity : context.getData()) { + for (final PATCHEntity patchEntity : context.getData()) { final PATCHEditOperation operation = PATCHEditOperation.valueOf(patchEntity.getOperation().toUpperCase()); switch (operation) { @@ -145,7 +154,7 @@ public class BrokerFacade { postDataWithinTransaction(patchTransaction, CONFIGURATION, patchEntity.getTargetNode(), patchEntity.getNode(), globalSchema); editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), true, null)); - } catch (RestconfDocumentedException e) { + } catch (final RestconfDocumentedException e) { editErrors = new ArrayList<>(); editErrors.addAll(e.getErrors()); editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), false, editErrors)); @@ -159,7 +168,7 @@ public class BrokerFacade { putDataWithinTransaction(patchTransaction, CONFIGURATION, patchEntity .getTargetNode(), patchEntity.getNode(), globalSchema); editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), true, null)); - } catch (RestconfDocumentedException e) { + } catch (final RestconfDocumentedException e) { editErrors = new ArrayList<>(); editErrors.addAll(e.getErrors()); editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), false, editErrors)); @@ -173,7 +182,7 @@ public class BrokerFacade { deleteDataWithinTransaction(patchTransaction, CONFIGURATION, patchEntity .getTargetNode()); editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), true, null)); - } catch (RestconfDocumentedException e) { + } catch (final RestconfDocumentedException e) { editErrors = new ArrayList<>(); editErrors.addAll(e.getErrors()); editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), false, editErrors)); @@ -187,7 +196,7 @@ public class BrokerFacade { deleteDataWithinTransaction(patchTransaction, CONFIGURATION, patchEntity .getTargetNode()); editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), true, null)); - } catch (RestconfDocumentedException e) { + } catch (final RestconfDocumentedException e) { LOG.error("Error removing {} by {} operation", patchEntity.getTargetNode().toString(), patchEntity.getEditId(), e); } @@ -213,7 +222,7 @@ public class BrokerFacade { public CheckedFuture commitConfigurationDataPost( final SchemaContext globalSchema, final YangInstanceIdentifier path, final NormalizedNode payload) { checkPreconditions(); - return postDataViaTransaction(domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, globalSchema); + return postDataViaTransaction(this.domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, globalSchema); } public CheckedFuture commitConfigurationDataPost( @@ -232,7 +241,7 @@ public class BrokerFacade { public CheckedFuture commitConfigurationDataDelete( final YangInstanceIdentifier path) { checkPreconditions(); - return deleteDataViaTransaction(domDataBroker.newWriteOnlyTransaction(), CONFIGURATION, path); + return deleteDataViaTransaction(this.domDataBroker.newWriteOnlyTransaction(), CONFIGURATION, path); } public CheckedFuture commitConfigurationDataDelete( @@ -249,11 +258,11 @@ public class BrokerFacade { // RPC public CheckedFuture invokeRpc(final SchemaPath type, final NormalizedNode input) { checkPreconditions(); - if (rpcService == null) { + if (this.rpcService == null) { throw new RestconfDocumentedException(Status.SERVICE_UNAVAILABLE); } LOG.trace("Invoke RPC {} with input: {}", type, input); - return rpcService.invokeRpc(type, input); + return this.rpcService.invokeRpc(type, input); } public void registerToListenDataChanges(final LogicalDatastoreType datastore, final DataChangeScope scope, @@ -265,7 +274,7 @@ public class BrokerFacade { } final YangInstanceIdentifier path = listener.getPath(); - final ListenerRegistration registration = domDataBroker.registerDataChangeListener( + final ListenerRegistration registration = this.domDataBroker.registerDataChangeListener( datastore, path, listener, scope); listener.setRegistration(registration); @@ -421,4 +430,18 @@ public class BrokerFacade { ImmutableNodes.fromInstanceId(schemaContext, YangInstanceIdentifier.create(normalizedPathWithoutChildArgs)); rwTx.merge(store, rootNormalizedPath, parentStructure); } + + public void registerToListenNotification(final NotificationListenerAdapter listener) { + checkPreconditions(); + + if (listener.isListening()) { + return; + } + + final SchemaPath path = listener.getSchemaPath(); + final ListenerRegistration registration = this.domNotification + .registerNotificationListener(listener, path); + + listener.setRegistration(registration); + } } diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/RestconfImpl.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/RestconfImpl.java index 493dbb99c4..7226a0abb2 100644 --- a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/RestconfImpl.java +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/RestconfImpl.java @@ -26,9 +26,12 @@ import java.net.URI; import java.net.URISyntaxException; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -56,6 +59,7 @@ import org.opendaylight.netconf.sal.rest.api.RestconfService; import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag; import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType; import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter; +import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter; import org.opendaylight.netconf.sal.streams.listeners.Notificator; import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer; import org.opendaylight.yangtools.yang.common.QName; @@ -68,6 +72,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -86,6 +91,7 @@ import org.opendaylight.yangtools.yang.model.api.LeafListSchemaNode; import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode; import org.opendaylight.yangtools.yang.model.api.ListSchemaNode; import org.opendaylight.yangtools.yang.model.api.Module; +import org.opendaylight.yangtools.yang.model.api.NotificationDefinition; import org.opendaylight.yangtools.yang.model.api.RpcDefinition; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaNode; @@ -134,6 +140,12 @@ public class RestconfImpl implements RestconfService { private static final YangInstanceIdentifier.AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER; + public static final CharSequence DATA_SUBSCR = "data-change-event-subscription"; + private static final CharSequence CREATE_DATA_SUBSCR = "create-" + DATA_SUBSCR; + + public static final CharSequence NOTIFICATION_STREAM = "notification-stream"; + private static final CharSequence CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM; + static { try { final Date eventSubscriptionAugRevision = new SimpleDateFormat("yyyy-MM-dd").parse("2014-07-08"); @@ -378,7 +390,15 @@ public class RestconfImpl implements RestconfService { response = mountRpcServices.get().invokeRpc(type, payload.getData()); } else { if (namespace.toString().equals(SAL_REMOTE_NAMESPACE)) { - response = invokeSalRemoteRpcSubscribeRPC(payload); + if (identifier.contains(CREATE_DATA_SUBSCR)) { + response = invokeSalRemoteRpcSubscribeRPC(payload); + } else if (identifier.contains(CREATE_NOTIFICATION_STREAM)) { + response = invokeSalRemoteRpcNotifiStrRPC(payload); + } else { + final String msg = "Not supported operation"; + LOG.warn(msg); + throw new RestconfDocumentedException(msg, ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED); + } } else { response = this.broker.invokeRpc(type, payload.getData()); } @@ -468,9 +488,10 @@ public class RestconfImpl implements RestconfService { } final YangInstanceIdentifier pathIdentifier = ((YangInstanceIdentifier) pathValue); - String streamName = null; + String streamName = (String) CREATE_DATA_SUBSCR; if (!pathIdentifier.isEmpty()) { - final String fullRestconfIdentifier = this.controllerContext.toFullRestconfIdentifier(pathIdentifier, null); + final String fullRestconfIdentifier = DATA_SUBSCR + + this.controllerContext.toFullRestconfIdentifier(pathIdentifier, null); LogicalDatastoreType datastore = parseEnumTypeParameter(value, LogicalDatastoreType.class, DATASTORE_PARAM_NAME); datastore = datastore == null ? DEFAULT_DATASTORE : datastore; @@ -877,6 +898,64 @@ public class RestconfImpl implements RestconfService { */ @Override public Response subscribeToStream(final String identifier, final UriInfo uriInfo) { + if (identifier.contains(DATA_SUBSCR)) { + return dataSubs(identifier, uriInfo); + } else if (identifier.contains(NOTIFICATION_STREAM)) { + return notifStream(identifier, uriInfo); + } + final String msg = "Bad type of notification of sal-remote"; + LOG.warn(msg); + throw new RestconfDocumentedException(msg); + } + + /** + * Register notification listener by stream name + * + * @param identifier + * - stream name + * @param uriInfo + * - uriInfo + * @return {@link Response} + */ + private Response notifStream(final String identifier, final UriInfo uriInfo) { + final String streamName = Notificator.createStreamNameFromUri(identifier); + if (Strings.isNullOrEmpty(streamName)) { + throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE); + } + final List listeners = Notificator.getNotificationListenerFor(streamName); + if ((listeners == null) || listeners.isEmpty()) { + throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL, + ErrorTag.UNKNOWN_ELEMENT); + } + + for (final NotificationListenerAdapter listener : listeners) { + this.broker.registerToListenNotification(listener); + } + + final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder(); + int notificationPort = NOTIFICATION_PORT; + try { + final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance(); + notificationPort = webSocketServerInstance.getPort(); + } catch (final NullPointerException e) { + WebSocketServer.createInstance(NOTIFICATION_PORT); + } + final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws"); + final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build(); + + return Response.status(Status.OK).location(uriToWebsocketServer).build(); + } + + /** + * Register data change listener by stream name + * + * @param identifier + * - stream name + * @param uriInfo + * - uri info + * @return {@link Response} + */ + private Response dataSubs(final String identifier, final UriInfo uriInfo) { final String streamName = Notificator.createStreamNameFromUri(identifier); if (Strings.isNullOrEmpty(streamName)) { throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE); @@ -935,7 +1014,7 @@ public class RestconfImpl implements RestconfService { /** * Load parameter for subscribing to stream from input composite node * - * @param compNode + * @param value * contains value * @return enum object if its string value is equal to {@code paramName}. In other cases null. */ @@ -1104,4 +1183,69 @@ public class RestconfImpl implements RestconfService { return streamNodeValues.build(); } + + /** + * Prepare stream for notification + * + * @param payload + * - contains list of qnames of notifications + * @return - checked future object + */ + private CheckedFuture invokeSalRemoteRpcNotifiStrRPC( + final NormalizedNodeContext payload) { + final ContainerNode data = (ContainerNode) payload.getData(); + LeafSetNode leafSet = null; + String outputType = "XML"; + for (final DataContainerChild dataChild : data.getValue()) { + if (dataChild instanceof LeafSetNode) { + leafSet = (LeafSetNode) dataChild; + } else if (dataChild instanceof AugmentationNode) { + outputType = (String) (((AugmentationNode) dataChild).getValue()).iterator().next().getValue(); + } + } + + final Collection entryNodes = leafSet.getValue(); + final List paths = new ArrayList<>(); + String streamName = CREATE_NOTIFICATION_STREAM + "/"; + + final Iterator iterator = entryNodes.iterator(); + while (iterator.hasNext()) { + final QName valueQName = QName.create((String) iterator.next().getValue()); + final Module module = ControllerContext.getInstance() + .findModuleByNamespace(valueQName.getModule().getNamespace()); + Preconditions.checkNotNull(module, "Module for namespace " + valueQName.getModule().getNamespace() + + " does not exist"); + NotificationDefinition notifiDef = null; + for (final NotificationDefinition notification : module.getNotifications()) { + if (notification.getQName().equals(valueQName)) { + notifiDef = notification; + break; + } + } + final String moduleName = module.getName(); + Preconditions.checkNotNull(notifiDef, + "Notification " + valueQName + "doesn't exist in module " + moduleName); + paths.add(notifiDef.getPath()); + streamName = streamName + moduleName + ":" + valueQName.getLocalName(); + if (iterator.hasNext()) { + streamName = streamName + ","; + } + } + + final QName rpcQName = payload.getInstanceIdentifierContext().getSchemaNode().getQName(); + final QName outputQname = QName.create(rpcQName, "output"); + final QName streamNameQname = QName.create(rpcQName, "notification-stream-identifier"); + + final ContainerNode output = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(new NodeIdentifier(outputQname)) + .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build(); + + if (!Notificator.existNotificationListenerFor(streamName)) { + Notificator.createNotificationListener(paths, streamName, outputType); + } + + final DOMRpcResult defaultDOMRpcResult = new DefaultDOMRpcResult(output); + + return Futures.immediateCheckedFuture(defaultDOMRpcResult); + } } diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/RestconfProviderImpl.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/RestconfProviderImpl.java index ce188748fb..71006b8292 100644 --- a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/RestconfProviderImpl.java +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/restconf/impl/RestconfProviderImpl.java @@ -20,6 +20,7 @@ import org.opendaylight.controller.config.yang.md.sal.rest.connector.RestConnect import org.opendaylight.controller.config.yang.md.sal.rest.connector.Rpcs; import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; import org.opendaylight.controller.sal.core.api.Provider; @@ -48,16 +49,16 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec BrokerFacade.getInstance().setContext(session); BrokerFacade.getInstance().setDomDataBroker( domDataBroker); final SchemaService schemaService = session.getService(SchemaService.class); - listenerRegistration = schemaService.registerSchemaContextListener(ControllerContext.getInstance()); + this.listenerRegistration = schemaService.registerSchemaContextListener(ControllerContext.getInstance()); BrokerFacade.getInstance().setRpcService(session.getService(DOMRpcService.class)); - + BrokerFacade.getInstance().setDomNotificationService(session.getService(DOMNotificationService.class)); ControllerContext.getInstance().setSchemas(schemaService.getGlobalContext()); ControllerContext.getInstance().setMountService(session.getService(DOMMountPointService.class)); - webSocketServerThread = new Thread(WebSocketServer.createInstance(port.getValue().intValue())); - webSocketServerThread.setName("Web socket server on port " + port); - webSocketServerThread.start(); + this.webSocketServerThread = new Thread(WebSocketServer.createInstance(this.port.getValue().intValue())); + this.webSocketServerThread.setName("Web socket server on port " + this.port); + this.webSocketServerThread.start(); } @Override @@ -68,12 +69,12 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec @Override public void close() { - if (listenerRegistration != null) { - listenerRegistration.close(); + if (this.listenerRegistration != null) { + this.listenerRegistration.close(); } WebSocketServer.destroyInstance(); - webSocketServerThread.interrupt(); + this.webSocketServerThread.interrupt(); } @Override @@ -81,27 +82,27 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec final Config config = new Config(); final Get get = new Get(); - get.setReceivedRequests(stats.getConfigGet()); - get.setSuccessfulResponses(stats.getSuccessGetConfig()); - get.setFailedResponses(stats.getFailureGetConfig()); + get.setReceivedRequests(this.stats.getConfigGet()); + get.setSuccessfulResponses(this.stats.getSuccessGetConfig()); + get.setFailedResponses(this.stats.getFailureGetConfig()); config.setGet(get); final Post post = new Post(); - post.setReceivedRequests(stats.getConfigPost()); - post.setSuccessfulResponses(stats.getSuccessPost()); - post.setFailedResponses(stats.getFailurePost()); + post.setReceivedRequests(this.stats.getConfigPost()); + post.setSuccessfulResponses(this.stats.getSuccessPost()); + post.setFailedResponses(this.stats.getFailurePost()); config.setPost(post); final Put put = new Put(); - put.setReceivedRequests(stats.getConfigPut()); - put.setSuccessfulResponses(stats.getSuccessPut()); - put.setFailedResponses(stats.getFailurePut()); + put.setReceivedRequests(this.stats.getConfigPut()); + put.setSuccessfulResponses(this.stats.getSuccessPut()); + put.setFailedResponses(this.stats.getFailurePut()); config.setPut(put); final Delete delete = new Delete(); - delete.setReceivedRequests(stats.getConfigDelete()); - delete.setSuccessfulResponses(stats.getSuccessDelete()); - delete.setFailedResponses(stats.getFailureDelete()); + delete.setReceivedRequests(this.stats.getConfigDelete()); + delete.setSuccessfulResponses(this.stats.getSuccessDelete()); + delete.setFailedResponses(this.stats.getFailureDelete()); config.setDelete(delete); return config; @@ -109,19 +110,19 @@ public class RestconfProviderImpl implements Provider, AutoCloseable, RestConnec @Override public Operational getOperational() { - final BigInteger opGet = stats.getOperationalGet(); + final BigInteger opGet = this.stats.getOperationalGet(); final Operational operational = new Operational(); final Get get = new Get(); get.setReceivedRequests(opGet); - get.setSuccessfulResponses(stats.getSuccessGetOperational()); - get.setFailedResponses(stats.getFailureGetOperational()); + get.setSuccessfulResponses(this.stats.getSuccessGetOperational()); + get.setFailedResponses(this.stats.getFailureGetOperational()); operational.setGet(get); return operational; } @Override public Rpcs getRpcs() { - final BigInteger rpcInvoke = stats.getRpc(); + final BigInteger rpcInvoke = this.stats.getRpc(); final Rpcs rpcs = new Rpcs(); rpcs.setReceivedRequests(rpcInvoke); return rpcs; diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java index 5128672577..a8651c2163 100644 --- a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/ListenerAdapter.java @@ -77,7 +77,7 @@ public class ListenerAdapter implements DOMDataChangeListener { private static final TransformerFactory FACTORY = TransformerFactory.newInstance(); private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$"); - private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ"); + private static final SimpleDateFormat RFC3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ"); private final YangInstanceIdentifier path; private ListenerRegistration registration; @@ -96,12 +96,12 @@ public class ListenerAdapter implements DOMDataChangeListener { */ ListenerAdapter(final YangInstanceIdentifier path, final String streamName) { Preconditions.checkNotNull(path); - Preconditions.checkArgument(streamName != null && !streamName.isEmpty()); + Preconditions.checkArgument((streamName != null) && !streamName.isEmpty()); this.path = path; this.streamName = streamName; - eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor()); - eventBusChangeRecorder = new EventBusChangeRecorder(); - eventBus.register(eventBusChangeRecorder); + this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor()); + this.eventBusChangeRecorder = new EventBusChangeRecorder(); + this.eventBus.register(this.eventBusChangeRecorder); } @Override @@ -111,7 +111,7 @@ public class ListenerAdapter implements DOMDataChangeListener { final String xml = prepareXmlFrom(change); final Event event = new Event(EventType.NOTIFY); event.setData(xml); - eventBus.post(event); + this.eventBus.post(event); } } @@ -123,20 +123,20 @@ public class ListenerAdapter implements DOMDataChangeListener { public void recordCustomerChange(final Event event) { if (event.getType() == EventType.REGISTER) { final Channel subscriber = event.getSubscriber(); - if (!subscribers.contains(subscriber)) { - subscribers.add(subscriber); + if (!ListenerAdapter.this.subscribers.contains(subscriber)) { + ListenerAdapter.this.subscribers.add(subscriber); } } else if (event.getType() == EventType.DEREGISTER) { - subscribers.remove(event.getSubscriber()); + ListenerAdapter.this.subscribers.remove(event.getSubscriber()); Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this); } else if (event.getType() == EventType.NOTIFY) { - for (final Channel subscriber : subscribers) { + for (final Channel subscriber : ListenerAdapter.this.subscribers) { if (subscriber.isActive()) { LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress()); subscriber.writeAndFlush(new TextWebSocketFrame(event.getData())); } else { LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress()); - subscribers.remove(subscriber); + ListenerAdapter.this.subscribers.remove(subscriber); } } } @@ -167,7 +167,7 @@ public class ListenerAdapter implements DOMDataChangeListener { * @return Channel */ public Channel getSubscriber() { - return subscriber; + return this.subscriber; } /** @@ -186,7 +186,7 @@ public class ListenerAdapter implements DOMDataChangeListener { * @return String representation of event data. */ public String getData() { - return data; + return this.data; } /** @@ -204,7 +204,7 @@ public class ListenerAdapter implements DOMDataChangeListener { * @return The type of the event. */ public EventType getType() { - return type; + return this.type; } } @@ -267,15 +267,15 @@ public class ListenerAdapter implements DOMDataChangeListener { * Date * @return Data specified by RFC3339. */ - private String toRFC3339(final Date d) { - return RFC3339_PATTERN.matcher(rfc3339.format(d)).replaceAll("$1:$2"); + public static String toRFC3339(final Date d) { + return RFC3339_PATTERN.matcher(RFC3339.format(d)).replaceAll("$1:$2"); } /** * Creates {@link Document} document. * @return {@link Document} document. */ - private Document createDocument() { + public static Document createDocument() { final DocumentBuilder bob; try { bob = DBF.newDocumentBuilder(); @@ -326,7 +326,7 @@ public class ListenerAdapter implements DOMDataChangeListener { */ private void addValuesFromDataToElement(final Document doc, final Set data, final Element element, final Operation operation) { - if (data == null || data.isEmpty()) { + if ((data == null) || data.isEmpty()) { return; } for (final YangInstanceIdentifier path : data) { @@ -340,10 +340,10 @@ public class ListenerAdapter implements DOMDataChangeListener { private void addCreatedChangedValuesFromDataToElement(final Document doc, final Set>> data, final Element element, final Operation operation, final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) { - if (data == null || data.isEmpty()) { + if ((data == null) || data.isEmpty()) { return; } - for (Entry> entry : data) { + for (final Entry> entry : data) { if (!ControllerContext.getInstance().isNodeMixin(entry.getKey())) { final Node node = createCreatedChangedDataChangeEventElement(doc, entry, operation, schemaContext, dataSchemaContextTree); @@ -396,18 +396,19 @@ public class ListenerAdapter implements DOMDataChangeListener { final Element dataElement = doc.createElement("data"); dataElement.appendChild(result); dataChangeEventElement.appendChild(dataElement); - } catch (IOException e) { + } catch (final IOException e) { LOG.error("Error in writer ", e); - } catch (XMLStreamException e) { + } catch (final XMLStreamException e) { LOG.error("Error processing stream", e); } return dataChangeEventElement; } - private static DOMResult writeNormalizedNode(final NormalizedNode normalized, final - YangInstanceIdentifier path, final SchemaContext context, final DataSchemaContextTree dataSchemaContextTree) throws - IOException, XMLStreamException { + private static DOMResult writeNormalizedNode(final NormalizedNode normalized, + final YangInstanceIdentifier path, final SchemaContext context, + final DataSchemaContextTree dataSchemaContextTree) + throws IOException, XMLStreamException { final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory(); final Document doc = XmlDocumentUtils.getDocument(); final DOMResult result = new DOMResult(doc); @@ -416,7 +417,7 @@ public class ListenerAdapter implements DOMDataChangeListener { XMLStreamWriter writer = null; final SchemaPath nodePath; - if (normalized instanceof MapEntryNode || normalized instanceof UnkeyedListEntryNode) { + if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) { nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath(); } else { nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent(); @@ -542,7 +543,7 @@ public class ListenerAdapter implements DOMDataChangeListener { * @return Path pointed to data in data store. */ public YangInstanceIdentifier getPath() { - return path; + return this.path; } /** @@ -560,17 +561,17 @@ public class ListenerAdapter implements DOMDataChangeListener { * @return The name of the stream. */ public String getStreamName() { - return streamName; + return this.streamName; } /** * Removes all subscribers and unregisters event bus change recorder form event bus. */ public void close() throws Exception { - subscribers = new ConcurrentSet<>(); - registration.close(); - registration = null; - eventBus.unregister(eventBusChangeRecorder); + this.subscribers = new ConcurrentSet<>(); + this.registration.close(); + this.registration = null; + this.eventBus.unregister(this.eventBusChangeRecorder); } /** @@ -579,7 +580,7 @@ public class ListenerAdapter implements DOMDataChangeListener { * @return True if exist, false otherwise. */ public boolean isListening() { - return registration == null ? false : true; + return this.registration == null ? false : true; } /** @@ -595,7 +596,7 @@ public class ListenerAdapter implements DOMDataChangeListener { } final Event event = new Event(EventType.REGISTER); event.setSubscriber(subscriber); - eventBus.post(event); + this.eventBus.post(event); } /** @@ -608,7 +609,7 @@ public class ListenerAdapter implements DOMDataChangeListener { LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress()); final Event event = new Event(EventType.DEREGISTER); event.setSubscriber(subscriber); - eventBus.post(event); + this.eventBus.post(event); } /** @@ -617,7 +618,7 @@ public class ListenerAdapter implements DOMDataChangeListener { * @return True if exist at least one {@link Channel} subscriber, false otherwise. */ public boolean hasSubscribers() { - return !subscribers.isEmpty(); + return !this.subscribers.isEmpty(); } /** diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java new file mode 100644 index 0000000000..42f0996052 --- /dev/null +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/NotificationListenerAdapter.java @@ -0,0 +1,390 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.sal.streams.listeners; + +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.eventbus.AsyncEventBus; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; +import io.netty.channel.Channel; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.util.internal.ConcurrentSet; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.UnsupportedEncodingException; +import java.util.Collection; +import java.util.Date; +import java.util.Set; +import java.util.concurrent.Executors; +import javax.xml.stream.XMLOutputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamWriter; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMResult; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; +import org.json.JSONObject; +import org.json.XML; +import org.opendaylight.controller.md.sal.dom.api.DOMNotification; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener; +import org.opendaylight.netconf.sal.restconf.impl.ControllerContext; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter; +import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter; +import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; + +/** + * {@link NotificationListenerAdapter} is responsible to track events on + * notifications. + * + */ +public class NotificationListenerAdapter implements DOMNotificationListener { + + private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class); + private static final TransformerFactory FACTORY = TransformerFactory.newInstance(); + + private final String streamName; + private ListenerRegistration registration; + private Set subscribers = new ConcurrentSet<>(); + private final EventBus eventBus; + private final EventBusChangeRecorder eventBusChangeRecorder; + + private final SchemaPath path; + private final String outputType; + + /** + * Set path of listener and stream name, register event bus. + * + * @param path + * - path of notification + * @param streamName + * - stream name of listener + * @param outputType + * - type of output on notification (JSON, XML) + */ + NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) { + this.outputType = outputType; + Preconditions.checkArgument((streamName != null) && !streamName.isEmpty()); + Preconditions.checkArgument(path != null); + this.path = path; + this.streamName = streamName; + this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor()); + this.eventBusChangeRecorder = new EventBusChangeRecorder(); + this.eventBus.register(this.eventBusChangeRecorder); + } + + @Override + public void onNotification(final DOMNotification notification) { + final String xml = prepareXmlFrom(notification); + final Event event = new Event(EventType.NOTIFY); + if (this.outputType.equals("JSON")) { + final JSONObject jsonObject = XML.toJSONObject(xml); + event.setData(jsonObject.toString()); + } else { + event.setData(xml); + } + this.eventBus.post(event); + } + + /** + * Checks if exists at least one {@link Channel} subscriber. + * + * @return True if exist at least one {@link Channel} subscriber, false + * otherwise. + */ + public boolean hasSubscribers() { + return !this.subscribers.isEmpty(); + } + + /** + * Reset lists, close registration and unregister bus event. + */ + public void close() { + this.subscribers = new ConcurrentSet<>(); + this.registration.close(); + this.registration = null; + this.eventBus.unregister(this.eventBusChangeRecorder); + } + + /** + * Get stream name of this listener + * + * @return {@link String} + */ + public String getStreamName() { + return this.streamName; + } + + /** + * Check if is this listener registered. + * + * @return - true if is registered, otherwise null + */ + public boolean isListening() { + return this.registration == null ? false : true; + } + + /** + * Get schema path of notification + * + * @return {@link SchemaPath} + */ + public SchemaPath getSchemaPath() { + return this.path; + } + + /** + * Set registration for close after closing connection and check if this + * listener is registered + * + * @param registration + * - registered listener + */ + public void setRegistration(final ListenerRegistration registration) { + Preconditions.checkNotNull(registration); + this.registration = registration; + } + + /** + * Creates event of type {@link EventType#REGISTER}, set {@link Channel} + * subscriber to the event and post event into event bus. + * + * @param subscriber + * Channel + */ + public void addSubscriber(final Channel subscriber) { + if (!subscriber.isActive()) { + LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress()); + } + final Event event = new Event(EventType.REGISTER); + event.setSubscriber(subscriber); + this.eventBus.post(event); + } + + /** + * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} + * subscriber to the event and posts event into event bus. + * + * @param subscriber + */ + public void removeSubscriber(final Channel subscriber) { + LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress()); + final Event event = new Event(EventType.DEREGISTER); + event.setSubscriber(subscriber); + this.eventBus.post(event); + } + + private String prepareXmlFrom(final DOMNotification notification) { + final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema(); + final Document doc = ListenerAdapter.createDocument(); + final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0", + "notification"); + doc.appendChild(notificationElement); + + final Element eventTimeElement = doc.createElement("eventTime"); + eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date())); + notificationElement.appendChild(eventTimeElement); + + final Element notificationEventElement = doc.createElementNS( + "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream"); + addValuesToNotificationEventElement(doc, notificationEventElement, notification, schemaContext); + notificationElement.appendChild(notificationEventElement); + + try { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final Transformer transformer = FACTORY.newTransformer(); + transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no"); + transformer.setOutputProperty(OutputKeys.METHOD, "xml"); + transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8"); + transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4"); + transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8))); + final byte[] charData = out.toByteArray(); + return new String(charData, "UTF-8"); + } catch (TransformerException | UnsupportedEncodingException e) { + final String msg = "Error during transformation of Document into String"; + LOG.error(msg, e); + return msg; + } + } + + private void addValuesToNotificationEventElement(final Document doc, final Element element, + final DOMNotification notification, final SchemaContext schemaContext) { + if (notification == null) { + return; + } + + final NormalizedNode>> body = notification + .getBody(); + try { + final DOMResult domResult = writeNormalizedNode(body, + YangInstanceIdentifier.create(body.getIdentifier()), schemaContext); + final Node result = doc.importNode(domResult.getNode().getFirstChild(), true); + final Element dataElement = doc.createElement("notification"); + dataElement.appendChild(result); + element.appendChild(dataElement); + } catch (final IOException e) { + LOG.error("Error in writer ", e); + } catch (final XMLStreamException e) { + LOG.error("Error processing stream", e); + } + } + + private DOMResult writeNormalizedNode(final NormalizedNode normalized, final YangInstanceIdentifier path, + final SchemaContext context) throws IOException, XMLStreamException { + final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory(); + final Document doc = XmlDocumentUtils.getDocument(); + final DOMResult result = new DOMResult(doc); + NormalizedNodeWriter normalizedNodeWriter = null; + NormalizedNodeStreamWriter normalizedNodeStreamWriter = null; + XMLStreamWriter writer = null; + + try { + writer = XML_FACTORY.createXMLStreamWriter(result); + normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context, + this.getSchemaPath()); + normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter); + + normalizedNodeWriter.write(normalized); + + normalizedNodeWriter.flush(); + } finally { + if (normalizedNodeWriter != null) { + normalizedNodeWriter.close(); + } + if (normalizedNodeStreamWriter != null) { + normalizedNodeStreamWriter.close(); + } + if (writer != null) { + writer.close(); + } + } + + return result; + } + + /** + * Tracks events of data change by customer. + */ + private final class EventBusChangeRecorder { + @Subscribe + public void recordCustomerChange(final Event event) { + if (event.getType() == EventType.REGISTER) { + final Channel subscriber = event.getSubscriber(); + if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) { + NotificationListenerAdapter.this.subscribers.add(subscriber); + } + } else if (event.getType() == EventType.DEREGISTER) { + NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber()); + Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this); + } else if (event.getType() == EventType.NOTIFY) { + for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) { + if (subscriber.isActive()) { + LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress()); + subscriber.writeAndFlush(new TextWebSocketFrame(event.getData())); + } else { + LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress()); + NotificationListenerAdapter.this.subscribers.remove(subscriber); + } + } + } + } + } + + /** + * Represents event of specific {@link EventType} type, holds data and + * {@link Channel} subscriber. + */ + private final class Event { + private final EventType type; + private Channel subscriber; + private String data; + + /** + * Creates new event specified by {@link EventType} type. + * + * @param type + * EventType + */ + public Event(final EventType type) { + this.type = type; + } + + /** + * Gets the {@link Channel} subscriber. + * + * @return Channel + */ + public Channel getSubscriber() { + return this.subscriber; + } + + /** + * Sets subscriber for event. + * + * @param subscriber + * Channel + */ + public void setSubscriber(final Channel subscriber) { + this.subscriber = subscriber; + } + + /** + * Gets event String. + * + * @return String representation of event data. + */ + public String getData() { + return this.data; + } + + /** + * Sets event data. + * + * @param data + * String. + */ + public void setData(final String data) { + this.data = data; + } + + /** + * Gets event type. + * + * @return The type of the event. + */ + public EventType getType() { + return this.type; + } + } + + /** + * Type of the event. + */ + private enum EventType { + REGISTER, DEREGISTER, NOTIFY; + } +} diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java index 9537732133..0bd38652b7 100644 --- a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/listeners/Notificator.java @@ -7,12 +7,17 @@ */ package org.opendaylight.netconf.sal.streams.listeners; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link Notificator} is responsible to create, remove and find @@ -21,6 +26,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; public class Notificator { private static Map listenersByStreamName = new ConcurrentHashMap<>(); + private static Map> notificationListenersByStreamName = new ConcurrentHashMap<>(); + + private static final Logger LOG = LoggerFactory.getLogger(Notificator.class); private static final Lock lock = new ReentrantLock(); private Notificator() { @@ -40,7 +48,7 @@ public class Notificator { * The name of the stream. * @return {@link ListenerAdapter} specified by stream name. */ - public static ListenerAdapter getListenerFor(String streamName) { + public static ListenerAdapter getListenerFor(final String streamName) { return listenersByStreamName.get(streamName); } @@ -50,7 +58,7 @@ public class Notificator { * @param streamName * @return True if the listener exist, false otherwise. */ - public static boolean existListenerFor(String streamName) { + public static boolean existListenerFor(final String streamName) { return listenersByStreamName.containsKey(streamName); } @@ -63,8 +71,8 @@ public class Notificator { * The name of the stream. * @return New {@link ListenerAdapter} listener from {@link YangInstanceIdentifier} path and stream name. */ - public static ListenerAdapter createListener(YangInstanceIdentifier path, String streamName) { - ListenerAdapter listener = new ListenerAdapter(path, streamName); + public static ListenerAdapter createListener(final YangInstanceIdentifier path, final String streamName) { + final ListenerAdapter listener = new ListenerAdapter(path, streamName); try { lock.lock(); listenersByStreamName.put(streamName, listener); @@ -82,7 +90,7 @@ public class Notificator { * URI for creation stream name. * @return String representation of stream name. */ - public static String createStreamNameFromUri(String uri) { + public static String createStreamNameFromUri(final String uri) { if (uri == null) { return null; } @@ -100,10 +108,11 @@ public class Notificator { * Removes all listeners. */ public static void removeAllListeners() { - for (ListenerAdapter listener : listenersByStreamName.values()) { + for (final ListenerAdapter listener : listenersByStreamName.values()) { try { listener.close(); - } catch (Exception e) { + } catch (final Exception e) { + LOG.error("Failed to close listener", e); } } try { @@ -120,7 +129,7 @@ public class Notificator { * @param listener * ListenerAdapter */ - public static void removeListenerIfNoSubscriberExists(ListenerAdapter listener) { + public static void removeListenerIfNoSubscriberExists(final ListenerAdapter listener) { if (!listener.hasSubscribers()) { deleteListener(listener); } @@ -132,11 +141,12 @@ public class Notificator { * @param listener * ListenerAdapter */ - private static void deleteListener(ListenerAdapter listener) { + private static void deleteListener(final ListenerAdapter listener) { if (listener != null) { try { listener.close(); - } catch (Exception e) { + } catch (final Exception e) { + LOG.error("Failed to close listener", e); } try { lock.lock(); @@ -147,4 +157,57 @@ public class Notificator { } } + /** + * Check if the listener specified by qnames of request exist. + * + * @param streamName + * - name of stream + * @return True if the listener exist, false otherwise. + */ + public static boolean existNotificationListenerFor(final String streamName) { + return notificationListenersByStreamName.containsKey(streamName); + + } + + public static List createNotificationListener(final List paths, + final String streamName, final String outputType) { + final List listListeners = new ArrayList<>(); + for (final SchemaPath path : paths) { + final NotificationListenerAdapter listener = new NotificationListenerAdapter(path, streamName, outputType); + listListeners.add(listener); + } + try { + lock.lock(); + notificationListenersByStreamName.put(streamName, listListeners); + } finally { + lock.unlock(); + } + return listListeners; + } + + public static void removeNotificationListenerIfNoSubscriberExists(final NotificationListenerAdapter listener) { + if (!listener.hasSubscribers()) { + deleteNotificationListener(listener); + } + } + + private static void deleteNotificationListener(final NotificationListenerAdapter listener) { + if (listener != null) { + try { + listener.close(); + } catch (final Exception e) { + LOG.error("Failed to close listener", e); + } + try { + lock.lock(); + notificationListenersByStreamName.remove(listener.getStreamName()); + } finally { + lock.unlock(); + } + } + } + + public static List getNotificationListenerFor(final String streamName) { + return notificationListenersByStreamName.get(streamName); + } } diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerHandler.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerHandler.java index c2b4edc621..5af22bfafc 100644 --- a/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerHandler.java +++ b/restconf/sal-rest-connector/src/main/java/org/opendaylight/netconf/sal/streams/websockets/WebSocketServerHandler.java @@ -8,9 +8,9 @@ package org.opendaylight.netconf.sal.streams.websockets; +import static io.netty.handler.codec.http.HttpHeaders.Names.HOST; import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive; import static io.netty.handler.codec.http.HttpHeaders.setContentLength; -import static io.netty.handler.codec.http.HttpHeaders.Names.HOST; import static io.netty.handler.codec.http.HttpMethod.GET; import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; @@ -35,7 +35,10 @@ import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.util.CharsetUtil; import java.io.IOException; +import java.util.List; +import org.opendaylight.netconf.sal.restconf.impl.RestconfImpl; import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter; +import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter; import org.opendaylight.netconf.sal.streams.listeners.Notificator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,24 +83,37 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler return; } - String streamName = Notificator.createStreamNameFromUri(req.getUri()); - ListenerAdapter listener = Notificator.getListenerFor(streamName); - if (listener != null) { - listener.addSubscriber(ctx.channel()); - logger.debug("Subscriber successfully registered."); - } else { - logger.error("Listener for stream with name '{}' was not found.", streamName); - sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR)); + final String streamName = Notificator.createStreamNameFromUri(req.getUri()); + if (streamName.contains(RestconfImpl.DATA_SUBSCR)) { + final ListenerAdapter listener = Notificator.getListenerFor(streamName); + if (listener != null) { + listener.addSubscriber(ctx.channel()); + logger.debug("Subscriber successfully registered."); + } else { + logger.error("Listener for stream with name '{}' was not found.", streamName); + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR)); + } + } else if (streamName.contains(RestconfImpl.NOTIFICATION_STREAM)) { + final List listeners = Notificator.getNotificationListenerFor(streamName); + if (!listeners.isEmpty() && (listeners != null)) { + for (final NotificationListenerAdapter listener : listeners) { + listener.addSubscriber(ctx.channel()); + logger.debug("Subscriber successfully registered."); + } + } else { + logger.error("Listener for stream with name '{}' was not found.", streamName); + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR)); + } } // Handshake - WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req), + final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req), null, false); - handshaker = wsFactory.newHandshaker(req); - if (handshaker == null) { + this.handshaker = wsFactory.newHandshaker(req); + if (this.handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { - handshaker.handshake(ctx.channel(), req); + this.handshaker.handshake(ctx.channel(), req); } } @@ -116,15 +132,15 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler final FullHttpResponse res) { // Generate an error page if response getStatus code is not OK (200). if (res.getStatus().code() != 200) { - ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); + final ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); setContentLength(res, res.content().readableBytes()); } // Send the response and close the connection if necessary. - ChannelFuture f = ctx.channel().writeAndFlush(res); - if (!isKeepAlive(req) || res.getStatus().code() != 200) { + final ChannelFuture f = ctx.channel().writeAndFlush(res); + if (!isKeepAlive(req) || (res.getStatus().code() != 200)) { f.addListener(ChannelFutureListener.CLOSE); } } @@ -139,14 +155,23 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler */ private void handleWebSocketFrame(final ChannelHandlerContext ctx, final WebSocketFrame frame) throws IOException { if (frame instanceof CloseWebSocketFrame) { - handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); - String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText()); - ListenerAdapter listener = Notificator.getListenerFor(streamName); - if (listener != null) { - listener.removeSubscriber(ctx.channel()); - logger.debug("Subscriber successfully registered."); + this.handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); + final String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText()); + if (streamName.contains(RestconfImpl.DATA_SUBSCR)) { + final ListenerAdapter listener = Notificator.getListenerFor(streamName); + if (listener != null) { + listener.removeSubscriber(ctx.channel()); + logger.debug("Subscriber successfully registered."); + } + Notificator.removeListenerIfNoSubscriberExists(listener); + } else if (streamName.contains(RestconfImpl.NOTIFICATION_STREAM)) { + final List listeners = Notificator.getNotificationListenerFor(streamName); + if (!listeners.isEmpty() && (listeners != null)) { + for (final NotificationListenerAdapter listener : listeners) { + listener.removeSubscriber(ctx.channel()); + } + } } - Notificator.removeListenerIfNoSubscriberExists(listener); return; } else if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); @@ -156,7 +181,7 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler @Override public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception { - if (cause instanceof java.nio.channels.ClosedChannelException == false) { + if ((cause instanceof java.nio.channels.ClosedChannelException) == false) { // cause.printStackTrace(); } ctx.close(); diff --git a/restconf/sal-rest-connector/src/main/yang/sal-remote-augment.yang b/restconf/sal-rest-connector/src/main/yang/sal-remote-augment.yang index cc1d26fb97..8f5088f8bd 100644 --- a/restconf/sal-rest-connector/src/main/yang/sal-remote-augment.yang +++ b/restconf/sal-rest-connector/src/main/yang/sal-remote-augment.yang @@ -7,7 +7,7 @@ module sal-remote-augment { import sal-remote {prefix salrmt; revision-date "2014-01-14";} description - "Added input parameters to rpc create-data-change-event-subscription"; + "Added input parameters to rpc create-data-change-event-subscription and to create-notification-stream"; revision "2014-07-08" { } @@ -28,4 +28,15 @@ module sal-remote-augment { } } + augment "/salrmt:create-notification-stream/salrmt:input" { + leaf notification-output-type { + type enumeration { + enum JSON; + enum XML; + } + default "XML"; + description "Input parameter which type of output will be parsed on notification"; + } + } + } diff --git a/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/BrokerFacadeTest.java b/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/BrokerFacadeTest.java index 43b3ca7bec..9fbf9007e3 100644 --- a/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/BrokerFacadeTest.java +++ b/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/BrokerFacadeTest.java @@ -15,11 +15,13 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import com.google.common.base.Optional; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import java.util.concurrent.Future; @@ -39,6 +41,7 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; +import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService; import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; @@ -48,6 +51,7 @@ import org.opendaylight.netconf.sal.restconf.impl.ControllerContext; import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException; import org.opendaylight.netconf.sal.restconf.impl.RestconfError; import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter; +import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter; import org.opendaylight.netconf.sal.streams.listeners.Notificator; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.common.QName; @@ -68,6 +72,9 @@ public class BrokerFacadeTest { @Mock DOMDataBroker domDataBroker; + @Mock + DOMNotificationService domNotification; + @Mock ConsumerSession context; @@ -102,6 +109,7 @@ public class BrokerFacadeTest { MockitoAnnotations.initMocks(this); // TODO it is started before every test method brokerFacade.setDomDataBroker(domDataBroker); + brokerFacade.setDomNotificationService(domNotification); brokerFacade.setRpcService(mockRpcService); brokerFacade.setContext(context); when(domDataBroker.newReadOnlyTransaction()).thenReturn(rTransaction); @@ -109,7 +117,6 @@ public class BrokerFacadeTest { when(domDataBroker.newReadWriteTransaction()).thenReturn(rwTransaction); ControllerContext.getInstance().setSchemas(TestUtils.loadSchemaContext("/full-versions/test-module")); - } private CheckedFuture>,ReadFailedException> wrapDummyNode(final NormalizedNode dummyNode) { @@ -267,6 +274,38 @@ public class BrokerFacadeTest { brokerFacade.registerToListenDataChanges(LogicalDatastoreType.CONFIGURATION, DataChangeScope.BASE, listener); verifyNoMoreInteractions(domDataBroker); + } + /** + * Create, register, close and remove notification listener. + */ + @Test + public void testRegisterToListenNotificationChanges() { + // create test notification listener + final String identifier = "create-notification-stream/toaster:toastDone"; + final SchemaPath path = SchemaPath.create(true, + QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toastDone")); + Notificator.createNotificationListener(Lists.newArrayList(path), identifier, "XML"); + final NotificationListenerAdapter listener = Notificator.getNotificationListenerFor(identifier).get(0); + + // mock registration + final ListenerRegistration registration = mock(ListenerRegistration.class); + when(domNotification.registerNotificationListener(listener, listener.getSchemaPath())) + .thenReturn(registration); + + // test to register listener for the first time + brokerFacade.registerToListenNotification(listener); + assertEquals("Registration was not successful", true, listener.isListening()); + + // try to register for the second time + brokerFacade.registerToListenNotification(listener); + assertEquals("Registration was not successful", true, listener.isListening()); + + // registrations should be invoked only once + verify(domNotification, times(1)).registerNotificationListener(listener, listener.getSchemaPath()); + + // close and remove test notification listener + listener.close(); + Notificator.removeNotificationListenerIfNoSubscriberExists(listener); } } diff --git a/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfImplTest.java b/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfImplTest.java index 1c57effe45..7c4b350c6c 100644 --- a/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfImplTest.java +++ b/restconf/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestconfImplTest.java @@ -13,20 +13,26 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; + import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; import java.io.FileNotFoundException; +import java.net.URI; import java.text.ParseException; import java.util.Set; import javax.ws.rs.core.MultivaluedHashMap; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriInfo; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; @@ -35,10 +41,16 @@ import org.opendaylight.netconf.sal.restconf.impl.ControllerContext; import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext; import org.opendaylight.netconf.sal.restconf.impl.NormalizedNodeContext; import org.opendaylight.netconf.sal.restconf.impl.RestconfImpl; +import org.opendaylight.netconf.sal.streams.listeners.Notificator; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; +import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.Module; +import org.opendaylight.yangtools.yang.model.api.RpcDefinition; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.opendaylight.yangtools.yang.model.api.SchemaNode; import org.opendaylight.yangtools.yang.model.api.SchemaPath; @@ -60,9 +72,8 @@ public class RestconfImplTest { final Set allModules = schemaContext.getModules(); assertNotNull(allModules); - controllerContext = spy(ControllerContext.getInstance()); + controllerContext = ControllerContext.getInstance(); controllerContext.setSchemas(schemaContext); - } @Before @@ -105,4 +116,73 @@ public class RestconfImplTest { this.restconfImpl.invokeRpc("ietf-netconf", ctx, uriInfo); verify(rpcService, times(2)).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class)); } + + /** + * Create notification stream for toaster module + */ + @Test + public void createNotificationStreamTest() { + final NormalizedNodeContext payload = mock(NormalizedNodeContext.class); + final InstanceIdentifierContext iiCtx = mock(InstanceIdentifierContext.class); + doReturn(iiCtx).when(payload).getInstanceIdentifierContext(); + + final SchemaNode schemaNode = mock(SchemaNode.class, + Mockito.withSettings().extraInterfaces(RpcDefinition.class)); + doReturn(schemaNode).when(iiCtx).getSchemaNode(); + doReturn(mock(SchemaPath.class)).when(schemaNode).getPath(); + + doReturn(QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", + "2014-01-14", "create-notification-stream")).when(schemaNode).getQName(); + doReturn(null).when(iiCtx).getMountPoint(); + + final Set> children = Sets.newHashSet(); + final DataContainerChild child = mock(DataContainerChild.class, + Mockito.withSettings().extraInterfaces(LeafSetNode.class)); + + final LeafSetEntryNode entryNode = mock(LeafSetEntryNode.class); + when(entryNode.getValue()).thenReturn("(http://netconfcentral.org/ns/toaster?revision=2009-11-20)toastDone"); + when(((LeafSetNode) child).getValue()).thenReturn(Sets.newHashSet(entryNode)); + children.add(child); + + final NormalizedNode normalizedNode = mock(NormalizedNode.class, + Mockito.withSettings().extraInterfaces(ContainerNode.class)); + doReturn(normalizedNode).when(payload).getData(); + doReturn(children).when(normalizedNode).getValue(); + + // register notification + final NormalizedNodeContext context = this.restconfImpl + .invokeRpc("sal-remote:create-notification-stream", payload, null); + assertNotNull(context); + } + + /** + * Subscribe for notification stream of toaster module + */ + @Test + public void subscribeToNotificationStreamTest() throws Exception { + final String identifier = "create-notification-stream/toaster:toastDone"; + + // register test notification stream + final SchemaPath path = SchemaPath.create( + true, QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toastDone")); + Notificator.createNotificationListener(Lists.newArrayList(path), identifier, "XML"); + + final UriInfo uriInfo = mock(UriInfo.class); + final UriBuilder uriBuilder = mock(UriBuilder.class); + when(uriBuilder.port(8181)).thenReturn(uriBuilder); + when(uriBuilder.replacePath(identifier)).thenReturn(uriBuilder); + when(uriBuilder.build()).thenReturn(new URI("")); + when(uriBuilder.scheme("ws")).thenReturn(uriBuilder); + when(uriInfo.getAbsolutePathBuilder()).thenReturn(uriBuilder); + + final BrokerFacade brokerFacade = mock(BrokerFacade.class); + this.restconfImpl.setBroker(brokerFacade); + + // subscribe to stream and verify response + final Response response = this.restconfImpl.subscribeToStream(identifier, uriInfo); + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + + // remove test notification stream + Notificator.removeAllListeners(); + } }