Merge "Add Distributed DataStore as a dependency of the Simulator"
authorTom Pantelis <tpanteli@brocade.com>
Tue, 10 Feb 2015 04:44:04 +0000 (04:44 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 10 Feb 2015 04:44:04 +0000 (04:44 +0000)
27 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CommitEntry.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/PersistEntry.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SaveSnapshot.java [deleted file]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVote.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java [new file with mode: 0644]
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java
opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReaderWriterTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java

index 6d0c14e733a8c81bb2e29a663915eb5776422a6d..73c81afd187d2791703c1428beacbc8348abda69 100644 (file)
@@ -73,4 +73,12 @@ public interface FollowerLogInformation {
      * This will stop the timeout clock
      */
     void markFollowerInActive();
+
+
+    /**
+     * This will return the active time of follower, since it was last reset
+     * @return time in milliseconds
+     */
+    long timeSinceLastActivity();
+
 }
index 7a690d3d18be84433f9e37c874a88b277b83f7cb..0fed63098d6da1edfb6229245ffd63d1e4e4b7df 100644 (file)
@@ -95,4 +95,9 @@ public class FollowerLogInformationImpl implements FollowerLogInformation {
             stopwatch.stop();
         }
     }
+
+    @Override
+    public long timeSinceLastActivity() {
+        return stopwatch.elapsed(TimeUnit.MILLISECONDS);
+    }
 }
index 77bf10370184894d6a510990e08fb1a8bc785a84..feccea7edba37b5faff9782cf986146f0a784af9 100644 (file)
@@ -12,7 +12,8 @@ import java.util.List;
 
 
 public class Snapshot implements Serializable {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = -8298574936724056236L;
+
     private final byte[] state;
     private final List<ReplicatedLogEntry> unAppliedEntries;
     private final long lastIndex;
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CommitEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CommitEntry.java
deleted file mode 100644 (file)
index 6335e32..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.base.messages;
-
-import java.io.Serializable;
-
-/**
- * Message sent to commit an entry to the log
- */
-public class CommitEntry implements Serializable {
-    private static final long serialVersionUID = 1L;
-}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/PersistEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/PersistEntry.java
deleted file mode 100644 (file)
index 68ecc12..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.base.messages;
-
-import java.io.Serializable;
-
-/**
- * Message sent to Persist an entry into the transaction journal
- */
-public class PersistEntry implements Serializable {
-    private static final long serialVersionUID = 1L;
-}
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SaveSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SaveSnapshot.java
deleted file mode 100644 (file)
index 7b7f085..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.raft.base.messages;
-
-import java.io.Serializable;
-
-/**
- * This message is sent by a RaftActor to itself so that a subclass can process
- * it and use it to save it's state
- */
-public class SaveSnapshot implements Serializable {
-    private static final long serialVersionUID = 1L;
-}
index e28e4b066d372ee54594ecaf9fe0e5259ad4cdfd..68cf5027dff1244d145e93dac28f2680ed9ee20a 100644 (file)
@@ -232,6 +232,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             purgeInMemoryLog();
         }
 
+        //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event
+        sendUpdatesToFollower(followerId, followerLogInformation, false);
         return this;
     }
 
@@ -294,6 +296,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
             // set currentTerm = T, convert to follower (ยง5.1)
             // This applies to all RPC messages and responses
             if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) {
+                LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(),
+                        rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
                 context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
 
                 return switchBehavior(new Follower(context));
@@ -330,12 +335,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
         String followerId = reply.getFollowerId();
         FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
+        if (followerToSnapshot == null) {
+            LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader",
+                    context.getId(), followerId);
+            return;
+        }
+
         FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
         followerLogInformation.markFollowerActive();
 
-        if (followerToSnapshot != null &&
-            followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
-
+        if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) {
             if (reply.isSuccess()) {
                 if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) {
                     //this was the last chunk reply
@@ -373,12 +383,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
                 followerToSnapshot.markSendStatus(false);
             }
-
         } else {
-            LOG.error("{}: FollowerId in InstallSnapshotReply not known to Leader" +
-                    " or Chunk Index in InstallSnapshotReply not matching {} != {}",
-                    context.getId(), followerToSnapshot.getChunkIndex(), reply.getChunkIndex()
-            );
+            LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}",
+                    context.getId(), reply.getChunkIndex(), followerId,
+                    followerToSnapshot.getChunkIndex());
 
             if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){
                 // Since the Follower did not find this index to be valid we should reset the follower snapshot
@@ -413,75 +421,94 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
 
     private void sendAppendEntries() {
         // Send an AppendEntries to all followers
+        long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis();
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final String followerId = e.getKey();
-            ActorSelection followerActor = context.getPeerActorSelection(followerId);
-
-            if (followerActor != null) {
-                FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
-                long followerNextIndex = followerLogInformation.getNextIndex();
-                boolean isFollowerActive = followerLogInformation.isFollowerActive();
-
-                if (mapFollowerToSnapshot.get(followerId) != null) {
-                    // if install snapshot is in process , then sent next chunk if possible
-                    if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
-                        sendSnapshotChunk(followerActor, followerId);
-                    } else {
-                        // we send a heartbeat even if we have not received a reply for the last chunk
-                        sendAppendEntriesToFollower(followerActor, followerNextIndex,
-                            Collections.<ReplicatedLogEntry>emptyList());
-                    }
+            final FollowerLogInformation followerLogInformation = e.getValue();
+            // This checks helps not to send a repeat message to the follower
+            if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) {
+                sendUpdatesToFollower(followerId, followerLogInformation, true);
+            }
+        }
+    }
 
