Merge "Bug-2692:Avoid fake snapshot during initiate snapshot"
authorTom Pantelis <tpanteli@brocade.com>
Tue, 10 Feb 2015 18:22:52 +0000 (18:22 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 10 Feb 2015 18:22:52 +0000 (18:22 +0000)
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReaderWriterTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java
opendaylight/md-sal/sal-dummy-distributed-datastore/pom.xml

index 6d0c14e..73c81af 100644 (file)
@@ -73,4 +73,12 @@ public interface FollowerLogInformation {
      * This will stop the timeout clock
      */
     void markFollowerInActive();
+
+
+    /**
+     * This will return the active time of follower, since it was last reset
+     * @return time in milliseconds
+     */
+    long timeSinceLastActivity();
+
 }
index 7a690d3..0fed630 100644 (file)
@@ -95,4 +95,9 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
             stopwatch.stop();
         }
     }
+
+    @Override
+    public long timeSinceLastActivity() {
+        return stopwatch.elapsed(TimeUnit.MILLISECONDS);
+    }
 }
index 3336cbd..8f33d94 100644 (file)
@@ -232,6 +232,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             purgeInMemoryLog();
         }
 
+        //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
+        sendUpdatesToFollower(followerId, followerLogInformation, false);
         return this;
     }
 
@@ -344,6 +346,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
         followerLogInformation.markFollowerActive();
 
         if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
+            boolean wasLastChunk = false;
             if (reply.isSuccess()) {
                 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
                     //this was the last chunk reply
@@ -371,6 +374,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                         // we can remove snapshot from the memory
                         setSnapshot(Optional.<ByteString>absent());
                     }
+                    wasLastChunk = true;
 
                 } else {
                     followerToSnapshot.markSendStatus(true);
@@ -381,6 +385,14 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 followerToSnapshot.markSendStatus(false);
             }
+
+            if (!wasLastChunk && followerToSnapshot.canSendNextChunk()) {
+                ActorSelection followerActor = context.getPeerActorSelection(followerId);
+                if(followerActor != null) {
+                    sendSnapshotChunk(followerActor, followerId);
+                }
+            }
+
         } else {
             LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
                     context.getId(), reply.getChunkIndex(), followerId,
@@ -419,67 +431,77 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
-
+        long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis();
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final String followerId = e.getKey();
-            ActorSelection followerActor = context.getPeerActorSelection(followerId);
+            final FollowerLogInformation followerLogInformation = e.getValue();
+            // This checks helps not to send a repeat message to the follower
+            if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) {
+                sendUpdatesToFollower(followerId, followerLogInformation, true);
+            }
+        }
+    }
 
