* This will stop the timeout clock
*/
void markFollowerInActive();
+
+
+ /**
+ * This will return the active time of follower, since it was last reset
+ * @return time in milliseconds
+ */
+ long timeSinceLastActivity();
+
}
stopwatch.stop();
}
}
+
+ @Override
+ public long timeSinceLastActivity() {
+ return stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ }
}
purgeInMemoryLog();
}
+ //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
+ sendUpdatesToFollower(followerId, followerLogInformation, false);
return this;
}
followerLogInformation.markFollowerActive();
if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+ boolean wasLastChunk = false;
if (reply.isSuccess()) {
if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
//this was the last chunk reply
// we can remove snapshot from the memory
setSnapshot(Optional.<ByteString>absent());
}
+ wasLastChunk = true;
} else {
followerToSnapshot.markSendStatus(true);
followerToSnapshot.markSendStatus(false);
}
+
+ if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ if(followerActor != null) {
+ sendSnapshotChunk(followerActor, followerId);
+ }
+ }
+
} else {
LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
context.getId(), reply.getChunkIndex(), followerId,
private void sendAppendEntries() {
// Send an AppendEntries to all followers
-
+ long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis();
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final String followerId = e.getKey();
- ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ final FollowerLogInformation followerLogInformation = e.getValue();
+ // This checks helps not to send a repeat message to the follower
+ if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) {
+ sendUpdatesToFollower(followerId, followerLogInformation, true);
+ }
+ }
+ }
- if (followerActor != null) {
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long followerNextIndex = followerLogInformation.getNextIndex();
- boolean isFollowerActive = followerLogInformation.isFollowerActive();
+ /**
+ *
+ * This method checks if any update needs to be sent to the given follower. This includes append log entries,
+ * sending next snapshot chunk, and initiating a snapshot.
+ * @return true if any update is sent, false otherwise
+ */
- FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
- if (followerToSnapshot != null) {
- // if install snapshot is in process , then sent next chunk if possible
- if (isFollowerActive && followerToSnapshot.canSendNextChunk()) {
- sendSnapshotChunk(followerActor, followerId);
- } else {
- // we send a heartbeat even if we have not received a reply for the last chunk
- sendAppendEntriesToFollower(followerActor, followerNextIndex,
- Collections.<ReplicatedLogEntry>emptyList(), followerId);
- }
+ private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
+ boolean sendHeartbeat) {
+
+ ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ if (followerActor != null) {
+ long followerNextIndex = followerLogInformation.getNextIndex();
+ boolean isFollowerActive = followerLogInformation.isFollowerActive();
+
+ if (mapFollowerToSnapshot.get(followerId) != null) {
+ // if install snapshot is in process , then sent next chunk if possible
+ if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+ sendSnapshotChunk(followerActor, followerId);
+ } else if(sendHeartbeat) {
+ // we send a heartbeat even if we have not received a reply for the last chunk
+ sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+ Collections.<ReplicatedLogEntry>emptyList(), followerId);
+ }
+ } else {
+ long leaderLastIndex = context.getReplicatedLog().lastIndex();
+ long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+ if (isFollowerActive &&
+ context.getReplicatedLog().isPresent(followerNextIndex)) {
+ // FIXME : Sending one entry at a time
+ final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
- } else {
- long leaderLastIndex = context.getReplicatedLog().lastIndex();
- long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
- final List<ReplicatedLogEntry> entries;
-
- LOG.debug("{}: Checking sendAppendEntries for {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
- context.getId(), leaderLastIndex, leaderSnapShotIndex);
-
- if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
- LOG.debug("{}: sendAppendEntries: {} is present for {}", context.getId(),
- followerNextIndex, followerId);
-
- // FIXME : Sending one entry at a time
- entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
- } else if (isFollowerActive && followerNextIndex >= 0 &&
- leaderLastIndex >= followerNextIndex ) {
- // if the followers next index is not present in the leaders log, and
- // if the follower is just not starting and if leader's index is more than followers index
- // then snapshot should be sent
-
- if(LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
- "follower-nextIndex: %s, leader-snapshot-index: %s, " +
- "leader-last-index: %s", context.getId(), followerId,
- followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
- }
- actor().tell(new InitiateInstallSnapshot(), actor());
-
- // we would want to sent AE as the capture snapshot might take time
- entries = Collections.<ReplicatedLogEntry>emptyList();
-
- } else {
- //we send an AppendEntries, even if the follower is inactive
- // in-order to update the followers timestamp, in case it becomes active again
- entries = Collections.<ReplicatedLogEntry>emptyList();
+ sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
+
+ } else if (isFollowerActive && followerNextIndex >= 0 &&
+ leaderLastIndex >= followerNextIndex) {
+ // if the followers next index is not present in the leaders log, and
+ // if the follower is just not starting and if leader's index is more than followers index
+ // then snapshot should be sent
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("InitiateInstallSnapshot to follower:{}," +
+ "follower-nextIndex:{}, leader-snapshot-index:{}, " +
+ "leader-last-index:{}", followerId,
+ followerNextIndex, leaderSnapShotIndex, leaderLastIndex
+ );
}
+ actor().tell(new InitiateInstallSnapshot(), actor());
- sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
+ // Send heartbeat to follower whenever install snapshot is initiated.
+ sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+ Collections.<ReplicatedLogEntry>emptyList(), followerId);
+
+ } else if(sendHeartbeat) {
+ //we send an AppendEntries, even if the follower is inactive
+ // in-order to update the followers timestamp, in case it becomes active again
+ sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+ Collections.<ReplicatedLogEntry>emptyList(), followerId);
}
+
}
}
}
actorContext.setPeerAddresses(peerAddresses);
Leader leader = new Leader(actorContext);
+ leader.markFollowerActive(followerActor.path().toString());
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
leader.handleMessage(senderActor, new SendHeartBeat());
final String out =
actorContext.setPeerAddresses(peerAddresses);
Leader leader = new Leader(actorContext);
+ leader.markFollowerActive(followerActor.path().toString());
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
RaftActorBehavior raftBehavior = leader
.handleMessage(senderActor, new Replicate(null, null,
new MockRaftActorContext.MockReplicatedLogEntry(1,
leader.getFollowerToSnapshot().getNextChunk();
leader.getFollowerToSnapshot().incrementChunkIndex();
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
leader.handleMessage(leaderActor, new SendHeartBeat());
AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
//update follower timestamp
leader.markFollowerActive(followerActor.path().toString());
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
// this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
RaftActorBehavior raftBehavior = leader.handleMessage(
senderActor, new Replicate(null, "state-id", entry));
assertEquals(snapshotIndex + 1, fli.getNextIndex());
}};
}
+ @Test
+ public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ TestActorRef<MessageCollectorActor> followerActor =
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-reply",
+ followerActor.path().toString());
+
+ final int followersLastIndex = 2;
+ final int snapshotIndex = 3;
+ final int snapshotTerm = 1;
+ final int currentTerm = 2;
+
+ MockRaftActorContext actorContext =
+ (MockRaftActorContext) createActorContext();
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+ @Override
+ public int getSnapshotChunkSize() {
+ return 50;
+ }
+ };
+ configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ actorContext.setConfigParams(configParams);
+ actorContext.setPeerAddresses(peerAddresses);
+ actorContext.setCommitIndex(followersLastIndex);
+
+ MockLeader leader = new MockLeader(actorContext);
+
+ Map<String, String> leadersSnapshot = new HashMap<>();
+ leadersSnapshot.put("1", "A");
+ leadersSnapshot.put("2", "B");
+ leadersSnapshot.put("3", "C");
+
+ // set the snapshot variables in replicatedlog
+ actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+ actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+ actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+ ByteString bs = toByteString(leadersSnapshot);
+ leader.setSnapshot(Optional.of(bs));
+
+ leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+ List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ assertEquals(1, objectList.size());
+
+ Object o = objectList.get(0);
+ assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+ InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+ assertEquals(1, installSnapshot.getChunkIndex());
+ assertEquals(3, installSnapshot.getTotalChunks());
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ "follower-reply", installSnapshot.getChunkIndex(), true));
+
+ objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ assertEquals(2, objectList.size());
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
+
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ "follower-reply", installSnapshot.getChunkIndex(), true));
+
+ objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ assertEquals(3, objectList.size());
+
+ installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
+
+ // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ "follower-reply", installSnapshot.getChunkIndex(), true));
+
+ objectList = MessageCollectorActor.getAllMatching(followerActor,
+ InstallSnapshotMessages.InstallSnapshot.class);
+
+ // Count should still stay at 3
+ assertEquals(3, objectList.size());
+ }};
+ }
+
@Test
- public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+ public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
new JavaTestKit(getSystem()) {{
TestActorRef<MessageCollectorActor> followerActor =
assertEquals(3, installSnapshot.getTotalChunks());
- leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
+ leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+ followerActor.path().toString(), -1, false));
+
+ Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
leader.handleMessage(leaderActor, new SendHeartBeat());
- o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+ o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
{
TestActorRef<MessageCollectorActor> followerActor =
- TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+ TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(followerActor.path().toString(),
leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
- leader.handleMessage(leaderActor, new SendHeartBeat());
-
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
o = MessageCollectorActor.getAllMessages(followerActor).get(1);
assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
Leader leader = new Leader(leaderActorContext);
leader.markFollowerActive(followerActor.path().toString());
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
leader.handleMessage(leaderActor, new SendHeartBeat());
AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
Leader leader = new Leader(leaderActorContext);
leader.markFollowerActive(followerActor.path().toString());
+ Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
+
leader.handleMessage(leaderActor, new SendHeartBeat());
AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
}};
}
+
+ @Test
+ public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
+ new JavaTestKit(getSystem()) {{
+
+ ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+ MockRaftActorContext leaderActorContext =
+ new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+ DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+ configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+ configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+ leaderActorContext.setConfigParams(configParams);
+
+ ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+
+ MockRaftActorContext followerActorContext =
+ new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+
+ followerActorContext.setConfigParams(configParams);
+
+ Follower follower = new Follower(followerActorContext);
+
+ ForwardMessageToBehaviorActor.setBehavior(follower);
+
+ Map<String, String> peerAddresses = new HashMap<>();
+ peerAddresses.put("follower-reply",
+ followerActor.path().toString());
+
+ leaderActorContext.setPeerAddresses(peerAddresses);
+
+ leaderActorContext.getReplicatedLog().removeFrom(0);
+
+ //create 3 entries
+ leaderActorContext.setReplicatedLog(
+ new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+ leaderActorContext.setCommitIndex(1);
+
+ Leader leader = new Leader(leaderActorContext);
+ leader.markFollowerActive("follower-reply");
+
+ Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+ TimeUnit.MILLISECONDS);
+
+ leader.handleMessage(leaderActor, new SendHeartBeat());
+
+ AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
+ .getFirstMatching(followerActor, AppendEntries.class);
+
+ assertNotNull(appendEntries);
+
+ assertEquals(1, appendEntries.getLeaderCommit());
+ assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+ assertEquals(0, appendEntries.getPrevLogIndex());
+
+ AppendEntriesReply appendEntriesReply =
+ (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+
+ assertNotNull(appendEntriesReply);
+
+ leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+
+ List<Object> entries = ForwardMessageToBehaviorActor
+ .getAllMatching(followerActor, AppendEntries.class);
+
+ assertEquals("AppendEntries count should be 2 ", 2, entries.size());
+
+ AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
+
+ assertEquals(1, appendEntriesSecond.getLeaderCommit());
+ assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
+ assertEquals(1, appendEntriesSecond.getPrevLogIndex());
+
+ }};
+ }
+
class MockLeader extends Leader {
FollowerToSnapshot fts;
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown from NormalizedNodeInputStreamReader when the input stream does not contain
+ * valid serialized data.
+ *
+ * @author Thomas Pantelis
+ */
+public class InvalidNormalizedNodeStreamException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public InvalidNormalizedNodeStreamException(String message) {
+ super(message);
+ }
+}
private final StringBuilder reusableStringBuilder = new StringBuilder(50);
+ private boolean readSignatureMarker = true;
+
public NormalizedNodeInputStreamReader(InputStream stream) throws IOException {
Preconditions.checkNotNull(stream);
input = new DataInputStream(stream);
@Override
public NormalizedNode<?, ?> readNormalizedNode() throws IOException {
+ readSignatureMarkerAndVersionIfNeeded();
+ return readNormalizedNodeInternal();
+ }
+
+ private void readSignatureMarkerAndVersionIfNeeded() throws IOException {
+ if(readSignatureMarker) {
+ readSignatureMarker = false;
+
+ byte marker = input.readByte();
+ if(marker != NormalizedNodeOutputStreamWriter.SIGNATURE_MARKER) {
+ throw new InvalidNormalizedNodeStreamException(String.format(
+ "Invalid signature marker: %d", marker));
+ }
+
+ input.readShort(); // read the version - not currently used/needed.
+ }
+ }
+
+ private NormalizedNode<?, ?> readNormalizedNodeInternal() throws IOException {
// each node should start with a byte
byte nodeType = input.readByte();
return bytes;
case ValueTypes.YANG_IDENTIFIER_TYPE :
- return readYangInstanceIdentifier();
+ return readYangInstanceIdentifierInternal();
default :
return null;
}
public YangInstanceIdentifier readYangInstanceIdentifier() throws IOException {
+ readSignatureMarkerAndVersionIfNeeded();
+ return readYangInstanceIdentifierInternal();
+ }
+
+ private YangInstanceIdentifier readYangInstanceIdentifierInternal() throws IOException {
int size = input.readInt();
List<PathArgument> pathArguments = new ArrayList<>(size);
lastLeafSetQName = nodeType;
- LeafSetEntryNode<Object> child = (LeafSetEntryNode<Object>)readNormalizedNode();
+ LeafSetEntryNode<Object> child = (LeafSetEntryNode<Object>)readNormalizedNodeInternal();
while(child != null) {
builder.withChild(child);
- child = (LeafSetEntryNode<Object>)readNormalizedNode();
+ child = (LeafSetEntryNode<Object>)readNormalizedNodeInternal();
}
return builder;
}
NormalizedNodeContainerBuilder builder) throws IOException {
LOG.debug("Reading data container (leaf nodes) nodes");
- NormalizedNode<?, ?> child = readNormalizedNode();
+ NormalizedNode<?, ?> child = readNormalizedNodeInternal();
while(child != null) {
builder.addChild(child);
- child = readNormalizedNode();
+ child = readNormalizedNodeInternal();
}
return builder;
}
private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeOutputStreamWriter.class);
+ static final byte SIGNATURE_MARKER = (byte) 0xab;
+ static final short CURRENT_VERSION = (short) 1;
+
static final byte IS_CODE_VALUE = 1;
static final byte IS_STRING_VALUE = 2;
static final byte IS_NULL_VALUE = 3;
private NormalizedNodeWriter normalizedNodeWriter;
+ private boolean wroteSignatureMarker;
+
public NormalizedNodeOutputStreamWriter(OutputStream stream) throws IOException {
Preconditions.checkNotNull(stream);
output = new DataOutputStream(stream);
}
public void writeNormalizedNode(NormalizedNode<?, ?> node) throws IOException {
+ writeSignatureMarkerAndVersionIfNeeded();
normalizedNodeWriter().write(node);
}
+ private void writeSignatureMarkerAndVersionIfNeeded() throws IOException {
+ if(!wroteSignatureMarker) {
+ output.writeByte(SIGNATURE_MARKER);
+ output.writeShort(CURRENT_VERSION);
+ wroteSignatureMarker = true;
+ }
+ }
+
@Override
public void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException {
Preconditions.checkNotNull(name, "Node identifier should not be null");
private void startNode(final QName qName, byte nodeType) throws IOException {
Preconditions.checkNotNull(qName, "QName of node identifier should not be null.");
+
+ writeSignatureMarkerAndVersionIfNeeded();
+
// First write the type of node
output.writeByte(nodeType);
// Write Start Tag
}
public void writeYangInstanceIdentifier(YangInstanceIdentifier identifier) throws IOException {
+ writeSignatureMarkerAndVersionIfNeeded();
+ writeYangInstanceIdentifierInternal(identifier);
+ }
+
+ private void writeYangInstanceIdentifierInternal(YangInstanceIdentifier identifier) throws IOException {
Iterable<YangInstanceIdentifier.PathArgument> pathArguments = identifier.getPathArguments();
int size = Iterables.size(pathArguments);
output.writeInt(size);
output.write(bytes);
break;
case ValueTypes.YANG_IDENTIFIER_TYPE:
- writeYangInstanceIdentifier((YangInstanceIdentifier) value);
+ writeYangInstanceIdentifierInternal((YangInstanceIdentifier) value);
break;
case ValueTypes.NULL_TYPE :
break;
import org.apache.commons.lang.SerializationUtils;
import org.junit.Assert;
import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
import org.opendaylight.controller.cluster.datastore.util.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder;
public class NormalizedNodeStreamReaderWriterTest {
@Test
- public void testNormalizedNodeStreamReaderWriter() throws IOException {
+ public void testNormalizedNodeStreaming() throws IOException {
- testNormalizedNodeStreamReaderWriter(createTestContainer());
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
+
+ NormalizedNode<?, ?> testContainer = createTestContainer();
+ writer.writeNormalizedNode(testContainer);
QName toaster = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","toaster");
QName darknessFactor = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","darknessFactor");
withNodeIdentifier(new NodeIdentifier(toaster)).
withChild(ImmutableNodes.leafNode(darknessFactor, "1000")).build();
- testNormalizedNodeStreamReaderWriter(Builders.containerBuilder().
+ ContainerNode toasterContainer = Builders.containerBuilder().
withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME)).
- withChild(toasterNode).build());
+ withChild(toasterNode).build();
+ writer.writeNormalizedNode(toasterContainer);
+
+ NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+ new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+
+ NormalizedNode<?,?> node = reader.readNormalizedNode();
+ Assert.assertEquals(testContainer, node);
+
+ node = reader.readNormalizedNode();
+ Assert.assertEquals(toasterContainer, node);
+
+ writer.close();
}
private NormalizedNode<?, ?> createTestContainer() {
build();
}
- private void testNormalizedNodeStreamReaderWriter(NormalizedNode<?, ?> input) throws IOException {
+ @Test
+ public void testYangInstanceIdentifierStreaming() throws IOException {
+ YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).
+ node(TestModel.OUTER_LIST_QNAME).nodeWithKey(
+ TestModel.INNER_LIST_QNAME, TestModel.ID_QNAME, 10).build();
- byte[] byteData = null;
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ NormalizedNodeOutputStreamWriter writer =
+ new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
+ writer.writeYangInstanceIdentifier(path);
+
+ NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+ new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+
+ YangInstanceIdentifier newPath = reader.readYangInstanceIdentifier();
+ Assert.assertEquals(path, newPath);
+
+ writer.close();
+ }
+
+ @Test
+ public void testNormalizedNodeAndYangInstanceIdentifierStreaming() throws IOException {
- try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- NormalizedNodeStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream)) {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
- NormalizedNodeWriter normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(writer);
- normalizedNodeWriter.write(input);
- byteData = byteArrayOutputStream.toByteArray();
+ NormalizedNode<?, ?> testContainer = TestModel.createBaseTestContainerBuilder().build();
+ writer.writeNormalizedNode(testContainer);
- }
+ YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).
+ node(TestModel.OUTER_LIST_QNAME).nodeWithKey(
+ TestModel.INNER_LIST_QNAME, TestModel.ID_QNAME, 10).build();
+
+ writer.writeYangInstanceIdentifier(path);
NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
- new ByteArrayInputStream(byteData));
+ new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
NormalizedNode<?,?> node = reader.readNormalizedNode();
- Assert.assertEquals(input, node);
+ Assert.assertEquals(testContainer, node);
+
+ YangInstanceIdentifier newPath = reader.readYangInstanceIdentifier();
+ Assert.assertEquals(path, newPath);
+
+ writer.close();
+ }
+
+ @Test(expected=InvalidNormalizedNodeStreamException.class, timeout=10000)
+ public void testInvalidNormalizedNodeStream() throws IOException {
+ byte[] protobufBytes = new NormalizedNodeToNodeCodec(null).encode(
+ TestModel.createBaseTestContainerBuilder().build()).getNormalizedNode().toByteArray();
+
+ NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+ new ByteArrayInputStream(protobufBytes));
+
+ reader.readNormalizedNode();
+ }
+
+ @Test(expected=InvalidNormalizedNodeStreamException.class, timeout=10000)
+ public void testInvalidYangInstanceIdentifierStream() throws IOException {
+ YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).build();
+
+ byte[] protobufBytes = ShardTransactionMessages.DeleteData.newBuilder().setInstanceIdentifierPathArguments(
+ InstanceIdentifierUtils.toSerializable(path)).build().toByteArray();
+
+ NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+ new ByteArrayInputStream(protobufBytes));
+
+ reader.readYangInstanceIdentifier();
}
@Test
import java.io.DataOutputStream;
import java.io.IOException;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.InvalidNormalizedNodeStreamException;
import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
}
public static NormalizedNode<?, ?> deserializeNormalizedNode(DataInput in) {
- try {
- boolean present = in.readBoolean();
- if(present) {
- NormalizedNodeInputStreamReader streamReader = streamReader(in);
- return streamReader.readNormalizedNode();
- }
- } catch (IOException e) {
- throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
- }
+ try {
+ return tryDeserializeNormalizedNode(in);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
+ }
+ }
+
+ private static NormalizedNode<?, ?> tryDeserializeNormalizedNode(DataInput in) throws IOException {
+ boolean present = in.readBoolean();
+ if(present) {
+ NormalizedNodeInputStreamReader streamReader = streamReader(in);
+ return streamReader.readNormalizedNode();
+ }
return null;
}
public static NormalizedNode<?, ?> deserializeNormalizedNode(byte [] bytes) {
NormalizedNode<?, ?> node = null;
try {
- node = deserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes)));
- } catch(Exception e) {
- }
-
- if(node == null) {
- // Must be from legacy protobuf serialization - try that.
+ node = tryDeserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes)));
+ } catch(InvalidNormalizedNodeStreamException e) {
+ // Probably from legacy protobuf serialization - try that.
try {
NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(bytes);
node = new NormalizedNodeToNodeCodec(null).decode(serializedNode);
- } catch (InvalidProtocolBufferException e) {
+ } catch (InvalidProtocolBufferException e2) {
throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
}
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
}
return node;
<version>1.2.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-distributed-datastore</artifactId>
+ </dependency>
<!-- Test Dependencies -->
<dependency>