-                } else {
-                    long leaderLastIndex = context.getReplicatedLog().lastIndex();
-                    long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
-                    final List<ReplicatedLogEntry> entries;
-
-                    if (isFollowerActive &&
-                        context.getReplicatedLog().isPresent(followerNextIndex)) {
-                        // FIXME : Sending one entry at a time
-                        entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
-
-                    } else if (isFollowerActive && followerNextIndex >= 0 &&
-                        leaderLastIndex >= followerNextIndex ) {
-                        // if the followers next index is not present in the leaders log, and
-                        // if the follower is just not starting and if leader's index is more than followers index
-                        // then snapshot should be sent
-
-                        if(LOG.isDebugEnabled()) {
-                            LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," +
-                                    "follower-nextIndex: %s, leader-snapshot-index: %s,  " +
-                                    "leader-last-index: %s", context.getId(), followerId,
-                                followerNextIndex, leaderSnapShotIndex, leaderLastIndex));
-                        }
-                        actor().tell(new InitiateInstallSnapshot(), actor());
-
-                        // we would want to sent AE as the capture snapshot might take time
-                        entries =  Collections.<ReplicatedLogEntry>emptyList();
+    /**
+     *
+     * This method checks if any update needs to be sent to the given follower. This includes append log entries,
+     * sending next snapshot chunk, and initiating a snapshot.
+     * @return true if any update is sent, false otherwise
+     */
 
-                    } else {
-                        //we send an AppendEntries, even if the follower is inactive
-                        // in-order to update the followers timestamp, in case it becomes active again
-                        entries =  Collections.<ReplicatedLogEntry>emptyList();
+    private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
+                                          boolean sendHeartbeat) {
+
+        ActorSelection followerActor = context.getPeerActorSelection(followerId);
+        if (followerActor != null) {
+            long followerNextIndex = followerLogInformation.getNextIndex();
+            boolean isFollowerActive = followerLogInformation.isFollowerActive();
+
+            if (mapFollowerToSnapshot.get(followerId) != null) {
+                // if install snapshot is in process , then sent next chunk if possible
+                if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) {
+                    sendSnapshotChunk(followerActor, followerId);
+                } else if(sendHeartbeat) {
+                    // we send a heartbeat even if we have not received a reply for the last chunk
+                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+                        Collections.<ReplicatedLogEntry>emptyList(), followerId);
+                }
+            } else {
+                long leaderLastIndex = context.getReplicatedLog().lastIndex();
+                long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+                if (isFollowerActive &&
+                    context.getReplicatedLog().isPresent(followerNextIndex)) {
+                    // FIXME : Sending one entry at a time
+                    final List<ReplicatedLogEntry> entries = context.getReplicatedLog().getFrom(followerNextIndex, 1);
+
+                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId);
+
+                } else if (isFollowerActive && followerNextIndex >= 0 &&
+                    leaderLastIndex >= followerNextIndex) {
+                    // if the followers next index is not present in the leaders log, and
+                    // if the follower is just not starting and if leader's index is more than followers index
+                    // then snapshot should be sent
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("InitiateInstallSnapshot to follower:{}," +
+                                "follower-nextIndex:{}, leader-snapshot-index:{},  " +
+                                "leader-last-index:{}", followerId,
+                            followerNextIndex, leaderSnapShotIndex, leaderLastIndex
+                        );
                     }
+                    actor().tell(new InitiateInstallSnapshot(), actor());
 
-                    sendAppendEntriesToFollower(followerActor, followerNextIndex, entries);
+                    // Send heartbeat to follower whenever install snapshot is initiated.
+                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+                            Collections.<ReplicatedLogEntry>emptyList(), followerId);
 
+                } else if(sendHeartbeat) {
+                    //we send an AppendEntries, even if the follower is inactive
+                    // in-order to update the followers timestamp, in case it becomes active again
+                    sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(),
+                        Collections.<ReplicatedLogEntry>emptyList(), followerId);
                 }
+
             }
         }
     }
 
     private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex,
-        List<ReplicatedLogEntry> entries) {
-        followerActor.tell(
-            new AppendEntries(currentTerm(), context.getId(),
-                prevLogIndex(followerNextIndex),
-                prevLogTerm(followerNextIndex), entries,
-                context.getCommitIndex(),
-                replicatedToAllIndex).toSerializable(),
-            actor()
-        );
+        List<ReplicatedLogEntry> entries, String followerId) {
+        AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(),
+            prevLogIndex(followerNextIndex),
+            prevLogTerm(followerNextIndex), entries,
+            context.getCommitIndex(), replicatedToAllIndex);
+
+        if(!entries.isEmpty()) {
+            LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId,
+                    appendEntries);
+        }
+
+        followerActor.tell(appendEntries.toSerializable(), actor());
     }
 
     /**
@@ -501,6 +528,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
      *
      */
     private void installSnapshotIfNeeded() {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet());
+        }
+
         for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
             final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
 
