Merge changes I114cbac1,I45c2e7cd
authorMoiz Raja <moraja@cisco.com>
Fri, 10 Apr 2015 15:21:28 +0000 (15:21 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 10 Apr 2015 15:21:29 +0000 (15:21 +0000)
* changes:
  Calculate replicated log data size on recovery
  Refactor snapshot message processing to RaftActorSnapshotMessageSupport

97 files changed:
features/netconf-connector/src/main/resources/features.xml
features/netconf/src/main/resources/features.xml
karaf/opendaylight-karaf/pom.xml
opendaylight/md-sal/mdsal-artifacts/pom.xml
opendaylight/md-sal/messagebus-config/src/main/resources/initial/05-message-bus.xml
opendaylight/md-sal/messagebus-impl/pom.xml
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModule.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/config/yang/messagebus/app/impl/Providers.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceRegistrationImpl.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopic.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopology.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/Util.java
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSource.java [moved from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSource.java with 72% similarity]
opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/eventsources/netconf/NetconfEventSourceManager.java [moved from opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManager.java with 71% similarity]
opendaylight/md-sal/messagebus-impl/src/main/yang/messagebus-app-impl.yang
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleFactoryTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/config/yang/messagebus/app/impl/MessageBusAppImplModuleTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopicTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/EventSourceTopologyTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceManagerTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/NetconfEventSourceTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/TopicDOMNotificationTest.java
opendaylight/md-sal/messagebus-impl/src/test/java/org/opendaylight/controller/messagebus/app/impl/UtilTest.java
opendaylight/md-sal/messagebus-spi/pom.xml [new file with mode: 0644]
opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSource.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSourceRegistration.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSourceRegistry.java [new file with mode: 0644]
opendaylight/md-sal/messagebus-spi/src/main/yang/messagebus-event-source-registry.yang [new file with mode: 0644]
opendaylight/md-sal/pom.xml
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataObjectModification.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataTreeIdentifier.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/LazyDataObjectModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModifications.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/BatchedModificationsTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReplyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMDataTreeIdentifier.java
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/schema/mapping/NetconfMessageTransformer.java
opendaylight/netconf/mdsal-netconf-connector/src/main/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpc.java
opendaylight/netconf/mdsal-netconf-connector/src/test/java/org/opendaylight/controller/netconf/mdsal/connector/ops/RuntimeRpcTest.java
opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-container-control.xml [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-container.xml [new file with mode: 0644]
opendaylight/netconf/mdsal-netconf-connector/src/test/resources/yang/mdsal-netconf-rpc-test.yang
opendaylight/netconf/netconf-cli/pom.xml
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/Cli.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/NetconfDeviceConnectionHandler.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/NetconfDeviceConnectionManager.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/CommandDispatcher.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/input/Input.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/local/Connect.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/local/Disconnect.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/local/Help.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/output/Output.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/commands/remote/RemoteCommand.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/AbstractReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/Reader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/custom/ConfigReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/custom/EditContentReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/custom/FilterReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/impl/AnyXmlReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/impl/BasicDataHolderReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/impl/ChoiceReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/impl/ContainerReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/impl/GenericListReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/impl/GenericReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/reader/impl/ListEntryReader.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/writer/Writer.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/writer/custom/DataWriter.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/writer/impl/AbstractWriter.java
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/writer/impl/CompositeNodeWriter.java [deleted file]
opendaylight/netconf/netconf-cli/src/main/java/org/opendaylight/controller/netconf/cli/writer/impl/NormalizedNodeWriter.java
opendaylight/netconf/netconf-cli/src/test/java/org/opendaylight/controller/netconf/cli/NetconfCliTest.java
opendaylight/netconf/netconf-util/pom.xml
opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/OrderedNormalizedNodeWriter.java [new file with mode: 0644]
opendaylight/netconf/pom.xml

index 24a92bd..16af44f 100644 (file)
         <feature version='${project.version}'>odl-netconf-connector</feature>
         <feature version='${project.version}'>odl-mdsal-broker</feature>
         <bundle>mvn:org.opendaylight.controller/messagebus-api/${project.version}</bundle>
+        <bundle>mvn:org.opendaylight.controller/messagebus-spi/${project.version}</bundle>
         <bundle>mvn:org.opendaylight.controller/messagebus-impl/${project.version}</bundle>
         <configfile finalname="${config.configfile.directory}/05-message-bus.xml">mvn:org.opendaylight.controller/messagebus-config/${project.version}/xml/config</configfile>
     </feature>
index dbd940f..80b2e36 100644 (file)
@@ -36,6 +36,7 @@
   <feature name='odl-netconf-util' version='${project.version}'>
     <feature version='${project.version}'>odl-netconf-mapping-api</feature>
     <bundle>mvn:org.opendaylight.yangtools/yang-model-api/${yangtools.version}</bundle>
+    <bundle>mvn:org.opendaylight.yangtools/yang-data-api/${yangtools.version}</bundle>
     <bundle>mvn:org.opendaylight.controller/netconf-util/${project.version}</bundle>
   </feature>
     <feature name='odl-netconf-impl' version='${project.version}' description="OpenDaylight :: Netconf :: Impl">
index 9c02a9d..68238bb 100644 (file)
           <type>xml</type>
           <scope>runtime</scope>
       </dependency>
-    <dependency>
-      <groupId>org.opendaylight.controller</groupId>
-      <artifactId>features-flow</artifactId>
-      <classifier>features</classifier>
-      <type>xml</type>
-      <scope>runtime</scope>
-    </dependency>
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>features-restconf</artifactId>
index d9f0f88..23b2ea1 100644 (file)
                 <artifactId>model-flow-statistics</artifactId>
                 <version>${project.version}</version>
             </dependency>
-            <dependency>
-                <groupId>org.opendaylight.controller</groupId>
-                <artifactId>features-flow</artifactId>
-                <version>${project.version}</version>
-                <classifier>features</classifier>
-                <type>xml</type>
-                <scope>runtime</scope>
-            </dependency>
 
             <!-- RESTCONF -->
             <dependency>
                 <artifactId>messagebus-api</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.opendaylight.controller</groupId>
+                <artifactId>messagebus-spi</artifactId>
+                <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.opendaylight.controller</groupId>
                 <artifactId>messagebus-impl</artifactId>
index eed06cf..4714c07 100644 (file)
@@ -7,11 +7,11 @@
  and is available at http://www.eclipse.org/legal/epl-v10.html
 -->
 <snapshot>
-      <configuration>
+    <configuration>
       <data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
           <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
               <module>
-                  <name>messagebus-app</name>
+                  <name>messagebus-app-impl</name>
                   <type xmlns:binding-impl="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">binding-impl:messagebus-app-impl</type>
                   <binding-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl">
                       <type xmlns:md-sal-binding="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding">md-sal-binding:binding-broker-osgi-registry</type>
                   </namespace-to-stream>
               </module>
           </modules>
+          <services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+              <service>
+                  <type xmlns:mb-esr="urn:opendaylight:params:xml:ns:yang:controller:messagebus:spi:eventsourceregistry">mb-esr:event-source-registry</type>
+                    <instance>
+                        <name>messagebus-app-impl</name>
+                        <provider>/modules/module[type='messagebus-app-impl'][name='messagebus-app-impl']</provider>
+                    </instance>
+              </service>
+          </services>
       </data>
   </configuration>
   <required-capabilities>
       <capability>urn:opendaylight:params:xml:ns:yang:controller:messagebus:app:impl?module=messagebus-app-impl&amp;revision=2015-02-03</capability>
+      <capability>urn:opendaylight:params:xml:ns:yang:controller:messagebus:spi:eventsourceregistry?module=messagebus-event-source-registry&amp;revision=2015-04-02</capability>
   </required-capabilities>
 </snapshot>
index 7e3b599..d43210d 100644 (file)
@@ -56,6 +56,11 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
             <artifactId>messagebus-api</artifactId>\r
             <version>1.2.0-SNAPSHOT</version>\r
         </dependency>\r
+        <dependency>\r
+            <groupId>org.opendaylight.controller</groupId>\r
+            <artifactId>messagebus-spi</artifactId>\r
+            <version>1.2.0-SNAPSHOT</version>\r
+        </dependency>\r
         <dependency>\r
             <groupId>org.opendaylight.controller</groupId>\r
             <artifactId>sal-netconf-connector</artifactId>\r
index 022292a..dd68714 100644 (file)
@@ -1,4 +1,4 @@
-/**
+/*
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
@@ -7,7 +7,9 @@
  */
 package org.opendaylight.controller.config.yang.messagebus.app.impl;
 
-import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
+
 import org.opendaylight.controller.config.api.DependencyResolver;
 import org.opendaylight.controller.config.api.ModuleIdentifier;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
@@ -15,7 +17,10 @@ import org.opendaylight.controller.md.sal.binding.api.MountPointService;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
 import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
-import org.opendaylight.controller.messagebus.app.impl.NetconfEventSourceManager;
+import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSourceManager;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
@@ -23,8 +28,9 @@ import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MessageBusAppImplModule extends
-        org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule {
+import com.google.common.base.Preconditions;
+
+public class MessageBusAppImplModule extends org.opendaylight.controller.config.yang.messagebus.app.impl.AbstractMessageBusAppImplModule {
     private static final Logger LOGGER = LoggerFactory.getLogger(MessageBusAppImplModule.class);
 
     private BundleContext bundleContext;
@@ -52,37 +58,50 @@ public class MessageBusAppImplModule extends
 
     @Override
     public java.lang.AutoCloseable createInstance() {
-        final List<NamespaceToStream> namespaceMapping = getNamespaceToStream();
 
         final ProviderContext bindingCtx = getBindingBrokerDependency().registerProvider(new Providers.BindingAware());
         final ProviderSession domCtx = getDomBrokerDependency().registerProvider(new Providers.BindingIndependent());
-
         final DataBroker dataBroker = bindingCtx.getSALService(DataBroker.class);
         final DOMNotificationPublishService domPublish = domCtx.getService(DOMNotificationPublishService.class);
         final DOMMountPointService domMount = domCtx.getService(DOMMountPointService.class);
         final MountPointService bindingMount = bindingCtx.getSALService(MountPointService.class);
         final RpcProviderRegistry rpcRegistry = bindingCtx.getSALService(RpcProviderRegistry.class);
 
-        final EventSourceTopology eventSourceTopology = new EventSourceTopology(dataBroker, rpcRegistry);
-        final NetconfEventSourceManager eventSourceManager = new NetconfEventSourceManager(dataBroker, domPublish,
-                domMount, bindingMount, eventSourceTopology, getNamespaceToStream());
-
-        final AutoCloseable closer = new AutoCloseable() {
-            @Override
-            public void close() {
-                eventSourceTopology.close();
-                eventSourceManager.close();
-            }
-        };
+        final EventSourceRegistryWrapper eventSourceRegistryWrapper = new EventSourceRegistryWrapper(new EventSourceTopology(dataBroker, rpcRegistry));
+        final NetconfEventSourceManager netconfEventSourceManager = NetconfEventSourceManager.create(dataBroker, domPublish,domMount, bindingMount, eventSourceRegistryWrapper, getNamespaceToStream());
+        eventSourceRegistryWrapper.addAutoCloseable(netconfEventSourceManager);
+        LOGGER.info("Messagebus initialized");
+        return eventSourceRegistryWrapper;
 
-        return closer;
     }
 
-    private void closeProvider(final AutoCloseable closable) {
-        try {
-            closable.close();
-        } catch (final Exception e) {
-            LOGGER.error("Exception while closing: {}\n Exception: {}", closable, e);
+    //TODO: separate NetconfEventSource into separate bundle, remove this wrapper, return EventSourceTopology directly as EventSourceRegistry
+    private class EventSourceRegistryWrapper implements EventSourceRegistry{
+
+        private final EventSourceRegistry baseEventSourceRegistry;
+        private final Set<AutoCloseable> autoCloseables = new HashSet<>();
+
+        public EventSourceRegistryWrapper(EventSourceRegistry baseEventSourceRegistry) {
+            this.baseEventSourceRegistry = baseEventSourceRegistry;
         }
+
+        public void addAutoCloseable(AutoCloseable ac){
+            Preconditions.checkNotNull(ac);
+            autoCloseables.add(ac);
+        }
+
+        @Override
+        public void close() throws Exception {
+            for(AutoCloseable ac : autoCloseables){
+                ac.close();
+            }
+            baseEventSourceRegistry.close();
+        }
+
+        @Override
+        public <T extends EventSource> EventSourceRegistration<T> registerEventSource(T eventSource) {
+            return this.baseEventSourceRegistry.registerEventSource(eventSource);
+        }
+
     }
 }
diff --git a/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceRegistrationImpl.java b/opendaylight/md-sal/messagebus-impl/src/main/java/org/opendaylight/controller/messagebus/app/impl/EventSourceRegistrationImpl.java
new file mode 100644 (file)
index 0000000..d939090
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.app.impl;
+
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+
+import com.google.common.base.Preconditions;
+
+
+class EventSourceRegistrationImpl <T extends EventSource> extends AbstractObjectRegistration<T> implements EventSourceRegistration<T>{
+
+    private final EventSourceTopology eventSourceTopology;
+
+    /**
+     * @param instance of EventSource that has been registered by {@link EventSourceRegistryImpl#registerEventSource(Node, EventSource)}
+     */
+    public EventSourceRegistrationImpl(T instance, EventSourceTopology eventSourceTopology) {
+        super(instance);
+        this.eventSourceTopology = Preconditions.checkNotNull(eventSourceTopology);
+    }
+
+    @Override
+    protected void removeRegistration() {
+        this.eventSourceTopology.unRegister(getInstance());
+    }
+
+}
index 98e168e..13e50b5 100644 (file)
@@ -8,9 +8,9 @@
 
 package org.opendaylight.controller.messagebus.app.impl;
 
-import com.google.common.base.Preconditions;
 import java.util.Map;
 import java.util.regex.Pattern;
+
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
@@ -18,12 +18,17 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 public class EventSourceTopic implements DataChangeListener {
     private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(EventSourceTopic.class);
     private final NotificationPattern notificationPattern;
@@ -39,10 +44,7 @@ public class EventSourceTopic implements DataChangeListener {
         final String regex = Util.wildcardToRegex(nodeIdPattern);
         this.nodeIdPattern = Pattern.compile(regex);
 
-
-        // FIXME: We need to perform some salting in order to make
-        //        the topic IDs less predictable.
-        this.topicId = new TopicId(Util.md5String(notificationPattern + nodeIdPattern));
+        this.topicId = new TopicId(Util.getUUIDIdent());
     }
 
     public TopicId getTopicId() {
@@ -62,8 +64,14 @@ public class EventSourceTopic implements DataChangeListener {
     }
 
     public void notifyNode(final InstanceIdentifier<?> nodeId) {
+
         try {
-            sourceService.joinTopic(getJoinTopicInputArgument(nodeId));
+            RpcResult<JoinTopicOutput> rpcResultJoinTopic = sourceService.joinTopic(getJoinTopicInputArgument(nodeId)).get();
+            if(rpcResultJoinTopic.isSuccessful() == false){
+                for(RpcError err : rpcResultJoinTopic.getErrors()){
+                    LOG.error("Can not join topic: [{}] on node: [{}]. Error: {}",getTopicId().getValue(),nodeId.toString(),err.toString());
+                }
+            }
         } catch (final Exception e) {
             LOG.error("Could not invoke join topic for node {}", nodeId);
         }
@@ -80,5 +88,4 @@ public class EventSourceTopic implements DataChangeListener {
         return jti;
     }
 
-
 }
index 076d1b2..10b9ec8 100644 (file)
@@ -20,6 +20,10 @@ import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
@@ -57,7 +61,8 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 
-public class EventSourceTopology implements EventAggregatorService, AutoCloseable {
+
+public class EventSourceTopology implements EventAggregatorService, EventSourceRegistry {
     private static final Logger LOG = LoggerFactory.getLogger(EventSourceTopology.class);
 
     private static final String TOPOLOGY_ID = "EVENT-SOURCE-TOPOLOGY" ;
@@ -73,8 +78,10 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
                     .child(TopologyTypes.class)
                     .augmentation(TopologyTypes1.class);
 
-    private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> registrations =
+    private final Map<DataChangeListener, ListenerRegistration<DataChangeListener>> topicListenerRegistrations =
             new ConcurrentHashMap<>();
+    private final Map<NodeKey, RoutedRpcRegistration<EventSourceService>> routedRpcRegistrations =
+            new ConcurrentHashMap<>();;
 
     private final DataBroker dataBroker;
     private final RpcRegistration<EventAggregatorService> aggregatorRpcReg;
@@ -91,7 +98,7 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
         final TopologyEventSource topologySource = new TopologyEventSourceBuilder().build();
         final TopologyTypes1 topologyTypeAugment = new TopologyTypes1Builder().setTopologyEventSource(topologySource).build();
         putData(OPERATIONAL, TOPOLOGY_TYPE_PATH, topologyTypeAugment);
-
+        LOG.info("EventSourceRegistry has been initialized");
     }
 
     private <T extends DataObject>  void putData(final LogicalDatastoreType store,
@@ -104,13 +111,24 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
 
     }
 
-    private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath, final Node node) {
-        final NodeKey nodeKey = node.getKey();
+    private <T extends DataObject>  void deleteData(final LogicalDatastoreType store, final InstanceIdentifier<T> path){
+        final WriteTransaction tx = dataBroker.newWriteOnlyTransaction();
+        tx.delete(OPERATIONAL, path);
+        tx.submit();
+    }
+
+    private void insert(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath) {
+        final NodeKey nodeKey = sourcePath.getKey();
         final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
         final Node1 nodeAgument = new Node1Builder().setEventSourceNode(new NodeId(nodeKey.getNodeId().getValue())).build();
         putData(OPERATIONAL, augmentPath, nodeAgument);
     }
 
+    private void remove(final KeyedInstanceIdentifier<Node, NodeKey> sourcePath){
+        final InstanceIdentifier<Node1> augmentPath = sourcePath.augmentation(Node1.class);
+        deleteData(OPERATIONAL, augmentPath);
+    }
+
     private void notifyExistingNodes(final Pattern nodeIdPatternRegex, final EventSourceTopic eventSourceTopic){
 
         final ReadOnlyTransaction tx = dataBroker.newReadOnlyTransaction();
@@ -151,7 +169,7 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
         final NotificationPattern notificationPattern = new NotificationPattern(input.getNotificationPattern());
         final String nodeIdPattern = input.getNodeIdPattern().getValue();
         final Pattern nodeIdPatternRegex = Pattern.compile(Util.wildcardToRegex(nodeIdPattern));
-        final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, input.getNodeIdPattern().getValue(), eventSourceService);
+        final EventSourceTopic eventSourceTopic = new EventSourceTopic(notificationPattern, nodeIdPattern, eventSourceService);
 
         registerTopic(eventSourceTopic);
 
@@ -161,7 +179,7 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
                 .setTopicId(eventSourceTopic.getTopicId())
                 .build();
 
-        return Util.resultFor(cto);
+        return Util.resultRpcSuccessFor(cto);
     }
 
     @Override
@@ -172,23 +190,45 @@ public class EventSourceTopology implements EventAggregatorService, AutoCloseabl
     @Override
     public void close() {
         aggregatorRpcReg.close();
+        for(ListenerRegistration<DataChangeListener> reg : topicListenerRegistrations.values()){
+            reg.close();
+        }
     }
 
-    public void registerTopic(final EventSourceTopic listener) {
+    private void registerTopic(final EventSourceTopic listener) {
         final ListenerRegistration<DataChangeListener> listenerRegistration = dataBroker.registerDataChangeListener(OPERATIONAL,
                 EVENT_SOURCE_TOPOLOGY_PATH,
                 listener,
                 DataBroker.DataChangeScope.SUBTREE);
 
-        registrations.put(listener, listenerRegistration);
+        topicListenerRegistrations.put(listener, listenerRegistration);
+    }
+
+    public void register(final EventSource eventSource){
+    NodeKey nodeKey = eventSource.getSourceNodeKey();
+        final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
+        RoutedRpcRegistration<EventSourceService> reg = rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, eventSource);
+        reg.registerPath(NodeContext.class, sourcePath);
+        routedRpcRegistrations.put(nodeKey,reg);
+        insert(sourcePath);
     }
 
-    public void register(final Node node, final NetconfEventSource netconfEventSource) {
-        final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, node.getKey());
-        rpcRegistry.addRoutedRpcImplementation(EventSourceService.class, netconfEventSource)
-            .registerPath(NodeContext.class, sourcePath);
-        insert(sourcePath,node);
-        // FIXME: Return registration object.
+    public void unRegister(final EventSource eventSource){
+        final NodeKey nodeKey = eventSource.getSourceNodeKey();
+        final KeyedInstanceIdentifier<Node, NodeKey> sourcePath = EVENT_SOURCE_TOPOLOGY_PATH.child(Node.class, nodeKey);
+        final RoutedRpcRegistration<EventSourceService> removeRegistration = routedRpcRegistrations.remove(nodeKey);
+        if(removeRegistration != null){
+            removeRegistration.close();
+        remove(sourcePath);
+        }
     }
 
+    @Override
+    public <T extends EventSource> EventSourceRegistration<T> registerEventSource(
+            T eventSource) {
+        EventSourceRegistrationImpl<T> esr = new EventSourceRegistrationImpl<>(eventSource, this);
+        register(eventSource);
+        return esr;
+    }
 }
+
index 1c0b8b3..d6bcbf2 100644 (file)
@@ -8,11 +8,9 @@
 
 package org.opendaylight.controller.messagebus.app.impl;
 
-import java.math.BigInteger;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.regex.Pattern;
 
@@ -22,29 +20,15 @@ import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
 import com.google.common.util.concurrent.Futures;
 
-public final class Util {
-    private static final MessageDigest messageDigestTemplate = getDigestInstance();
-
-    private static MessageDigest getDigestInstance() {
-        try {
-            return MessageDigest.getInstance("MD5");
-        } catch (final NoSuchAlgorithmException e) {
-            throw new RuntimeException("Unable to get MD5 instance");
-        }
-    }
 
-    static String md5String(final String inputString) {
+public final class Util {
 
-        try {
-            final MessageDigest md = (MessageDigest)messageDigestTemplate.clone();
-            md.update(inputString.getBytes("UTF-8"), 0, inputString.length());
-            return new BigInteger(1, md.digest()).toString(16);
-        } catch (final Exception e) {
-            throw new RuntimeException("Unable to get MD5 instance");
-        }
+    public static String getUUIDIdent(){
+        UUID uuid = UUID.randomUUID();
+        return uuid.toString();
     }
 
-    public static <T> Future<RpcResult<T>> resultFor(final T output) {
+    public static <T> Future<RpcResult<T>> resultRpcSuccessFor(final T output) {
         final RpcResult<T> result = RpcResultBuilder.success(output).build();
         return Futures.immediateFuture(result);
     }
@@ -73,7 +57,7 @@ public final class Util {
      * @param wildcard
      * @return
      */
-    static String wildcardToRegex(final String wildcard){
+    public static String wildcardToRegex(final String wildcard){
         final StringBuffer s = new StringBuffer(wildcard.length());
         s.append('^');
         for (final char c : wildcard.toCharArray()) {
@@ -6,13 +6,16 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.eventsources.netconf;
+
+import static com.google.common.util.concurrent.Futures.immediateFuture;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.Future;
 import java.util.regex.Pattern;
@@ -30,21 +33,27 @@ import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.messagebus.app.impl.TopicDOMNotification;
+import org.opendaylight.controller.messagebus.app.impl.Util;
+import org.opendaylight.controller.messagebus.spi.EventSource;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.controller.sal.connect.netconf.util.NetconfMessageTransformUtil;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicNotification;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutput;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicStatus;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev140108.NetconfNode;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
@@ -63,7 +72,7 @@ import org.w3c.dom.Element;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
 
-public class NetconfEventSource implements EventSourceService, DOMNotificationListener, DataChangeListener {
+public class NetconfEventSource implements EventSource, DOMNotificationListener, DataChangeListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(NetconfEventSource.class);
 
@@ -74,23 +83,20 @@ public class NetconfEventSource implements EventSourceService, DOMNotificationLi
     private static final NodeIdentifier STREAM_QNAME = new NodeIdentifier(QName.create(CreateSubscriptionInput.QNAME,"stream"));
     private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath.create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
 
-
     private final String nodeId;
-
+    private final Node node;
 
     private final DOMMountPoint netconfMount;
     private final DOMNotificationPublishService domPublish;
-    private final NotificationsService notificationRpcService;
-
     private final Set<String> activeStreams = new ConcurrentSkipListSet<>();
 
     private final Map<String, String> urnPrefixToStreamMap;
+    private final ConcurrentHashMap<TopicId,ListenerRegistration<NetconfEventSource>> listenerRegistrationMap = new ConcurrentHashMap<>();
 
-
-    public NetconfEventSource(final String nodeId, final Map<String, String> streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService, final MountPoint bindingMount) {
+    public NetconfEventSource(final Node node, final Map<String, String> streamMap, final DOMMountPoint netconfMount, final DOMNotificationPublishService publishService, final MountPoint bindingMount) {
         this.netconfMount = netconfMount;
-        this.notificationRpcService = bindingMount.getService(RpcConsumerRegistry.class).get().getRpcService(NotificationsService.class);
-        this.nodeId = nodeId;
+        this.node = node;
+        this.nodeId = node.getNodeId().getValue();
         this.urnPrefixToStreamMap = streamMap;
         this.domPublish = publishService;
         LOG.info("NetconfEventSource [{}] created.", nodeId);
@@ -99,46 +105,37 @@ public class NetconfEventSource implements EventSourceService, DOMNotificationLi
     @Override
     public Future<RpcResult<JoinTopicOutput>> joinTopic(final JoinTopicInput input) {
         final NotificationPattern notificationPattern = input.getNotificationPattern();
-
-        // FIXME: default language should already be regex
-        final String regex = Util.wildcardToRegex(notificationPattern.getValue());
-
-        final Pattern pattern = Pattern.compile(regex);
-        final List<SchemaPath> matchingNotifications = Util.expandQname(availableNotifications(), pattern);
-        registerNotificationListener(matchingNotifications);
-        final JoinTopicOutput output = new JoinTopicOutputBuilder().build();
-        return com.google.common.util.concurrent.Futures.immediateFuture(RpcResultBuilder.success(output).build());
+        final List<SchemaPath> matchingNotifications = getMatchingNotifications(notificationPattern);
+        return registerNotificationListener(input.getTopicId(),matchingNotifications);
     }
 
-    private List<SchemaPath> availableNotifications() {
-        // FIXME: use SchemaContextListener to get changes asynchronously
-        final Set<NotificationDefinition> availableNotifications = netconfMount.getSchemaContext().getNotifications();
-        final List<SchemaPath> qNs = new ArrayList<>(availableNotifications.size());
-        for (final NotificationDefinition nd : availableNotifications) {
-            qNs.add(nd.getPath());
+    private synchronized Future<RpcResult<JoinTopicOutput>> registerNotificationListener(final TopicId topicId, final List<SchemaPath> notificationsToSubscribe){
+        if(listenerRegistrationMap.containsKey(topicId)){
+            final String errMessage = "Can not join topic twice. Topic " + topicId.getValue() + " has been joined to node " + this.nodeId;
+            return immediateFuture(RpcResultBuilder.<JoinTopicOutput>failed().withError(ErrorType.APPLICATION, errMessage).build());
         }
-        return qNs;
-    }
-
-    private void registerNotificationListener(final List<SchemaPath> notificationsToSubscribe) {
-
+        ListenerRegistration<NetconfEventSource> registration = null;
+        JoinTopicStatus joinTopicStatus = JoinTopicStatus.Down;
         final Optional<DOMNotificationService> notifyService = netconfMount.getService(DOMNotificationService.class);
+
         if(notifyService.isPresent()) {
             for (final SchemaPath qName : notificationsToSubscribe) {
                 startSubscription(qName);
             }
-            // FIXME: Capture registration
-            notifyService.get().registerNotificationListener(this, notificationsToSubscribe);
+            registration = notifyService.get().registerNotificationListener(this, notificationsToSubscribe);
         }
+
+        if(registration != null){
+            listenerRegistrationMap.put(topicId,registration);
+            joinTopicStatus = JoinTopicStatus.Up;
+        }
+        final JoinTopicOutput output = new JoinTopicOutputBuilder().setStatus(joinTopicStatus).build();
+        return immediateFuture(RpcResultBuilder.success(output).build());
     }
 
     private void startSubscription(final SchemaPath path) {
         final String streamName = resolveStream(path.getLastComponent());
-
-        if (streamIsActive(streamName) == false) {
-            LOG.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId);
-            startSubscription(streamName);
-        }
+        startSubscription(streamName);
     }
 
     private void resubscribeToActiveStreams() {
@@ -148,11 +145,14 @@ public class NetconfEventSource implements EventSourceService, DOMNotificationLi
     }
 
     private synchronized void startSubscription(final String streamName) {
-        final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
-            .withChild(ImmutableNodes.leafNode(STREAM_QNAME, streamName))
-            .build();
-        netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
-        activeStreams.add(streamName);
+        if(streamIsActive(streamName) == false){
+            LOG.info("Stream {} is not active on node {}. Will subscribe.", streamName, nodeId);
+            final ContainerNode input = Builders.containerBuilder().withNodeIdentifier(new NodeIdentifier(CreateSubscriptionInput.QNAME))
+                    .withChild(ImmutableNodes.leafNode(STREAM_QNAME, streamName))
+                    .build();
+            netconfMount.getService(DOMRpcService.class).get().invokeRpc(CREATE_SUBSCRIPTION, input);
+            activeStreams.add(streamName);
+        }
     }
 
     private String resolveStream(final QName qName) {
@@ -194,7 +194,6 @@ public class NetconfEventSource implements EventSourceService, DOMNotificationLi
         final Optional<String> namespace = Optional.of(PAYLOAD_ARG.getNodeType().getNamespace().toString());
         final Element element = XmlUtil.createElement(doc , "payload", namespace);
 
-
         final DOMResult result = new DOMResult(element);
 
         final SchemaContext context = netconfMount.getSchemaContext();
@@ -238,4 +237,35 @@ public class NetconfEventSource implements EventSourceService, DOMNotificationLi
         return NetconfNode.class.equals(changeEntry.getKey().getTargetType());
     }
 
+    private List<SchemaPath> getMatchingNotifications(NotificationPattern notificationPattern){
+        // FIXME: default language should already be regex
+        final String regex = Util.wildcardToRegex(notificationPattern.getValue());
+
+        final Pattern pattern = Pattern.compile(regex);
+        return Util.expandQname(getAvailableNotifications(), pattern);
+    }
+
+    @Override
+    public void close() throws Exception {
+        for(ListenerRegistration<NetconfEventSource> registration : listenerRegistrationMap.values()){
+            registration.close();
+        }
+    }
+
+    @Override
+    public NodeKey getSourceNodeKey(){
+        return node.getKey();
+    }
+
+    @Override
+    public List<SchemaPath> getAvailableNotifications() {
+        // FIXME: use SchemaContextListener to get changes asynchronously
+        final Set<NotificationDefinition> availableNotifications = netconfMount.getSchemaContext().getNotifications();
+        final List<SchemaPath> qNs = new ArrayList<>(availableNotifications.size());
+        for (final NotificationDefinition nd : availableNotifications) {
+            qNs.add(nd.getPath());
+        }
+        return qNs;
+    }
+
 }
@@ -6,15 +6,15 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.controller.messagebus.app.impl;
+package org.opendaylight.controller.messagebus.eventsources.netconf;
 
-import com.google.common.base.Optional;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.opendaylight.controller.config.yang.messagebus.app.impl.NamespaceToStream;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.DataChangeListener;
@@ -26,6 +26,8 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistration;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields.ConnectionStatus;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.network.topology.topology.topology.types.TopologyNetconf;
@@ -59,30 +61,52 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
             .build();
     private static final QName NODE_ID_QNAME = QName.create(Node.QNAME,"node-id");
 
-
-    private final EventSourceTopology eventSourceTopology;
     private final Map<String, String> streamMap;
-
-    private final ConcurrentHashMap<InstanceIdentifier<?>, NetconfEventSource> netconfSources = new ConcurrentHashMap<>();
-    private final ListenerRegistration<DataChangeListener> listenerReg;
+    private final ConcurrentHashMap<InstanceIdentifier<?>, EventSourceRegistration<NetconfEventSource>> eventSourceRegistration = new ConcurrentHashMap<>();
     private final DOMNotificationPublishService publishService;
     private final DOMMountPointService domMounts;
     private final MountPointService bindingMounts;
+    private ListenerRegistration<DataChangeListener> listenerRegistration;
+    private final EventSourceRegistry eventSourceRegistry;
+
+    public static NetconfEventSourceManager create(final DataBroker dataBroker,
+            final DOMNotificationPublishService domPublish,
+            final DOMMountPointService domMount,
+            final MountPointService bindingMount,
+            final EventSourceRegistry eventSourceRegistry,
+            final List<NamespaceToStream> namespaceMapping){
+
+        final NetconfEventSourceManager eventSourceManager =
+                new NetconfEventSourceManager(domPublish, domMount, bindingMount, eventSourceRegistry, namespaceMapping);
 
-    public NetconfEventSourceManager(final DataBroker dataStore,
-                              final DOMNotificationPublishService domPublish,
+        eventSourceManager.initialize(dataBroker);
+
+        return eventSourceManager;
+
+    }
+
+    private NetconfEventSourceManager(final DOMNotificationPublishService domPublish,
                               final DOMMountPointService domMount,
                               final MountPointService bindingMount,
-                              final EventSourceTopology eventSourceTopology,
+                              final EventSourceRegistry eventSourceRegistry,
                               final List<NamespaceToStream> namespaceMapping) {
 
-        listenerReg = dataStore.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE);
-        this.eventSourceTopology = eventSourceTopology;
+        Preconditions.checkNotNull(domPublish);
+        Preconditions.checkNotNull(domMount);
+        Preconditions.checkNotNull(bindingMount);
+        Preconditions.checkNotNull(eventSourceRegistry);
+        Preconditions.checkNotNull(namespaceMapping);
         this.streamMap = namespaceToStreamMapping(namespaceMapping);
         this.domMounts = domMount;
         this.bindingMounts = bindingMount;
         this.publishService = domPublish;
-        LOGGER.info("EventSourceManager initialized.");
+        this.eventSourceRegistry = eventSourceRegistry;
+    }
+
+    private void initialize(final DataBroker dataBroker){
+        Preconditions.checkNotNull(dataBroker);
+        listenerRegistration = dataBroker.registerDataChangeListener(LogicalDatastoreType.OPERATIONAL, NETCONF_DEVICE_PATH, this, DataChangeScope.SUBTREE);
+        LOGGER.info("NetconfEventSourceManager initialized.");
     }
 
     private Map<String,String> namespaceToStreamMapping(final List<NamespaceToStream> namespaceMapping) {
@@ -97,7 +121,7 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
 
     @Override
     public void onDataChanged(final AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> event) {
-        //FIXME: Prevent creating new event source on subsequent changes in inventory, like disconnect.
+
         LOGGER.debug("[DataChangeEvent<InstanceIdentifier<?>, DataObject>: {}]", event);
         for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getCreatedData().entrySet()) {
             if (changeEntry.getValue() instanceof Node) {
@@ -105,22 +129,19 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
             }
         }
 
-
         for (final Map.Entry<InstanceIdentifier<?>, DataObject> changeEntry : event.getUpdatedData().entrySet()) {
             if (changeEntry.getValue() instanceof Node) {
                 nodeUpdated(changeEntry.getKey(),(Node) changeEntry.getValue());
             }
         }
 
-
     }
 
     private void nodeUpdated(final InstanceIdentifier<?> key, final Node node) {
 
         // we listen on node tree, therefore we should rather throw IllegalStateException when node is null
         if ( node == null ) {
-            LOGGER.debug("OnDataChanged Event. Node is null.");
-            return;
+            throw new IllegalStateException("Node is null");
         }
         if ( isNetconfNode(node) == false ) {
             LOGGER.debug("OnDataChanged Event. Not a Netconf node.");
@@ -134,7 +155,7 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
             return;
         }
 
-        if(!netconfSources.containsKey(key)) {
+        if(!eventSourceRegistration.containsKey(key)) {
             createEventSource(key,node);
         }
     }
@@ -144,10 +165,12 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
         final Optional<MountPoint> bindingMount = bindingMounts.getMountPoint(key);
 
         if(netconfMount.isPresent() && bindingMount.isPresent()) {
-            final String nodeId = node.getNodeId().getValue();
-            final NetconfEventSource netconfEventSource = new NetconfEventSource(nodeId, streamMap, netconfMount.get(), publishService, bindingMount.get());
-            eventSourceTopology.register(node,netconfEventSource);
-            netconfSources.putIfAbsent(key, netconfEventSource);
+
+            final NetconfEventSource netconfEventSource =
+                    new NetconfEventSource(node, streamMap, netconfMount.get(), publishService, bindingMount.get());
+            final EventSourceRegistration<NetconfEventSource> registration = eventSourceRegistry.registerEventSource(netconfEventSource);
+            eventSourceRegistration.putIfAbsent(key, registration);
+
         }
     }
 
@@ -159,13 +182,21 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
         return node.getAugmentation(NetconfNode.class) != null ;
     }
 
-    public boolean isEventSource(final Node node) {
-        final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
+    private boolean isEventSource(final Node node) {
 
+        final NetconfNode netconfNode = node.getAugmentation(NetconfNode.class);
         return isEventSource(netconfNode);
+
     }
 
     private boolean isEventSource(final NetconfNode node) {
+        if (node.getAvailableCapabilities() == null) {
+            return false;
+        }
+        final List<String> capabilities = node.getAvailableCapabilities().getAvailableCapability();
+        if(capabilities == null) {
+             return false;
+        }
         for (final String capability : node.getAvailableCapabilities().getAvailableCapability()) {
             if(capability.startsWith("(urn:ietf:params:xml:ns:netconf:notification")) {
                 return true;
@@ -177,6 +208,10 @@ public final class NetconfEventSourceManager implements DataChangeListener, Auto
 
     @Override
     public void close() {
-        listenerReg.close();
+        for(final EventSourceRegistration<NetconfEventSource> reg : eventSourceRegistration.values()){
+            reg.close();
+        }
+        listenerRegistration.close();
     }
+
 }
\ No newline at end of file
index bed6b10..320afcc 100644 (file)
@@ -6,7 +6,7 @@ module messagebus-app-impl {
     import config { prefix config; revision-date 2013-04-05; }
     import opendaylight-md-sal-binding {prefix sal;}
     import opendaylight-md-sal-dom {prefix dom;}
-
+    import messagebus-event-source-registry {prefix esr;}
 
     description
         "Service definition for Message Bus application implementation.";
@@ -17,9 +17,10 @@ module messagebus-app-impl {
 
     identity messagebus-app-impl {
         base config:module-type;
+        config:provided-service esr:event-source-registry;
         config:java-name-prefix MessageBusAppImpl;
     }
-    
+
     augment "/config:modules/config:module/config:configuration" {
         case messagebus-app-impl {
             when "/config:modules/config:module/config:type = 'messagebus-app-impl'";
index 13c4221..7db7dcc 100644 (file)
@@ -7,6 +7,10 @@
  */
 package org.opendaylight.controller.config.yang.messagebus.app.impl;
 
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -14,10 +18,6 @@ import org.opendaylight.controller.config.api.DependencyResolver;
 import org.opendaylight.controller.config.api.DynamicMBeanWithInstance;
 import org.osgi.framework.BundleContext;
 
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
 public class MessageBusAppImplModuleFactoryTest {
 
     DependencyResolver dependencyResolverMock;
index 0794364..85d1a1b 100644 (file)
@@ -7,41 +7,17 @@
  */
 package org.opendaylight.controller.config.yang.messagebus.app.impl;
 
-import com.google.common.util.concurrent.CheckedFuture;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.config.api.DependencyResolver;
 import org.opendaylight.controller.config.api.ModuleIdentifier;
-import org.opendaylight.controller.md.sal.binding.api.DataBroker;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.messagebus.app.impl.EventSourceTopology;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
-import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.Provider;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.osgi.framework.BundleContext;
 
-import javax.management.ObjectName;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.notNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.doNothing;
-
 public class MessageBusAppImplModuleTest {
 
     MessageBusAppImplModule messageBusAppImplModule;
@@ -79,43 +55,5 @@ public class MessageBusAppImplModuleTest {
         assertEquals("Set and/or get method/s don't work correctly.", bundleContext, messageBusAppImplModule.getBundleContext());
     }
 
-    @Test
-    public void createInstanceTest() {
-        createInstanceTestHelper();
-        messageBusAppImplModule.getInstance();
-        assertNotNull("AutoCloseable instance has not been created correctly.", messageBusAppImplModule.createInstance());
-    }
-
-    private void createInstanceTestHelper(){
-        NamespaceToStream namespaceToStream = mock(NamespaceToStream.class);
-        List<NamespaceToStream> listNamespaceToStreamMock = new ArrayList<>();
-        listNamespaceToStreamMock.add(namespaceToStream);
-        messageBusAppImplModule.setNamespaceToStream(listNamespaceToStreamMock);
-        ObjectName objectName = mock(ObjectName.class);
-        org.opendaylight.controller.sal.core.api.Broker domBrokerDependency = mock(Broker.class);
-        org.opendaylight.controller.sal.binding.api.BindingAwareBroker bindingBrokerDependency = mock(BindingAwareBroker.class);
-        when(dependencyResolverMock.resolveInstance((java.lang.Class) notNull(), (javax.management.ObjectName) notNull(), eq(AbstractMessageBusAppImplModule.domBrokerJmxAttribute))).thenReturn(domBrokerDependency);
-        when(dependencyResolverMock.resolveInstance((java.lang.Class) notNull(), (javax.management.ObjectName) notNull(), eq(AbstractMessageBusAppImplModule.bindingBrokerJmxAttribute))).thenReturn(bindingBrokerDependency);
-        messageBusAppImplModule.setBindingBroker(objectName);
-        messageBusAppImplModule.setDomBroker(objectName);
-        BindingAwareBroker.ProviderContext providerContextMock = mock(BindingAwareBroker.ProviderContext.class);
-        doReturn(providerContextMock).when(bindingBrokerDependency).registerProvider(any(BindingAwareProvider.class));
-        Broker.ProviderSession providerSessionMock = mock(Broker.ProviderSession.class);
-        doReturn(providerSessionMock).when(domBrokerDependency).registerProvider(any(Provider.class));
-
-        DataBroker dataBrokerMock = mock(DataBroker.class);
-        doReturn(dataBrokerMock).when(providerContextMock).getSALService(DataBroker.class);
-        RpcProviderRegistry rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
-        doReturn(rpcProviderRegistryMock).when(providerContextMock).getSALService(RpcProviderRegistry.class);
-        BindingAwareBroker.RpcRegistration rpcRegistrationMock = mock(BindingAwareBroker.RpcRegistration.class);
-        doReturn(rpcRegistrationMock).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class));
-        EventSourceService eventSourceServiceMock = mock(EventSourceService.class);
-        doReturn(eventSourceServiceMock).when(rpcProviderRegistryMock).getRpcService(EventSourceService.class);
-
-        WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
-        doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
-        doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class));
-        CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
-        doReturn(checkedFutureMock).when(writeTransactionMock).submit();
-    }
+    //TODO: create MessageBusAppImplModule.createInstance test
 }
index 5e26213..f369a12 100644 (file)
@@ -7,6 +7,16 @@
  */
 package org.opendaylight.controller.messagebus.app.impl;
 
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import java.util.Map;
+
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -18,16 +28,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.any;
-
 public class EventSourceTopicTest {
 
     EventSourceTopic eventSourceTopic;
index c2f6ef5..ced2e1f 100644 (file)
@@ -7,8 +7,18 @@
  */
 package org.opendaylight.controller.messagebus.app.impl;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.List;
+
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -16,11 +26,13 @@ import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.messagebus.spi.EventSource;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.CreateTopicInput;
-import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.DestroyTopicInput;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.EventAggregatorService;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.Pattern;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
@@ -34,17 +46,8 @@ import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.eq;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
 
 public class EventSourceTopologyTest {
 
@@ -63,7 +66,6 @@ public class EventSourceTopologyTest {
     public void setUp() throws Exception {
         dataBrokerMock = mock(DataBroker.class);
         rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
-
     }
 
     @Test
@@ -74,20 +76,24 @@ public class EventSourceTopologyTest {
     }
 
     private void constructorTestHelper(){
+        RpcRegistration<EventAggregatorService> aggregatorRpcReg = mock(RpcRegistration.class);
+        EventSourceService eventSourceService = mock(EventSourceService.class);
+        doReturn(aggregatorRpcReg).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class));
+        doReturn(eventSourceService).when(rpcProviderRegistryMock).getRpcService(EventSourceService.class);
         WriteTransaction writeTransactionMock = mock(WriteTransaction.class);
         doReturn(writeTransactionMock).when(dataBrokerMock).newWriteOnlyTransaction();
-        doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class));
+        doNothing().when(writeTransactionMock).put(any(LogicalDatastoreType.class), any(InstanceIdentifier.class), any(DataObject.class),eq(true));
         CheckedFuture checkedFutureMock = mock(CheckedFuture.class);
         doReturn(checkedFutureMock).when(writeTransactionMock).submit();
     }
 
-    @Test
-    public void createTopicTest() throws Exception{
-        createTopicTestHelper();
-        assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock));
-    }
+//TODO: create test for createTopic
+//    public void createTopicTest() throws Exception{
+//        createTopicTestHelper();
+//        assertNotNull("Topic has not been created correctly.", eventSourceTopology.createTopic(createTopicInputMock));
+//    }
 
-    private void createTopicTestHelper() throws Exception{
+    private void topicTestHelper() throws Exception{
         constructorTestHelper();
         createTopicInputMock = mock(CreateTopicInput.class);
         eventSourceTopology = new EventSourceTopology(dataBrokerMock, rpcProviderRegistryMock);
@@ -126,35 +132,25 @@ public class EventSourceTopologyTest {
 
     @Test
     public void destroyTopicTest() throws Exception{
-        createTopicTestHelper();
+        topicTestHelper();
+        //TODO: modify test when destroyTopic will be implemented
         DestroyTopicInput destroyTopicInput = null;
         assertNotNull("Instance has not been created correctly.", eventSourceTopology.destroyTopic(destroyTopicInput));
     }
 
-    @Test
-    public void closeTest() throws Exception{
-        BindingAwareBroker.RpcRegistration rpcRegistrationMock = mock(BindingAwareBroker.RpcRegistration.class);
-        doReturn(rpcRegistrationMock).when(rpcProviderRegistryMock).addRpcImplementation(eq(EventAggregatorService.class), any(EventSourceTopology.class));
-        doNothing().when(rpcRegistrationMock).close();
-        createTopicTestHelper();
-        eventSourceTopology.createTopic(createTopicInputMock);
-        eventSourceTopology.close();
-        verify(rpcRegistrationMock, times(1)).close();
-    }
-
     @Test
     public void registerTest() throws Exception {
-        createTopicTestHelper();
+        topicTestHelper();
         Node nodeMock = mock(Node.class);
-        NetconfEventSource netconfEventSourceMock = mock(NetconfEventSource.class);
-
+        EventSource eventSourceMock = mock(EventSource.class);
         NodeId nodeId = new NodeId("nodeIdValue1");
         nodeKey = new NodeKey(nodeId);
         doReturn(nodeKey).when(nodeMock).getKey();
-
+        doReturn(nodeKey).when(eventSourceMock).getSourceNodeKey();
         BindingAwareBroker.RoutedRpcRegistration routedRpcRegistrationMock = mock(BindingAwareBroker.RoutedRpcRegistration.class);
-        doReturn(routedRpcRegistrationMock).when(rpcProviderRegistryMock).addRoutedRpcImplementation(EventSourceService.class, netconfEventSourceMock);
-        eventSourceTopology.register(nodeMock, netconfEventSourceMock);
+        doReturn(routedRpcRegistrationMock).when(rpcProviderRegistryMock).addRoutedRpcImplementation(EventSourceService.class, eventSourceMock);
+        doNothing().when(routedRpcRegistrationMock).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class));
+        eventSourceTopology.register(eventSourceMock);
         verify(routedRpcRegistrationMock, times(1)).registerPath(eq(NodeContext.class), any(KeyedInstanceIdentifier.class));
     }
 
index 911c5db..61fa30f 100644 (file)
@@ -7,7 +7,20 @@
  */
 package org.opendaylight.controller.messagebus.app.impl;
 
-import com.google.common.base.Optional;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.notNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -22,7 +35,11 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSourceManager;
+import org.opendaylight.controller.messagebus.spi.EventSource;
+import org.opendaylight.controller.messagebus.spi.EventSourceRegistry;
 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
+import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNodeFields;
@@ -34,31 +51,19 @@ import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.notNull;
+import com.google.common.base.Optional;
 
 public class NetconfEventSourceManagerTest {
 
+    private static final String notification_capability_prefix = "(urn:ietf:params:xml:ns:netconf:notification";
     NetconfEventSourceManager netconfEventSourceManager;
     ListenerRegistration listenerRegistrationMock;
     DOMMountPointService domMountPointServiceMock;
     MountPointService mountPointServiceMock;
     EventSourceTopology eventSourceTopologyMock;
     AsyncDataChangeEvent asyncDataChangeEventMock;
-
+    RpcProviderRegistry rpcProviderRegistryMock;
+    EventSourceRegistry eventSourceRegistry;
     @BeforeClass
     public static void initTestClass() throws IllegalAccessException, InstantiationException {
     }
@@ -70,58 +75,76 @@ public class NetconfEventSourceManagerTest {
         domMountPointServiceMock = mock(DOMMountPointService.class);
         mountPointServiceMock = mock(MountPointService.class);
         eventSourceTopologyMock = mock(EventSourceTopology.class);
+        rpcProviderRegistryMock = mock(RpcProviderRegistry.class);
+        eventSourceRegistry = mock(EventSourceRegistry.class);
         List<NamespaceToStream> namespaceToStreamList = new ArrayList<>();
 
         listenerRegistrationMock = mock(ListenerRegistration.class);
         doReturn(listenerRegistrationMock).when(dataBrokerMock).registerDataChangeListener(eq(LogicalDatastoreType.OPERATIONAL), any(InstanceIdentifier.class), any(NetconfEventSourceManager.class), eq(AsyncDataBroker.DataChangeScope.SUBTREE));
 
-        netconfEventSourceManager = new NetconfEventSourceManager(dataBrokerMock, domNotificationPublishServiceMock, domMountPointServiceMock,
-                mountPointServiceMock, eventSourceTopologyMock, namespaceToStreamList);
+        netconfEventSourceManager =
+                NetconfEventSourceManager.create(dataBrokerMock,
+                                                 domNotificationPublishServiceMock,
+                                                 domMountPointServiceMock,
+                                                 mountPointServiceMock,
+                                                 eventSourceRegistry,
+                                                 namespaceToStreamList);
     }
 
     @Test
-    public void constructorTest() {
-        assertNotNull("Instance has not been created correctly.", netconfEventSourceManager);
+    public void onDataChangedCreateEventSourceTestByCreateEntry() throws InterruptedException, ExecutionException {
+        onDataChangedTestHelper(true,false,true,notification_capability_prefix);
+        netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+        verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
     }
 
     @Test
-    public void onDataChangedTest() {
-        AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
-        Map<InstanceIdentifier, DataObject> map = new HashMap<>();
-        InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
-        Node dataObjectMock = mock(Node.class);
-        map.put(instanceIdentifierMock, dataObjectMock);
-        doReturn(map).when(asyncDataChangeEventMock).getCreatedData();
-        doReturn(map).when(asyncDataChangeEventMock).getUpdatedData();
+    public void onDataChangedCreateEventSourceTestByUpdateEntry() throws InterruptedException, ExecutionException {
+        onDataChangedTestHelper(false,true,true, notification_capability_prefix);
+        netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
+        verify(eventSourceRegistry, times(1)).registerEventSource(any(EventSource.class));
+    }
+
+    @Test
+    public void onDataChangedCreateEventSourceTestNotNeconf() throws InterruptedException, ExecutionException {
+        onDataChangedTestHelper(false,true,false,notification_capability_prefix);
         netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
-        verify(dataObjectMock, times(2)).getAugmentation(NetconfNode.class);
+        verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
     }
 
     @Test
-    public void onDataChangedCreateEventSourceTest() {
-        onDataChangedCreateEventSourceTestHelper();
+    public void onDataChangedCreateEventSourceTestNotNotificationCapability() throws InterruptedException, ExecutionException {
+        onDataChangedTestHelper(false,true,true,"bad-prefix");
         netconfEventSourceManager.onDataChanged(asyncDataChangeEventMock);
-        verify(eventSourceTopologyMock, times(1)).register(any(Node.class), any(NetconfEventSource.class));
+        verify(eventSourceRegistry, times(0)).registerEventSource(any(EventSource.class));
     }
 
-    private void onDataChangedCreateEventSourceTestHelper(){
+    private void onDataChangedTestHelper(boolean create, boolean update, boolean isNetconf, String notificationCapabilityPrefix) throws InterruptedException, ExecutionException{
         asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
-        Map<InstanceIdentifier, DataObject> map = new HashMap<>();
+        Map<InstanceIdentifier, DataObject> mapCreate = new HashMap<>();
+        Map<InstanceIdentifier, DataObject> mapUpdate = new HashMap<>();
         InstanceIdentifier instanceIdentifierMock = mock(InstanceIdentifier.class);
         Node dataObjectMock = mock(Node.class);
-        map.put(instanceIdentifierMock, dataObjectMock);
-        doReturn(map).when(asyncDataChangeEventMock).getCreatedData();
-        doReturn(map).when(asyncDataChangeEventMock).getUpdatedData();
-
+        if(create){
+            mapCreate.put(instanceIdentifierMock, dataObjectMock);
+        }
+        if(update){
+            mapUpdate.put(instanceIdentifierMock, dataObjectMock);
+        }
+        doReturn(mapCreate).when(asyncDataChangeEventMock).getCreatedData();
+        doReturn(mapUpdate).when(asyncDataChangeEventMock).getUpdatedData();
         NetconfNode netconfNodeMock = mock(NetconfNode.class);
         AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
-        doReturn(netconfNodeMock).when(dataObjectMock).getAugmentation(NetconfNode.class);
-        doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
-        List<String> availableCapabilityList = new ArrayList<>();
-        availableCapabilityList.add("(urn:ietf:params:xml:ns:netconf:notification_availableCapabilityString1");
-        doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
-
-        doReturn(NetconfNodeFields.ConnectionStatus.Connected).when(netconfNodeMock).getConnectionStatus();
+        if(isNetconf){
+            doReturn(netconfNodeMock).when(dataObjectMock).getAugmentation(NetconfNode.class);
+            doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
+            List<String> availableCapabilityList = new ArrayList<>();
+            availableCapabilityList.add(notificationCapabilityPrefix +"_availableCapabilityString1");
+            doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
+            doReturn(NetconfNodeFields.ConnectionStatus.Connected).when(netconfNodeMock).getConnectionStatus();
+        } else {
+            doReturn(null).when(dataObjectMock).getAugmentation(NetconfNode.class);
+        }
 
         Optional optionalMock = mock(Optional.class);
         Optional optionalBindingMountMock = mock(Optional.class);
@@ -144,37 +167,8 @@ public class NetconfEventSourceManagerTest {
         doReturn(onlyOptionalMock).when(mountPointMock).getService(RpcConsumerRegistry.class);
         doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get();
         doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
+        EventSourceRegistrationImpl esrMock = mock(EventSourceRegistrationImpl.class);
+        doReturn(esrMock).when(eventSourceRegistry).registerEventSource(any(EventSource.class));
     }
 
-    @Test
-    public void isEventSourceTest() {
-        Node nodeMock = mock(Node.class);
-        NetconfNode netconfNodeMock = mock(NetconfNode.class);
-        AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
-        doReturn(netconfNodeMock).when(nodeMock).getAugmentation(NetconfNode.class);
-        doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
-        List<String> availableCapabilityList = new ArrayList<>();
-        availableCapabilityList.add("(urn:ietf:params:xml:ns:netconf:notification_availableCapabilityString1");
-        doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
-        assertTrue("Method has not been run correctly.", netconfEventSourceManager.isEventSource(nodeMock));
-    }
-
-    @Test
-    public void isNotEventSourceTest() {
-        Node nodeMock = mock(Node.class);
-        NetconfNode netconfNodeMock = mock(NetconfNode.class);
-        AvailableCapabilities availableCapabilitiesMock = mock(AvailableCapabilities.class);
-        doReturn(netconfNodeMock).when(nodeMock).getAugmentation(NetconfNode.class);
-        doReturn(availableCapabilitiesMock).when(netconfNodeMock).getAvailableCapabilities();
-        List<String> availableCapabilityList = new ArrayList<>();
-        availableCapabilityList.add("availableCapabilityString1");
-        doReturn(availableCapabilityList).when(availableCapabilitiesMock).getAvailableCapability();
-        assertFalse("Method has not been run correctly.", netconfEventSourceManager.isEventSource(nodeMock));
-    }
-
-    @Test
-    public void closeTest() {
-        netconfEventSourceManager.close();
-        verify(listenerRegistrationMock, times(1)).close();
-    }
 }
index 73117c1..58da9e3 100644 (file)
@@ -7,22 +7,36 @@
  */
 package org.opendaylight.controller.messagebus.app.impl;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.md.sal.binding.api.BindingService;
 import org.opendaylight.controller.md.sal.binding.api.MountPoint;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
 import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-import org.opendaylight.controller.md.sal.dom.api.DOMService;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.api.DOMService;
+import org.opendaylight.controller.messagebus.eventsources.netconf.NetconfEventSource;
 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.NotificationPattern;
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.JoinTopicInput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.NotificationsService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
@@ -33,38 +47,22 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.inventory.rev1
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
-import org.opendaylight.yangtools.yang.common.QName;
-
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.times;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
 
 public class NetconfEventSourceTest {
 
     NetconfEventSource netconfEventSource;
     DOMMountPoint domMountPointMock;
     JoinTopicInput joinTopicInputMock;
-
-    @BeforeClass
-    public static void initTestClass() throws IllegalAccessException, InstantiationException {
-    }
+    AsyncDataChangeEvent asyncDataChangeEventMock;
+    Node dataObjectMock;
 
     @Before
     public void setUp() throws Exception {
@@ -81,62 +79,16 @@ public class NetconfEventSourceTest {
         doReturn(onlyOptionalMock).when(mountPointMock).getService(RpcConsumerRegistry.class);
         doReturn(rpcConsumerRegistryMock).when(onlyOptionalMock).get();
         doReturn(notificationsServiceMock).when(rpcConsumerRegistryMock).getRpcService(NotificationsService.class);
-        netconfEventSource = new NetconfEventSource("nodeId1", streamMap, domMountPointMock, domNotificationPublishServiceMock, mountPointMock);
-    }
-
-    @Test
-    public void constructorTest() {
-        assertNotNull("Instance has not been created correctly.", netconfEventSource);
-    }
-
-    @Test
-    public void joinTopicTest() throws Exception{
-        joinTopicTestHelper();
-        assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock));
-    }
-
-    private void joinTopicTestHelper() throws Exception{
-        joinTopicInputMock = mock(JoinTopicInput.class);
-        NotificationPattern notificationPatternMock = mock(NotificationPattern.class);
-        doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern();
-        doReturn("regexString1").when(notificationPatternMock).getValue();
-
-        SchemaContext schemaContextMock = mock(SchemaContext.class);
-        doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
-        Set<NotificationDefinition> notificationDefinitionSet = new HashSet<>();
-        NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class);
-        notificationDefinitionSet.add(notificationDefinitionMock);
-
-        URI uri = new URI("uriStr1");
-        QName qName = new QName(uri, "localName1");
-        org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName);
-        doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications();
-        doReturn(schemaPath).when(notificationDefinitionMock).getPath();
-
-        Optional<DOMNotificationService> domNotificationServiceOptionalMock = (Optional<DOMNotificationService>) mock(Optional.class);
-        doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class);
-        doReturn(true).when(domNotificationServiceOptionalMock).isPresent();
-
-        DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class);
-        doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get();
-        ListenerRegistration listenerRegistrationMock = mock(ListenerRegistration.class);
-        doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(List.class));
-    }
-
-    @Test (expected=NullPointerException.class)
-    public void onNotificationTest() {
-        DOMNotification domNotificationMock = mock(DOMNotification.class);
-        ContainerNode containerNodeMock = mock(ContainerNode.class);
-        SchemaContext schemaContextMock = mock(SchemaContext.class);
-        SchemaPath schemaPathMock = mock(SchemaPath.class);
-        doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
-        doReturn(schemaPathMock).when(domNotificationMock).getType();
-        doReturn(containerNodeMock).when(domNotificationMock).getBody();
-        netconfEventSource.onNotification(domNotificationMock);
+        org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node node
+            = mock(org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node.class);
+        org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId nodeId
+            = new org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId("NodeId1");
+        doReturn(nodeId).when(node).getNodeId();
+        netconfEventSource = new NetconfEventSource(node, streamMap, domMountPointMock, domNotificationPublishServiceMock, mountPointMock);
     }
 
     @Test
-    public void onDataChangedTest() {
+    public void onDataChangedTest(){
         InstanceIdentifier brmIdent = InstanceIdentifier.create(Nodes.class)
                 .child(Node.class, new NodeKey(new NodeId("brm"))).augmentation(NetconfNode.class);
         AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
@@ -145,20 +97,23 @@ public class NetconfEventSourceTest {
         dataChangeMap.put(brmIdent, dataObjectMock);
         doReturn(dataChangeMap).when(asyncDataChangeEventMock).getOriginalData();
         doReturn(dataChangeMap).when(asyncDataChangeEventMock).getUpdatedData();
-
+        doReturn(true).when(dataObjectMock).isConnected();
         netconfEventSource.onDataChanged(asyncDataChangeEventMock);
         verify(dataObjectMock, times(2)).isConnected();
     }
 
     @Test
     public void onDataChangedResubscribeTest() throws Exception{
+
         InstanceIdentifier brmIdent = InstanceIdentifier.create(Nodes.class)
                 .child(Node.class, new NodeKey(new NodeId("brm"))).augmentation(NetconfNode.class);
+
         AsyncDataChangeEvent asyncDataChangeEventMock = mock(AsyncDataChangeEvent.class);
         NetconfNode dataObjectMock = mock(NetconfNode.class);
         Map<InstanceIdentifier, DataObject> dataChangeMap = new HashMap<>();
         dataChangeMap.put(brmIdent, dataObjectMock);
         doReturn(dataChangeMap).when(asyncDataChangeEventMock).getUpdatedData();
+        doReturn(new HashMap<InstanceIdentifier, DataObject>()).when(asyncDataChangeEventMock).getOriginalData();
         doReturn(true).when(dataObjectMock).isConnected();
 
         Set<String> localSet = getActiveStreams();
@@ -176,6 +131,44 @@ public class NetconfEventSourceTest {
         assertEquals("Size of set has not been set correctly.", 1, getActiveStreams().size());
     }
 
+    @Test
+    public void joinTopicTest() throws Exception{
+        joinTopicTestHelper();
+        assertNotNull("JoinTopic return value has not been created correctly.", netconfEventSource.joinTopic(joinTopicInputMock));
+    }
+
+    private void joinTopicTestHelper() throws Exception{
+        joinTopicInputMock = mock(JoinTopicInput.class);
+        TopicId topicId = new TopicId("topicID007");
+        doReturn(topicId).when(joinTopicInputMock).getTopicId();
+        NotificationPattern notificationPatternMock = mock(NotificationPattern.class);
+        doReturn(notificationPatternMock).when(joinTopicInputMock).getNotificationPattern();
+        doReturn("regexString1").when(notificationPatternMock).getValue();
+
+        SchemaContext schemaContextMock = mock(SchemaContext.class);
+        doReturn(schemaContextMock).when(domMountPointMock).getSchemaContext();
+        Set<NotificationDefinition> notificationDefinitionSet = new HashSet<>();
+        NotificationDefinition notificationDefinitionMock = mock(NotificationDefinition.class);
+        notificationDefinitionSet.add(notificationDefinitionMock);
+
+        URI uri = new URI("uriStr1");
+        QName qName = new QName(uri, "localName1");
+        org.opendaylight.yangtools.yang.model.api.SchemaPath schemaPath = SchemaPath.create(true, qName);
+        doReturn(notificationDefinitionSet).when(schemaContextMock).getNotifications();
+        doReturn(schemaPath).when(notificationDefinitionMock).getPath();
+
+        Optional<DOMNotificationService> domNotificationServiceOptionalMock = (Optional<DOMNotificationService>) mock(Optional.class);
+        doReturn(domNotificationServiceOptionalMock).when(domMountPointMock).getService(DOMNotificationService.class);
+        doReturn(true).when(domNotificationServiceOptionalMock).isPresent();
+
+        DOMNotificationService domNotificationServiceMock = mock(DOMNotificationService.class);
+        doReturn(domNotificationServiceMock).when(domNotificationServiceOptionalMock).get();
+        ListenerRegistration listenerRegistrationMock = mock(ListenerRegistration.class);
+        doReturn(listenerRegistrationMock).when(domNotificationServiceMock).registerNotificationListener(any(NetconfEventSource.class), any(List.class));
+    }
+
+//TODO: create Test for NetConfEventSource#onNotification
+
     private Set getActiveStreams() throws Exception{
         Field nesField = NetconfEventSource.class.getDeclaredField("activeStreams");
         nesField.setAccessible(true);
index 4872127..6dacb97 100644 (file)
@@ -7,6 +7,11 @@
  */
 package org.opendaylight.controller.messagebus.app.impl;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -14,12 +19,9 @@ import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.even
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-
 public class TopicDOMNotificationTest {
 
+    private static final String containerNodeBodyMockToString = "containerNodeBodyMock";
     ContainerNode containerNodeBodyMock;
     TopicDOMNotification topicDOMNotification;
 
@@ -30,6 +32,7 @@ public class TopicDOMNotificationTest {
     @Before
     public void setUp() throws Exception {
         containerNodeBodyMock = mock(ContainerNode.class);
+        doReturn(containerNodeBodyMockToString).when(containerNodeBodyMock).toString();
         topicDOMNotification = new TopicDOMNotification(containerNodeBodyMock);
     }
 
@@ -51,7 +54,7 @@ public class TopicDOMNotificationTest {
 
     @Test
     public void getToStringTest() {
-        String bodyString = "TopicDOMNotification [body=" + containerNodeBodyMock + "]";
+        String bodyString = "TopicDOMNotification [body=" + containerNodeBodyMockToString + "]";
         assertEquals("String has not been created correctly.", bodyString, topicDOMNotification.toString());
     }
 }
index 2ff4654..a88c609 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -19,32 +19,12 @@ import org.junit.Test;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-
+/**
+ * @author ppalmar
+ *
+ */
 public class UtilTest {
 
-    @Test
-    public void testMD5Hash() throws Exception {
-        // empty string
-        createAndAssertHash("", "d41d8cd98f00b204e9800998ecf8427e");
-
-        // non-empty string
-        createAndAssertHash("The Guardian", "69b929ae473ed732d5fb8e0a55a8dc8d");
-
-        // the same hash for the same string
-        createAndAssertHash("The Independent", "db793706d70c37dcc16454fa8eb21b1c");
-        createAndAssertHash("The Independent", "db793706d70c37dcc16454fa8eb21b1c"); // one more time
-
-        // different strings must have different hashes
-        createAndAssertHash("orange", "fe01d67a002dfa0f3ac084298142eccd");
-        createAndAssertHash("yellow", "d487dd0b55dfcacdd920ccbdaeafa351");
-    }
-
-    //TODO: IllegalArgumentException would be better
-    @Test(expected = RuntimeException.class)
-    public void testMD5HashInvalidInput() throws Exception {
-        Util.md5String(null);
-    }
-
     @Test
     public void testWildcardToRegex() throws Exception {
         // empty wildcard string
@@ -73,14 +53,14 @@ public class UtilTest {
     public void testResultFor() throws Exception {
         {
             final String expectedResult = "dummy string";
-            RpcResult<String> rpcResult = Util.resultFor(expectedResult).get();
+            RpcResult<String> rpcResult = Util.resultRpcSuccessFor(expectedResult).get();
             assertEquals(expectedResult, rpcResult.getResult());
             assertTrue(rpcResult.isSuccessful());
             assertTrue(rpcResult.getErrors().isEmpty());
         }
         {
             final Integer expectedResult = 42;
-            RpcResult<Integer> rpcResult = Util.resultFor(expectedResult).get();
+            RpcResult<Integer> rpcResult = Util.resultRpcSuccessFor(expectedResult).get();
             assertEquals(expectedResult, rpcResult.getResult());
             assertTrue(rpcResult.isSuccessful());
             assertTrue(rpcResult.getErrors().isEmpty());
@@ -125,10 +105,6 @@ public class UtilTest {
         }
     }
 
-    private static void createAndAssertHash(final String inString, final String expectedHash) {
-        assertEquals("Incorrect hash.", expectedHash, Util.md5String(inString));
-    }
-
     private static void createAndAssertRegex(final String wildcardStr, final String expectedRegex) {
         assertEquals("Incorrect regex string.", expectedRegex, Util.wildcardToRegex(wildcardStr));
     }
diff --git a/opendaylight/md-sal/messagebus-spi/pom.xml b/opendaylight/md-sal/messagebus-spi/pom.xml
new file mode 100644 (file)
index 0000000..f31b37f
--- /dev/null
@@ -0,0 +1,100 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.opendaylight.controller</groupId>
+    <artifactId>sal-parent</artifactId>
+    <version>1.2.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>messagebus-spi</artifactId>
+  <name>${project.artifactId}</name>
+
+  <packaging>bundle</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>messagebus-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>config-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-data-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-model-api</artifactId>
+    </dependency>
+  </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.opendaylight.yangtools</groupId>
+                <artifactId>yang-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate-sources</goal>
+                        </goals>
+                        <configuration>
+                            <codeGenerators>
+                                <generator>
+                                    <codeGeneratorClass>
+                                        org.opendaylight.yangtools.maven.sal.api.gen.plugin.CodeGeneratorImpl
+                                    </codeGeneratorClass>
+                                    <outputBaseDir>
+                                        ${project.build.directory}/generated-sources/sal
+                                    </outputBaseDir>
+                                </generator>
+                                <generator>
+                                    <codeGeneratorClass>
+                                        org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
+                                    </codeGeneratorClass>
+                                    <outputBaseDir>${project.build.directory}/generated-sources/config</outputBaseDir>
+                                    <additionalConfiguration>
+                                        <namespaceToPackage1>
+                                            urn:opendaylight:params:xml:ns:yang:controller==org.opendaylight.controller.config.yang
+                                        </namespaceToPackage1>
+                                    </additionalConfiguration>
+                                </generator>
+                                <generator>
+                                    <codeGeneratorClass>org.opendaylight.yangtools.yang.unified.doc.generator.maven.DocumentationGeneratorImpl</codeGeneratorClass>
+                                    <outputBaseDir>target/site/models</outputBaseDir>
+                                </generator>
+                            </codeGenerators>
+                            <inspectDependencies>true</inspectDependencies>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>${project.build.directory}/generated-sources/config</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+  <scm>
+    <connection>scm:git:ssh://git.opendaylight.org:29418/controller.git</connection>
+    <developerConnection>scm:git:ssh://git.opendaylight.org:29418/controller.git</developerConnection>
+    <tag>HEAD</tag>
+    <url>https://wiki.opendaylight.org/view/OpenDaylight_Controller:MD-SAL</url>
+  </scm>
+</project>
diff --git a/opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSource.java b/opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSource.java
new file mode 100644 (file)
index 0000000..6a6266a
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.spi;
+
+import java.util.List;
+
+import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventsource.rev141202.EventSourceService;
+import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.NodeKey;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * Event source is a node in topology which is able to produces notifications.
+ * To register event source you use {@link EventSourceRegistry#registerEventSource(EventSource)()}.
+ * EventSourceRegistry will request registered event source to publish notifications
+ * whenever EventSourceRegistry has been asked to publish a certain type of notifications.
+ * EventSourceRegistry will call method JoinTopic to request EventSource to publish notification.
+ * Event source must implement method JoinTopic (from superinterface {@link EventSourceService}).
+ */
+
+public interface EventSource extends EventSourceService, AutoCloseable {
+
+    /**
+     * Identifier of node associated with event source
+     *
+     * @return instance of NodeKey
+     */
+    NodeKey getSourceNodeKey();
+
+    /**
+     * List the types of notifications which source can produce.
+     *
+     * @return list of available notification
+     */
+    List<SchemaPath> getAvailableNotifications();
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSourceRegistration.java b/opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSourceRegistration.java
new file mode 100644 (file)
index 0000000..06af7c1
--- /dev/null
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.spi;
+
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+
+/**
+ * Instance of EventSourceRegistration is returned by {@link EventSourceRegistry#registerEventSource(EventSource)}
+ * and it is used to unregister EventSource.
+ *
+ */
+public interface EventSourceRegistration <T extends EventSource> extends ObjectRegistration<T>{
+
+    @Override
+    public void close();
+
+}
diff --git a/opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSourceRegistry.java b/opendaylight/md-sal/messagebus-spi/src/main/java/org/opendaylight/controller/messagebus/spi/EventSourceRegistry.java
new file mode 100644 (file)
index 0000000..10d3b5b
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.messagebus.spi;
+
+/**
+ *EventSourceRegistry is used to register {@link EventSource}.
+ *
+ */
+public interface EventSourceRegistry extends AutoCloseable {
+
+    /**
+     * Registers the given EventSource for public consumption. The EventSource is
+     * associated with the node identified via {@linkEventSource#getSourceNodeKey}.
+     *
+     * @param eventSource the EventSource instance to register
+     * @return an EventSourceRegistration instance that is used to unregister the EventSource via {@link EventSourceRegistrationImpl#close()}.
+     */
+    <T extends EventSource> EventSourceRegistration<T> registerEventSource(T eventSource);
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/messagebus-spi/src/main/yang/messagebus-event-source-registry.yang b/opendaylight/md-sal/messagebus-spi/src/main/yang/messagebus-event-source-registry.yang
new file mode 100644 (file)
index 0000000..4c5a47c
--- /dev/null
@@ -0,0 +1,21 @@
+module messagebus-event-source-registry {
+    yang-version 1;
+    namespace "urn:opendaylight:params:xml:ns:yang:controller:messagebus:spi:eventsourceregistry";
+    prefix "mb-esr";
+
+    import config { prefix config; revision-date 2013-04-05; }
+
+    description
+            "Event source registry service interface definition for MessageBus";
+
+     revision "2015-04-02" {
+             description
+                 "Initial revision";
+         }
+
+     identity event-source-registry {
+             base "config:service-type";
+             config:java-class "org.opendaylight.controller.messagebus.spi.EventSourceRegistry";
+     }
+
+}
index bf30a16..c058765 100644 (file)
@@ -83,6 +83,7 @@
 
     <!-- Message Bus -->
     <module>messagebus-api</module>
+    <module>messagebus-spi</module>
     <module>messagebus-impl</module>
     <module>messagebus-config</module>
   </modules>
       </modules>
     </profile>
   </profiles>
-</project>
\ No newline at end of file
+</project>
index 678ac34..3dc6e40 100644 (file)
@@ -65,7 +65,17 @@ public interface DataObjectModification<T extends DataObject> extends org.openda
     @Nonnull ModificationType getModificationType();
 
     /**
-     * Returns after state of top level container.
+     * Returns before-state of top level container. Implementations are encouraged,
+     * but not required to provide this state.
+     *
+     * @param root Class representing data container
+     * @return State of object before modification. Null if subtree was not present,
+     *         or the implementation cannot provide the state.
+     */
+    @Nullable T getDataBefore();
+
+    /**
+     * Returns after-state of top level container.
      *
      * @param root Class representing data container
      * @return State of object after modification. Null if subtree is not present.
index c1c23d5..b86d31b 100644 (file)
@@ -44,7 +44,7 @@ public final class DataTreeIdentifier<T extends DataObject> implements Immutable
      *
      * @return Instance identifier corresponding to the root node.
      */
-    public @Nonnull InstanceIdentifier<?> getRootIdentifier() {
+    public @Nonnull InstanceIdentifier<T> getRootIdentifier() {
         return rootIdentifier;
     }
 
index a165242..83d48f7 100644 (file)
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
  *
  * @param <T> Type of Binding Data Object
  */
-class LazyDataObjectModification<T extends DataObject> implements DataObjectModification<T> {
+final class LazyDataObjectModification<T extends DataObject> implements DataObjectModification<T> {
 
     private final static Logger LOG = LoggerFactory.getLogger(LazyDataObjectModification.class);
 
@@ -57,7 +57,7 @@ class LazyDataObjectModification<T extends DataObject> implements DataObjectModi
         return new LazyDataObjectModification<>(codec,domData);
     }
 
-    static Collection<DataObjectModification<? extends DataObject>> from(final BindingCodecTreeNode<?> parentCodec,
+    private static Collection<DataObjectModification<? extends DataObject>> from(final BindingCodecTreeNode<?> parentCodec,
             final Collection<DataTreeCandidateNode> domChildNodes) {
         final ArrayList<DataObjectModification<? extends DataObject>> result = new ArrayList<>(domChildNodes.size());
         populateList(result, parentCodec, domChildNodes);
@@ -79,7 +79,7 @@ class LazyDataObjectModification<T extends DataObject> implements DataObjectModi
                             parentCodec.yangPathArgumentChild(domChildNode.getIdentifier());
                     populateList(result,type, childCodec, domChildNode);
                 } catch (final IllegalArgumentException e) {
-                    if(type == BindingStructuralType.UNKNOWN) {
+                    if (type == BindingStructuralType.UNKNOWN) {
                         LOG.debug("Unable to deserialize unknown DOM node {}",domChildNode,e);
                     } else {
                         LOG.debug("Binding representation for DOM node {} was not found",domChildNode,e);
@@ -89,7 +89,6 @@ class LazyDataObjectModification<T extends DataObject> implements DataObjectModi
         }
     }
 
-
     private static void populateList(final List<DataObjectModification<? extends DataObject>> result,
             final BindingStructuralType type, final BindingCodecTreeNode<?> childCodec,
             final DataTreeCandidateNode domChildNode) {
@@ -116,6 +115,11 @@ class LazyDataObjectModification<T extends DataObject> implements DataObjectModi
         }
     }
 
+    @Override
+    public T getDataBefore() {
+        return deserialize(domData.getDataBefore());
+    }
+
     @Override
     public T getDataAfter() {
         return deserialize(domData.getDataAfter());
@@ -149,8 +153,8 @@ class LazyDataObjectModification<T extends DataObject> implements DataObjectModi
 
     @Override
     public Collection<DataObjectModification<? extends DataObject>> getModifiedChildren() {
-        if(childNodesCache == null) {
-            childNodesCache = from(codec,domData.getChildNodes());
+        if (childNodesCache == null) {
+            childNodesCache = from(codec, domData.getChildNodes());
         }
         return childNodesCache;
     }
@@ -191,7 +195,7 @@ class LazyDataObjectModification<T extends DataObject> implements DataObjectModi
     }
 
     private T deserialize(final Optional<NormalizedNode<?, ?>> dataAfter) {
-        if(dataAfter.isPresent()) {
+        if (dataAfter.isPresent()) {
             return codec.deserialize(dataAfter.get());
         }
         return null;
index d94e1c6..81605d8 100644 (file)
@@ -7,40 +7,17 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import scala.concurrent.Future;
 
 abstract class AbstractTransactionContext implements TransactionContext {
 
-    private final List<Future<Object>> recordedOperationFutures = new ArrayList<>();
     private final TransactionIdentifier identifier;
 
     protected AbstractTransactionContext(TransactionIdentifier identifier) {
         this.identifier = identifier;
     }
 
-    @Override
-    public final void copyRecordedOperationFutures(Collection<Future<Object>> target) {
-        target.addAll(recordedOperationFutures);
-    }
-
     protected final TransactionIdentifier getIdentifier() {
         return identifier;
     }
-
-    protected final Collection<Future<Object>> copyRecordedOperationFutures() {
-        return ImmutableList.copyOf(recordedOperationFutures);
-    }
-
-    protected final int recordedOperationCount() {
-        return recordedOperationFutures.size();
-    }
-
-    protected final void recordOperationFuture(Future<Object> future) {
-        recordedOperationFutures.add(future);
-    }
-}
+}
\ No newline at end of file
index cc2905c..81449c5 100644 (file)
@@ -433,8 +433,12 @@ public class Shard extends RaftActor {
         //
         if(isLeader()) {
             try {
-                BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
-                sender().tell(reply, self());
+                boolean ready = commitCoordinator.handleTransactionModifications(batched);
+                if(ready) {
+                    sender().tell(READY_TRANSACTION_REPLY, self());
+                } else {
+                    sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self());
+                }
             } catch (Exception e) {
                 LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                         batched.getTransactionID(), e);
@@ -474,20 +478,21 @@ public class Shard extends RaftActor {
         // node. In that case, the subsequent 3-phase commit messages won't contain the
         // transactionId so to maintain backwards compatibility, we create a separate cohort actor
         // to provide the compatible behavior.
-        if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
-            LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
-            ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
-                    ready.getTransactionID()));
+        if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
+            ActorRef replyActorPath = getSelf();
+            if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+                LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
+                replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+                        ready.getTransactionID()));
+            }
 
             ReadyTransactionReply readyTransactionReply =
-                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
+                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
+                            ready.getTxnClientVersion());
             getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
-                    readyTransactionReply, getSelf());
-
+                readyTransactionReply, getSelf());
         } else {
-
-            getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
-                    READY_TRANSACTION_REPLY, getSelf());
+            getSender().tell(READY_TRANSACTION_REPLY, getSelf());
         }
     }
 
index 54f15fc..b96e38d 100644 (file)
@@ -22,7 +22,6 @@ import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
@@ -119,7 +118,7 @@ public class ShardCommitCoordinator {
      *
      * @throws ExecutionException if an error occurs loading the cache
      */
-    public BatchedModificationsReply handleTransactionModifications(BatchedModifications batched)
+    public boolean handleTransactionModifications(BatchedModifications batched)
             throws ExecutionException {
         CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
         if(cohortEntry == null) {
@@ -137,7 +136,6 @@ public class ShardCommitCoordinator {
 
         cohortEntry.applyModifications(batched.getModifications());
 
-        String cohortPath = null;
         if(batched.isReady()) {
             if(log.isDebugEnabled()) {
                 log.debug("{}: Readying Tx {}, client version {}", name,
@@ -145,10 +143,9 @@ public class ShardCommitCoordinator {
             }
 
             cohortEntry.ready(cohortDecorator);
-            cohortPath = shardActorPath;
         }
 
-        return new BatchedModificationsReply(batched.getModifications().size(), cohortPath);
+        return batched.isReady();
     }
 
     /**
index 87da2b0..1d5b1d8 100644 (file)
@@ -1,6 +1,6 @@
 /*
- *
  *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *  Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  *  This program and the accompanying materials are made available under the
  *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -40,6 +40,8 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 public class ShardWriteTransaction extends ShardTransaction {
 
     private final MutableCompositeModification compositeModification = new MutableCompositeModification();
+    private int totalBatchedModificationsReceived;
+    private Exception lastBatchedModificationsException;
     private final DOMStoreWriteTransaction transaction;
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
@@ -86,9 +88,29 @@ public class ShardWriteTransaction extends ShardTransaction {
                 modification.apply(transaction);
             }
 
-            getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
+            totalBatchedModificationsReceived++;
+            if(batched.isReady()) {
+                if(lastBatchedModificationsException != null) {
+                    throw lastBatchedModificationsException;
+                }
+
+                if(totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
+                    throw new IllegalStateException(String.format(
+                            "The total number of batched messages received %d does not match the number sent %d",
+                            totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
+                }
+
+                readyTransaction(transaction, false);
+            } else {
+                getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
+            }
         } catch (Exception e) {
+            lastBatchedModificationsException = e;
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+
+            if(batched.isReady()) {
+                getSelf().tell(PoisonPill.getInstance(), getSelf());
+            }
         }
     }
 
index a5a7494..bc6e5f2 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.Collection;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import scala.concurrent.Future;
@@ -33,6 +32,4 @@ interface TransactionContext {
     void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture);
 
     void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture);
-
-    void copyRecordedOperationFutures(Collection<Future<Object>> target);
 }
index c61682d..c722918 100644 (file)
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -11,18 +12,14 @@ import akka.actor.ActorSelection;
 import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.List;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
@@ -49,6 +46,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     private final OperationCompleter operationCompleter;
     private BatchedModifications batchedModifications;
+    private int totalBatchedModificationsSent;
 
     protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
             String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
@@ -93,90 +91,56 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     @Override
     public Future<ActorSelection> readyTransaction() {
-        LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
-            getIdentifier(), recordedOperationCount());
+        LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
-        // Send the remaining batched modifications if any.
+        // Send the remaining batched modifications, if any, with the ready flag set.
 
-        sendAndRecordBatchedModifications();
+        Future<Object> lastModificationsFuture = sendBatchedModifications(true);
 
-        // Send the ReadyTransaction message to the Tx actor.
-
-        Future<Object> readyReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
-
-        return combineRecordedOperationsFutures(readyReplyFuture);
+        return transformReadyReply(lastModificationsFuture);
     }
 
-    protected Future<ActorSelection> combineRecordedOperationsFutures(final Future<Object> withLastReplyFuture) {
-        // Combine all the previously recorded put/merge/delete operation reply Futures and the
-        // ReadyTransactionReply Future into one Future. If any one fails then the combined
-        // Future will fail. We need all prior operations and the ready operation to succeed
-        // in order to attempt commit.
-
-        List<Future<Object>> futureList = Lists.newArrayListWithCapacity(recordedOperationCount() + 1);
-        copyRecordedOperationFutures(futureList);
-        futureList.add(withLastReplyFuture);
-
-        Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
-                actorContext.getClientDispatcher());
-
-        // Transform the combined Future into a Future that returns the cohort actor path from
-        // the ReadyTransactionReply. That's the end result of the ready operation.
+    protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
+        // Transform the last reply Future into a Future that returns the cohort actor path from
+        // the last reply message. That's the end result of the ready operation.
 
-        return combinedFutures.transform(new Mapper<Iterable<Object>, ActorSelection>() {
+        return readyReplyFuture.transform(new Mapper<Object, ActorSelection>() {
             @Override
-            public ActorSelection checkedApply(Iterable<Object> notUsed) {
-                LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
-                    getIdentifier());
-
-                // At this point all the Futures succeeded and we need to extract the cohort
-                // actor path from the ReadyTransactionReply. For the recorded operations, they
-                // don't return any data so we're only interested that they completed
-                // successfully. We could be paranoid and verify the correct reply types but
-                // that really should never happen so it's not worth the overhead of
-                // de-serializing each reply.
-
-                // Note the Future get call here won't block as it's complete.
-                Object serializedReadyReply = withLastReplyFuture.value().get().get();
-                if (serializedReadyReply instanceof ReadyTransactionReply) {
-                    return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
-                } else if(serializedReadyReply instanceof BatchedModificationsReply) {
-                    return actorContext.actorSelection(((BatchedModificationsReply)serializedReadyReply).getCohortPath());
-                } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
-                    ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
-                    String cohortPath = deserializeCohortPath(reply.getCohortPath());
-                    return actorContext.actorSelection(cohortPath);
-                } else {
-                    // Throwing an exception here will fail the Future.
-                    throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
-                        getIdentifier(), serializedReadyReply.getClass()));
+            public ActorSelection checkedApply(Object serializedReadyReply) {
+                LOG.debug("Tx {} readyTransaction", getIdentifier());
+
+                // At this point the ready operation succeeded and we need to extract the cohort
+                // actor path from the reply.
+                if(ReadyTransactionReply.isSerializedType(serializedReadyReply)) {
+                    ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
+                    return actorContext.actorSelection(extractCohortPathFrom(readyTxReply));
                 }
+
+                // Throwing an exception here will fail the Future.
+                throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
+                        getIdentifier(), serializedReadyReply.getClass()));
             }
         }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
-    protected String deserializeCohortPath(String cohortPath) {
-        return cohortPath;
+    protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) {
+        return readyTxReply.getCohortPath();
+    }
+
+    private BatchedModifications newBatchedModifications() {
+        return new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion, transactionChainId);
     }
 
     private void batchModification(Modification modification) {
         if(batchedModifications == null) {
-            batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
-                    transactionChainId);
+            batchedModifications = newBatchedModifications();
         }
 
         batchedModifications.addModification(modification);
 
         if(batchedModifications.getModifications().size() >=
                 actorContext.getDatastoreContext().getShardBatchedModificationCount()) {
-            sendAndRecordBatchedModifications();
-        }
-    }
-
-    private void sendAndRecordBatchedModifications() {
-        Future<Object> sentFuture = sendBatchedModifications();
-        if(sentFuture != null) {
-            recordOperationFuture(sentFuture);
+            sendBatchedModifications();
         }
     }
 
@@ -186,17 +150,25 @@ public class TransactionContextImpl extends AbstractTransactionContext {
 
     protected Future<Object> sendBatchedModifications(boolean ready) {
         Future<Object> sent = null;
-        if(batchedModifications != null) {
+        if(ready || (batchedModifications != null && !batchedModifications.getModifications().isEmpty())) {
+            if(batchedModifications == null) {
+                batchedModifications = newBatchedModifications();
+            }
+
             if(LOG.isDebugEnabled()) {
                 LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
                         batchedModifications.getModifications().size(), ready);
             }
 
             batchedModifications.setReady(ready);
+            batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
             sent = executeOperationAsync(batchedModifications);
 
-            batchedModifications = new BatchedModifications(getIdentifier().toString(), remoteTransactionVersion,
-                    transactionChainId);
+            if(ready) {
+                batchedModifications = null;
+            } else {
+                batchedModifications = newBatchedModifications();
+            }
         }
 
         return sent;
@@ -232,7 +204,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
 
-        sendAndRecordBatchedModifications();
+        sendBatchedModifications();
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
@@ -274,7 +246,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
 
-        sendAndRecordBatchedModifications();
+        sendBatchedModifications();
 
         OnComplete<Object> onComplete = new OnComplete<Object>() {
             @Override
index 59c9298..388dd9f 100644 (file)
@@ -16,12 +16,16 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,6 +38,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
@@ -153,19 +158,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return new TransactionIdentifier(memberName, counter.getAndIncrement());
     }
 
-    @VisibleForTesting
-    List<Future<Object>> getRecordedOperationFutures() {
-        List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
-        for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
-            TransactionContext transactionContext = txFutureCallback.getTransactionContext();
-            if (transactionContext != null) {
-                transactionContext.copyRecordedOperationFutures(recordedOperationFutures);
-            }
-        }
-
-        return recordedOperationFutures;
-    }
-
     @VisibleForTesting
     boolean hasTransactionContext() {
         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
@@ -178,6 +170,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return false;
     }
 
+    private boolean isRootPath(YangInstanceIdentifier path){
+        return !path.getPathArguments().iterator().hasNext();
+    }
+
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
 
@@ -186,21 +182,62 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
         LOG.debug("Tx {} read {}", getIdentifier(), path);
 
-        throttleOperation();
-
         final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
 
-        TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
-            @Override
-            public void invoke(TransactionContext transactionContext) {
-                transactionContext.readData(path, proxyFuture);
-            }
-        });
+        if(isRootPath(path)){
+            readAllData(path, proxyFuture);
+        } else {
+            throttleOperation();
+
+            TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                @Override
+                public void invoke(TransactionContext transactionContext) {
+                    transactionContext.readData(path, proxyFuture);
+                }
+            });
+
+        }
 
         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
     }
 
+    private void readAllData(final YangInstanceIdentifier path,
+                             final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
+        Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
+        List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
+
+        for(String shardName : allShardNames){
+            final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
+
+            throttleOperation();
+
+            TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
+            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                @Override
+                public void invoke(TransactionContext transactionContext) {
+                    transactionContext.readData(path, subProxyFuture);
+                }
+            });
+
+            futures.add(subProxyFuture);
+        }
+
+        final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
+
+        future.addListener(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
+                            future.get(), actorContext.getSchemaContext()));
+                } catch (InterruptedException | ExecutionException e) {
+                    proxyFuture.setException(e);
+                }
+            }
+        }, actorContext.getActorSystem().dispatcher());
+    }
+
     @Override
     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
 
@@ -409,6 +446,10 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
         String shardName = shardNameFromIdentifier(path);
+        return getOrCreateTxFutureCallback(shardName);
+    }
+
+    private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
         if(txFutureCallback == null) {
             Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
@@ -685,10 +726,6 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
                 return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
                         transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
                         operationCompleter);
-            } else if (transactionType == TransactionType.WRITE_ONLY &&
-                    actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
-                return new WriteOnlyTransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
-                    actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
             } else {
                 return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
                         actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/WriteOnlyTransactionContextImpl.java
deleted file mode 100644 (file)
index e131354..0000000
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSelection;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-
-/**
- * Context for a write-only transaction.
- *
- * @author Thomas Pantelis
- */
-public class WriteOnlyTransactionContextImpl extends TransactionContextImpl {
-    private static final Logger LOG = LoggerFactory.getLogger(WriteOnlyTransactionContextImpl.class);
-
-    public WriteOnlyTransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
-            String transactionChainId, ActorContext actorContext, SchemaContext schemaContext, boolean isTxActorLocal,
-            short remoteTransactionVersion, OperationCompleter operationCompleter) {
-        super(actor, identifier, transactionChainId, actorContext, schemaContext, isTxActorLocal,
-                remoteTransactionVersion, operationCompleter);
-    }
-
-    @Override
-    public Future<ActorSelection> readyTransaction() {
-        LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
-            getIdentifier(), recordedOperationCount());
-
-        // Send the remaining batched modifications if any.
-
-        Future<Object> lastModificationsFuture = sendBatchedModifications(true);
-
-        return combineRecordedOperationsFutures(lastModificationsFuture);
-    }
-}
index c345033..d17497c 100644 (file)
@@ -15,6 +15,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIden
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -45,36 +46,32 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
 
     @Override
     public void deleteData(YangInstanceIdentifier path) {
-        recordOperationFuture(executeOperationAsync(
-                new DeleteData(path, getRemoteTransactionVersion())));
+        executeOperationAsync(new DeleteData(path, getRemoteTransactionVersion()));
     }
 
     @Override
     public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        recordOperationFuture(executeOperationAsync(
-                new MergeData(path, data, getRemoteTransactionVersion())));
+        executeOperationAsync(new MergeData(path, data, getRemoteTransactionVersion()));
     }
 
     @Override
     public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        recordOperationFuture(executeOperationAsync(
-                new WriteData(path, data, getRemoteTransactionVersion())));
+        executeOperationAsync(new WriteData(path, data, getRemoteTransactionVersion()));
     }
 
     @Override
     public Future<ActorSelection> readyTransaction() {
-        LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
-            getIdentifier(), recordedOperationCount());
+        LOG.debug("Tx {} readyTransaction called", getIdentifier());
 
         // Send the ReadyTransaction message to the Tx actor.
 
         Future<Object> lastReplyFuture = executeOperationAsync(ReadyTransaction.INSTANCE);
 
-        return combineRecordedOperationsFutures(lastReplyFuture);
+        return transformReadyReply(lastReplyFuture);
     }
 
     @Override
-    protected String deserializeCohortPath(String cohortPath) {
+    protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) {
         // In base Helium we used to return the local path of the actor which represented
         // a remote ThreePhaseCommitCohort. The local path would then be converted to
         // a remote path using this resolvePath method. To maintain compatibility with
@@ -83,9 +80,9 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
         // we could remove this code to resolvePath and just use the cohortPath as the
         // resolved cohortPath
         if(getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) {
-            return getActorContext().resolvePath(transactionPath, cohortPath);
+            return getActorContext().resolvePath(transactionPath, readyTxReply.getCohortPath());
         }
 
-        return cohortPath;
+        return readyTxReply.getCohortPath();
     }
 }
index a9ce94b..86f96f5 100644 (file)
@@ -22,6 +22,7 @@ public class BatchedModifications extends MutableCompositeModification implement
     private static final long serialVersionUID = 1L;
 
     private boolean ready;
+    private int totalMessagesSent;
     private String transactionID;
     private String transactionChainID;
 
@@ -42,6 +43,14 @@ public class BatchedModifications extends MutableCompositeModification implement
         this.ready = ready;
     }
 
+    public int getTotalMessagesSent() {
+        return totalMessagesSent;
+    }
+
+    public void setTotalMessagesSent(int totalMessagesSent) {
+        this.totalMessagesSent = totalMessagesSent;
+    }
+
     public String getTransactionID() {
         return transactionID;
     }
@@ -56,6 +65,7 @@ public class BatchedModifications extends MutableCompositeModification implement
         transactionID = in.readUTF();
         transactionChainID = in.readUTF();
         ready = in.readBoolean();
+        totalMessagesSent = in.readInt();
     }
 
     @Override
@@ -64,6 +74,7 @@ public class BatchedModifications extends MutableCompositeModification implement
         out.writeUTF(transactionID);
         out.writeUTF(transactionChainID);
         out.writeBoolean(ready);
+        out.writeInt(totalMessagesSent);
     }
 
     @Override
@@ -74,8 +85,10 @@ public class BatchedModifications extends MutableCompositeModification implement
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("BatchedModifications [transactionID=").append(transactionID).append(", ready=").append(ready)
-                .append(", modifications size=").append(getModifications().size()).append("]");
+        builder.append("BatchedModifications [transactionID=").append(transactionID).append(", transactionChainID=")
+                .append(transactionChainID).append(", ready=").append(ready).append(", totalMessagesSent=")
+                .append(totalMessagesSent).append(", modifications size=").append(getModifications().size())
+                .append("]");
         return builder.toString();
     }
 }
index a10c6ac..895de3a 100644 (file)
@@ -19,11 +19,7 @@ import java.io.ObjectOutput;
 public class BatchedModificationsReply extends VersionedExternalizableMessage {
     private static final long serialVersionUID = 1L;
 
-    private static final byte COHORT_PATH_NOT_PRESENT = 0;
-    private static final byte COHORT_PATH_PRESENT = 1;
-
     private int numBatched;
-    private String cohortPath;
 
     public BatchedModificationsReply() {
     }
@@ -32,40 +28,20 @@ public class BatchedModificationsReply extends VersionedExternalizableMessage {
         this.numBatched = numBatched;
     }
 
-    public BatchedModificationsReply(int numBatched, String cohortPath) {
-        this.numBatched = numBatched;
-        this.cohortPath = cohortPath;
-    }
-
     public int getNumBatched() {
         return numBatched;
     }
 
-    public String getCohortPath() {
-        return cohortPath;
-    }
-
     @Override
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         super.readExternal(in);
         numBatched = in.readInt();
-
-        if(in.readByte() == COHORT_PATH_PRESENT) {
-            cohortPath = in.readUTF();
-        }
     }
 
     @Override
     public void writeExternal(ObjectOutput out) throws IOException {
         super.writeExternal(out);
         out.writeInt(numBatched);
-
-        if(cohortPath != null) {
-            out.writeByte(COHORT_PATH_PRESENT);
-            out.writeUTF(cohortPath);
-        } else {
-            out.writeByte(COHORT_PATH_NOT_PRESENT);
-        }
     }
 
     @Override
@@ -76,8 +52,7 @@ public class BatchedModificationsReply extends VersionedExternalizableMessage {
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder();
-        builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append(", cohortPath=")
-                .append(cohortPath).append("]");
+        builder.append("BatchedModificationsReply [numBatched=").append(numBatched).append("]");
         return builder.toString();
     }
 }
index 38886c9..0f87243 100644 (file)
@@ -20,9 +20,9 @@ public class ForwardedReadyTransaction {
     private final DOMStoreThreePhaseCommitCohort cohort;
     private final Modification modification;
     private final boolean returnSerialized;
-    private final int txnClientVersion;
+    private final short txnClientVersion;
 
-    public ForwardedReadyTransaction(String transactionID, int txnClientVersion,
+    public ForwardedReadyTransaction(String transactionID, short txnClientVersion,
             DOMStoreThreePhaseCommitCohort cohort, Modification modification,
             boolean returnSerialized) {
         this.transactionID = transactionID;
@@ -48,7 +48,7 @@ public class ForwardedReadyTransaction {
         return returnSerialized;
     }
 
-    public int getTxnClientVersion() {
+    public short getTxnClientVersion() {
         return txnClientVersion;
     }
 }
index 09617ab..8d617d0 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.messages;
 
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
+@Deprecated
 public class ReadyTransaction implements SerializableMessage{
     public static final Class<ShardTransactionMessages.ReadyTransaction> SERIALIZABLE_CLASS =
             ShardTransactionMessages.ReadyTransaction.class;
index 282e23e..b25a5dd 100644 (file)
@@ -8,15 +8,29 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 
-public class ReadyTransactionReply implements SerializableMessage {
+public class ReadyTransactionReply extends VersionedExternalizableMessage {
+    private static final long serialVersionUID = 1L;
+
     public static final Class<ShardTransactionMessages.ReadyTransactionReply> SERIALIZABLE_CLASS =
             ShardTransactionMessages.ReadyTransactionReply.class;
 
-    private final String cohortPath;
+    private String cohortPath;
+
+    public ReadyTransactionReply() {
+    }
 
     public ReadyTransactionReply(String cohortPath) {
+        this(cohortPath, DataStoreVersions.CURRENT_VERSION);
+    }
+
+    public ReadyTransactionReply(String cohortPath, short version) {
+        super(version);
         this.cohortPath = cohortPath;
     }
 
@@ -25,16 +39,38 @@ public class ReadyTransactionReply implements SerializableMessage {
     }
 
     @Override
-    public ShardTransactionMessages.ReadyTransactionReply toSerializable() {
-        return ShardTransactionMessages.ReadyTransactionReply.newBuilder()
-                .setActorPath(cohortPath)
-                .build();
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+        cohortPath = in.readUTF();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        out.writeUTF(cohortPath);
+    }
+
+    @Override
+    public Object toSerializable() {
+        if(getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
+            return this;
+        } else {
+            return ShardTransactionMessages.ReadyTransactionReply.newBuilder().setActorPath(cohortPath).build();
+        }
     }
 
     public static ReadyTransactionReply fromSerializable(Object serializable) {
-        ShardTransactionMessages.ReadyTransactionReply o =
-                (ShardTransactionMessages.ReadyTransactionReply) serializable;
+        if(serializable instanceof ReadyTransactionReply) {
+            return (ReadyTransactionReply)serializable;
+        } else {
+            ShardTransactionMessages.ReadyTransactionReply o =
+                    (ShardTransactionMessages.ReadyTransactionReply) serializable;
+            return new ReadyTransactionReply(o.getActorPath(), DataStoreVersions.HELIUM_2_VERSION);
+        }
+    }
 
-        return new ReadyTransactionReply(o.getActorPath());
+    public static boolean isSerializedType(Object message) {
+        return message instanceof ReadyTransactionReply ||
+               message instanceof ShardTransactionMessages.ReadyTransactionReply;
     }
 }
index f53368d..17d9880 100644 (file)
@@ -540,6 +540,10 @@ public class ActorContext {
         return this.dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Notification);
     }
 
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+
     protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout){
         return ask(actorRef, message, timeout);
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregator.java
new file mode 100644 (file)
index 0000000..eb13078
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class NormalizedNodeAggregator {
+
+    private static final ExecutorService executorService = MoreExecutors.newDirectExecutorService();
+
+    private final YangInstanceIdentifier rootIdentifier;
+    private final List<Optional<NormalizedNode<?, ?>>> nodes;
+    private final InMemoryDOMDataStore dataStore;
+
+    NormalizedNodeAggregator(YangInstanceIdentifier rootIdentifier, List<Optional<NormalizedNode<?, ?>>> nodes,
+                             SchemaContext schemaContext){
+
+        this.rootIdentifier = rootIdentifier;
+        this.nodes = nodes;
+        this.dataStore = new InMemoryDOMDataStore("aggregator", executorService);
+        this.dataStore.onGlobalContextUpdated(schemaContext);
+    }
+
+    /**
+     * Combine data from all the nodes in the list into a tree with root as rootIdentifier
+     *
+     * @param nodes
+     * @param schemaContext
+     * @return
+     * @throws ExecutionException
+     * @throws InterruptedException
+     */
+    public static Optional<NormalizedNode<?,?>> aggregate(YangInstanceIdentifier rootIdentifier,
+                                                          List<Optional<NormalizedNode<?, ?>>> nodes,
+                                                          SchemaContext schemaContext)
+            throws ExecutionException, InterruptedException {
+        return new NormalizedNodeAggregator(rootIdentifier, nodes, schemaContext).aggregate();
+    }
+
+    private Optional<NormalizedNode<?,?>> aggregate() throws ExecutionException, InterruptedException {
+        return combine().getRootNode();
+    }
+
+    private NormalizedNodeAggregator combine() throws InterruptedException, ExecutionException {
+        DOMStoreWriteTransaction domStoreWriteTransaction = dataStore.newWriteOnlyTransaction();
+
+        for(Optional<NormalizedNode<?,?>> node : nodes) {
+            if(node.isPresent()) {
+                domStoreWriteTransaction.merge(rootIdentifier, node.get());
+            }
+        }
+        DOMStoreThreePhaseCommitCohort ready = domStoreWriteTransaction.ready();
+        ready.canCommit().get();
+        ready.preCommit().get();
+        ready.commit().get();
+
+        return this;
+    }
+
+    private Optional<NormalizedNode<?, ?>> getRootNode() throws InterruptedException, ExecutionException {
+        DOMStoreReadTransaction readTransaction = dataStore.newReadOnlyTransaction();
+
+        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
+                readTransaction.read(rootIdentifier);
+
+        return read.get();
+    }
+
+
+}
index c6c5486..6a1e12a 100644 (file)
@@ -50,7 +50,6 @@ import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
@@ -204,10 +203,6 @@ public abstract class AbstractTransactionProxyTest {
         return argThat(matcher);
     }
 
-    protected Future<Object> readySerializedTxReply(String path) {
-        return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
-    }
-
     protected Future<Object> readyTxReply(String path) {
         return Futures.successful((Object)new ReadyTransactionReply(path));
     }
@@ -250,10 +245,8 @@ public abstract class AbstractTransactionProxyTest {
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     }
 
-    protected void expectBatchedModificationsReady(ActorRef actorRef, int count) {
-        Future<BatchedModificationsReply> replyFuture = Futures.successful(
-                new BatchedModificationsReply(count, actorRef.path().toString()));
-        doReturn(replyFuture).when(mockActorContext).executeOperationAsync(
+    protected void expectBatchedModificationsReady(ActorRef actorRef) {
+        doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
     }
 
@@ -267,11 +260,6 @@ public abstract class AbstractTransactionProxyTest {
                 any(ActorSelection.class), isA(BatchedModifications.class));
     }
 
-    protected void expectReadyTransaction(ActorRef actorRef) {
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
-    }
-
     protected void expectFailedBatchedModifications(ActorRef actorRef) {
         doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(BatchedModifications.class));
index 103d18b..e3b82df 100644 (file)
@@ -437,42 +437,42 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            final String transactionID1 = "tx1";
-            final String transactionID2 = "tx2";
-            final String transactionID3 = "tx3";
+         // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
 
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort1 = new AtomicReference<>();
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort2 = new AtomicReference<>();
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort3 = new AtomicReference<>();
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual) {
-                    if(transactionID.equals(transactionID1)) {
-                        mockCohort1.set(createDelegatingMockCohort("cohort1", actual));
-                        return mockCohort1.get();
-                    } else if(transactionID.equals(transactionID2)) {
-                        mockCohort2.set(createDelegatingMockCohort("cohort2", actual));
-                        return mockCohort2.get();
-                    } else {
-                        mockCohort3.set(createDelegatingMockCohort("cohort3", actual));
-                        return mockCohort3.get();
-                    }
-                }
-            };
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
 
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                    modification2);
+
+            String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+                    modification3);
 
             long timeoutSec = 5;
             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
             final Timeout timeout = new Timeout(duration);
 
-            // Send a BatchedModifications message for the first transaction.
+            // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            BatchedModificationsReply batchedReply = expectMsgClass(duration, BatchedModificationsReply.class);
-            assertEquals("getCohortPath", shard.path().toString(), batchedReply.getCohortPath());
-            assertEquals("getNumBatched", 1, batchedReply.getNumBatched());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
+                    expectMsgClass(duration, ReadyTransactionReply.class));
+            assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -481,16 +481,15 @@ public class ShardTest extends AbstractShardTest {
                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Can commit", true, canCommitReply.getCanCommit());
 
-            // Send BatchedModifications for the next 2 Tx's.
+            // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
-            shard.tell(newBatchedModifications(transactionID2, TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID3, YangInstanceIdentifier.builder(
-                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
             // processed after the first Tx completes.
@@ -583,16 +582,16 @@ public class ShardTest extends AbstractShardTest {
 
             assertEquals("Commits complete", true, done);
 
-            InOrder inOrder = inOrder(mockCohort1.get(), mockCohort2.get(), mockCohort3.get());
-            inOrder.verify(mockCohort1.get()).canCommit();
-            inOrder.verify(mockCohort1.get()).preCommit();
-            inOrder.verify(mockCohort1.get()).commit();
-            inOrder.verify(mockCohort2.get()).canCommit();
-            inOrder.verify(mockCohort2.get()).preCommit();
-            inOrder.verify(mockCohort2.get()).commit();
-            inOrder.verify(mockCohort3.get()).canCommit();
-            inOrder.verify(mockCohort3.get()).preCommit();
-            inOrder.verify(mockCohort3.get()).commit();
+            InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
+            inOrder.verify(cohort1).canCommit();
+            inOrder.verify(cohort1).preCommit();
+            inOrder.verify(cohort1).commit();
+            inOrder.verify(cohort2).canCommit();
+            inOrder.verify(cohort2).preCommit();
+            inOrder.verify(cohort2).commit();
+            inOrder.verify(cohort3).canCommit();
+            inOrder.verify(cohort3).preCommit();
+            inOrder.verify(cohort3).commit();
 
             // Verify data in the data store.
 
@@ -670,7 +669,7 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(newBatchedModifications(transactionID, YangInstanceIdentifier.builder(
                     TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -729,7 +728,7 @@ public class ShardTest extends AbstractShardTest {
             YangInstanceIdentifier path = TestModel.TEST_PATH;
             shard.tell(newBatchedModifications(transactionID1, transactionChainID, path,
                     containerNode, true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Create a read Tx on the same chain.
 
@@ -811,14 +810,24 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
+            // Setup a simulated transactions with a mock cohort.
+
             String transactionID = "tx";
+            MutableCompositeModification modification = new MutableCompositeModification();
+            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+                    TestModel.TEST_PATH, containerNode, modification);
+
             FiniteDuration duration = duration("5 seconds");
 
-            // Send a BatchedModifications to start a transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH, containerNode, true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -832,6 +841,11 @@ public class ShardTest extends AbstractShardTest {
             shard.tell(new CommitTransaction(transactionID).toSerializable(), getRef());
             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
 
+            InOrder inOrder = inOrder(cohort);
+            inOrder.verify(cohort).canCommit();
+            inOrder.verify(cohort).preCommit();
+            inOrder.verify(cohort).commit();
+
             NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.TEST_PATH);
             assertEquals(TestModel.TEST_QNAME.getLocalName(), containerNode, actualNode);
 
@@ -865,7 +879,7 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                         cohort, modification, true), getRef());
-                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+                expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
 
@@ -920,7 +934,7 @@ public class ShardTest extends AbstractShardTest {
 
                 shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
                         cohort, modification, true), getRef());
-                expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
+                expectMsgClass(duration, ReadyTransactionReply.class);
 
                 // Send the CanCommitTransaction message.
 
@@ -959,40 +973,34 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            // Setup 2 mock cohorts. The first one fails in the commit phase.
+         // Setup 2 simulated transactions with mock cohorts. The first one fails in the
+            // commit phase.
 
-            final String transactionID1 = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
 
-            final String transactionID2 = "tx2";
-            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
             FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            // Send BatchedModifications to start and ready each transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
 
@@ -1045,27 +1053,19 @@ public class ShardTest extends AbstractShardTest {
             waitUntilLeader(shard);
 
             String transactionID = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).preCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return cohort;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
             FiniteDuration duration = duration("5 seconds");
 
-            // Send BatchedModifications to start and ready a transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -1100,24 +1100,16 @@ public class ShardTest extends AbstractShardTest {
             final FiniteDuration duration = duration("5 seconds");
 
             String transactionID = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return cohort;
-                }
-            };
-
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
-            // Send BatchedModifications to start and ready a transaction.
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message.
 
@@ -1161,9 +1153,14 @@ public class ShardTest extends AbstractShardTest {
                 }
             };
 
-            shard.tell(newBatchedModifications(transactionID, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            MutableCompositeModification modification = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
+                    modification, preCommit);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
@@ -1197,26 +1194,42 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
             writeToStore(shard, TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
 
-            // Create and ready the 1st Tx - will timeout
+            // Create 1st Tx - will timeout
 
             String transactionID1 = "tx1";
-            shard.tell(newBatchedModifications(transactionID1, YangInstanceIdentifier.builder(
-                    TestModel.OUTER_LIST_PATH).nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
-                ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                        .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
+                    modification1);
 
-            // Create and ready the 2nd Tx
+            // Create 2nd Tx
 
-            String transactionID2 = "tx2";
+            String transactionID2 = "tx3";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
-                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
-            shard.tell(newBatchedModifications(transactionID2, listNodePath,
-                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+                .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
+                    listNodePath,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
+                    modification2);
+
+            // Ready the Tx's
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
 
@@ -1253,23 +1266,38 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
+            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+
             String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
             String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.OUTER_LIST_PATH,
+                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                    modification2);
+
             String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
 
-            // Send a BatchedModifications to start transactions and ready them.
+            // Ready the Tx's
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID2,TestModel.OUTER_LIST_PATH,
-                    ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID3, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // canCommit 1st Tx.
 
@@ -1314,37 +1342,30 @@ public class ShardTest extends AbstractShardTest {
 
             // Setup 2 simulated transactions with mock cohorts. The first one will be aborted.
 
-            final String transactionID1 = "tx1";
-            final DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
 
-            final String transactionID2 = "tx2";
-            final DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             FiniteDuration duration = duration("5 seconds");
             final Timeout timeout = new Timeout(duration);
 
-            ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
-                @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String transactionID,
-                        DOMStoreThreePhaseCommitCohort actual) {
-                    return transactionID1.equals(transactionID) ? cohort1 : cohort2;
-                }
-            };
+            // Simulate the ForwardedReadyTransaction messages that would be sent
+            // by the ShardTransaction.
 
-            shard.underlyingActor().getCommitCoordinator().setCohortDecorator(cohortDecorator);
-
-            // Send BatchedModifications to start and ready each transaction.
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
-            shard.tell(newBatchedModifications(transactionID1, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
-
-            shard.tell(newBatchedModifications(transactionID2, TestModel.TEST_PATH,
-                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), true), getRef());
-            expectMsgClass(duration, BatchedModificationsReply.class);
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
 
             // Send the CanCommitTransaction message for the first Tx.
 
index 41ea7aa..c9335f3 100644 (file)
@@ -4,8 +4,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doThrow;
 import akka.actor.ActorRef;
 import akka.actor.Props;
+import akka.actor.Status.Failure;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
@@ -52,6 +54,7 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
@@ -409,34 +412,125 @@ public class ShardTransactionTest extends AbstractActorTest {
     }
 
     @Test
-    public void testOnReceiveReadyTransaction() throws Exception {
+    public void testOnReceiveBatchedModificationsReady() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+                    "testOnReceiveBatchedModificationsReady");
+
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
+
+            YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+            NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                    new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                    withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+
+            BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.addModification(new WriteModification(writePath, writeData));
+
+            transaction.tell(batched, getRef());
+            BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
+            assertEquals("getNumBatched", 1, reply.getNumBatched());
+
+            batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.setTotalMessagesSent(2);
+
+            transaction.tell(batched, getRef());
+            expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+        }};
+    }
+
+    @Test(expected=TestException.class)
+    public void testOnReceiveBatchedModificationsFailure() throws Throwable {
+        new JavaTestKit(getSystem()) {{
+
+            DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
+            final ActorRef transaction = newTransactionActor(mockWriteTx,
+                    "testOnReceiveBatchedModificationsFailure");
+
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
+
+            YangInstanceIdentifier path = TestModel.TEST_PATH;
+            ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+            doThrow(new TestException()).when(mockWriteTx).write(path, node);
+
+            BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.addModification(new WriteModification(path, node));
+
+            transaction.tell(batched, getRef());
+            expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+
+            batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.setTotalMessagesSent(2);
+
+            transaction.tell(batched, getRef());
+            Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+
+            if(failure != null) {
+                throw failure.cause();
+            }
+        }};
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
+        new JavaTestKit(getSystem()) {{
+
+            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+                    "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
+
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
+
+            BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
+            batched.setReady(true);
+            batched.setTotalMessagesSent(2);
+
+            transaction.tell(batched, getRef());
+
+            Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
+
+            if(failure != null) {
+                throw failure.cause();
+            }
+        }};
+    }
+
+    @Test
+    public void testOnReceivePreLithiumReadyTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
-                    "testReadyTransaction");
+                    "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
 
-            watch(transaction);
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
 
             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
 
-            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
-                    Terminated.class);
-            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
-                    Terminated.class);
+            expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
         }};
 
         // test
         new JavaTestKit(getSystem()) {{
             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
-                    "testReadyTransaction2");
+                    "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
 
-            watch(transaction);
+            JavaTestKit watcher = new JavaTestKit(getSystem());
+            watcher.watch(transaction);
 
             transaction.tell(new ReadyTransaction(), getRef());
 
-            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
-                    Terminated.class);
-            expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
-                    Terminated.class);
+            expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
+            watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
         }};
     }
 
@@ -539,4 +633,8 @@ public class ShardTransactionTest extends AbstractActorTest {
             expectMsgClass(duration("3 seconds"), Terminated.class);
         }};
     }
+
+    public static class TestException extends RuntimeException {
+        private static final long serialVersionUID = 1L;
+    }
 }
index acba775..026b549 100644 (file)
@@ -30,8 +30,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -176,7 +174,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
         }
 
-        batchedReplyPromise1.success(new BatchedModificationsReply(1, txActorRef1.path().toString()));
+        batchedReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
 
         // Tx 2 should've proceeded to find the primary shard.
         verify(mockActorContext, timeout(5000).times(2)).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
@@ -196,7 +194,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
         Promise<Object> readyReplyPromise1 = akka.dispatch.Futures.promise();
         doReturn(readyReplyPromise1.future()).when(mockActorContext).executeOperationAsync(
-                eq(actorSelection(txActorRef1)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+                eq(actorSelection(txActorRef1)), isA(BatchedModifications.class));
 
         DOMStoreWriteTransaction writeTx1 = txChainProxy.newReadWriteTransaction();
 
@@ -205,7 +203,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
         writeTx1.ready();
 
-        verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), false);
+        verifyOneBatchedModification(txActorRef1, new WriteModification(TestModel.TEST_PATH, writeNode1), true);
 
         String tx2MemberName = "tx2MemberName";
         doReturn(tx2MemberName).when(mockActorContext).getCurrentMemberName();
@@ -247,7 +245,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
             fail("Tx 2 should not have initiated until the Tx 1's ready future completed");
         }
 
-        readyReplyPromise1.success(readySerializedTxReply(txActorRef1.path().toString()).value().get().get());
+        readyReplyPromise1.success(readyTxReply(txActorRef1.path().toString()).value().get().get());
 
         verify(mockActorContext, timeout(5000)).executeOperationAsync(eq(getSystem().actorSelection(shardActorRef2.path())),
                 eqCreateTransaction(tx2MemberName, READ_WRITE));
index a247100..cc9692b 100644 (file)
@@ -3,12 +3,12 @@ package org.opendaylight.controller.cluster.datastore;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType.READ_ONLY;
@@ -20,11 +20,14 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.dispatch.Futures;
 import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Assert;
@@ -37,7 +40,6 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
@@ -45,18 +47,19 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregatorTest;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Promise;
-import scala.concurrent.duration.Duration;
 
 @SuppressWarnings("resource")
 public class TransactionProxyTest extends AbstractTransactionProxyTest {
@@ -305,29 +308,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         transactionProxy.exists(TestModel.TEST_PATH);
     }
 
-    private void verifyRecordingOperationFutures(List<Future<Object>> futures,
-            Class<?>... expResultTypes) throws Exception {
-        assertEquals("getRecordingOperationFutures size", expResultTypes.length, futures.size());
-
-        int i = 0;
-        for( Future<Object> future: futures) {
-            assertNotNull("Recording operation Future is null", future);
-
-            Class<?> expResultType = expResultTypes[i++];
-            if(Throwable.class.isAssignableFrom(expResultType)) {
-                try {
-                    Await.result(future, Duration.create(5, TimeUnit.SECONDS));
-                    fail("Expected exception from recording operation Future");
-                } catch(Exception e) {
-                    // Expected
-                }
-            } else {
-                assertEquals(String.format("Recording operation %d Future result type", i +1 ), expResultType,
-                             Await.result(future, Duration.create(5, TimeUnit.SECONDS)).getClass());
-            }
-        }
-    }
-
     @Test
     public void testWrite() throws Exception {
         dataStoreContextBuilder.shardBatchedModificationCount(1);
@@ -356,8 +336,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        expectBatchedModifications(actorRef, 1);
-        expectReadyTransaction(actorRef);
+        expectBatchedModificationsReady(actorRef);
 
         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
@@ -396,10 +375,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         // This sends the batched modification.
         transactionProxy.ready();
 
-        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), false);
-
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class);
+        verifyOneBatchedModification(actorRef, new WriteModification(TestModel.TEST_PATH, nodeToWrite), true);
     }
 
     @Test(expected=IllegalStateException.class)
@@ -448,7 +424,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     }
 
     @Test
-    public void testReadyWithReadWrite() throws Exception {
+    public void testReadWrite() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
@@ -457,7 +433,34 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
         expectBatchedModifications(actorRef, 1);
-        expectReadyTransaction(actorRef);
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+
+        transactionProxy.read(TestModel.TEST_PATH);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        transactionProxy.read(TestModel.TEST_PATH);
+
+        transactionProxy.read(TestModel.TEST_PATH);
+
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), false,
+                new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+    }
+
+    @Test
+    public void testReadyWithReadWrite() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData());
+
+        expectBatchedModificationsReady(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
@@ -471,31 +474,29 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class);
-
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
-        verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
-                isA(BatchedModifications.class));
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
 
-        verify(mockActorContext).executeOperationAsync(eq(actorSelection(actorRef)),
-                isA(ReadyTransaction.SERIALIZABLE_CLASS));
+        verifyBatchedModifications(batchedModifications.get(0), true,
+                new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+
+        assertEquals("getTotalMessageCount", 1, batchedModifications.get(0).getTotalMessagesSent());
     }
 
     @Test
-    public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
-        dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
-
-        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
+    public void testReadyWithNoModifications() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
-        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        expectBatchedModificationsReady(actorRef, 1);
+        expectBatchedModificationsReady(actorRef);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
-        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+        transactionProxy.read(TestModel.TEST_PATH);
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
@@ -503,28 +504,23 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures());
-
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
         assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
 
