X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fmessagebus-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fmessagebus%2Fapp%2Fimpl%2FEventSourceTopology.java;h=6140a78ba586a7beaa89a92ae0ab9cc5fb109016;hb=e1fc2efacc0b2da9075b6cd4e5ae149a141c486d;hp=076d1b2fc7e9c1d42fee6e0ece1d7c9adc4a3228;hpb=d4fbdfac9adbde8bd6649f596e083a0cf4037847;p=controller.git diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java index 076d1b2fc7..6140a78ba5 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java @@ -20,6 +20,10 @@ import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.messagebus.spi.EventSource; +import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; +import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; +import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput; @@ -57,7 +61,8 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -public class EventSourceTopology implements EventAggregatorService, AutoCloseable { + +public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry { private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class); private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ; @@ -73,7 +78,9 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl .child(TopologyTypes.class) .augmentation(TopologyTypes1.class); - private final Map> registrations = + private final Map> topicListenerRegistrations = + new ConcurrentHashMap<>(); + private final Map> routedRpcRegistrations = new ConcurrentHashMap<>(); private final DataBroker dataBroker; @@ -91,7 +98,7 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build(); final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build(); putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment); - + LOG.info("EventSourceRegistry has been initialized"); } private void putData(final LogicalDatastoreType store, @@ -104,13 +111,24 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl } - private void insert(final KeyedInstanceIdentifier sourcePath, final Node node) { - final NodeKey nodeKey = node.getKey(); + private void deleteData(final LogicalDatastoreType store, final InstanceIdentifier path){ + final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); + tx.delete(OPERATIONAL, path); + tx.submit(); + } + + private void insert(final KeyedInstanceIdentifier sourcePath) { + final NodeKey nodeKey = sourcePath.getKey(); final InstanceIdentifier augmentPath = sourcePath.augmentation(Node1.class); final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build(); putData(OPERATIONAL, augmentPath, nodeAgument); } + private void remove(final KeyedInstanceIdentifier sourcePath){ + final InstanceIdentifier augmentPath = sourcePath.augmentation(Node1.class); + deleteData(OPERATIONAL, augmentPath); + } + private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){ final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction(); @@ -150,8 +168,8 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern()); final String nodeIdPattern = input.getNodeIdPattern().getValue(); - final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern)); - final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService); + final Pattern nodeIdPatternRegex = Pattern.compile(nodeIdPattern); + final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService); registerTopic(eventSourceTopic); @@ -161,7 +179,7 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl .setTopicId(eventSourceTopic.getTopicId()) .build(); - return Util.resultFor(cto); + return Util.resultRpcSuccessFor(cto); } @Override @@ -172,23 +190,49 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl @Override public void close() { aggregatorRpcReg.close(); + for(ListenerRegistration reg : topicListenerRegistrations.values()){ + reg.close(); + } } - public void registerTopic(final EventSourceTopic listener) { + private void registerTopic(final EventSourceTopic listener) { final ListenerRegistration listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH, listener, DataBroker.DataChangeScope.SUBTREE); - registrations.put(listener, listenerRegistration); + topicListenerRegistrations.put(listener, listenerRegistration); + } + + public void register(final EventSource eventSource){ + NodeKey nodeKey = eventSource.getSourceNodeKey(); + final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey); + RoutedRpcRegistration reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource); + reg.registerPath(NodeContext.class, sourcePath); + routedRpcRegistrations.put(nodeKey,reg); + insert(sourcePath); + + for(EventSourceTopic est : topicListenerRegistrations.keySet()){ + est.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey)); + } } - public void register(final Node node, final NetconfEventSource netconfEventSource) { - final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey()); - rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, netconfEventSource) - .registerPath(NodeContext.class, sourcePath); - insert(sourcePath,node); - // FIXME: Return registration object. + public void unRegister(final EventSource eventSource){ + final NodeKey nodeKey = eventSource.getSourceNodeKey(); + final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey); + final RoutedRpcRegistration removeRegistration = routedRpcRegistrations.remove(nodeKey); + if(removeRegistration != null){ + removeRegistration.close(); + remove(sourcePath); + } } + @Override + public EventSourceRegistration registerEventSource( + T eventSource) { + EventSourceRegistrationImpl esr = new EventSourceRegistrationImpl<>(eventSource, this); + register(eventSource); + return esr; + } } +