@@ -508,7 +539,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
                 long nextIndex = e.getValue().getNextIndex();
 
                 if (!context.getReplicatedLog().isPresent(nextIndex) &&
-                    context.getReplicatedLog().isInSnapshot(nextIndex)) {
+                        context.getReplicatedLog().isInSnapshot(nextIndex)) {
                     LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey());
                     if (snapshot.isPresent()) {
                         // if a snapshot is present in the memory, most likely another install is in progress
@@ -573,21 +604,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior {
     private void sendSnapshotChunk(ActorSelection followerActor, String followerId) {
         try {
             if (snapshot.isPresent()) {
+                ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get());
+
+                // Note: the previous call to getNextSnapshotChunk has the side-effect of adding
+                // followerId to the followerToSnapshot map.
+                FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId);
+
                 followerActor.tell(
                     new InstallSnapshot(currentTerm(), context.getId(),
                         context.getReplicatedLog().getSnapshotIndex(),
                         context.getReplicatedLog().getSnapshotTerm(),
-                        getNextSnapshotChunk(followerId,snapshot.get()),
-                        mapFollowerToSnapshot.get(followerId).incrementChunkIndex(),
-                        mapFollowerToSnapshot.get(followerId).getTotalChunks(),
-                        Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode())
+                        nextSnapshotChunk,
+                        followerToSnapshot.incrementChunkIndex(),
+                        followerToSnapshot.getTotalChunks(),
+                        Optional.of(followerToSnapshot.getLastChunkHashCode())
                     ).toSerializable(),
                     actor()
                 );
                 LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}",
                         context.getId(), followerActor.path(),
-                        mapFollowerToSnapshot.get(followerId).getChunkIndex(),
-                        mapFollowerToSnapshot.get(followerId).getTotalChunks());
+                        followerToSnapshot.getChunkIndex(),
+                        followerToSnapshot.getTotalChunks());
             }
         } catch (IOException e) {
             LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId());
