LOG.trace("Starting a new augmentation node");
output.writeByte(NodeTypes.AUGMENTATION_NODE);
- writeQNameSet(identifier.getPossibleChildNames());
+ writeAugmentationIdentifier(identifier);
}
@Override
case PathArgumentTypes.AUGMENTATION_IDENTIFIER :
- AugmentationIdentifier augmentationIdentifier = (AugmentationIdentifier) pathArgument;
-
// No Qname in augmentation identifier
- writeQNameSet(augmentationIdentifier.getPossibleChildNames());
+ writeAugmentationIdentifier((AugmentationIdentifier) pathArgument);
break;
default :
throw new IllegalStateException("Unknown node identifier type is found : "
}
}
- private void writeQNameSet(final Set<QName> children) throws IOException {
+ void writeAugmentationIdentifier(final AugmentationIdentifier aid) throws IOException {
+ final Set<QName> qnames = aid.getPossibleChildNames();
// Write each child's qname separately, if list is empty send count as 0
- if (children != null && !children.isEmpty()) {
- output.writeInt(children.size());
- for (QName qname : children) {
+ if (!qnames.isEmpty()) {
+ output.writeInt(qnames.size());
+ for (QName qname : qnames) {
writeQName(qname);
}
} else {
public final SchemaPath readSchemaPath() throws IOException {
return delegate().readSchemaPath();
}
+
+ @Override
+ public final NormalizedNodeStreamVersion getVersion() throws IOException {
+ return delegate().getVersion();
+ }
}
* nodes. This process goes in recursive manner, where each NodeTypes object signifies the start of the object, except
* END_NODE. If a node can have children, then that node's end is calculated based on appearance of END_NODE.
*/
-public class NormalizedNodeInputStreamReader extends ForwardingDataInput implements NormalizedNodeDataInput {
+class LithiumNormalizedNodeInputStreamReader extends ForwardingDataInput implements NormalizedNodeDataInput {
- private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeInputStreamReader.class);
+ private static final Logger LOG = LoggerFactory.getLogger(LithiumNormalizedNodeInputStreamReader.class);
private final @NonNull DataInput input;
@SuppressWarnings("rawtypes")
private NormalizedNodeBuilder<NodeWithValue, Object, LeafSetEntryNode<Object>> leafSetEntryBuilder;
- NormalizedNodeInputStreamReader(final DataInput input) {
+ LithiumNormalizedNodeInputStreamReader(final DataInput input) {
this.input = requireNonNull(input);
}
return input;
}
+ @Override
+ public NormalizedNodeStreamVersion getVersion() throws IOException {
+ return NormalizedNodeStreamVersion.LITHIUM;
+ }
+
@Override
public NormalizedNode<?, ?> readNormalizedNode() throws IOException {
return readNormalizedNodeInternal();
}
}
- private QName readQName() throws IOException {
+ QName readQName() throws IOException {
// Read in the same sequence of writing
String localName = readCodedString();
String namespace = readCodedString();
return children;
}
- private AugmentationIdentifier readAugmentationIdentifier() throws IOException {
- // FIXME: we should have a cache for these, too
+ AugmentationIdentifier readAugmentationIdentifier() throws IOException {
return new AugmentationIdentifier(readQNameSet());
}
--- /dev/null
+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.Revision;
+
+/**
+ * NormalizedNodeOutputStreamWriter will be used by distributed datastore to send normalized node in
+ * a stream.
+ * A stream writer wrapper around this class will write node objects to stream in recursive manner.
+ * for example - If you have a ContainerNode which has a two LeafNode as children, then
+ * you will first call
+ * {@link #startContainerNode(org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier, int)},
+ * then will call
+ * {@link #leafNode(org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier, Object)} twice
+ * and then, {@link #endNode()} to end container node.
+ *
+ * <p>Based on the each node, the node type is also written to the stream, that helps in reconstructing the object,
+ * while reading.
+ */
+class LithiumNormalizedNodeOutputStreamWriter extends AbstractNormalizedNodeDataOutput {
+ private final Map<String, Integer> stringCodeMap = new HashMap<>();
+
+ LithiumNormalizedNodeOutputStreamWriter(final DataOutput output) {
+ super(output);
+ }
+
+ @Override
+ protected short streamVersion() {
+ return TokenTypes.LITHIUM_VERSION;
+ }
+
+ @Override
+ protected void writeQName(final QName qname) throws IOException {
+ writeString(qname.getLocalName());
+ writeString(qname.getNamespace().toString());
+ writeString(qname.getRevision().map(Revision::toString).orElse(null));
+ }
+
+ @Override
+ protected final void writeString(final String string) throws IOException {
+ if (string != null) {
+ final Integer value = stringCodeMap.get(string);
+ if (value == null) {
+ stringCodeMap.put(string, stringCodeMap.size());
+ writeByte(TokenTypes.IS_STRING_VALUE);
+ writeUTF(string);
+ } else {
+ writeByte(TokenTypes.IS_CODE_VALUE);
+ writeInt(value);
+ }
+ } else {
+ writeByte(TokenTypes.IS_NULL_VALUE);
+ }
+ }
+}
PathArgument readPathArgument() throws IOException;
SchemaPath readSchemaPath() throws IOException;
+
+ /**
+ * Return the version of the underlying input stream.
+ *
+ * @return Stream version
+ * @throws IOException if the version cannot be ascertained
+ */
+ NormalizedNodeStreamVersion getVersion() throws IOException;
}
}
/**
- * Creates a new {@link NormalizedNodeDataOutput} instance that writes to the given output.
+ * Creates a new {@link NormalizedNodeDataOutput} instance that writes to the given output and latest current
+ * stream version.
*
* @param output the DataOutput to write to
* @return a new {@link NormalizedNodeDataOutput} instance
public static NormalizedNodeDataOutput newDataOutput(final @NonNull DataOutput output) {
return new NormalizedNodeOutputStreamWriter(output);
}
+
+ /**
+ * Creates a new {@link NormalizedNodeDataOutput} instance that writes to the given output.
+ *
+ * @param output the DataOutput to write to
+ * @param version Streaming version to use
+ * @return a new {@link NormalizedNodeDataOutput} instance
+ */
+ public static NormalizedNodeDataOutput newDataOutput(final @NonNull DataOutput output,
+ final @NonNull NormalizedNodeStreamVersion version) {
+ switch (version) {
+ case LITHIUM:
+ return new LithiumNormalizedNodeOutputStreamWriter(output);
+ case SODIUM:
+ return new SodiumNormalizedNodeOutputStreamWriter(output);
+ default:
+ throw new IllegalStateException("Unhandled version " + version);
+ }
+ }
+
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.datastore.node.utils.stream;
import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.Revision;
/**
* NormalizedNodeOutputStreamWriter will be used by distributed datastore to send normalized node in
* <p>Based on the each node, the node type is also written to the stream, that helps in reconstructing the object,
* while reading.
*/
-final class NormalizedNodeOutputStreamWriter extends AbstractNormalizedNodeDataOutput {
- private final Map<String, Integer> stringCodeMap = new HashMap<>();
-
+// FIXME: CONTROLLER-1888: switch this to Sodium once we have a corresponding datastore version
+class NormalizedNodeOutputStreamWriter extends LithiumNormalizedNodeOutputStreamWriter {
NormalizedNodeOutputStreamWriter(final DataOutput output) {
super(output);
}
-
- @Override
- protected short streamVersion() {
- return TokenTypes.LITHIUM_VERSION;
- }
-
- @Override
- protected void writeQName(final QName qname) throws IOException {
- writeString(qname.getLocalName());
- writeString(qname.getNamespace().toString());
- writeString(qname.getRevision().map(Revision::toString).orElse(null));
- }
-
- @Override
- protected void writeString(final String string) throws IOException {
- if (string != null) {
- final Integer value = stringCodeMap.get(string);
- if (value == null) {
- stringCodeMap.put(string, stringCodeMap.size());
- writeByte(TokenTypes.IS_STRING_VALUE);
- writeUTF(string);
- } else {
- writeByte(TokenTypes.IS_CODE_VALUE);
- writeInt(value);
- }
- } else {
- writeByte(TokenTypes.IS_NULL_VALUE);
- }
- }
}
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import com.google.common.annotations.Beta;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * Enumeration of all stream versions this implementation supports on both input and output.
+ */
+@Beta
+@NonNullByDefault
+public enum NormalizedNodeStreamVersion {
+ LITHIUM,
+ SODIUM;
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.AugmentationIdentifier;
+
+final class SodiumNormalizedNodeInputStreamReader extends LithiumNormalizedNodeInputStreamReader {
+ private final List<AugmentationIdentifier> codedAugments = new ArrayList<>();
+ private final List<QName> codedQNames = new ArrayList<>();
+
+ SodiumNormalizedNodeInputStreamReader(final DataInput input) {
+ super(input);
+ }
+
+ @Override
+ public NormalizedNodeStreamVersion getVersion() throws IOException {
+ return NormalizedNodeStreamVersion.SODIUM;
+ }
+
+ @Override
+ QName readQName() throws IOException {
+ final byte valueType = readByte();
+ switch (valueType) {
+ case TokenTypes.IS_QNAME_CODE:
+ return codedQName(readInt());
+ case TokenTypes.IS_QNAME_VALUE:
+ return rawQName();
+ default:
+ throw new IOException("Unhandled QName value type " + valueType);
+ }
+ }
+
+ @Override
+ AugmentationIdentifier readAugmentationIdentifier() throws IOException {
+ final byte valueType = readByte();
+ switch (valueType) {
+ case TokenTypes.IS_AUGMENT_CODE:
+ return codecAugmentId(readInt());
+ case TokenTypes.IS_AUGMENT_VALUE:
+ return rawAugmentId();
+ default:
+ throw new IOException("Unhandled QName value type " + valueType);
+ }
+ }
+
+ private QName codedQName(final int code) throws IOException {
+ try {
+ return codedQNames.get(code);
+ } catch (IndexOutOfBoundsException e) {
+ throw new IOException("QName code " + code + " was not found", e);
+ }
+ }
+
+ private QName rawQName() throws IOException {
+ final QName qname = super.readQName();
+ codedQNames.add(qname);
+ return qname;
+ }
+
+ private AugmentationIdentifier codecAugmentId(final int code) throws IOException {
+ try {
+ return codedAugments.get(code);
+ } catch (IndexOutOfBoundsException e) {
+ throw new IOException("QName set code " + code + " was not found", e);
+ }
+ }
+
+ private AugmentationIdentifier rawAugmentId() throws IOException {
+ final AugmentationIdentifier aid = super.readAugmentationIdentifier();
+ codedAugments.add(aid);
+ return aid;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.AugmentationIdentifier;
+
+/**
+ * NormalizedNodeOutputStreamWriter will be used by distributed datastore to send normalized node in
+ * a stream.
+ * A stream writer wrapper around this class will write node objects to stream in recursive manner.
+ * for example - If you have a ContainerNode which has a two LeafNode as children, then
+ * you will first call
+ * {@link #startContainerNode(org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier, int)},
+ * then will call
+ * {@link #leafNode(org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier, Object)} twice
+ * and then, {@link #endNode()} to end container node.
+ *
+ * <p>Based on the each node, the node type is also written to the stream, that helps in reconstructing the object,
+ * while reading.
+ */
+class SodiumNormalizedNodeOutputStreamWriter extends LithiumNormalizedNodeOutputStreamWriter {
+ private final Map<AugmentationIdentifier, Integer> aidCodeMap = new HashMap<>();
+ private final Map<QName, Integer> qnameCodeMap = new HashMap<>();
+
+ SodiumNormalizedNodeOutputStreamWriter(final DataOutput output) {
+ super(output);
+ }
+
+ @Override
+ protected short streamVersion() {
+ return TokenTypes.SODIUM_VERSION;
+ }
+
+ @Override
+ protected final void writeQName(final QName qname) throws IOException {
+ final Integer value = qnameCodeMap.get(qname);
+ if (value == null) {
+ // Fresh QName, remember it and emit as three strings
+ qnameCodeMap.put(qname, qnameCodeMap.size());
+ writeByte(TokenTypes.IS_QNAME_VALUE);
+ super.writeQName(qname);
+ } else {
+ // We have already seen this QName: write its code
+ writeByte(TokenTypes.IS_QNAME_CODE);
+ writeInt(value);
+ }
+ }
+
+ @Override
+ void writeAugmentationIdentifier(final AugmentationIdentifier aid) throws IOException {
+ final Integer value = aidCodeMap.get(aid);
+ if (value == null) {
+ // Fresh QName, remember it and emit as three strings
+ aidCodeMap.put(aid, aidCodeMap.size());
+ writeByte(TokenTypes.IS_AUGMENT_VALUE);
+ super.writeAugmentationIdentifier(aid);
+ } else {
+ // We have already seen this QName set: write its code
+ writeByte(TokenTypes.IS_AUGMENT_CODE);
+ writeInt(value);
+ }
+ }
+}
* Original stream version. Uses a per-stream dictionary for strings. QNames are serialized as three strings.
*/
static final short LITHIUM_VERSION = 1;
+ /**
+ * Revised stream version. Unlike {@link #LITHIUM_VERSION}, QNames are using a per-stream dictionary, too.
+ */
+ static final short SODIUM_VERSION = 2;
// Tokens supported in LITHIUM_VERSION
static final byte IS_CODE_VALUE = 1;
static final byte IS_STRING_VALUE = 2;
static final byte IS_NULL_VALUE = 3;
+
+ // Tokens supported in SODIUM_VERSION
+ static final byte IS_QNAME_CODE = 4;
+ static final byte IS_QNAME_VALUE = 5;
+ static final byte IS_AUGMENT_CODE = 6;
+ static final byte IS_AUGMENT_VALUE = 7;
}
final NormalizedNodeDataInput ret;
switch (version) {
case TokenTypes.LITHIUM_VERSION:
- ret = new NormalizedNodeInputStreamReader(input);
+ ret = new LithiumNormalizedNodeInputStreamReader(input);
+ break;
+ case TokenTypes.SODIUM_VERSION:
+ ret = new SodiumNormalizedNodeInputStreamReader(input);
break;
default:
throw defunct("Unhandled stream version %s", version);