/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.controller.cluster.datastore.node.utils.stream;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* NormalizedNodeOutputStreamWriter will be used by distributed datastore to send normalized node in
private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeOutputStreamWriter.class);
+ static final byte SIGNATURE_MARKER = (byte) 0xab;
+ static final short CURRENT_VERSION = (short) 1;
+
static final byte IS_CODE_VALUE = 1;
static final byte IS_STRING_VALUE = 2;
static final byte IS_NULL_VALUE = 3;
private final Map<String, Integer> stringCodeMap = new HashMap<>();
+ private NormalizedNodeWriter normalizedNodeWriter;
+
+ private boolean wroteSignatureMarker;
+
public NormalizedNodeOutputStreamWriter(OutputStream stream) throws IOException {
Preconditions.checkNotNull(stream);
output = new DataOutputStream(stream);
}
- public NormalizedNodeOutputStreamWriter(DataOutput output) throws IOException {
+ public NormalizedNodeOutputStreamWriter(DataOutput output) {
this.output = Preconditions.checkNotNull(output);
}
+ private NormalizedNodeWriter normalizedNodeWriter() {
+ if(normalizedNodeWriter == null) {
+ normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(this);
+ }
+
+ return normalizedNodeWriter;
+ }
+
+ public void writeNormalizedNode(NormalizedNode<?, ?> node) throws IOException {
+ writeSignatureMarkerAndVersionIfNeeded();
+ normalizedNodeWriter().write(node);
+ }
+
+ private void writeSignatureMarkerAndVersionIfNeeded() throws IOException {
+ if(!wroteSignatureMarker) {
+ output.writeByte(SIGNATURE_MARKER);
+ output.writeShort(CURRENT_VERSION);
+ wroteSignatureMarker = true;
+ }
+ }
+
@Override
public void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException {
Preconditions.checkNotNull(name, "Node identifier should not be null");
@Override
public void close() throws IOException {
+ flush();
}
@Override
public void flush() throws IOException {
+ if (output instanceof OutputStream) {
+ ((OutputStream)output).flush();
+ }
}
private void startNode(final QName qName, byte nodeType) throws IOException {
Preconditions.checkNotNull(qName, "QName of node identifier should not be null.");
+
+ writeSignatureMarkerAndVersionIfNeeded();
+
// First write the type of node
output.writeByte(nodeType);
// Write Start Tag
}
public void writeYangInstanceIdentifier(YangInstanceIdentifier identifier) throws IOException {
- Iterable<YangInstanceIdentifier.PathArgument> pathArguments = identifier.getPathArguments();
- int size = Iterables.size(pathArguments);
- output.writeInt(size);
+ writeSignatureMarkerAndVersionIfNeeded();
+ writeYangInstanceIdentifierInternal(identifier);
+ }
+
+ private void writeYangInstanceIdentifierInternal(YangInstanceIdentifier identifier) throws IOException {
+ Collection<YangInstanceIdentifier.PathArgument> pathArguments = identifier.getPathArguments();
+ output.writeInt(pathArguments.size());
for(YangInstanceIdentifier.PathArgument pathArgument : pathArguments) {
writePathArgument(pathArgument);
}
}
- private void writePathArgument(YangInstanceIdentifier.PathArgument pathArgument) throws IOException {
+ public void writePathArgument(YangInstanceIdentifier.PathArgument pathArgument) throws IOException {
byte type = PathArgumentTypes.getSerializablePathArgumentType(pathArgument);
}
}
- @SuppressWarnings("rawtypes")
private void writeObject(Object value) throws IOException {
byte type = ValueTypes.getSerializableType(value);
case ValueTypes.BITS_TYPE:
writeObjSet((Set<?>) value);
break;
+ case ValueTypes.BINARY_TYPE:
+ byte[] bytes = (byte[]) value;
+ output.writeInt(bytes.length);
+ output.write(bytes);
+ break;
case ValueTypes.YANG_IDENTIFIER_TYPE:
- writeYangInstanceIdentifier((YangInstanceIdentifier) value);
+ writeYangInstanceIdentifierInternal((YangInstanceIdentifier) value);
+ break;
+ case ValueTypes.NULL_TYPE :
+ break;
+ case ValueTypes.STRING_BYTES_TYPE:
+ final byte[] valueBytes = value.toString().getBytes(StandardCharsets.UTF_8);
+ output.writeInt(valueBytes.length);
+ output.write(valueBytes);
break;
default:
output.writeUTF(value.toString());