import org.w3c.dom.Document;
import org.w3c.dom.Element;
+/**
+ * Topic registration on event-source-status-notification.
+ */
public class ConnectionNotificationTopicRegistration extends NotificationTopicRegistration {
private static final Logger LOG = LoggerFactory.getLogger(ConnectionNotificationTopicRegistration.class);
import org.w3c.dom.Document;
import org.w3c.dom.Element;
+/**
+ * NetconfEventSource serves as proxy between nodes and messagebus. Subscribers can join topic stream from this source.
+ * Then they will receive notifications that matches pattern specified by topic.
+ */
public class NetconfEventSource implements EventSource, DOMNotificationListener {
private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class);
private final Map<String, String> urnPrefixToStreamMap; // key = urnPrefix, value = StreamName
private final List<NotificationTopicRegistration> notificationTopicRegistrationList = new ArrayList<>();
+ /**
+ * Creates new NetconfEventSource for node. Topic notifications will be published via provided {@link DOMNotificationPublishService}
+ * @param node node
+ * @param streamMap netconf streams from device
+ * @param netconfMount
+ * @param mountPoint
+ * @param publishService publish service
+ */
public NetconfEventSource(final Node node, final Map<String, String> streamMap, final DOMMountPoint netconfMount,
final MountPoint mountPoint, final DOMNotificationPublishService publishService) {
this.netconfMount = Preconditions.checkNotNull(netconfMount);
}
}
+ /**
+ * Returns all available notification paths that matches given pattern.
+ * @param notificationPattern pattern
+ * @return notification paths
+ */
private List<SchemaPath> getMatchingNotifications(NotificationPattern notificationPattern) {
final String regex = notificationPattern.getValue();
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * NetconfEventSourceManager implements DataChangeListener. On topology changes, it manages creation,
+ * updating and removing registrations of event sources.
+ */
public final class NetconfEventSourceManager implements DataChangeListener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSourceManager.class);
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Notification topic registration.
+ */
public abstract class NotificationTopicRegistration implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(NotificationTopicRegistration.class);
return notificationUrnPrefix;
}
+ /**
+ * Checks, if notification is from namespace belonging to this registration.
+ * @param notificationPath path
+ * @return true, if notification belongs to registration namespace
+ */
public boolean checkNotificationPath(SchemaPath notificationPath) {
if (notificationPath == null) {
return false;
abstract void reActivateNotificationSource();
+ /**
+ * Registers associated event source notification to topic.
+ * @param notificationPath notification path
+ * @param topicId topic id
+ * @return true, if successful
+ */
abstract boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId);
+ /**
+ * Registers associated event source notification to topic.
+ * @param topicId topic id
+ * @return true, if successful
+ */
abstract void unRegisterNotificationTopic(TopicId topicId);
+ /**
+ * Returns registered topics for given path.
+ * @param notificationPath path
+ * @return topicIds
+ */
abstract ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath);
public boolean isReplaySupported() {
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Topic registration for notification stream.
+ */
public class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
private ConcurrentHashMap<SchemaPath, ListenerRegistration<NetconfEventSource>> notificationRegistrationMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
+ /**
+ * Creates registration to notification stream.
+ * @param stream stream
+ * @param notificationPrefix notifications namespace
+ * @param netconfEventSource event source
+ */
public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix,
NetconfEventSource netconfEventSource) {
super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
}
+ /**
+ * Subscribes to notification stream associated with this registration.
+ */
void activateNotificationSource() {
if (isActive() == false) {
LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
}
}
+ /**
+ * Subscribes to notification stream associated with this registration. If replay is supported, notifications from last
+ * received event time will be requested.
+ */
void reActivateNotificationSource() {
if (isActive()) {
LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);