X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Fapp%2Fimpl%2FEventSourceTopology.java;h=10b9ec83cde9080a5eb652ea69b65936973b0f60;hb=9f1061c46af5220ad95d8d0b94411ba2fd832a50;hp=076d1b2fc7e9c1d42fee6e0ece1d7c9adc4a3228;hpb=95115ca49f3b16b936e0f6c88aedfc17cd0ee92c;p=controller.git diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java index 076d1b2fc7..10b9ec83cd 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java @@ -20,6 +20,10 @@ import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.messagebus.spi.EventSource; +import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; +import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput; @@ -57,7 +61,8 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -public class EventSourceTopology implements EventAggregatorService, AutoCloseable { + +public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry { private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class); private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ; @@ -73,8 +78,10 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl .child(TopologyTypes.class) .augmentation(TopologyTypes1.class); - private final Map> registrations = + private final Map> topicListenerRegistrations = new ConcurrentHashMap<>(); + private final Map> routedRpcRegistrations = + new ConcurrentHashMap<>();; private final DataBroker dataBroker; private final RpcRegistration aggregatorRpcReg; @@ -91,7 +98,7 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build(); final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build(); putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment); - + LOG.info("EventSourceRegistry has been initialized"); } private void putData(final LogicalDatastoreType store, @@ -104,13 +111,24 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl } - private void insert(final KeyedInstanceIdentifier sourcePath, final Node node) { - final NodeKey nodeKey = node.getKey(); + private void deleteData(final LogicalDatastoreType store, final InstanceIdentifier path){ + final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); + tx.delete(OPERATIONAL, path); + tx.submit(); + } + + private void insert(final KeyedInstanceIdentifier sourcePath) { + final NodeKey nodeKey = sourcePath.getKey(); final InstanceIdentifier augmentPath = sourcePath.augmentation(Node1.class); final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build(); putData(OPERATIONAL, augmentPath, nodeAgument); } + private void remove(final KeyedInstanceIdentifier sourcePath){ + final InstanceIdentifier augmentPath = sourcePath.augmentation(Node1.class); + deleteData(OPERATIONAL, augmentPath); + } + private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){ final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction(); @@ -151,7 +169,7 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern()); final String nodeIdPattern = input.getNodeIdPattern().getValue(); final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern)); - final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService); + final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService); registerTopic(eventSourceTopic); @@ -161,7 +179,7 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl .setTopicId(eventSourceTopic.getTopicId()) .build(); - return Util.resultFor(cto); + return Util.resultRpcSuccessFor(cto); } @Override @@ -172,23 +190,45 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl @Override public void close() { aggregatorRpcReg.close(); + for(ListenerRegistration reg : topicListenerRegistrations.values()){ + reg.close(); + } } - public void registerTopic(final EventSourceTopic listener) { + private void registerTopic(final EventSourceTopic listener) { final ListenerRegistration listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH, listener, DataBroker.DataChangeScope.SUBTREE); - registrations.put(listener, listenerRegistration); + topicListenerRegistrations.put(listener, listenerRegistration); + } + + public void register(final EventSource eventSource){ + NodeKey nodeKey = eventSource.getSourceNodeKey(); + final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey); + RoutedRpcRegistration reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource); + reg.registerPath(NodeContext.class, sourcePath); + routedRpcRegistrations.put(nodeKey,reg); + insert(sourcePath); } - public void register(final Node node, final NetconfEventSource netconfEventSource) { - final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()); - rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, netconfEventSource) - .registerPath(NodeContext.class, sourcePath); - insert(sourcePath,node); - // FIXME: Return registration object. + public void unRegister(final EventSource eventSource){ + final NodeKey nodeKey = eventSource.getSourceNodeKey(); + final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey); + final RoutedRpcRegistration removeRegistration = routedRpcRegistrations.remove(nodeKey); + if(removeRegistration != null){ + removeRegistration.close(); + remove(sourcePath); + } } + @Override + public EventSourceRegistration registerEventSource( + T eventSource) { + EventSourceRegistrationImpl esr = new EventSourceRegistrationImpl<>(eventSource, this); + register(eventSource); + return esr; + } } +