import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
notificationTopicRegistrationList.add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this));
Optional<Map<String, Stream>> streamMap = getAvailableStreams();
if(streamMap.isPresent()){
+ LOG.debug("Stream configuration compare...");
for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) {
final String streamName = this.urnPrefixToStreamMap.get(urnPrefix);
+ LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName);
if(streamMap.get().containsKey(streamName)){
+ LOG.debug("Stream containig on device");
notificationTopicRegistrationList.add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName),urnPrefix, this));
}
}
Optional<DataBroker> dataBroker = this.mountPoint.getService(DataBroker.class);
if(dataBroker.isPresent()){
-
+ LOG.debug("GET Available streams ...");
ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction();
CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = tx.read(LogicalDatastoreType.OPERATIONAL,pathStream);
if(streams.isPresent()){
streamMap = new HashMap<>();
for(Stream stream : streams.get().getStream()){
+ LOG.debug("*** find stream {}", stream.getName().getValue());
streamMap.put(stream.getName().getValue(), stream);
}
}
LOG.warn("Can not read streams for node {}",this.nodeId);
}
+ } else {
+ LOG.warn("No databroker on node {}", this.nodeId);
}
return Optional.fromNullable(streamMap);
@Override
public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
-
+ LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), this.nodeId);
final NotificationPattern notificationPattern = input.getNotificationPattern();
final List<SchemaPath> matchingNotifications = getMatchingNotifications(notificationPattern);
return registerTopic(input.getTopicId(),matchingNotifications);
}
- private synchronized Future<RpcResult<JoinTopicOutput>> registerTopic(final TopicId topicId, final List<SchemaPath> notificationsToSubscribe){
+ @Override
+ public Future<RpcResult<Void>> disJoinTopic(DisJoinTopicInput input) {
+ for(NotificationTopicRegistration reg : notificationTopicRegistrationList){
+ reg.unRegisterNotificationTopic(input.getTopicId());
+ }
+ return Util.resultRpcSuccessFor((Void) null) ;
+ }
+ private synchronized Future<RpcResult<JoinTopicOutput>> registerTopic(final TopicId topicId, final List<SchemaPath> notificationsToSubscribe){
+ LOG.debug("Join topic {} - register");
JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down;
if(notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false){
+ LOG.debug("Notifications to subscribe has found - count {}",notificationsToSubscribe.size() );
final Optional<DOMNotificationService> notifyService = getDOMMountPoint().getService(DOMNotificationService.class);
if(notifyService.isPresent()){
- int subscribedStreams = 0;
+ int registeredNotificationCount = 0;
for(SchemaPath schemaNotification : notificationsToSubscribe){
for(NotificationTopicRegistration reg : notificationTopicRegistrationList){
- LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() );
- reg.activateNotificationSource();
- boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId);
- if(regSuccess){
- subscribedStreams = subscribedStreams +1;
- }
+ LOG.debug("Try notification registratio {} on SchemaPathNotification {}", reg.getSourceName(), schemaNotification.getLastComponent().getLocalName());
+ if(reg.checkNotificationPath(schemaNotification)){
+ LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() );
+ boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId);
+ if(regSuccess){
+ registeredNotificationCount = registeredNotificationCount +1;
+ }
+ }
}
}
- if(subscribedStreams > 0){
+ if(registeredNotificationCount > 0){
joinTopicStatus = JoinTopicStatus.Up;
}
+ } else {
+ LOG.warn("NO DOMNotification service on node {}", this.nodeId);
}
+ } else {
+ LOG.debug("Notifications to subscribe has NOT found");
}
final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build();
public void deActivateStreams(){
for (NotificationTopicRegistration reg : notificationTopicRegistrationList) {
LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId);
- reg.deActivateNotificationSource();
+ reg.deActivateNotificationSource();
}
}
@Override
public void onNotification(final DOMNotification notification) {
- LOG.info("Notification {} has been arrived...",notification.getType());
SchemaPath notificationPath = notification.getType();
Date notificationEventTime = null;
if(notification instanceof DOMEvent){
for(TopicId topicId : topicIdsForNotification){
publishNotification(notification, topicId);
- LOG.info("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue());
+ LOG.debug("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue());
}
}