Merge "BUG-1848 : OrderedNormalizedNodeWriter implementation"
authorTony Tkacik <ttkacik@cisco.com>
Fri, 10 Apr 2015 09:56:01 +0000 (09:56 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 10 Apr 2015 09:56:02 +0000 (09:56 +0000)
36 files changed:
features/netconf-connector/src/main/resources/features.xml
karaf/opendaylight-karaf/pom.xml
opendaylight/md-sal/mdsal-artifacts/pom.xml
opendaylight/md-sal/messagebus-config/src/main/resources/initial/05-message-bus.xml
opendaylight/md-sal/messagebus-impl/pom.xml
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/Providers.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceRegistrationImpl.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java [moved from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java with 72% similarity]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java [moved from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManager.java with 71% similarity]
opendaylight/md-sal/messagebus-impl/src/main/yang/messagebus-app-impl.yang
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactoryTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/UtilTest.java
opendaylight/md-sal/messagebus-spi/pom.xml [new file with mode: 0644]
opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSource.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSourceRegistration.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSourceRegistry.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-spi/src/main/yang/messagebus-event-source-registry.yang [new file with mode: 0644]
opendaylight/md-sal/pom.xml
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataObjectModification.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeIdentifier.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/LazyDataObjectModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java [new file with mode: 0644]

index 24a92bdef40b9b54aecb0c148c60803ac935e9d6..16af44f20808a66857120d1cdadef61cff53c371 100644 (file)
         <feature version='${project.version}'>odl-netconf-connector</feature>
         <feature version='${project.version}'>odl-mdsal-broker</feature>
         <bundle>mvn:org.opendaylight.controller/messagebus-api/${project.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/messagebus-spi/${project.version}</bundle>
         <bundle>mvn:org.opendaylight.controller/messagebus-impl/${project.version}</bundle>
         <configfile finalname="${config.configfile.directory}/05-message-bus.xml">mvn:org.opendaylight.controller/messagebus-config/${project.version}/xml/config</configfile>
     </feature>
index 9c02a9d1b1170f0dc71a08af854a575ed89a6956..68238bbc120ea7f7b6074c1d1f4b76d6eaa42586 100644 (file)
           <type>xml</type>
           <scope>runtime</scope>
       </dependency>
-    <dependency>
-      <groupId>org.opendaylight.controller</groupId>
-      <artifactId>features-flow</artifactId>
-      <classifier>features</classifier>
-      <type>xml</type>
-      <scope>runtime</scope>
-    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>features-restconf</artifactId>
index d9f0f8800f813732bac791d471a0fb788ae12006..23b2ea11fd03ae885a6c538b27bbdf4d37b7946b 100644 (file)
                 <artifactId>model-flow-statistics</artifactId>
                 <version>${project.version}</version>
             </dependency>
-            <dependency>
-                <groupId>org.opendaylight.controller</groupId>
-                <artifactId>features-flow</artifactId>
-                <version>${project.version}</version>
-                <classifier>features</classifier>
-                <type>xml</type>
-                <scope>runtime</scope>
-            </dependency>
 
             <!-- RESTCONF -->
             <dependency>
                 <artifactId>messagebus-api</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.opendaylight.controller</groupId>
+                <artifactId>messagebus-spi</artifactId>
+                <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.opendaylight.controller</groupId>
                 <artifactId>messagebus-impl</artifactId>
index eed06cfd3ad063dbc51daaae0aa70d10fb626de1..4714c075cb8aec8a45e30fe4b80b7126b31d812c 100644 (file)
@@ -7,11 +7,11 @@
  and is available at http://www.eclipse.org/legal/epl-v10.html
 -->
 <snapshot>
-      <configuration>
+    <configuration>
       <data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
           <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
               <module>
-                  <name>messagebus-app</name>
+                  <name>messagebus-app-impl</name>
                   <type xmlns:binding-impl="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">binding-impl:messagebus-app-impl</type>
                   <binding-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">
                       <type xmlns:md-sal-binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">md-sal-binding:binding-broker-osgi-registry</type>
                   </namespace-to-stream>
               </module>
           </modules>
+          <services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+              <service>
+                  <type xmlns:mb-esr="urn:opendaylight:params:xml:ns:yang:controller:messagebus:spi:eventsourceregistry">mb-esr:event-source-registry</type>
+                    <instance>
+                        <name>messagebus-app-impl</name>
+                        <provider>/modules/module[type='messagebus-app-impl'][name='messagebus-app-impl']</provider>
+                    </instance>
+              </service>
+          </services>
       </data>
   </configuration>
   <required-capabilities>
       <capability>urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl?module=messagebus-app-impl&amp;revision=2015-02-03</capability>
+      <capability>urn:opendaylight:params:xml:ns:yang:controller:messagebus:spi:eventsourceregistry?module=messagebus-event-source-registry&amp;revision=2015-04-02</capability>
   </required-capabilities>
 </snapshot>
index 7e3b599ffe5b0473d2f14082f25e0218887d8cd6..d43210d7c4482c34ce1f9559402501080a5ac199 100644 (file)
@@ -56,6 +56,11 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
             <artifactId>messagebus-api</artifactId>\r
             <version>1.2.0-SNAPSHOT</version>\r
         </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>messagebus-spi</artifactId>\r
+            <version>1.2.0-SNAPSHOT</version>\r
+        </dependency>\r
         <dependency>\r
             <groupId>org.opendaylight.controller</groupId>\r
             <artifactId>sal-netconf-connector</artifactId>\r
index 022292a6f37c0f0a7d03802d348b5c88eb83748d..dd68714c963490dcb3064a90335ae891a8a1a9d9 100644 (file)
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
@@ -7,7 +7,9 @@
  */
 package org.opendaylight.controller.config.yang.messagebus.app.impl;
 
-import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
+
 import org.opendaylight.controller.config.api.DependencyResolver;
 import org.opendaylight.controller.config.api.ModuleIdentifier;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
@@ -15,7 +17,10 @@ import org.opendaylight.controller.md.sal.binding.api.MountPointService;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
 import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
-import org.opendaylight.controller.messagebus.app.impl.NetconfEventSourceManager;
+import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSourceManager;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
@@ -23,8 +28,9 @@ import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MessageBusAppImplModule extends
-        org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule {
+import com.google.common.base.Preconditions;
+
+public class MessageBusAppImplModule extends org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule {
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageBusAppImplModule.class);
 
     private BundleContext bundleContext;
@@ -52,37 +58,50 @@ public class MessageBusAppImplModule extends
 
     @Override
     public java.lang.AutoCloseable createInstance() {
-        final List<NamespaceToStream> namespaceMapping = getNamespaceToStream();
 
         final ProviderContext bindingCtx = getBindingBrokerDependency().registerProvider(new Providers.BindingAware());
         final ProviderSession domCtx = getDomBrokerDependency().registerProvider(new Providers.BindingIndependent());
-
         final DataBroker dataBroker = bindingCtx.getSALService(DataBroker.class);
         final DOMNotificationPublishService domPublish = domCtx.getService(DOMNotificationPublishService.class);
         final DOMMountPointService domMount = domCtx.getService(DOMMountPointService.class);
         final MountPointService bindingMount = bindingCtx.getSALService(MountPointService.class);
         final RpcProviderRegistry rpcRegistry = bindingCtx.getSALService(RpcProviderRegistry.class);
 
-        final EventSourceTopology eventSourceTopology = new EventSourceTopology(dataBroker, rpcRegistry);
-        final NetconfEventSourceManager eventSourceManager = new NetconfEventSourceManager(dataBroker, domPublish,
-                domMount, bindingMount, eventSourceTopology, getNamespaceToStream());
-
-        final AutoCloseable closer = new AutoCloseable() {
-            @Override
-            public void close() {
-                eventSourceTopology.close();
-                eventSourceManager.close();
-            }
-        };
+        final EventSourceRegistryWrapper eventSourceRegistryWrapper = new EventSourceRegistryWrapper(new EventSourceTopology(dataBroker, rpcRegistry));
+        final NetconfEventSourceManager netconfEventSourceManager = NetconfEventSourceManager.create(dataBroker, domPublish,domMount, bindingMount, eventSourceRegistryWrapper, getNamespaceToStream());
+        eventSourceRegistryWrapper.addAutoCloseable(netconfEventSourceManager);
+        LOGGER.info("Messagebus initialized");
+        return eventSourceRegistryWrapper;
 
-        return closer;
     }
 
-    private void closeProvider(final AutoCloseable closable) {
-        try {
-            closable.close();
-        } catch (final Exception e) {
-            LOGGER.error("Exception while closing: {}\n Exception: {}", closable, e);
+    //TODO: separate NetconfEventSource into separate bundle, remove this wrapper, return EventSourceTopology directly as EventSourceRegistry
+    private class EventSourceRegistryWrapper implements EventSourceRegistry{
+
+        private final EventSourceRegistry baseEventSourceRegistry;
+        private final Set<AutoCloseable> autoCloseables = new HashSet<>();
+
+        public EventSourceRegistryWrapper(EventSourceRegistry baseEventSourceRegistry) {
+            this.baseEventSourceRegistry = baseEventSourceRegistry;
         }
+
+        public void addAutoCloseable(AutoCloseable ac){
+            Preconditions.checkNotNull(ac);
+            autoCloseables.add(ac);
+        }
+
+        @Override
+        public void close() throws Exception {
+            for(AutoCloseable ac : autoCloseables){
+                ac.close();
+            }
+            baseEventSourceRegistry.close();
+        }
+
+        @Override
+        public <T extends EventSource> EventSourceRegistration<T> registerEventSource(T eventSource) {
+            return this.baseEventSourceRegistry.registerEventSource(eventSource);
+        }
+
     }
 }
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceRegistrationImpl.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceRegistrationImpl.java
new file mode 100644 (file)
index 0000000..d939090
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+
+import com.google.common.base.Preconditions;
+
+
+class EventSourceRegistrationImpl <T extends EventSource> extends AbstractObjectRegistration<T> implements EventSourceRegistration<T>{
+
+    private final EventSourceTopology eventSourceTopology;
+
+    /**
+     * @param instance of EventSource that has been registered by {@link EventSourceRegistryImpl#registerEventSource(Node, EventSource)}
+     */
+    public EventSourceRegistrationImpl(T instance, EventSourceTopology eventSourceTopology) {
+        super(instance);
+        this.eventSourceTopology = Preconditions.checkNotNull(eventSourceTopology);
+    }
+
+    @Override
+    protected void removeRegistration() {
+        this.eventSourceTopology.unRegister(getInstance());
+    }
+
+}
index 98e168eee9678cdcc6db391f3eb56f70d58d8c5b..13e50b5ce57030b9a1a97864c02b2fe1613c33bd 100644 (file)
@@ -8,9 +8,9 @@
 
 package org.opendaylight.controller.messagebus.app.impl;
 
-import com.google.common.base.Preconditions;
 import java.util.Map;
 import java.util.regex.Pattern;
+
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
@@ -18,12 +18,17 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 public class EventSourceTopic implements DataChangeListener {
     private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class);
     private final NotificationPattern notificationPattern;
@@ -39,10 +44,7 @@ public class EventSourceTopic implements DataChangeListener {
         final String regex = Util.wildcardToRegex(nodeIdPattern);
         this.nodeIdPattern = Pattern.compile(regex);
 
-
-        // FIXME: We need to perform some salting in order to make
-        //        the topic IDs less predictable.
-        this.topicId = new TopicId(Util.md5String(notificationPattern + nodeIdPattern));
+        this.topicId = new TopicId(Util.getUUIDIdent());
     }
 
     public TopicId getTopicId() {
@@ -62,8 +64,14 @@ public class EventSourceTopic implements DataChangeListener {
     }
 
     public void notifyNode(final InstanceIdentifier<?> nodeId) {
+
         try {
-            sourceService.joinTopic(getJoinTopicInputArgument(nodeId));
+            RpcResult<JoinTopicOutput> rpcResultJoinTopic = sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get();
+            if(rpcResultJoinTopic.isSuccessful() == false){
+                for(RpcError err : rpcResultJoinTopic.getErrors()){
+                    LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),nodeId.toString(),err.toString());
+                }
+            }
         } catch (final Exception e) {
             LOG.error("Could not invoke join topic for node {}", nodeId);
         }
@@ -80,5 +88,4 @@ public class EventSourceTopic implements DataChangeListener {
         return jti;
     }
 
-
 }
index 076d1b2fc7e9c1d42fee6e0ece1d7c9adc4a3228..10b9ec83cde9080a5eb652ea69b65936973b0f60 100644 (file)
@@ -20,6 +20,10 @@ import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
@@ -57,7 +61,8 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 
-public class EventSourceTopology implements EventAggregatorService, AutoCloseable {
+
+public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry {
     private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
 
     private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ;
@@ -73,8 +78,10 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
                     .child(TopologyTypes.class)
                     .augmentation(TopologyTypes1.class);
 
-    private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> registrations =
+    private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> topicListenerRegistrations =
             new ConcurrentHashMap<>();
+    private final Map<NodeKey, RoutedRpcRegistration<EventSourceService>> routedRpcRegistrations =
+            new ConcurrentHashMap<>();;
 
     private final DataBroker dataBroker;
     private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
@@ -91,7 +98,7 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
         final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
         final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
         putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
-
+        LOG.info("EventSourceRegistry has been initialized");
     }
 
     private <T extends DataObject>  void putData(final LogicalDatastoreType store,
@@ -104,13 +111,24 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
 
     }
 
-    private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath, final Node node) {
-        final NodeKey nodeKey = node.getKey();
+    private <T extends DataObject>  void deleteData(final LogicalDatastoreType store, final InstanceIdentifier<T> path){
+        final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+        tx.delete(OPERATIONAL, path);
+        tx.submit();
+    }
+
+    private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
+        final NodeKey nodeKey = sourcePath.getKey();
         final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
         final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build();
         putData(OPERATIONAL, augmentPath, nodeAgument);
     }
 
+    private void remove(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath){
+        final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
+        deleteData(OPERATIONAL, augmentPath);
+    }
+
     private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
 
         final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
@@ -151,7 +169,7 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
         final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
         final String nodeIdPattern = input.getNodeIdPattern().getValue();
         final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern));
-        final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService);
+        final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService);
 
         registerTopic(eventSourceTopic);
 