-            if (followerActor != null) {
-                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-                long followerNextIndex = followerLogInformation.getNextIndex();
-                boolean isFollowerActive = followerLogInformation.isFollowerActive();
+    /**
+     *
+     * This method checks if any update needs to be sent to the given follower. This includes append log entries,
+     * sending next snapshot chunk, and initiating a snapshot.
+     * @return true if any update is sent, false otherwise
+     */
 
-                FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
-                if (followerToSnapshot != null) {
-                    // if install snapshot is in process , then sent next chunk if possible
-                    if (isFollowerActive && followerToSnapshot.canSendNextChunk()) {
-                        sendSnapshotChunk(followerActor, followerId);
-                    } else {
-                        // we send a heartbeat even if we have not received a reply for the last chunk
-                        sendAppendEntriesToFollower(followerActor, followerNextIndex,
-                            Collections.<ReplicatedLogEntry>emptyList(), followerId);
-                    }
+    private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
+                                          boolean sendHeartbeat) {
+
+        ActorSelection followerActor = context.getPeerActorSelection(followerId);
+        if (followerActor != null) {
+            long followerNextIndex = followerLogInformation.getNextIndex();
+            boolean isFollowerActive = followerLogInformation.isFollowerActive();
+
+            if (mapFollowerToSnapshot.get(followerId) != null) {
+                // if install snapshot is in process , then sent next chunk if possible
+                if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+                    sendSnapshotChunk(followerActor, followerId);
+                } else if(sendHeartbeat) {
+                    // we send a heartbeat even if we have not received a reply for the last chunk
+                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+                        Collections.<ReplicatedLogEntry>emptyList(), followerId);
+                }
+            } else {
+                long leaderLastIndex = context.getReplicatedLog().lastIndex();
+                long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+                if (isFollowerActive &&
+                    context.getReplicatedLog().isPresent(followerNextIndex)) {
+                    // FIXME : Sending one entry at a time
+                    final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
 
-                } else {
-                    long leaderLastIndex = context.getReplicatedLog().lastIndex();
-                    long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
-                    final List<ReplicatedLogEntry> entries;
-
-                    LOG.debug("{}: Checking sendAppendEntries for {}, leaderLastIndex: {}, leaderSnapShotIndex: {}",
-                            context.getId(), leaderLastIndex, leaderSnapShotIndex);
-
-                    if (isFollowerActive && context.getReplicatedLog().isPresent(followerNextIndex)) {
-                        LOG.debug("{}: sendAppendEntries: {} is present for {}", context.getId(),
-                                followerNextIndex, followerId);
-
-                        // FIXME : Sending one entry at a time
-                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
-                    } else if (isFollowerActive && followerNextIndex >= 0 &&
-                        leaderLastIndex >= followerNextIndex ) {
-                        // if the followers next index is not present in the leaders log, and
-                        // if the follower is just not starting and if leader's index is more than followers index
-                        // then snapshot should be sent
-
-                        if(LOG.isDebugEnabled()) {
-                            LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
-                                    "follower-nextIndex: %s, leader-snapshot-index: %s,  " +
-                                    "leader-last-index: %s", context.getId(), followerId,
-                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
-                        }
-                        actor().tell(new InitiateInstallSnapshot(), actor());
-
-                        // we would want to sent AE as the capture snapshot might take time
-                        entries =  Collections.<ReplicatedLogEntry>emptyList();
-
-                    } else {
-                        //we send an AppendEntries, even if the follower is inactive
-                        // in-order to update the followers timestamp, in case it becomes active again
-                        entries =  Collections.<ReplicatedLogEntry>emptyList();
+                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
+
+                } else if (isFollowerActive && followerNextIndex >= 0 &&
+                    leaderLastIndex >= followerNextIndex) {
+                    // if the followers next index is not present in the leaders log, and
+                    // if the follower is just not starting and if leader's index is more than followers index
+                    // then snapshot should be sent
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("InitiateInstallSnapshot to follower:{}," +
+                                "follower-nextIndex:{}, leader-snapshot-index:{},  " +
+                                "leader-last-index:{}", followerId,
+                            followerNextIndex, leaderSnapShotIndex, leaderLastIndex
+                        );
                     }
+                    actor().tell(new InitiateInstallSnapshot(), actor());
 
-                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
+                    // Send heartbeat to follower whenever install snapshot is initiated.
+                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+                            Collections.<ReplicatedLogEntry>emptyList(), followerId);
+
+                } else if(sendHeartbeat) {
+                    //we send an AppendEntries, even if the follower is inactive
+                    // in-order to update the followers timestamp, in case it becomes active again
+                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+                        Collections.<ReplicatedLogEntry>emptyList(), followerId);
                 }
+
             }
         }
     }
index 8b47e8e..666cea6 100644 (file)
@@ -87,6 +87,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
+                    leader.markFollowerActive(followerActor.path().toString());
+                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                        TimeUnit.MILLISECONDS);
                     leader.handleMessage(senderActor, new SendHeartBeat());
 
                     final String out =
@@ -134,6 +137,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
+                    leader.markFollowerActive(followerActor.path().toString());
+                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                        TimeUnit.MILLISECONDS);
                     RaftActorBehavior raftBehavior = leader
                         .handleMessage(senderActor, new Replicate(null, null,
                             new MockRaftActorContext.MockReplicatedLogEntry(1,
@@ -271,6 +277,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             leader.getFollowerToSnapshot().getNextChunk();
             leader.getFollowerToSnapshot().incrementChunkIndex();
 
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
             AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
@@ -345,6 +354,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             //update follower timestamp
             leader.markFollowerActive(followerActor.path().toString());
 
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
             RaftActorBehavior raftBehavior = leader.handleMessage(
                 senderActor, new Replicate(null, "state-id", entry));
@@ -583,9 +595,102 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             assertEquals(snapshotIndex + 1, fli.getNextIndex());
         }};
     }