-        verifyBatchedModifications(batchedModifications.get(0), true,
-                new WriteModification(TestModel.TEST_PATH, nodeToWrite));
-
-        verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
-                isA(ReadyTransaction.SERIALIZABLE_CLASS));
+        verifyBatchedModifications(batchedModifications.get(0), true);
     }
 
     @Test
-    public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
-        dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
+    public void testReadyWithWriteOnlyAndLastBatchPending() throws Exception {
+        dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
+
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        expectBatchedModificationsReady(actorRef, 1);
+        expectBatchedModificationsReady(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
@@ -536,34 +532,26 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class);
-
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
         List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
-        assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
+        assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
 
-        verifyBatchedModifications(batchedModifications.get(0), false,
+        verifyBatchedModifications(batchedModifications.get(0), true,
                 new WriteModification(TestModel.TEST_PATH, nodeToWrite));
 
-        verifyBatchedModifications(batchedModifications.get(1), true);
-
         verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
                 isA(ReadyTransaction.SERIALIZABLE_CLASS));
     }
 
     @Test
-    public void testReadyWithRecordingOperationFailure() throws Exception {
+    public void testReadyWithWriteOnlyAndLastBatchEmpty() throws Exception {
         dataStoreContextBuilder.shardBatchedModificationCount(1).writeOnlyTransactionOptimizationsEnabled(true);
-
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), WRITE_ONLY);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        expectFailedBatchedModifications(actorRef);
-
-        doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
+        expectBatchedModificationsReady(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
 
@@ -575,9 +563,18 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
 
-        verifyCohortFutures(proxy, TestException.class);
+        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), TestException.class);
+        List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
+        assertEquals("Captured BatchedModifications count", 2, batchedModifications.size());
+
+        verifyBatchedModifications(batchedModifications.get(0), false,
+                new WriteModification(TestModel.TEST_PATH, nodeToWrite));
+
+        verifyBatchedModifications(batchedModifications.get(1), true);
+
+        verify(mockActorContext, never()).executeOperationAsync(eq(actorSelection(actorRef)),
+                isA(ReadyTransaction.SERIALIZABLE_CLASS));
     }
 
     @Test