@@ -161,7 +179,7 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
                 .setTopicId(eventSourceTopic.getTopicId())
                 .build();
 
-        return Util.resultFor(cto);
+        return Util.resultRpcSuccessFor(cto);
     }
 
     @Override
@@ -172,23 +190,45 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
     @Override
     public void close() {
         aggregatorRpcReg.close();
+        for(ListenerRegistration<DataChangeListener> reg : topicListenerRegistrations.values()){
+            reg.close();
+        }
     }
 
-    public void registerTopic(final EventSourceTopic listener) {
+    private void registerTopic(final EventSourceTopic listener) {
         final ListenerRegistration<DataChangeListener> listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL,
                 EVENT_SOURCE_TOPOLOGY_PATH,
                 listener,
                 DataBroker.DataChangeScope.SUBTREE);
 
-        registrations.put(listener, listenerRegistration);
+        topicListenerRegistrations.put(listener, listenerRegistration);
+    }
+
+    public void register(final EventSource eventSource){
+    NodeKey nodeKey = eventSource.getSourceNodeKey();
+        final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
+        RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource);
+        reg.registerPath(NodeContext.class, sourcePath);
+        routedRpcRegistrations.put(nodeKey,reg);
+        insert(sourcePath);
     }
 
-    public void register(final Node node, final NetconfEventSource netconfEventSource) {
-        final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey());
-        rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, netconfEventSource)
-            .registerPath(NodeContext.class, sourcePath);
-        insert(sourcePath,node);
-        // FIXME: Return registration object.
+    public void unRegister(final EventSource eventSource){
+        final NodeKey nodeKey = eventSource.getSourceNodeKey();
+        final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
+        final RoutedRpcRegistration<EventSourceService> removeRegistration = routedRpcRegistrations.remove(nodeKey);
+        if(removeRegistration != null){
+            removeRegistration.close();
+        remove(sourcePath);
+        }
     }
 
+    @Override
+    public <T extends EventSource> EventSourceRegistration<T> registerEventSource(
+            T eventSource) {
+        EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
+        register(eventSource);
+        return esr;
+    }
 }
+
index 1c0b8b3ef89197d9369457fd58cb3a8ee96e8010..d6bcbf2920e1283692297e10f7b830140c4bf6f7 100644 (file)
@@ -8,11 +8,9 @@
 
 package org.opendaylight.controller.messagebus.app.impl;
 
-import java.math.BigInteger;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.regex.Pattern;
 
@@ -22,29 +20,15 @@ import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
 import com.google.common.util.concurrent.Futures;
 