+    @Test
+    public void testSendSnapshotfromInstallSnapshotReply() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            TestActorRef<MessageCollectorActor> followerActor =
+                TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-reply");
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-reply",
+                followerActor.path().toString());
+
+            final int followersLastIndex = 2;
+            final int snapshotIndex = 3;
+            final int snapshotTerm = 1;
+            final int currentTerm = 2;
+
+            MockRaftActorContext actorContext =
+                (MockRaftActorContext) createActorContext();
+            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(){
+                @Override
+                public int getSnapshotChunkSize() {
+                    return 50;
+                }
+            };
+            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+            actorContext.setConfigParams(configParams);
+            actorContext.setPeerAddresses(peerAddresses);
+            actorContext.setCommitIndex(followersLastIndex);
+
+            MockLeader leader = new MockLeader(actorContext);
+
+            Map<String, String> leadersSnapshot = new HashMap<>();
+            leadersSnapshot.put("1", "A");
+            leadersSnapshot.put("2", "B");
+            leadersSnapshot.put("3", "C");
+
+            // set the snapshot variables in replicatedlog
+            actorContext.getReplicatedLog().setSnapshotIndex(snapshotIndex);
+            actorContext.getReplicatedLog().setSnapshotTerm(snapshotTerm);
+            actorContext.getTermInformation().update(currentTerm, leaderActor.path().toString());
+
+            ByteString bs = toByteString(leadersSnapshot);
+            leader.setSnapshot(Optional.of(bs));
+
+            leader.handleMessage(leaderActor, new SendInstallSnapshot(bs));
+
+            List<Object> objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            assertEquals(1, objectList.size());
+
+            Object o = objectList.get(0);
+            assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
+
+            InstallSnapshotMessages.InstallSnapshot installSnapshot = (InstallSnapshotMessages.InstallSnapshot) o;
+
+            assertEquals(1, installSnapshot.getChunkIndex());
+            assertEquals(3, installSnapshot.getTotalChunks());
+
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                "follower-reply", installSnapshot.getChunkIndex(), true));
+
+            objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            assertEquals(2, objectList.size());
+
+            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(1);
+
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                "follower-reply", installSnapshot.getChunkIndex(), true));
+
+            objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            assertEquals(3, objectList.size());
+
+            installSnapshot = (InstallSnapshotMessages.InstallSnapshot) objectList.get(2);
+
+            // Send snapshot reply one more time and make sure that a new snapshot message should not be sent to follower
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                "follower-reply", installSnapshot.getChunkIndex(), true));
+
+            objectList = MessageCollectorActor.getAllMatching(followerActor,
+                InstallSnapshotMessages.InstallSnapshot.class);
+
+            // Count should still stay at 3
+            assertEquals(3, objectList.size());
+        }};
+    }
+
 
     @Test
-    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
         new JavaTestKit(getSystem()) {{
 
             TestActorRef<MessageCollectorActor> followerActor =
@@ -639,11 +744,15 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             assertEquals(3, installSnapshot.getTotalChunks());
 
 
-            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                followerActor.path().toString(), -1, false));
+
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+            o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
 
             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
 
@@ -662,7 +771,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             {
 
                 TestActorRef<MessageCollectorActor> followerActor =
-                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
 
                 Map<String, String> peerAddresses = new HashMap<>();
                 peerAddresses.put(followerActor.path().toString(),
@@ -716,10 +825,10 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
                 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
 
-                leader.handleMessage(leaderActor, new SendHeartBeat());
-
                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 
+                leader.handleMessage(leaderActor, new SendHeartBeat());
+
                 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
 
                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
@@ -881,6 +990,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             Leader leader = new Leader(leaderActorContext);
             leader.markFollowerActive(followerActor.path().toString());
 
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
@@ -949,6 +1061,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             Leader leader = new Leader(leaderActorContext);
             leader.markFollowerActive(followerActor.path().toString());
 
+            Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
+
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
@@ -1177,6 +1291,85 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+
+    @Test
+    public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+            leaderActorContext.setConfigParams(configParams);
+
+            ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+
+            MockRaftActorContext followerActorContext =
+                new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+
+            followerActorContext.setConfigParams(configParams);
+
+            Follower follower = new Follower(followerActorContext);
+
+            ForwardMessageToBehaviorActor.setBehavior(follower);
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-reply",
+                followerActor.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            leaderActorContext.getReplicatedLog().removeFrom(0);
+
+            //create 3 entries
+            leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            leaderActorContext.setCommitIndex(1);
+
+            Leader leader = new Leader(leaderActorContext);
+            leader.markFollowerActive("follower-reply");
+
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
+                .getFirstMatching(followerActor, AppendEntries.class);
+
+            assertNotNull(appendEntries);
+
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+            assertEquals(0, appendEntries.getPrevLogIndex());
+
+            AppendEntriesReply appendEntriesReply =
+                (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+
+            assertNotNull(appendEntriesReply);
+
+            leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+
+            List<Object> entries = ForwardMessageToBehaviorActor
+                .getAllMatching(followerActor, AppendEntries.class);
+
+            assertEquals("AppendEntries count should be 2 ", 2, entries.size());
+
+            AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
+
+            assertEquals(1, appendEntriesSecond.getLeaderCommit());
+            assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
+            assertEquals(1, appendEntriesSecond.getPrevLogIndex());
+
+        }};
+    }
+
     class MockLeader extends Leader {
 
         FollowerToSnapshot fts;
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java
new file mode 100644 (file)
index 0000000..da60496
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown from NormalizedNodeInputStreamReader when the input stream does not contain
+ * valid serialized data.
+ *
+ * @author Thomas Pantelis
+ */
+public class InvalidNormalizedNodeStreamException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    public InvalidNormalizedNodeStreamException(String message) {
+        super(message);
+    }
+}
index cde3381..bb2f5d4 100644 (file)
@@ -69,6 +69,8 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
 
     private final StringBuilder reusableStringBuilder = new StringBuilder(50);
 