index a782eda5659d9389606ea4e7606a5f61dd949c07..32ed85b281fee0feb4b0533c0b72cb5ade40093a 100644 (file)
@@ -12,7 +12,7 @@ package org.opendaylight.controller.cluster.raft.messages;
  * Reply for the AppendEntriesRpc message
  */
 public class AppendEntriesReply extends AbstractRaftRPC {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = -7487547356392536683L;
 
     // true if follower contained entry matching
     // prevLogIndex and prevLogTerm
@@ -38,6 +38,7 @@ public class AppendEntriesReply extends AbstractRaftRPC {
         this.logLastTerm = logLastTerm;
     }
 
+    @Override
     public long getTerm() {
         return term;
     }
index 71e7ecc189fc882eaa0917b19af99376ed05f1b7..15621bf894b14f75ec845821654202877f4b4382 100644 (file)
@@ -9,13 +9,13 @@
 package org.opendaylight.controller.cluster.raft.messages;
 
 public class InstallSnapshotReply extends AbstractRaftRPC {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 642227896390779503L;
 
     // The followerId - this will be used to figure out which follower is
     // responding
     private final String followerId;
     private final int chunkIndex;
-    private boolean success;
+    private final boolean success;
 
     public InstallSnapshotReply(long term, String followerId, int chunkIndex,
         boolean success) {
index 8321d0c25bcecce32617c55fe652c2b7be189541..9ba5acb664700f4873d51fdb9f2637ef882c360e 100644 (file)
@@ -12,7 +12,7 @@ package org.opendaylight.controller.cluster.raft.messages;
  * Invoked by candidates to gather votes (ยง5.2).
  */
 public class RequestVote extends AbstractRaftRPC {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = -6967509186297108657L;
 
     // candidate requesting vote
     private String candidateId;
@@ -35,6 +35,7 @@ public class RequestVote extends AbstractRaftRPC {
     public RequestVote() {
     }
 
+    @Override
     public long getTerm() {
         return term;
     }
index da3ba5c39f6989b86c1f76b7830d2c0426302ddb..b3c95d6ecaab2b19524fb4ae06c3c8f6491658c0 100644 (file)
@@ -9,7 +9,7 @@
 package org.opendaylight.controller.cluster.raft.messages;
 
 public class RequestVoteReply extends AbstractRaftRPC {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 8427899326488775660L;
 
     // true means candidate received vot
     private final boolean voteGranted;
@@ -19,6 +19,7 @@ public class RequestVoteReply extends AbstractRaftRPC {
         this.voteGranted = voteGranted;
     }
 
+    @Override
     public long getTerm() {
         return term;
     }
index b31cb621b3576b1a9bcbaff321465d4bd186674e..63f94828eb4752ed6fc9367e9575f98d4bdc6d76 100644 (file)
@@ -86,6 +86,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
+                    leader.markFollowerActive(followerActor.path().toString());
+                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                        TimeUnit.MILLISECONDS);
                     leader.handleMessage(senderActor, new SendHeartBeat());
 
                     final String out =
@@ -133,6 +136,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
                     actorContext.setPeerAddresses(peerAddresses);
 
                     Leader leader = new Leader(actorContext);
+                    leader.markFollowerActive(followerActor.path().toString());
+                    Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                        TimeUnit.MILLISECONDS);
                     RaftActorBehavior raftBehavior = leader
                         .handleMessage(senderActor, new Replicate(null, null,
                             new MockRaftActorContext.MockReplicatedLogEntry(1,
@@ -270,6 +276,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             leader.getFollowerToSnapshot().getNextChunk();
             leader.getFollowerToSnapshot().incrementChunkIndex();
 
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
             AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching(
@@ -344,6 +353,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             //update follower timestamp
             leader.markFollowerActive(followerActor.path().toString());
 
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex
             RaftActorBehavior raftBehavior = leader.handleMessage(
                 senderActor, new Replicate(null, "state-id", entry));
@@ -578,7 +590,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
     }
 
     @Test
-    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception {
+    public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{
         new JavaTestKit(getSystem()) {{
 
             TestActorRef<MessageCollectorActor> followerActor =
@@ -632,11 +644,15 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             assertEquals(3, installSnapshot.getTotalChunks());
 
 
-            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false));
+            leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(),
+                followerActor.path().toString(), -1, false));
+
+            Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
 
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
-            o = MessageCollectorActor.getAllMessages(followerActor).get(1);
+            o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1);
 
             assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
 
@@ -655,7 +671,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             {
 
                 TestActorRef<MessageCollectorActor> followerActor =
-                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower");
+                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk");
 
                 Map<String, String> peerAddresses = new HashMap<>();
                 peerAddresses.put(followerActor.path().toString(),
@@ -709,10 +725,10 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
 
                 leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true ));
 
-                leader.handleMessage(leaderActor, new SendHeartBeat());
-
                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 
+                leader.handleMessage(leaderActor, new SendHeartBeat());
+
                 o = MessageCollectorActor.getAllMessages(followerActor).get(1);
 
                 assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot);
@@ -874,6 +890,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             Leader leader = new Leader(leaderActorContext);
             leader.markFollowerActive(followerActor.path().toString());
 
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
@@ -942,6 +961,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
             Leader leader = new Leader(leaderActorContext);
             leader.markFollowerActive(followerActor.path().toString());
 
+            Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis());
+
             leader.handleMessage(leaderActor, new SendHeartBeat());
 
             AppendEntries appendEntries = (AppendEntries) MessageCollectorActor
@@ -1170,6 +1191,85 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+
+    @Test
+    public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception {
+        new JavaTestKit(getSystem()) {{
+
+            ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+            MockRaftActorContext leaderActorContext =
+                new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+            DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
+            configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS));
+            configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS));
+
+            leaderActorContext.setConfigParams(configParams);
+
+            ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class));
+
+            MockRaftActorContext followerActorContext =
+                new MockRaftActorContext("follower-reply", getSystem(), followerActor);
+
+            followerActorContext.setConfigParams(configParams);
+
+            Follower follower = new Follower(followerActorContext);
+
+            ForwardMessageToBehaviorActor.setBehavior(follower);
+
+            Map<String, String> peerAddresses = new HashMap<>();
+            peerAddresses.put("follower-reply",
+                followerActor.path().toString());
+
+            leaderActorContext.setPeerAddresses(peerAddresses);
+
+            leaderActorContext.getReplicatedLog().removeFrom(0);
+
+            //create 3 entries
+            leaderActorContext.setReplicatedLog(
+                new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+            leaderActorContext.setCommitIndex(1);
+
+            Leader leader = new Leader(leaderActorContext);
+            leader.markFollowerActive("follower-reply");
+
+            Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(),
+                TimeUnit.MILLISECONDS);
+
+            leader.handleMessage(leaderActor, new SendHeartBeat());
+
+            AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor
+                .getFirstMatching(followerActor, AppendEntries.class);
+
+            assertNotNull(appendEntries);
+
+            assertEquals(1, appendEntries.getLeaderCommit());
+            assertEquals(1, appendEntries.getEntries().get(0).getIndex());
+            assertEquals(0, appendEntries.getPrevLogIndex());
+
+            AppendEntriesReply appendEntriesReply =
+                (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class);
+
+            assertNotNull(appendEntriesReply);
+
+            leader.handleAppendEntriesReply(followerActor, appendEntriesReply);
+
+            List<Object> entries = ForwardMessageToBehaviorActor
+                .getAllMatching(followerActor, AppendEntries.class);
+
+            assertEquals("AppendEntries count should be 2 ", 2, entries.size());
+
+            AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1);
+
+            assertEquals(1, appendEntriesSecond.getLeaderCommit());
+            assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex());
+            assertEquals(1, appendEntriesSecond.getPrevLogIndex());
+
+        }};
+    }
+
     class MockLeader extends Leader {
 
         FollowerToSnapshot fts;
diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java
new file mode 100644 (file)
index 0000000..da60496
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.node.utils.stream;
+
+import java.io.IOException;
+
+/**
+ * Exception thrown from NormalizedNodeInputStreamReader when the input stream does not contain
+ * valid serialized data.
+ *
+ * @author Thomas Pantelis
+ */
+public class InvalidNormalizedNodeStreamException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    public InvalidNormalizedNodeStreamException(String message) {
+        super(message);
+    }
+}
index cde338179ba727903e35d4513195cbbddd9b1b1c..bb2f5d41d920d5ce97c723fe6bc91cafc5cd6031 100644 (file)
@@ -69,6 +69,8 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
 
     private final StringBuilder reusableStringBuilder = new StringBuilder(50);
 
