+ private class StreamNotificationTopicRegistration{
+
+ final private String streamName;
+ final private DOMMountPoint netconfMount;
+ final private String nodeId;
+ final private NetconfEventSource notificationListener;
+ private boolean active;
+
+ private ConcurrentHashMap<SchemaPath, ListenerRegistration<NetconfEventSource>> notificationRegistrationMap = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
+
+ public StreamNotificationTopicRegistration(final String streamName, final String nodeId, final DOMMountPoint netconfMount, NetconfEventSource notificationListener) {
+ this.streamName = streamName;
+ this.netconfMount = netconfMount;
+ this.nodeId = nodeId;
+ this.notificationListener = notificationListener;
+ this.active = false;
+ }
+
+ public boolean isActive() {
+ return active;
+ }
+
+ public void reActivateStream(){
+ if(this.isActive()){
+ LOG.info("Stream {} is reactivated active on node {}.", this.streamName, this.nodeId);
+ final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
+ .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.streamName))
+ .build();
+ netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
+ }
+ }
+
+ public void activateStream() {
+ if(this.isActive() == false){
+ LOG.info("Stream {} is not active on node {}. Will subscribe.", this.streamName, this.nodeId);
+ final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
+ .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.streamName))
+ .build();
+ netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
+ this.active = true;
+ } else {
+ LOG.info("Stream {} is now active on node {}", this.streamName, this.nodeId);
+ }
+ }
+
+ public void deactivateStream() {
+ for(ListenerRegistration<NetconfEventSource> reg : notificationRegistrationMap.values()){
+ reg.close();
+ }
+ this.active = false;
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ public ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath){
+ return notificationTopicMap.get(notificationPath);
+ }
+
+ public void registerNotificationListenerTopic(SchemaPath notificationPath, TopicId topicId){
+ final Optional<DOMNotificationService> notifyService = netconfMount.getService(DOMNotificationService.class);
+ if(notificationPath != null && notifyService.isPresent()){
+ ListenerRegistration<NetconfEventSource> registration = notifyService.get().registerNotificationListener(this.notificationListener,notificationPath);
+ notificationRegistrationMap.put(notificationPath, registration);
+ ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
+ if(topicIds == null){
+ topicIds = new ArrayList<>();
+ topicIds.add(topicId);
+ } else {
+ if(topicIds.contains(topicId) == false){
+ topicIds.add(topicId);
+ }
+ }
+ notificationTopicMap.put(notificationPath, topicIds);
+ }
+ }
+
+ }