+    private boolean readSignatureMarker = true;
+
     public NormalizedNodeInputStreamReader(InputStream stream) throws IOException {
         Preconditions.checkNotNull(stream);
         input = new DataInputStream(stream);
@@ -80,6 +82,25 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
 
     @Override
     public NormalizedNode<?, ?> readNormalizedNode() throws IOException {
+        readSignatureMarkerAndVersionIfNeeded();
+        return readNormalizedNodeInternal();
+    }
+
+    private void readSignatureMarkerAndVersionIfNeeded() throws IOException {
+        if(readSignatureMarker) {
+            readSignatureMarker = false;
+
+            byte marker = input.readByte();
+            if(marker != NormalizedNodeOutputStreamWriter.SIGNATURE_MARKER) {
+                throw new InvalidNormalizedNodeStreamException(String.format(
+                        "Invalid signature marker: %d", marker));
+            }
+
+            input.readShort(); // read the version - not currently used/needed.
+        }
+    }
+
+    private NormalizedNode<?, ?> readNormalizedNodeInternal() throws IOException {
         // each node should start with a byte
         byte nodeType = input.readByte();
 
@@ -284,7 +305,7 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
                 return bytes;
 
             case ValueTypes.YANG_IDENTIFIER_TYPE :
-                return readYangInstanceIdentifier();
+                return readYangInstanceIdentifierInternal();
 
             default :
                 return null;
@@ -292,6 +313,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
     }
 
     public YangInstanceIdentifier readYangInstanceIdentifier() throws IOException {
+        readSignatureMarkerAndVersionIfNeeded();
+        return readYangInstanceIdentifierInternal();
+    }
+
+    private YangInstanceIdentifier readYangInstanceIdentifierInternal() throws IOException {
         int size = input.readInt();
 
         List<PathArgument> pathArguments = new ArrayList<>(size);
@@ -342,11 +368,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
 
         lastLeafSetQName = nodeType;
 
-        LeafSetEntryNode<Object> child = (LeafSetEntryNode<Object>)readNormalizedNode();
+        LeafSetEntryNode<Object> child = (LeafSetEntryNode<Object>)readNormalizedNodeInternal();
 
         while(child != null) {
             builder.withChild(child);
-            child = (LeafSetEntryNode<Object>)readNormalizedNode();
+            child = (LeafSetEntryNode<Object>)readNormalizedNodeInternal();
         }
         return builder;
     }
@@ -356,11 +382,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
             NormalizedNodeContainerBuilder builder) throws IOException {
         LOG.debug("Reading data container (leaf nodes) nodes");
 
-        NormalizedNode<?, ?> child = readNormalizedNode();
+        NormalizedNode<?, ?> child = readNormalizedNodeInternal();
 
         while(child != null) {
             builder.addChild(child);
-            child = readNormalizedNode();
+            child = readNormalizedNodeInternal();
         }
         return builder;
     }
index 088f4df..d4aab03 100644 (file)
@@ -46,6 +46,9 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
 
     private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeOutputStreamWriter.class);
 
+    static final byte SIGNATURE_MARKER = (byte) 0xab;
+    static final short CURRENT_VERSION = (short) 1;
+
     static final byte IS_CODE_VALUE = 1;
     static final byte IS_STRING_VALUE = 2;
     static final byte IS_NULL_VALUE = 3;
@@ -56,6 +59,8 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
 
     private NormalizedNodeWriter normalizedNodeWriter;
 