@@ -749,18 +746,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
         doReturn(true).when(mockActorContext).isPathLocal(anyString());
 
-        doReturn(batchedModificationsReply(1)).when(mockActorContext).executeOperationAsync(
-                any(ActorSelection.class), isA(BatchedModifications.class));
+        expectBatchedModificationsReady(actorRef);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
-        // testing ready
-        doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
-            eq(actorSelection(actorRef)), isA(ReadyTransaction.class));
-
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
 
         assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
@@ -1188,8 +1180,6 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         expectBatchedModifications(actorRef, shardBatchedModificationCount);
 
-        expectReadyTransaction(actorRef);
-
         YangInstanceIdentifier writePath1 = TestModel.TEST_PATH;
         NormalizedNode<?, ?> writeNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
@@ -1234,17 +1224,10 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         verifyBatchedModifications(batchedModifications.get(1), false, new MergeModification(mergePath1, mergeNode1),
                 new MergeModification(mergePath2, mergeNode2), new WriteModification(writePath3, writeNode3));
 
-        boolean optimizedWriteOnly = type == WRITE_ONLY && dataStoreContextBuilder.build().isWriteOnlyTransactionOptimizationsEnabled();
-        verifyBatchedModifications(batchedModifications.get(2), optimizedWriteOnly, new MergeModification(mergePath3, mergeNode3),
+        verifyBatchedModifications(batchedModifications.get(2), true, new MergeModification(mergePath3, mergeNode3),
                 new DeleteModification(deletePath2));
 
-        if(optimizedWriteOnly) {
-            verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                    BatchedModificationsReply.class, BatchedModificationsReply.class);
-        } else {
-            verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                    BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
-        }
+        assertEquals("getTotalMessageCount", 3, batchedModifications.get(2).getTotalMessagesSent());
     }
 
     @Test