-public final class Util {
-    private static final MessageDigest messageDigestTemplate = getDigestInstance();
-
-    private static MessageDigest getDigestInstance() {
-        try {
-            return MessageDigest.getInstance("MD5");
-        } catch (final NoSuchAlgorithmException e) {
-            throw new RuntimeException("Unable to get MD5 instance");
-        }
-    }
 
-    static String md5String(final String inputString) {
+public final class Util {
 
-        try {
-            final MessageDigest md = (MessageDigest)messageDigestTemplate.clone();
-            md.update(inputString.getBytes("UTF-8"), 0, inputString.length());
-            return new BigInteger(1, md.digest()).toString(16);
-        } catch (final Exception e) {
-            throw new RuntimeException("Unable to get MD5 instance");
-        }
+    public static String getUUIDIdent(){
+        UUID uuid = UUID.randomUUID();
+        return uuid.toString();
     }
 
-    public static <T> Future<RpcResult<T>> resultFor(final T output) {
+    public static <T> Future<RpcResult<T>> resultRpcSuccessFor(final T output) {
         final RpcResult<T> result = RpcResultBuilder.success(output).build();
         return Futures.immediateFuture(result);
     }
@@ -73,7 +57,7 @@ public final class Util {
      * @param wildcard
      * @return
      */
-    static String wildcardToRegex(final String wildcard){
+    public static String wildcardToRegex(final String wildcard){
         final StringBuffer s = new StringBuffer(wildcard.length());
         s.append('^');
         for (final char c : wildcard.toCharArray()) {
@@ -6,13 +6,16 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.Future;
 import java.util.regex.Pattern;
@@ -30,21 +33,27 @@ import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.messagebus.app.impl.TopicDOMNotification;
+import org.opendaylight.controller.messagebus.app.impl.Util;
+import org.opendaylight.controller.messagebus.spi.EventSource;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
@@ -63,7 +72,7 @@ import org.w3c.dom.Element;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
 
-public class NetconfEventSource implements EventSourceService, DOMNotificationListener, DataChangeListener {
+public class NetconfEventSource implements EventSource, DOMNotificationListener, DataChangeListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class);
 
@@ -74,23 +83,20 @@ public class NetconfEventSource implements EventSourceService, DOMNotificationLi
     private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream"));
     private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
 
-
     private final String nodeId;
-
+    private final Node node;
 
     private final DOMMountPoint netconfMount;
     private final DOMNotificationPublishService domPublish;
-    private final NotificationsService notificationRpcService;
-
     private final Set<String> activeStreams = new ConcurrentSkipListSet<>();
 
     private final Map<String, String> urnPrefixToStreamMap;
+    private final ConcurrentHashMap<TopicId,ListenerRegistration<NetconfEventSource>> listenerRegistrationMap = new ConcurrentHashMap<>();
 
-
-    public NetconfEventSource(final String nodeId, final Map<String, String> streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService, final MountPoint bindingMount) {
+    public NetconfEventSource(final Node node, final Map<String, String> streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService, final MountPoint bindingMount) {
         this.netconfMount = netconfMount;
-        this.notificationRpcService = bindingMount.getService(RpcConsumerRegistry.class).get().getRpcService(NotificationsService.class);
-        this.nodeId = nodeId;
+        this.node = node;
+        this.nodeId = node.getNodeId().getValue();
         this.urnPrefixToStreamMap = streamMap;
         this.domPublish = publishService;
         LOG.info("NetconfEventSource [{}] created.", nodeId);
@@ -99,46 +105,37 @@ public class NetconfEventSource implements EventSourceService, DOMNotificationLi
     @Override
     public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
         final NotificationPattern notificationPattern = input.getNotificationPattern();
-
-        // FIXME: default language should already be regex
-        final String regex = Util.wildcardToRegex(notificationPattern.getValue());
-
-        final Pattern pattern = Pattern.compile(regex);
-        final List<SchemaPath> matchingNotifications = Util.expandQname(availableNotifications(), pattern);
-        registerNotificationListener(matchingNotifications);
-        final JoinTopicOutput output = new JoinTopicOutputBuilder().build();
-        return com.google.common.util.concurrent.Futures.immediateFuture(RpcResultBuilder.success(output).build());
+        final List<SchemaPath> matchingNotifications = getMatchingNotifications(notificationPattern);
+        return registerNotificationListener(input.getTopicId(),matchingNotifications);
     }
 
-    private List<SchemaPath> availableNotifications() {
-        // FIXME: use SchemaContextListener to get changes asynchronously
-        final Set<NotificationDefinition> availableNotifications = netconfMount.getSchemaContext().getNotifications();
-        final List<SchemaPath> qNs = new ArrayList<>(availableNotifications.size());
-        for (final NotificationDefinition nd : availableNotifications) {
-            qNs.add(nd.getPath());
+    private synchronized Future<RpcResult<JoinTopicOutput>> registerNotificationListener(final TopicId topicId, final List<SchemaPath> notificationsToSubscribe){
+        if(listenerRegistrationMap.containsKey(topicId)){
+            final String errMessage = "Can not join topic twice. Topic " + topicId.getValue() + " has been joined to node " + this.nodeId;
+            return immediateFuture(RpcResultBuilder.<JoinTopicOutput>failed().withError(ErrorType.APPLICATION, errMessage).build());
         }
-        return qNs;
-    }
-
-    private void registerNotificationListener(final List<SchemaPath> notificationsToSubscribe) {
-
+        ListenerRegistration<NetconfEventSource> registration = null;
+        JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down;
         final Optional<DOMNotificationService> notifyService = netconfMount.getService(DOMNotificationService.class);
+
         if(notifyService.isPresent()) {
             for (final SchemaPath qName : notificationsToSubscribe) {
                 startSubscription(qName);
             }
-            // FIXME: Capture registration
-            notifyService.get().registerNotificationListener(this, notificationsToSubscribe);
+            registration = notifyService.get().registerNotificationListener(this, notificationsToSubscribe);
         }
+
+        if(registration != null){
+            listenerRegistrationMap.put(topicId,registration);
+            joinTopicStatus = JoinTopicStatus.Up;
+        }
+        final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build();
+        return immediateFuture(RpcResultBuilder.success(output).build());
     }
 
     private void startSubscription(final SchemaPath path) {
         final String streamName = resolveStream(path.getLastComponent());
-
-        if (streamIsActive(streamName) == false) {
-            LOG.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId);
-            startSubscription(streamName);
-        }
+        startSubscription(streamName);
     }
 
     private void resubscribeToActiveStreams() {
@@ -148,11 +145,14 @@ public class NetconfEventSource implements EventSourceService, DOMNotificationLi
     }
 
     private synchronized void startSubscription(final String streamName) {
-        final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
-            .withChild(ImmutableNodes.leafNode(STREAM_QNAME, streamName))
-            .build();
-        netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
-        activeStreams.add(streamName);
+        if(streamIsActive(streamName) == false){
+            LOG.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId);
+            final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
+                    .withChild(ImmutableNodes.leafNode(STREAM_QNAME, streamName))
+                    .build();
+            netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
+            activeStreams.add(streamName);
+        }
     }
 
     private String resolveStream(final QName qName) {
@@ -194,7 +194,6 @@ public class NetconfEventSource implements EventSourceService, DOMNotificationLi
         final Optional<String> namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString());
         final Element element = XmlUtil.createElement(doc , "payload", namespace);
 
-
         final DOMResult result = new DOMResult(element);
 
         final SchemaContext context = netconfMount.getSchemaContext();
@@ -238,4 +237,35 @@ public class NetconfEventSource implements EventSourceService, DOMNotificationLi
         return NetconfNode.class.equals(changeEntry.getKey().getTargetType());
     }
 
+    private List<SchemaPath> getMatchingNotifications(NotificationPattern notificationPattern){
+        // FIXME: default language should already be regex
+        final String regex = Util.wildcardToRegex(notificationPattern.getValue());
+
+        final Pattern pattern = Pattern.compile(regex);
+        return Util.expandQname(getAvailableNotifications(), pattern);
+    }
+
+    @Override
+    public void close() throws Exception {
+        for(ListenerRegistration<NetconfEventSource> registration : listenerRegistrationMap.values()){
+            registration.close();
+        }
+    }
+
+    @Override
+    public NodeKey getSourceNodeKey(){
+        return node.getKey();
+    }
+
+    @Override
+    public List<SchemaPath> getAvailableNotifications() {
+        // FIXME: use SchemaContextListener to get changes asynchronously
+        final Set<NotificationDefinition> availableNotifications = netconfMount.getSchemaContext().getNotifications();
+        final List<SchemaPath> qNs = new ArrayList<>(availableNotifications.size());
+        for (final NotificationDefinition nd : availableNotifications) {
+            qNs.add(nd.getPath());
+        }
+        return qNs;
+    }
+
 }
@@ -6,15 +6,15 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.eventsources.netconf;
 
-import com.google.common.base.Optional;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
@@ -26,6 +26,8 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
@@ -59,30 +61,52 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
             .build();
     private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id");
 
-
-    private final EventSourceTopology eventSourceTopology;
     private final Map<String, String> streamMap;
-
-    private final ConcurrentHashMap<InstanceIdentifier<?>, NetconfEventSource> netconfSources = new ConcurrentHashMap<>();
-    private final ListenerRegistration<DataChangeListener> listenerReg;
+    private final ConcurrentHashMap<InstanceIdentifier<?>, EventSourceRegistration<NetconfEventSource>> eventSourceRegistration = new ConcurrentHashMap<>();
     private final DOMNotificationPublishService publishService;
     private final DOMMountPointService domMounts;
     private final MountPointService bindingMounts;
+    private ListenerRegistration<DataChangeListener> listenerRegistration;
+    private final EventSourceRegistry eventSourceRegistry;
+
+    public static NetconfEventSourceManager create(final DataBroker dataBroker,
+            final DOMNotificationPublishService domPublish,
+            final DOMMountPointService domMount,
+            final MountPointService bindingMount,
+            final EventSourceRegistry eventSourceRegistry,
+            final List<NamespaceToStream> namespaceMapping){
+
+        final NetconfEventSourceManager eventSourceManager =
+                new NetconfEventSourceManager(domPublish, domMount, bindingMount, eventSourceRegistry, namespaceMapping);
 
-    public NetconfEventSourceManager(final DataBroker dataStore,
-                              final DOMNotificationPublishService domPublish,
+        eventSourceManager.initialize(dataBroker);
+
+        return eventSourceManager;
+
+    }
+
+    private NetconfEventSourceManager(final DOMNotificationPublishService domPublish,
                               final DOMMountPointService domMount,
                               final MountPointService bindingMount,
-                              final EventSourceTopology eventSourceTopology,
+                              final EventSourceRegistry eventSourceRegistry,
                               final List<NamespaceToStream> namespaceMapping) {
 
-        listenerReg = dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE);
-        this.eventSourceTopology = eventSourceTopology;
+        Preconditions.checkNotNull(domPublish);
+        Preconditions.checkNotNull(domMount);
+        Preconditions.checkNotNull(bindingMount);
+        Preconditions.checkNotNull(eventSourceRegistry);
+        Preconditions.checkNotNull(namespaceMapping);
         this.streamMap = namespaceToStreamMapping(namespaceMapping);
         this.domMounts = domMount;
         this.bindingMounts = bindingMount;
         this.publishService = domPublish;
-        LOGGER.info("EventSourceManager initialized.");
+        this.eventSourceRegistry = eventSourceRegistry;
+    }
+
+    private void initialize(final DataBroker dataBroker){
+        Preconditions.checkNotNull(dataBroker);
+        listenerRegistration = dataBroker.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE);
+        LOGGER.info("NetconfEventSourceManager initialized.");
     }
 
     private Map<String,String> namespaceToStreamMapping(final List<NamespaceToStream> namespaceMapping) {
@@ -97,7 +121,7 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
 
     @Override
     public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
-        //FIXME: Prevent creating new event source on subsequent changes in inventory, like disconnect.
+
         LOGGER.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
         for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getCreatedData().entrySet()) {
             if (changeEntry.getValue() instanceof Node) {
@@ -105,22 +129,19 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
             }
         }
 
-
         for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
             if (changeEntry.getValue() instanceof Node) {
                 nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue());
             }
         }
 
-
     }
 
     private void nodeUpdated(final InstanceIdentifier<?> key, final Node node) {
 
         // we listen on node tree, therefore we should rather throw IllegalStateException when node is null
         if ( node == null ) {
-            LOGGER.debug("OnDataChanged Event. Node is null.");
-            return;
+            throw new IllegalStateException("Node is null");
         }
         if ( isNetconfNode(node) == false ) {
             LOGGER.debug("OnDataChanged Event. Not a Netconf node.");
@@ -134,7 +155,7 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
             return;
         }
 
-        if(!netconfSources.containsKey(key)) {
+        if(!eventSourceRegistration.containsKey(key)) {
             createEventSource(key,node);
         }
     }
@@ -144,10 +165,12 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
         final Optional<MountPoint> bindingMount = bindingMounts.getMountPoint(key);
 
         if(netconfMount.isPresent() && bindingMount.isPresent()) {
-            final String nodeId = node.getNodeId().getValue();
-            final NetconfEventSource netconfEventSource = new NetconfEventSource(nodeId, streamMap, netconfMount.get(), publishService, bindingMount.get());
-            eventSourceTopology.register(node,netconfEventSource);
-            netconfSources.putIfAbsent(key, netconfEventSource);
+
+            final NetconfEventSource netconfEventSource =
+                    new NetconfEventSource(node, streamMap, netconfMount.get(), publishService, bindingMount.get());
+            final EventSourceRegistration<NetconfEventSource> registration = eventSourceRegistry.registerEventSource(netconfEventSource);
+            eventSourceRegistration.putIfAbsent(key, registration);
+
         }
     }
 
@@ -159,13 +182,21 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
         return node.getAugmentation(NetconfNode.class) != null ;
     }
 
-    public boolean isEventSource(final Node node) {
-        final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+    private boolean isEventSource(final Node node) {
 
+        final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
         return isEventSource(netconfNode);
+
     }
 
     private boolean isEventSource(final NetconfNode node) {
+        if (node.getAvailableCapabilities() == null) {
+            return false;
+        }
+        final List<String> capabilities = node.getAvailableCapabilities().getAvailableCapability();
+        if(capabilities == null) {
+             return false;
+        }
         for (final String capability : node.getAvailableCapabilities().getAvailableCapability()) {
             if(capability.startsWith("(urn:ietf:params:xml:ns:netconf:notification")) {
                 return true;
@@ -177,6 +208,10 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
 
     @Override
     public void close() {
-        listenerReg.close();
+        for(final EventSourceRegistration<NetconfEventSource> reg : eventSourceRegistration.values()){
+            reg.close();
+        }
+        listenerRegistration.close();
     }
+
 }
\ No newline at end of file
index bed6b1085a2593303cc0f195082033e22c8bd124..320afccb2ec5a318033dfed2245ea8b22ee65913 100644 (file)
@@ -6,7 +6,7 @@ module messagebus-app-impl {
     import config { prefix config; revision-date 2013-04-05; }
     import opendaylight-md-sal-binding {prefix sal;}
     import opendaylight-md-sal-dom {prefix dom;}
-
+    import messagebus-event-source-registry {prefix esr;}
 
     description
         "Service definition for Message Bus application implementation.";
@@ -17,9 +17,10 @@ module messagebus-app-impl {
 
     identity messagebus-app-impl {
         base config:module-type;
+        config:provided-service esr:event-source-registry;
         config:java-name-prefix MessageBusAppImpl;
     }
-    
+
     augment "/config:modules/config:module/config:configuration" {
         case messagebus-app-impl {
             when "/config:modules/config:module/config:type = 'messagebus-app-impl'";
index 13c42210258622b26237367d9db15adc43c781c6..7db7dcc33316f4cad6a9d6f68e84dca756463f44 100644 (file)
@@ -7,6 +7,10 @@
  */
 package org.opendaylight.controller.config.yang.messagebus.app.impl;
 
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -14,10 +18,6 @@ import org.opendaylight.controller.config.api.DependencyResolver;
 import org.opendaylight.controller.config.api.DynamicMBeanWithInstance;
 import org.osgi.framework.BundleContext;
 
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
 public class MessageBusAppImplModuleFactoryTest {
 
     DependencyResolver dependencyResolverMock;
index 079436422e2cec2a3cf0369aa6ae044e9800a777..85d1a1b109819228a2bf8425077f4e12878d33a7 100644 (file)
@@ -7,41 +7,17 @@
  */
 package org.opendaylight.controller.config.yang.messagebus.app.impl;
 
-import com.google.common.util.concurrent.CheckedFuture;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.config.api.DependencyResolver;
 import org.opendaylight.controller.config.api.ModuleIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.Provider;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.osgi.framework.BundleContext;
 
-import javax.management.ObjectName;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.notNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.doNothing;
-
 public class MessageBusAppImplModuleTest {
 
     MessageBusAppImplModule messageBusAppImplModule;
@@ -79,43 +55,5 @@ public class MessageBusAppImplModuleTest {
         assertEquals("Set and/or get method/s don't work correctly.", bundleContext, messageBusAppImplModule.getBundleContext());
     }
 
-    @Test
-    public void createInstanceTest() {
-        createInstanceTestHelper();
-        messageBusAppImplModule.getInstance();
-        assertNotNull("AutoCloseable instance has not been created correctly.", messageBusAppImplModule.createInstance());
-    }
-
-    private void createInstanceTestHelper(){
-        NamespaceToStream namespaceToStream = mock(NamespaceToStream.class);
-        List<NamespaceToStream> listNamespaceToStreamMock = new ArrayList<>();
-        listNamespaceToStreamMock.add(namespaceToStream);
-        messageBusAppImplModule.setNamespaceToStream(listNamespaceToStreamMock);
-        ObjectName objectName = mock(ObjectName.class);
-        org.opendaylight.controller.sal.core.api.Broker domBrokerDependency = mock(Broker.class);
-        org.opendaylight.controller.sal.binding.api.BindingAwareBroker bindingBrokerDependency = mock(BindingAwareBroker.class);
-        when(dependencyResolverMock.resolveInstance((java.lang.Class) notNull(), (javax.management.ObjectName) notNull(), eq(AbstractMessageBusAppImplModule.domBrokerJmxAttribute))).thenReturn(domBrokerDependency);
-        when(dependencyResolverMock.resolveInstance((java.lang.Class) notNull(), (javax.management.ObjectName) notNull(), eq(AbstractMessageBusAppImplModule.bindingBrokerJmxAttribute))).thenReturn(bindingBrokerDependency);
-        messageBusAppImplModule.setBindingBroker(objectName);
-        messageBusAppImplModule.setDomBroker(objectName);
-        BindingAwareBroker.ProviderContext providerContextMock = mock(BindingAwareBroker.ProviderContext.class);
-        doReturn(providerContextMock).when(bindingBrokerDependency).registerProvider(any(BindingAwareProvider.class));
-        Broker.ProviderSession providerSessionMock = mock(Broker.ProviderSession.class);
-        doReturn(providerSessionMock).when(domBrokerDependency).registerProvider(any(Provider.class));
-
-        DataBroker dataBrokerMock = mock(DataBroker.class);
-        doReturn(dataBrokerMock).when(providerContextMock).getSALService(DataBroker.class);
-        RpcProviderRegistry rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
-        doReturn(rpcProviderRegistryMock).when(providerContextMock).getSALService(RpcProviderRegistry.class);
-        BindingAwareBroker.RpcRegistration rpcRegistrationMock = mock(BindingAwareBroker.RpcRegistration.class);
-        doReturn(rpcRegistrationMock).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class));
-        EventSourceService eventSourceServiceMock = mock(EventSourceService.class);
-        doReturn(eventSourceServiceMock).when(rpcProviderRegistryMock).getRpcService(EventSourceService.class);
-
-        WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
-        doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
-        doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class));
-        CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
-        doReturn(checkedFutureMock).when(writeTransactionMock).submit();
-    }
+    //TODO: create MessageBusAppImplModule.createInstance test
 }
index 5e262136469c3f920755be611b81b8cb3d8e8987..f369a128adaefb18f7aa7ed183cb5916c7ef4f19 100644 (file)
@@ -7,6 +7,16 @@
  */
 package org.opendaylight.controller.messagebus.app.impl;
 
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import java.util.Map;
+
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -18,16 +28,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.any;
-
 public class EventSourceTopicTest {
 
     EventSourceTopic eventSourceTopic;
index c2f6ef54bfdae145a2be44cdec65bf5c75f9edc3..ced2e1f01b47ab2bdbfbc1b2c0de972b4f7a6578 100644 (file)
@@ -7,8 +7,18 @@
  */
 package org.opendaylight.controller.messagebus.app.impl;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.List;
+
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -16,11 +26,13 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.messagebus.spi.EventSource;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.Pattern;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
@@ -34,17 +46,8 @@ import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.eq;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
 
 public class EventSourceTopologyTest {
 
@@ -63,7 +66,6 @@ public class EventSourceTopologyTest {
     public void setUp() throws Exception {
         dataBrokerMock = mock(DataBroker.class);
         rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
-
     }
 
     @Test
@@ -74,20 +76,24 @@ public class EventSourceTopologyTest {
     }
 
     private void constructorTestHelper(){
+        RpcRegistration<EventAggregatorService> aggregatorRpcReg = mock(RpcRegistration.class);
+        EventSourceService eventSourceService = mock(EventSourceService.class);
+        doReturn(aggregatorRpcReg).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class));
+        doReturn(eventSourceService).when(rpcProviderRegistryMock).getRpcService(EventSourceService.class);
         WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
         doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
-        doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class));
+        doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class),eq(true));
         CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
         doReturn(checkedFutureMock).when(writeTransactionMock).submit();
     }
 
-    @Test
-    public void createTopicTest() throws Exception{
-        createTopicTestHelper();
-        assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock));
-    }
+//TODO: create test for createTopic
+//    public void createTopicTest() throws Exception{
+//        createTopicTestHelper();
+//        assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock));
+//    }
 
-    private void createTopicTestHelper() throws Exception{
+    private void topicTestHelper() throws Exception{
         constructorTestHelper();
         createTopicInputMock = mock(CreateTopicInput.class);
         eventSourceTopology = new EventSourceTopology(dataBrokerMock, rpcProviderRegistryMock);
@@ -126,35 +132,25 @@ public class EventSourceTopologyTest {
 
     @Test
     public void destroyTopicTest() throws Exception{
-        createTopicTestHelper();
+        topicTestHelper();
+        //TODO: modify test when destroyTopic will be implemented
         DestroyTopicInput destroyTopicInput = null;
         assertNotNull("Instance has not been created correctly.", eventSourceTopology.destroyTopic(destroyTopicInput));
     }
 
-    @Test
-    public void closeTest() throws Exception{
-        BindingAwareBroker.RpcRegistration rpcRegistrationMock = mock(BindingAwareBroker.RpcRegistration.class);
-        doReturn(rpcRegistrationMock).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class));
-        doNothing().when(rpcRegistrationMock).close();
-        createTopicTestHelper();
-        eventSourceTopology.createTopic(createTopicInputMock);
-        eventSourceTopology.close();
-        verify(rpcRegistrationMock, times(1)).close();
-    }
-
     @Test
     public void registerTest() throws Exception {
-        createTopicTestHelper();
+        topicTestHelper();
         Node nodeMock = mock(Node.class);
-        NetconfEventSource netconfEventSourceMock = mock(NetconfEventSource.class);
-
+        EventSource eventSourceMock = mock(EventSource.class);
         NodeId nodeId = new NodeId("nodeIdValue1");
         nodeKey = new NodeKey(nodeId);
         doReturn(nodeKey).when(nodeMock).getKey();
-
+        doReturn(nodeKey).when(eventSourceMock).getSourceNodeKey();
         BindingAwareBroker.RoutedRpcRegistration routedRpcRegistrationMock = mock(BindingAwareBroker.RoutedRpcRegistration.class);
-        doReturn(routedRpcRegistrationMock).when(rpcProviderRegistryMock).addRoutedRpcImplementation(EventSourceService.class, netconfEventSourceMock);
-        eventSourceTopology.register(nodeMock, netconfEventSourceMock);
+        doReturn(routedRpcRegistrationMock).when(rpcProviderRegistryMock).addRoutedRpcImplementation(EventSourceService.class, eventSourceMock);
+        doNothing().when(routedRpcRegistrationMock).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class));
+        eventSourceTopology.register(eventSourceMock);
         verify(routedRpcRegistrationMock, times(1)).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class));
     }
 