+    private boolean wroteSignatureMarker;
+
     public NormalizedNodeOutputStreamWriter(OutputStream stream) throws IOException {
         Preconditions.checkNotNull(stream);
         output = new DataOutputStream(stream);
@@ -74,9 +79,18 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
     }
 
     public void writeNormalizedNode(NormalizedNode<?, ?> node) throws IOException {
+        writeSignatureMarkerAndVersionIfNeeded();
         normalizedNodeWriter().write(node);
     }
 
+    private void writeSignatureMarkerAndVersionIfNeeded() throws IOException {
+        if(!wroteSignatureMarker) {
+            output.writeByte(SIGNATURE_MARKER);
+            output.writeShort(CURRENT_VERSION);
+            wroteSignatureMarker = true;
+        }
+    }
+
     @Override
     public void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException {
         Preconditions.checkNotNull(name, "Node identifier should not be null");
@@ -201,6 +215,9 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
     private void startNode(final QName qName, byte nodeType) throws IOException {
 
         Preconditions.checkNotNull(qName, "QName of node identifier should not be null.");
+
+        writeSignatureMarkerAndVersionIfNeeded();
+
         // First write the type of node
         output.writeByte(nodeType);
         // Write Start Tag
@@ -247,6 +264,11 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
     }
 
     public void writeYangInstanceIdentifier(YangInstanceIdentifier identifier) throws IOException {
+        writeSignatureMarkerAndVersionIfNeeded();
+        writeYangInstanceIdentifierInternal(identifier);
+    }
+
+    private void writeYangInstanceIdentifierInternal(YangInstanceIdentifier identifier) throws IOException {
         Iterable<YangInstanceIdentifier.PathArgument> pathArguments = identifier.getPathArguments();
         int size = Iterables.size(pathArguments);
         output.writeInt(size);
@@ -363,7 +385,7 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
                 output.write(bytes);
                 break;
             case ValueTypes.YANG_IDENTIFIER_TYPE:
-                writeYangInstanceIdentifier((YangInstanceIdentifier) value);
+                writeYangInstanceIdentifierInternal((YangInstanceIdentifier) value);
                 break;
             case ValueTypes.NULL_TYPE :
                 break;
index 6528f2e..67a342b 100644 (file)
@@ -15,15 +15,16 @@ import java.io.IOException;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.cluster.datastore.util.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder;
@@ -33,9 +34,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class NormalizedNodeStreamReaderWriterTest {
 
     @Test
-    public void testNormalizedNodeStreamReaderWriter() throws IOException {
+    public void testNormalizedNodeStreaming() throws IOException {
 
-        testNormalizedNodeStreamReaderWriter(createTestContainer());
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
+
+        NormalizedNode<?, ?> testContainer = createTestContainer();
+        writer.writeNormalizedNode(testContainer);
 
         QName toaster = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","toaster");
         QName darknessFactor = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","darknessFactor");
@@ -43,9 +48,21 @@ public class NormalizedNodeStreamReaderWriterTest {
                 withNodeIdentifier(new NodeIdentifier(toaster)).
                 withChild(ImmutableNodes.leafNode(darknessFactor, "1000")).build();
 
-        testNormalizedNodeStreamReaderWriter(Builders.containerBuilder().
+        ContainerNode toasterContainer = Builders.containerBuilder().
                 withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME)).
-                withChild(toasterNode).build());
+                withChild(toasterNode).build();
+        writer.writeNormalizedNode(toasterContainer);
+
+        NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+                new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+
+        NormalizedNode<?,?> node = reader.readNormalizedNode();
+        Assert.assertEquals(testContainer, node);
+
+        node = reader.readNormalizedNode();
+        Assert.assertEquals(toasterContainer, node);
+
+        writer.close();
     }
 
     private NormalizedNode<?, ?> createTestContainer() {
@@ -76,24 +93,75 @@ public class NormalizedNodeStreamReaderWriterTest {
                 build();
     }
 
-    private void testNormalizedNodeStreamReaderWriter(NormalizedNode<?, ?> input) throws IOException {
+    @Test
+    public void testYangInstanceIdentifierStreaming() throws IOException  {
+        YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).
+                node(TestModel.OUTER_LIST_QNAME).nodeWithKey(
+                        TestModel.INNER_LIST_QNAME, TestModel.ID_QNAME, 10).build();
 
-        byte[] byteData = null;
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        NormalizedNodeOutputStreamWriter writer =
+                new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
+        writer.writeYangInstanceIdentifier(path);
+
+        NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+                new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+
+        YangInstanceIdentifier newPath = reader.readYangInstanceIdentifier();
+        Assert.assertEquals(path, newPath);
+
+        writer.close();
+    }
+
+    @Test
+    public void testNormalizedNodeAndYangInstanceIdentifierStreaming() throws IOException {
 
-        try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-            NormalizedNodeStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream)) {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
 
-            NormalizedNodeWriter normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(writer);
-            normalizedNodeWriter.write(input);
-            byteData = byteArrayOutputStream.toByteArray();
+        NormalizedNode<?, ?> testContainer = TestModel.createBaseTestContainerBuilder().build();
+        writer.writeNormalizedNode(testContainer);
 
-        }
+        YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).
+                node(TestModel.OUTER_LIST_QNAME).nodeWithKey(
+                        TestModel.INNER_LIST_QNAME, TestModel.ID_QNAME, 10).build();
+
+        writer.writeYangInstanceIdentifier(path);
 
         NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
-                new ByteArrayInputStream(byteData));
+                new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
 
         NormalizedNode<?,?> node = reader.readNormalizedNode();
-        Assert.assertEquals(input, node);
+        Assert.assertEquals(testContainer, node);
+
+        YangInstanceIdentifier newPath = reader.readYangInstanceIdentifier();
+        Assert.assertEquals(path, newPath);
+
+        writer.close();
+    }
+
+    @Test(expected=InvalidNormalizedNodeStreamException.class, timeout=10000)
+    public void testInvalidNormalizedNodeStream() throws IOException {
+        byte[] protobufBytes = new NormalizedNodeToNodeCodec(null).encode(
+                TestModel.createBaseTestContainerBuilder().build()).getNormalizedNode().toByteArray();
+
+        NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+                new ByteArrayInputStream(protobufBytes));
+
+        reader.readNormalizedNode();
+    }
+
+    @Test(expected=InvalidNormalizedNodeStreamException.class, timeout=10000)
+    public void testInvalidYangInstanceIdentifierStream() throws IOException {
+        YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).build();
+
+        byte[] protobufBytes = ShardTransactionMessages.DeleteData.newBuilder().setInstanceIdentifierPathArguments(
+                InstanceIdentifierUtils.toSerializable(path)).build().toByteArray();
+
+        NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+                new ByteArrayInputStream(protobufBytes));
+
+        reader.readYangInstanceIdentifier();
     }
 
     @Test