+    private boolean readSignatureMarker = true;
+
     public NormalizedNodeInputStreamReader(InputStream stream) throws IOException {
         Preconditions.checkNotNull(stream);
         input = new DataInputStream(stream);
@@ -80,6 +82,25 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
 
     @Override
     public NormalizedNode<?, ?> readNormalizedNode() throws IOException {
+        readSignatureMarkerAndVersionIfNeeded();
+        return readNormalizedNodeInternal();
+    }
+
+    private void readSignatureMarkerAndVersionIfNeeded() throws IOException {
+        if(readSignatureMarker) {
+            readSignatureMarker = false;
+
+            byte marker = input.readByte();
+            if(marker != NormalizedNodeOutputStreamWriter.SIGNATURE_MARKER) {
+                throw new InvalidNormalizedNodeStreamException(String.format(
+                        "Invalid signature marker: %d", marker));
+            }
+
+            input.readShort(); // read the version - not currently used/needed.
+        }
+    }
+
+    private NormalizedNode<?, ?> readNormalizedNodeInternal() throws IOException {
         // each node should start with a byte
         byte nodeType = input.readByte();
 
@@ -284,7 +305,7 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
                 return bytes;
 
             case ValueTypes.YANG_IDENTIFIER_TYPE :
-                return readYangInstanceIdentifier();
+                return readYangInstanceIdentifierInternal();
 
             default :
                 return null;
@@ -292,6 +313,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
     }
 
     public YangInstanceIdentifier readYangInstanceIdentifier() throws IOException {
+        readSignatureMarkerAndVersionIfNeeded();
+        return readYangInstanceIdentifierInternal();
+    }
+
+    private YangInstanceIdentifier readYangInstanceIdentifierInternal() throws IOException {
         int size = input.readInt();
 
         List<PathArgument> pathArguments = new ArrayList<>(size);
@@ -342,11 +368,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
 
         lastLeafSetQName = nodeType;
 
-        LeafSetEntryNode<Object> child = (LeafSetEntryNode<Object>)readNormalizedNode();
+        LeafSetEntryNode<Object> child = (LeafSetEntryNode<Object>)readNormalizedNodeInternal();
 
         while(child != null) {
             builder.withChild(child);
-            child = (LeafSetEntryNode<Object>)readNormalizedNode();
+            child = (LeafSetEntryNode<Object>)readNormalizedNodeInternal();
         }
         return builder;
     }
@@ -356,11 +382,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
             NormalizedNodeContainerBuilder builder) throws IOException {
         LOG.debug("Reading data container (leaf nodes) nodes");
 
-        NormalizedNode<?, ?> child = readNormalizedNode();
+        NormalizedNode<?, ?> child = readNormalizedNodeInternal();
 
         while(child != null) {
             builder.addChild(child);
-            child = readNormalizedNode();
+            child = readNormalizedNodeInternal();
         }
         return builder;
     }
index 088f4dfbe98a1358a980a84e8ad3adaa99736b57..d4aab036be21df1734f4f88bc590b35030a71d21 100644 (file)
@@ -46,6 +46,9 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
 
     private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeOutputStreamWriter.class);
 
+    static final byte SIGNATURE_MARKER = (byte) 0xab;
+    static final short CURRENT_VERSION = (short) 1;
+
     static final byte IS_CODE_VALUE = 1;
     static final byte IS_STRING_VALUE = 2;
     static final byte IS_NULL_VALUE = 3;
@@ -56,6 +59,8 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
 
     private NormalizedNodeWriter normalizedNodeWriter;
 
+    private boolean wroteSignatureMarker;
+
     public NormalizedNodeOutputStreamWriter(OutputStream stream) throws IOException {
         Preconditions.checkNotNull(stream);
         output = new DataOutputStream(stream);
@@ -74,9 +79,18 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
     }
 
     public void writeNormalizedNode(NormalizedNode<?, ?> node) throws IOException {
+        writeSignatureMarkerAndVersionIfNeeded();
         normalizedNodeWriter().write(node);
     }
 
