import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.stream.StreamSource;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.config.persist.api.Persister;
import org.opendaylight.controller.config.persist.storage.file.xml.model.ConfigSnapshot;
public static ConfigSnapshotHolder loadLastConfig(final File file) throws JAXBException {
JAXBContext jaxbContext = JAXBContext.newInstance(ConfigSnapshot.class);
Unmarshaller um = jaxbContext.createUnmarshaller();
-
- return asHolder((ConfigSnapshot) um.unmarshal(file));
+ XMLInputFactory xif = XMLInputFactory.newFactory();
+ xif.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
+ xif.setProperty(XMLInputFactory.SUPPORT_DTD, false);
+ try {
+ XMLStreamReader xsr = xif.createXMLStreamReader(new StreamSource(file));
+ return asHolder((ConfigSnapshot) um.unmarshal(xsr));
+ } catch (final XMLStreamException e) {
+ throw new JAXBException(e);
+ }
}
private static ConfigSnapshotHolder asHolder(final ConfigSnapshot unmarshalled) {
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.stream.StreamSource;
import org.apache.karaf.features.ConfigFileInfo;
import org.apache.karaf.features.Feature;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
Preconditions.checkNotNull(feature);
this.fileInfo = fileInfo;
this.featureChain.add(feature);
+ // TODO extract utility method for umarshalling config snapshots
JAXBContext jaxbContext = JAXBContext.newInstance(ConfigSnapshot.class);
Unmarshaller um = jaxbContext.createUnmarshaller();
- File file = new File(fileInfo.getFinalname());
- unmarshalled = ((ConfigSnapshot) um.unmarshal(file));
+ XMLInputFactory xif = XMLInputFactory.newFactory();
+ xif.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
+ xif.setProperty(XMLInputFactory.SUPPORT_DTD, false);
+ try {
+ XMLStreamReader xsr = xif.createXMLStreamReader(new StreamSource(new File(fileInfo.getFinalname())));
+ unmarshalled = ((ConfigSnapshot) um.unmarshal(xsr));
+ } catch (final XMLStreamException e) {
+ throw new JAXBException(e);
+ }
}
/*
* (non-Javadoc)
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.stream.StreamSource;
import org.apache.commons.lang3.StringUtils;
@XmlRootElement(name = "persisted-snapshots")
try {
JAXBContext jaxbContext = JAXBContext.newInstance(Config.class);
Unmarshaller um = jaxbContext.createUnmarshaller();
-
- return (Config) um.unmarshal(from);
- } catch (JAXBException e) {
+ XMLInputFactory xif = XMLInputFactory.newFactory();
+ xif.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
+ xif.setProperty(XMLInputFactory.SUPPORT_DTD, false);
+ XMLStreamReader xsr = xif.createXMLStreamReader(new StreamSource(from));
+ return ((Config) um.unmarshal(xsr));
+ } catch (JAXBException | XMLStreamException e) {
throw new PersistException("Unable to restore configuration", e);
}
}
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>binding-type-provider</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-model-util</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
final String javaNamePrefix) {
return RuntimeBeanEntry.extractClassNameToRuntimeBeanMap(packageName, dataNodeContainer, moduleLocalNameFromXPath,
- typeProviderWrapper, javaNamePrefix, currentModule).values();
+ typeProviderWrapper, javaNamePrefix, currentModule, schemaContext).values();
}
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
-import org.opendaylight.yangtools.yang.model.api.IdentitySchemaNode;
import org.opendaylight.yangtools.yang.model.api.LeafListSchemaNode;
import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode;
import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaNode;
import org.opendaylight.yangtools.yang.model.api.UnknownSchemaNode;
import org.opendaylight.yangtools.yang.model.api.UsesNode;
+import org.opendaylight.yangtools.yang.model.util.SchemaContextUtil;
/**
* Holds information about runtime bean to be generated. There are two kinds of
* lined via children so that a tree with all beans can be created.
*/
public class RuntimeBeanEntry {
+
+ private static final Function<SchemaNode, QName> QNAME_FROM_NODE = new Function<SchemaNode, QName>() {
+ @Override
+ public QName apply(final SchemaNode input) {
+ return input.getQName();
+ }
+ };
+
+ private static final Function<UnknownSchemaNode, String> UNKNOWN_NODE_TO_STRING = new Function<UnknownSchemaNode, String>() {
+ @Override
+ public String apply(final UnknownSchemaNode input) {
+ return input.getQName().getLocalName() + input.getNodeParameter();
+ }
+ };
+
private final String packageName;
private final String yangName, javaNamePrefix;
private final boolean isRoot;
public static Map<String, RuntimeBeanEntry> extractClassNameToRuntimeBeanMap(
final String packageName, final DataNodeContainer container,
final String moduleYangName, final TypeProviderWrapper typeProviderWrapper,
- final String javaNamePrefix, final Module currentModule) {
+ final String javaNamePrefix, final Module currentModule, final SchemaContext schemaContext) {
- Map<QName, Set<RpcDefinition>> identitiesToRpcs = getIdentitiesToRpcs(currentModule);
AttributesRpcsAndRuntimeBeans attributesRpcsAndRuntimeBeans = extractSubtree(
packageName, container, typeProviderWrapper, currentModule,
- identitiesToRpcs);
+ schemaContext);
Map<String, RuntimeBeanEntry> result = new HashMap<>();
List<AttributeIfc> attributes;
return result;
}
- private static Map<QName/* of identity */, Set<RpcDefinition>> getIdentitiesToRpcs(
- final Module currentModule) {
- // currently only looks for local identities (found in currentModule)
- Map<QName, Set<RpcDefinition>> result = new HashMap<>();
- for (IdentitySchemaNode identity : currentModule.getIdentities()) {
- // add all
- result.put(identity.getQName(), new HashSet<RpcDefinition>());
- }
+ private static Multimap<QName/* of identity */, RpcDefinition> getIdentitiesToRpcs(
+ final SchemaContext schemaCtx) {
+ Multimap<QName, RpcDefinition> result = HashMultimap.create();
+ for (Module currentModule : schemaCtx.getModules()) {
- for (RpcDefinition rpc : currentModule.getRpcs()) {
- ContainerSchemaNode input = rpc.getInput();
- if (input != null) {
- for (UsesNode uses : input.getUses()) {
+ // Find all identities in current module for later identity->rpc mapping
+ Set<QName> allIdentitiesInModule = Sets.newHashSet(Collections2.transform(currentModule.getIdentities(), QNAME_FROM_NODE));
- if (uses.getGroupingPath().getPath().size() != 1) {
- continue;
- }
+ for (RpcDefinition rpc : currentModule.getRpcs()) {
+ ContainerSchemaNode input = rpc.getInput();
+ if (input != null) {
+ for (UsesNode uses : input.getUses()) {
- // check grouping path
- QName qname = uses.getGroupingPath().getPath().get(0);
- if (false == qname
- .equals(ConfigConstants.RPC_CONTEXT_REF_GROUPING_QNAME)) {
- continue;
- }
+ // Check if the rpc is config rpc by looking for input argument rpc-context-ref
+ Iterator<QName> pathFromRoot = uses.getGroupingPath().getPathFromRoot().iterator();
+ if (!pathFromRoot.hasNext() ||
+ !pathFromRoot.next().equals(ConfigConstants.RPC_CONTEXT_REF_GROUPING_QNAME)) {
+ continue;
+ }
- for (SchemaNode refinedNode : uses.getRefines().values()) {
-
- for (UnknownSchemaNode unknownSchemaNode : refinedNode
- .getUnknownSchemaNodes()) {
- if (ConfigConstants.RPC_CONTEXT_INSTANCE_EXTENSION_QNAME
- .equals(unknownSchemaNode.getNodeType())) {
- String localIdentityName = unknownSchemaNode
- .getNodeParameter();
- QName identityQName = QName.create(
- currentModule.getNamespace(),
- currentModule.getRevision(),
- localIdentityName);
- Set<RpcDefinition> rpcDefinitions = result
- .get(identityQName);
- if (rpcDefinitions == null) {
- throw new IllegalArgumentException(
- "Identity referenced by rpc not found. Identity:"
- + localIdentityName + " , rpc "
- + rpc);
+ for (SchemaNode refinedNode : uses.getRefines().values()) {
+ for (UnknownSchemaNode unknownSchemaNode : refinedNode
+ .getUnknownSchemaNodes()) {
+ if (ConfigConstants.RPC_CONTEXT_INSTANCE_EXTENSION_QNAME
+ .equals(unknownSchemaNode.getNodeType())) {
+ String localIdentityName = unknownSchemaNode
+ .getNodeParameter();
+ QName identityQName = QName.create(
+ currentModule.getNamespace(),
+ currentModule.getRevision(),
+ localIdentityName);
+ Preconditions.checkArgument(allIdentitiesInModule.contains(identityQName),
+ "Identity referenced by rpc not found. Identity: %s, rpc: %s", localIdentityName, rpc);
+ result.put(identityQName, rpc);
}
- rpcDefinitions.add(rpc);
}
}
}
private static AttributesRpcsAndRuntimeBeans extractSubtree(
final String packageName, final DataNodeContainer subtree,
final TypeProviderWrapper typeProviderWrapper, final Module currentModule,
- final Map<QName, Set<RpcDefinition>> identitiesToRpcs) {
+ final SchemaContext ctx) {
+
+ Multimap<QName, RpcDefinition> identitiesToRpcs = getIdentitiesToRpcs(ctx);
List<AttributeIfc> attributes = Lists.newArrayList();
List<RuntimeBeanEntry> runtimeBeanEntries = new ArrayList<>();
ListSchemaNode listSchemaNode = (ListSchemaNode) child;
RuntimeBeanEntry hierarchicalChild = createHierarchical(
packageName, listSchemaNode, typeProviderWrapper,
- currentModule, identitiesToRpcs);
+ currentModule, ctx);
runtimeBeanEntries.add(hierarchicalChild);
} else /* ordinary list attribute */{
ListAttribute listAttribute = ListAttribute.create(
if (ConfigConstants.RPC_CONTEXT_INSTANCE_EXTENSION_QNAME
.equals(unknownSchemaNode.getNodeType())) {
String localIdentityName = unknownSchemaNode.getNodeParameter();
- QName identityQName = QName.create(currentModule.getNamespace(),
- currentModule.getRevision(), localIdentityName);
- Set<RpcDefinition> rpcDefinitions = identitiesToRpcs
- .get(identityQName);
- if (rpcDefinitions == null) {
- throw new IllegalArgumentException("Cannot find identity "
- + localIdentityName + " to be used as "
- + "context reference when resolving "
- + unknownSchemaNode);
- }
+ QName identityQName = unknownSchemaNode.isAddedByUses() ?
+ findQNameFromGrouping(subtree, ctx, unknownSchemaNode, localIdentityName) :
+ QName.create(currentModule.getNamespace(), currentModule.getRevision(), localIdentityName);
// convert RpcDefinition to Rpc
- for (RpcDefinition rpcDefinition : rpcDefinitions) {
+ for (RpcDefinition rpcDefinition : identitiesToRpcs.get(identityQName)) {
String name = TypeProviderWrapper
.findJavaParameter(rpcDefinition);
AttributeIfc returnType;
attributes, rpcs);
}
+ /**
+ * Find "proper" qname of unknown node in case it comes from a grouping
+ */
+ private static QName findQNameFromGrouping(final DataNodeContainer subtree, final SchemaContext ctx, final UnknownSchemaNode unknownSchemaNode, final String localIdentityName) {
+ QName identityQName = null;
+ for (UsesNode usesNode : subtree.getUses()) {
+ SchemaNode dataChildByName = SchemaContextUtil.findDataSchemaNode(ctx, usesNode.getGroupingPath());
+ Module m = SchemaContextUtil.findParentModule(ctx, dataChildByName);
+ List<UnknownSchemaNode> unknownSchemaNodes = dataChildByName.getUnknownSchemaNodes();
+ if(Collections2.transform(unknownSchemaNodes, UNKNOWN_NODE_TO_STRING).contains(UNKNOWN_NODE_TO_STRING.apply(unknownSchemaNode))) {
+ identityQName = QName.create(dataChildByName.getQName(), localIdentityName);
+ }
+ }
+ return identityQName;
+ }
+
private static AttributeIfc getReturnTypeAttribute(final DataSchemaNode child, final TypeProviderWrapper typeProviderWrapper,
final String packageName) {
if (child instanceof LeafSchemaNode) {
private static RuntimeBeanEntry createHierarchical(final String packageName,
final ListSchemaNode listSchemaNode,
final TypeProviderWrapper typeProviderWrapper, final Module currentModule,
- final Map<QName, Set<RpcDefinition>> identitiesToRpcs) {
+ final SchemaContext ctx) {
// supported are numeric types, strings, enums
// get all attributes
AttributesRpcsAndRuntimeBeans attributesRpcsAndRuntimeBeans = extractSubtree(
packageName, listSchemaNode, typeProviderWrapper,
- currentModule, identitiesToRpcs);
+ currentModule, ctx);
Optional<String> keyYangName;
if (listSchemaNode.getKeyDefinition().isEmpty()) {
.getUnknownSchemaNodes();
Map<String, RuntimeBeanEntry> runtimeBeans = RuntimeBeanEntry
.extractClassNameToRuntimeBeanMap(PACKAGE_NAME, caseNode, "test-name", new TypeProviderWrapper(new
- TypeProviderImpl(context)), "test", jmxImplModule);
+ TypeProviderImpl(context)), "test", jmxImplModule, context);
assertEquals(1, runtimeBeans.size());
RuntimeBeanEntry runtimeMXBean = runtimeBeans.get("testRuntimeMXBean");
assertTrue(runtimeMXBean.isRoot());
package org.opendaylight.controller.config.yang.test.util;
import com.google.common.collect.Lists;
+import java.math.BigInteger;
import java.util.List;
import org.opendaylight.controller.config.yang.test.impl.Asdf;
import org.opendaylight.controller.config.yang.test.impl.Deep2;
return asdf;
}
+ @Override
+ public BigInteger getCommonStat() {
+ return new BigInteger("54");
+ }
+
@Override
public String noArg(final String arg1) {
return arg1.toUpperCase();
}
+ @Override
+ public Long commonRpcTwo() {
+ return 1L;
+ }
+
+ @Override
+ public String commonRpcThree() {
+ return "true";
+ }
+
+ @Override
+ public Boolean commonRpc() {
+ return true;
+ }
+
+ @Override
+ public void netconfImplRpcFromGrouping() {
+ // rpc from grouping within same yang module
+ }
+
});
for (int i = 0; i < module.getSimpleShort(); i++) {
import ietf-inet-types { prefix inet; revision-date 2010-09-24;}
import rpc-context { prefix rpcx; revision-date 2013-06-17; }
import test-types { prefix tt; revision-date 2013-11-27; }
+ import test-groups { prefix tg; revision-date 2014-12-08; }
description
"Testing IMPL";
}
}
+ grouping netconf-impl-rpc {
+ rpcx:rpc-context-instance netconf-impl-rpc-ctx;
+ }
+
+ identity netconf-impl-rpc-ctx;
+
+ rpc netconf-impl-rpc-from-grouping {
+ input {
+ uses rpcx:rpc-context-ref {
+ refine context-instance {
+ rpcx:rpc-context-instance "netconf-impl-rpc-ctx";
+ }
+ }
+ }
+ }
+
augment "/config:modules/config:module/config:state" {
case impl-netconf {
when "/config:modules/config:module/config:type = 'impl-netconf'";
// rpc
rpcx:rpc-context-instance "test-rpc";
+ // add some stats + rpc from groupings outside this module
+ uses tt:common-operational;
+ uses tg:common-operational-rpc;
+ uses netconf-impl-rpc;
+
// root runtime bean
leaf created-sessions {
type uint32;
--- /dev/null
+module test-groups {
+ yang-version 1;
+ namespace "urn:opendaylight:params:xml:ns:yang:controller:config:test:groups";
+ prefix "tg";
+
+ import rpc-context { prefix rpcx; revision-date 2013-06-17; }
+
+ description
+ "Groupings generated for testing";
+
+ revision "2014-12-08";
+
+ grouping common-operational-rpc {
+ rpcx:rpc-context-instance common-rpc-ctx;
+ rpcx:rpc-context-instance common-rpc-ctx-two;
+ }
+
+ identity common-rpc-ctx;
+ identity common-rpc-ctx-two;
+
+ rpc common-rpc {
+ input {
+ uses rpcx:rpc-context-ref {
+ refine context-instance {
+ rpcx:rpc-context-instance "common-rpc-ctx";
+ }
+ }
+ }
+
+ output {
+ leaf output {
+ type boolean;
+ }
+ }
+ }
+
+ rpc common-rpc-two {
+ input {
+ uses rpcx:rpc-context-ref {
+ refine context-instance {
+ rpcx:rpc-context-instance "common-rpc-ctx-two";
+ }
+ }
+ }
+
+ output {
+ leaf output {
+ type uint32;
+ }
+ }
+ }
+}
namespace "urn:opendaylight:params:xml:ns:yang:controller:config:test:types";
prefix "tt";
+ import rpc-context { prefix rpcx; revision-date 2013-06-17; }
+
description
"Types generated for testing";
identity test-identity2 {
base test-identity1;
}
+
+ grouping common-operational {
+ leaf common-stat {
+ type uint64;
+ }
+ // This would not work, since it clashes with identity common-rpc-ctx from test-groups
+ // Both grouping add the same unknown node "rpcx:rpc-context-instance common-rpc-ctx-three;"
+ // and we cannot match the unknown node to the grouping that added it
+ //rpcx:rpc-context-instance common-rpc-ctx-three;
+ rpcx:rpc-context-instance common-rpc-ctx-three;
+ }
+
+ //identity common-rpc-ctx;
+ identity common-rpc-ctx-three;
+
+ rpc common-rpc-three {
+ input {
+ uses rpcx:rpc-context-ref {
+ refine context-instance {
+ rpcx:rpc-context-instance "common-rpc-ctx-three";
+ }
+ }
+ }
+
+ output {
+ leaf output {
+ type string;
+ }
+ }
+ }
}
scheduleElection(electionDuration());
}
+ private boolean isLogEntryPresent(long index){
+ if(index == context.getReplicatedLog().getSnapshotIndex()){
+ return true;
+ }
+
+ ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+ .get(index);
+
+ return previousEntry != null;
+
+ }
+
+ private long getLogEntryTerm(long index){
+ if(index == context.getReplicatedLog().getSnapshotIndex()){
+ return context.getReplicatedLog().getSnapshotTerm();
+ }
+
+ ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+ .get(index);
+
+ if(previousEntry != null){
+ return previousEntry.getTerm();
+ }
+
+ return -1;
+ }
+
@Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries) {
+ AppendEntries appendEntries) {
if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
if(LOG.isDebugEnabled()) {
// 2. Reply false if log doesn’t contain an entry at prevLogIndex
// whose term matches prevLogTerm (§5.3)
- ReplicatedLogEntry previousEntry = context.getReplicatedLog()
- .get(appendEntries.getPrevLogIndex());
+ long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex());
+ boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex());
boolean outOfSync = true;
// First check if the logs are in sync or not
if (lastIndex() == -1
- && appendEntries.getPrevLogIndex() != -1) {
+ && appendEntries.getPrevLogIndex() != -1) {
// The follower's log is out of sync because the leader does have
// an entry at prevLogIndex and this follower has no entries in
if(LOG.isDebugEnabled()) {
LOG.debug("The followers log is empty and the senders prevLogIndex is {}",
- appendEntries.getPrevLogIndex());
+ appendEntries.getPrevLogIndex());
}
} else if (lastIndex() > -1
- && appendEntries.getPrevLogIndex() != -1
- && previousEntry == null) {
+ && appendEntries.getPrevLogIndex() != -1
+ && !prevEntryPresent) {
// The follower's log is out of sync because the Leader's
// prevLogIndex entry was not found in it's log
if(LOG.isDebugEnabled()) {
LOG.debug("The log is not empty but the prevLogIndex {} was not found in it",
- appendEntries.getPrevLogIndex());
+ appendEntries.getPrevLogIndex());
}
} else if (lastIndex() > -1
- && previousEntry != null
- && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
+ && prevEntryPresent
+ && prevLogTerm != appendEntries.getPrevLogTerm()) {
// The follower's log is out of sync because the Leader's
// prevLogIndex entry does exist in the follower's log but it has
// a different term in it
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug(
- "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
- , previousEntry.getTerm()
- , appendEntries.getPrevLogTerm());
+ "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
+ , prevLogTerm
+ , appendEntries.getPrevLogTerm());
}
} else {
outOfSync = false;
// We found that the log was out of sync so just send a negative
// reply and return
if(LOG.isDebugEnabled()) {
- LOG.debug("Follower is out-of-sync, " +
+ LOG.debug("Follower ({}) is out-of-sync, " +
"so sending negative reply, lastIndex():{}, lastTerm():{}",
- lastIndex(), lastTerm()
+ context.getId(), lastIndex(), lastTerm()
);
}
sender.tell(
}};
}
+ @Test
+ public void testHandleAppendEntriesPreviousLogEntryMissing(){
+ new JavaTestKit(getSystem()) {{
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext();
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log =
+ new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
+
+ context.setReplicatedLog(log);
+
+ // Prepare the entries to be sent with AppendEntries
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
+
+ AppendEntries appendEntries =
+ new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+
+ RaftActorBehavior behavior = createBehavior(context);
+
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+
+ RaftActorBehavior raftBehavior =
+ behavior.handleMessage(getRef(), appendEntries);
+
+ assertEquals(expected, raftBehavior);
+
+ // Also expect an AppendEntriesReply to be sent where success is false
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+ "AppendEntriesReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof AppendEntriesReply) {
+ AppendEntriesReply reply = (AppendEntriesReply) in;
+ return reply.isSuccess();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+
+ }};
+
+ }
+
+ @Test
+ public void testHandleAppendAfterInstallingSnapshot(){
+ new JavaTestKit(getSystem()) {{
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext();
+
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log =
+ new MockRaftActorContext.SimpleReplicatedLog();
+
+ // Set up a log as if it has been snapshotted
+ log.setSnapshotIndex(3);
+ log.setSnapshotTerm(1);
+
+ context.setReplicatedLog(log);
+
+ // Prepare the entries to be sent with AppendEntries
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
+
+ AppendEntries appendEntries =
+ new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+
+ RaftActorBehavior behavior = createBehavior(context);
+
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+
+ RaftActorBehavior raftBehavior =
+ behavior.handleMessage(getRef(), appendEntries);
+
+ assertEquals(expected, raftBehavior);
+
+ // Also expect an AppendEntriesReply to be sent where success is false
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+ "AppendEntriesReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof AppendEntriesReply) {
+ AppendEntriesReply reply = (AppendEntriesReply) in;
+ return reply.isSuccess();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+
+ }};
+
+ }
+
/**
* This test verifies that when InstallSnapshot is received by
}
@Override
- public final <T extends RpcService> RpcRegistration<T> addRpcImplementation(final Class<T> type, final T implementation)
- throws IllegalStateException {
+ public final <T extends RpcService> RpcRegistration<T> addRpcImplementation(final Class<T> type, final T implementation) {
// FIXME: This should be well documented - addRpcImplementation for
// routed RPCs
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.protobuff.client.messages;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.UnknownFieldSet;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.ref.SoftReference;
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompositeModificationByteStringPayload extends Payload implements
+ Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private ByteString byteString;
+ private SoftReference<PersistentMessages.CompositeModification> modificationReference;
+ private static final Logger LOG = LoggerFactory.getLogger(CompositeModificationByteStringPayload.class);
+
+ public CompositeModificationByteStringPayload(){
+ byteString = null;
+ }
+ public CompositeModificationByteStringPayload(Object modification){
+ this(((PersistentMessages.CompositeModification) modification).toByteString());
+ this.modificationReference = new SoftReference<>((PersistentMessages.CompositeModification) modification);
+ }
+
+ private CompositeModificationByteStringPayload(ByteString byteString){
+ this.byteString = Preconditions.checkNotNull(byteString, "byteString should not be null");
+ }
+
+
+ @Override
+ public Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> encode() {
+ Preconditions.checkState(byteString!=null);
+ Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> map = new HashMap<>();
+ map.put(org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification,
+ getModificationInternal());
+ return map;
+ }
+
+ @Override
+ public Payload decode(
+ AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload) {
+ PersistentMessages.CompositeModification modification = payload
+ .getExtension(
+ org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification);
+
+ // The extension was put in the unknown field.
+ // This is because extensions need to be registered
+ // see org.opendaylight.controller.mdsal.CompositeModificationPayload.registerAllExtensions
+ // also see https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/ExtensionRegistry
+ // If that is not done then on the other end the extension shows up as an unknown field
+ // Need to figure out a better way to do this
+ if(payload.getUnknownFields().hasField(2)){
+ UnknownFieldSet.Field field =
+ payload.getUnknownFields().getField(2);
+
+ return new CompositeModificationByteStringPayload(field.getLengthDelimitedList().get(0));
+ }
+
+ return new CompositeModificationByteStringPayload(modification);
+ }
+
+ public Object getModification(){
+ return getModificationInternal();
+ }
+
+ private PersistentMessages.CompositeModification getModificationInternal(){
+ if(this.modificationReference != null && this.modificationReference.get() != null){
+ return this.modificationReference.get();
+ }
+ try {
+ PersistentMessages.CompositeModification compositeModification = PersistentMessages.CompositeModification.parseFrom(this.byteString);
+ this.modificationReference = new SoftReference<>(compositeModification);
+ return compositeModification;
+ } catch (InvalidProtocolBufferException e) {
+ LOG.error("Unexpected exception occurred when parsing byteString to CompositeModification", e);
+ }
+
+ return null;
+ }
+
+ public int size(){
+ return byteString.size();
+ }
+
+ private void writeObject(java.io.ObjectOutputStream stream)
+ throws IOException {
+ byteString.writeTo(stream);
+ }
+
+ private void readObject(java.io.ObjectInputStream stream)
+ throws IOException, ClassNotFoundException {
+ byteString = ByteString.readFrom(stream);
+ }
+
+ @VisibleForTesting
+ public void clearModificationReference(){
+ if(this.modificationReference != null) {
+ this.modificationReference.clear();
+ }
+ }
+}
\ No newline at end of file
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
cohortEntry.getCohort().preCommit().get();
Shard.this.persistData(getSender(), transactionID,
- new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
+ new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable()));
} catch (InterruptedException | ExecutionException e) {
LOG.error(e, "An exception occurred while preCommitting transaction {}",
cohortEntry.getTransactionID());
protected void appendRecoveredLogEntry(final Payload data) {
if (data instanceof CompositeModificationPayload) {
currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
+ } else if (data instanceof CompositeModificationByteStringPayload) {
+ currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
} else {
LOG.error("Unknown state received {} during recovery", data);
}
if (data instanceof CompositeModificationPayload) {
Object modification = ((CompositeModificationPayload) data).getModification();
- if(modification == null) {
- LOG.error(
- "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
- identifier, clientActor != null ? clientActor.path().toString() : null);
- } else if(clientActor == null) {
- // There's no clientActor to which to send a commit reply so we must be applying
- // replicated state from the leader.
- commitWithNewTransaction(MutableCompositeModification.fromSerializable(
- modification, schemaContext));
- } else {
- // This must be the OK to commit after replication consensus.
- finishCommit(clientActor, identifier);
- }
+ applyModificationToState(clientActor, identifier, modification);
+ } else if(data instanceof CompositeModificationByteStringPayload ){
+ Object modification = ((CompositeModificationByteStringPayload) data).getModification();
+
+ applyModificationToState(clientActor, identifier, modification);
+
} else {
LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
data, data.getClass().getClassLoader(),
}
+ private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+ if(modification == null) {
+ LOG.error(
+ "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+ identifier, clientActor != null ? clientActor.path().toString() : null);
+ } else if(clientActor == null) {
+ // There's no clientActor to which to send a commit reply so we must be applying
+ // replicated state from the leader.
+ commitWithNewTransaction(MutableCompositeModification.fromSerializable(
+ modification, schemaContext));
+ } else {
+ // This must be the OK to commit after replication consensus.
+ finishCommit(clientActor, identifier);
+ }
+ }
+
private void updateJournalStats() {
ReplicatedLogEntry lastLogEntry = getLastLogEntry();
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class CompositeModificationByteStringPayloadTest {
+
+ private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
+
+ @Test
+ public void testSerialization(){
+ WriteModification writeModification =
+ new WriteModification(TestModel.TEST_PATH, ImmutableNodes
+ .containerNode(TestModel.TEST_QNAME),
+ TestModel.createTestContext());
+
+ MutableCompositeModification compositeModification =
+ new MutableCompositeModification();
+
+ compositeModification.addModification(writeModification);
+
+ CompositeModificationByteStringPayload compositeModificationByteStringPayload
+ = new CompositeModificationByteStringPayload(compositeModification.toSerializable());
+
+ byte[] bytes = SerializationUtils.serialize(compositeModificationByteStringPayload);
+
+ Object deserialize = SerializationUtils.deserialize(bytes);
+
+ assertTrue(deserialize instanceof CompositeModificationByteStringPayload);
+
+ }
+
+ @Test
+ public void testAppendEntries(){
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+
+ CompositeModificationByteStringPayload payload = newByteStringPayload(
+ new WriteModification(TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+ SCHEMA_CONTEXT));
+
+ payload.clearModificationReference();
+
+ entries.add(new ReplicatedLogImplEntry(0, 1, payload));
+
+
+ assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable());
+ }
+
+
+
+ private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
+ MutableCompositeModification compMod = new MutableCompositeModification();
+ for(Modification mod: mods) {
+ compMod.addModification(mod);
+ }
+
+ return new CompositeModificationByteStringPayload(compMod.toSerializable());
+ }
+
+}
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
SCHEMA_CONTEXT))));
- int nListEntries = 11;
+ int nListEntries = 16;
Set<Integer> listEntryKeys = new HashSet<>();
- for(int i = 1; i <= nListEntries; i++) {
+ for(int i = 1; i <= nListEntries-5; i++) {
listEntryKeys.add(Integer.valueOf(i));
YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
newPayload(mod)));
}
+ // Add some of the new CompositeModificationByteStringPayload
+ for(int i = 11; i <= nListEntries; i++) {
+ listEntryKeys.add(Integer.valueOf(i));
+ YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+ Modification mod = new MergeModification(path,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
+ SCHEMA_CONTEXT);
+ InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+ newByteStringPayload(mod)));
+ }
+
+
InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
new ApplyLogEntries(nListEntries));
return new CompositeModificationPayload(compMod.toSerializable());
}
+ private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
+ MutableCompositeModification compMod = new MutableCompositeModification();
+ for(Modification mod: mods) {
+ compMod.addModification(mod);
+ }
+
+ return new CompositeModificationByteStringPayload(compMod.toSerializable());
+ }
+
+
private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
final MutableCompositeModification modification) {
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * A single YANG notification.
+ */
+public interface DOMNotification {
+ /**
+ * Return the type of this notification.
+ *
+ * @return Notification type.
+ */
+ @Nonnull SchemaPath getType();
+
+ /**
+ * Return the body of this notification.
+ *
+ * @return Notification body.
+ */
+ @Nonnull ContainerNode getBody();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import java.util.EventListener;
+import javax.annotation.Nonnull;
+
+/**
+ * Interface implemented by listeners interested in {@link DOMNotification}s.
+ */
+public interface DOMNotificationListener extends EventListener {
+ /**
+ * Invoked whenever a {@link DOMNotification} matching the subscription
+ * criteria is received.
+ *
+ * @param notification Received notification
+ */
+ void onNotification(@Nonnull DOMNotification notification);
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * A registration of a {@link DOMNotificationListener}. Invoking {@link #close()} will prevent further
+ * delivery of events to the listener.
+ */
+public interface DOMNotificationListenerRegistration extends ListenerRegistration<DOMNotificationListener> {
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+/**
+ * A {@link DOMService} which allows its user to send {@link DOMNotification}s. It
+ * provides two styles of initiating the notification delivery, similar to
+ * {@link java.util.concurrent.BlockingQueue}:
+ * - a put-style method which waits until the implementation can accept the notification
+ * for delivery, and
+ * - an offer-style method, which attempts to enqueue the notification, but allows
+ * the caller to specify that it should never wait, or put an upper bound on how
+ * long it is going to wait.
+ */
+public interface DOMNotificationPublishService extends DOMService {
+ /**
+ * Well-known value indicating that the implementation is currently not
+ * able to accept a notification.
+ */
+ ListenableFuture<Object> REJECTED = Futures.immediateFailedFuture(new Throwable("Unacceptable blocking conditions encountered"));
+
+ /**
+ * Publish a notification. The result of this method is a {@link ListenableFuture}
+ * which will complete once the notification has been delivered to all immediate
+ * registrants. The type of the object resulting from the future is not defined
+ * and implementations may use it to convey additional information related to the
+ * publishing process.
+ *
+ * Abstract subclasses can refine the return type as returning a promise of a
+ * more specific type, e.g.:
+ *
+ * public interface DeliveryStatus { int getListenerCount(); }
+ * ListenableFuture<? extends DeliveryStatus> putNotification(DOMNotification notification);
+ *
+ * Once the Future succeeds, the resulting object can be queried for traits using
+ * instanceof, e.g:
+ *
+ * // Can block when (for example) the implemention's ThreadPool queue is full
+ * Object o = service.putNotification(notif).get();
+ * if (o instanceof DeliveryStatus) {
+ * DeliveryStatus ds = (DeliveryStatus)o;
+ * LOG.debug("Notification was received by {} listeners", ds.getListenerCount(););
+ * }
+ * }
+ *
+ * In case an implementation is running out of resources, it can block the calling
+ * thread until enough resources become available to accept the notification for
+ * processing, or it is interrupted.
+ *
+ * Caution: completion here means that the implementation has completed processing
+ * of the notification. This does not mean that all existing registrants
+ * have seen the notification. Most importantly, the delivery process at
+ * other cluster nodes may have not begun yet.
+ *
+ * @param notification Notification to be published.
+ * @return A listenable future which will report completion when the service
+ * has finished propagating the notification to its immediate registrants.
+ * @throws InterruptedException if interrupted while waiting
+ * @throws NullPointerException if notification is null.
+ */
+ @Nonnull ListenableFuture<? extends Object> putNotification(@Nonnull DOMNotification notification) throws InterruptedException;
+
+ /**
+ * Attempt to publish a notification. The result of this method is a {@link ListenableFuture}
+ * which will complete once the notification has been delivered to all immediate
+ * registrants. The type of the object resulting from the future is not defined
+ * and implementations may use it to convey additional information related to the
+ * publishing process. Unlike {@link #putNotification(DOMNotification)}, this method
+ * is guaranteed not to block if the underlying implementation encounters contention.
+ *
+ * @param notification Notification to be published.
+ * @return A listenable future which will report completion when the service
+ * has finished propagating the notification to its immediate registrants,
+ * or {@value #REJECTED} if resource constraints prevent
+ * the implementation from accepting the notification for delivery.
+ * @throws NullPointerException if notification is null.
+ */
+ @Nonnull ListenableFuture<? extends Object> offerNotification(@Nonnull DOMNotification notification);
+
+ /**
+ * Attempt to publish a notification. The result of this method is a {@link ListenableFuture}
+ * which will complete once the notification has been delivered to all immediate
+ * registrants. The type of the object resulting from the future is not defined
+ * and implementations may use it to convey additional information related to the
+ * publishing process. Unlike {@link #putNotification(DOMNotification)}, this method
+ * is guaranteed to block more than the specified timeout.
+ *
+ * @param notification Notification to be published.
+ * @param timeout how long to wait before giving up, in units of unit
+ * @param unit a TimeUnit determining how to interpret the timeout parameter
+ * @return A listenable future which will report completion when the service
+ * has finished propagating the notification to its immediate registrants,
+ * or {@value #REJECTED} if resource constraints prevent
+ * the implementation from accepting the notification for delivery.
+ * @throws InterruptedException if interrupted while waiting
+ * @throws NullPointerException if notification or unit is null.
+ * @throws IllegalArgumentException if timeout is negative.
+ */
+ @Nonnull ListenableFuture<? extends Object> offerNotification(@Nonnull DOMNotification notification,
+ @Nonnegative long timeout, @Nonnull TimeUnit unit) throws InterruptedException;
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * A {@link DOMService} which allows its users to subscribe to receive
+ * {@link DOMNotification}s.
+ */
+public interface DOMNotificationService {
+ /**
+ * Register a {@link DOMNotificationListener} to receive a set of notifications. As with
+ * other ListenerRegistration-based interfaces, registering an instance multiple times
+ * results in notifications being delivered for each registration.
+ *
+ * @param listener Notification instance to register
+ * @param types Notification types which should be delivered to the listener. Duplicate
+ * entries are processed only once, null entries are ignored.
+ * @return Registration handle. Invoking {@link DOMNotificationListenerRegistration#close()}
+ * will stop the delivery of notifications to the listener
+ * @throws IllegalArgumentException if types is empty or contains an invalid element, such as
+ * null or a SchemaPath which does not represent a valid {@link DOMNotification} type.
+ * @throws NullPointerException if either of the arguments is null
+ */
+ DOMNotificationListenerRegistration registerNotificationListener(@Nonnull DOMNotificationListener listener, @Nonnull Collection<SchemaPath> types);
+
+ /**
+ * Register a {@link DOMNotificationListener} to receive a set of notifications. As with
+ * other ListenerRegistration-based interfaces, registering an instance multiple times
+ * results in notifications being delivered for each registration.
+ *
+ * @param listener Notification instance to register
+ * @param types Notification types which should be delivered to the listener. Duplicate
+ * entries are processed only once, null entries are ignored.
+ * @return Registration handle. Invoking {@link DOMNotificationListenerRegistration#close()}
+ * will stop the delivery of notifications to the listener
+ * @throws IllegalArgumentException if types is empty or contains an invalid element, such as
+ * null or a SchemaPath which does not represent a valid {@link DOMNotification} type.
+ * @throws NullPointerException if listener is null
+ */
+ // FIXME: Java 8: provide a default implementation of this method.
+ DOMNotificationListenerRegistration registerNotificationListener(@Nonnull DOMNotificationListener listener, SchemaPath... types);
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html.
*/
-
package org.opendaylight.controller.md.sal.dom.api;
+/**
+ * Marker interface for services which can be obtained from a {@link DOMMountPoint}
+ * instance. No further semantics are implied.
+ */
public interface DOMService {
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.spi;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListenerRegistration;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+
+/**
+ * Utility base class for {@link DOMNotificationListenerRegistration}
+ * implementations.
+ */
+public abstract class AbstractDOMNotificationListenerRegistration extends AbstractListenerRegistration<DOMNotificationListener> implements DOMNotificationListenerRegistration {
+ /**
+ * Default constructor. Subclasses need to invoke it from their
+ * constructor(s).
+ *
+ * @param listener {@link DOMNotificationListener} instance which is
+ * being held by this registration. May not be null.
+ */
+ protected AbstractDOMNotificationListenerRegistration(final @Nonnull DOMNotificationListener listener) {
+ super(listener);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.spi;
+
+import com.google.common.collect.ForwardingObject;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+
+/**
+ * Utility implementations of {@link DOMNotificationPublishService} which forwards
+ * all requests to a delegate instance.
+ */
+public abstract class ForwardingDOMNotificationPublishService extends ForwardingObject implements DOMNotificationPublishService {
+ @Override
+ protected abstract DOMNotificationPublishService delegate();
+
+ @Override
+ public ListenableFuture<? extends Object> putNotification(final DOMNotification notification) throws InterruptedException {
+ return delegate().putNotification(notification);
+ }
+
+ @Override
+ public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
+ return delegate().offerNotification(notification);
+ }
+
+ @Override
+ public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
+ final TimeUnit unit) throws InterruptedException {
+ return delegate().offerNotification(notification, timeout, unit);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.spi;
+
+import com.google.common.collect.ForwardingObject;
+import java.util.Collection;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListenerRegistration;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * Utility implementation of a {@link DOMNotificationService} which forwards all requests
+ * to a delegate instance.
+ */
+public abstract class ForwardingDOMNotificationService extends ForwardingObject implements DOMNotificationService {
+ @Override
+ protected abstract DOMNotificationService delegate();
+
+ @Override
+ public DOMNotificationListenerRegistration registerNotificationListener(final DOMNotificationListener listener,
+ final Collection<SchemaPath> types) {
+ return delegate().registerNotificationListener(listener, types);
+ }
+
+ @Override
+ public DOMNotificationListenerRegistration registerNotificationListener(final DOMNotificationListener listener,
+ final SchemaPath... types) {
+ return delegate().registerNotificationListener(listener, types);
+ }
+}
}
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+
factory.setNamespaceAware(true);
factory.setCoalescing(true);
factory.setIgnoringElementContentWhitespace(true);
static {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ try {
+ factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
+ factory.setFeature("http://xml.org/sax/features/external-general-entities", false);
+ factory.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
+ factory.setXIncludeAware(false);
+ factory.setExpandEntityReferences(false);
+ } catch (ParserConfigurationException e) {
+ throw new ExceptionInInitializerError(e);
+ }
factory.setNamespaceAware(true);
factory.setCoalescing(true);
factory.setIgnoringElementContentWhitespace(true);
public class XmlToCompositeNodeReader {
private final static XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
+ static {
+ xmlInputFactory.setProperty("javax.xml.stream.isSupportingExternalEntities", false);
+ }
private XMLEventReader eventReader;
public Node<?> read(InputStream entityStream) throws XMLStreamException,
private List<InputStream> getYangs() throws FileNotFoundException {
List<String> paths = Arrays.asList("/META-INF/yang/config.yang", "/META-INF/yang/rpc-context.yang",
"/META-INF/yang/config-test.yang", "/META-INF/yang/config-test-impl.yang", "/META-INF/yang/test-types.yang",
- "/META-INF/yang/ietf-inet-types.yang");
+ "/META-INF/yang/test-groups.yang", "/META-INF/yang/ietf-inet-types.yang");
final Collection<InputStream> yangDependencies = new ArrayList<>();
for (String path : paths) {
final InputStream is = Preconditions
package org.opendaylight.controller.netconf.client;
import io.netty.channel.Channel;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.MessageToByteEncoder;
import java.util.Collection;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.nettyutil.AbstractNetconfSession;
-import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXICodec;
-import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
-import org.opendaylight.controller.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
import org.opendaylight.controller.netconf.nettyutil.handler.NetconfMessageToXMLEncoder;
import org.opendaylight.controller.netconf.nettyutil.handler.NetconfXMLToMessageDecoder;
import org.slf4j.Logger;
}
@Override
- protected void addExiHandlers(final NetconfEXICodec exiCodec) {
+ protected void addExiHandlers(final ByteToMessageDecoder decoder, final MessageToByteEncoder<NetconfMessage> encoder) {
// TODO used only in negotiator, client supports only auto start-exi
- replaceMessageDecoder(new NetconfEXIToMessageDecoder(exiCodec));
- replaceMessageEncoder(new NetconfMessageToEXIEncoder(exiCodec));
+ replaceMessageDecoder(decoder);
+ replaceMessageEncoder(encoder);
}
@Override
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
-
import com.google.common.collect.Lists;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXICodec;
+import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
+import org.opendaylight.controller.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
import org.openexi.proc.common.EXIOptions;
public class NetconfClientSessionTest {
Mockito.doReturn("").when(channelHandler).toString();
NetconfClientSession session = new NetconfClientSession(sessionListener, channel, sessId, caps);
- session.addExiHandlers(codec);
+ final NetconfMessageToEXIEncoder exiEncoder = new NetconfMessageToEXIEncoder(codec);
+ final NetconfEXIToMessageDecoder exiDecoder = new NetconfEXIToMessageDecoder(codec);
+ session.addExiHandlers(exiDecoder, exiEncoder);
session.stopExiCommunication();
assertEquals(caps, session.getServerCapabilities());
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.MessageToByteEncoder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.monitoring.NetconfManagementSession;
import org.opendaylight.controller.netconf.nettyutil.AbstractNetconfSession;
-import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXICodec;
-import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
-import org.opendaylight.controller.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
import org.opendaylight.controller.netconf.nettyutil.handler.NetconfMessageToXMLEncoder;
import org.opendaylight.controller.netconf.nettyutil.handler.NetconfXMLToMessageDecoder;
import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
private Date loginTime;
private long inRpcSuccess, inRpcFail, outRpcError;
- public NetconfServerSession(NetconfServerSessionListener sessionListener, Channel channel, long sessionId,
- NetconfHelloMessageAdditionalHeader header) {
+ public NetconfServerSession(final NetconfServerSessionListener sessionListener, final Channel channel, final long sessionId,
+ final NetconfHelloMessageAdditionalHeader header) {
super(sessionListener, channel, sessionId);
this.header = header;
LOG.debug("Session {} created", toString());
return builder.build();
}
- private Class<? extends Transport> getTransportForString(String transport) {
+ private Class<? extends Transport> getTransportForString(final String transport) {
switch(transport) {
case "ssh" :
return NetconfSsh.class;
}
}
- private String formatDateTime(Date loginTime) {
+ private String formatDateTime(final Date loginTime) {
SimpleDateFormat dateFormat = new SimpleDateFormat(ISO_DATE_FORMAT);
return dateFormat.format(loginTime);
}
}
@Override
- protected void addExiHandlers(NetconfEXICodec exiCodec) {
- replaceMessageDecoder(new NetconfEXIToMessageDecoder(exiCodec));
- replaceMessageEncoderAfterNextMessage(new NetconfMessageToEXIEncoder(exiCodec));
+ protected void addExiHandlers(final ByteToMessageDecoder decoder, final MessageToByteEncoder<NetconfMessage> encoder) {
+ replaceMessageDecoder(decoder);
+ replaceMessageEncoderAfterNextMessage(encoder);
}
@Override
"/META-INF/yang/config-test.yang",
"/META-INF/yang/config-test-impl.yang",
"/META-INF/yang/test-types.yang",
+ "/META-INF/yang/test-groups.yang",
"/META-INF/yang/ietf-inet-types.yang");
final Collection<InputStream> yangDependencies = new ArrayList<>();
}
notificationVerifier.assertNotificationCount(2);
- notificationVerifier.assertNotificationContent(0, 0, 0, 8);
- notificationVerifier.assertNotificationContent(1, 4, 3, 8);
+ notificationVerifier.assertNotificationContent(0, 0, 0, 9);
+ notificationVerifier.assertNotificationContent(1, 4, 3, 9);
mockedAggregator.assertSnapshotCount(2);
// Capabilities are stripped for persister
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.MessageToByteEncoder;
import java.io.IOException;
import org.opendaylight.controller.netconf.api.NetconfExiSession;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.api.NetconfSessionListener;
import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXICodec;
+import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXIToMessageDecoder;
+import org.opendaylight.controller.netconf.nettyutil.handler.NetconfMessageToEXIEncoder;
import org.opendaylight.controller.netconf.nettyutil.handler.exi.EXIParameters;
import org.opendaylight.controller.netconf.util.xml.XmlElement;
import org.opendaylight.protocol.framework.AbstractProtocolSession;
LOG.warn("Unable to parse EXI parameters from {} on session {}", startExiMessage, this, e);
throw new IllegalArgumentException("Cannot parse options", e);
}
+
final NetconfEXICodec exiCodec = new NetconfEXICodec(exiParams.getOptions());
- addExiHandlers(exiCodec);
+ final NetconfMessageToEXIEncoder exiEncoder = new NetconfMessageToEXIEncoder(exiCodec);
+ final NetconfEXIToMessageDecoder exiDecoder = new NetconfEXIToMessageDecoder(exiCodec);
+ addExiHandlers(exiDecoder, exiEncoder);
+
LOG.debug("Session {} EXI handlers added to pipeline", this);
}
- protected abstract void addExiHandlers(NetconfEXICodec exiCodec);
+ /**
+ * Add a set encoder/decoder tuple into the channel pipeline as appropriate.
+ *
+ * @param decoder EXI decoder
+ * @param encoder EXI encoder
+ */
+ protected abstract void addExiHandlers(ByteToMessageDecoder decoder, MessageToByteEncoder<NetconfMessage> encoder);
public final boolean isUp() {
return up;
package org.opendaylight.controller.netconf.nettyutil.handler;
import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import org.openexi.proc.HeaderOptionsOutputType;
import org.openexi.proc.common.EXIOptions;
import org.openexi.proc.common.EXIOptionsException;
import org.openexi.proc.grammars.GrammarCache;
import org.openexi.sax.EXIReader;
import org.openexi.sax.Transmogrifier;
+import org.openexi.sax.TransmogrifierException;
+import org.xml.sax.EntityResolver;
+import org.xml.sax.InputSource;
public final class NetconfEXICodec {
/**
* of the stream. This is really useful, so let's output it now.
*/
private static final boolean OUTPUT_EXI_COOKIE = true;
+ /**
+ * OpenEXI does not allow us to directly prevent resolution of external entities. In order
+ * to prevent XXE attacks, we reuse a single no-op entity resolver.
+ */
+ private static final EntityResolver ENTITY_RESOLVER = new EntityResolver() {
+ @Override
+ public InputSource resolveEntity(final String publicId, final String systemId) {
+ return new InputSource();
+ }
+ };
+
+ /**
+ * Since we have a limited number of options we can have, instantiating a weak cache
+ * will allow us to reuse instances where possible.
+ */
+ private static final LoadingCache<Short, GrammarCache> GRAMMAR_CACHES = CacheBuilder.newBuilder().weakValues().build(new CacheLoader<Short, GrammarCache>() {
+ @Override
+ public GrammarCache load(final Short key) {
+ return new GrammarCache(key);
+ }
+ });
+
+ /**
+ * Grammar cache acts as a template and is duplicated by the Transmogrifier and the Reader
+ * before use. It is safe to reuse a single instance.
+ */
+ private final GrammarCache exiGrammarCache;
private final EXIOptions exiOptions;
public NetconfEXICodec(final EXIOptions exiOptions) {
this.exiOptions = Preconditions.checkNotNull(exiOptions);
+ this.exiGrammarCache = createGrammarCache(exiOptions);
}
- private GrammarCache getGrammarCache() {
+ private static GrammarCache createGrammarCache(final EXIOptions exiOptions) {
short go = GrammarOptions.DEFAULT_OPTIONS;
if (exiOptions.getPreserveComments()) {
go = GrammarOptions.addCM(go);
go = GrammarOptions.addPI(go);
}
- return new GrammarCache(null, go);
+ return GRAMMAR_CACHES.getUnchecked(go);
}
EXIReader getReader() throws EXIOptionsException {
final EXIReader r = new EXIReader();
r.setPreserveLexicalValues(exiOptions.getPreserveLexicalValues());
- r.setGrammarCache(getGrammarCache());
+ r.setGrammarCache(exiGrammarCache);
+ r.setEntityResolver(ENTITY_RESOLVER);
return r;
}
- Transmogrifier getTransmogrifier() throws EXIOptionsException {
+ Transmogrifier getTransmogrifier() throws EXIOptionsException, TransmogrifierException {
final Transmogrifier transmogrifier = new Transmogrifier();
transmogrifier.setAlignmentType(exiOptions.getAlignmentType());
transmogrifier.setBlockSize(exiOptions.getBlockSize());
- transmogrifier.setGrammarCache(getGrammarCache());
+ transmogrifier.setGrammarCache(exiGrammarCache);
transmogrifier.setOutputCookie(OUTPUT_EXI_COOKIE);
transmogrifier.setOutputOptions(HeaderOptionsOutputType.all);
+ transmogrifier.setResolveExternalGeneralEntities(false);
return transmogrifier;
}
}
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.openexi.proc.common.EXIOptionsException;
import org.openexi.sax.Transmogrifier;
+import org.openexi.sax.TransmogrifierException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
@Override
- protected void encode(final ChannelHandlerContext ctx, final NetconfMessage msg, final ByteBuf out) throws EXIOptionsException, IOException, TransformerException {
+ protected void encode(final ChannelHandlerContext ctx, final NetconfMessage msg, final ByteBuf out) throws EXIOptionsException, IOException, TransformerException, TransmogrifierException {
LOG.trace("Sent to encode : {}", msg);
try (final OutputStream os = new ByteBufOutputStream(out)) {
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.verifyZeroInteractions;
-
import com.google.common.base.Optional;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.codec.MessageToByteEncoder;
import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.netconf.api.NetconfSession;
import org.opendaylight.controller.netconf.api.NetconfSessionListener;
import org.opendaylight.controller.netconf.api.NetconfTerminationReason;
-import org.opendaylight.controller.netconf.nettyutil.handler.NetconfEXICodec;
import org.opendaylight.controller.netconf.nettyutil.handler.exi.NetconfStartExiMessage;
import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
testingNetconfSession = spy(testingNetconfSession);
testingNetconfSession.startExiCommunication(NetconfStartExiMessage.create(new EXIOptions(), "4"));
- verify(testingNetconfSession).addExiHandlers(any(NetconfEXICodec.class));
+ verify(testingNetconfSession).addExiHandlers(any(ByteToMessageDecoder.class), any(MessageToByteEncoder.class));
}
@Test
}
@Override
- protected void addExiHandlers(final NetconfEXICodec exiCodec) {}
+ protected void addExiHandlers(final ByteToMessageDecoder decoder, final MessageToByteEncoder<NetconfMessage> encoder) {}
@Override
public void stopExiCommunication() {}