http://www.eclipse.org/legal/epl-v10.html";
revision "2014-12-02" {
- description "first revision";
+ description "first revision
+ + add rpc dis-join-topic
+ + add notification event-source-status-notification";
}
// FIXME: expand this
}
}
+ rpc dis-join-topic {
+ input {
+ leaf node {
+ ext:context-reference "inv:node-context";
+ type "instance-identifier";
+ }
+ leaf topic-id {
+ type aggr:topic-id;
+ mandatory true;
+ description "identifier of topic to be disjoin";
+ }
+ }
+
+ }
+
notification event-source-status-notification {
description
package org.opendaylight.controller.messagebus.app.impl;
+import java.util.List;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInputBuilder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
-public class EventSourceTopic implements DataChangeListener {
+public class EventSourceTopic implements DataChangeListener, AutoCloseable {
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class);
private final NotificationPattern notificationPattern;
private final EventSourceService sourceService;
- private final Pattern nodeIdPattern;
+ private final Pattern nodeIdRegexPattern;
private final TopicId topicId;
+ private ListenerRegistration<DataChangeListener> listenerRegistration;
+ private final CopyOnWriteArraySet<InstanceIdentifier<?>> joinedEventSources = new CopyOnWriteArraySet<>();
- public EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdPattern, final EventSourceService eventSource) {
- this.notificationPattern = Preconditions.checkNotNull(notificationPattern);
- this.sourceService = eventSource;
-
- // FIXME: regex should be the language of nodeIdPattern
- final String regex = Util.wildcardToRegex(nodeIdPattern);
- this.nodeIdPattern = Pattern.compile(regex);
+ public static EventSourceTopic create(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, final EventSourceTopology eventSourceTopology){
+ EventSourceTopic est = new EventSourceTopic(notificationPattern, nodeIdRegexPattern, eventSourceTopology.getEventSourceService());
+ est.registerListner(eventSourceTopology);
+ est.notifyExistingNodes(eventSourceTopology);
+ return est;
+ }
- this.topicId = new TopicId(Util.getUUIDIdent());
+ private EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, final EventSourceService sourceService) {
+ this.notificationPattern = Preconditions.checkNotNull(notificationPattern);
+ this.sourceService = Preconditions.checkNotNull(sourceService);
+ this.nodeIdRegexPattern = Pattern.compile(nodeIdRegexPattern);
+ this.topicId = new TopicId(getUUIDIdent());
+ this.listenerRegistration = null;
+ LOG.info("EventSourceTopic created - topicId {}", topicId.getValue());
}
public TopicId getTopicId() {
@Override
public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
- for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
+
+ for (final Map.Entry<InstanceIdentifier<?>, DataObject> createdEntry : event.getCreatedData().entrySet()) {
+ if (createdEntry.getValue() instanceof Node) {
+ final Node node = (Node) createdEntry.getValue();
+ LOG.debug("Create node...");
+ if (this.nodeIdRegexPattern.matcher(node.getNodeId().getValue()).matches()) {
+ LOG.debug("Matched...");
+ notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
+ }
+ }
+ }
+
+ for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
if (changeEntry.getValue() instanceof Node) {
final Node node = (Node) changeEntry.getValue();
- if (getNodeIdRegexPattern().matcher(node.getId().getValue()).matches()) {
- notifyNode(changeEntry.getKey());
+ LOG.debug("Update node...");
+ if (this.nodeIdRegexPattern.matcher(node.getNodeId().getValue()).matches()) {
+ LOG.debug("Matched...");
+ notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
}
}
}
}
public void notifyNode(final InstanceIdentifier<?> nodeId) {
-
+ LOG.debug("Notify node: {}", nodeId);
try {
RpcResult<JoinTopicOutput> rpcResultJoinTopic = sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get();
if(rpcResultJoinTopic.isSuccessful() == false){
for(RpcError err : rpcResultJoinTopic.getErrors()){
LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),nodeId.toString(),err.toString());
}
+ } else {
+ joinedEventSources.add(nodeId);
}
} catch (final Exception e) {
LOG.error("Could not invoke join topic for node {}", nodeId);
}
}
+ private void notifyExistingNodes(final EventSourceTopology eventSourceTopology){
+ LOG.debug("Notify existing nodes");
+ final Pattern nodeRegex = this.nodeIdRegexPattern;
+
+ final ReadOnlyTransaction tx = eventSourceTopology.getDataBroker().newReadOnlyTransaction();
+ final CheckedFuture<Optional<Topology>, ReadFailedException> future =
+ tx.read(LogicalDatastoreType.OPERATIONAL, EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH);
+
+ Futures.addCallback(future, new FutureCallback<Optional<Topology>>(){
+
+ @Override
+ public void onSuccess(Optional<Topology> data) {
+ if(data.isPresent()) {
+ final List<Node> nodes = data.get().getNode();
+ if(nodes != null){
+ for (final Node node : nodes) {
+ if (nodeRegex.matcher(node.getNodeId().getValue()).matches()) {
+ notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
+ }
+ }
+ }
+ }
+ tx.close();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Can not notify existing nodes", t);
+ tx.close();
+ }
+
+ });
+
+ }
+
private JoinTopicInput getJoinTopicInputArgument(final InstanceIdentifier<?> path) {
final NodeRef nodeRef = new NodeRef(path);
final JoinTopicInput jti =
return jti;
}
- public Pattern getNodeIdRegexPattern() {
- return nodeIdPattern;
+ private DisJoinTopicInput getDisJoinTopicInputArgument(final InstanceIdentifier<?> eventSourceNodeId){
+ final NodeRef nodeRef = new NodeRef(eventSourceNodeId);
+ DisJoinTopicInput dji = new DisJoinTopicInputBuilder()
+ .setNode(nodeRef.getValue())
+ .setTopicId(topicId)
+ .build();
+ return dji;
+ }
+
+ private void registerListner(final EventSourceTopology eventSourceTopology) {
+ this.listenerRegistration =
+ eventSourceTopology.getDataBroker().registerDataChangeListener(
+ LogicalDatastoreType.OPERATIONAL,
+ EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH,
+ this,
+ DataBroker.DataChangeScope.SUBTREE);
}
+ @Override
+ public void close() {
+ if(this.listenerRegistration != null){
+ this.listenerRegistration.close();
+ }
+ for(InstanceIdentifier<?> eventSourceNodeId : joinedEventSources){
+ try {
+ RpcResult<Void> result = sourceService.disJoinTopic(getDisJoinTopicInputArgument(eventSourceNodeId)).get();
+ if(result.isSuccessful() == false){
+ for(RpcError err : result.getErrors()){
+ LOG.error("Can not destroy topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),eventSourceNodeId,err.toString());
+ }
+ }
+ } catch (InterruptedException | ExecutionException ex) {
+ LOG.error("Can not close event source topic / destroy topic {} on node {}.", this.topicId.getValue(), eventSourceNodeId, ex);
+ }
+ }
+ joinedEventSources.clear();
+ }
+
+ private static String getUUIDIdent(){
+ UUID uuid = UUID.randomUUID();
+ return uuid.toString();
+ }
}
package org.opendaylight.controller.messagebus.app.impl;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
-import java.util.regex.Pattern;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
-import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.messagebus.spi.EventSource;
import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID));
private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL;
- private static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
+ static final InstanceIdentifier<Topology> EVENT_SOURCE_TOPOLOGY_PATH =
InstanceIdentifier.create(NetworkTopology.class)
.child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY);
.child(TopologyTypes.class)
.augmentation(TopologyTypes1.class);
- private final Map<EventSourceTopic, ListenerRegistration<DataChangeListener>> topicListenerRegistrations =
- new ConcurrentHashMap<>();
+ private final Map<TopicId,EventSourceTopic> eventSourceTopicMap = new ConcurrentHashMap<>();
private final Map<NodeKey, RoutedRpcRegistration<EventSourceService>> routedRpcRegistrations =
new ConcurrentHashMap<>();
final InstanceIdentifier<T> path,
final T data){
- final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+ final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
tx.put(store, path, data, true);
- tx.submit();
+ Futures.addCallback( tx.submit(), new FutureCallback<Void>(){
+
+ @Override
+ public void onSuccess(Void result) {
+ LOG.trace("Data has put into datastore {} {}", store, path);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Can not put data into datastore [store: {}] [path: {}] [exception: {}]",store,path, t);
+ }
+ });
}
private <T extends DataObject> void deleteData(final LogicalDatastoreType store, final InstanceIdentifier<T> path){
- final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+ final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
tx.delete(OPERATIONAL, path);
tx.submit();
+ Futures.addCallback( tx.submit(), new FutureCallback<Void>(){
+
+ @Override
+ public void onSuccess(Void result) {
+ LOG.trace("Data has deleted from datastore {} {}", store, path);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Can not delete data from datastore [store: {}] [path: {}] [exception: {}]",store,path, t);
+ }
+
+ });
}
private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
deleteData(OPERATIONAL, augmentPath);
}
- private void notifyExistingNodes(final EventSourceTopic eventSourceTopic){
-
- final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
-
- final CheckedFuture<Optional<Topology>, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH);
-
- Futures.addCallback(future, new FutureCallback<Optional<Topology>>(){
-
- @Override
- public void onSuccess(Optional<Topology> data) {
- if(data.isPresent()) {
- LOG.info("Topology data are present...");
- final List<Node> nodes = data.get().getNode();
- if(nodes != null){
- LOG.info("List of nodes is not null...");
- final Pattern nodeIdPatternRegex = eventSourceTopic.getNodeIdRegexPattern();
- for (final Node node : nodes) {
- if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) {
- eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()));
- }
- }
- } else {
- LOG.info("List of nodes is NULL...");
- }
- }
- tx.close();
- }
-
- @Override
- public void onFailure(Throwable t) {
- LOG.error("Can not notify existing nodes {}", t);
- tx.close();
- }
-
- });
-
- }
@Override
public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
- LOG.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
+ LOG.debug("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
input.getNotificationPattern(),
input.getNodeIdPattern());
final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
- final String nodeIdPattern = input.getNodeIdPattern().getValue();
- final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService);
+ //FIXME: do not use Util.wildcardToRegex - NodeIdPatter should be regex
+ final String nodeIdRegexPattern = Util.wildcardToRegex(input.getNodeIdPattern().getValue());
+ final EventSourceTopic eventSourceTopic = EventSourceTopic.create(notificationPattern, nodeIdRegexPattern, this);
- registerTopic(eventSourceTopic);
-
- notifyExistingNodes(eventSourceTopic);
+ eventSourceTopicMap.put(eventSourceTopic.getTopicId(), eventSourceTopic);
final CreateTopicOutput cto = new CreateTopicOutputBuilder()
.setTopicId(eventSourceTopic.getTopicId())
.build();
+ LOG.info("Topic has been created: NotificationPattern -> {}, NodeIdPattern -> {}",
+ input.getNotificationPattern(),
+ input.getNodeIdPattern());
+
return Util.resultRpcSuccessFor(cto);
}
@Override
public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
- return Futures.immediateFailedFuture(new UnsupportedOperationException("Not Implemented"));
+ EventSourceTopic topicToDestroy = eventSourceTopicMap.remove(input.getTopicId());
+ if(topicToDestroy != null){
+ topicToDestroy.close();
+ }
+ return Util.resultRpcSuccessFor((Void) null);
}
@Override
public void close() {
aggregatorRpcReg.close();
- for(ListenerRegistration<DataChangeListener> reg : topicListenerRegistrations.values()){
- reg.close();
+ for(EventSourceTopic est : eventSourceTopicMap.values()){
+ est.close();
}
}
- private void registerTopic(final EventSourceTopic listener) {
- final ListenerRegistration<DataChangeListener> listenerRegistration =
- dataBroker.registerDataChangeListener(
- OPERATIONAL,
- EVENT_SOURCE_TOPOLOGY_PATH,
- listener,
- DataBroker.DataChangeScope.SUBTREE);
-
- topicListenerRegistrations.put(listener, listenerRegistration);
- }
-
public void register(final EventSource eventSource){
+
NodeKey nodeKey = eventSource.getSourceNodeKey();
final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource);
routedRpcRegistrations.put(nodeKey,reg);
insert(sourcePath);
- for(EventSourceTopic est : topicListenerRegistrations.keySet()){
- if(est.getNodeIdRegexPattern().matcher(nodeKey.getNodeId().getValue()).matches()){
- est.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey));
- }
- }
}
public void unRegister(final EventSource eventSource){
register(eventSource);
return esr;
}
+
+ DataBroker getDataBroker() {
+ return dataBroker;
+ }
+
+ EventSourceService getEventSourceService() {
+ return eventSourceService;
+ }
}
import java.util.ArrayList;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
public final class Util {
- public static String getUUIDIdent(){
- UUID uuid = UUID.randomUUID();
- return uuid.toString();
- }
-
public static <T> Future<RpcResult<T>> resultRpcSuccessFor(final T output) {
final RpcResult<T> result = RpcResultBuilder.success(output).build();
return Futures.immediateFuture(result);
*/
package org.opendaylight.controller.messagebus.eventsources.netconf;
-import java.net.URI;
import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import javax.xml.parsers.DocumentBuilder;
public ConnectionNotificationTopicRegistration(String SourceName, DOMNotificationListener domNotificationListener) {
super(NotificationSourceType.ConnectionStatusChange, SourceName, EVENT_SOURCE_STATUS_PATH.getLastComponent().getNamespace().toString());
this.domNotificationListener = Preconditions.checkNotNull(domNotificationListener);
- LOG.info("Connection notification source has been initialized...");
+ LOG.info("Connection notification source has been initialized.");
setActive(true);
setReplaySupported(false);
}
@Override
public void close() throws Exception {
- LOG.info("Connection notification - publish Deactive");
- publishNotification(EventSourceStatus.Deactive);
- notificationTopicMap.clear();
- setActive(false);
+ if(isActive()){
+ LOG.debug("Connection notification - publish Deactive");
+ publishNotification(EventSourceStatus.Deactive);
+ notificationTopicMap.clear();
+ setActive(false);
+ }
}
@Override
void activateNotificationSource() {
- LOG.info("Connection notification - publish Active");
+ LOG.debug("Connection notification - publish Active");
publishNotification(EventSourceStatus.Active);
}
@Override
void deActivateNotificationSource() {
- LOG.info("Connection notification - publish Inactive");
+ LOG.debug("Connection notification - publish Inactive");
publishNotification(EventSourceStatus.Inactive);
}
@Override
void reActivateNotificationSource() {
- LOG.info("Connection notification - reactivate - publish active");
+ LOG.debug("Connection notification - reactivate - publish active");
publishNotification(EventSourceStatus.Active);
}
@Override
boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
- if(validateNotifactionSchemaPath(notificationPath) == false){
+ if(checkNotificationPath(notificationPath) == false){
LOG.debug("Bad SchemaPath for notification try to register");
return false;
}
}
@Override
- void unRegisterNotificationTopic(TopicId topicId) {
- // TODO: need code when EventAggregator.destroyTopic will be implemented
- }
-
- private boolean validateNotifactionSchemaPath(SchemaPath notificationPath){
- if(notificationPath == null){
- return false;
+ synchronized void unRegisterNotificationTopic(TopicId topicId) {
+ List<SchemaPath> notificationPathToRemove = new ArrayList<>();
+ for(SchemaPath notifKey : notificationTopicMap.keySet()){
+ ArrayList<TopicId> topicList = notificationTopicMap.get(notifKey);
+ if(topicList != null){
+ topicList.remove(topicId);
+ if(topicList.isEmpty()){
+ notificationPathToRemove.add(notifKey);
+ }
+ }
+ }
+ for(SchemaPath notifKey : notificationPathToRemove){
+ notificationTopicMap.remove(notifKey);
}
- URI notificationNameSpace = notificationPath.getLastComponent().getNamespace();
- return getNotificationUrnPrefix().startsWith(notificationNameSpace.toString());
}
private void publishNotification(EventSourceStatus eventSourceStatus){
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
notificationTopicRegistrationList.add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this));
Optional<Map<String, Stream>> streamMap = getAvailableStreams();
if(streamMap.isPresent()){
+ LOG.debug("Stream configuration compare...");
for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) {
final String streamName = this.urnPrefixToStreamMap.get(urnPrefix);
+ LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName);
if(streamMap.get().containsKey(streamName)){
+ LOG.debug("Stream containig on device");
notificationTopicRegistrationList.add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName),urnPrefix, this));
}
}
Optional<DataBroker> dataBroker = this.mountPoint.getService(DataBroker.class);
if(dataBroker.isPresent()){
-
+ LOG.debug("GET Available streams ...");
ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction();
CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = tx.read(LogicalDatastoreType.OPERATIONAL,pathStream);
if(streams.isPresent()){
streamMap = new HashMap<>();
for(Stream stream : streams.get().getStream()){
+ LOG.debug("*** find stream {}", stream.getName().getValue());
streamMap.put(stream.getName().getValue(), stream);
}
}
LOG.warn("Can not read streams for node {}",this.nodeId);
}
+ } else {
+ LOG.warn("No databroker on node {}", this.nodeId);
}
return Optional.fromNullable(streamMap);
@Override
public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
-
+ LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), this.nodeId);
final NotificationPattern notificationPattern = input.getNotificationPattern();
final List<SchemaPath> matchingNotifications = getMatchingNotifications(notificationPattern);
return registerTopic(input.getTopicId(),matchingNotifications);
}
- private synchronized Future<RpcResult<JoinTopicOutput>> registerTopic(final TopicId topicId, final List<SchemaPath> notificationsToSubscribe){
+ @Override
+ public Future<RpcResult<Void>> disJoinTopic(DisJoinTopicInput input) {
+ for(NotificationTopicRegistration reg : notificationTopicRegistrationList){
+ reg.unRegisterNotificationTopic(input.getTopicId());
+ }
+ return Util.resultRpcSuccessFor((Void) null) ;
+ }
+ private synchronized Future<RpcResult<JoinTopicOutput>> registerTopic(final TopicId topicId, final List<SchemaPath> notificationsToSubscribe){
+ LOG.debug("Join topic {} - register");
JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down;
if(notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false){
+ LOG.debug("Notifications to subscribe has found - count {}",notificationsToSubscribe.size() );
final Optional<DOMNotificationService> notifyService = getDOMMountPoint().getService(DOMNotificationService.class);
if(notifyService.isPresent()){
- int subscribedStreams = 0;
+ int registeredNotificationCount = 0;
for(SchemaPath schemaNotification : notificationsToSubscribe){
for(NotificationTopicRegistration reg : notificationTopicRegistrationList){
- LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() );
- reg.activateNotificationSource();
- boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId);
- if(regSuccess){
- subscribedStreams = subscribedStreams +1;
- }
+ LOG.debug("Try notification registratio {} on SchemaPathNotification {}", reg.getSourceName(), schemaNotification.getLastComponent().getLocalName());
+ if(reg.checkNotificationPath(schemaNotification)){
+ LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() );
+ boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId);
+ if(regSuccess){
+ registeredNotificationCount = registeredNotificationCount +1;
+ }
+ }
}
}
- if(subscribedStreams > 0){
+ if(registeredNotificationCount > 0){
joinTopicStatus = JoinTopicStatus.Up;
}
+ } else {
+ LOG.warn("NO DOMNotification service on node {}", this.nodeId);
}
+ } else {
+ LOG.debug("Notifications to subscribe has NOT found");
}
final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build();
public void deActivateStreams(){
for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId);
- reg.deActivateNotificationSource();
+ reg.deActivateNotificationSource();
}
}
@Override
public void onNotification(final DOMNotification notification) {
- LOG.info("Notification {} has been arrived...",notification.getType());
SchemaPath notificationPath = notification.getType();
Date notificationEventTime = null;
if(notification instanceof DOMEvent){
for(TopicId topicId : topicIdsForNotification){
publishNotification(notification, topicId);
- LOG.info("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue());
+ LOG.debug("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue());
}
}
@Override
public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
- LOG.info("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
+ LOG.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getCreatedData().entrySet()) {
if (changeEntry.getValue() instanceof Node) {
nodeCreated(changeEntry.getKey(),(Node) changeEntry.getValue());
}
NetconfEventSourceRegistration nesr = new NetconfEventSourceRegistration(instanceIdent, node, netconfEventSourceManager);
nesr.updateStatus();
- LOG.info("NetconfEventSourceRegistration for node {} has been initialized...",node.getNodeId().getValue());
+ LOG.debug("NetconfEventSourceRegistration for node {} has been initialized...",node.getNodeId().getValue());
return nesr;
}
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class NotificationTopicRegistration implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationTopicRegistration.class);
+
public enum NotificationSourceType{
NetconfDeviceStream,
ConnectionStatusChange;
return notificationUrnPrefix;
}
+ public boolean checkNotificationPath(SchemaPath notificationPath){
+ if(notificationPath == null){
+ return false;
+ }
+ String nameSpace = notificationPath.getLastComponent().toString();
+ LOG.debug("CheckNotification - name space {} - NotificationUrnPrefix {}", nameSpace, getNotificationUrnPrefix());
+ return nameSpace.startsWith(getNotificationUrnPrefix());
+ }
abstract void activateNotificationSource();
abstract void deActivateNotificationSource();
import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
this.lastEventTime= null;
setReplaySupported(this.stream.isReplaySupport());
setActive(false);
+ LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
}
void activateNotificationSource() {
@Override
boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId){
- if(validateNotificationPath(notificationPath) == false){
+
+ if(checkNotificationPath(notificationPath) == false){
LOG.debug("Bad SchemaPath for notification try to register");
return false;
}
+
final Optional<DOMNotificationService> notifyService = domMountPoint.getService(DOMNotificationService.class);
if(notifyService.isPresent() == false){
LOG.debug("DOMNotificationService is not present");
return false;
}
- ListenerRegistration<NetconfEventSource> registration = notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath);
+
+ activateNotificationSource();
+ if(isActive() == false){
+ LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(), notificationPath.toString());
+ return false;
+ }
+
+ ListenerRegistration<NetconfEventSource> registration =
+ notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath);
notificationRegistrationMap.put(notificationPath, registration);
ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
if(topicIds == null){
topicIds.add(topicId);
}
}
+
notificationTopicMap.put(notificationPath, topicIds);
return true;
}
- private boolean validateNotificationPath(SchemaPath notificationPath){
- if(notificationPath == null){
- return false;
+ @Override
+ synchronized void unRegisterNotificationTopic(TopicId topicId) {
+ List<SchemaPath> notificationPathToRemove = new ArrayList<>();
+ for(SchemaPath notifKey : notificationTopicMap.keySet()){
+ ArrayList<TopicId> topicList = notificationTopicMap.get(notifKey);
+ if(topicList != null){
+ topicList.remove(topicId);
+ if(topicList.isEmpty()){
+ notificationPathToRemove.add(notifKey);
+ }
+ }
+ }
+ for(SchemaPath notifKey : notificationPathToRemove){
+ notificationTopicMap.remove(notifKey);
+ ListenerRegistration<NetconfEventSource> reg = notificationRegistrationMap.remove(notifKey);
+ if(reg != null){
+ reg.close();
+ }
}
- String nameSpace = notificationPath.getLastComponent().toString();
- return nameSpace.startsWith(getNotificationUrnPrefix());
}
Optional<Date> getLastEventTime() {
closeStream();
}
- @Override
- void unRegisterNotificationTopic(TopicId topicId) {
- // TODO: use it when destroy topic will be implemented
- }
-
}
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.osgi.framework.BundleContext;
+import com.google.common.util.concurrent.CheckedFuture;
+
import javax.management.ObjectName;
import static org.junit.Assert.assertEquals;
WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class), eq(true));
-
+ CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
+ doReturn(checkedFutureMock).when(writeTransactionMock).submit();
assertNotNull("EventSourceRegistryWrapper has not been created correctly.", messageBusAppImplModule.createInstance());
}
import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import com.google.common.util.concurrent.CheckedFuture;
+
public class EventSourceTopicTest {
EventSourceTopic eventSourceTopic;
- org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node dataObjectMock;
+ Node dataObjectNodeMock;
NodeId nodeIdMock;
+ DataBroker dataBrokerMock;
EventSourceService eventSourceServiceMock;
+ EventSourceTopology eventSourceTopologyMock;
@BeforeClass
public static void initTestClass() throws IllegalAccessException, InstantiationException {
public void setUp() throws Exception {
NotificationPattern notificationPattern = new NotificationPattern("value1");
eventSourceServiceMock = mock(EventSourceService.class);
- eventSourceTopic = new EventSourceTopic(notificationPattern, "nodeIdPattern1", eventSourceServiceMock);
+ eventSourceTopologyMock = mock(EventSourceTopology.class);
+ dataBrokerMock = mock(DataBroker.class);
+ doReturn(eventSourceServiceMock).when(eventSourceTopologyMock).getEventSourceService();
+ doReturn(dataBrokerMock).when(eventSourceTopologyMock).getDataBroker();
+
+ WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
+ doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
+ doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class),eq(true));
+ CheckedFuture checkedFutureWriteMock = mock(CheckedFuture.class);
+ doReturn(checkedFutureWriteMock).when(writeTransactionMock).submit();
+
+ ReadOnlyTransaction readOnlyTransactionMock = mock(ReadOnlyTransaction.class);
+ doReturn(readOnlyTransactionMock).when(dataBrokerMock).newReadOnlyTransaction();
+ CheckedFuture checkedFutureReadMock = mock(CheckedFuture.class);
+ doReturn(checkedFutureReadMock).when(readOnlyTransactionMock).read(LogicalDatastoreType.OPERATIONAL, EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH);
+ eventSourceTopic = EventSourceTopic.create(notificationPattern, "nodeIdPattern1", eventSourceTopologyMock);
}
@Test
AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
onDataChangedTestHelper(asyncDataChangeEventMock);
eventSourceTopic.onDataChanged(asyncDataChangeEventMock);
- verify(dataObjectMock, times(1)).getId();
- verify(nodeIdMock, times(1)).getValue();
+ verify(dataObjectNodeMock, times(2)).getNodeId();
+ verify(nodeIdMock, times(2)).getValue();
}
private void onDataChangedTestHelper(AsyncDataChangeEvent asyncDataChangeEventMock){
Map<InstanceIdentifier<?>, DataObject> map = new HashMap<>();
InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
- dataObjectMock = mock(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class);
- map.put(instanceIdentifierMock, dataObjectMock);
+ dataObjectNodeMock = mock(Node.class);
+ doReturn(getNodeKey("testNodeId01")).when(dataObjectNodeMock).getKey();
+ map.put(instanceIdentifierMock, dataObjectNodeMock);
doReturn(map).when(asyncDataChangeEventMock).getUpdatedData();
-
+ doReturn(map).when(asyncDataChangeEventMock).getCreatedData();
nodeIdMock = mock(NodeId.class);
- doReturn(nodeIdMock).when(dataObjectMock).getId();
+ doReturn(nodeIdMock).when(dataObjectNodeMock).getNodeId();
doReturn("nodeIdPattern1").when(nodeIdMock).getValue();
}
verify(eventSourceServiceMock, times(1)).joinTopic(any(JoinTopicInput.class));
}
+ public NodeKey getNodeKey(String nodeId){
+ return new NodeKey(new NodeId(nodeId));
+ }
}
\ No newline at end of file
import java.util.Map;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInputBuilder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.Pattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext;
import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
NodeKey nodeKey;
RpcRegistration<EventAggregatorService> aggregatorRpcReg;
- @BeforeClass
- public static void initTestClass() throws IllegalAccessException, InstantiationException {
- }
-
@Before
public void setUp() throws Exception {
dataBrokerMock = mock(DataBroker.class);
assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock));
}
+ @Test
+ public void destroyTopicTest() throws Exception{
+ topicTestHelper();
+ TopicId topicId = new TopicId("topic-id-007");
+ Map<TopicId,EventSourceTopic> localMap = getEventSourceTopicMap();
+ EventSourceTopic eventSourceTopicMock = mock(EventSourceTopic.class);
+ localMap.put(topicId, eventSourceTopicMock);
+ DestroyTopicInput input = new DestroyTopicInputBuilder().setTopicId(topicId).build();
+ eventSourceTopology.destroyTopic(input);
+ verify(eventSourceTopicMock, times(1)).close();
+ }
+
private void topicTestHelper() throws Exception{
constructorTestHelper();
createTopicInputMock = mock(CreateTopicInput.class);
doReturn(nodeId).when(nodeMock).getNodeId();
}
- @Test
- public void destroyTopicTest() throws Exception{
- topicTestHelper();
- //TODO: modify test when destroyTopic will be implemented
- DestroyTopicInput destroyTopicInput = null;
- assertNotNull("Instance has not been created correctly.", eventSourceTopology.destroyTopic(destroyTopicInput));
- }
-
@Test
public void closeTest() throws Exception{
constructorTestHelper();
topicTestHelper();
- Map<DataChangeListener, ListenerRegistration<DataChangeListener>> localMap = getTopicListenerRegistrations();
- DataChangeListener dataChangeListenerMock = mock(DataChangeListener.class);
- ListenerRegistration<DataChangeListener> listenerListenerRegistrationMock = (ListenerRegistration<DataChangeListener>) mock(ListenerRegistration.class);
- localMap.put(dataChangeListenerMock, listenerListenerRegistrationMock);
+ Map<TopicId,EventSourceTopic> localMap = getEventSourceTopicMap();
+ TopicId topicIdMock = mock(TopicId.class);
+ EventSourceTopic eventSourceTopicMock = mock(EventSourceTopic.class);
+ localMap.put(topicIdMock, eventSourceTopicMock);
eventSourceTopology.close();
verify(aggregatorRpcReg, times(1)).close();
- verify(listenerListenerRegistrationMock, times(1)).close();
+ verify(eventSourceTopicMock, times(1)).close();
}
@Test
assertNotNull("Return value has not been created correctly.", eventSourceTopology.registerEventSource(eventSourceMock));
}
- private Map getTopicListenerRegistrations() throws Exception{
- Field nesField = EventSourceTopology.class.getDeclaredField("topicListenerRegistrations");
+ private Map getEventSourceTopicMap() throws Exception{
+ Field nesField = EventSourceTopology.class.getDeclaredField("eventSourceTopicMap");
nesField.setAccessible(true);
return (Map) nesField.get(eventSourceTopology);
}