+ @Override
+ public ListenableFuture<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
+ LOG.debug("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
+ input.getNotificationPattern(),
+ input.getNodeIdPattern());
+
+ final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
+ //FIXME: do not use Util.wildcardToRegex - NodeIdPatter should be regex
+ final String nodeIdPattern = input.getNodeIdPattern().getValue();
+ final EventSourceTopic eventSourceTopic = EventSourceTopic.create(notificationPattern, nodeIdPattern, this);
+
+ eventSourceTopicMap.put(eventSourceTopic.getTopicId(), eventSourceTopic);
+
+ final CreateTopicOutput cto = new CreateTopicOutputBuilder()
+ .setTopicId(eventSourceTopic.getTopicId())
+ .build();
+
+ LOG.info("Topic has been created: NotificationPattern -> {}, NodeIdPattern -> {}",
+ input.getNotificationPattern(),
+ input.getNodeIdPattern());
+
+ return Util.resultRpcSuccessFor(cto);
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<DestroyTopicOutput>> destroyTopic(final DestroyTopicInput input) {
+ final EventSourceTopic topicToDestroy = eventSourceTopicMap.remove(input.getTopicId());
+ if (topicToDestroy != null) {
+ topicToDestroy.close();
+ }
+ return Util.resultRpcSuccessFor(new DestroyTopicOutputBuilder().build());
+ }
+
+ @Override
+ public void close() {
+ aggregatorRpcReg.close();
+ eventSourceTopicMap.values().forEach(EventSourceTopic::close);
+ }
+
+ public void register(final EventSource eventSource) {
+ final NodeKey nodeKey = eventSource.getSourceNodeKey();
+ final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
+ final Registration reg = rpcRegistry.registerRpcImplementation(EventSourceService.class, eventSource,
+ Collections.singleton(sourcePath));
+ routedRpcRegistrations.put(nodeKey, reg);
+ insert(sourcePath);
+ }
+
+ public void unRegister(final EventSource eventSource) {
+ final NodeKey nodeKey = eventSource.getSourceNodeKey();
+ final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
+ final Registration removeRegistration = routedRpcRegistrations.remove(nodeKey);
+ if (removeRegistration != null) {
+ removeRegistration.close();
+ remove(sourcePath);
+ }
+ }
+
+ @Override
+ public <T extends EventSource> EventSourceRegistration<T> registerEventSource(final T eventSource) {
+ final EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
+ register(eventSource);
+ return esr;
+ }
+
+ DataBroker getDataBroker() {
+ return dataBroker;
+ }
+
+ EventSourceService getEventSourceService() {
+ return eventSourceService;
+ }
+
+ @VisibleForTesting
+ Map<NodeKey, Registration> getRoutedRpcRegistrations() {
+ return routedRpcRegistrations;
+ }