@@ -1349,8 +1332,80 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         inOrder.verify(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedDataExists());
+    }
+
+    @Test
+    public void testReadRoot() throws ReadFailedException, InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
+
+        SchemaContext schemaContext = SchemaContextHelper.full();
+        Configuration configuration = mock(Configuration.class);
+        doReturn(configuration).when(mockActorContext).getConfiguration();
+        doReturn(schemaContext).when(mockActorContext).getSchemaContext();
+        doReturn(Sets.newHashSet("test", "cars")).when(configuration).getAllShardNames();
+
+        NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
+
+        setUpReadData("test", NormalizedNodeAggregatorTest.getRootNode(expectedNode1, schemaContext));
+        setUpReadData("cars", NormalizedNodeAggregatorTest.getRootNode(expectedNode2, schemaContext));
 
-        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
-                BatchedModificationsReply.class, BatchedModificationsReply.class, BatchedModificationsReply.class);
+        doReturn(memberName).when(mockActorContext).getCurrentMemberName();
+
+        doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
+
+        doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+
+        Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
+                YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
+
+        assertEquals("NormalizedNode isPresent", true, readOptional.isPresent());
+
+        NormalizedNode<?, ?> normalizedNode = readOptional.get();
+
+        assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
+
+        Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
+
+        for(NormalizedNode<?,?> node : collection){
+            assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
+        }
+
+        assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
+                NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME) != null);
+
+        assertEquals(expectedNode1, NormalizedNodeAggregatorTest.findChildWithQName(collection, TestModel.TEST_QNAME));
+
+        assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
+                NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
+
+        assertEquals(expectedNode2, NormalizedNodeAggregatorTest.findChildWithQName(collection, CarsModel.BASE_QNAME));
+    }
+
+
+    private void setUpReadData(String shardName, NormalizedNode<?, ?> expectedNode) {
+        ActorSystem actorSystem = getSystem();
+        ActorRef shardActorRef = getSystem().actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(getSystem().actorSelection(shardActorRef.path())).
+                when(mockActorContext).actorSelection(shardActorRef.path().toString());
+
+        doReturn(Futures.successful(getSystem().actorSelection(shardActorRef.path()))).
+                when(mockActorContext).findPrimaryShardAsync(eq(shardName));
+
+        doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
+
+        ActorRef txActorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
+
+        doReturn(actorSystem.actorSelection(txActorRef.path())).
+                when(mockActorContext).actorSelection(txActorRef.path().toString());
+
+        doReturn(Futures.successful(createTransactionReply(txActorRef, DataStoreVersions.CURRENT_VERSION))).when(mockActorContext).
+                executeOperationAsync(eq(actorSystem.actorSelection(shardActorRef.path())),
+                        eqCreateTransaction(memberName, TransactionType.READ_ONLY));
+
+        doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(txActorRef)), eqSerializedReadData(YangInstanceIdentifier.builder().build()));
     }
 }