index 911c5db1c180884f84ddf3e91e0ac5125c8ca61b..61fa30f40ece04082238ca35732c8d84192b6e9d 100644 (file)
@@ -7,7 +7,20 @@
  */
 package org.opendaylight.controller.messagebus.app.impl;
 
-import com.google.common.base.Optional;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.notNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -22,7 +35,11 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSourceManager;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields;
@@ -34,31 +51,19 @@ import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.notNull;
+import com.google.common.base.Optional;
 
 public class NetconfEventSourceManagerTest {
 
+    private static final String notification_capability_prefix = "(urn:ietf:params:xml:ns:netconf:notification";
     NetconfEventSourceManager netconfEventSourceManager;
     ListenerRegistration listenerRegistrationMock;
     DOMMountPointService domMountPointServiceMock;
     MountPointService mountPointServiceMock;
     EventSourceTopology eventSourceTopologyMock;
     AsyncDataChangeEvent asyncDataChangeEventMock;
-
+    RpcProviderRegistry rpcProviderRegistryMock;
+    EventSourceRegistry eventSourceRegistry;
     @BeforeClass
     public static void initTestClass() throws IllegalAccessException, InstantiationException {
     }
@@ -70,58 +75,76 @@ public class NetconfEventSourceManagerTest {
         domMountPointServiceMock = mock(DOMMountPointService.class);
         mountPointServiceMock = mock(MountPointService.class);
         eventSourceTopologyMock = mock(EventSourceTopology.class);
+        rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
+        eventSourceRegistry = mock(EventSourceRegistry.class);
         List<NamespaceToStream> namespaceToStreamList = new ArrayList<>();
 
         listenerRegistrationMock = mock(ListenerRegistration.class);
         doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq(AsyncDataBroker.DataChangeScope.SUBTREE));
 
-        netconfEventSourceManager = new NetconfEventSourceManager(dataBrokerMock, domNotificationPublishServiceMock, domMountPointServiceMock,
-                mountPointServiceMock, eventSourceTopologyMock, namespaceToStreamList);
+        netconfEventSourceManager =
+                NetconfEventSourceManager.create(dataBrokerMock,
+                                                 domNotificationPublishServiceMock,
+                                                 domMountPointServiceMock,
+                                                 mountPointServiceMock,
+                                                 eventSourceRegistry,
+                                                 namespaceToStreamList);
     }
 
     @Test
