Merge changes Ic8cadf33,If59b1dd3
authorTony Tkacik <ttkacik@cisco.com>
Thu, 18 Dec 2014 14:22:32 +0000 (14:22 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 18 Dec 2014 14:22:32 +0000 (14:22 +0000)
* changes:
  Bug-2342: Add stress test with more threads and start of testtool device.
  Bug-2342: Fixing rpc-reply messages id's get mixed up

40 files changed:
opendaylight/config/config-persister-directory-xml-adapter/src/main/java/org/opendaylight/controller/config/persist/storage/directory/xml/XmlDirectoryPersister.java
opendaylight/config/config-persister-feature-adapter/src/main/java/org/opendaylight/controller/configpusherfeature/internal/FeatureConfigSnapshotHolder.java
opendaylight/config/config-persister-file-xml-adapter/src/main/java/org/opendaylight/controller/config/persist/storage/file/xml/model/Config.java
opendaylight/config/yang-jmx-generator/pom.xml
opendaylight/config/yang-jmx-generator/src/main/java/org/opendaylight/controller/config/yangjmxgenerator/ModuleMXBeanEntryBuilder.java
opendaylight/config/yang-jmx-generator/src/main/java/org/opendaylight/controller/config/yangjmxgenerator/RuntimeBeanEntry.java
opendaylight/config/yang-jmx-generator/src/test/java/org/opendaylight/controller/config/yangjmxgenerator/RuntimeBeanEntryTest.java
opendaylight/config/yang-test/src/main/java/org/opendaylight/controller/config/yang/test/util/NetconfTestImplModuleUtil.java
opendaylight/config/yang-test/src/main/yang/config-test-impl.yang
opendaylight/config/yang-test/src/main/yang/types/test-groups.yang [new file with mode: 0644]
opendaylight/config/yang-test/src/main/yang/types/test-types.yang
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/FollowerTest.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/RpcProviderRegistryImpl.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationByteStringPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMNotification.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMNotificationListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMNotificationListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMNotificationPublishService.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMNotificationService.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMService.java
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/md/sal/dom/spi/AbstractDOMNotificationListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/md/sal/dom/spi/ForwardingDOMNotificationPublishService.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/md/sal/dom/spi/ForwardingDOMNotificationService.java [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfDocumentedExceptionMapper.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/XmlNormalizedNodeBodyReader.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/XmlToCompositeNodeReader.java
opendaylight/netconf/config-netconf-connector/src/test/java/org/opendaylight/controller/netconf/confignetconfconnector/NetconfMappingTest.java
opendaylight/netconf/netconf-client/src/main/java/org/opendaylight/controller/netconf/client/NetconfClientSession.java
opendaylight/netconf/netconf-client/src/test/java/org/opendaylight/controller/netconf/client/NetconfClientSessionTest.java
opendaylight/netconf/netconf-impl/src/main/java/org/opendaylight/controller/netconf/impl/NetconfServerSession.java
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/AbstractNetconfConfigTest.java
opendaylight/netconf/netconf-it/src/test/java/org/opendaylight/controller/netconf/it/NetconfConfigPersisterITTest.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSession.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfEXICodec.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfMessageToEXIEncoder.java
opendaylight/netconf/netconf-netty-util/src/test/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionTest.java

index 85f70b9a01172f314fec16020d508e9769efeea7..3ea432e1739a7ed83047091f0a392e8918c8e45e 100644 (file)
@@ -23,6 +23,10 @@ import java.util.SortedSet;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.stream.StreamSource;
 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
 import org.opendaylight.controller.config.persist.api.Persister;
 import org.opendaylight.controller.config.persist.storage.file.xml.model.ConfigSnapshot;
@@ -105,8 +109,15 @@ public class XmlDirectoryPersister implements Persister {
     public static ConfigSnapshotHolder loadLastConfig(final File file) throws JAXBException {
         JAXBContext jaxbContext = JAXBContext.newInstance(ConfigSnapshot.class);
         Unmarshaller um = jaxbContext.createUnmarshaller();
-
-        return asHolder((ConfigSnapshot) um.unmarshal(file));
+        XMLInputFactory xif = XMLInputFactory.newFactory();
+        xif.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
+        xif.setProperty(XMLInputFactory.SUPPORT_DTD, false);
+        try {
+            XMLStreamReader xsr = xif.createXMLStreamReader(new StreamSource(file));
+            return asHolder((ConfigSnapshot) um.unmarshal(xsr));
+        } catch (final XMLStreamException e) {
+            throw new JAXBException(e);
+        }
     }
 
     private static ConfigSnapshotHolder asHolder(final ConfigSnapshot unmarshalled) {
index 1bce5f236414242c3523adbbebb5a551559d1fb1..518716cfa75b2043b189970719fd80fc8442701a 100644 (file)
@@ -20,6 +20,10 @@ import java.util.SortedSet;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.stream.StreamSource;
 import org.apache.karaf.features.ConfigFileInfo;
 import org.apache.karaf.features.Feature;
 import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
@@ -59,10 +63,18 @@ public class FeatureConfigSnapshotHolder implements ConfigSnapshotHolder {
         Preconditions.checkNotNull(feature);
         this.fileInfo = fileInfo;
         this.featureChain.add(feature);
+        // TODO extract utility method for umarshalling config snapshots
         JAXBContext jaxbContext = JAXBContext.newInstance(ConfigSnapshot.class);
         Unmarshaller um = jaxbContext.createUnmarshaller();
-        File file = new File(fileInfo.getFinalname());
-        unmarshalled = ((ConfigSnapshot) um.unmarshal(file));
+        XMLInputFactory xif = XMLInputFactory.newFactory();
+        xif.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
+        xif.setProperty(XMLInputFactory.SUPPORT_DTD, false);
+        try {
+            XMLStreamReader xsr = xif.createXMLStreamReader(new StreamSource(new File(fileInfo.getFinalname())));
+            unmarshalled = ((ConfigSnapshot) um.unmarshal(xsr));
+        } catch (final XMLStreamException e) {
+            throw new JAXBException(e);
+        }
     }
     /*
      * (non-Javadoc)
index e629d20db52e0c2c65bf26c76aeb1e2dbce62eff..6a6d360cfa794c2209ba7d5fe28a8dc1efeca3ab 100644 (file)
@@ -22,6 +22,10 @@ import javax.xml.bind.Unmarshaller;
 import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlElementWrapper;
 import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.stream.StreamSource;
 import org.apache.commons.lang3.StringUtils;
 
 @XmlRootElement(name = "persisted-snapshots")
@@ -72,9 +76,12 @@ public final class Config {
         try {
             JAXBContext jaxbContext = JAXBContext.newInstance(Config.class);
             Unmarshaller um = jaxbContext.createUnmarshaller();
-
-            return (Config) um.unmarshal(from);
-        } catch (JAXBException e) {
+            XMLInputFactory xif = XMLInputFactory.newFactory();
+            xif.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
+            xif.setProperty(XMLInputFactory.SUPPORT_DTD, false);
+            XMLStreamReader xsr = xif.createXMLStreamReader(new StreamSource(from));
+            return ((Config) um.unmarshal(xsr));
+        } catch (JAXBException | XMLStreamException e) {
             throw new PersistException("Unable to restore configuration", e);
         }
     }
index 979b39688b04aac10514db6e0b796bfd97b139e4..bfeb3f0f7acd74605a2f39782e5ea9a991b22044 100644 (file)
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>binding-type-provider</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-model-util</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.opendaylight.yangtools</groupId>
index 80db46df2b5eaa627a84f8e2aaf2fa97a9aed6d5..ed727c9a13590733178d84e66b64cae71f23b058 100644 (file)
@@ -374,7 +374,7 @@ final class ModuleMXBeanEntryBuilder {
             final String javaNamePrefix) {
 
         return RuntimeBeanEntry.extractClassNameToRuntimeBeanMap(packageName, dataNodeContainer, moduleLocalNameFromXPath,
-                typeProviderWrapper, javaNamePrefix, currentModule).values();
+                typeProviderWrapper, javaNamePrefix, currentModule, schemaContext).values();
 
     }
 
index b6ed8243217f1e70d350342ce746d7e0e79bbdac..74981a95827d7aafd4cf767599d363575e6a4a99 100644 (file)
@@ -11,8 +11,14 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -20,6 +26,7 @@ import java.util.Comparator;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -36,15 +43,16 @@ import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
 import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.IdentitySchemaNode;
 import org.opendaylight.yangtools.yang.model.api.LeafListSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaNode;
 import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.UsesNode;
+import org.opendaylight.yangtools.yang.model.util.SchemaContextUtil;
 
 /**
  * Holds information about runtime bean to be generated. There are two kinds of
@@ -56,6 +64,21 @@ import org.opendaylight.yangtools.yang.model.api.UsesNode;
  * lined via children so that a tree with all beans can be created.
  */
 public class RuntimeBeanEntry {
+
+    private static final Function<SchemaNode, QName> QNAME_FROM_NODE = new Function<SchemaNode, QName>() {
+        @Override
+        public QName apply(final SchemaNode input) {
+            return input.getQName();
+        }
+    };
+
+    private static final Function<UnknownSchemaNode, String> UNKNOWN_NODE_TO_STRING = new Function<UnknownSchemaNode, String>() {
+        @Override
+        public String apply(final UnknownSchemaNode input) {
+            return input.getQName().getLocalName() + input.getNodeParameter();
+        }
+    };
+
     private final String packageName;
     private final String yangName, javaNamePrefix;
     private final boolean isRoot;
@@ -112,13 +135,12 @@ public class RuntimeBeanEntry {
     public static Map<String, RuntimeBeanEntry> extractClassNameToRuntimeBeanMap(
             final String packageName, final DataNodeContainer container,
             final String moduleYangName, final TypeProviderWrapper typeProviderWrapper,
-            final String javaNamePrefix, final Module currentModule) {
+            final String javaNamePrefix, final Module currentModule, final SchemaContext schemaContext) {
 
-        Map<QName, Set<RpcDefinition>> identitiesToRpcs = getIdentitiesToRpcs(currentModule);
 
         AttributesRpcsAndRuntimeBeans attributesRpcsAndRuntimeBeans = extractSubtree(
                 packageName, container, typeProviderWrapper, currentModule,
-                identitiesToRpcs);
+                schemaContext);
         Map<String, RuntimeBeanEntry> result = new HashMap<>();
 
         List<AttributeIfc> attributes;
@@ -150,52 +172,41 @@ public class RuntimeBeanEntry {
         return result;
     }
 
-    private static Map<QName/* of identity */, Set<RpcDefinition>> getIdentitiesToRpcs(
-            final Module currentModule) {
-        // currently only looks for local identities (found in currentModule)
-        Map<QName, Set<RpcDefinition>> result = new HashMap<>();
-        for (IdentitySchemaNode identity : currentModule.getIdentities()) {
-            // add all
-            result.put(identity.getQName(), new HashSet<RpcDefinition>());
-        }
+    private static Multimap<QName/* of identity */, RpcDefinition> getIdentitiesToRpcs(
+            final SchemaContext schemaCtx) {
+        Multimap<QName, RpcDefinition> result = HashMultimap.create();
+        for (Module currentModule : schemaCtx.getModules()) {
 
-        for (RpcDefinition rpc : currentModule.getRpcs()) {
-            ContainerSchemaNode input = rpc.getInput();
-            if (input != null) {
-                for (UsesNode uses : input.getUses()) {
+            // Find all identities in current module for later identity->rpc mapping
+            Set<QName> allIdentitiesInModule = Sets.newHashSet(Collections2.transform(currentModule.getIdentities(), QNAME_FROM_NODE));
 
-                    if (uses.getGroupingPath().getPath().size() != 1) {
-                        continue;
-                    }
+            for (RpcDefinition rpc : currentModule.getRpcs()) {
+                ContainerSchemaNode input = rpc.getInput();
+                if (input != null) {
+                    for (UsesNode uses : input.getUses()) {
 
-                    // check grouping path
-                    QName qname = uses.getGroupingPath().getPath().get(0);
-                    if (false == qname
-                            .equals(ConfigConstants.RPC_CONTEXT_REF_GROUPING_QNAME)) {
-                        continue;
-                    }
+                        // Check if the rpc is config rpc by looking for input argument rpc-context-ref
+                        Iterator<QName> pathFromRoot = uses.getGroupingPath().getPathFromRoot().iterator();
+                        if (!pathFromRoot.hasNext() ||
+                                !pathFromRoot.next().equals(ConfigConstants.RPC_CONTEXT_REF_GROUPING_QNAME)) {
+                            continue;
+                        }
 
-                    for (SchemaNode refinedNode : uses.getRefines().values()) {
-
-                        for (UnknownSchemaNode unknownSchemaNode : refinedNode
-                                .getUnknownSchemaNodes()) {
-                            if (ConfigConstants.RPC_CONTEXT_INSTANCE_EXTENSION_QNAME
-                                    .equals(unknownSchemaNode.getNodeType())) {
-                                String localIdentityName = unknownSchemaNode
-                                        .getNodeParameter();
-                                QName identityQName = QName.create(
-                                        currentModule.getNamespace(),
-                                        currentModule.getRevision(),
-                                        localIdentityName);
-                                Set<RpcDefinition> rpcDefinitions = result
-                                        .get(identityQName);
-                                if (rpcDefinitions == null) {
-                                    throw new IllegalArgumentException(
-                                            "Identity referenced by rpc not found. Identity:"
-                                                    + localIdentityName + " , rpc "
-                                                    + rpc);
+                        for (SchemaNode refinedNode : uses.getRefines().values()) {
+                            for (UnknownSchemaNode unknownSchemaNode : refinedNode
+                                    .getUnknownSchemaNodes()) {
+                                if (ConfigConstants.RPC_CONTEXT_INSTANCE_EXTENSION_QNAME
+                                        .equals(unknownSchemaNode.getNodeType())) {
+                                    String localIdentityName = unknownSchemaNode
+                                            .getNodeParameter();
+                                    QName identityQName = QName.create(
+                                            currentModule.getNamespace(),
+                                            currentModule.getRevision(),
+                                            localIdentityName);
+                                    Preconditions.checkArgument(allIdentitiesInModule.contains(identityQName),
+                                            "Identity referenced by rpc not found. Identity: %s, rpc: %s", localIdentityName, rpc);
+                                    result.put(identityQName, rpc);
                                 }
-                                rpcDefinitions.add(rpc);
                             }
                         }
                     }
@@ -212,7 +223,9 @@ public class RuntimeBeanEntry {
     private static AttributesRpcsAndRuntimeBeans extractSubtree(
             final String packageName, final DataNodeContainer subtree,
             final TypeProviderWrapper typeProviderWrapper, final Module currentModule,
-            final Map<QName, Set<RpcDefinition>> identitiesToRpcs) {
+            final SchemaContext ctx) {
+
+        Multimap<QName, RpcDefinition> identitiesToRpcs = getIdentitiesToRpcs(ctx);
 
         List<AttributeIfc> attributes = Lists.newArrayList();
         List<RuntimeBeanEntry> runtimeBeanEntries = new ArrayList<>();
@@ -234,7 +247,7 @@ public class RuntimeBeanEntry {
                     ListSchemaNode listSchemaNode = (ListSchemaNode) child;
                     RuntimeBeanEntry hierarchicalChild = createHierarchical(
                             packageName, listSchemaNode, typeProviderWrapper,
-                            currentModule, identitiesToRpcs);
+                            currentModule, ctx);
                     runtimeBeanEntries.add(hierarchicalChild);
                 } else /* ordinary list attribute */{
                     ListAttribute listAttribute = ListAttribute.create(
@@ -258,18 +271,11 @@ public class RuntimeBeanEntry {
             if (ConfigConstants.RPC_CONTEXT_INSTANCE_EXTENSION_QNAME
                     .equals(unknownSchemaNode.getNodeType())) {
                 String localIdentityName = unknownSchemaNode.getNodeParameter();
-                QName identityQName = QName.create(currentModule.getNamespace(),
-                        currentModule.getRevision(), localIdentityName);
-                Set<RpcDefinition> rpcDefinitions = identitiesToRpcs
-                        .get(identityQName);
-                if (rpcDefinitions == null) {
-                    throw new IllegalArgumentException("Cannot find identity "
-                            + localIdentityName + " to be used as "
-                            + "context reference when resolving "
-                            + unknownSchemaNode);
-                }
+                QName identityQName = unknownSchemaNode.isAddedByUses() ?
+                        findQNameFromGrouping(subtree, ctx, unknownSchemaNode, localIdentityName) :
+                        QName.create(currentModule.getNamespace(), currentModule.getRevision(), localIdentityName);
                 // convert RpcDefinition to Rpc
-                for (RpcDefinition rpcDefinition : rpcDefinitions) {
+                for (RpcDefinition rpcDefinition : identitiesToRpcs.get(identityQName)) {
                     String name = TypeProviderWrapper
                             .findJavaParameter(rpcDefinition);
                     AttributeIfc returnType;
@@ -310,6 +316,22 @@ public class RuntimeBeanEntry {
                 attributes, rpcs);
     }
 
+    /**
+     * Find "proper" qname of unknown node in case it comes from a grouping
+     */
+    private static QName findQNameFromGrouping(final DataNodeContainer subtree, final SchemaContext ctx, final UnknownSchemaNode unknownSchemaNode, final String localIdentityName) {
+        QName identityQName = null;
+        for (UsesNode usesNode : subtree.getUses()) {
+            SchemaNode dataChildByName = SchemaContextUtil.findDataSchemaNode(ctx, usesNode.getGroupingPath());
+            Module m = SchemaContextUtil.findParentModule(ctx, dataChildByName);
+            List<UnknownSchemaNode> unknownSchemaNodes = dataChildByName.getUnknownSchemaNodes();
+            if(Collections2.transform(unknownSchemaNodes, UNKNOWN_NODE_TO_STRING).contains(UNKNOWN_NODE_TO_STRING.apply(unknownSchemaNode))) {
+                identityQName = QName.create(dataChildByName.getQName(), localIdentityName);
+            }
+        }
+        return identityQName;
+    }
+
     private static AttributeIfc getReturnTypeAttribute(final DataSchemaNode child, final TypeProviderWrapper typeProviderWrapper,
             final String packageName) {
         if (child instanceof LeafSchemaNode) {
@@ -353,13 +375,13 @@ public class RuntimeBeanEntry {
     private static RuntimeBeanEntry createHierarchical(final String packageName,
             final ListSchemaNode listSchemaNode,
             final TypeProviderWrapper typeProviderWrapper, final Module currentModule,
-            final Map<QName, Set<RpcDefinition>> identitiesToRpcs) {
+            final SchemaContext ctx) {
 
         // supported are numeric types, strings, enums
         // get all attributes
         AttributesRpcsAndRuntimeBeans attributesRpcsAndRuntimeBeans = extractSubtree(
                 packageName, listSchemaNode, typeProviderWrapper,
-                currentModule, identitiesToRpcs);
+                currentModule, ctx);
 
         Optional<String> keyYangName;
         if (listSchemaNode.getKeyDefinition().isEmpty()) {
index 1503b84d64a263dff2654cb13c6fb46a7ea08fa2..98b1f752afbbe669a8dd01448b6609b751b1f461 100644 (file)
@@ -52,7 +52,7 @@ public class RuntimeBeanEntryTest extends AbstractYangTest {
                 .getUnknownSchemaNodes();
         Map<String, RuntimeBeanEntry> runtimeBeans = RuntimeBeanEntry
                 .extractClassNameToRuntimeBeanMap(PACKAGE_NAME, caseNode, "test-name", new TypeProviderWrapper(new
-                        TypeProviderImpl(context)), "test", jmxImplModule);
+                        TypeProviderImpl(context)), "test", jmxImplModule, context);
         assertEquals(1, runtimeBeans.size());
         RuntimeBeanEntry runtimeMXBean = runtimeBeans.get("testRuntimeMXBean");
         assertTrue(runtimeMXBean.isRoot());
index 5e37f5afcfe4cd8a1f15b78e6e878ea766b29bd1..2428b10941ee4c2a47f47d6f3f8a109c6fe05a14 100644 (file)
@@ -11,6 +11,7 @@
 package org.opendaylight.controller.config.yang.test.util;
 
 import com.google.common.collect.Lists;
+import java.math.BigInteger;
 import java.util.List;
 import org.opendaylight.controller.config.yang.test.impl.Asdf;
 import org.opendaylight.controller.config.yang.test.impl.Deep2;
@@ -43,11 +44,36 @@ public class NetconfTestImplModuleUtil {
                 return asdf;
             }
 
+            @Override
+            public BigInteger getCommonStat() {
+                return new BigInteger("54");
+            }
+
             @Override
             public String noArg(final String arg1) {
                 return arg1.toUpperCase();
             }
 
+            @Override
+            public Long commonRpcTwo() {
+                return 1L;
+            }
+
+            @Override
+            public String commonRpcThree() {
+                return "true";
+            }
+
+            @Override
+            public Boolean commonRpc() {
+                return true;
+            }
+
+            @Override
+            public void netconfImplRpcFromGrouping() {
+                // rpc from grouping within same yang module
+            }
+
         });
 
         for (int i = 0; i < module.getSimpleShort(); i++) {
index e7aa64d7a621cc5ecdfa4781c6389e576b7865c1..093d7b3f13673f0f0799030fe35c70597db7ba1e 100644 (file)
@@ -8,6 +8,7 @@ module config-test-impl {
     import ietf-inet-types { prefix inet; revision-date 2010-09-24;}
     import rpc-context { prefix rpcx; revision-date 2013-06-17; }
     import test-types { prefix tt; revision-date 2013-11-27; }
+    import test-groups { prefix tg; revision-date 2014-12-08; }
 
     description
         "Testing IMPL";
@@ -347,6 +348,22 @@ module config-test-impl {
         }
     }
 
+    grouping netconf-impl-rpc {
+       rpcx:rpc-context-instance netconf-impl-rpc-ctx;
+    }
+
+    identity netconf-impl-rpc-ctx;
+
+    rpc netconf-impl-rpc-from-grouping {
+        input {
+            uses rpcx:rpc-context-ref {
+                refine context-instance {
+                    rpcx:rpc-context-instance "netconf-impl-rpc-ctx";
+                }
+            }
+        }
+    }
+
     augment "/config:modules/config:module/config:state" {
         case impl-netconf {
             when "/config:modules/config:module/config:type = 'impl-netconf'";
@@ -354,6 +371,11 @@ module config-test-impl {
                 // rpc
                 rpcx:rpc-context-instance "test-rpc";
 
+                // add some stats + rpc from groupings outside this module
+                uses tt:common-operational;
+                uses tg:common-operational-rpc;
+                uses netconf-impl-rpc;
+
                 // root runtime bean
                 leaf created-sessions {
                     type uint32;
diff --git a/opendaylight/config/yang-test/src/main/yang/types/test-groups.yang b/opendaylight/config/yang-test/src/main/yang/types/test-groups.yang
new file mode 100644 (file)
index 0000000..00f704b
--- /dev/null
@@ -0,0 +1,52 @@
+module test-groups {
+    yang-version 1;
+    namespace "urn:opendaylight:params:xml:ns:yang:controller:config:test:groups";
+    prefix "tg";
+
+    import rpc-context { prefix rpcx; revision-date 2013-06-17; }
+
+    description
+        "Groupings generated for testing";
+
+    revision "2014-12-08";
+
+    grouping common-operational-rpc {
+        rpcx:rpc-context-instance common-rpc-ctx;
+        rpcx:rpc-context-instance common-rpc-ctx-two;
+    }
+
+    identity common-rpc-ctx;
+    identity common-rpc-ctx-two;
+
+    rpc common-rpc {
+        input {
+            uses rpcx:rpc-context-ref {
+                refine context-instance {
+                    rpcx:rpc-context-instance "common-rpc-ctx";
+                }
+            }
+        }
+
+        output {
+            leaf output {
+                type boolean;
+            }
+        }
+    }
+
+    rpc common-rpc-two {
+        input {
+            uses rpcx:rpc-context-ref {
+                refine context-instance {
+                    rpcx:rpc-context-instance "common-rpc-ctx-two";
+                }
+            }
+        }
+
+        output {
+            leaf output {
+                type uint32;
+            }
+        }
+    }
+}
index df5387be2c1f2581e2a44872ef596e62ec76d628..ee466b4034335d94a8cb15841354619f29f219e3 100644 (file)
@@ -3,6 +3,8 @@ module test-types {
     namespace "urn:opendaylight:params:xml:ns:yang:controller:config:test:types";
     prefix "tt";
 
+    import rpc-context { prefix rpcx; revision-date 2013-06-17; }
+
     description
         "Types generated for testing";
 
@@ -40,4 +42,34 @@ module test-types {
     identity test-identity2 {
         base test-identity1;
     }
+
+    grouping common-operational {
+       leaf common-stat {
+           type uint64;
+       }
+       // This would not work, since it clashes with identity common-rpc-ctx from test-groups
+       // Both grouping add the same unknown node "rpcx:rpc-context-instance common-rpc-ctx-three;"
+       // and we cannot match the unknown node to the grouping that added it
+       //rpcx:rpc-context-instance common-rpc-ctx-three;
+       rpcx:rpc-context-instance common-rpc-ctx-three;
+    }
+
+    //identity common-rpc-ctx;
+    identity common-rpc-ctx-three;
+
+    rpc common-rpc-three {
+        input {
+            uses rpcx:rpc-context-ref {
+                refine context-instance {
+                    rpcx:rpc-context-instance "common-rpc-ctx-three";
+                }
+            }
+        }
+
+        output {
+            leaf output {
+                type string;
+            }
+        }
+    }
 }
index b1c73f6f4155e045935fa37912245e3d03d45fc9..cc2e55d51b115cc124890c3e643ed810244a7c24 100644 (file)
@@ -45,8 +45,35 @@ public class Follower extends AbstractRaftActorBehavior {
         scheduleElection(electionDuration());
     }
 
+    private boolean isLogEntryPresent(long index){
+        if(index == context.getReplicatedLog().getSnapshotIndex()){
+            return true;
+        }
+
+        ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+                .get(index);
+
+        return previousEntry != null;
+
+    }
+
+    private long getLogEntryTerm(long index){
+        if(index == context.getReplicatedLog().getSnapshotIndex()){
+            return context.getReplicatedLog().getSnapshotTerm();
+        }
+
+        ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+                .get(index);
+
+        if(previousEntry != null){
+            return previousEntry.getTerm();
+        }
+
+        return -1;
+    }
+
     @Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
-        AppendEntries appendEntries) {
+                                                              AppendEntries appendEntries) {
 
         if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
             if(LOG.isDebugEnabled()) {
@@ -67,15 +94,15 @@ public class Follower extends AbstractRaftActorBehavior {
         // 2. Reply false if log doesn’t contain an entry at prevLogIndex
         // whose term matches prevLogTerm (§5.3)
 
-        ReplicatedLogEntry previousEntry = context.getReplicatedLog()
-            .get(appendEntries.getPrevLogIndex());
+        long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex());
+        boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex());
 
 
         boolean outOfSync = true;
 
         // First check if the logs are in sync or not
         if (lastIndex() == -1
-            && appendEntries.getPrevLogIndex() != -1) {
+                && appendEntries.getPrevLogIndex() != -1) {
 
             // The follower's log is out of sync because the leader does have
             // an entry at prevLogIndex and this follower has no entries in
@@ -83,34 +110,34 @@ public class Follower extends AbstractRaftActorBehavior {
 
             if(LOG.isDebugEnabled()) {
                 LOG.debug("The followers log is empty and the senders prevLogIndex is {}",
-                    appendEntries.getPrevLogIndex());
+                        appendEntries.getPrevLogIndex());
             }
 
         } else if (lastIndex() > -1
-            && appendEntries.getPrevLogIndex() != -1
-            && previousEntry == null) {
+                && appendEntries.getPrevLogIndex() != -1
+                && !prevEntryPresent) {
 
             // The follower's log is out of sync because the Leader's
             // prevLogIndex entry was not found in it's log
 
             if(LOG.isDebugEnabled()) {
                 LOG.debug("The log is not empty but the prevLogIndex {} was not found in it",
-                    appendEntries.getPrevLogIndex());
+                        appendEntries.getPrevLogIndex());
             }
 
         } else if (lastIndex() > -1
-            && previousEntry != null
-            && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
+                && prevEntryPresent
+                && prevLogTerm != appendEntries.getPrevLogTerm()) {
 
             // The follower's log is out of sync because the Leader's
             // prevLogIndex entry does exist in the follower's log but it has
             // a different term in it
 
-            if(LOG.isDebugEnabled()) {
+            if (LOG.isDebugEnabled()) {
                 LOG.debug(
-                    "Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
-                    , previousEntry.getTerm()
-                    , appendEntries.getPrevLogTerm());
+                        "Cannot append entries because previous entry term {}  is not equal to append entries prevLogTerm {}"
+                        , prevLogTerm
+                        , appendEntries.getPrevLogTerm());
             }
         } else {
             outOfSync = false;
@@ -120,9 +147,9 @@ public class Follower extends AbstractRaftActorBehavior {
             // We found that the log was out of sync so just send a negative
             // reply and return
             if(LOG.isDebugEnabled()) {
-                LOG.debug("Follower is out-of-sync, " +
+                LOG.debug("Follower ({}) is out-of-sync, " +
                         "so sending negative reply, lastIndex():{}, lastTerm():{}",
-                    lastIndex(), lastTerm()
+                        context.getId(), lastIndex(), lastTerm()
                 );
             }
             sender.tell(
index 0ee9693d326e6eff9ff37c77eeaf37b4c8678572..a04d6aeb556cd2f84ffb10ac23302c9e5928451b 100644 (file)
@@ -421,6 +421,119 @@ public class FollowerTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+    @Test
+    public void testHandleAppendEntriesPreviousLogEntryMissing(){
+        new JavaTestKit(getSystem()) {{
+
+            MockRaftActorContext context = (MockRaftActorContext)
+                    createActorContext();
+
+            // Prepare the receivers log
+            MockRaftActorContext.SimpleReplicatedLog log =
+                    new MockRaftActorContext.SimpleReplicatedLog();
+            log.append(
+                    new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
+            log.append(
+                    new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
+            log.append(
+                    new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
+
+            context.setReplicatedLog(log);
+
+            // Prepare the entries to be sent with AppendEntries
+            List<ReplicatedLogEntry> entries = new ArrayList<>();
+            entries.add(
+                    new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
+
+            AppendEntries appendEntries =
+                    new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+
+            RaftActorBehavior behavior = createBehavior(context);
+
+            // Send an unknown message so that the state of the RaftActor remains unchanged
+            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+
+            RaftActorBehavior raftBehavior =
+                    behavior.handleMessage(getRef(), appendEntries);
+
+            assertEquals(expected, raftBehavior);
+
+            // Also expect an AppendEntriesReply to be sent where success is false
+            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+                    "AppendEntriesReply") {
+                // do not put code outside this method, will run afterwards
+                protected Boolean match(Object in) {
+                    if (in instanceof AppendEntriesReply) {
+                        AppendEntriesReply reply = (AppendEntriesReply) in;
+                        return reply.isSuccess();
+                    } else {
+                        throw noMatch();
+                    }
+                }
+            }.get();
+
+            assertEquals(false, out);
+
+        }};
+
+    }
+
+    @Test
+    public void testHandleAppendAfterInstallingSnapshot(){
+        new JavaTestKit(getSystem()) {{
+
+            MockRaftActorContext context = (MockRaftActorContext)
+                    createActorContext();
+
+
+            // Prepare the receivers log
+            MockRaftActorContext.SimpleReplicatedLog log =
+                    new MockRaftActorContext.SimpleReplicatedLog();
+
+            // Set up a log as if it has been snapshotted
+            log.setSnapshotIndex(3);
+            log.setSnapshotTerm(1);
+
+            context.setReplicatedLog(log);
+
+            // Prepare the entries to be sent with AppendEntries
+            List<ReplicatedLogEntry> entries = new ArrayList<>();
+            entries.add(
+                    new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
+
+            AppendEntries appendEntries =
+                    new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+
+            RaftActorBehavior behavior = createBehavior(context);
+
+            // Send an unknown message so that the state of the RaftActor remains unchanged
+            RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+
+            RaftActorBehavior raftBehavior =
+                    behavior.handleMessage(getRef(), appendEntries);
+
+            assertEquals(expected, raftBehavior);
+
+            // Also expect an AppendEntriesReply to be sent where success is false
+            final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+                    "AppendEntriesReply") {
+                // do not put code outside this method, will run afterwards
+                protected Boolean match(Object in) {
+                    if (in instanceof AppendEntriesReply) {
+                        AppendEntriesReply reply = (AppendEntriesReply) in;
+                        return reply.isSuccess();
+                    } else {
+                        throw noMatch();
+                    }
+                }
+            }.get();
+
+            assertEquals(true, out);
+
+        }};
+
+    }
+
 
     /**
      * This test verifies that when InstallSnapshot is received by
index de28ae81fcbd5c1d9da6b28d7f5a352e91d3e4c5..0949d3d7612dfb34a1f8890329653af7efebc35e 100644 (file)
@@ -89,8 +89,7 @@ public class RpcProviderRegistryImpl implements RpcProviderRegistry, RouteChange
     }
 
     @Override
-    public final <T extends RpcService> RpcRegistration<T> addRpcImplementation(final Class<T> type, final T implementation)
-            throws IllegalStateException {
+    public final <T extends RpcService> RpcRegistration<T> addRpcImplementation(final Class<T> type, final T implementation) {
 
         // FIXME: This should be well documented - addRpcImplementation for
         // routed RPCs
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationByteStringPayload.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/raft/protobuff/client/messages/CompositeModificationByteStringPayload.java
new file mode 100644 (file)
index 0000000..99de5dd
--- /dev/null
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.protobuff.client.messages;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.UnknownFieldSet;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.ref.SoftReference;
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompositeModificationByteStringPayload extends Payload implements
+        Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private ByteString byteString;
+    private SoftReference<PersistentMessages.CompositeModification> modificationReference;
+    private static final Logger LOG = LoggerFactory.getLogger(CompositeModificationByteStringPayload.class);
+
+    public CompositeModificationByteStringPayload(){
+        byteString = null;
+    }
+    public CompositeModificationByteStringPayload(Object modification){
+        this(((PersistentMessages.CompositeModification) modification).toByteString());
+        this.modificationReference = new SoftReference<>((PersistentMessages.CompositeModification) modification);
+    }
+
+    private CompositeModificationByteStringPayload(ByteString byteString){
+        this.byteString = Preconditions.checkNotNull(byteString, "byteString should not be null");
+    }
+
+
+    @Override
+    public Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> encode() {
+        Preconditions.checkState(byteString!=null);
+        Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> map = new HashMap<>();
+        map.put(org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification,
+                getModificationInternal());
+        return map;
+    }
+
+    @Override
+    public Payload decode(
+            AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload) {
+        PersistentMessages.CompositeModification modification = payload
+                .getExtension(
+                        org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification);
+
+        // The extension was put in the unknown field.
+        // This is because extensions need to be registered
+        // see org.opendaylight.controller.mdsal.CompositeModificationPayload.registerAllExtensions
+        // also see https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/ExtensionRegistry
+        // If that is not done then on the other end the extension shows up as an unknown field
+        // Need to figure out a better way to do this
+        if(payload.getUnknownFields().hasField(2)){
+            UnknownFieldSet.Field field =
+                    payload.getUnknownFields().getField(2);
+
+            return new CompositeModificationByteStringPayload(field.getLengthDelimitedList().get(0));
+        }
+
+        return new CompositeModificationByteStringPayload(modification);
+    }
+
+    public Object getModification(){
+        return getModificationInternal();
+    }
+
+    private PersistentMessages.CompositeModification getModificationInternal(){
+        if(this.modificationReference != null && this.modificationReference.get() != null){
+            return this.modificationReference.get();
+        }
+        try {
+            PersistentMessages.CompositeModification compositeModification = PersistentMessages.CompositeModification.parseFrom(this.byteString);
+            this.modificationReference = new SoftReference<>(compositeModification);
+            return compositeModification;
+        } catch (InvalidProtocolBufferException e) {
+            LOG.error("Unexpected exception occurred when parsing byteString to CompositeModification", e);
+        }
+
+        return null;
+    }
+
+    public int size(){
+        return byteString.size();
+    }
+
+    private void writeObject(java.io.ObjectOutputStream stream)
+            throws IOException {
+        byteString.writeTo(stream);
+    }
+
+    private void readObject(java.io.ObjectInputStream stream)
+            throws IOException, ClassNotFoundException {
+        byteString = ByteString.readFrom(stream);
+    }
+
+    @VisibleForTesting
+    public void clearModificationReference(){
+        if(this.modificationReference != null) {
+            this.modificationReference.clear();
+        }
+    }
+}
\ No newline at end of file
index a22e535fad1f6fa7052121c6be736a299e0b48cb..7d6dde9c8af296df1b82f331f431daaa0d431832 100644 (file)
@@ -69,6 +69,7 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -321,7 +322,7 @@ public class Shard extends RaftActor {
             cohortEntry.getCohort().preCommit().get();
 
             Shard.this.persistData(getSender(), transactionID,
-                    new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
+                    new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable()));
         } catch (InterruptedException | ExecutionException e) {
             LOG.error(e, "An exception occurred while preCommitting transaction {}",
                     cohortEntry.getTransactionID());
@@ -679,6 +680,8 @@ public class Shard extends RaftActor {
     protected void appendRecoveredLogEntry(final Payload data) {
         if (data instanceof CompositeModificationPayload) {
             currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
+        } else if (data instanceof CompositeModificationByteStringPayload) {
+            currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
         } else {
             LOG.error("Unknown state received {} during recovery", data);
         }
@@ -755,19 +758,12 @@ public class Shard extends RaftActor {
         if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
 
-            if(modification == null) {
-                LOG.error(
-                     "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
-                     identifier, clientActor != null ? clientActor.path().toString() : null);
-            } else if(clientActor == null) {
-                // There's no clientActor to which to send a commit reply so we must be applying
-                // replicated state from the leader.
-                commitWithNewTransaction(MutableCompositeModification.fromSerializable(
-                        modification, schemaContext));
-            } else {
-                // This must be the OK to commit after replication consensus.
-                finishCommit(clientActor, identifier);
-            }
+            applyModificationToState(clientActor, identifier, modification);
+        } else if(data instanceof CompositeModificationByteStringPayload ){
+            Object modification = ((CompositeModificationByteStringPayload) data).getModification();
+
+            applyModificationToState(clientActor, identifier, modification);
+
         } else {
             LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
                     data, data.getClass().getClassLoader(),
@@ -778,6 +774,22 @@ public class Shard extends RaftActor {
 
     }
 
+    private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+        if(modification == null) {
+            LOG.error(
+                    "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+                    identifier, clientActor != null ? clientActor.path().toString() : null);
+        } else if(clientActor == null) {
+            // There's no clientActor to which to send a commit reply so we must be applying
+            // replicated state from the leader.
+            commitWithNewTransaction(MutableCompositeModification.fromSerializable(
+                    modification, schemaContext));
+        } else {
+            // This must be the OK to commit after replication consensus.
+            finishCommit(clientActor, identifier);
+        }
+    }
+
     private void updateJournalStats() {
         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/CompositeModificationByteStringPayloadTest.java
new file mode 100644 (file)
index 0000000..db9f3d1
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class CompositeModificationByteStringPayloadTest {
+
+    private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
+
+    @Test
+    public void testSerialization(){
+        WriteModification writeModification =
+                new WriteModification(TestModel.TEST_PATH, ImmutableNodes
+                        .containerNode(TestModel.TEST_QNAME),
+                        TestModel.createTestContext());
+
+        MutableCompositeModification compositeModification =
+                new MutableCompositeModification();
+
+        compositeModification.addModification(writeModification);
+
+        CompositeModificationByteStringPayload compositeModificationByteStringPayload
+                = new CompositeModificationByteStringPayload(compositeModification.toSerializable());
+
+        byte[] bytes = SerializationUtils.serialize(compositeModificationByteStringPayload);
+
+        Object deserialize = SerializationUtils.deserialize(bytes);
+
+        assertTrue(deserialize instanceof CompositeModificationByteStringPayload);
+
+    }
+
+    @Test
+    public void testAppendEntries(){
+        List<ReplicatedLogEntry> entries = new ArrayList<>();
+
+        CompositeModificationByteStringPayload payload = newByteStringPayload(
+                new WriteModification(TestModel.OUTER_LIST_PATH,
+                        ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+                        SCHEMA_CONTEXT));
+
+        payload.clearModificationReference();
+
+        entries.add(new ReplicatedLogImplEntry(0, 1, payload));
+
+
+        assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable());
+    }
+
+
+
+    private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        for(Modification mod: mods) {
+            compMod.addModification(mod);
+        }
+
+        return new CompositeModificationByteStringPayload(compMod.toSerializable());
+    }
+
+}
index 926cef6ba53ea2805f4310e2d9a55b59371f4094..2792342ab2f3921f451999651477edca5664397b 100644 (file)
@@ -75,6 +75,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -432,9 +433,9 @@ public class ShardTest extends AbstractActorTest {
                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
                           SCHEMA_CONTEXT))));
 
-        int nListEntries = 11;
+        int nListEntries = 16;
         Set<Integer> listEntryKeys = new HashSet<>();
-        for(int i = 1; i <= nListEntries; i++) {
+        for(int i = 1; i <= nListEntries-5; i++) {
             listEntryKeys.add(Integer.valueOf(i));
             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
@@ -445,6 +446,19 @@ public class ShardTest extends AbstractActorTest {
                     newPayload(mod)));
         }
 
+        // Add some of the new CompositeModificationByteStringPayload
+        for(int i = 11; i <= nListEntries; i++) {
+            listEntryKeys.add(Integer.valueOf(i));
+            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+            Modification mod = new MergeModification(path,
+                    ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
+                    SCHEMA_CONTEXT);
+            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+                    newByteStringPayload(mod)));
+        }
+
+
         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
                 new ApplyLogEntries(nListEntries));
 
@@ -516,6 +530,16 @@ public class ShardTest extends AbstractActorTest {
         return new CompositeModificationPayload(compMod.toSerializable());
     }
 
+    private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
+        MutableCompositeModification compMod = new MutableCompositeModification();
+        for(Modification mod: mods) {
+            compMod.addModification(mod);
+        }
+
+        return new CompositeModificationByteStringPayload(compMod.toSerializable());
+    }
+
+
     private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
             final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
             final MutableCompositeModification modification) {
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMNotification.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMNotification.java
new file mode 100644 (file)
index 0000000..d99001e
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * A single YANG notification.
+ */
+public interface DOMNotification {
+    /**
+     * Return the type of this notification.
+     *
+     * @return Notification type.
+     */
+    @Nonnull SchemaPath getType();
+
+    /**
+     * Return the body of this notification.
+     *
+     * @return Notification body.
+     */
+    @Nonnull ContainerNode getBody();
+}
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMNotificationListener.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMNotificationListener.java
new file mode 100644 (file)
index 0000000..7085588
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import java.util.EventListener;
+import javax.annotation.Nonnull;
+
+/**
+ * Interface implemented by listeners interested in {@link DOMNotification}s.
+ */
+public interface DOMNotificationListener extends EventListener {
+    /**
+     * Invoked whenever a {@link DOMNotification} matching the subscription
+     * criteria is received.
+     *
+     * @param notification Received notification
+     */
+    void onNotification(@Nonnull DOMNotification notification);
+}
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMNotificationListenerRegistration.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMNotificationListenerRegistration.java
new file mode 100644 (file)
index 0000000..4dccad2
--- /dev/null
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * A registration of a {@link DOMNotificationListener}. Invoking {@link #close()} will prevent further
+ * delivery of events to the listener.
+ */
+public interface DOMNotificationListenerRegistration extends ListenerRegistration<DOMNotificationListener> {
+
+}
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMNotificationPublishService.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMNotificationPublishService.java
new file mode 100644 (file)
index 0000000..8a845e8
--- /dev/null
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+/**
+ * A {@link DOMService} which allows its user to send {@link DOMNotification}s. It
+ * provides two styles of initiating the notification delivery, similar to
+ * {@link java.util.concurrent.BlockingQueue}:
+ * - a put-style method which waits until the implementation can accept the notification
+ *   for delivery, and
+ * - an offer-style method, which attempts to enqueue the notification, but allows
+ *   the caller to specify that it should never wait, or put an upper bound on how
+ *   long it is going to wait.
+ */
+public interface DOMNotificationPublishService extends DOMService {
+    /**
+     * Well-known value indicating that the implementation is currently not
+     * able to accept a notification.
+     */
+    ListenableFuture<Object> REJECTED = Futures.immediateFailedFuture(new Throwable("Unacceptable blocking conditions encountered"));
+
+    /**
+     * Publish a notification. The result of this method is a {@link ListenableFuture}
+     * which will complete once the notification has been delivered to all immediate
+     * registrants. The type of the object resulting from the future is not defined
+     * and implementations may use it to convey additional information related to the
+     * publishing process.
+     *
+     * Abstract subclasses can refine the return type as returning a promise of a
+     * more specific type, e.g.:
+     *
+     *     public interface DeliveryStatus { int getListenerCount(); }
+     *     ListenableFuture<? extends DeliveryStatus> putNotification(DOMNotification notification);
+     *
+     * Once the Future succeeds, the resulting object can be queried for traits using
+     * instanceof, e.g:
+     *
+     *     // Can block when (for example) the implemention's ThreadPool queue is full
+     *     Object o = service.putNotification(notif).get();
+     *     if (o instanceof DeliveryStatus) {
+     *         DeliveryStatus ds = (DeliveryStatus)o;
+     *         LOG.debug("Notification was received by {} listeners", ds.getListenerCount(););
+     *     }
+     * }
+     *
+     * In case an implementation is running out of resources, it can block the calling
+     * thread until enough resources become available to accept the notification for
+     * processing, or it is interrupted.
+     *
+     * Caution: completion here means that the implementation has completed processing
+     *          of the notification. This does not mean that all existing registrants
+     *          have seen the notification. Most importantly, the delivery process at
+     *          other cluster nodes may have not begun yet.
+     *
+     * @param notification Notification to be published.
+     * @return A listenable future which will report completion when the service
+     *         has finished propagating the notification to its immediate registrants.
+     * @throws InterruptedException if interrupted while waiting
+     * @throws NullPointerException if notification is null.
+     */
+    @Nonnull ListenableFuture<? extends Object> putNotification(@Nonnull DOMNotification notification) throws InterruptedException;
+
+    /**
+     * Attempt to publish a notification. The result of this method is a {@link ListenableFuture}
+     * which will complete once the notification has been delivered to all immediate
+     * registrants. The type of the object resulting from the future is not defined
+     * and implementations may use it to convey additional information related to the
+     * publishing process. Unlike {@link #putNotification(DOMNotification)}, this method
+     * is guaranteed not to block if the underlying implementation encounters contention.
+     *
+     * @param notification Notification to be published.
+     * @return A listenable future which will report completion when the service
+     *         has finished propagating the notification to its immediate registrants,
+     *         or {@value #REJECTED} if resource constraints prevent
+     *         the implementation from accepting the notification for delivery.
+     * @throws NullPointerException if notification is null.
+     */
+    @Nonnull ListenableFuture<? extends Object> offerNotification(@Nonnull DOMNotification notification);
+
+    /**
+     * Attempt to publish a notification. The result of this method is a {@link ListenableFuture}
+     * which will complete once the notification has been delivered to all immediate
+     * registrants. The type of the object resulting from the future is not defined
+     * and implementations may use it to convey additional information related to the
+     * publishing process. Unlike {@link #putNotification(DOMNotification)}, this method
+     * is guaranteed to block more than the specified timeout.
+     *
+     * @param notification Notification to be published.
+     * @param timeout how long to wait before giving up, in units of unit
+     * @param unit a TimeUnit determining how to interpret the timeout parameter
+     * @return A listenable future which will report completion when the service
+     *         has finished propagating the notification to its immediate registrants,
+     *         or {@value #REJECTED} if resource constraints prevent
+     *         the implementation from accepting the notification for delivery.
+     * @throws InterruptedException if interrupted while waiting
+     * @throws NullPointerException if notification or unit is null.
+     * @throws IllegalArgumentException if timeout is negative.
+     */
+    @Nonnull ListenableFuture<? extends Object> offerNotification(@Nonnull DOMNotification notification,
+        @Nonnegative long timeout, @Nonnull TimeUnit unit) throws InterruptedException;
+}
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMNotificationService.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/md/sal/dom/api/DOMNotificationService.java
new file mode 100644 (file)
index 0000000..6bce9c4
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * A {@link DOMService} which allows its users to subscribe to receive
+ * {@link DOMNotification}s.
+ */
+public interface DOMNotificationService {
+    /**
+     * Register a {@link DOMNotificationListener} to receive a set of notifications. As with
+     * other ListenerRegistration-based interfaces, registering an instance multiple times
+     * results in notifications being delivered for each registration.
+     *
+     * @param listener Notification instance to register
+     * @param types Notification types which should be delivered to the listener. Duplicate
+     *              entries are processed only once, null entries are ignored.
+     * @return Registration handle. Invoking {@link DOMNotificationListenerRegistration#close()}
+     *         will stop the delivery of notifications to the listener
+     * @throws IllegalArgumentException if types is empty or contains an invalid element, such as
+     *         null or a SchemaPath which does not represent a valid {@link DOMNotification} type.
+     * @throws NullPointerException if either of the arguments is null
+     */
+    DOMNotificationListenerRegistration registerNotificationListener(@Nonnull DOMNotificationListener listener, @Nonnull Collection<SchemaPath> types);
+
+    /**
+     * Register a {@link DOMNotificationListener} to receive a set of notifications. As with
+     * other ListenerRegistration-based interfaces, registering an instance multiple times
+     * results in notifications being delivered for each registration.
+     *
+     * @param listener Notification instance to register
+     * @param types Notification types which should be delivered to the listener. Duplicate
+     *              entries are processed only once, null entries are ignored.
+     * @return Registration handle. Invoking {@link DOMNotificationListenerRegistration#close()}
+     *         will stop the delivery of notifications to the listener
+     * @throws IllegalArgumentException if types is empty or contains an invalid element, such as
+     *         null or a SchemaPath which does not represent a valid {@link DOMNotification} type.
+     * @throws NullPointerException if listener is null
+     */
+    // FIXME: Java 8: provide a default implementation of this method.
+    DOMNotificationListenerRegistration registerNotificationListener(@Nonnull DOMNotificationListener listener, SchemaPath... types);
+}
index 357cb8bfe610fbfd226a3d3b2f5cbb2c5cdc4d67..dc18394ffd4225131c3369baefe42bbf19cca2e6 100644 (file)
@@ -5,9 +5,12 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html.
  */
-
 package org.opendaylight.controller.md.sal.dom.api;
 
+/**
+ * Marker interface for services which can be obtained from a {@link DOMMountPoint}
+ * instance. No further semantics are implied.
+ */
 public interface DOMService {
 
 }
diff --git a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/md/sal/dom/spi/AbstractDOMNotificationListenerRegistration.java b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/md/sal/dom/spi/AbstractDOMNotificationListenerRegistration.java
new file mode 100644 (file)
index 0000000..2934b0d
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.spi;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListenerRegistration;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+
+/**
+ * Utility base class for {@link DOMNotificationListenerRegistration}
+ * implementations.
+ */
+public abstract class AbstractDOMNotificationListenerRegistration extends AbstractListenerRegistration<DOMNotificationListener> implements DOMNotificationListenerRegistration {
+    /**
+     * Default constructor. Subclasses need to invoke it from their
+     * constructor(s).
+     *
+     * @param listener {@link DOMNotificationListener} instance which is
+     *                 being held by this registration. May not be null.
+     */
+    protected AbstractDOMNotificationListenerRegistration(final @Nonnull DOMNotificationListener listener) {
+        super(listener);
+    }
+}
diff --git a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/md/sal/dom/spi/ForwardingDOMNotificationPublishService.java b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/md/sal/dom/spi/ForwardingDOMNotificationPublishService.java
new file mode 100644 (file)
index 0000000..6bc1107
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.spi;
+
+import com.google.common.collect.ForwardingObject;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+
+/**
+ * Utility implementations of {@link DOMNotificationPublishService} which forwards
+ * all requests to a delegate instance.
+ */
+public abstract class ForwardingDOMNotificationPublishService extends ForwardingObject implements DOMNotificationPublishService {
+    @Override
+    protected abstract DOMNotificationPublishService delegate();
+
+    @Override
+    public ListenableFuture<? extends Object> putNotification(final DOMNotification notification) throws InterruptedException {
+        return delegate().putNotification(notification);
+    }
+
+    @Override
+    public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
+        return delegate().offerNotification(notification);
+    }
+
+    @Override
+    public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
+            final TimeUnit unit) throws InterruptedException {
+        return delegate().offerNotification(notification, timeout, unit);
+    }
+}
diff --git a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/md/sal/dom/spi/ForwardingDOMNotificationService.java b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/md/sal/dom/spi/ForwardingDOMNotificationService.java
new file mode 100644 (file)
index 0000000..5199a38
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.spi;
+
+import com.google.common.collect.ForwardingObject;
+import java.util.Collection;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListenerRegistration;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * Utility implementation of a {@link DOMNotificationService} which forwards all requests
+ * to a delegate instance.
+ */
+public abstract class ForwardingDOMNotificationService extends ForwardingObject implements DOMNotificationService {
+    @Override
+    protected abstract DOMNotificationService delegate();
+
+    @Override
+    public DOMNotificationListenerRegistration registerNotificationListener(final DOMNotificationListener listener,
+            final Collection<SchemaPath> types) {
+        return delegate().registerNotificationListener(listener, types);
+    }
+
+    @Override
+    public DOMNotificationListenerRegistration registerNotificationListener(final DOMNotificationListener listener,
+            final SchemaPath... types) {
+        return delegate().registerNotificationListener(listener, types);
+    }
+}
index d564d4cbd40d92ff18bc19fdb6c6c04698a12bcf..16b3ee67080ed70979a96d57d5b095a341bffc17 100644 (file)
@@ -211,6 +211,7 @@ public class RestconfDocumentedExceptionMapper implements ExceptionMapper<Restco
         }
 
         DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+
         factory.setNamespaceAware(true);
         factory.setCoalescing(true);
         factory.setIgnoringElementContentWhitespace(true);
index 062a4488f369075a70c4322f4d75b5a1ce24841b..ad7122058c880d3ab270d40ae56ef6775ccd3c2f 100644 (file)
@@ -52,6 +52,15 @@ public class XmlNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPro
 
     static {
         DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+        try {
+            factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
+            factory.setFeature("http://xml.org/sax/features/external-general-entities", false);
+            factory.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
+            factory.setXIncludeAware(false);
+            factory.setExpandEntityReferences(false);
+        } catch (ParserConfigurationException e) {
+            throw new ExceptionInInitializerError(e);
+        }
         factory.setNamespaceAware(true);
         factory.setCoalescing(true);
         factory.setIgnoringElementContentWhitespace(true);
index 6b9da80c685f913cce59edb19bc4588b16e0ed57..d71a12ff744c2b06b804a7d4a53487ec66739451 100644 (file)
@@ -32,6 +32,9 @@ import org.opendaylight.yangtools.yang.data.api.Node;
 public class XmlToCompositeNodeReader {
 
     private final static XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
+    static {
+        xmlInputFactory.setProperty("javax.xml.stream.isSupportingExternalEntities", false);
+    }
     private XMLEventReader eventReader;
 
     public Node<?> read(InputStream entityStream) throws XMLStreamException,
index 5ed528e9bfe8c5144a5c96698bc234248d5fecac..beb3365f1c068487ddfb5cbb70883cb81ff14776 100644 (file)
@@ -712,7 +712,7 @@ public class NetconfMappingTest extends AbstractConfigTest {
     private List<InputStream> getYangs() throws FileNotFoundException {
         List<String> paths = Arrays.asList("/META-INF/yang/config.yang", "/META-INF/yang/rpc-context.yang",
                 "/META-INF/yang/config-test.yang", "/META-INF/yang/config-test-impl.yang", "/META-INF/yang/test-types.yang",
-                "/META-INF/yang/ietf-inet-types.yang");
+                "/META-INF/yang/test-groups.yang", "/META-INF/yang/ietf-inet-types.yang");
         final Collection<InputStream> yangDependencies = new ArrayList<>();
         for (String path : paths) {
             final InputStream is = Preconditions
index 9bafe9760ae202cd8c75adfd0aa8a27aafae42ae..31a7661b2c17ccb8fdff3688f7111a26b4fe3f97 100644 (file)
@@ -9,11 +9,11 @@
 package org.opendaylight.controller.netconf.client;
 
 import io.netty.channel.Channel;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.MessageToByteEncoder;
 import java.util.Collection;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.nettyutil.AbstractNetconfSession;
-import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXICodec;
-import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
-import org.opendaylight.controller.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
 import org.opendaylight.controller.netconf.nettyutil.handler.NetconfMessageToXMLEncoder;
 import org.opendaylight.controller.netconf.nettyutil.handler.NetconfXMLToMessageDecoder;
 import org.slf4j.Logger;
@@ -49,10 +49,10 @@ public class NetconfClientSession extends AbstractNetconfSession<NetconfClientSe
     }
 
     @Override
-    protected void addExiHandlers(final NetconfEXICodec exiCodec) {
+    protected void addExiHandlers(final ByteToMessageDecoder decoder, final MessageToByteEncoder<NetconfMessage> encoder) {
         // TODO used only in negotiator, client supports only auto start-exi
-        replaceMessageDecoder(new NetconfEXIToMessageDecoder(exiCodec));
-        replaceMessageEncoder(new NetconfMessageToEXIEncoder(exiCodec));
+        replaceMessageDecoder(decoder);
+        replaceMessageEncoder(encoder);
     }
 
     @Override
index 850ad55fc61314481cde821ff575bee9789a3f86..e11be554d0df948700dc37b8650db0e3c5722172 100644 (file)
@@ -12,7 +12,6 @@ import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
-
 import com.google.common.collect.Lists;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
@@ -24,6 +23,8 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXICodec;
+import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
+import org.opendaylight.controller.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
 import org.openexi.proc.common.EXIOptions;
 
 public class NetconfClientSessionTest {
@@ -53,7 +54,9 @@ public class NetconfClientSessionTest {
         Mockito.doReturn("").when(channelHandler).toString();
 
         NetconfClientSession session = new NetconfClientSession(sessionListener, channel, sessId, caps);
-        session.addExiHandlers(codec);
+        final NetconfMessageToEXIEncoder exiEncoder = new NetconfMessageToEXIEncoder(codec);
+        final NetconfEXIToMessageDecoder exiDecoder = new NetconfEXIToMessageDecoder(codec);
+        session.addExiHandlers(exiDecoder, exiEncoder);
         session.stopExiCommunication();
 
         assertEquals(caps, session.getServerCapabilities());
index ca604a4f6531c4f03d878825ac5fde5cfd360aeb..8f2c39df063a813696735daa66d8e7831f9a756c 100644 (file)
@@ -10,15 +10,15 @@ package org.opendaylight.controller.netconf.impl;
 
 import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.MessageToByteEncoder;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.api.monitoring.NetconfManagementSession;
 import org.opendaylight.controller.netconf.nettyutil.AbstractNetconfSession;
-import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXICodec;
-import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
-import org.opendaylight.controller.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
 import org.opendaylight.controller.netconf.nettyutil.handler.NetconfMessageToXMLEncoder;
 import org.opendaylight.controller.netconf.nettyutil.handler.NetconfXMLToMessageDecoder;
 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
@@ -46,8 +46,8 @@ public final class NetconfServerSession extends AbstractNetconfSession<NetconfSe
     private Date loginTime;
     private long inRpcSuccess, inRpcFail, outRpcError;
 
-    public NetconfServerSession(NetconfServerSessionListener sessionListener, Channel channel, long sessionId,
-            NetconfHelloMessageAdditionalHeader header) {
+    public NetconfServerSession(final NetconfServerSessionListener sessionListener, final Channel channel, final long sessionId,
+            final NetconfHelloMessageAdditionalHeader header) {
         super(sessionListener, channel, sessionId);
         this.header = header;
         LOG.debug("Session {} created", toString());
@@ -109,7 +109,7 @@ public final class NetconfServerSession extends AbstractNetconfSession<NetconfSe
         return builder.build();
     }
 
-    private Class<? extends Transport> getTransportForString(String transport) {
+    private Class<? extends Transport> getTransportForString(final String transport) {
         switch(transport) {
         case "ssh" :
             return NetconfSsh.class;
@@ -120,7 +120,7 @@ public final class NetconfServerSession extends AbstractNetconfSession<NetconfSe
         }
     }
 
-    private String formatDateTime(Date loginTime) {
+    private String formatDateTime(final Date loginTime) {
         SimpleDateFormat dateFormat = new SimpleDateFormat(ISO_DATE_FORMAT);
         return dateFormat.format(loginTime);
     }
@@ -131,9 +131,9 @@ public final class NetconfServerSession extends AbstractNetconfSession<NetconfSe
     }
 
     @Override
-    protected void addExiHandlers(NetconfEXICodec exiCodec) {
-        replaceMessageDecoder(new NetconfEXIToMessageDecoder(exiCodec));
-        replaceMessageEncoderAfterNextMessage(new NetconfMessageToEXIEncoder(exiCodec));
+    protected void addExiHandlers(final ByteToMessageDecoder decoder, final MessageToByteEncoder<NetconfMessage> encoder) {
+        replaceMessageDecoder(decoder);
+        replaceMessageEncoderAfterNextMessage(encoder);
     }
 
     @Override
index a724d1d9c5e4e8b6cec46cdb94f98a7158c48407..65810a6bdab8d840919e9f25b1d0961b499d0566 100644 (file)
@@ -187,6 +187,7 @@ public abstract class AbstractNetconfConfigTest extends AbstractConfigTest {
                 "/META-INF/yang/config-test.yang",
                 "/META-INF/yang/config-test-impl.yang",
                 "/META-INF/yang/test-types.yang",
+                "/META-INF/yang/test-groups.yang",
                 "/META-INF/yang/ietf-inet-types.yang");
 
         final Collection<InputStream> yangDependencies = new ArrayList<>();
index cc170358dd2c0e5e82fbbb644c889ebf9dc5c7ee..92c96d92f28962eeb88ccc961665d852cd39364c 100644 (file)
@@ -112,8 +112,8 @@ public class NetconfConfigPersisterITTest extends AbstractNetconfConfigTest {
         }
 
         notificationVerifier.assertNotificationCount(2);
-        notificationVerifier.assertNotificationContent(0, 0, 0, 8);
-        notificationVerifier.assertNotificationContent(1, 4, 3, 8);
+        notificationVerifier.assertNotificationContent(0, 0, 0, 9);
+        notificationVerifier.assertNotificationContent(1, 4, 3, 9);
 
         mockedAggregator.assertSnapshotCount(2);
         // Capabilities are stripped for persister
index fd11ce8c51875e379976917fd722ce6c5797d0ec..8eb792b6cdfc368930db066dec991abdf47a3ea8 100644 (file)
@@ -10,6 +10,8 @@ package org.opendaylight.controller.netconf.nettyutil;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.MessageToByteEncoder;
 import java.io.IOException;
 import org.opendaylight.controller.netconf.api.NetconfExiSession;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
@@ -17,6 +19,8 @@ import org.opendaylight.controller.netconf.api.NetconfSession;
 import org.opendaylight.controller.netconf.api.NetconfSessionListener;
 import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
 import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXICodec;
+import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
+import org.opendaylight.controller.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
 import org.opendaylight.controller.netconf.nettyutil.handler.exi.EXIParameters;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 import org.opendaylight.protocol.framework.AbstractProtocolSession;
@@ -117,12 +121,22 @@ public abstract class AbstractNetconfSession<S extends NetconfSession, L extends
             LOG.warn("Unable to parse EXI parameters from {} on session {}", startExiMessage, this, e);
             throw new IllegalArgumentException("Cannot parse options", e);
         }
+
         final NetconfEXICodec exiCodec = new NetconfEXICodec(exiParams.getOptions());
-        addExiHandlers(exiCodec);
+        final NetconfMessageToEXIEncoder exiEncoder = new NetconfMessageToEXIEncoder(exiCodec);
+        final NetconfEXIToMessageDecoder exiDecoder = new NetconfEXIToMessageDecoder(exiCodec);
+        addExiHandlers(exiDecoder, exiEncoder);
+
         LOG.debug("Session {} EXI handlers added to pipeline", this);
     }
 
-    protected abstract void addExiHandlers(NetconfEXICodec exiCodec);
+    /**
+     * Add a set encoder/decoder tuple into the channel pipeline as appropriate.
+     *
+     * @param decoder EXI decoder
+     * @param encoder EXI encoder
+     */
+    protected abstract void addExiHandlers(ByteToMessageDecoder decoder, MessageToByteEncoder<NetconfMessage> encoder);
 
     public final boolean isUp() {
         return up;
index 98baef0f8580fb5ae8b2677ee20072ff14c3ab5c..16da7a7f9dcf3eabb7fee06e8c3201fce1c41155 100644 (file)
@@ -1,6 +1,9 @@
 package org.opendaylight.controller.netconf.nettyutil.handler;
 
 import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import org.openexi.proc.HeaderOptionsOutputType;
 import org.openexi.proc.common.EXIOptions;
 import org.openexi.proc.common.EXIOptionsException;
@@ -8,6 +11,9 @@ import org.openexi.proc.common.GrammarOptions;
 import org.openexi.proc.grammars.GrammarCache;
 import org.openexi.sax.EXIReader;
 import org.openexi.sax.Transmogrifier;
+import org.openexi.sax.TransmogrifierException;
+import org.xml.sax.EntityResolver;
+import org.xml.sax.InputSource;
 
 public final class NetconfEXICodec {
     /**
@@ -16,13 +22,41 @@ public final class NetconfEXICodec {
      * of the stream. This is really useful, so let's output it now.
      */
     private static final boolean OUTPUT_EXI_COOKIE = true;
+    /**
+     * OpenEXI does not allow us to directly prevent resolution of external entities. In order
+     * to prevent XXE attacks, we reuse a single no-op entity resolver.
+     */
+    private static final EntityResolver ENTITY_RESOLVER = new EntityResolver() {
+        @Override
+        public InputSource resolveEntity(final String publicId, final String systemId) {
+            return new InputSource();
+        }
+    };
+
+    /**
+     * Since we have a limited number of options we can have, instantiating a weak cache
+     * will allow us to reuse instances where possible.
+     */
+    private static final LoadingCache<Short, GrammarCache> GRAMMAR_CACHES = CacheBuilder.newBuilder().weakValues().build(new CacheLoader<Short, GrammarCache>() {
+        @Override
+        public GrammarCache load(final Short key) {
+            return new GrammarCache(key);
+        }
+    });
+
+    /**
+     * Grammar cache acts as a template and is duplicated by the Transmogrifier and the Reader
+     * before use. It is safe to reuse a single instance.
+     */
+    private final GrammarCache exiGrammarCache;
     private final EXIOptions exiOptions;
 
     public NetconfEXICodec(final EXIOptions exiOptions) {
         this.exiOptions = Preconditions.checkNotNull(exiOptions);
+        this.exiGrammarCache = createGrammarCache(exiOptions);
     }
 
-    private GrammarCache getGrammarCache() {
+    private static GrammarCache createGrammarCache(final EXIOptions exiOptions) {
         short go = GrammarOptions.DEFAULT_OPTIONS;
         if (exiOptions.getPreserveComments()) {
             go = GrammarOptions.addCM(go);
@@ -37,23 +71,25 @@ public final class NetconfEXICodec {
             go = GrammarOptions.addPI(go);
         }
 
-        return new GrammarCache(null, go);
+        return GRAMMAR_CACHES.getUnchecked(go);
     }
 
     EXIReader getReader() throws EXIOptionsException {
         final EXIReader r = new EXIReader();
         r.setPreserveLexicalValues(exiOptions.getPreserveLexicalValues());
-        r.setGrammarCache(getGrammarCache());
+        r.setGrammarCache(exiGrammarCache);
+        r.setEntityResolver(ENTITY_RESOLVER);
         return r;
     }
 
-    Transmogrifier getTransmogrifier() throws EXIOptionsException {
+    Transmogrifier getTransmogrifier() throws EXIOptionsException, TransmogrifierException {
         final Transmogrifier transmogrifier = new Transmogrifier();
         transmogrifier.setAlignmentType(exiOptions.getAlignmentType());
         transmogrifier.setBlockSize(exiOptions.getBlockSize());
-        transmogrifier.setGrammarCache(getGrammarCache());
+        transmogrifier.setGrammarCache(exiGrammarCache);
         transmogrifier.setOutputCookie(OUTPUT_EXI_COOKIE);
         transmogrifier.setOutputOptions(HeaderOptionsOutputType.all);
+        transmogrifier.setResolveExternalGeneralEntities(false);
         return transmogrifier;
     }
 }
index f1e72ed85fb6475ebeab5ae8e400d953077adf7e..e90bc7916dc03fd210e2b69048161d944fda10e4 100644 (file)
@@ -20,6 +20,7 @@ import javax.xml.transform.sax.SAXResult;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.openexi.proc.common.EXIOptionsException;
 import org.openexi.sax.Transmogrifier;
+import org.openexi.sax.TransmogrifierException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,7 +33,7 @@ public final class NetconfMessageToEXIEncoder extends MessageToByteEncoder<Netco
     }
 
     @Override
-    protected void encode(final ChannelHandlerContext ctx, final NetconfMessage msg, final ByteBuf out) throws EXIOptionsException, IOException, TransformerException {
+    protected void encode(final ChannelHandlerContext ctx, final NetconfMessage msg, final ByteBuf out) throws EXIOptionsException, IOException, TransformerException, TransmogrifierException {
         LOG.trace("Sent to encode : {}", msg);
 
         try (final OutputStream os = new ByteBufOutputStream(out)) {
index 7946afdbf5e1c80add226b08137b1faedd3bf6bf..3a1b26dd9e4dc0c44fedc890a78b9586c6512e29 100644 (file)
@@ -19,12 +19,13 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
-
 import com.google.common.base.Optional;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.MessageToByteEncoder;
 import java.util.Collections;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,7 +35,6 @@ import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.api.NetconfSession;
 import org.opendaylight.controller.netconf.api.NetconfSessionListener;
 import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
-import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXICodec;
 import org.opendaylight.controller.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
@@ -115,7 +115,7 @@ public class AbstractNetconfSessionTest {
         testingNetconfSession = spy(testingNetconfSession);
 
         testingNetconfSession.startExiCommunication(NetconfStartExiMessage.create(new EXIOptions(), "4"));
-        verify(testingNetconfSession).addExiHandlers(any(NetconfEXICodec.class));
+        verify(testingNetconfSession).addExiHandlers(any(ByteToMessageDecoder.class), any(MessageToByteEncoder.class));
     }
 
     @Test
@@ -148,7 +148,7 @@ public class AbstractNetconfSessionTest {
         }
 
         @Override
-        protected void addExiHandlers(final NetconfEXICodec exiCodec) {}
+        protected void addExiHandlers(final ByteToMessageDecoder decoder, final MessageToByteEncoder<NetconfMessage> encoder) {}
 
         @Override
         public void stopExiCommunication() {}