index 5854932..bf9f8d8 100644 (file)
@@ -17,6 +17,7 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.InvalidNormalizedNodeStreamException;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
@@ -93,15 +94,19 @@ public final class SerializationUtils {
     }
 
     public static NormalizedNode<?, ?> deserializeNormalizedNode(DataInput in) {
-            try {
-                boolean present = in.readBoolean();
-                if(present) {
-                    NormalizedNodeInputStreamReader streamReader = streamReader(in);
-                    return streamReader.readNormalizedNode();
-                }
-            } catch (IOException e) {
-                throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
-            }
+        try {
+            return tryDeserializeNormalizedNode(in);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
+        }
+    }
+
+    private static NormalizedNode<?, ?> tryDeserializeNormalizedNode(DataInput in) throws IOException {
+        boolean present = in.readBoolean();
+        if(present) {
+            NormalizedNodeInputStreamReader streamReader = streamReader(in);
+            return streamReader.readNormalizedNode();
+        }
 
         return null;
     }
@@ -109,18 +114,17 @@ public final class SerializationUtils {
     public static NormalizedNode<?, ?> deserializeNormalizedNode(byte [] bytes) {
         NormalizedNode<?, ?> node = null;
         try {
-            node = deserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes)));
-        } catch(Exception e) {
-        }
-
-        if(node == null) {
-            // Must be from legacy protobuf serialization - try that.
+            node = tryDeserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes)));
+        } catch(InvalidNormalizedNodeStreamException e) {
+            // Probably from legacy protobuf serialization - try that.
             try {
                 NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(bytes);
                 node =  new NormalizedNodeToNodeCodec(null).decode(serializedNode);
-            } catch (InvalidProtocolBufferException e) {
+            } catch (InvalidProtocolBufferException e2) {
                 throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
             }
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
         }
 
         return node;
index d8d1a76..c7ee3a5 100644 (file)
           <version>1.2.0-SNAPSHOT</version>
       </dependency>
 
+      <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>sal-distributed-datastore</artifactId>
+      </dependency>
 
     <!-- Test Dependencies -->
     <dependency>

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.