-    public void constructorTest() {
-        assertNotNull("Instance has not been created correctly.", netconfEventSourceManager);
+    public void onDataChangedCreateEventSourceTestByCreateEntry() throws InterruptedException, ExecutionException {
+        onDataChangedTestHelper(true,false,true,notification_capability_prefix);
+        netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+        verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
     }
 
     @Test
-    public void onDataChangedTest() {
-        AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
-        Map<InstanceIdentifier, DataObject> map = new HashMap<>();
-        InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
-        Node dataObjectMock = mock(Node.class);
-        map.put(instanceIdentifierMock, dataObjectMock);
-        doReturn(map).when(asyncDataChangeEventMock).getCreatedData();
-        doReturn(map).when(asyncDataChangeEventMock).getUpdatedData();
+    public void onDataChangedCreateEventSourceTestByUpdateEntry() throws InterruptedException, ExecutionException {
+        onDataChangedTestHelper(false,true,true, notification_capability_prefix);
+        netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+        verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
+    }
+
+    @Test
+    public void onDataChangedCreateEventSourceTestNotNeconf() throws InterruptedException, ExecutionException {
+        onDataChangedTestHelper(false,true,false,notification_capability_prefix);
         netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
-        verify(dataObjectMock, times(2)).getAugmentation(NetconfNode.class);
+        verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
     }
 
     @Test
-    public void onDataChangedCreateEventSourceTest() {
-        onDataChangedCreateEventSourceTestHelper();
+    public void onDataChangedCreateEventSourceTestNotNotificationCapability() throws InterruptedException, ExecutionException {
+        onDataChangedTestHelper(false,true,true,"bad-prefix");
         netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
-        verify(eventSourceTopologyMock, times(1)).register(any(Node.class), any(NetconfEventSource.class));
+        verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
     }
 
-    private void onDataChangedCreateEventSourceTestHelper(){
+    private void onDataChangedTestHelper(boolean create, boolean update, boolean isNetconf, String notificationCapabilityPrefix) throws InterruptedException, ExecutionException{
         asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
-        Map<InstanceIdentifier, DataObject> map = new HashMap<>();
+        Map<InstanceIdentifier, DataObject> mapCreate = new HashMap<>();
+        Map<InstanceIdentifier, DataObject> mapUpdate = new HashMap<>();
         InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
         Node dataObjectMock = mock(Node.class);
-        map.put(instanceIdentifierMock, dataObjectMock);
-        doReturn(map).when(asyncDataChangeEventMock).getCreatedData();
-        doReturn(map).when(asyncDataChangeEventMock).getUpdatedData();
-
+        if(create){
+            mapCreate.put(instanceIdentifierMock, dataObjectMock);
+        }
+        if(update){
+            mapUpdate.put(instanceIdentifierMock, dataObjectMock);
+        }
+        doReturn(mapCreate).when(asyncDataChangeEventMock).getCreatedData();
+        doReturn(mapUpdate).when(asyncDataChangeEventMock).getUpdatedData();
         NetconfNode netconfNodeMock = mock(NetconfNode.class);
         AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
-        doReturn(netconfNodeMock).when(dataObjectMock).getAugmentation(NetconfNode.class);
-        doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
-        List<String> availableCapabilityList = new ArrayList<>();
-        availableCapabilityList.add("(urn:ietf:params:xml:ns:netconf:notification_availableCapabilityString1");
-        doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
-
-        doReturn(NetconfNodeFields.ConnectionStatus.Connected).when(netconfNodeMock).getConnectionStatus();
+        if(isNetconf){
+            doReturn(netconfNodeMock).when(dataObjectMock).getAugmentation(NetconfNode.class);
+            doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
+            List<String> availableCapabilityList = new ArrayList<>();
+            availableCapabilityList.add(notificationCapabilityPrefix +"_availableCapabilityString1");
+            doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
+            doReturn(NetconfNodeFields.ConnectionStatus.Connected).when(netconfNodeMock).getConnectionStatus();
+        } else {
+            doReturn(null).when(dataObjectMock).getAugmentation(NetconfNode.class);
+        }
 
         Optional optionalMock = mock(Optional.class);
         Optional optionalBindingMountMock = mock(Optional.class);
@@ -144,37 +167,8 @@ public class NetconfEventSourceManagerTest {
         doReturn(onlyOptionalMock).when(mountPointMock).getService(RpcConsumerRegistry.class);
         doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get();
         doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
+        EventSourceRegistrationImpl esrMock = mock(EventSourceRegistrationImpl.class);
+        doReturn(esrMock).when(eventSourceRegistry).registerEventSource(any(EventSource.class));
     }
 
-    @Test
-    public void isEventSourceTest() {
-        Node nodeMock = mock(Node.class);
-        NetconfNode netconfNodeMock = mock(NetconfNode.class);
-        AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
-        doReturn(netconfNodeMock).when(nodeMock).getAugmentation(NetconfNode.class);
-        doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
-        List<String> availableCapabilityList = new ArrayList<>();
-        availableCapabilityList.add("(urn:ietf:params:xml:ns:netconf:notification_availableCapabilityString1");
-        doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
-        assertTrue("Method has not been run correctly.", netconfEventSourceManager.isEventSource(nodeMock));
-    }
-
-    @Test
-    public void isNotEventSourceTest() {
-        Node nodeMock = mock(Node.class);
-        NetconfNode netconfNodeMock = mock(NetconfNode.class);
-        AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
-        doReturn(netconfNodeMock).when(nodeMock).getAugmentation(NetconfNode.class);
-        doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
-        List<String> availableCapabilityList = new ArrayList<>();
-        availableCapabilityList.add("availableCapabilityString1");
-        doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
-        assertFalse("Method has not been run correctly.", netconfEventSourceManager.isEventSource(nodeMock));
-    }
-
-    @Test
-    public void closeTest() {
-        netconfEventSourceManager.close();
-        verify(listenerRegistrationMock, times(1)).close();
-    }
 }
index 73117c12bae74c25e51475ea6e03c5b121be4adb..58da9e3eb13dfe54ec3eb41f74b12f8681b8d95b 100644 (file)
@@ -7,22 +7,36 @@
  */
 package org.opendaylight.controller.messagebus.app.impl;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.binding.api.BindingService;
 import org.opendaylight.controller.md.sal.binding.api.MountPoint;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-import org.opendaylight.controller.md.sal.dom.api.DOMService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.api.DOMService;
+import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSource;
 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
@@ -33,38 +47,22 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev1
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
-import org.opendaylight.yangtools.yang.common.QName;
-
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.times;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
 
 public class NetconfEventSourceTest {
 
     NetconfEventSource netconfEventSource;
     DOMMountPoint domMountPointMock;
     JoinTopicInput joinTopicInputMock;
-
-    @BeforeClass
-    public static void initTestClass() throws IllegalAccessException, InstantiationException {
-    }
+    AsyncDataChangeEvent asyncDataChangeEventMock;
+    Node dataObjectMock;
 
     @Before
     public void setUp() throws Exception {
@@ -81,62 +79,16 @@ public class NetconfEventSourceTest {
         doReturn(onlyOptionalMock).when(mountPointMock).getService(RpcConsumerRegistry.class);
         doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get();
         doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
-        netconfEventSource = new NetconfEventSource("nodeId1", streamMap, domMountPointMock, domNotificationPublishServiceMock, mountPointMock);
-    }
-
-    @Test
-    public void constructorTest() {
-        assertNotNull("Instance has not been created correctly.", netconfEventSource);
-    }
-
-    @Test
-    public void joinTopicTest() throws Exception{
-        joinTopicTestHelper();
-        assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock));
-    }
-
-    private void joinTopicTestHelper() throws Exception{
-        joinTopicInputMock = mock(JoinTopicInput.class);
-        NotificationPattern notificationPatternMock = mock(NotificationPattern.class);
-        doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern();
-        doReturn("regexString1").when(notificationPatternMock).getValue();
-
-        SchemaContext schemaContextMock = mock(SchemaContext.class);
-        doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
-        Set<NotificationDefinition> notificationDefinitionSet = new HashSet<>();
-        NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class);
-        notificationDefinitionSet.add(notificationDefinitionMock);
-
-        URI uri = new URI("uriStr1");
-        QName qName = new QName(uri, "localName1");
-        org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName);
-        doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications();
-        doReturn(schemaPath).when(notificationDefinitionMock).getPath();
-
-        Optional<DOMNotificationService> domNotificationServiceOptionalMock = (Optional<DOMNotificationService>) mock(Optional.class);
-        doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class);
-        doReturn(true).when(domNotificationServiceOptionalMock).isPresent();
-
-        DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class);
-        doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get();
-        ListenerRegistration listenerRegistrationMock = mock(ListenerRegistration.class);
-        doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(List.class));
-    }
-
-    @Test (expected=NullPointerException.class)
-    public void onNotificationTest() {
-        DOMNotification domNotificationMock = mock(DOMNotification.class);
-        ContainerNode containerNodeMock = mock(ContainerNode.class);
-        SchemaContext schemaContextMock = mock(SchemaContext.class);
-        SchemaPath schemaPathMock = mock(SchemaPath.class);
-        doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
-        doReturn(schemaPathMock).when(domNotificationMock).getType();
-        doReturn(containerNodeMock).when(domNotificationMock).getBody();
-        netconfEventSource.onNotification(domNotificationMock);
+        org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node node
+            = mock(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class);
+        org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId nodeId
+            = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId("NodeId1");
+        doReturn(nodeId).when(node).getNodeId();
+        netconfEventSource = new NetconfEventSource(node, streamMap, domMountPointMock, domNotificationPublishServiceMock, mountPointMock);
     }
 
     @Test