+    private void writeSignatureMarkerAndVersionIfNeeded() throws IOException {
+        if(!wroteSignatureMarker) {
+            output.writeByte(SIGNATURE_MARKER);
+            output.writeShort(CURRENT_VERSION);
+            wroteSignatureMarker = true;
+        }
+    }
+
     @Override
     public void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException {
         Preconditions.checkNotNull(name, "Node identifier should not be null");
@@ -201,6 +215,9 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
     private void startNode(final QName qName, byte nodeType) throws IOException {
 
         Preconditions.checkNotNull(qName, "QName of node identifier should not be null.");
+
+        writeSignatureMarkerAndVersionIfNeeded();
+
         // First write the type of node
         output.writeByte(nodeType);
         // Write Start Tag
@@ -247,6 +264,11 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
     }
 
     public void writeYangInstanceIdentifier(YangInstanceIdentifier identifier) throws IOException {
+        writeSignatureMarkerAndVersionIfNeeded();
+        writeYangInstanceIdentifierInternal(identifier);
+    }
+
+    private void writeYangInstanceIdentifierInternal(YangInstanceIdentifier identifier) throws IOException {
         Iterable<YangInstanceIdentifier.PathArgument> pathArguments = identifier.getPathArguments();
         int size = Iterables.size(pathArguments);
         output.writeInt(size);
@@ -363,7 +385,7 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
                 output.write(bytes);
                 break;
             case ValueTypes.YANG_IDENTIFIER_TYPE:
-                writeYangInstanceIdentifier((YangInstanceIdentifier) value);
+                writeYangInstanceIdentifierInternal((YangInstanceIdentifier) value);
                 break;
             case ValueTypes.NULL_TYPE :
                 break;
index 6528f2e4d2e6524f2f08acb750aae764c86dbc0b..67a342b440666e3febfd6e5b390953fa3d54bebf 100644 (file)
@@ -15,15 +15,16 @@ import java.io.IOException;
 import org.apache.commons.lang.SerializationUtils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
 import org.opendaylight.controller.cluster.datastore.util.TestModel;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
-import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter;
 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder;
@@ -33,9 +34,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class NormalizedNodeStreamReaderWriterTest {
 
     @Test
-    public void testNormalizedNodeStreamReaderWriter() throws IOException {
+    public void testNormalizedNodeStreaming() throws IOException {
 
-        testNormalizedNodeStreamReaderWriter(createTestContainer());
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
+
+        NormalizedNode<?, ?> testContainer = createTestContainer();
+        writer.writeNormalizedNode(testContainer);
 
         QName toaster = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","toaster");
         QName darknessFactor = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","darknessFactor");
@@ -43,9 +48,21 @@ public class NormalizedNodeStreamReaderWriterTest {
                 withNodeIdentifier(new NodeIdentifier(toaster)).
                 withChild(ImmutableNodes.leafNode(darknessFactor, "1000")).build();
 
-        testNormalizedNodeStreamReaderWriter(Builders.containerBuilder().
+        ContainerNode toasterContainer = Builders.containerBuilder().
                 withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME)).
-                withChild(toasterNode).build());
+                withChild(toasterNode).build();
+        writer.writeNormalizedNode(toasterContainer);
+
+        NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+                new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+
+        NormalizedNode<?,?> node = reader.readNormalizedNode();
+        Assert.assertEquals(testContainer, node);
+
+        node = reader.readNormalizedNode();
+        Assert.assertEquals(toasterContainer, node);
+
+        writer.close();
     }
 
     private NormalizedNode<?, ?> createTestContainer() {
@@ -76,24 +93,75 @@ public class NormalizedNodeStreamReaderWriterTest {
                 build();
     }
 
-    private void testNormalizedNodeStreamReaderWriter(NormalizedNode<?, ?> input) throws IOException {
+    @Test
+    public void testYangInstanceIdentifierStreaming() throws IOException  {
+        YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).
+                node(TestModel.OUTER_LIST_QNAME).nodeWithKey(
+                        TestModel.INNER_LIST_QNAME, TestModel.ID_QNAME, 10).build();
 
-        byte[] byteData = null;
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        NormalizedNodeOutputStreamWriter writer =
+                new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
+        writer.writeYangInstanceIdentifier(path);
+
+        NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+                new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
+
+        YangInstanceIdentifier newPath = reader.readYangInstanceIdentifier();
+        Assert.assertEquals(path, newPath);
+
+        writer.close();
+    }
+
+    @Test
+    public void testNormalizedNodeAndYangInstanceIdentifierStreaming() throws IOException {
 
-        try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-            NormalizedNodeStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream)) {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream);
 
-            NormalizedNodeWriter normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(writer);
-            normalizedNodeWriter.write(input);
-            byteData = byteArrayOutputStream.toByteArray();
+        NormalizedNode<?, ?> testContainer = TestModel.createBaseTestContainerBuilder().build();
+        writer.writeNormalizedNode(testContainer);
 
-        }
+        YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).
+                node(TestModel.OUTER_LIST_QNAME).nodeWithKey(
+                        TestModel.INNER_LIST_QNAME, TestModel.ID_QNAME, 10).build();
+
+        writer.writeYangInstanceIdentifier(path);
 
         NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
-                new ByteArrayInputStream(byteData));
+                new ByteArrayInputStream(byteArrayOutputStream.toByteArray()));
 
         NormalizedNode<?,?> node = reader.readNormalizedNode();
-        Assert.assertEquals(input, node);
+        Assert.assertEquals(testContainer, node);
+
+        YangInstanceIdentifier newPath = reader.readYangInstanceIdentifier();
+        Assert.assertEquals(path, newPath);
+
+        writer.close();
+    }
+
+    @Test(expected=InvalidNormalizedNodeStreamException.class, timeout=10000)
+    public void testInvalidNormalizedNodeStream() throws IOException {
+        byte[] protobufBytes = new NormalizedNodeToNodeCodec(null).encode(
+                TestModel.createBaseTestContainerBuilder().build()).getNormalizedNode().toByteArray();
+
+        NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+                new ByteArrayInputStream(protobufBytes));
+
+        reader.readNormalizedNode();
+    }
+
+    @Test(expected=InvalidNormalizedNodeStreamException.class, timeout=10000)
+    public void testInvalidYangInstanceIdentifierStream() throws IOException {
+        YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).build();
+
+        byte[] protobufBytes = ShardTransactionMessages.DeleteData.newBuilder().setInstanceIdentifierPathArguments(
+                InstanceIdentifierUtils.toSerializable(path)).build().toByteArray();
+
+        NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(
+                new ByteArrayInputStream(protobufBytes));
+
+        reader.readYangInstanceIdentifier();
     }
 
     @Test
