private static final NodeIdentifier TOPIC_NOTIFICATION_ARG = NodeIdentifier.create(TopicNotification.QNAME);
private static final NodeIdentifier EVENT_SOURCE_ARG = NodeIdentifier.create(
- QName.create(TopicNotification.QNAME, "node-id"));
+ QName.create(TopicNotification.QNAME, "node-id"));
private static final NodeIdentifier TOPIC_ID_ARG = NodeIdentifier.create(
- QName.create(TopicNotification.QNAME, "topic-id"));
+ QName.create(TopicNotification.QNAME, "topic-id"));
private static final NodeIdentifier PAYLOAD_ARG = NodeIdentifier.create(
- QName.create(TopicNotification.QNAME, "payload"));
- private static final String ConnectionNotificationSourceName = "ConnectionNotificationSource";
+ QName.create(TopicNotification.QNAME, "payload"));
+ private static final String CONNECTION_NOTIFICATION_SOURCE_NAME = "ConnectionNotificationSource";
private final DOMNotificationPublishService domPublish;
private final Map<String, String> urnPrefixToStreamMap; // key = urnPrefix, value = StreamName
/**
- * Map notification uri -> registrations
+ * Map notification uri -> registrations.
*/
private final Multimap<String, NotificationTopicRegistration>
notificationTopicRegistrations = Multimaps.synchronizedListMultimap(ArrayListMultimap.create());
private final NetconfEventSourceMount mount;
/**
- * Creates new NetconfEventSource for node. Topic notifications will be published via provided {@link DOMNotificationPublishService}
- * @param streamMap netconf streams from device
+ * Creates new NetconfEventSource for node. Topic notifications will be published via provided
+ * {@link DOMNotificationPublishService}
+ *
+ * @param streamMap netconf streams from device
* @param publishService publish service
*/
- public NetconfEventSource(final Map<String, String> streamMap, NetconfEventSourceMount mount, final DOMNotificationPublishService publishService) {
+ public NetconfEventSource(final Map<String, String> streamMap,
+ final NetconfEventSourceMount mount,
+ final DOMNotificationPublishService publishService) {
this.mount = mount;
this.urnPrefixToStreamMap = Preconditions.checkNotNull(streamMap);
this.domPublish = Preconditions.checkNotNull(publishService);
* {@link StreamNotificationTopicRegistration} for every prefix and available stream as defined in config file.
*/
private void initializeNotificationTopicRegistrationList() {
- final ConnectionNotificationTopicRegistration cntr = new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this);
+ final ConnectionNotificationTopicRegistration cntr =
+ new ConnectionNotificationTopicRegistration(CONNECTION_NOTIFICATION_SOURCE_NAME, this);
notificationTopicRegistrations
- .put(cntr.getNotificationUrnPrefix(), cntr);
+ .put(cntr.getNotificationUrnPrefix(), cntr);
Map<String, Stream> availableStreams = getAvailableStreams();
LOG.debug("Stream configuration compare...");
for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) {
if (availableStreams.containsKey(streamName)) {
LOG.debug("Stream containig on device");
notificationTopicRegistrations
- .put(urnPrefix, new StreamNotificationTopicRegistration(availableStreams.get(streamName), urnPrefix, this));
+ .put(urnPrefix, new StreamNotificationTopicRegistration(availableStreams.get(streamName),
+ urnPrefix, this));
}
}
}
return streamMap;
}
- @Override public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
+ @Override
+ public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), mount.getNodeId());
final NotificationPattern notificationPattern = input.getNotificationPattern();
final List<SchemaPath> matchingNotifications = getMatchingNotifications(notificationPattern);
}
- @Override public Future<RpcResult<Void>> disJoinTopic(DisJoinTopicInput input) {
+ @Override
+ public Future<RpcResult<Void>> disJoinTopic(DisJoinTopicInput input) {
for (NotificationTopicRegistration reg : notificationTopicRegistrations.values()) {
reg.unRegisterNotificationTopic(input.getTopicId());
}
return Util.resultRpcSuccessFor((Void) null);
}
- private synchronized Future<RpcResult<JoinTopicOutput>> registerTopic(final TopicId topicId,
- final List<SchemaPath> notificationsToSubscribe) {
+ private synchronized Future<RpcResult<JoinTopicOutput>> registerTopic(
+ final TopicId topicId,
+ final List<SchemaPath> notificationsToSubscribe) {
Preconditions.checkNotNull(notificationsToSubscribe);
LOG.debug("Join topic {} - register", topicId);
JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down;
}
}
- @Override public void onNotification(final DOMNotification notification) {
+ @Override
+ public void onNotification(final DOMNotification notification) {
SchemaPath notificationPath = notification.getType();
Date notificationEventTime = null;
if (notification instanceof DOMEvent) {
for (TopicId topicId : topicIdsForNotification) {
publishNotification(notification, topicId);
LOG.debug("Notification {} has been published for TopicId {}", notification.getType(),
- topicId.getValue());
+ topicId.getValue());
}
}
}
private void publishNotification(final DOMNotification notification, TopicId topicId) {
final ContainerNode topicNotification = Builders.containerBuilder().withNodeIdentifier(TOPIC_NOTIFICATION_ARG)
- .withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId))
- .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, mount.getNodeId())).withChild(encapsulate(notification))
- .build();
+ .withChild(ImmutableNodes.leafNode(TOPIC_ID_ARG, topicId))
+ .withChild(ImmutableNodes.leafNode(EVENT_SOURCE_ARG, mount.getNodeId()))
+ .withChild(encapsulate(notification))
+ .build();
try {
domPublish.putNotification(new TopicDOMNotification(topicNotification));
} catch (final InterruptedException e) {
/**
* Returns all available notification paths that matches given pattern.
+ *
* @param notificationPattern pattern
* @return notification paths
*/
return Util.expandQname(availableNotifications, pattern);
}
- @Override public void close() throws Exception {
+ @Override
+ public void close() throws Exception {
for (NotificationTopicRegistration streamReg : notificationTopicRegistrations.values()) {
streamReg.close();
}
}
- @Override public NodeKey getSourceNodeKey() {
+ @Override
+ public NodeKey getSourceNodeKey() {
return mount.getNode().getKey();
}
- @Override public List<SchemaPath> getAvailableNotifications() {
+ @Override
+ public List<SchemaPath> getAvailableNotifications() {
final List<SchemaPath> availNotifList = new ArrayList<>();
// add Event Source Connection status notification
availNotifList.add(ConnectionNotificationTopicRegistration.EVENT_SOURCE_STATUS_PATH);
final Set<NotificationDefinition> availableNotifications = mount.getSchemaContext()
- .getNotifications();
+ .getNotifications();
// add all known notifications from netconf device
for (final NotificationDefinition nd : availableNotifications) {
availNotifList.add(nd.getPath());