-    public void onDataChangedTest() {
+    public void onDataChangedTest(){
         InstanceIdentifier brmIdent = InstanceIdentifier.create(Nodes.class)
                 .child(Node.class, new NodeKey(new NodeId("brm"))).augmentation(NetconfNode.class);
         AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
@@ -145,20 +97,23 @@ public class NetconfEventSourceTest {
         dataChangeMap.put(brmIdent, dataObjectMock);
         doReturn(dataChangeMap).when(asyncDataChangeEventMock).getOriginalData();
         doReturn(dataChangeMap).when(asyncDataChangeEventMock).getUpdatedData();
-
+        doReturn(true).when(dataObjectMock).isConnected();
         netconfEventSource.onDataChanged(asyncDataChangeEventMock);
         verify(dataObjectMock, times(2)).isConnected();
     }
 
     @Test
     public void onDataChangedResubscribeTest() throws Exception{
+
         InstanceIdentifier brmIdent = InstanceIdentifier.create(Nodes.class)
                 .child(Node.class, new NodeKey(new NodeId("brm"))).augmentation(NetconfNode.class);
+
         AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
         NetconfNode dataObjectMock = mock(NetconfNode.class);
         Map<InstanceIdentifier, DataObject> dataChangeMap = new HashMap<>();
         dataChangeMap.put(brmIdent, dataObjectMock);
         doReturn(dataChangeMap).when(asyncDataChangeEventMock).getUpdatedData();
+        doReturn(new HashMap<InstanceIdentifier, DataObject>()).when(asyncDataChangeEventMock).getOriginalData();
         doReturn(true).when(dataObjectMock).isConnected();
 
         Set<String> localSet = getActiveStreams();
@@ -176,6 +131,44 @@ public class NetconfEventSourceTest {
         assertEquals("Size of set has not been set correctly.", 1, getActiveStreams().size());
     }
 
+    @Test
+    public void joinTopicTest() throws Exception{
+        joinTopicTestHelper();
+        assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock));
+    }
+
+    private void joinTopicTestHelper() throws Exception{
+        joinTopicInputMock = mock(JoinTopicInput.class);
+        TopicId topicId = new TopicId("topicID007");
+        doReturn(topicId).when(joinTopicInputMock).getTopicId();
+        NotificationPattern notificationPatternMock = mock(NotificationPattern.class);
+        doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern();
+        doReturn("regexString1").when(notificationPatternMock).getValue();
+
+        SchemaContext schemaContextMock = mock(SchemaContext.class);
+        doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
+        Set<NotificationDefinition> notificationDefinitionSet = new HashSet<>();
+        NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class);
+        notificationDefinitionSet.add(notificationDefinitionMock);
+
+        URI uri = new URI("uriStr1");
+        QName qName = new QName(uri, "localName1");
+        org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName);
+        doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications();
+        doReturn(schemaPath).when(notificationDefinitionMock).getPath();
+
+        Optional<DOMNotificationService> domNotificationServiceOptionalMock = (Optional<DOMNotificationService>) mock(Optional.class);
+        doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class);
+        doReturn(true).when(domNotificationServiceOptionalMock).isPresent();
+
+        DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class);
+        doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get();
+        ListenerRegistration listenerRegistrationMock = mock(ListenerRegistration.class);
+        doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(List.class));
+    }
+
+//TODO: create Test for NetConfEventSource#onNotification
+
     private Set getActiveStreams() throws Exception{
         Field nesField = NetconfEventSource.class.getDeclaredField("activeStreams");
         nesField.setAccessible(true);
index 4872127e837750c0ef3cd613b31451030ec05dd6..6dacb9738a18f9f5d12987eecdc99b8574bcbb06 100644 (file)
@@ -7,6 +7,11 @@
  */
 package org.opendaylight.controller.messagebus.app.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -14,12 +19,9 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-
 public class TopicDOMNotificationTest {
 
+    private static final String containerNodeBodyMockToString = "containerNodeBodyMock";
     ContainerNode containerNodeBodyMock;
     TopicDOMNotification topicDOMNotification;
 
@@ -30,6 +32,7 @@ public class TopicDOMNotificationTest {
     @Before
     public void setUp() throws Exception {
         containerNodeBodyMock = mock(ContainerNode.class);
+        doReturn(containerNodeBodyMockToString).when(containerNodeBodyMock).toString();
         topicDOMNotification = new TopicDOMNotification(containerNodeBodyMock);
     }
 
@@ -51,7 +54,7 @@ public class TopicDOMNotificationTest {
 
     @Test
     public void getToStringTest() {
-        String bodyString = "TopicDOMNotification [body=" + containerNodeBodyMock + "]";
+        String bodyString = "TopicDOMNotification [body=" + containerNodeBodyMockToString + "]";
         assertEquals("String has not been created correctly.", bodyString, topicDOMNotification.toString());
     }
 }
index 2ff4654155402231c43e1722374ee7fa0088b452..a88c609a263317deb9ed4906893150f52390a219 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -19,32 +19,12 @@ import org.junit.Test;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-
+/**
+ * @author ppalmar
+ *
+ */
 public class UtilTest {
 
-    @Test
-    public void testMD5Hash() throws Exception {
-        // empty string
-        createAndAssertHash("", "d41d8cd98f00b204e9800998ecf8427e");
-
-        // non-empty string
-        createAndAssertHash("The Guardian", "69b929ae473ed732d5fb8e0a55a8dc8d");
-
-        // the same hash for the same string
-        createAndAssertHash("The Independent", "db793706d70c37dcc16454fa8eb21b1c");
-        createAndAssertHash("The Independent", "db793706d70c37dcc16454fa8eb21b1c"); // one more time
-
-        // different strings must have different hashes
-        createAndAssertHash("orange", "fe01d67a002dfa0f3ac084298142eccd");
-        createAndAssertHash("yellow", "d487dd0b55dfcacdd920ccbdaeafa351");
-    }
-
-    //TODO: IllegalArgumentException would be better
-    @Test(expected = RuntimeException.class)
-    public void testMD5HashInvalidInput() throws Exception {
-        Util.md5String(null);
-    }
-
     @Test
     public void testWildcardToRegex() throws Exception {
         // empty wildcard string
@@ -73,14 +53,14 @@ public class UtilTest {
     public void testResultFor() throws Exception {
         {
             final String expectedResult = "dummy string";
-            RpcResult<String> rpcResult = Util.resultFor(expectedResult).get();
+            RpcResult<String> rpcResult = Util.resultRpcSuccessFor(expectedResult).get();
             assertEquals(expectedResult, rpcResult.getResult());
             assertTrue(rpcResult.isSuccessful());
             assertTrue(rpcResult.getErrors().isEmpty());
         }
         {
             final Integer expectedResult = 42;
-            RpcResult<Integer> rpcResult = Util.resultFor(expectedResult).get();
+            RpcResult<Integer> rpcResult = Util.resultRpcSuccessFor(expectedResult).get();
             assertEquals(expectedResult, rpcResult.getResult());
             assertTrue(rpcResult.isSuccessful());
             assertTrue(rpcResult.getErrors().isEmpty());
@@ -125,10 +105,6 @@ public class UtilTest {
         }
     }
 
-    private static void createAndAssertHash(final String inString, final String expectedHash) {
-        assertEquals("Incorrect hash.", expectedHash, Util.md5String(inString));
-    }
-
     private static void createAndAssertRegex(final String wildcardStr, final String expectedRegex) {
         assertEquals("Incorrect regex string.", expectedRegex, Util.wildcardToRegex(wildcardStr));
     }
diff --git a/opendaylight/md-sal/messagebus-spi/pom.xml b/opendaylight/md-sal/messagebus-spi/pom.xml
new file mode 100644 (file)
index 0000000..f31b37f
--- /dev/null
@@ -0,0 +1,100 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>sal-parent</artifactId>
+    <version>1.2.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>messagebus-spi</artifactId>
+  <name>${project.artifactId}</name>
+
+  <packaging>bundle</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>messagebus-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>config-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-data-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-model-api</artifactId>
+    </dependency>
+  </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.opendaylight.yangtools</groupId>
+                <artifactId>yang-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate-sources</goal>
+                        </goals>
+                        <configuration>
+                            <codeGenerators>
+                                <generator>
+                                    <codeGeneratorClass>
+                                        org.opendaylight.yangtools.maven.sal.api.gen.plugin.CodeGeneratorImpl
+                                    </codeGeneratorClass>
+                                    <outputBaseDir>
+                                        ${project.build.directory}/generated-sources/sal
+                                    </outputBaseDir>
+                                </generator>
+                                <generator>
+                                    <codeGeneratorClass>
+                                        org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+                                    </codeGeneratorClass>
+                                    <outputBaseDir>${project.build.directory}/generated-sources/config</outputBaseDir>
+                                    <additionalConfiguration>
+                                        <namespaceToPackage1>
+                                            urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang
+                                        </namespaceToPackage1>
+                                    </additionalConfiguration>
+                                </generator>
+                                <generator>
+                                    <codeGeneratorClass>org.opendaylight.yangtools.yang.unified.doc.generator.maven.DocumentationGeneratorImpl</codeGeneratorClass>
+                                    <outputBaseDir>target/site/models</outputBaseDir>
+                                </generator>
+                            </codeGenerators>
+                            <inspectDependencies>true</inspectDependencies>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${project.build.directory}/generated-sources/config</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+  <scm>
+    <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+    <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+    <tag>HEAD</tag>
+    <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
+  </scm>
+</project>
diff --git a/opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSource.java b/opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSource.java
new file mode 100644 (file)
index 0000000..6a6266a
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.spi;
+
+import java.util.List;
+
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * Event source is a node in topology which is able to produces notifications.
+ * To register event source you use {@link EventSourceRegistry#registerEventSource(EventSource)()}.
+ * EventSourceRegistry will request registered event source to publish notifications
+ * whenever EventSourceRegistry has been asked to publish a certain type of notifications.
+ * EventSourceRegistry will call method JoinTopic to request EventSource to publish notification.
+ * Event source must implement method JoinTopic (from superinterface {@link EventSourceService}).
+ */
+
+public interface EventSource extends EventSourceService, AutoCloseable {
+
+    /**
+     * Identifier of node associated with event source
+     *
+     * @return instance of NodeKey
+     */
+    NodeKey getSourceNodeKey();
+
+    /**
+     * List the types of notifications which source can produce.
+     *
+     * @return list of available notification
+     */
+    List<SchemaPath> getAvailableNotifications();
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSourceRegistration.java b/opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSourceRegistration.java
new file mode 100644 (file)
index 0000000..06af7c1
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.spi;
+
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+
+/**
+ * Instance of EventSourceRegistration is returned by {@link EventSourceRegistry#registerEventSource(EventSource)}
+ * and it is used to unregister EventSource.
+ *
+ */
+public interface EventSourceRegistration <T extends EventSource> extends ObjectRegistration<T>{
+
+    @Override
+    public void close();
+
+}
diff --git a/opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSourceRegistry.java b/opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSourceRegistry.java
new file mode 100644 (file)
index 0000000..10d3b5b
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.spi;
+
+/**
+ *EventSourceRegistry is used to register {@link EventSource}.
+ *
+ */
+public interface EventSourceRegistry extends AutoCloseable {
+
+    /**
+     * Registers the given EventSource for public consumption. The EventSource is
+     * associated with the node identified via {@linkEventSource#getSourceNodeKey}.
+     *
+     * @param eventSource the EventSource instance to register
+     * @return an EventSourceRegistration instance that is used to unregister the EventSource via {@link EventSourceRegistrationImpl#close()}.
+     */
+    <T extends EventSource> EventSourceRegistration<T> registerEventSource(T eventSource);
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/messagebus-spi/src/main/yang/messagebus-event-source-registry.yang b/opendaylight/md-sal/messagebus-spi/src/main/yang/messagebus-event-source-registry.yang
new file mode 100644 (file)
index 0000000..4c5a47c
--- /dev/null
@@ -0,0 +1,21 @@
+module messagebus-event-source-registry {
+    yang-version 1;
+    namespace "urn:opendaylight:params:xml:ns:yang:controller:messagebus:spi:eventsourceregistry";
+    prefix "mb-esr";
+
+    import config { prefix config; revision-date 2013-04-05; }
+
+    description
+            "Event source registry service interface definition for MessageBus";
+
+     revision "2015-04-02" {
+             description
+                 "Initial revision";
+         }
+
+     identity event-source-registry {
+             base "config:service-type";
+             config:java-class "org.opendaylight.controller.messagebus.spi.EventSourceRegistry";
+     }
+
+}
index bf30a1684239fc0ef0c9b1c64c4aab5dc15f5f11..c0587652c075c6424e5e5a64365ae7c79df8e411 100644 (file)
@@ -83,6 +83,7 @@
 
     <!-- Message Bus -->
     <module>messagebus-api</module>
+    <module>messagebus-spi</module>
     <module>messagebus-impl</module>
     <module>messagebus-config</module>
   </modules>
       </modules>
     </profile>
   </profiles>
-</project>
\ No newline at end of file
+</project>
index 678ac34e39d25d54fc12316fc17df1d51c5ed28f..3dc6e4030f56953620ab8762ecae7c32d4ffbd74 100644 (file)
@@ -65,7 +65,17 @@ public interface DataObjectModification<T extends DataObject> extends org.openda
     @Nonnull ModificationType getModificationType();
 
     /**
-     * Returns after state of top level container.
+     * Returns before-state of top level container. Implementations are encouraged,
+     * but not required to provide this state.
+     *
+     * @param root Class representing data container
+     * @return State of object before modification. Null if subtree was not present,
+     *         or the implementation cannot provide the state.
+     */
+    @Nullable T getDataBefore();
+
+    /**
+     * Returns after-state of top level container.
      *
      * @param root Class representing data container
      * @return State of object after modification. Null if subtree is not present.
index c1c23d5e6fc7d3d17f24c9a7613375fb6b4881e3..b86d31b79082db4641126f90d9123eb9e8fb75c2 100644 (file)
@@ -44,7 +44,7 @@ public final class DataTreeIdentifier<T extends DataObject> implements Immutable
      *
      * @return Instance identifier corresponding to the root node.
      */
-    public @Nonnull InstanceIdentifier<?> getRootIdentifier() {
+    public @Nonnull InstanceIdentifier<T> getRootIdentifier() {
         return rootIdentifier;
     }
 
index a165242b30f08d29902240d868b24ca84464fde9..83d48f77a0889ca9019d88e00cf7f8bb08ba44c6 100644 (file)
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
  *
  * @param <T> Type of Binding Data Object
  */
-class LazyDataObjectModification<T extends DataObject> implements DataObjectModification<T> {
+final class LazyDataObjectModification<T extends DataObject> implements DataObjectModification<T> {
 
     private final static Logger LOG = LoggerFactory.getLogger(LazyDataObjectModification.class);
 
@@ -57,7 +57,7 @@ class LazyDataObjectModification<T extends DataObject> implements DataObjectModi
         return new LazyDataObjectModification<>(codec,domData);
     }
 
-    static Collection<DataObjectModification<? extends DataObject>> from(final BindingCodecTreeNode<?> parentCodec,
+    private static Collection<DataObjectModification<? extends DataObject>> from(final BindingCodecTreeNode<?> parentCodec,
             final Collection<DataTreeCandidateNode> domChildNodes) {
         final ArrayList<DataObjectModification<? extends DataObject>> result = new ArrayList<>(domChildNodes.size());
         populateList(result, parentCodec, domChildNodes);
@@ -79,7 +79,7 @@ class LazyDataObjectModification<T extends DataObject> implements DataObjectModi
                             parentCodec.yangPathArgumentChild(domChildNode.getIdentifier());
                     populateList(result,type, childCodec, domChildNode);
                 } catch (final IllegalArgumentException e) {
-                    if(type == BindingStructuralType.UNKNOWN) {
+                    if (type == BindingStructuralType.UNKNOWN) {
                         LOG.debug("Unable to deserialize unknown DOM node {}",domChildNode,e);
                     } else {
                         LOG.debug("Binding representation for DOM node {} was not found",domChildNode,e);
@@ -89,7 +89,6 @@ class LazyDataObjectModification<T extends DataObject> implements DataObjectModi
         }
     }
 
-
     private static void populateList(final List<DataObjectModification<? extends DataObject>> result,
             final BindingStructuralType type, final BindingCodecTreeNode<?> childCodec,
             final DataTreeCandidateNode domChildNode) {
@@ -116,6 +115,11 @@ class LazyDataObjectModification<T extends DataObject> implements DataObjectModi
         }
     }
 
+    @Override
+    public T getDataBefore() {
+        return deserialize(domData.getDataBefore());
+    }
+
     @Override
     public T getDataAfter() {
         return deserialize(domData.getDataAfter());
@@ -149,8 +153,8 @@ class LazyDataObjectModification<T extends DataObject> implements DataObjectModi
 
     @Override
     public Collection<DataObjectModification<? extends DataObject>> getModifiedChildren() {
-        if(childNodesCache == null) {
-            childNodesCache = from(codec,domData.getChildNodes());
+        if (childNodesCache == null) {
+            childNodesCache = from(codec, domData.getChildNodes());
         }
         return childNodesCache;
     }
@@ -191,7 +195,7 @@ class LazyDataObjectModification<T extends DataObject> implements DataObjectModi
     }
 
     private T deserialize(final Optional<NormalizedNode<?, ?>> dataAfter) {
-        if(dataAfter.isPresent()) {
+        if (dataAfter.isPresent()) {
             return codec.deserialize(dataAfter.get());
         }
         return null;
index 59c9298499c4ed0a58961b57fe40019f15214de1..5f9cc83618b760705a73a02d1260df5a77ecda03 100644 (file)
@@ -16,12 +16,16 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
@@ -178,6 +183,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return false;
     }
 
+    private boolean isRootPath(YangInstanceIdentifier path){
+        return !path.getPathArguments().iterator().hasNext();
+    }
+
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
 
@@ -186,21 +195,62 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} read {}", getIdentifier(), path);
 
-        throttleOperation();
-
         final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
 
-        TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
-            @Override
-            public void invoke(TransactionContext transactionContext) {
-                transactionContext.readData(path, proxyFuture);
-            }
-        });
+        if(isRootPath(path)){
+            readAllData(path, proxyFuture);
+        } else {
+            throttleOperation();
+
+            TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                @Override
+                public void invoke(TransactionContext transactionContext) {
+                    transactionContext.readData(path, proxyFuture);
+                }
+            });
+
+        }
 
         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
     }
 
+    private void readAllData(final YangInstanceIdentifier path,
+                             final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
+        Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
+        List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
+
+        for(String shardName : allShardNames){
+            final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
+
+            throttleOperation();
+
+            TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                @Override
+                public void invoke(TransactionContext transactionContext) {
+                    transactionContext.readData(path, subProxyFuture);
+                }
+            });
+
+            futures.add(subProxyFuture);
+        }
+
+        final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
+
+        future.addListener(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
+                            future.get(), actorContext.getSchemaContext()));
+                } catch (InterruptedException | ExecutionException e) {
+                    proxyFuture.setException(e);
+                }
+            }
+        }, actorContext.getActorSystem().dispatcher());
+    }
+
     @Override
     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
 
@@ -409,6 +459,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
         String shardName = shardNameFromIdentifier(path);
+        return getOrCreateTxFutureCallback(shardName);
+    }
+
+    private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
         if(txFutureCallback == null) {
             Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
index f53368d8869416cb28340ab272aedcbfab449512..17d988005fadf32c8209837671d43d0aa2981b60 100644 (file)
@@ -540,6 +540,10 @@ public class ActorContext {
         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
     }
 
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+
     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
         return ask(actorRef, message, timeout);
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java
new file mode 100644 (file)
index 0000000..eb13078
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class NormalizedNodeAggregator {
+
+    private static final ExecutorService executorService = MoreExecutors.newDirectExecutorService();
+
+    private final YangInstanceIdentifier rootIdentifier;
+    private final List<Optional<NormalizedNode<?, ?>>> nodes;
+    private final InMemoryDOMDataStore dataStore;
+
+    NormalizedNodeAggregator(YangInstanceIdentifier rootIdentifier, List<Optional<NormalizedNode<?, ?>>> nodes,
+                             SchemaContext schemaContext){
+
+        this.rootIdentifier = rootIdentifier;
+        this.nodes = nodes;
+        this.dataStore = new InMemoryDOMDataStore("aggregator", executorService);
+        this.dataStore.onGlobalContextUpdated(schemaContext);
+    }
+
+    /**
+     * Combine data from all the nodes in the list into a tree with root as rootIdentifier
+     *
+     * @param nodes
+     * @param schemaContext
+     * @return
+     * @throws ExecutionException
+     * @throws InterruptedException
+     */
+    public static Optional<NormalizedNode<?,?>> aggregate(YangInstanceIdentifier rootIdentifier,
+                                                          List<Optional<NormalizedNode<?, ?>>> nodes,
+                                                          SchemaContext schemaContext)
+            throws ExecutionException, InterruptedException {
+        return new NormalizedNodeAggregator(rootIdentifier, nodes, schemaContext).aggregate();
+    }
+
+    private Optional<NormalizedNode<?,?>> aggregate() throws ExecutionException, InterruptedException {
+        return combine().getRootNode();
+    }
+
+    private NormalizedNodeAggregator combine() throws InterruptedException, ExecutionException {
+        DOMStoreWriteTransaction domStoreWriteTransaction = dataStore.newWriteOnlyTransaction();
+
+        for(Optional<NormalizedNode<?,?>> node : nodes) {
+            if(node.isPresent()) {
+                domStoreWriteTransaction.merge(rootIdentifier, node.get());
+            }
+        }
+        DOMStoreThreePhaseCommitCohort ready = domStoreWriteTransaction.ready();
+        ready.canCommit().get();
+        ready.preCommit().get();
+        ready.commit().get();
+
+        return this;
+    }
+
+    private Optional<NormalizedNode<?, ?>> getRootNode() throws InterruptedException, ExecutionException {
+        DOMStoreReadTransaction readTransaction = dataStore.newReadOnlyTransaction();
+
+        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
+                readTransaction.read(rootIdentifier);
+
+        return read.get();
+    }
+
+
+}
index a2471001864c1571a8a52d4abf0b8f41e7571ea3..29a5b09c5c8e33fcb8892054564d4264dd2a711d 100644 (file)
@@ -9,6 +9,7 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
@@ -20,11 +21,14 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.dispatch.Futures;
 import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Assert;
@@ -45,14 +49,18 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
@@ -1353,4 +1361,79 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
                 BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
     }
+
+    @Test
+    public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
+
+        SchemaContext schemaContext = SchemaContextHelper.full();
+        Configuration configuration = mock(Configuration.class);
+        doReturn(configuration).when(mockActorContext).getConfiguration();
+        doReturn(schemaContext).when(mockActorContext).getSchemaContext();
+        doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
+
+        NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
+
+        setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
+        setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
+
+        doReturn(memberName).when(mockActorContext).getCurrentMemberName();
+
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
+                YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
+
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+
+        NormalizedNode<?, ?> normalizedNode = readOptional.get();
+
+        assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
+
+        Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
+
+        for(NormalizedNode<?,?> node : collection){
+            assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
+        }
+
+        assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
+                NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
+
+        assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
+
+        assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
+                NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
+
+        assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
+    }
+
+
+    private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
+        ActorSystem actorSystem = getSystem();
+        ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(getSystem().actorSelection(shardActorRef.path())).
+                when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+        doReturn(Futures.successful(getSystem().actorSelection(shardActorRef.path()))).
+                when(mockActorContext).findPrimaryShardAsync(eq(shardName));
+
+        doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
+
+        ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(actorSystem.actorSelection(txActorRef.path())).
+                when(mockActorContext).actorSelection(txActorRef.path().toString());
+
+        doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqCreateTransaction(memberName, TransactionType.READ_ONLY));
+
+        doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java
new file mode 100644 (file)
index 0000000..40d3704
--- /dev/null
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import org.junit.Test;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class NormalizedNodeAggregatorTest {
+
+    @Test
+    public void testAggregate() throws InterruptedException, ExecutionException, ReadFailedException {
+        SchemaContext schemaContext = SchemaContextHelper.full();
+        NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
+
+        Optional<NormalizedNode<?, ?>> optional = NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
+                Lists.newArrayList(
+                        Optional.<NormalizedNode<?, ?>>of(getRootNode(expectedNode1, schemaContext)),
+                        Optional.<NormalizedNode<?, ?>>of(getRootNode(expectedNode2, schemaContext))),
+                schemaContext);
+
+
+        NormalizedNode<?,?> normalizedNode = optional.get();
+
+        assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
+
+        Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
+
+        for(NormalizedNode<?,?> node : collection){
+            assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
+        }
+
+        assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
+                findChildWithQName(collection, TestModel.TEST_QNAME) != null);
+
+        assertEquals(expectedNode1, findChildWithQName(collection, TestModel.TEST_QNAME));
+
+        assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
+                findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
+
+        assertEquals(expectedNode2, findChildWithQName(collection, CarsModel.BASE_QNAME));
+
+    }
+
+    public static NormalizedNode<?,?> getRootNode(NormalizedNode<?, ?> moduleNode, SchemaContext schemaContext) throws ReadFailedException, ExecutionException, InterruptedException {
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", Executors.newSingleThreadExecutor());
+        store.onGlobalContextUpdated(schemaContext);
+
+        DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
+
+        writeTransaction.merge(YangInstanceIdentifier.builder().node(moduleNode.getNodeType()).build(), moduleNode);
+
+        DOMStoreThreePhaseCommitCohort ready = writeTransaction.ready();
+
+        ready.canCommit().get();
+        ready.preCommit().get();
+        ready.commit().get();
+
+        DOMStoreReadTransaction readTransaction = store.newReadOnlyTransaction();
+
+        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = readTransaction.read(YangInstanceIdentifier.builder().build());
+
+        Optional<NormalizedNode<?, ?>> nodeOptional = read.checkedGet();
+
+        return nodeOptional.get();
+    }
+
+    public static NormalizedNode<?,?> findChildWithQName(Collection<NormalizedNode<?, ?>> collection, QName qName) {
+        for(NormalizedNode<?,?> node : collection){
+            if(node.getNodeType().equals(qName)){
+                return node;
+            }
+        }
+
+        return null;
+    }
+
+}
\ No newline at end of file