+ @Override
+ public void close() {
+ if (this.listenerRegistration != null) {
+ this.listenerRegistration.close();
+ }
+ for (final InstanceIdentifier<?> eventSourceNodeId : joinedEventSources) {
+ try {
+ final RpcResult<DisJoinTopicOutput> result = sourceService
+ .disJoinTopic(getDisJoinTopicInputArgument(eventSourceNodeId)).get();
+ if (result.isSuccessful() == false) {
+ for (final RpcError err : result.getErrors()) {
+ LOG.error("Can not destroy topic: [{}] on node: [{}]. Error: {}", getTopicId().getValue(),
+ eventSourceNodeId, err.toString());
+ }
+ }
+ } catch (InterruptedException | ExecutionException ex) {
+ LOG.error("Can not close event source topic / destroy topic {} on node {}.", this.topicId.getValue(),
+ eventSourceNodeId, ex);
+ }
+ }
+ joinedEventSources.clear();
+ }
+
+ private static String getUUIDIdent() {
+ final UUID uuid = UUID.randomUUID();
+ return uuid.toString();
+ }