index cc860ea..9e1557a 100644 (file)
@@ -11,7 +11,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.inOrder;
-import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
+import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.HELIUM_2_VERSION;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.dispatch.Dispatchers;
@@ -246,7 +246,7 @@ public class PreLithiumShardTest extends AbstractShardTest {
             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION,
                     cohort1, modification1, true), getRef());
             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
@@ -261,11 +261,11 @@ public class PreLithiumShardTest extends AbstractShardTest {
 
             // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION,
                     cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+            shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION,
                     cohort3, modification3, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
index 2980f83..4cf8b67 100644 (file)
@@ -33,6 +33,7 @@ import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -41,6 +42,7 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import scala.concurrent.Future;
 
 /**
  * Unit tests for backwards compatibility with pre-Lithium versions.
@@ -93,6 +95,10 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         return argThat(matcher);
     }
 
+    private Future<Object> readySerializedTxReply(String path, short version) {
+        return Futures.successful(new ReadyTransactionReply(path, version).toSerializable());
+    }
+
     private ActorRef testCompatibilityWithHeliumVersion(short version) throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, version);
 
@@ -110,7 +116,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         doReturn(Futures.successful(new DeleteDataReply().toSerializable(version))).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyDeleteData(TestModel.TEST_PATH));
 
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+        doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
@@ -170,7 +176,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         doReturn(Futures.successful(new WriteDataReply().toSerializable(version))).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqLegacyWriteData(testNode));
 
-        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+        doReturn(readySerializedTxReply(actorRef.path().toString(), version)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
 
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
index c4027ad..1df8e97 100644 (file)
@@ -46,6 +46,7 @@ public class BatchedModificationsTest {
         batched.addModification(new MergeModification(mergePath, mergeData));
         batched.addModification(new DeleteModification(deletePath));
         batched.setReady(true);
+        batched.setTotalMessagesSent(5);
 
         BatchedModifications clone = (BatchedModifications) SerializationUtils.clone(
                 (Serializable) batched.toSerializable());
@@ -54,6 +55,7 @@ public class BatchedModificationsTest {
         assertEquals("getTransactionID", "tx1", clone.getTransactionID());
         assertEquals("getTransactionChainID", "txChain", clone.getTransactionChainID());
         assertEquals("isReady", true, clone.isReady());
+        assertEquals("getTotalMessagesSent", 5, clone.getTotalMessagesSent());
 
         assertEquals("getModifications size", 3, clone.getModifications().size());
 
@@ -91,11 +93,5 @@ public class BatchedModificationsTest {
         BatchedModificationsReply clone = (BatchedModificationsReply) SerializationUtils.clone(
                 (Serializable) new BatchedModificationsReply(100).toSerializable());
         assertEquals("getNumBatched", 100, clone.getNumBatched());
-        assertEquals("getCohortPath", null, clone.getCohortPath());
-
-        clone = (BatchedModificationsReply) SerializationUtils.clone(
-                (Serializable) new BatchedModificationsReply(50, "cohort path").toSerializable());
-        assertEquals("getNumBatched", 50, clone.getNumBatched());
-        assertEquals("getCohortPath", "cohort path", clone.getCohortPath());
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReplyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReplyTest.java
new file mode 100644 (file)
index 0000000..db525ea
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import java.io.Serializable;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+
+/**
+ * Unit tests for ReadyTransactionReply.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReadyTransactionReplyTest {
+
+    @Test
+    public void testSerialization() {
+        String cohortPath = "cohort path";
+        ReadyTransactionReply expected = new ReadyTransactionReply(cohortPath);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", ReadyTransactionReply.class, serialized.getClass());
+
+        ReadyTransactionReply actual = ReadyTransactionReply.fromSerializable(SerializationUtils.clone(
+                (Serializable) serialized));
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, actual.getVersion());
+        assertEquals("getCohortPath", cohortPath, actual.getCohortPath());
+    }
+
+    @Test
+    public void testSerializationWithPreLithiumVersion() throws Exception {
+        String cohortPath = "cohort path";
+        ReadyTransactionReply expected = new ReadyTransactionReply(cohortPath, DataStoreVersions.HELIUM_2_VERSION);
+
+        Object serialized = expected.toSerializable();
+        assertEquals("Serialized type", ShardTransactionMessages.ReadyTransactionReply.class, serialized.getClass());
+
+        ReadyTransactionReply actual = ReadyTransactionReply.fromSerializable(SerializationUtils.clone(
+                (Serializable) serialized));
+        assertEquals("getVersion", DataStoreVersions.HELIUM_2_VERSION, actual.getVersion());
+        assertEquals("getCohortPath", cohortPath, actual.getCohortPath());
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/NormalizedNodeAggregatorTest.java
new file mode 100644 (file)
index 0000000..40d3704
--- /dev/null
@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import org.junit.Test;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class NormalizedNodeAggregatorTest {
+
+    @Test
+    public void testAggregate() throws InterruptedException, ExecutionException, ReadFailedException {
+        SchemaContext schemaContext = SchemaContextHelper.full();
+        NormalizedNode<?, ?> expectedNode1 = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        NormalizedNode<?, ?> expectedNode2 = ImmutableNodes.containerNode(CarsModel.CARS_QNAME);
+
+        Optional<NormalizedNode<?, ?>> optional = NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
+                Lists.newArrayList(
+                        Optional.<NormalizedNode<?, ?>>of(getRootNode(expectedNode1, schemaContext)),
+                        Optional.<NormalizedNode<?, ?>>of(getRootNode(expectedNode2, schemaContext))),
+                schemaContext);
+
+
+        NormalizedNode<?,?> normalizedNode = optional.get();
+
+        assertTrue("Expect value to be a Collection", normalizedNode.getValue() instanceof Collection);
+
+        Collection<NormalizedNode<?,?>> collection = (Collection<NormalizedNode<?,?>>) normalizedNode.getValue();
+
+        for(NormalizedNode<?,?> node : collection){
+            assertTrue("Expected " + node + " to be a ContainerNode", node instanceof ContainerNode);
+        }
+
+        assertTrue("Child with QName = " + TestModel.TEST_QNAME + " not found",
+                findChildWithQName(collection, TestModel.TEST_QNAME) != null);
+
+        assertEquals(expectedNode1, findChildWithQName(collection, TestModel.TEST_QNAME));
+
+        assertTrue("Child with QName = " + CarsModel.BASE_QNAME + " not found",
+                findChildWithQName(collection, CarsModel.BASE_QNAME) != null);
+
+        assertEquals(expectedNode2, findChildWithQName(collection, CarsModel.BASE_QNAME));
+
+    }
+
+    public static NormalizedNode<?,?> getRootNode(NormalizedNode<?, ?> moduleNode, SchemaContext schemaContext) throws ReadFailedException, ExecutionException, InterruptedException {
+        InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", Executors.newSingleThreadExecutor());
+        store.onGlobalContextUpdated(schemaContext);
+
+        DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
+
+        writeTransaction.merge(YangInstanceIdentifier.builder().node(moduleNode.getNodeType()).build(), moduleNode);
+
+        DOMStoreThreePhaseCommitCohort ready = writeTransaction.ready();
+
+        ready.canCommit().get();
+        ready.preCommit().get();
+        ready.commit().get();
+
+        DOMStoreReadTransaction readTransaction = store.newReadOnlyTransaction();
+
+        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read = readTransaction.read(YangInstanceIdentifier.builder().build());
+
+        Optional<NormalizedNode<?, ?>> nodeOptional = read.checkedGet();
+
+        return nodeOptional.get();
+    }
+
+    public static NormalizedNode<?,?> findChildWithQName(Collection<NormalizedNode<?, ?>> collection, QName qName) {
+        for(NormalizedNode<?,?> node : collection){
+            if(node.getNodeType().equals(qName)){
+                return node;
+            }
+        }
+
+        return null;
+    }
+
+}
\ No newline at end of file
index f404c06..8b3a830 100644 (file)
@@ -6,6 +6,7 @@
  */
 package org.opendaylight.controller.md.sal.dom.api;
 
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import java.io.Serializable;
 import java.util.Iterator;
@@ -102,4 +103,9 @@ public final class DOMDataTreeIdentifier implements Immutable, Path<DOMDataTreeI
 
         return oi.hasNext() ? -1 : 0;
     }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("datastore", datastoreType).add("root", rootIdentifier).toString();