index 10876045ae272c436e54143c8a0da8bf1c2e41e7..22e2dbd47d4d7148c8e41cfe05ce8532df527466 100644 (file)
@@ -27,6 +27,15 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
@@ -46,15 +55,6 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
 /**
  * The ShardManager has the following jobs,
@@ -555,7 +555,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     static class SchemaContextModules implements Serializable {
-        private static final long serialVersionUID = 1L;
+        private static final long serialVersionUID = -8884620101025936590L;
+
         private final Set<String> modules;
 
         SchemaContextModules(Set<String> modules){
index 6f8d0567d9aa162d4080361162fe709decd544ce..2e66ef918e6e7541592449e9b51405610f4826a6 100644 (file)
@@ -96,4 +96,9 @@ public class ShardReadTransaction extends ShardTransaction {
     protected DOMStoreTransaction getDOMStoreTransaction() {
         return transaction;
     }
+
+    @Override
+    protected boolean returnCloseTransactionReply() {
+        return false;
+    }
 }
index 678b7815693382179328a8c13adb567d80d61b13..8a37dfee4d8ccc698a5bb9096d4ecdf9ed228771 100644 (file)
@@ -39,12 +39,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
  * </p>
  * <p>
- * Even though the DOMStore and the DOMStoreTransactionChain implement multiple types of transactions
- * the ShardTransaction Actor only works with read-write transactions. This is just to keep the logic simple. At this
- * time there are no known advantages for creating a read-only or write-only transaction which may change over time
- * at which point we can optimize things in the distributed store as well.
- * </p>
- * <p>
  * Handles Messages <br/>
  * ---------------- <br/>
  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
@@ -114,10 +108,14 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         }
     }
 
+    protected boolean returnCloseTransactionReply() {
+        return true;
+    }
+
     private void closeTransaction(boolean sendReply) {
         getDOMStoreTransaction().close();
 
-        if(sendReply) {
+        if(sendReply && returnCloseTransactionReply()) {
             getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
         }
 
index 5854932a6fa0d999fe368aa61bfdd252821739c6..bf9f8d803ac00f0e8ce2cbe983f81bfb9e9a44d0 100644 (file)
@@ -17,6 +17,7 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.InvalidNormalizedNodeStreamException;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
@@ -93,15 +94,19 @@ public final class SerializationUtils {
     }
 
     public static NormalizedNode<?, ?> deserializeNormalizedNode(DataInput in) {
-            try {
-                boolean present = in.readBoolean();
-                if(present) {
-                    NormalizedNodeInputStreamReader streamReader = streamReader(in);
-                    return streamReader.readNormalizedNode();
-                }
-            } catch (IOException e) {
-                throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
-            }
+        try {
+            return tryDeserializeNormalizedNode(in);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
+        }
+    }
+
+    private static NormalizedNode<?, ?> tryDeserializeNormalizedNode(DataInput in) throws IOException {
+        boolean present = in.readBoolean();
+        if(present) {
+            NormalizedNodeInputStreamReader streamReader = streamReader(in);
+            return streamReader.readNormalizedNode();
+        }
 
         return null;
     }
@@ -109,18 +114,17 @@ public final class SerializationUtils {
     public static NormalizedNode<?, ?> deserializeNormalizedNode(byte [] bytes) {
         NormalizedNode<?, ?> node = null;
         try {
-            node = deserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes)));
-        } catch(Exception e) {
-        }
-
-        if(node == null) {
-            // Must be from legacy protobuf serialization - try that.
+            node = tryDeserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes)));
+        } catch(InvalidNormalizedNodeStreamException e) {
+            // Probably from legacy protobuf serialization - try that.
             try {
                 NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(bytes);
                 node =  new NormalizedNodeToNodeCodec(null).decode(serializedNode);
-            } catch (InvalidProtocolBufferException e) {
+            } catch (InvalidProtocolBufferException e2) {
                 throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
             }
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Error deserializing NormalizedNode", e);
         }
 
         return node;
index 69dd706f37cb3bd8c1e7c75d91ec05e2b0fe1f13..851fb0114b3a64c7d211a5c6375c14c41e715e69 100644 (file)
@@ -412,10 +412,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     }
 
     @Test
-    public void testOnReceiveCloseTransaction() throws Exception {
+    public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
-                    "testCloseTransaction");
+                    "testReadWriteTxOnReceiveCloseTransaction");
 
             watch(transaction);
 
@@ -426,6 +426,35 @@ public class ShardTransactionTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+                    "testWriteTxOnReceiveCloseTransaction");
+
+            watch(transaction);
+
+            transaction.tell(new CloseTransaction().toSerializable(), getRef());
+
+            expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
+            expectTerminated(duration("3 seconds"), transaction);
+        }};
+    }
+
+    @Test
+    public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+                    "testReadOnlyTxOnReceiveCloseTransaction");
+
+            watch(transaction);
+
+            transaction.tell(new CloseTransaction().toSerializable(), getRef());
+
+            expectMsgClass(duration("3 seconds"), Terminated.class);
+        }};
+    }
+
     @Test(expected=UnknownMessageException.class)
     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
         final ActorRef shard = createShard();
index 4bb5258b4005f21eabc62c2707f2d3edca59b229..5d780be641e2cce6d2b48a2628cd675f9a24c8ee 100644 (file)
@@ -9,29 +9,28 @@ package org.opendaylight.controller.remote.rpc.messages;
 
 
 import com.google.common.base.Preconditions;
-import org.opendaylight.yangtools.yang.common.QName;
-
 import java.io.Serializable;
+import org.opendaylight.yangtools.yang.common.QName;
 
 public class ExecuteRpc implements Serializable {
-  private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1128904894827335676L;
 
-  private final String inputCompositeNode;
-  private final QName rpc;
+    private final String inputCompositeNode;
+    private final QName rpc;
 
-  public ExecuteRpc(final String inputCompositeNode, final QName rpc) {
-    Preconditions.checkNotNull(inputCompositeNode, "Composite Node input string should be present");
-    Preconditions.checkNotNull(rpc, "rpc Qname should not be null");
+    public ExecuteRpc(final String inputCompositeNode, final QName rpc) {
+        Preconditions.checkNotNull(inputCompositeNode, "Composite Node input string should be present");
+        Preconditions.checkNotNull(rpc, "rpc Qname should not be null");
 
-    this.inputCompositeNode = inputCompositeNode;
-    this.rpc = rpc;
-  }
+        this.inputCompositeNode = inputCompositeNode;
+        this.rpc = rpc;
+    }
 
-  public String getInputCompositeNode() {
-    return inputCompositeNode;
-  }
+    public String getInputCompositeNode() {
+        return inputCompositeNode;
+    }
 
-  public QName getRpc() {
-    return rpc;
-  }
+    public QName getRpc() {
+        return rpc;
+    }
 }
index 652569b7baf705ea0efa5c4d35f530edf85942f0..9c40dbfc58a556bc28f337c9ee683b947846c045 100644 (file)
@@ -8,37 +8,36 @@
 package org.opendaylight.controller.remote.rpc.messages;
 
 import com.google.common.base.Preconditions;
+import java.io.Serializable;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-import java.io.Serializable;
-
 public class InvokeRpc implements Serializable {
-  private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = -2813459607858108953L;
 
-  private final QName rpc;
-  private final YangInstanceIdentifier identifier;
-  private final CompositeNode input;
+    private final QName rpc;
+    private final YangInstanceIdentifier identifier;
+    private final CompositeNode input;
 
-  public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final CompositeNode input) {
-    Preconditions.checkNotNull(rpc, "rpc qname should not be null");
-    Preconditions.checkNotNull(input, "rpc input should not be null");
+    public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final CompositeNode input) {
+        Preconditions.checkNotNull(rpc, "rpc qname should not be null");
+        Preconditions.checkNotNull(input, "rpc input should not be null");
 
-    this.rpc = rpc;
-    this.identifier = identifier;
-    this.input = input;
-  }
+        this.rpc = rpc;
+        this.identifier = identifier;
+        this.input = input;
+    }
 
-  public QName getRpc() {
-    return rpc;
-  }
+    public QName getRpc() {
+        return rpc;
+    }
 
-  public YangInstanceIdentifier getIdentifier() {
-    return identifier;
-  }
+    public YangInstanceIdentifier getIdentifier() {
+        return identifier;
+    }
 
-  public CompositeNode getInput() {
-    return input;
-  }
+    public CompositeNode getInput() {
+        return input;
+    }
 }
index 387cb90112dad6ffbab6fb7cce7bf7c1d92442f1..e6b208cb6fe63d64eca185b9d90d2d19144ed440 100644 (file)
@@ -10,14 +10,15 @@ package org.opendaylight.controller.remote.rpc.messages;
 import java.io.Serializable;
 
 public class RpcResponse implements Serializable {
-  private static final long serialVersionUID = 1L;
-  private final String resultCompositeNode;
+    private static final long serialVersionUID = -4211279498688989245L;
 
-  public RpcResponse(final String resultCompositeNode) {
-    this.resultCompositeNode = resultCompositeNode;
-  }
+    private final String resultCompositeNode;
 
-  public String getResultCompositeNode() {
-    return resultCompositeNode;
-  }
+    public RpcResponse(final String resultCompositeNode) {
+        this.resultCompositeNode = resultCompositeNode;
+    }
+
+    public String getResultCompositeNode() {
+        return resultCompositeNode;
+    }
 }
index 52b1106c873872609e4300abe52d558ee89a6011..f67657f6927801931fae2fd0434481169f2f3de8 100644 (file)
@@ -17,7 +17,7 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 
 public class RoutingTable implements Copier<RoutingTable>, Serializable {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 5592610415175278760L;
 
     private final Map<RpcRouter.RouteIdentifier<?, ?, ?>, Long> table = new HashMap<>();
     private ActorRef router;
index b81175e9a253176870ddf92901d8d2debaa4d698..4c4573d909491f0414d6bd00ebac5eb35a2c7b94 100644 (file)
@@ -10,7 +10,7 @@ package org.opendaylight.controller.remote.rpc.registry.gossip;
 import java.io.Serializable;
 
 public class BucketImpl<T extends Copier<T>> implements Bucket<T>, Serializable {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 294779770032719196L;
 
     private Long version = System.currentTimeMillis();
 
index b05bd7d0f6f314c990e4a1d7909115b6c1ad1348..00437e7e5639fc382e8b1434d4b02495ef1ac56e 100644 (file)
@@ -46,7 +46,8 @@ public class Messages {
         }
 
         public static class ContainsBuckets implements Serializable{
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = -4940160367495308286L;
+
             private final Map<Address, Bucket> buckets;
 
             public ContainsBuckets(Map<Address, Bucket> buckets){
@@ -87,7 +88,8 @@ public class Messages {
         }
 
         public static class ContainsBucketVersions implements Serializable{
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = -8172148925383801613L;
+
             Map<Address, Long> versions;
 
             public ContainsBucketVersions(Map<Address, Long> versions) {
@@ -119,15 +121,16 @@ public class Messages {
 
     public static class GossiperMessages{
         public static class Tick implements Serializable {
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = -4770935099506366773L;
         }
 
         public static final class GossipTick extends Tick {
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = 5803354404380026143L;
         }
 
         public static final class GossipStatus extends ContainsBucketVersions implements Serializable{
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = -593037395143883265L;
+
             private final Address from;
 
             public GossipStatus(Address from, Map<Address, Long> versions) {
@@ -141,7 +144,8 @@ public class Messages {
         }
 
         public static final class GossipEnvelope extends ContainsBuckets implements Serializable {
-            private static final long serialVersionUID = 1L;
+            private static final long serialVersionUID = 8346634072582438818L;
+
             private final Address from;
             private final Address to;