- public void registerDataChangeListener(DataChangeListener listener) {
- ListenerRegistration<DataChangeListener> listenerRegistration = dataStore.registerDataChangeListener(datastoreType,
- eventSourceTopologyPath,
+ @Override
+ public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
+ LOG.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
+ input.getNotificationPattern(),
+ input.getNodeIdPattern());
+
+ final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
+ final String nodeIdPattern = input.getNodeIdPattern().getValue();
+ final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern));
+ final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService);
+
+ registerTopic(eventSourceTopic);
+
+ notifyExistingNodes(nodeIdPatternRegex, eventSourceTopic);
+
+ final CreateTopicOutput cto = new CreateTopicOutputBuilder()
+ .setTopicId(eventSourceTopic.getTopicId())
+ .build();
+
+ return Util.resultFor(cto);
+ }
+
+ @Override
+ public Future<RpcResult<Void>> destroyTopic(final DestroyTopicInput input) {
+ return Futures.immediateFailedFuture(new UnsupportedOperationException("Not Implemented"));
+ }
+
+ @Override
+ public void close() {
+ aggregatorRpcReg.close();
+ }
+
+ public void registerTopic(final EventSourceTopic listener) {
+ final ListenerRegistration<DataChangeListener> listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL,
+ EVENT_SOURCE_TOPOLOGY_PATH,