+ private final DataBroker dataBroker;
+ private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
+ private final EventSourceService eventSourceService;
+ private final RpcProviderRegistry rpcRegistry;
+
+ public EventSourceTopology(final DataBroker dataBroker, final RpcProviderRegistry rpcRegistry) {
+
+ this.dataBroker = dataBroker;
+ this.rpcRegistry = rpcRegistry;
+ aggregatorRpcReg = rpcRegistry.addRpcImplementation(EventAggregatorService.class, this);
+ eventSourceService = rpcRegistry.getRpcService(EventSourceService.class);
+
+ final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
+ final TopologyTypes1 topologyTypeAugment =
+ new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
+ putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
+ LOG.info("EventSourceRegistry has been initialized");
+ }
+
+ private <T extends DataObject> void putData(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path,
+ final T data) {
+
+ final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
+ tx.put(store, path, data, true);
+ Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.trace("Data has put into datastore {} {}", store, path);
+ }
+
+ @Override
+ public void onFailure(final Throwable ex) {
+ LOG.error("Can not put data into datastore [store: {}] [path: {}] [exception: {}]",store,path, ex);
+ }
+ }, MoreExecutors.directExecutor());
+ }
+
+ private <T extends DataObject> void deleteData(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path) {
+ final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction();
+ tx.delete(OPERATIONAL, path);
+ Futures.addCallback(tx.submit(), new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.trace("Data has deleted from datastore {} {}", store, path);
+ }
+
+ @Override
+ public void onFailure(final Throwable ex) {
+ LOG.error("Can not delete data from datastore [store: {}] [path: {}] [exception: {}]",store,path, ex);
+ }
+ }, MoreExecutors.directExecutor());
+ }
+
+ private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
+ final NodeKey nodeKey = sourcePath.getKey();
+ final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
+ final Node1 nodeAgument = new Node1Builder().setEventSourceNode(
+ new NodeId(nodeKey.getNodeId().getValue())).build();
+ putData(OPERATIONAL, augmentPath, nodeAgument);
+ }
+
+ private void remove(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
+ final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
+ deleteData(OPERATIONAL, augmentPath);
+ }
+
+ @Override
+ public Future<RpcResult<CreateTopicOutput>> createTopic(final CreateTopicInput input) {
+ LOG.debug("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}",
+ input.getNotificationPattern(),
+ input.getNodeIdPattern());
+
+ final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
+ //FIXME: do not use Util.wildcardToRegex - NodeIdPatter should be regex
+ final String nodeIdPattern = input.getNodeIdPattern().getValue();
+ final EventSourceTopic eventSourceTopic = EventSourceTopic.create(notificationPattern, nodeIdPattern, this);
+
+ eventSourceTopicMap.put(eventSourceTopic.getTopicId(), eventSourceTopic);
+
+ final CreateTopicOutput cto = new CreateTopicOutputBuilder()
+ .setTopicId(eventSourceTopic.getTopicId())
+ .build();
+
+ LOG.info("Topic has been created: NotificationPattern -> {}, NodeIdPattern -> {}",
+ input.getNotificationPattern(),
+ input.getNodeIdPattern());