+ this.initializeNotificationTopicRegistrationList();
+
+ LOG.info("NetconfEventSource [{}] created.", this.nodeId);
+ }
+
+ private void initializeNotificationTopicRegistrationList() {
+ notificationTopicRegistrationList.add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this));
+ Optional<Map<String, Stream>> streamMap = getAvailableStreams();
+ if(streamMap.isPresent()){
+ LOG.debug("Stream configuration compare...");
+ for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) {
+ final String streamName = this.urnPrefixToStreamMap.get(urnPrefix);
+ LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName);
+ if(streamMap.get().containsKey(streamName)){
+ LOG.debug("Stream containig on device");
+ notificationTopicRegistrationList.add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName),urnPrefix, this));
+ }
+ }
+ }
+ }
+
+ private Optional<Map<String, Stream>> getAvailableStreams(){
+
+ Map<String,Stream> streamMap = null;
+ InstanceIdentifier<Streams> pathStream = InstanceIdentifier.builder(Netconf.class).child(Streams.class).build();
+ Optional<DataBroker> dataBroker = this.mountPoint.getService(DataBroker.class);
+
+ if(dataBroker.isPresent()){
+ LOG.debug("GET Available streams ...");
+ ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction();
+ CheckedFuture<Optional<Streams>, ReadFailedException> checkFeature = tx.read(LogicalDatastoreType.OPERATIONAL,pathStream);
+
+ try {
+ Optional<Streams> streams = checkFeature.checkedGet();
+ if(streams.isPresent()){
+ streamMap = new HashMap<>();
+ for(Stream stream : streams.get().getStream()){
+ LOG.debug("*** find stream {}", stream.getName().getValue());
+ streamMap.put(stream.getName().getValue(), stream);
+ }
+ }
+ } catch (ReadFailedException e) {
+ LOG.warn("Can not read streams for node {}",this.nodeId);
+ }
+
+ } else {
+ LOG.warn("No databroker on node {}", this.nodeId);
+ }
+
+ return Optional.fromNullable(streamMap);