<version>1.2</version>
</dependency>
+ <dependency>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ </dependency>
+
<!-- Testing Dependencies -->
<dependency>
<groupId>org.glassfish.jersey.test-framework.providers</groupId>
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
+
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
private volatile DOMRpcService rpcService;
private volatile ConsumerSession context;
private DOMDataBroker domDataBroker;
+ private DOMNotificationService domNotification;
private BrokerFacade() {
}
public void setRpcService(final DOMRpcService router) {
- rpcService = router;
+ this.rpcService = router;
+ }
+
+ public void setDomNotificationService(final DOMNotificationService domNotification) {
+ this.domNotification = domNotification;
}
public void setContext(final ConsumerSession context) {
}
private void checkPreconditions() {
- if (context == null || domDataBroker == null) {
+ if ((this.context == null) || (this.domDataBroker == null)) {
throw new RestconfDocumentedException(Status.SERVICE_UNAVAILABLE);
}
}
// READ configuration
public NormalizedNode<?, ?> readConfigurationData(final YangInstanceIdentifier path) {
checkPreconditions();
- return readDataViaTransaction(domDataBroker.newReadOnlyTransaction(), CONFIGURATION, path);
+ return readDataViaTransaction(this.domDataBroker.newReadOnlyTransaction(), CONFIGURATION, path);
}
public NormalizedNode<?, ?> readConfigurationData(final DOMMountPoint mountPoint, final YangInstanceIdentifier path) {
// READ operational
public NormalizedNode<?, ?> readOperationalData(final YangInstanceIdentifier path) {
checkPreconditions();
- return readDataViaTransaction(domDataBroker.newReadOnlyTransaction(), OPERATIONAL, path);
+ return readDataViaTransaction(this.domDataBroker.newReadOnlyTransaction(), OPERATIONAL, path);
}
public NormalizedNode<?, ?> readOperationalData(final DOMMountPoint mountPoint, final YangInstanceIdentifier path) {
public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataPut(
final SchemaContext globalSchema, final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload) {
checkPreconditions();
- return putDataViaTransaction(domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, globalSchema);
+ return putDataViaTransaction(this.domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, globalSchema);
}
public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataPut(
public PATCHStatusContext patchConfigurationDataWithinTransaction(final PATCHContext context,
final SchemaContext globalSchema) {
- final DOMDataReadWriteTransaction patchTransaction = domDataBroker.newReadWriteTransaction();
- List<PATCHStatusEntity> editCollection = new ArrayList<>();
+ final DOMDataReadWriteTransaction patchTransaction = this.domDataBroker.newReadWriteTransaction();
+ final List<PATCHStatusEntity> editCollection = new ArrayList<>();
List<RestconfError> editErrors;
- List<RestconfError> globalErrors = null;
+ final List<RestconfError> globalErrors = null;
int errorCounter = 0;
- for (PATCHEntity patchEntity : context.getData()) {
+ for (final PATCHEntity patchEntity : context.getData()) {
final PATCHEditOperation operation = PATCHEditOperation.valueOf(patchEntity.getOperation().toUpperCase());
switch (operation) {
postDataWithinTransaction(patchTransaction, CONFIGURATION, patchEntity.getTargetNode(),
patchEntity.getNode(), globalSchema);
editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), true, null));
- } catch (RestconfDocumentedException e) {
+ } catch (final RestconfDocumentedException e) {
editErrors = new ArrayList<>();
editErrors.addAll(e.getErrors());
editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), false, editErrors));
putDataWithinTransaction(patchTransaction, CONFIGURATION, patchEntity
.getTargetNode(), patchEntity.getNode(), globalSchema);
editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), true, null));
- } catch (RestconfDocumentedException e) {
+ } catch (final RestconfDocumentedException e) {
editErrors = new ArrayList<>();
editErrors.addAll(e.getErrors());
editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), false, editErrors));
deleteDataWithinTransaction(patchTransaction, CONFIGURATION, patchEntity
.getTargetNode());
editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), true, null));
- } catch (RestconfDocumentedException e) {
+ } catch (final RestconfDocumentedException e) {
editErrors = new ArrayList<>();
editErrors.addAll(e.getErrors());
editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), false, editErrors));
deleteDataWithinTransaction(patchTransaction, CONFIGURATION, patchEntity
.getTargetNode());
editCollection.add(new PATCHStatusEntity(patchEntity.getEditId(), true, null));
- } catch (RestconfDocumentedException e) {
+ } catch (final RestconfDocumentedException e) {
LOG.error("Error removing {} by {} operation", patchEntity.getTargetNode().toString(),
patchEntity.getEditId(), e);
}
public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataPost(
final SchemaContext globalSchema, final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload) {
checkPreconditions();
- return postDataViaTransaction(domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, globalSchema);
+ return postDataViaTransaction(this.domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, globalSchema);
}
public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataPost(
public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataDelete(
final YangInstanceIdentifier path) {
checkPreconditions();
- return deleteDataViaTransaction(domDataBroker.newWriteOnlyTransaction(), CONFIGURATION, path);
+ return deleteDataViaTransaction(this.domDataBroker.newWriteOnlyTransaction(), CONFIGURATION, path);
}
public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataDelete(
// RPC
public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type, final NormalizedNode<?, ?> input) {
checkPreconditions();
- if (rpcService == null) {
+ if (this.rpcService == null) {
throw new RestconfDocumentedException(Status.SERVICE_UNAVAILABLE);
}
LOG.trace("Invoke RPC {} with input: {}", type, input);
- return rpcService.invokeRpc(type, input);
+ return this.rpcService.invokeRpc(type, input);
}
public void registerToListenDataChanges(final LogicalDatastoreType datastore, final DataChangeScope scope,
}
final YangInstanceIdentifier path = listener.getPath();
- final ListenerRegistration<DOMDataChangeListener> registration = domDataBroker.registerDataChangeListener(
+ final ListenerRegistration<DOMDataChangeListener> registration = this.domDataBroker.registerDataChangeListener(
datastore, path, listener, scope);
listener.setRegistration(registration);
ImmutableNodes.fromInstanceId(schemaContext, YangInstanceIdentifier.create(normalizedPathWithoutChildArgs));
rwTx.merge(store, rootNormalizedPath, parentStructure);
}
+
+ public void registerToListenNotification(final NotificationListenerAdapter listener) {
+ checkPreconditions();
+
+ if (listener.isListening()) {
+ return;
+ }
+
+ final SchemaPath path = listener.getSchemaPath();
+ final ListenerRegistration<DOMNotificationListener> registration = this.domNotification
+ .registerNotificationListener(listener, path);
+
+ listener.setRegistration(registration);
+ }
}
import java.net.URISyntaxException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
import org.opendaylight.netconf.sal.streams.listeners.Notificator;
import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode;
import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaNode;
private static final YangInstanceIdentifier.AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER;
+ public static final CharSequence DATA_SUBSCR = "data-change-event-subscription";
+ private static final CharSequence CREATE_DATA_SUBSCR = "create-" + DATA_SUBSCR;
+
+ public static final CharSequence NOTIFICATION_STREAM = "notification-stream";
+ private static final CharSequence CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM;
+
static {
try {
final Date eventSubscriptionAugRevision = new SimpleDateFormat("yyyy-MM-dd").parse("2014-07-08");
response = mountRpcServices.get().invokeRpc(type, payload.getData());
} else {
if (namespace.toString().equals(SAL_REMOTE_NAMESPACE)) {
- response = invokeSalRemoteRpcSubscribeRPC(payload);
+ if (identifier.contains(CREATE_DATA_SUBSCR)) {
+ response = invokeSalRemoteRpcSubscribeRPC(payload);
+ } else if (identifier.contains(CREATE_NOTIFICATION_STREAM)) {
+ response = invokeSalRemoteRpcNotifiStrRPC(payload);
+ } else {
+ final String msg = "Not supported operation";
+ LOG.warn(msg);
+ throw new RestconfDocumentedException(msg, ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED);
+ }
} else {
response = this.broker.invokeRpc(type, payload.getData());
}
}
final YangInstanceIdentifier pathIdentifier = ((YangInstanceIdentifier) pathValue);
- String streamName = null;
+ String streamName = (String) CREATE_DATA_SUBSCR;
if (!pathIdentifier.isEmpty()) {
- final String fullRestconfIdentifier = this.controllerContext.toFullRestconfIdentifier(pathIdentifier, null);
+ final String fullRestconfIdentifier = DATA_SUBSCR
+ + this.controllerContext.toFullRestconfIdentifier(pathIdentifier, null);
LogicalDatastoreType datastore = parseEnumTypeParameter(value, LogicalDatastoreType.class, DATASTORE_PARAM_NAME);
datastore = datastore == null ? DEFAULT_DATASTORE : datastore;
*/
@Override
public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
+ if (identifier.contains(DATA_SUBSCR)) {
+ return dataSubs(identifier, uriInfo);
+ } else if (identifier.contains(NOTIFICATION_STREAM)) {
+ return notifStream(identifier, uriInfo);
+ }
+ final String msg = "Bad type of notification of sal-remote";
+ LOG.warn(msg);
+ throw new RestconfDocumentedException(msg);
+ }
+
+ /**
+ * Register notification listener by stream name
+ *
+ * @param identifier
+ * - stream name
+ * @param uriInfo
+ * - uriInfo
+ * @return {@link Response}
+ */
+ private Response notifStream(final String identifier, final UriInfo uriInfo) {
+ final String streamName = Notificator.createStreamNameFromUri(identifier);
+ if (Strings.isNullOrEmpty(streamName)) {
+ throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+ }
+ final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+ if ((listeners == null) || listeners.isEmpty()) {
+ throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
+ ErrorTag.UNKNOWN_ELEMENT);
+ }
+
+ for (final NotificationListenerAdapter listener : listeners) {
+ this.broker.registerToListenNotification(listener);
+ }
+
+ final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+ int notificationPort = NOTIFICATION_PORT;
+ try {
+ final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance();
+ notificationPort = webSocketServerInstance.getPort();
+ } catch (final NullPointerException e) {
+ WebSocketServer.createInstance(NOTIFICATION_PORT);
+ }
+ final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws");
+ final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build();
+
+ return Response.status(Status.OK).location(uriToWebsocketServer).build();
+ }
+
+ /**
+ * Register data change listener by stream name
+ *
+ * @param identifier
+ * - stream name
+ * @param uriInfo
+ * - uri info
+ * @return {@link Response}
+ */
+ private Response dataSubs(final String identifier, final UriInfo uriInfo) {
final String streamName = Notificator.createStreamNameFromUri(identifier);
if (Strings.isNullOrEmpty(streamName)) {
throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
/**
* Load parameter for subscribing to stream from input composite node
*
- * @param compNode
+ * @param value
* contains value
* @return enum object if its string value is equal to {@code paramName}. In other cases null.
*/
return streamNodeValues.build();
}
+
+ /**
+ * Prepare stream for notification
+ *
+ * @param payload
+ * - contains list of qnames of notifications
+ * @return - checked future object
+ */
+ private CheckedFuture<DOMRpcResult, DOMRpcException> invokeSalRemoteRpcNotifiStrRPC(
+ final NormalizedNodeContext payload) {
+ final ContainerNode data = (ContainerNode) payload.getData();
+ LeafSetNode leafSet = null;
+ String outputType = "XML";
+ for (final DataContainerChild<? extends PathArgument, ?> dataChild : data.getValue()) {
+ if (dataChild instanceof LeafSetNode) {
+ leafSet = (LeafSetNode) dataChild;
+ } else if (dataChild instanceof AugmentationNode) {
+ outputType = (String) (((AugmentationNode) dataChild).getValue()).iterator().next().getValue();
+ }
+ }
+
+ final Collection<LeafSetEntryNode> entryNodes = leafSet.getValue();
+ final List<SchemaPath> paths = new ArrayList<>();
+ String streamName = CREATE_NOTIFICATION_STREAM + "/";
+
+ final Iterator<LeafSetEntryNode> iterator = entryNodes.iterator();
+ while (iterator.hasNext()) {
+ final QName valueQName = QName.create((String) iterator.next().getValue());
+ final Module module = ControllerContext.getInstance()
+ .findModuleByNamespace(valueQName.getModule().getNamespace());
+ Preconditions.checkNotNull(module, "Module for namespace " + valueQName.getModule().getNamespace()
+ + " does not exist");
+ NotificationDefinition notifiDef = null;
+ for (final NotificationDefinition notification : module.getNotifications()) {
+ if (notification.getQName().equals(valueQName)) {
+ notifiDef = notification;
+ break;
+ }
+ }
+ final String moduleName = module.getName();
+ Preconditions.checkNotNull(notifiDef,
+ "Notification " + valueQName + "doesn't exist in module " + moduleName);
+ paths.add(notifiDef.getPath());
+ streamName = streamName + moduleName + ":" + valueQName.getLocalName();
+ if (iterator.hasNext()) {
+ streamName = streamName + ",";
+ }
+ }
+
+ final QName rpcQName = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
+ final QName outputQname = QName.create(rpcQName, "output");
+ final QName streamNameQname = QName.create(rpcQName, "notification-stream-identifier");
+
+ final ContainerNode output = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new NodeIdentifier(outputQname))
+ .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
+
+ if (!Notificator.existNotificationListenerFor(streamName)) {
+ Notificator.createNotificationListener(paths, streamName, outputType);
+ }
+
+ final DOMRpcResult defaultDOMRpcResult = new DefaultDOMRpcResult(output);
+
+ return Futures.immediateCheckedFuture(defaultDOMRpcResult);
+ }
}
import org.opendaylight.controller.config.yang.md.sal.rest.connector.Rpcs;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.opendaylight.controller.sal.core.api.Provider;
BrokerFacade.getInstance().setContext(session);
BrokerFacade.getInstance().setDomDataBroker( domDataBroker);
final SchemaService schemaService = session.getService(SchemaService.class);
- listenerRegistration = schemaService.registerSchemaContextListener(ControllerContext.getInstance());
+ this.listenerRegistration = schemaService.registerSchemaContextListener(ControllerContext.getInstance());
BrokerFacade.getInstance().setRpcService(session.getService(DOMRpcService.class));
-
+ BrokerFacade.getInstance().setDomNotificationService(session.getService(DOMNotificationService.class));
ControllerContext.getInstance().setSchemas(schemaService.getGlobalContext());
ControllerContext.getInstance().setMountService(session.getService(DOMMountPointService.class));
- webSocketServerThread = new Thread(WebSocketServer.createInstance(port.getValue().intValue()));
- webSocketServerThread.setName("Web socket server on port " + port);
- webSocketServerThread.start();
+ this.webSocketServerThread = new Thread(WebSocketServer.createInstance(this.port.getValue().intValue()));
+ this.webSocketServerThread.setName("Web socket server on port " + this.port);
+ this.webSocketServerThread.start();
}
@Override
@Override
public void close() {
- if (listenerRegistration != null) {
- listenerRegistration.close();
+ if (this.listenerRegistration != null) {
+ this.listenerRegistration.close();
}
WebSocketServer.destroyInstance();
- webSocketServerThread.interrupt();
+ this.webSocketServerThread.interrupt();
}
@Override
final Config config = new Config();
final Get get = new Get();
- get.setReceivedRequests(stats.getConfigGet());
- get.setSuccessfulResponses(stats.getSuccessGetConfig());
- get.setFailedResponses(stats.getFailureGetConfig());
+ get.setReceivedRequests(this.stats.getConfigGet());
+ get.setSuccessfulResponses(this.stats.getSuccessGetConfig());
+ get.setFailedResponses(this.stats.getFailureGetConfig());
config.setGet(get);
final Post post = new Post();
- post.setReceivedRequests(stats.getConfigPost());
- post.setSuccessfulResponses(stats.getSuccessPost());
- post.setFailedResponses(stats.getFailurePost());
+ post.setReceivedRequests(this.stats.getConfigPost());
+ post.setSuccessfulResponses(this.stats.getSuccessPost());
+ post.setFailedResponses(this.stats.getFailurePost());
config.setPost(post);
final Put put = new Put();
- put.setReceivedRequests(stats.getConfigPut());
- put.setSuccessfulResponses(stats.getSuccessPut());
- put.setFailedResponses(stats.getFailurePut());
+ put.setReceivedRequests(this.stats.getConfigPut());
+ put.setSuccessfulResponses(this.stats.getSuccessPut());
+ put.setFailedResponses(this.stats.getFailurePut());
config.setPut(put);
final Delete delete = new Delete();
- delete.setReceivedRequests(stats.getConfigDelete());
- delete.setSuccessfulResponses(stats.getSuccessDelete());
- delete.setFailedResponses(stats.getFailureDelete());
+ delete.setReceivedRequests(this.stats.getConfigDelete());
+ delete.setSuccessfulResponses(this.stats.getSuccessDelete());
+ delete.setFailedResponses(this.stats.getFailureDelete());
config.setDelete(delete);
return config;
@Override
public Operational getOperational() {
- final BigInteger opGet = stats.getOperationalGet();
+ final BigInteger opGet = this.stats.getOperationalGet();
final Operational operational = new Operational();
final Get get = new Get();
get.setReceivedRequests(opGet);
- get.setSuccessfulResponses(stats.getSuccessGetOperational());
- get.setFailedResponses(stats.getFailureGetOperational());
+ get.setSuccessfulResponses(this.stats.getSuccessGetOperational());
+ get.setFailedResponses(this.stats.getFailureGetOperational());
operational.setGet(get);
return operational;
}
@Override
public Rpcs getRpcs() {
- final BigInteger rpcInvoke = stats.getRpc();
+ final BigInteger rpcInvoke = this.stats.getRpc();
final Rpcs rpcs = new Rpcs();
rpcs.setReceivedRequests(rpcInvoke);
return rpcs;
private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
private static final Pattern RFC3339_PATTERN = Pattern.compile("(\\d\\d)(\\d\\d)$");
- private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
+ private static final SimpleDateFormat RFC3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
private final YangInstanceIdentifier path;
private ListenerRegistration<DOMDataChangeListener> registration;
*/
ListenerAdapter(final YangInstanceIdentifier path, final String streamName) {
Preconditions.checkNotNull(path);
- Preconditions.checkArgument(streamName != null && !streamName.isEmpty());
+ Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
this.path = path;
this.streamName = streamName;
- eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
- eventBusChangeRecorder = new EventBusChangeRecorder();
- eventBus.register(eventBusChangeRecorder);
+ this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
+ this.eventBusChangeRecorder = new EventBusChangeRecorder();
+ this.eventBus.register(this.eventBusChangeRecorder);
}
@Override
final String xml = prepareXmlFrom(change);
final Event event = new Event(EventType.NOTIFY);
event.setData(xml);
- eventBus.post(event);
+ this.eventBus.post(event);
}
}
public void recordCustomerChange(final Event event) {
if (event.getType() == EventType.REGISTER) {
final Channel subscriber = event.getSubscriber();
- if (!subscribers.contains(subscriber)) {
- subscribers.add(subscriber);
+ if (!ListenerAdapter.this.subscribers.contains(subscriber)) {
+ ListenerAdapter.this.subscribers.add(subscriber);
}
} else if (event.getType() == EventType.DEREGISTER) {
- subscribers.remove(event.getSubscriber());
+ ListenerAdapter.this.subscribers.remove(event.getSubscriber());
Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
} else if (event.getType() == EventType.NOTIFY) {
- for (final Channel subscriber : subscribers) {
+ for (final Channel subscriber : ListenerAdapter.this.subscribers) {
if (subscriber.isActive()) {
LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
} else {
LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
- subscribers.remove(subscriber);
+ ListenerAdapter.this.subscribers.remove(subscriber);
}
}
}
* @return Channel
*/
public Channel getSubscriber() {
- return subscriber;
+ return this.subscriber;
}
/**
* @return String representation of event data.
*/
public String getData() {
- return data;
+ return this.data;
}
/**
* @return The type of the event.
*/
public EventType getType() {
- return type;
+ return this.type;
}
}
* Date
* @return Data specified by RFC3339.
*/
- private String toRFC3339(final Date d) {
- return RFC3339_PATTERN.matcher(rfc3339.format(d)).replaceAll("$1:$2");
+ public static String toRFC3339(final Date d) {
+ return RFC3339_PATTERN.matcher(RFC3339.format(d)).replaceAll("$1:$2");
}
/**
* Creates {@link Document} document.
* @return {@link Document} document.
*/
- private Document createDocument() {
+ public static Document createDocument() {
final DocumentBuilder bob;
try {
bob = DBF.newDocumentBuilder();
*/
private void addValuesFromDataToElement(final Document doc, final Set<YangInstanceIdentifier> data, final Element element,
final Operation operation) {
- if (data == null || data.isEmpty()) {
+ if ((data == null) || data.isEmpty()) {
return;
}
for (final YangInstanceIdentifier path : data) {
private void addCreatedChangedValuesFromDataToElement(final Document doc, final Set<Entry<YangInstanceIdentifier,
NormalizedNode<?,?>>> data, final Element element, final Operation operation, final SchemaContext
schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
- if (data == null || data.isEmpty()) {
+ if ((data == null) || data.isEmpty()) {
return;
}
- for (Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data) {
+ for (final Entry<YangInstanceIdentifier, NormalizedNode<?, ?>> entry : data) {
if (!ControllerContext.getInstance().isNodeMixin(entry.getKey())) {
final Node node = createCreatedChangedDataChangeEventElement(doc, entry, operation, schemaContext,
dataSchemaContextTree);
final Element dataElement = doc.createElement("data");
dataElement.appendChild(result);
dataChangeEventElement.appendChild(dataElement);
- } catch (IOException e) {
+ } catch (final IOException e) {
LOG.error("Error in writer ", e);
- } catch (XMLStreamException e) {
+ } catch (final XMLStreamException e) {
LOG.error("Error processing stream", e);
}
return dataChangeEventElement;
}
- private static DOMResult writeNormalizedNode(final NormalizedNode<?,?> normalized, final
- YangInstanceIdentifier path, final SchemaContext context, final DataSchemaContextTree dataSchemaContextTree) throws
- IOException, XMLStreamException {
+ private static DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized,
+ final YangInstanceIdentifier path, final SchemaContext context,
+ final DataSchemaContextTree dataSchemaContextTree)
+ throws IOException, XMLStreamException {
final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
final Document doc = XmlDocumentUtils.getDocument();
final DOMResult result = new DOMResult(doc);
XMLStreamWriter writer = null;
final SchemaPath nodePath;
- if (normalized instanceof MapEntryNode || normalized instanceof UnkeyedListEntryNode) {
+ if ((normalized instanceof MapEntryNode) || (normalized instanceof UnkeyedListEntryNode)) {
nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath();
} else {
nodePath = dataSchemaContextTree.getChild(path).getDataSchemaNode().getPath().getParent();
* @return Path pointed to data in data store.
*/
public YangInstanceIdentifier getPath() {
- return path;
+ return this.path;
}
/**
* @return The name of the stream.
*/
public String getStreamName() {
- return streamName;
+ return this.streamName;
}
/**
* Removes all subscribers and unregisters event bus change recorder form event bus.
*/
public void close() throws Exception {
- subscribers = new ConcurrentSet<>();
- registration.close();
- registration = null;
- eventBus.unregister(eventBusChangeRecorder);
+ this.subscribers = new ConcurrentSet<>();
+ this.registration.close();
+ this.registration = null;
+ this.eventBus.unregister(this.eventBusChangeRecorder);
}
/**
* @return True if exist, false otherwise.
*/
public boolean isListening() {
- return registration == null ? false : true;
+ return this.registration == null ? false : true;
}
/**
}
final Event event = new Event(EventType.REGISTER);
event.setSubscriber(subscriber);
- eventBus.post(event);
+ this.eventBus.post(event);
}
/**
LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
final Event event = new Event(EventType.DEREGISTER);
event.setSubscriber(subscriber);
- eventBus.post(event);
+ this.eventBus.post(event);
}
/**
* @return True if exist at least one {@link Channel} subscriber, false otherwise.
*/
public boolean hasSubscribers() {
- return !subscribers.isEmpty();
+ return !this.subscribers.isEmpty();
}
/**
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.sal.streams.listeners;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.util.internal.ConcurrentSet;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMResult;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+import org.json.JSONObject;
+import org.json.XML;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.netconf.sal.restconf.impl.ControllerContext;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlDocumentUtils;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+/**
+ * {@link NotificationListenerAdapter} is responsible to track events on
+ * notifications.
+ *
+ */
+public class NotificationListenerAdapter implements DOMNotificationListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationListenerAdapter.class);
+ private static final TransformerFactory FACTORY = TransformerFactory.newInstance();
+
+ private final String streamName;
+ private ListenerRegistration<DOMNotificationListener> registration;
+ private Set<Channel> subscribers = new ConcurrentSet<>();
+ private final EventBus eventBus;
+ private final EventBusChangeRecorder eventBusChangeRecorder;
+
+ private final SchemaPath path;
+ private final String outputType;
+
+ /**
+ * Set path of listener and stream name, register event bus.
+ *
+ * @param path
+ * - path of notification
+ * @param streamName
+ * - stream name of listener
+ * @param outputType
+ * - type of output on notification (JSON, XML)
+ */
+ NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
+ this.outputType = outputType;
+ Preconditions.checkArgument((streamName != null) && !streamName.isEmpty());
+ Preconditions.checkArgument(path != null);
+ this.path = path;
+ this.streamName = streamName;
+ this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
+ this.eventBusChangeRecorder = new EventBusChangeRecorder();
+ this.eventBus.register(this.eventBusChangeRecorder);
+ }
+
+ @Override
+ public void onNotification(final DOMNotification notification) {
+ final String xml = prepareXmlFrom(notification);
+ final Event event = new Event(EventType.NOTIFY);
+ if (this.outputType.equals("JSON")) {
+ final JSONObject jsonObject = XML.toJSONObject(xml);
+ event.setData(jsonObject.toString());
+ } else {
+ event.setData(xml);
+ }
+ this.eventBus.post(event);
+ }
+
+ /**
+ * Checks if exists at least one {@link Channel} subscriber.
+ *
+ * @return True if exist at least one {@link Channel} subscriber, false
+ * otherwise.
+ */
+ public boolean hasSubscribers() {
+ return !this.subscribers.isEmpty();
+ }
+
+ /**
+ * Reset lists, close registration and unregister bus event.
+ */
+ public void close() {
+ this.subscribers = new ConcurrentSet<>();
+ this.registration.close();
+ this.registration = null;
+ this.eventBus.unregister(this.eventBusChangeRecorder);
+ }
+
+ /**
+ * Get stream name of this listener
+ *
+ * @return {@link String}
+ */
+ public String getStreamName() {
+ return this.streamName;
+ }
+
+ /**
+ * Check if is this listener registered.
+ *
+ * @return - true if is registered, otherwise null
+ */
+ public boolean isListening() {
+ return this.registration == null ? false : true;
+ }
+
+ /**
+ * Get schema path of notification
+ *
+ * @return {@link SchemaPath}
+ */
+ public SchemaPath getSchemaPath() {
+ return this.path;
+ }
+
+ /**
+ * Set registration for close after closing connection and check if this
+ * listener is registered
+ *
+ * @param registration
+ * - registered listener
+ */
+ public void setRegistration(final ListenerRegistration<DOMNotificationListener> registration) {
+ Preconditions.checkNotNull(registration);
+ this.registration = registration;
+ }
+
+ /**
+ * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
+ * subscriber to the event and post event into event bus.
+ *
+ * @param subscriber
+ * Channel
+ */
+ public void addSubscriber(final Channel subscriber) {
+ if (!subscriber.isActive()) {
+ LOG.debug("Channel is not active between websocket server and subscriber {}" + subscriber.remoteAddress());
+ }
+ final Event event = new Event(EventType.REGISTER);
+ event.setSubscriber(subscriber);
+ this.eventBus.post(event);
+ }
+
+ /**
+ * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
+ * subscriber to the event and posts event into event bus.
+ *
+ * @param subscriber
+ */
+ public void removeSubscriber(final Channel subscriber) {
+ LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
+ final Event event = new Event(EventType.DEREGISTER);
+ event.setSubscriber(subscriber);
+ this.eventBus.post(event);
+ }
+
+ private String prepareXmlFrom(final DOMNotification notification) {
+ final SchemaContext schemaContext = ControllerContext.getInstance().getGlobalSchema();
+ final Document doc = ListenerAdapter.createDocument();
+ final Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
+ "notification");
+ doc.appendChild(notificationElement);
+
+ final Element eventTimeElement = doc.createElement("eventTime");
+ eventTimeElement.setTextContent(ListenerAdapter.toRFC3339(new Date()));
+ notificationElement.appendChild(eventTimeElement);
+
+ final Element notificationEventElement = doc.createElementNS(
+ "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "create-notification-stream");
+ addValuesToNotificationEventElement(doc, notificationEventElement, notification, schemaContext);
+ notificationElement.appendChild(notificationEventElement);
+
+ try {
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ final Transformer transformer = FACTORY.newTransformer();
+ transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
+ transformer.setOutputProperty(OutputKeys.METHOD, "xml");
+ transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+ transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
+ transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
+ transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, Charsets.UTF_8)));
+ final byte[] charData = out.toByteArray();
+ return new String(charData, "UTF-8");
+ } catch (TransformerException | UnsupportedEncodingException e) {
+ final String msg = "Error during transformation of Document into String";
+ LOG.error(msg, e);
+ return msg;
+ }
+ }
+
+ private void addValuesToNotificationEventElement(final Document doc, final Element element,
+ final DOMNotification notification, final SchemaContext schemaContext) {
+ if (notification == null) {
+ return;
+ }
+
+ final NormalizedNode<NodeIdentifier, Collection<DataContainerChild<? extends PathArgument, ?>>> body = notification
+ .getBody();
+ try {
+ final DOMResult domResult = writeNormalizedNode(body,
+ YangInstanceIdentifier.create(body.getIdentifier()), schemaContext);
+ final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
+ final Element dataElement = doc.createElement("notification");
+ dataElement.appendChild(result);
+ element.appendChild(dataElement);
+ } catch (final IOException e) {
+ LOG.error("Error in writer ", e);
+ } catch (final XMLStreamException e) {
+ LOG.error("Error processing stream", e);
+ }
+ }
+
+ private DOMResult writeNormalizedNode(final NormalizedNode<?, ?> normalized, final YangInstanceIdentifier path,
+ final SchemaContext context) throws IOException, XMLStreamException {
+ final XMLOutputFactory XML_FACTORY = XMLOutputFactory.newFactory();
+ final Document doc = XmlDocumentUtils.getDocument();
+ final DOMResult result = new DOMResult(doc);
+ NormalizedNodeWriter normalizedNodeWriter = null;
+ NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
+ XMLStreamWriter writer = null;
+
+ try {
+ writer = XML_FACTORY.createXMLStreamWriter(result);
+ normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, context,
+ this.getSchemaPath());
+ normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
+
+ normalizedNodeWriter.write(normalized);
+
+ normalizedNodeWriter.flush();
+ } finally {
+ if (normalizedNodeWriter != null) {
+ normalizedNodeWriter.close();
+ }
+ if (normalizedNodeStreamWriter != null) {
+ normalizedNodeStreamWriter.close();
+ }
+ if (writer != null) {
+ writer.close();
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * Tracks events of data change by customer.
+ */
+ private final class EventBusChangeRecorder {
+ @Subscribe
+ public void recordCustomerChange(final Event event) {
+ if (event.getType() == EventType.REGISTER) {
+ final Channel subscriber = event.getSubscriber();
+ if (!NotificationListenerAdapter.this.subscribers.contains(subscriber)) {
+ NotificationListenerAdapter.this.subscribers.add(subscriber);
+ }
+ } else if (event.getType() == EventType.DEREGISTER) {
+ NotificationListenerAdapter.this.subscribers.remove(event.getSubscriber());
+ Notificator.removeNotificationListenerIfNoSubscriberExists(NotificationListenerAdapter.this);
+ } else if (event.getType() == EventType.NOTIFY) {
+ for (final Channel subscriber : NotificationListenerAdapter.this.subscribers) {
+ if (subscriber.isActive()) {
+ LOG.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
+ subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
+ } else {
+ LOG.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
+ NotificationListenerAdapter.this.subscribers.remove(subscriber);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Represents event of specific {@link EventType} type, holds data and
+ * {@link Channel} subscriber.
+ */
+ private final class Event {
+ private final EventType type;
+ private Channel subscriber;
+ private String data;
+
+ /**
+ * Creates new event specified by {@link EventType} type.
+ *
+ * @param type
+ * EventType
+ */
+ public Event(final EventType type) {
+ this.type = type;
+ }
+
+ /**
+ * Gets the {@link Channel} subscriber.
+ *
+ * @return Channel
+ */
+ public Channel getSubscriber() {
+ return this.subscriber;
+ }
+
+ /**
+ * Sets subscriber for event.
+ *
+ * @param subscriber
+ * Channel
+ */
+ public void setSubscriber(final Channel subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ /**
+ * Gets event String.
+ *
+ * @return String representation of event data.
+ */
+ public String getData() {
+ return this.data;
+ }
+
+ /**
+ * Sets event data.
+ *
+ * @param data
+ * String.
+ */
+ public void setData(final String data) {
+ this.data = data;
+ }
+
+ /**
+ * Gets event type.
+ *
+ * @return The type of the event.
+ */
+ public EventType getType() {
+ return this.type;
+ }
+ }
+
+ /**
+ * Type of the event.
+ */
+ private enum EventType {
+ REGISTER, DEREGISTER, NOTIFY;
+ }
+}
*/
package org.opendaylight.netconf.sal.streams.listeners;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* {@link Notificator} is responsible to create, remove and find
public class Notificator {
private static Map<String, ListenerAdapter> listenersByStreamName = new ConcurrentHashMap<>();
+ private static Map<String, List<NotificationListenerAdapter>> notificationListenersByStreamName = new ConcurrentHashMap<>();
+
+ private static final Logger LOG = LoggerFactory.getLogger(Notificator.class);
private static final Lock lock = new ReentrantLock();
private Notificator() {
* The name of the stream.
* @return {@link ListenerAdapter} specified by stream name.
*/
- public static ListenerAdapter getListenerFor(String streamName) {
+ public static ListenerAdapter getListenerFor(final String streamName) {
return listenersByStreamName.get(streamName);
}
* @param streamName
* @return True if the listener exist, false otherwise.
*/
- public static boolean existListenerFor(String streamName) {
+ public static boolean existListenerFor(final String streamName) {
return listenersByStreamName.containsKey(streamName);
}
* The name of the stream.
* @return New {@link ListenerAdapter} listener from {@link YangInstanceIdentifier} path and stream name.
*/
- public static ListenerAdapter createListener(YangInstanceIdentifier path, String streamName) {
- ListenerAdapter listener = new ListenerAdapter(path, streamName);
+ public static ListenerAdapter createListener(final YangInstanceIdentifier path, final String streamName) {
+ final ListenerAdapter listener = new ListenerAdapter(path, streamName);
try {
lock.lock();
listenersByStreamName.put(streamName, listener);
* URI for creation stream name.
* @return String representation of stream name.
*/
- public static String createStreamNameFromUri(String uri) {
+ public static String createStreamNameFromUri(final String uri) {
if (uri == null) {
return null;
}
* Removes all listeners.
*/
public static void removeAllListeners() {
- for (ListenerAdapter listener : listenersByStreamName.values()) {
+ for (final ListenerAdapter listener : listenersByStreamName.values()) {
try {
listener.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
+ LOG.error("Failed to close listener", e);
}
}
try {
* @param listener
* ListenerAdapter
*/
- public static void removeListenerIfNoSubscriberExists(ListenerAdapter listener) {
+ public static void removeListenerIfNoSubscriberExists(final ListenerAdapter listener) {
if (!listener.hasSubscribers()) {
deleteListener(listener);
}
* @param listener
* ListenerAdapter
*/
- private static void deleteListener(ListenerAdapter listener) {
+ private static void deleteListener(final ListenerAdapter listener) {
if (listener != null) {
try {
listener.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
+ LOG.error("Failed to close listener", e);
}
try {
lock.lock();
}
}
+ /**
+ * Check if the listener specified by qnames of request exist.
+ *
+ * @param streamName
+ * - name of stream
+ * @return True if the listener exist, false otherwise.
+ */
+ public static boolean existNotificationListenerFor(final String streamName) {
+ return notificationListenersByStreamName.containsKey(streamName);
+
+ }
+
+ public static List<NotificationListenerAdapter> createNotificationListener(final List<SchemaPath> paths,
+ final String streamName, final String outputType) {
+ final List<NotificationListenerAdapter> listListeners = new ArrayList<>();
+ for (final SchemaPath path : paths) {
+ final NotificationListenerAdapter listener = new NotificationListenerAdapter(path, streamName, outputType);
+ listListeners.add(listener);
+ }
+ try {
+ lock.lock();
+ notificationListenersByStreamName.put(streamName, listListeners);
+ } finally {
+ lock.unlock();
+ }
+ return listListeners;
+ }
+
+ public static void removeNotificationListenerIfNoSubscriberExists(final NotificationListenerAdapter listener) {
+ if (!listener.hasSubscribers()) {
+ deleteNotificationListener(listener);
+ }
+ }
+
+ private static void deleteNotificationListener(final NotificationListenerAdapter listener) {
+ if (listener != null) {
+ try {
+ listener.close();
+ } catch (final Exception e) {
+ LOG.error("Failed to close listener", e);
+ }
+ try {
+ lock.lock();
+ notificationListenersByStreamName.remove(listener.getStreamName());
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ public static List<NotificationListenerAdapter> getNotificationListenerFor(final String streamName) {
+ return notificationListenersByStreamName.get(streamName);
+ }
}
package org.opendaylight.netconf.sal.streams.websockets;
+import static io.netty.handler.codec.http.HttpHeaders.Names.HOST;
import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static io.netty.handler.codec.http.HttpHeaders.Names.HOST;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
import java.io.IOException;
+import java.util.List;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfImpl;
import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
import org.opendaylight.netconf.sal.streams.listeners.Notificator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
return;
}
- String streamName = Notificator.createStreamNameFromUri(req.getUri());
- ListenerAdapter listener = Notificator.getListenerFor(streamName);
- if (listener != null) {
- listener.addSubscriber(ctx.channel());
- logger.debug("Subscriber successfully registered.");
- } else {
- logger.error("Listener for stream with name '{}' was not found.", streamName);
- sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
+ final String streamName = Notificator.createStreamNameFromUri(req.getUri());
+ if (streamName.contains(RestconfImpl.DATA_SUBSCR)) {
+ final ListenerAdapter listener = Notificator.getListenerFor(streamName);
+ if (listener != null) {
+ listener.addSubscriber(ctx.channel());
+ logger.debug("Subscriber successfully registered.");
+ } else {
+ logger.error("Listener for stream with name '{}' was not found.", streamName);
+ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
+ }
+ } else if (streamName.contains(RestconfImpl.NOTIFICATION_STREAM)) {
+ final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+ if (!listeners.isEmpty() && (listeners != null)) {
+ for (final NotificationListenerAdapter listener : listeners) {
+ listener.addSubscriber(ctx.channel());
+ logger.debug("Subscriber successfully registered.");
+ }
+ } else {
+ logger.error("Listener for stream with name '{}' was not found.", streamName);
+ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
+ }
}
// Handshake
- WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req),
+ final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req),
null, false);
- handshaker = wsFactory.newHandshaker(req);
- if (handshaker == null) {
+ this.handshaker = wsFactory.newHandshaker(req);
+ if (this.handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
- handshaker.handshake(ctx.channel(), req);
+ this.handshaker.handshake(ctx.channel(), req);
}
}
final FullHttpResponse res) {
// Generate an error page if response getStatus code is not OK (200).
if (res.getStatus().code() != 200) {
- ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
+ final ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
setContentLength(res, res.content().readableBytes());
}
// Send the response and close the connection if necessary.
- ChannelFuture f = ctx.channel().writeAndFlush(res);
- if (!isKeepAlive(req) || res.getStatus().code() != 200) {
+ final ChannelFuture f = ctx.channel().writeAndFlush(res);
+ if (!isKeepAlive(req) || (res.getStatus().code() != 200)) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
*/
private void handleWebSocketFrame(final ChannelHandlerContext ctx, final WebSocketFrame frame) throws IOException {
if (frame instanceof CloseWebSocketFrame) {
- handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
- String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText());
- ListenerAdapter listener = Notificator.getListenerFor(streamName);
- if (listener != null) {
- listener.removeSubscriber(ctx.channel());
- logger.debug("Subscriber successfully registered.");
+ this.handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
+ final String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText());
+ if (streamName.contains(RestconfImpl.DATA_SUBSCR)) {
+ final ListenerAdapter listener = Notificator.getListenerFor(streamName);
+ if (listener != null) {
+ listener.removeSubscriber(ctx.channel());
+ logger.debug("Subscriber successfully registered.");
+ }
+ Notificator.removeListenerIfNoSubscriberExists(listener);
+ } else if (streamName.contains(RestconfImpl.NOTIFICATION_STREAM)) {
+ final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+ if (!listeners.isEmpty() && (listeners != null)) {
+ for (final NotificationListenerAdapter listener : listeners) {
+ listener.removeSubscriber(ctx.channel());
+ }
+ }
}
- Notificator.removeListenerIfNoSubscriberExists(listener);
return;
} else if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception {
- if (cause instanceof java.nio.channels.ClosedChannelException == false) {
+ if ((cause instanceof java.nio.channels.ClosedChannelException) == false) {
// cause.printStackTrace();
}
ctx.close();
import sal-remote {prefix salrmt; revision-date "2014-01-14";}
description
- "Added input parameters to rpc create-data-change-event-subscription";
+ "Added input parameters to rpc create-data-change-event-subscription and to create-notification-stream";
revision "2014-07-08" {
}
}
}
+ augment "/salrmt:create-notification-stream/salrmt:input" {
+ leaf notification-output-type {
+ type enumeration {
+ enum JSON;
+ enum XML;
+ }
+ default "XML";
+ description "Input parameter which type of output will be parsed on notification";
+ }
+ }
+
}
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import java.util.concurrent.Future;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
import org.opendaylight.netconf.sal.restconf.impl.RestconfError;
import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
import org.opendaylight.netconf.sal.streams.listeners.Notificator;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.QName;
@Mock
DOMDataBroker domDataBroker;
+ @Mock
+ DOMNotificationService domNotification;
+
@Mock
ConsumerSession context;
MockitoAnnotations.initMocks(this);
// TODO it is started before every test method
brokerFacade.setDomDataBroker(domDataBroker);
+ brokerFacade.setDomNotificationService(domNotification);
brokerFacade.setRpcService(mockRpcService);
brokerFacade.setContext(context);
when(domDataBroker.newReadOnlyTransaction()).thenReturn(rTransaction);
when(domDataBroker.newReadWriteTransaction()).thenReturn(rwTransaction);
ControllerContext.getInstance().setSchemas(TestUtils.loadSchemaContext("/full-versions/test-module"));
-
}
private CheckedFuture<Optional<NormalizedNode<?, ?>>,ReadFailedException> wrapDummyNode(final NormalizedNode<?, ?> dummyNode) {
brokerFacade.registerToListenDataChanges(LogicalDatastoreType.CONFIGURATION, DataChangeScope.BASE, listener);
verifyNoMoreInteractions(domDataBroker);
+ }
+ /**
+ * Create, register, close and remove notification listener.
+ */
+ @Test
+ public void testRegisterToListenNotificationChanges() {
+ // create test notification listener
+ final String identifier = "create-notification-stream/toaster:toastDone";
+ final SchemaPath path = SchemaPath.create(true,
+ QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toastDone"));
+ Notificator.createNotificationListener(Lists.newArrayList(path), identifier, "XML");
+ final NotificationListenerAdapter listener = Notificator.getNotificationListenerFor(identifier).get(0);
+
+ // mock registration
+ final ListenerRegistration<NotificationListenerAdapter> registration = mock(ListenerRegistration.class);
+ when(domNotification.registerNotificationListener(listener, listener.getSchemaPath()))
+ .thenReturn(registration);
+
+ // test to register listener for the first time
+ brokerFacade.registerToListenNotification(listener);
+ assertEquals("Registration was not successful", true, listener.isListening());
+
+ // try to register for the second time
+ brokerFacade.registerToListenNotification(listener);
+ assertEquals("Registration was not successful", true, listener.isListening());
+
+ // registrations should be invoked only once
+ verify(domNotification, times(1)).registerNotificationListener(listener, listener.getSchemaPath());
+
+ // close and remove test notification listener
+ listener.close();
+ Notificator.removeNotificationListenerIfNoSubscriberExists(listener);
}
}
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+
import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import java.io.FileNotFoundException;
+import java.net.URI;
import java.text.ParseException;
import java.util.Set;
import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.restconf.impl.InstanceIdentifierContext;
import org.opendaylight.netconf.sal.restconf.impl.NormalizedNodeContext;
import org.opendaylight.netconf.sal.restconf.impl.RestconfImpl;
+import org.opendaylight.netconf.sal.streams.listeners.Notificator;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaNode;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
final Set<Module> allModules = schemaContext.getModules();
assertNotNull(allModules);
- controllerContext = spy(ControllerContext.getInstance());
+ controllerContext = ControllerContext.getInstance();
controllerContext.setSchemas(schemaContext);
-
}
@Before
this.restconfImpl.invokeRpc("ietf-netconf", ctx, uriInfo);
verify(rpcService, times(2)).invokeRpc(any(SchemaPath.class), any(NormalizedNode.class));
}
+
+ /**
+ * Create notification stream for toaster module
+ */
+ @Test
+ public void createNotificationStreamTest() {
+ final NormalizedNodeContext payload = mock(NormalizedNodeContext.class);
+ final InstanceIdentifierContext iiCtx = mock(InstanceIdentifierContext.class);
+ doReturn(iiCtx).when(payload).getInstanceIdentifierContext();
+
+ final SchemaNode schemaNode = mock(SchemaNode.class,
+ Mockito.withSettings().extraInterfaces(RpcDefinition.class));
+ doReturn(schemaNode).when(iiCtx).getSchemaNode();
+ doReturn(mock(SchemaPath.class)).when(schemaNode).getPath();
+
+ doReturn(QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote",
+ "2014-01-14", "create-notification-stream")).when(schemaNode).getQName();
+ doReturn(null).when(iiCtx).getMountPoint();
+
+ final Set<DataContainerChild<?, ?>> children = Sets.newHashSet();
+ final DataContainerChild<?, ?> child = mock(DataContainerChild.class,
+ Mockito.withSettings().extraInterfaces(LeafSetNode.class));
+
+ final LeafSetEntryNode entryNode = mock(LeafSetEntryNode.class);
+ when(entryNode.getValue()).thenReturn("(http://netconfcentral.org/ns/toaster?revision=2009-11-20)toastDone");
+ when(((LeafSetNode) child).getValue()).thenReturn(Sets.newHashSet(entryNode));
+ children.add(child);
+
+ final NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class,
+ Mockito.withSettings().extraInterfaces(ContainerNode.class));
+ doReturn(normalizedNode).when(payload).getData();
+ doReturn(children).when(normalizedNode).getValue();
+
+ // register notification
+ final NormalizedNodeContext context = this.restconfImpl
+ .invokeRpc("sal-remote:create-notification-stream", payload, null);
+ assertNotNull(context);
+ }
+
+ /**
+ * Subscribe for notification stream of toaster module
+ */
+ @Test
+ public void subscribeToNotificationStreamTest() throws Exception {
+ final String identifier = "create-notification-stream/toaster:toastDone";
+
+ // register test notification stream
+ final SchemaPath path = SchemaPath.create(
+ true, QName.create("http://netconfcentral.org/ns/toaster", "2009-11-20", "toastDone"));
+ Notificator.createNotificationListener(Lists.newArrayList(path), identifier, "XML");
+
+ final UriInfo uriInfo = mock(UriInfo.class);
+ final UriBuilder uriBuilder = mock(UriBuilder.class);
+ when(uriBuilder.port(8181)).thenReturn(uriBuilder);
+ when(uriBuilder.replacePath(identifier)).thenReturn(uriBuilder);
+ when(uriBuilder.build()).thenReturn(new URI(""));
+ when(uriBuilder.scheme("ws")).thenReturn(uriBuilder);
+ when(uriInfo.getAbsolutePathBuilder()).thenReturn(uriBuilder);
+
+ final BrokerFacade brokerFacade = mock(BrokerFacade.class);
+ this.restconfImpl.setBroker(brokerFacade);
+
+ // subscribe to stream and verify response
+ final Response response = this.restconfImpl.subscribeToStream(identifier, uriInfo);
+ assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+
+ // remove test notification stream
+ Notificator.removeAllListeners();
+ }
}