import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-public class EventSourceTopology implements EventAggregatorService, AutoCloseable {
+
+public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry {
private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ;
.child(TopologyTypes.class)
.augmentation(TopologyTypes1.class);
- private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> registrations =
+ private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> topicListenerRegistrations =
new ConcurrentHashMap<>();
+ private final Map<NodeKey, RoutedRpcRegistration<EventSourceService>> routedRpcRegistrations =
+ new ConcurrentHashMap<>();;
private final DataBroker dataBroker;
private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
-
+ LOG.info("EventSourceRegistry has been initialized");
}
private <T extends DataObject> void putData(final LogicalDatastoreType store,
}
- private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath, final Node node) {
- final NodeKey nodeKey = node.getKey();
+ private <T extends DataObject> void deleteData(final LogicalDatastoreType store, final InstanceIdentifier<T> path){
+ final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+ tx.delete(OPERATIONAL, path);
+ tx.submit();
+ }
+
+ private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
+ final NodeKey nodeKey = sourcePath.getKey();
final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build();
putData(OPERATIONAL, augmentPath, nodeAgument);
}
+ private void remove(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath){
+ final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
+ deleteData(OPERATIONAL, augmentPath);
+ }
+
private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
final String nodeIdPattern = input.getNodeIdPattern().getValue();
final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern));
- final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService);
+ final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService);
registerTopic(eventSourceTopic);
.setTopicId(eventSourceTopic.getTopicId())
.build();
- return Util.resultFor(cto);
+ return Util.resultRpcSuccessFor(cto);
}
@Override
@Override
public void close() {
aggregatorRpcReg.close();
+ for(ListenerRegistration<DataChangeListener> reg : topicListenerRegistrations.values()){
+ reg.close();
+ }
}
- public void registerTopic(final EventSourceTopic listener) {
+ private void registerTopic(final EventSourceTopic listener) {
final ListenerRegistration<DataChangeListener> listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL,
EVENT_SOURCE_TOPOLOGY_PATH,
listener,
DataBroker.DataChangeScope.SUBTREE);
- registrations.put(listener, listenerRegistration);
+ topicListenerRegistrations.put(listener, listenerRegistration);
+ }
+
+ public void register(final EventSource eventSource){
+ NodeKey nodeKey = eventSource.getSourceNodeKey();
+ final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
+ RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource);
+ reg.registerPath(NodeContext.class, sourcePath);
+ routedRpcRegistrations.put(nodeKey,reg);
+ insert(sourcePath);
}
- public void register(final Node node, final NetconfEventSource netconfEventSource) {
- final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey());
- rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, netconfEventSource)
- .registerPath(NodeContext.class, sourcePath);
- insert(sourcePath,node);
- // FIXME: Return registration object.
+ public void unRegister(final EventSource eventSource){
+ final NodeKey nodeKey = eventSource.getSourceNodeKey();
+ final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
+ final RoutedRpcRegistration<EventSourceService> removeRegistration = routedRpcRegistrations.remove(nodeKey);
+ if(removeRegistration != null){
+ removeRegistration.close();
+ remove(sourcePath);
+ }
}
+ @Override
+ public <T extends EventSource> EventSourceRegistration<T> registerEventSource(
+ T eventSource) {
+ EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
+ register(eventSource);
+ return esr;
+ }
}
+