+ private DisJoinTopicInput getDisJoinTopicInputArgument(final InstanceIdentifier<?> eventSourceNodeId){
+ final NodeRef nodeRef = new NodeRef(eventSourceNodeId);
+ final DisJoinTopicInput dji = new DisJoinTopicInputBuilder()
+ .setNode(nodeRef.getValue())
+ .setTopicId(topicId)
+ .build();
+ return dji;
+ }
+
+ private void registerListner(final EventSourceTopology eventSourceTopology) {
+ this.listenerRegistration =
+ eventSourceTopology.getDataBroker().registerDataChangeListener(
+ LogicalDatastoreType.OPERATIONAL,
+ EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH,
+ this,
+ DataBroker.DataChangeScope.SUBTREE);
+ }
+
+ @Override
+ public void close() {
+ if(this.listenerRegistration != null){
+ this.listenerRegistration.close();
+ }
+ for(final InstanceIdentifier<?> eventSourceNodeId : joinedEventSources){
+ try {
+ final RpcResult<Void> result = sourceService.disJoinTopic(getDisJoinTopicInputArgument(eventSourceNodeId)).get();
+ if(result.isSuccessful() == false){
+ for(final RpcError err : result.getErrors()){
+ LOG.error("Can not destroy topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),eventSourceNodeId,err.toString());
+ }
+ }
+ } catch (InterruptedException | ExecutionException ex) {
+ LOG.error("Can not close event source topic / destroy topic {} on node {}.", this.topicId.getValue(), eventSourceNodeId, ex);
+ }
+ }
+ joinedEventSources.clear();
+ }
+
+ private static String getUUIDIdent(){
+ final UUID uuid = UUID.randomUUID();
+ return uuid.toString();
+ }