+    }
 }
index e06f572..303f3e6 100644 (file)
@@ -31,6 +31,7 @@ import javax.xml.transform.dom.DOMResult;
 import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
 import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.OrderedNormalizedNodeWriter;
 import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
@@ -44,7 +45,6 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlUtils;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
@@ -206,17 +206,15 @@ public class NetconfMessageTransformer implements MessageTransformer<NetconfMess
     }
 
     private void writeNormalizedRpc(final ContainerNode normalized, final DOMResult result, final SchemaPath schemaPath, final SchemaContext baseNetconfCtx) throws IOException, XMLStreamException {
-        final NormalizedNodeWriter normalizedNodeWriter;
+        final OrderedNormalizedNodeWriter normalizedNodeWriter;
         NormalizedNodeStreamWriter normalizedNodeStreamWriter = null;
         XMLStreamWriter writer = null;
         try {
             writer = NetconfMessageTransformUtil.XML_FACTORY.createXMLStreamWriter(result);
             normalizedNodeStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(writer, baseNetconfCtx, schemaPath);
-            normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(normalizedNodeStreamWriter);
-
-            for (final DataContainerChild<? extends YangInstanceIdentifier.PathArgument, ?> editElement : normalized.getValue()) {
-                normalizedNodeWriter.write(editElement);
-            }
+            normalizedNodeWriter = new OrderedNormalizedNodeWriter(normalizedNodeStreamWriter, baseNetconfCtx, schemaPath);
+            Collection<DataContainerChild<?, ?>> value = (Collection) normalized.getValue();
+            normalizedNodeWriter.write(value);
             normalizedNodeWriter.flush();
         } finally {
             try {
index ff7d30d..51f9e22 100644 (file)
@@ -14,6 +14,7 @@ import com.google.common.util.concurrent.CheckedFuture;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -32,16 +33,15 @@ import org.opendaylight.controller.netconf.api.xml.XmlNetconfConstants;
 import org.opendaylight.controller.netconf.mapping.api.HandlingPriority;
 import org.opendaylight.controller.netconf.mapping.api.NetconfOperationChainedExecution;
 import org.opendaylight.controller.netconf.mdsal.connector.CurrentSchemaContext;
+import org.opendaylight.controller.netconf.util.OrderedNormalizedNodeWriter;
 import org.opendaylight.controller.netconf.util.exception.MissingNameSpaceException;
 import org.opendaylight.controller.netconf.util.mapping.AbstractSingletonNetconfOperation;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XMLStreamNormalizedNodeStreamWriter;
 import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.DomUtils;
 import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.parser.DomToNormalizedNodeParserFactory;
@@ -110,7 +110,7 @@ public class RuntimeRpc extends AbstractSingletonNetconfOperation {
 
     //this returns module with the newest revision if more then 1 module with same namespace is found
     private Optional<Module> getModule(final URI namespaceURI) {
-        return Optional.of(schemaContext.getCurrentContext().findModuleByNamespaceAndRevision(namespaceURI, null));
+        return Optional.fromNullable(schemaContext.getCurrentContext().findModuleByNamespaceAndRevision(namespaceURI, null));
     }
 
     private Optional<RpcDefinition> getRpcDefinitionFromModule(Module module, URI namespaceURI, String name) {
@@ -211,7 +211,7 @@ public class RuntimeRpc extends AbstractSingletonNetconfOperation {
         final NormalizedNodeStreamWriter nnStreamWriter = XMLStreamNormalizedNodeStreamWriter.create(xmlWriter,
                 schemaContext.getCurrentContext(), rpcOutputPath);
 
-        final NormalizedNodeWriter nnWriter = NormalizedNodeWriter.forStreamWriter(nnStreamWriter);
+        final OrderedNormalizedNodeWriter nnWriter = new OrderedNormalizedNodeWriter(nnStreamWriter, schemaContext.getCurrentContext(), rpcOutputPath);
 
         writeRootElement(xmlWriter, nnWriter, (ContainerNode) data);
         try {
@@ -232,11 +232,10 @@ public class RuntimeRpc extends AbstractSingletonNetconfOperation {
         }
     }
 
-    private void writeRootElement(final XMLStreamWriter xmlWriter, final NormalizedNodeWriter nnWriter, final ContainerNode data) {
+    private void writeRootElement(final XMLStreamWriter xmlWriter, final OrderedNormalizedNodeWriter nnWriter, final ContainerNode data) {
         try {
-            for (final DataContainerChild<? extends PathArgument, ?> child : data.getValue()) {
-                nnWriter.write(child);
-            }
+            Collection<DataContainerChild<?, ?>> value = (Collection) data.getValue();
+            nnWriter.write(value);
             nnWriter.flush();
             xmlWriter.flush();
         } catch (XMLStreamException | IOException e) {
index 32eb08c..040066d 100644 (file)
@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.List;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.xml.transform.TransformerException;
 import org.custommonkey.xmlunit.DetailedDiff;
 import org.custommonkey.xmlunit.Diff;
 import org.custommonkey.xmlunit.XMLUnit;
@@ -201,6 +202,18 @@ public class RuntimeRpcTest {
         verifyResponse(response, XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/rpc-nonvoid-control.xml"));
     }
 
+    @Test
+    public void testSuccesfullContainerInvocation() throws Exception {
+        RuntimeRpc rpc = new RuntimeRpc(sessionIdForReporting, currentSchemaContext, rpcServiceSuccesfullInvocation);
+
+        Document rpcDocument = XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/rpc-container.xml");
+        HandlingPriority priority = rpc.canHandle(rpcDocument);
+        Preconditions.checkState(priority != HandlingPriority.CANNOT_HANDLE);
+
+        Document response = rpc.handle(rpcDocument, NetconfOperationChainedExecution.EXECUTION_TERMINATION_POINT);
+        verifyResponse(response, XmlFileLoader.xmlFileToDocument("messages/mapping/rpcs/rpc-container-control.xml"));
+    }
+
     @Test
     public void testFailedInvocation() throws Exception {
         RuntimeRpc rpc = new RuntimeRpc(sessionIdForReporting, currentSchemaContext, rpcServiceFailedInvocation);
@@ -232,10 +245,11 @@ public class RuntimeRpcTest {
         verifyResponse(response, RPC_REPLY_OK);
     }
 
-    private void verifyResponse(Document response, Document template) {
+    private void verifyResponse(Document response, Document template) throws IOException, TransformerException {
         DetailedDiff dd = new DetailedDiff(new Diff(response, template));
         dd.overrideElementQualifier(new RecursiveElementNameAndTextQualifier());
-        assertTrue(dd.similar());
+        //we care about order so response has to be identical
+        assertTrue(dd.identical());
     }
 
     private RpcDefinition getRpcDefinitionFromModule(Module module, URI namespaceURI, String name) {
diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-container-control.xml b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-container-control.xml
new file mode 100644 (file)
index 0000000..1c06ea9
--- /dev/null
@@ -0,0 +1,27 @@
+<!--
+  ~ Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+  ~
+  ~ This program and the accompanying materials are made available under the
+  ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+  ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+  -->
+
+<rpc-reply message-id="2"
+     xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+    <cont1 xmlns="urn:opendaylight:mdsal:mapping:rpc:test">
+        <test-string>
+            cont1 input string 1
+        </test-string>
+        <test-string2>
+            cont1 input string 2
+        </test-string2>
+    </cont1>
+    <cont2 xmlns="urn:opendaylight:mdsal:mapping:rpc:test">
+        <test-string>
+            cont2 input string 1
+        </test-string>
+        <test-string2>
+            cont2 input string 2
+        </test-string2>
+    </cont2>
+</rpc-reply>
\ No newline at end of file
diff --git a/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-container.xml b/opendaylight/netconf/mdsal-netconf-connector/src/test/resources/messages/mapping/rpcs/rpc-container.xml
new file mode 100644 (file)
index 0000000..570d6df
--- /dev/null
@@ -0,0 +1,29 @@
+<!--
+  ~ Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+  ~
+  ~ This program and the accompanying materials are made available under the
+  ~ terms of the Eclipse Public License v1.0 which accompanies this distribution,
+  ~ and is available at http://www.eclipse.org/legal/epl-v10.html
+  -->
+
+<rpc message-id="2"
+     xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+    <container-rpc xmlns="urn:opendaylight:mdsal:mapping:rpc:test">
+        <cont1>
+            <test-string>
+                cont1 input string 1
+            </test-string>
+            <test-string2>
+                cont1 input string 2
+            </test-string2>
+        </cont1>
+        <cont2>
+            <test-string>
+                cont2 input string 1
+            </test-string>
+            <test-string2>
+                cont2 input string 2
+            </test-string2>
+        </cont2>
+    </container-rpc>
+</rpc>
\ No newline at end of file
index d493840..6a59cdc 100644 (file)
@@ -40,5 +40,51 @@ module rpc-test {
             }
         }
     }
+
+    rpc container-rpc {
+        input {
+            container cont1 {
+                leaf test-string {
+                    type string;
+                }
+
+                leaf test-string2 {
+                    type string;
+                }
+            }
+
+            container cont2 {
+                leaf test-string {
+                    type string;
+                }
+
+                leaf test-string2 {
+                    type string;
+                }
+            }
+        }
+
+        output {
+            container cont1 {
+                leaf test-string {
+                    type string;
+                }
+
+                leaf test-string2 {
+                    type string;
+                }
+            }
+
+            container cont2 {
+                leaf test-string {
+                    type string;
+                }
+
+                leaf test-string2 {
+                    type string;
+                }
+            }
+        }
+    }
 }
 
index c292d93..27c9bd8 100644 (file)
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-data-impl</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.opendaylight.yangtools</groupId>
-      <artifactId>yang-data-composite-node</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-model-api</artifactId>
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-parser-impl</artifactId>
     </dependency>
+      <dependency>
+          <groupId>org.opendaylight.yangtools</groupId>
+          <artifactId>yang-data-api</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.opendaylight.yangtools</groupId>
+          <artifactId>yang-common</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>netconf-client</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.opendaylight.yangtools</groupId>
+          <artifactId>yang-binding</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.opendaylight.yangtools</groupId>
+          <artifactId>yang-model-api</artifactId>
+      </dependency>
   </dependencies>
 
   <build>
index a49c7b9..0ae1be4 100644 (file)
@@ -30,9 +30,9 @@ import org.opendaylight.controller.netconf.cli.reader.ReadingException;
 import org.opendaylight.controller.netconf.cli.writer.OutFormatter;
 import org.opendaylight.controller.netconf.cli.writer.WriteException;
 import org.opendaylight.controller.netconf.cli.writer.Writer;
-import org.opendaylight.controller.netconf.cli.writer.impl.CompositeNodeWriter;
+import org.opendaylight.controller.netconf.cli.writer.impl.NormalizedNodeWriter;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.Node;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
 
@@ -102,7 +102,7 @@ public class Cli implements Runnable {
 
     private void handleRegularOutput(final Output response, final OutputDefinition outputDefinition,
             final Writer<DataSchemaNode> outHandler) {
-        final Map<DataSchemaNode, List<Node<?>>> unwrap = response.unwrap(outputDefinition);
+        final Map<DataSchemaNode, List<NormalizedNode<?, ?>>> unwrap = response.unwrap(outputDefinition);
 
         for (final DataSchemaNode schemaNode : unwrap.keySet()) {
             Preconditions.checkNotNull(schemaNode);
@@ -132,8 +132,8 @@ public class Cli implements Runnable {
 
     private void handleEmptyOutput(final Command command, final Output response) {
         try {
-            new CompositeNodeWriter(consoleIO, new OutFormatter()).write(null,
-                    Collections.<Node<?>> singletonList(response.getOutput()));
+            new NormalizedNodeWriter(consoleIO, new OutFormatter()).write(null,
+                    Collections.<NormalizedNode<?, ?>>singletonList(response.getOutput()));
         } catch (final WriteException e) {
             throw new IllegalStateException("Unable to write value for: " + response.getOutput().getNodeType()
                     + " from: " + command.getCommandId(), e);
@@ -141,7 +141,7 @@ public class Cli implements Runnable {
     }
 
     private Input handleInput(final InputDefinition inputDefinition) {
-        List<Node<?>> allArgs = Collections.emptyList();
+        List<NormalizedNode<?, ?>> allArgs = Collections.emptyList();
         try {
             if (!inputDefinition.isEmpty()) {
                 allArgs = argumentHandlerRegistry.getGenericReader(schemaContextRegistry.getLocalSchemaContext()).read(
index bede549..50c3243 100644 (file)
@@ -10,13 +10,13 @@ package org.opendaylight.controller.netconf.cli;
 import com.google.common.base.Optional;
 import jline.console.completer.Completer;
 import jline.console.completer.NullCompleter;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.netconf.cli.commands.CommandDispatcher;
 import org.opendaylight.controller.netconf.cli.io.ConsoleContext;
 import org.opendaylight.controller.netconf.cli.io.ConsoleIO;
 import org.opendaylight.controller.sal.connect.api.RemoteDeviceHandler;
 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfSessionPreferences;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
@@ -42,7 +42,7 @@ public class NetconfDeviceConnectionHandler implements RemoteDeviceHandler<Netco
 
     @Override
     public synchronized void onDeviceConnected(final SchemaContext context,
-            final NetconfSessionPreferences preferences, final RpcImplementation rpcImplementation) {
+            final NetconfSessionPreferences preferences, final DOMRpcService rpcService) {
         console.enterRootContext(new ConsoleContext() {
 
             @Override
@@ -60,7 +60,7 @@ public class NetconfDeviceConnectionHandler implements RemoteDeviceHandler<Netco
         // possible
         // TODO detect netconf base version
         // TODO detect inet types version
-        commandDispatcher.addRemoteCommands(rpcImplementation, context);
+        commandDispatcher.addRemoteCommands(rpcService, context);
         schemaContextRegistry.setRemoteSchemaContext(context);
         up = true;
         this.notify();
@@ -87,8 +87,8 @@ public class NetconfDeviceConnectionHandler implements RemoteDeviceHandler<Netco
     }
 
     @Override
-    public void onNotification(final CompositeNode compositeNode) {
-        // FIXME
+    public void onNotification(ContainerNode domNotification) {
+
     }
 
     @Override
index 67e9658..2438df4 100644 (file)
@@ -26,7 +26,6 @@ import org.opendaylight.controller.sal.connect.netconf.NetconfDevice;
 import org.opendaylight.controller.sal.connect.netconf.NetconfDevice.SchemaResourcesDTO;
 import org.opendaylight.controller.sal.connect.netconf.NetconfStateSchemas.NetconfStateSchemasResolverImpl;
 import org.opendaylight.controller.sal.connect.netconf.listener.NetconfDeviceCommunicator;
-import org.opendaylight.controller.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
 import org.opendaylight.controller.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceFilter;
@@ -90,7 +89,7 @@ public class NetconfDeviceConnectionManager implements Closeable {
         repository.registerSchemaSourceListener(TextToASTTransformer.create(repository, repository));
 
         device = new NetconfDevice(new SchemaResourcesDTO(repository, schemaContextFactory, new NetconfStateSchemasResolverImpl()),
-                deviceId, handler, executor, new NetconfMessageTransformer());
+                deviceId, handler, executor, true);
         listener = new NetconfDeviceCommunicator(deviceId, device);
         configBuilder.withSessionListener(listener);
         listener.initializeRemoteConnection(netconfClientDispatcher, configBuilder.build());
index ec7b5b4..f1b14ea 100644 (file)
@@ -19,6 +19,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.netconf.cli.NetconfDeviceConnectionHandler;
 import org.opendaylight.controller.netconf.cli.NetconfDeviceConnectionManager;
 import org.opendaylight.controller.netconf.cli.commands.local.Close;
@@ -27,7 +28,6 @@ import org.opendaylight.controller.netconf.cli.commands.local.Disconnect;
 import org.opendaylight.controller.netconf.cli.commands.local.Help;
 import org.opendaylight.controller.netconf.cli.commands.remote.RemoteCommand;
 import org.opendaylight.controller.netconf.cli.io.IOUtil;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
@@ -106,15 +106,15 @@ public class CommandDispatcher {
     public static final Collection<String> BASE_NETCONF_SCHEMA_PATHS = Lists.newArrayList("/schema/remote/ietf-netconf.yang",
             "/schema/common/netconf-cli-ext.yang", "/schema/common/ietf-inet-types.yang");
 
-    public synchronized void addRemoteCommands(final RpcImplementation rpcInvoker, final SchemaContext remoteSchema) {
-        this.addRemoteCommands(rpcInvoker, remoteSchema, parseSchema(BASE_NETCONF_SCHEMA_PATHS));
+    public synchronized void addRemoteCommands(final DOMRpcService rpcService, final SchemaContext remoteSchema) {
+        this.addRemoteCommands(rpcService, remoteSchema, parseSchema(BASE_NETCONF_SCHEMA_PATHS));
     }
 
-    public synchronized void addRemoteCommands(final RpcImplementation rpcInvoker, final SchemaContext remoteSchema, final SchemaContext baseNetconfSchema) {
+    public synchronized void addRemoteCommands(final DOMRpcService rpcService, final SchemaContext remoteSchema, final SchemaContext baseNetconfSchema) {
         for (final SchemaContext context : Lists.newArrayList(remoteSchema, baseNetconfSchema)) {
             for (final Module module : context.getModules()) {
                 for (final RpcDefinition rpcDefinition : module.getRpcs()) {
-                    final Command command = RemoteCommand.fromRpc(rpcDefinition, rpcInvoker);
+                    final Command command = RemoteCommand.fromRpc(rpcDefinition, rpcService);
                     remoteCommands.put(rpcDefinition.getQName(), command);
                     nameToQNameRemote.put(getCommandName(rpcDefinition, module), rpcDefinition.getQName());
                 }
index 02173ac..e2cc83d 100644 (file)
@@ -8,46 +8,52 @@
 package org.opendaylight.controller.netconf.cli.commands.input;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
 /**
  * Input arguments for and rpc/command execution
  */
 public class Input {
 
-    private final List<Node<?>> args;
+    private final List<NormalizedNode<?, ?>> args;
 
-    private final Map<String, Node<?>> nameToArg = new HashMap<String, Node<?>>();
+    private final Map<String, NormalizedNode<?, ?>> nameToArg = new HashMap<>();
 
-    public Input(final List<Node<?>> args) {
+    public Input(final List<NormalizedNode<?, ?>> args) {
         // FIXME empty Input should be constructed from static factory method
         if(args.isEmpty()) {
             this.args = Collections.emptyList();
             return;
         }
 
-        final Node<?> input = args.iterator().next();
+        final NormalizedNode<?, ?> input = args.iterator().next();
         Preconditions
-                .checkArgument(input instanceof CompositeNode, "Input container has to be of type composite node.");
-        this.args = ((CompositeNode) input).getValue();
+                .checkArgument(input instanceof DataContainerChild<?, ?>, "Input container has to be of type Data Container Child.");
+        this.args = new ArrayList<>((Collection) input.getValue());
 
-        for (final Node<?> arg : this.args) {
+        for (final NormalizedNode<?, ?> arg : this.args) {
             nameToArg.put(arg.getNodeType().getLocalName(), arg);
         }
     }
 
-    public Node<?> getArg(final String name) {
+    public NormalizedNode<?, ?> getArg(final String name) {
         return nameToArg.get(name);
     }
 
-    public CompositeNode wrap(final QName rpcQName) {
-        return new CompositeNodeTOImpl(rpcQName, null, args);
+    public NormalizedNode<?, ?> wrap(final QName rpcQName) {
+        //TODO just add the list as children to the node
+        return ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new NodeIdentifier(rpcQName))
+                .withValue((Collection) args).build();
     }
 }
index 54706b8..b9abb5a 100644 (file)
@@ -29,10 +29,15 @@ import org.opendaylight.controller.netconf.nettyutil.handler.ssh.authentication.
 import org.opendaylight.protocol.framework.NeverReconnectStrategy;
 import org.opendaylight.protocol.framework.ReconnectStrategy;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.Node;
-import org.opendaylight.yangtools.yang.data.api.SimpleNode;
-import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
-import org.opendaylight.yangtools.yang.data.impl.SimpleNodeTOImpl;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 
 /**
@@ -59,14 +64,21 @@ public class Connect extends AbstractCommand {
     private Output invoke(final NetconfClientConfigurationBuilder config, final String addressName, final Input inputArgs) {
         final Set<String> remoteCmds = connectManager.connectBlocking(addressName, getAdress(inputArgs), config);
 
-        final ArrayList<Node<?>> output = Lists.newArrayList();
-        output.add(new SimpleNodeTOImpl<>(QName.create(getCommandId(), "status"), null, "Connection initiated"));
+        final ArrayList<DataContainerChild<?, ?>> output = Lists.newArrayList();
+        output.add(ImmutableLeafNodeBuilder.create()
+                .withNodeIdentifier(new NodeIdentifier(QName.create(getCommandId(), "status")))
+                .withValue("Connection initiated").build());
 
+        final ArrayList<LeafSetEntryNode<Object>> leafListChildren = Lists.newArrayList();
         for (final String cmdId : remoteCmds) {
-            output.add(new SimpleNodeTOImpl<>(QName.create(getCommandId(), "remote-commands"), null, cmdId));
+            leafListChildren.add(ImmutableLeafSetEntryNodeBuilder.create()
+                    .withNodeIdentifier(new NodeWithValue(QName.create(getCommandId(), "remote-commands"), cmdId))
+                    .withValue(cmdId).build());
         }
 
-        return new Output(new CompositeNodeTOImpl(getCommandId(), null, output));
+        return new Output(ImmutableLeafSetNodeBuilder.create()
+                .withNodeIdentifier(new NodeIdentifier(QName.create(getCommandId(), "remote-commands")))
+                .withValue(leafListChildren).build());
     }
 
     private NetconfClientConfigurationBuilder getConfig(final Input inputArgs) {
@@ -105,11 +117,11 @@ public class Connect extends AbstractCommand {
 
     private <T> Optional<T> getArgumentOpt(final Input inputArgs, final String argName, final Class<T> type) {
         final QName argQName = QName.create(getCommandId(), argName);
-        final Node<?> argumentNode = inputArgs.getArg(argName);
+        final NormalizedNode<?, ?> argumentNode = inputArgs.getArg(argName);
         if (argumentNode == null) {
             return Optional.absent();
         }
-        Preconditions.checkArgument(argumentNode instanceof SimpleNode, "Only simple type argument supported, %s",
+        Preconditions.checkArgument(argumentNode instanceof LeafNode, "Only simple type argument supported, %s",
                 argQName);
 
         final Object value = argumentNode.getValue();