From: Marian Adamjak Date: Tue, 12 May 2015 07:15:21 +0000 (+0200) Subject: BUG 3121 - destroy topic implementation X-Git-Tag: release/beryllium~533 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=cce450550bec259d4f925389bafd007676f2186f BUG 3121 - destroy topic implementation - implementing destroyTopic rpc in EventSourceTopology - add rpc dis-join-topic in event-sourec.yang and implement it - fixed bugs identified during test with real device - revision of code and cleaning up - rebased Change-Id: I0f43ef2ca1f54db5a08c379792a59c3894a77767 Signed-off-by: Marian Adamjak (cherry picked from commit d039ff1e73202fa815e72db49d63ded711386f07) --- diff --git a/opendaylight/md-sal/messagebus-api/src/main/yang/event-source.yang b/opendaylight/md-sal/messagebus-api/src/main/yang/event-source.yang index c56243b3fa..c90b26696b 100644 --- a/opendaylight/md-sal/messagebus-api/src/main/yang/event-source.yang +++ b/opendaylight/md-sal/messagebus-api/src/main/yang/event-source.yang @@ -24,7 +24,9 @@ module event-source { http://www.eclipse.org/legal/epl-v10.html"; revision "2014-12-02" { - description "first revision"; + description "first revision + + add rpc dis-join-topic + + add notification event-source-status-notification"; } // FIXME: expand this @@ -84,6 +86,21 @@ module event-source { } } + rpc dis-join-topic { + input { + leaf node { + ext:context-reference "inv:node-context"; + type "instance-identifier"; + } + leaf topic-id { + type aggr:topic-id; + mandatory true; + description "identifier of topic to be disjoin"; + } + } + + } + notification event-source-status-notification { description diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java index 6de407f58b..d6132beb8c 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java @@ -8,40 +8,64 @@ package org.opendaylight.controller.messagebus.app.impl; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; - +import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInputBuilder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - -public class EventSourceTopic implements DataChangeListener { +public class EventSourceTopic implements DataChangeListener, AutoCloseable { private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class); private final NotificationPattern notificationPattern; private final EventSourceService sourceService; private final Pattern nodeIdPattern; private final TopicId topicId; + private ListenerRegistration listenerRegistration; + private final CopyOnWriteArraySet> joinedEventSources = new CopyOnWriteArraySet<>(); - public EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdPattern, final EventSourceService eventSource) { - this.notificationPattern = Preconditions.checkNotNull(notificationPattern); - this.sourceService = eventSource; - this.nodeIdPattern = Pattern.compile(nodeIdPattern); + public static EventSourceTopic create(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, final EventSourceTopology eventSourceTopology){ + final EventSourceTopic est = new EventSourceTopic(notificationPattern, nodeIdRegexPattern, eventSourceTopology.getEventSourceService()); + est.registerListner(eventSourceTopology); + est.notifyExistingNodes(eventSourceTopology); + return est; + } - this.topicId = new TopicId(Util.getUUIDIdent()); + private EventSourceTopic(final NotificationPattern notificationPattern, final String nodeIdRegexPattern, final EventSourceService sourceService) { + this.notificationPattern = Preconditions.checkNotNull(notificationPattern); + this.sourceService = Preconditions.checkNotNull(sourceService); + this.nodeIdPattern = Pattern.compile(nodeIdRegexPattern); + this.topicId = new TopicId(getUUIDIdent()); + this.listenerRegistration = null; + LOG.info("EventSourceTopic created - topicId {}", topicId.getValue()); } public TopicId getTopicId() { @@ -50,10 +74,22 @@ public class EventSourceTopic implements DataChangeListener { @Override public void onDataChanged(final AsyncDataChangeEvent, DataObject> event) { - for (final Map.Entry, DataObject> changeEntry : event.getUpdatedData().entrySet()) { + + for (final Map.Entry, DataObject> createdEntry : event.getCreatedData().entrySet()) { + if (createdEntry.getValue() instanceof Node) { + final Node node = (Node) createdEntry.getValue(); + LOG.debug("Create node..."); + if (getNodeIdRegexPattern().matcher(node.getNodeId().getValue()).matches()) { + LOG.debug("Matched..."); + notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); + } + } + } + + for (final Map.Entry, DataObject> changeEntry : event.getUpdatedData().entrySet()) { if (changeEntry.getValue() instanceof Node) { final Node node = (Node) changeEntry.getValue(); - if (getNodeIdRegexPattern().matcher(node.getId().getValue()).matches()) { + if (getNodeIdRegexPattern().matcher(node.getNodeId().getValue()).matches()) { notifyNode(changeEntry.getKey()); } } @@ -61,19 +97,56 @@ public class EventSourceTopic implements DataChangeListener { } public void notifyNode(final InstanceIdentifier nodeId) { - + LOG.debug("Notify node: {}", nodeId); try { - RpcResult rpcResultJoinTopic = sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get(); + final RpcResult rpcResultJoinTopic = sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get(); if(rpcResultJoinTopic.isSuccessful() == false){ - for(RpcError err : rpcResultJoinTopic.getErrors()){ + for(final RpcError err : rpcResultJoinTopic.getErrors()){ LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),nodeId.toString(),err.toString()); } + } else { + joinedEventSources.add(nodeId); } } catch (final Exception e) { LOG.error("Could not invoke join topic for node {}", nodeId); } } + private void notifyExistingNodes(final EventSourceTopology eventSourceTopology){ + LOG.debug("Notify existing nodes"); + final Pattern nodeRegex = this.nodeIdPattern; + + final ReadOnlyTransaction tx = eventSourceTopology.getDataBroker().newReadOnlyTransaction(); + final CheckedFuture, ReadFailedException> future = + tx.read(LogicalDatastoreType.OPERATIONAL, EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH); + + Futures.addCallback(future, new FutureCallback>(){ + + @Override + public void onSuccess(final Optional data) { + if(data.isPresent()) { + final List nodes = data.get().getNode(); + if(nodes != null){ + for (final Node node : nodes) { + if (nodeRegex.matcher(node.getNodeId().getValue()).matches()) { + notifyNode(EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); + } + } + } + } + tx.close(); + } + + @Override + public void onFailure(final Throwable t) { + LOG.error("Can not notify existing nodes", t); + tx.close(); + } + + }); + + } + private JoinTopicInput getJoinTopicInputArgument(final InstanceIdentifier path) { final NodeRef nodeRef = new NodeRef(path); final JoinTopicInput jti = @@ -89,4 +162,46 @@ public class EventSourceTopic implements DataChangeListener { return nodeIdPattern; } + private DisJoinTopicInput getDisJoinTopicInputArgument(final InstanceIdentifier eventSourceNodeId){ + final NodeRef nodeRef = new NodeRef(eventSourceNodeId); + final DisJoinTopicInput dji = new DisJoinTopicInputBuilder() + .setNode(nodeRef.getValue()) + .setTopicId(topicId) + .build(); + return dji; + } + + private void registerListner(final EventSourceTopology eventSourceTopology) { + this.listenerRegistration = + eventSourceTopology.getDataBroker().registerDataChangeListener( + LogicalDatastoreType.OPERATIONAL, + EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH, + this, + DataBroker.DataChangeScope.SUBTREE); + } + + @Override + public void close() { + if(this.listenerRegistration != null){ + this.listenerRegistration.close(); + } + for(final InstanceIdentifier eventSourceNodeId : joinedEventSources){ + try { + final RpcResult result = sourceService.disJoinTopic(getDisJoinTopicInputArgument(eventSourceNodeId)).get(); + if(result.isSuccessful() == false){ + for(final RpcError err : result.getErrors()){ + LOG.error("Can not destroy topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),eventSourceNodeId,err.toString()); + } + } + } catch (InterruptedException | ExecutionException ex) { + LOG.error("Can not close event source topic / destroy topic {} on node {}.", this.topicId.getValue(), eventSourceNodeId, ex); + } + } + joinedEventSources.clear(); + } + + private static String getUUIDIdent(){ + final UUID uuid = UUID.randomUUID(); + return uuid.toString(); + } } diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java index 279528907c..3aa470b10a 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java @@ -8,18 +8,14 @@ package org.opendaylight.controller.messagebus.app.impl; -import java.util.List; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; -import java.util.regex.Pattern; - import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; -import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.messagebus.spi.EventSource; import org.opendaylight.controller.messagebus.spi.EventSourceRegistration; import org.opendaylight.controller.messagebus.spi.EventSourceRegistry; @@ -32,6 +28,7 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.Node1Builder; @@ -48,7 +45,6 @@ import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology. import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.TopologyTypes; -import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; @@ -56,11 +52,6 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; - public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry { private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class); @@ -69,7 +60,7 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR private static final TopologyKey EVENT_SOURCE_TOPOLOGY_KEY = new TopologyKey(new TopologyId(TOPOLOGY_ID)); private static final LogicalDatastoreType OPERATIONAL = LogicalDatastoreType.OPERATIONAL; - private static final InstanceIdentifier EVENT_SOURCE_TOPOLOGY_PATH = + static final InstanceIdentifier EVENT_SOURCE_TOPOLOGY_PATH = InstanceIdentifier.create(NetworkTopology.class) .child(Topology.class, EVENT_SOURCE_TOPOLOGY_KEY); @@ -78,8 +69,7 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR .child(TopologyTypes.class) .augmentation(TopologyTypes1.class); - private final Map> topicListenerRegistrations = - new ConcurrentHashMap<>(); + private final Map eventSourceTopicMap = new ConcurrentHashMap<>(); private final Map> routedRpcRegistrations = new ConcurrentHashMap<>(); @@ -105,16 +95,39 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR final InstanceIdentifier path, final T data){ - final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); + final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction(); tx.put(store, path, data, true); - tx.submit(); + Futures.addCallback( tx.submit(), new FutureCallback(){ + + @Override + public void onSuccess(final Void result) { + LOG.trace("Data has put into datastore {} {}", store, path); + } + + @Override + public void onFailure(final Throwable t) { + LOG.error("Can not put data into datastore [store: {}] [path: {}] [exception: {}]",store,path, t); + } + }); } private void deleteData(final LogicalDatastoreType store, final InstanceIdentifier path){ - final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); + final WriteTransaction tx = getDataBroker().newWriteOnlyTransaction(); tx.delete(OPERATIONAL, path); - tx.submit(); + Futures.addCallback( tx.submit(), new FutureCallback(){ + + @Override + public void onSuccess(final Void result) { + LOG.trace("Data has deleted from datastore {} {}", store, path); + } + + @Override + public void onFailure(final Throwable t) { + LOG.error("Can not delete data from datastore [store: {}] [path: {}] [exception: {}]",store,path, t); + } + + }); } private void insert(final KeyedInstanceIdentifier sourcePath) { @@ -129,100 +142,56 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR deleteData(OPERATIONAL, augmentPath); } - private void notifyExistingNodes(final EventSourceTopic eventSourceTopic){ - - final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction(); - - final CheckedFuture, ReadFailedException> future = tx.read(OPERATIONAL, EVENT_SOURCE_TOPOLOGY_PATH); - - Futures.addCallback(future, new FutureCallback>(){ - - @Override - public void onSuccess(Optional data) { - if(data.isPresent()) { - LOG.info("Topology data are present..."); - final List nodes = data.get().getNode(); - if(nodes != null){ - LOG.info("List of nodes is not null..."); - final Pattern nodeIdPatternRegex = eventSourceTopic.getNodeIdRegexPattern(); - for (final Node node : nodes) { - if (nodeIdPatternRegex.matcher(node.getNodeId().getValue()).matches()) { - eventSourceTopic.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey())); - } - } - } else { - LOG.info("List of nodes is NULL..."); - } - } - tx.close(); - } - - @Override - public void onFailure(Throwable t) { - LOG.error("Can not notify existing nodes {}", t); - tx.close(); - } - - }); - - } - @Override public Future> createTopic(final CreateTopicInput input) { - LOG.info("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}", + LOG.debug("Received Topic creation request: NotificationPattern -> {}, NodeIdPattern -> {}", input.getNotificationPattern(), input.getNodeIdPattern()); final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern()); + //FIXME: do not use Util.wildcardToRegex - NodeIdPatter should be regex final String nodeIdPattern = input.getNodeIdPattern().getValue(); - final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService); + final EventSourceTopic eventSourceTopic = EventSourceTopic.create(notificationPattern, nodeIdPattern, this); - registerTopic(eventSourceTopic); - - notifyExistingNodes(eventSourceTopic); + eventSourceTopicMap.put(eventSourceTopic.getTopicId(), eventSourceTopic); final CreateTopicOutput cto = new CreateTopicOutputBuilder() .setTopicId(eventSourceTopic.getTopicId()) .build(); + LOG.info("Topic has been created: NotificationPattern -> {}, NodeIdPattern -> {}", + input.getNotificationPattern(), + input.getNodeIdPattern()); + return Util.resultRpcSuccessFor(cto); } @Override public Future> destroyTopic(final DestroyTopicInput input) { - return Futures.immediateFailedFuture(new UnsupportedOperationException("Not Implemented")); + final EventSourceTopic topicToDestroy = eventSourceTopicMap.remove(input.getTopicId()); + if(topicToDestroy != null){ + topicToDestroy.close(); + } + return Util.resultRpcSuccessFor((Void) null); } @Override public void close() { aggregatorRpcReg.close(); - for(ListenerRegistration reg : topicListenerRegistrations.values()){ - reg.close(); + for(final EventSourceTopic est : eventSourceTopicMap.values()){ + est.close(); } } - private void registerTopic(final EventSourceTopic listener) { - final ListenerRegistration listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL, - EVENT_SOURCE_TOPOLOGY_PATH, - listener, - DataBroker.DataChangeScope.SUBTREE); - - topicListenerRegistrations.put(listener, listenerRegistration); - } - public void register(final EventSource eventSource){ - NodeKey nodeKey = eventSource.getSourceNodeKey(); + + final NodeKey nodeKey = eventSource.getSourceNodeKey(); final KeyedInstanceIdentifier sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey); - RoutedRpcRegistration reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource); + final RoutedRpcRegistration reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource); reg.registerPath(NodeContext.class, sourcePath); routedRpcRegistrations.put(nodeKey,reg); insert(sourcePath); - for(EventSourceTopic est : topicListenerRegistrations.keySet()){ - if(est.getNodeIdRegexPattern().matcher(nodeKey.getNodeId().getValue()).matches()){ - est.notifyNode(EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey)); - } - } } public void unRegister(final EventSource eventSource){ @@ -236,11 +205,18 @@ public class EventSourceTopology implements EventAggregatorService, EventSourceR } @Override - public EventSourceRegistration registerEventSource( - T eventSource) { - EventSourceRegistrationImpl esr = new EventSourceRegistrationImpl<>(eventSource, this); + public EventSourceRegistration registerEventSource(final T eventSource) { + final EventSourceRegistrationImpl esr = new EventSourceRegistrationImpl<>(eventSource, this); register(eventSource); return esr; } + + DataBroker getDataBroker() { + return dataBroker; + } + + EventSourceService getEventSourceService() { + return eventSourceService; + } } diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java index d6bcbf2920..c70fd3c403 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.messagebus.app.impl; import java.util.ArrayList; import java.util.List; -import java.util.UUID; import java.util.concurrent.Future; import java.util.regex.Pattern; @@ -23,11 +22,6 @@ import com.google.common.util.concurrent.Futures; public final class Util { - public static String getUUIDIdent(){ - UUID uuid = UUID.randomUUID(); - return uuid.toString(); - } - public static Future> resultRpcSuccessFor(final T output) { final RpcResult result = RpcResultBuilder.success(output).build(); return Futures.immediateFuture(result); diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java index 2cbac7b97c..61e1af84bc 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/ConnectionNotificationTopicRegistration.java @@ -7,8 +7,8 @@ */ package org.opendaylight.controller.messagebus.eventsources.netconf; -import java.net.URI; import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import javax.xml.parsers.DocumentBuilder; @@ -51,40 +51,42 @@ public class ConnectionNotificationTopicRegistration extends NotificationTopicRe public ConnectionNotificationTopicRegistration(String SourceName, DOMNotificationListener domNotificationListener) { super(NotificationSourceType.ConnectionStatusChange, SourceName, EVENT_SOURCE_STATUS_PATH.getLastComponent().getNamespace().toString()); this.domNotificationListener = Preconditions.checkNotNull(domNotificationListener); - LOG.info("Connection notification source has been initialized..."); + LOG.info("Connection notification source has been initialized."); setActive(true); setReplaySupported(false); } @Override public void close() throws Exception { - LOG.info("Connection notification - publish Deactive"); - publishNotification(EventSourceStatus.Deactive); - notificationTopicMap.clear(); - setActive(false); + if(isActive()){ + LOG.debug("Connection notification - publish Deactive"); + publishNotification(EventSourceStatus.Deactive); + notificationTopicMap.clear(); + setActive(false); + } } @Override void activateNotificationSource() { - LOG.info("Connection notification - publish Active"); + LOG.debug("Connection notification - publish Active"); publishNotification(EventSourceStatus.Active); } @Override void deActivateNotificationSource() { - LOG.info("Connection notification - publish Inactive"); + LOG.debug("Connection notification - publish Inactive"); publishNotification(EventSourceStatus.Inactive); } @Override void reActivateNotificationSource() { - LOG.info("Connection notification - reactivate - publish active"); + LOG.debug("Connection notification - reactivate - publish active"); publishNotification(EventSourceStatus.Active); } @Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) { - if(validateNotifactionSchemaPath(notificationPath) == false){ + if(checkNotificationPath(notificationPath) == false){ LOG.debug("Bad SchemaPath for notification try to register"); return false; } @@ -107,16 +109,20 @@ public class ConnectionNotificationTopicRegistration extends NotificationTopicRe } @Override - void unRegisterNotificationTopic(TopicId topicId) { - // TODO: need code when EventAggregator.destroyTopic will be implemented - } - - private boolean validateNotifactionSchemaPath(SchemaPath notificationPath){ - if(notificationPath == null){ - return false; + synchronized void unRegisterNotificationTopic(TopicId topicId) { + List notificationPathToRemove = new ArrayList<>(); + for(SchemaPath notifKey : notificationTopicMap.keySet()){ + ArrayList topicList = notificationTopicMap.get(notifKey); + if(topicList != null){ + topicList.remove(topicId); + if(topicList.isEmpty()){ + notificationPathToRemove.add(notifKey); + } + } + } + for(SchemaPath notifKey : notificationPathToRemove){ + notificationTopicMap.remove(notifKey); } - URI notificationNameSpace = notificationPath.getLastComponent().getNamespace(); - return getNotificationUrnPrefix().startsWith(notificationNameSpace.toString()); } private void publishNotification(EventSourceStatus eventSourceStatus){ diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java index a640064751..e4ad387f4d 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java @@ -47,6 +47,7 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.DisJoinTopicInput; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.Netconf; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; @@ -111,9 +112,12 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener notificationTopicRegistrationList.add(new ConnectionNotificationTopicRegistration(ConnectionNotificationSourceName, this)); Optional> streamMap = getAvailableStreams(); if(streamMap.isPresent()){ + LOG.debug("Stream configuration compare..."); for (String urnPrefix : this.urnPrefixToStreamMap.keySet()) { final String streamName = this.urnPrefixToStreamMap.get(urnPrefix); + LOG.debug("urnPrefix: {} streamName: {}", urnPrefix, streamName); if(streamMap.get().containsKey(streamName)){ + LOG.debug("Stream containig on device"); notificationTopicRegistrationList.add(new StreamNotificationTopicRegistration(streamMap.get().get(streamName),urnPrefix, this)); } } @@ -127,7 +131,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener Optional dataBroker = this.mountPoint.getService(DataBroker.class); if(dataBroker.isPresent()){ - + LOG.debug("GET Available streams ..."); ReadOnlyTransaction tx = dataBroker.get().newReadOnlyTransaction(); CheckedFuture, ReadFailedException> checkFeature = tx.read(LogicalDatastoreType.OPERATIONAL,pathStream); @@ -136,6 +140,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener if(streams.isPresent()){ streamMap = new HashMap<>(); for(Stream stream : streams.get().getStream()){ + LOG.debug("*** find stream {}", stream.getName().getValue()); streamMap.put(stream.getName().getValue(), stream); } } @@ -143,6 +148,8 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener LOG.warn("Can not read streams for node {}",this.nodeId); } + } else { + LOG.warn("No databroker on node {}", this.nodeId); } return Optional.fromNullable(streamMap); @@ -150,34 +157,49 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener @Override public Future> joinTopic(final JoinTopicInput input) { - + LOG.debug("Join topic {} on {}", input.getTopicId().getValue(), this.nodeId); final NotificationPattern notificationPattern = input.getNotificationPattern(); final List matchingNotifications = getMatchingNotifications(notificationPattern); return registerTopic(input.getTopicId(),matchingNotifications); } - private synchronized Future> registerTopic(final TopicId topicId, final List notificationsToSubscribe){ + @Override + public Future> disJoinTopic(DisJoinTopicInput input) { + for(NotificationTopicRegistration reg : notificationTopicRegistrationList){ + reg.unRegisterNotificationTopic(input.getTopicId()); + } + return Util.resultRpcSuccessFor((Void) null) ; + } + private synchronized Future> registerTopic(final TopicId topicId, final List notificationsToSubscribe){ + LOG.debug("Join topic {} - register"); JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down; if(notificationsToSubscribe != null && notificationsToSubscribe.isEmpty() == false){ + LOG.debug("Notifications to subscribe has found - count {}",notificationsToSubscribe.size() ); final Optional notifyService = getDOMMountPoint().getService(DOMNotificationService.class); if(notifyService.isPresent()){ - int subscribedStreams = 0; + int registeredNotificationCount = 0; for(SchemaPath schemaNotification : notificationsToSubscribe){ for(NotificationTopicRegistration reg : notificationTopicRegistrationList){ - LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() ); - reg.activateNotificationSource(); - boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId); - if(regSuccess){ - subscribedStreams = subscribedStreams +1; - } + LOG.debug("Try notification registratio {} on SchemaPathNotification {}", reg.getSourceName(), schemaNotification.getLastComponent().getLocalName()); + if(reg.checkNotificationPath(schemaNotification)){ + LOG.info("Source of notification {} is activating, TopicId {}", reg.getSourceName(), topicId.getValue() ); + boolean regSuccess = reg.registerNotificationTopic(schemaNotification, topicId); + if(regSuccess){ + registeredNotificationCount = registeredNotificationCount +1; + } + } } } - if(subscribedStreams > 0){ + if(registeredNotificationCount > 0){ joinTopicStatus = JoinTopicStatus.Up; } + } else { + LOG.warn("NO DOMNotification service on node {}", this.nodeId); } + } else { + LOG.debug("Notifications to subscribe has NOT found"); } final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build(); @@ -195,13 +217,12 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener public void deActivateStreams(){ for (NotificationTopicRegistration reg : notificationTopicRegistrationList) { LOG.info("Source of notification {} is deactivating on node {}", reg.getSourceName(), this.nodeId); - reg.deActivateNotificationSource(); + reg.deActivateNotificationSource(); } } @Override public void onNotification(final DOMNotification notification) { - LOG.info("Notification {} has been arrived...",notification.getType()); SchemaPath notificationPath = notification.getType(); Date notificationEventTime = null; if(notification instanceof DOMEvent){ @@ -218,7 +239,7 @@ public class NetconfEventSource implements EventSource, DOMNotificationListener for(TopicId topicId : topicIdsForNotification){ publishNotification(notification, topicId); - LOG.info("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue()); + LOG.debug("Notification {} has been published for TopicId {}",notification.getType(), topicId.getValue()); } } diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java index 180d3d4214..6d19c52f61 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java @@ -107,7 +107,7 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto @Override public void onDataChanged(final AsyncDataChangeEvent, DataObject> event) { - LOG.info("[DataChangeEvent, DataObject>: {}]", event); + LOG.debug("[DataChangeEvent, DataObject>: {}]", event); for (final Map.Entry, DataObject> changeEntry : event.getCreatedData().entrySet()) { if (changeEntry.getValue() instanceof Node) { nodeCreated(changeEntry.getKey(),(Node) changeEntry.getValue()); diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java index 891887e7f6..2dbda375c6 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceRegistration.java @@ -60,7 +60,7 @@ public class NetconfEventSourceRegistration implements AutoCloseable{ } NetconfEventSourceRegistration nesr = new NetconfEventSourceRegistration(instanceIdent, node, netconfEventSourceManager); nesr.updateStatus(); - LOG.info("NetconfEventSourceRegistration for node {} has been initialized...",node.getNodeId().getValue()); + LOG.debug("NetconfEventSourceRegistration for node {} has been initialized...",node.getNodeId().getValue()); return nesr; } diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java index 7812bd223d..c12b67ed24 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NotificationTopicRegistration.java @@ -11,10 +11,14 @@ import java.util.ArrayList; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class NotificationTopicRegistration implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(NotificationTopicRegistration.class); + public enum NotificationSourceType{ NetconfDeviceStream, ConnectionStatusChange; @@ -54,6 +58,14 @@ public abstract class NotificationTopicRegistration implements AutoCloseable { return notificationUrnPrefix; } + public boolean checkNotificationPath(SchemaPath notificationPath){ + if(notificationPath == null){ + return false; + } + String nameSpace = notificationPath.getLastComponent().toString(); + LOG.debug("CheckNotification - name space {} - NotificationUrnPrefix {}", nameSpace, getNotificationUrnPrefix()); + return nameSpace.startsWith(getNotificationUrnPrefix()); + } abstract void activateNotificationSource(); abstract void deActivateNotificationSource(); diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java index e0d4fe21e1..2e654d0b8b 100644 --- a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java +++ b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/StreamNotificationTopicRegistration.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.messagebus.eventsources.netconf; import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint; @@ -58,6 +59,7 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist this.lastEventTime= null; setReplaySupported(this.stream.isReplaySupport()); setActive(false); + LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName()); } void activateNotificationSource() { @@ -129,16 +131,26 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist @Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId){ - if(validateNotificationPath(notificationPath) == false){ + + if(checkNotificationPath(notificationPath) == false){ LOG.debug("Bad SchemaPath for notification try to register"); return false; } + final Optional notifyService = domMountPoint.getService(DOMNotificationService.class); if(notifyService.isPresent() == false){ LOG.debug("DOMNotificationService is not present"); return false; } - ListenerRegistration registration = notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath); + + activateNotificationSource(); + if(isActive() == false){ + LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(), notificationPath.toString()); + return false; + } + + ListenerRegistration registration = + notifyService.get().registerNotificationListener(this.netconfEventSource,notificationPath); notificationRegistrationMap.put(notificationPath, registration); ArrayList topicIds = getNotificationTopicIds(notificationPath); if(topicIds == null){ @@ -149,16 +161,30 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist topicIds.add(topicId); } } + notificationTopicMap.put(notificationPath, topicIds); return true; } - private boolean validateNotificationPath(SchemaPath notificationPath){ - if(notificationPath == null){ - return false; + @Override + synchronized void unRegisterNotificationTopic(TopicId topicId) { + List notificationPathToRemove = new ArrayList<>(); + for(SchemaPath notifKey : notificationTopicMap.keySet()){ + ArrayList topicList = notificationTopicMap.get(notifKey); + if(topicList != null){ + topicList.remove(topicId); + if(topicList.isEmpty()){ + notificationPathToRemove.add(notifKey); + } + } + } + for(SchemaPath notifKey : notificationPathToRemove){ + notificationTopicMap.remove(notifKey); + ListenerRegistration reg = notificationRegistrationMap.remove(notifKey); + if(reg != null){ + reg.close(); + } } - String nameSpace = notificationPath.getLastComponent().toString(); - return nameSpace.startsWith(getNotificationUrnPrefix()); } Optional getLastEventTime() { @@ -175,9 +201,4 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist closeStream(); } - @Override - void unRegisterNotificationTopic(TopicId topicId) { - // TODO: use it when destroy topic will be implemented - } - } diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java index e26502f949..fe5e60b3e4 100644 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java @@ -28,6 +28,8 @@ import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.osgi.framework.BundleContext; +import com.google.common.util.concurrent.CheckedFuture; + import javax.management.ObjectName; import static org.junit.Assert.assertEquals; @@ -101,7 +103,8 @@ public class MessageBusAppImplModuleTest { WriteTransaction writeTransactionMock = mock(WriteTransaction.class); doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction(); doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class), eq(true)); - + CheckedFuture checkedFutureMock = mock(CheckedFuture.class); + doReturn(checkedFutureMock).when(writeTransactionMock).submit(); assertNotNull("EventSourceRegistryWrapper has not been created correctly.", messageBusAppImplModule.createInstance()); } diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java index 9f513c464b..0c32cb51e3 100644 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java @@ -9,6 +9,8 @@ package org.opendaylight.controller.messagebus.app.impl; import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -20,20 +22,30 @@ import java.util.Map; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; +import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node; +import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import com.google.common.util.concurrent.CheckedFuture; + public class EventSourceTopicTest { EventSourceTopic eventSourceTopic; - org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node dataObjectMock; + Node dataObjectNodeMock; NodeId nodeIdMock; + DataBroker dataBrokerMock; EventSourceService eventSourceServiceMock; + EventSourceTopology eventSourceTopologyMock; @BeforeClass public static void initTestClass() throws IllegalAccessException, InstantiationException { @@ -43,7 +55,22 @@ public class EventSourceTopicTest { public void setUp() throws Exception { NotificationPattern notificationPattern = new NotificationPattern("value1"); eventSourceServiceMock = mock(EventSourceService.class); - eventSourceTopic = new EventSourceTopic(notificationPattern, "nodeIdPattern1", eventSourceServiceMock); + eventSourceTopologyMock = mock(EventSourceTopology.class); + dataBrokerMock = mock(DataBroker.class); + doReturn(eventSourceServiceMock).when(eventSourceTopologyMock).getEventSourceService(); + doReturn(dataBrokerMock).when(eventSourceTopologyMock).getDataBroker(); + + WriteTransaction writeTransactionMock = mock(WriteTransaction.class); + doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction(); + doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class),eq(true)); + CheckedFuture checkedFutureWriteMock = mock(CheckedFuture.class); + doReturn(checkedFutureWriteMock).when(writeTransactionMock).submit(); + + ReadOnlyTransaction readOnlyTransactionMock = mock(ReadOnlyTransaction.class); + doReturn(readOnlyTransactionMock).when(dataBrokerMock).newReadOnlyTransaction(); + CheckedFuture checkedFutureReadMock = mock(CheckedFuture.class); + doReturn(checkedFutureReadMock).when(readOnlyTransactionMock).read(LogicalDatastoreType.OPERATIONAL, EventSourceTopology.EVENT_SOURCE_TOPOLOGY_PATH); + eventSourceTopic = EventSourceTopic.create(notificationPattern, "nodeIdPattern1", eventSourceTopologyMock); } @Test @@ -61,19 +88,20 @@ public class EventSourceTopicTest { AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class); onDataChangedTestHelper(asyncDataChangeEventMock); eventSourceTopic.onDataChanged(asyncDataChangeEventMock); - verify(dataObjectMock, times(1)).getId(); - verify(nodeIdMock, times(1)).getValue(); + verify(dataObjectNodeMock, times(2)).getNodeId(); + verify(nodeIdMock, times(2)).getValue(); } private void onDataChangedTestHelper(AsyncDataChangeEvent asyncDataChangeEventMock){ Map, DataObject> map = new HashMap<>(); InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class); - dataObjectMock = mock(org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node.class); - map.put(instanceIdentifierMock, dataObjectMock); + dataObjectNodeMock = mock(Node.class); + doReturn(getNodeKey("testNodeId01")).when(dataObjectNodeMock).getKey(); + map.put(instanceIdentifierMock, dataObjectNodeMock); doReturn(map).when(asyncDataChangeEventMock).getUpdatedData(); - + doReturn(map).when(asyncDataChangeEventMock).getCreatedData(); nodeIdMock = mock(NodeId.class); - doReturn(nodeIdMock).when(dataObjectMock).getId(); + doReturn(nodeIdMock).when(dataObjectNodeMock).getNodeId(); doReturn("nodeIdPattern1").when(nodeIdMock).getValue(); } @@ -84,4 +112,7 @@ public class EventSourceTopicTest { verify(eventSourceServiceMock, times(1)).joinTopic(any(JoinTopicInput.class)); } + public NodeKey getNodeKey(String nodeId){ + return new NodeKey(new NodeId(nodeId)); + } } \ No newline at end of file diff --git a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java index 50ae4d9389..68f2a43dd8 100644 --- a/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java +++ b/opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java @@ -22,10 +22,8 @@ import java.util.List; import java.util.Map; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.md.sal.binding.api.DataBroker; -import org.opendaylight.controller.md.sal.binding.api.DataChangeListener; import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; @@ -35,9 +33,11 @@ import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistr import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInputBuilder; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.Pattern; +import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId; import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext; import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId; @@ -62,10 +62,6 @@ public class EventSourceTopologyTest { NodeKey nodeKey; RpcRegistration aggregatorRpcReg; - @BeforeClass - public static void initTestClass() throws IllegalAccessException, InstantiationException { - } - @Before public void setUp() throws Exception { dataBrokerMock = mock(DataBroker.class); @@ -97,6 +93,18 @@ public class EventSourceTopologyTest { assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock)); } + @Test + public void destroyTopicTest() throws Exception{ + topicTestHelper(); + TopicId topicId = new TopicId("topic-id-007"); + Map localMap = getEventSourceTopicMap(); + EventSourceTopic eventSourceTopicMock = mock(EventSourceTopic.class); + localMap.put(topicId, eventSourceTopicMock); + DestroyTopicInput input = new DestroyTopicInputBuilder().setTopicId(topicId).build(); + eventSourceTopology.destroyTopic(input); + verify(eventSourceTopicMock, times(1)).close(); + } + private void topicTestHelper() throws Exception{ constructorTestHelper(); createTopicInputMock = mock(CreateTopicInput.class); @@ -134,25 +142,17 @@ public class EventSourceTopologyTest { doReturn(nodeId).when(nodeMock).getNodeId(); } - @Test - public void destroyTopicTest() throws Exception{ - topicTestHelper(); - //TODO: modify test when destroyTopic will be implemented - DestroyTopicInput destroyTopicInput = null; - assertNotNull("Instance has not been created correctly.", eventSourceTopology.destroyTopic(destroyTopicInput)); - } - @Test public void closeTest() throws Exception{ constructorTestHelper(); topicTestHelper(); - Map> localMap = getTopicListenerRegistrations(); - DataChangeListener dataChangeListenerMock = mock(DataChangeListener.class); - ListenerRegistration listenerListenerRegistrationMock = (ListenerRegistration) mock(ListenerRegistration.class); - localMap.put(dataChangeListenerMock, listenerListenerRegistrationMock); + Map localMap = getEventSourceTopicMap(); + TopicId topicIdMock = mock(TopicId.class); + EventSourceTopic eventSourceTopicMock = mock(EventSourceTopic.class); + localMap.put(topicIdMock, eventSourceTopicMock); eventSourceTopology.close(); verify(aggregatorRpcReg, times(1)).close(); - verify(listenerListenerRegistrationMock, times(1)).close(); + verify(eventSourceTopicMock, times(1)).close(); } @Test @@ -201,8 +201,8 @@ public class EventSourceTopologyTest { assertNotNull("Return value has not been created correctly.", eventSourceTopology.registerEventSource(eventSourceMock)); } - private Map getTopicListenerRegistrations() throws Exception{ - Field nesField = EventSourceTopology.class.getDeclaredField("topicListenerRegistrations"); + private Map getEventSourceTopicMap() throws Exception{ + Field nesField = EventSourceTopology.class.getDeclaredField("eventSourceTopicMap"); nesField.setAccessible(true); return (Map) nesField.get(eventSourceTopology); }