From: Tom Pantelis Date: Tue, 10 Feb 2015 04:44:04 +0000 (+0000) Subject: Merge "Add Distributed DataStore as a dependency of the Simulator" X-Git-Tag: release/lithium~605 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=20156b149d56b1b14a06d344366b8e1af1c22fe8;hp=450f54228f74345eaaa981bf45d5212fd2ffbfdd Merge "Add Distributed DataStore as a dependency of the Simulator" --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java index 6d0c14e733..73c81afd18 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformation.java @@ -73,4 +73,12 @@ public interface FollowerLogInformation { * This will stop the timeout clock */ void markFollowerInActive(); + + + /** + * This will return the active time of follower, since it was last reset + * @return time in milliseconds + */ + long timeSinceLastActivity(); + } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java index 7a690d3d18..0fed63098d 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/FollowerLogInformationImpl.java @@ -95,4 +95,9 @@ public class FollowerLogInformationImpl implements FollowerLogInformation { stopwatch.stop(); } } + + @Override + public long timeSinceLastActivity() { + return stopwatch.elapsed(TimeUnit.MILLISECONDS); + } } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java index 77bf103701..feccea7edb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/Snapshot.java @@ -12,7 +12,8 @@ import java.util.List; public class Snapshot implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = -8298574936724056236L; + private final byte[] state; private final List unAppliedEntries; private final long lastIndex; diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CommitEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CommitEntry.java deleted file mode 100644 index 6335e3272e..0000000000 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/CommitEntry.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.raft.base.messages; - -import java.io.Serializable; - -/** - * Message sent to commit an entry to the log - */ -public class CommitEntry implements Serializable { - private static final long serialVersionUID = 1L; -} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/PersistEntry.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/PersistEntry.java deleted file mode 100644 index 68ecc1289b..0000000000 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/PersistEntry.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.raft.base.messages; - -import java.io.Serializable; - -/** - * Message sent to Persist an entry into the transaction journal - */ -public class PersistEntry implements Serializable { - private static final long serialVersionUID = 1L; -} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SaveSnapshot.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SaveSnapshot.java deleted file mode 100644 index 7b7f085856..0000000000 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/base/messages/SaveSnapshot.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.raft.base.messages; - -import java.io.Serializable; - -/** - * This message is sent by a RaftActor to itself so that a subclass can process - * it and use it to save it's state - */ -public class SaveSnapshot implements Serializable { - private static final long serialVersionUID = 1L; -} diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index e28e4b066d..68cf5027df 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -232,6 +232,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { purgeInMemoryLog(); } + //Send the next log entry immediately, if possible, no need to wait for heartbeat to trigger that event + sendUpdatesToFollower(followerId, followerLogInformation, false); return this; } @@ -294,6 +296,9 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // set currentTerm = T, convert to follower (§5.1) // This applies to all RPC messages and responses if (rpc.getTerm() > context.getTermInformation().getCurrentTerm()) { + LOG.debug("{}: Term {} in \"{}\" message is greater than leader's term {}", context.getId(), + rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm()); + context.getTermInformation().updateAndPersist(rpc.getTerm(), null); return switchBehavior(new Follower(context)); @@ -330,12 +335,17 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void handleInstallSnapshotReply(InstallSnapshotReply reply) { String followerId = reply.getFollowerId(); FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + + if (followerToSnapshot == null) { + LOG.error("{}: FollowerId {} in InstallSnapshotReply not known to Leader", + context.getId(), followerId); + return; + } + FollowerLogInformation followerLogInformation = followerToLog.get(followerId); followerLogInformation.markFollowerActive(); - if (followerToSnapshot != null && - followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { - + if (followerToSnapshot.getChunkIndex() == reply.getChunkIndex()) { if (reply.isSuccess()) { if(followerToSnapshot.isLastChunk(reply.getChunkIndex())) { //this was the last chunk reply @@ -373,12 +383,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.markSendStatus(false); } - } else { - LOG.error("{}: FollowerId in InstallSnapshotReply not known to Leader" + - " or Chunk Index in InstallSnapshotReply not matching {} != {}", - context.getId(), followerToSnapshot.getChunkIndex(), reply.getChunkIndex() - ); + LOG.error("{}: Chunk index {} in InstallSnapshotReply from follower {} does not match expected index {}", + context.getId(), reply.getChunkIndex(), followerId, + followerToSnapshot.getChunkIndex()); if(reply.getChunkIndex() == INVALID_CHUNK_INDEX){ // Since the Follower did not find this index to be valid we should reset the follower snapshot @@ -413,75 +421,94 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendAppendEntries() { // Send an AppendEntries to all followers + long heartbeatInterval = context.getConfigParams().getHeartBeatInterval().toMillis(); for (Entry e : followerToLog.entrySet()) { final String followerId = e.getKey(); - ActorSelection followerActor = context.getPeerActorSelection(followerId); - - if (followerActor != null) { - FollowerLogInformation followerLogInformation = followerToLog.get(followerId); - long followerNextIndex = followerLogInformation.getNextIndex(); - boolean isFollowerActive = followerLogInformation.isFollowerActive(); - - if (mapFollowerToSnapshot.get(followerId) != null) { - // if install snapshot is in process , then sent next chunk if possible - if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { - sendSnapshotChunk(followerActor, followerId); - } else { - // we send a heartbeat even if we have not received a reply for the last chunk - sendAppendEntriesToFollower(followerActor, followerNextIndex, - Collections.emptyList()); - } + final FollowerLogInformation followerLogInformation = e.getValue(); + // This checks helps not to send a repeat message to the follower + if(followerLogInformation.timeSinceLastActivity() >= heartbeatInterval) { + sendUpdatesToFollower(followerId, followerLogInformation, true); + } + } + } - } else { - long leaderLastIndex = context.getReplicatedLog().lastIndex(); - long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); - final List entries; - - if (isFollowerActive && - context.getReplicatedLog().isPresent(followerNextIndex)) { - // FIXME : Sending one entry at a time - entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); - - } else if (isFollowerActive && followerNextIndex >= 0 && - leaderLastIndex >= followerNextIndex ) { - // if the followers next index is not present in the leaders log, and - // if the follower is just not starting and if leader's index is more than followers index - // then snapshot should be sent - - if(LOG.isDebugEnabled()) { - LOG.debug(String.format("%s: InitiateInstallSnapshot to follower: %s," + - "follower-nextIndex: %s, leader-snapshot-index: %s, " + - "leader-last-index: %s", context.getId(), followerId, - followerNextIndex, leaderSnapShotIndex, leaderLastIndex)); - } - actor().tell(new InitiateInstallSnapshot(), actor()); - - // we would want to sent AE as the capture snapshot might take time - entries = Collections.emptyList(); + /** + * + * This method checks if any update needs to be sent to the given follower. This includes append log entries, + * sending next snapshot chunk, and initiating a snapshot. + * @return true if any update is sent, false otherwise + */ - } else { - //we send an AppendEntries, even if the follower is inactive - // in-order to update the followers timestamp, in case it becomes active again - entries = Collections.emptyList(); + private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation, + boolean sendHeartbeat) { + + ActorSelection followerActor = context.getPeerActorSelection(followerId); + if (followerActor != null) { + long followerNextIndex = followerLogInformation.getNextIndex(); + boolean isFollowerActive = followerLogInformation.isFollowerActive(); + + if (mapFollowerToSnapshot.get(followerId) != null) { + // if install snapshot is in process , then sent next chunk if possible + if (isFollowerActive && mapFollowerToSnapshot.get(followerId).canSendNextChunk()) { + sendSnapshotChunk(followerActor, followerId); + } else if(sendHeartbeat) { + // we send a heartbeat even if we have not received a reply for the last chunk + sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), + Collections.emptyList(), followerId); + } + } else { + long leaderLastIndex = context.getReplicatedLog().lastIndex(); + long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex(); + if (isFollowerActive && + context.getReplicatedLog().isPresent(followerNextIndex)) { + // FIXME : Sending one entry at a time + final List entries = context.getReplicatedLog().getFrom(followerNextIndex, 1); + + sendAppendEntriesToFollower(followerActor, followerNextIndex, entries, followerId); + + } else if (isFollowerActive && followerNextIndex >= 0 && + leaderLastIndex >= followerNextIndex) { + // if the followers next index is not present in the leaders log, and + // if the follower is just not starting and if leader's index is more than followers index + // then snapshot should be sent + + if (LOG.isDebugEnabled()) { + LOG.debug("InitiateInstallSnapshot to follower:{}," + + "follower-nextIndex:{}, leader-snapshot-index:{}, " + + "leader-last-index:{}", followerId, + followerNextIndex, leaderSnapShotIndex, leaderLastIndex + ); } + actor().tell(new InitiateInstallSnapshot(), actor()); - sendAppendEntriesToFollower(followerActor, followerNextIndex, entries); + // Send heartbeat to follower whenever install snapshot is initiated. + sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), + Collections.emptyList(), followerId); + } else if(sendHeartbeat) { + //we send an AppendEntries, even if the follower is inactive + // in-order to update the followers timestamp, in case it becomes active again + sendAppendEntriesToFollower(followerActor, followerLogInformation.getNextIndex(), + Collections.emptyList(), followerId); } + } } } private void sendAppendEntriesToFollower(ActorSelection followerActor, long followerNextIndex, - List entries) { - followerActor.tell( - new AppendEntries(currentTerm(), context.getId(), - prevLogIndex(followerNextIndex), - prevLogTerm(followerNextIndex), entries, - context.getCommitIndex(), - replicatedToAllIndex).toSerializable(), - actor() - ); + List entries, String followerId) { + AppendEntries appendEntries = new AppendEntries(currentTerm(), context.getId(), + prevLogIndex(followerNextIndex), + prevLogTerm(followerNextIndex), entries, + context.getCommitIndex(), replicatedToAllIndex); + + if(!entries.isEmpty()) { + LOG.debug("{}: Sending AppendEntries to follower {}: {}", context.getId(), followerId, + appendEntries); + } + + followerActor.tell(appendEntries.toSerializable(), actor()); } /** @@ -501,6 +528,10 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * */ private void installSnapshotIfNeeded() { + if(LOG.isDebugEnabled()) { + LOG.debug("{}: installSnapshotIfNeeded, followers {}", context.getId(), followerToLog.keySet()); + } + for (Entry e : followerToLog.entrySet()) { final ActorSelection followerActor = context.getPeerActorSelection(e.getKey()); @@ -508,7 +539,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { long nextIndex = e.getValue().getNextIndex(); if (!context.getReplicatedLog().isPresent(nextIndex) && - context.getReplicatedLog().isInSnapshot(nextIndex)) { + context.getReplicatedLog().isInSnapshot(nextIndex)) { LOG.info("{}: {} follower needs a snapshot install", context.getId(), e.getKey()); if (snapshot.isPresent()) { // if a snapshot is present in the memory, most likely another install is in progress @@ -573,21 +604,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { try { if (snapshot.isPresent()) { + ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId,snapshot.get()); + + // Note: the previous call to getNextSnapshotChunk has the side-effect of adding + // followerId to the followerToSnapshot map. + FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); + followerActor.tell( new InstallSnapshot(currentTerm(), context.getId(), context.getReplicatedLog().getSnapshotIndex(), context.getReplicatedLog().getSnapshotTerm(), - getNextSnapshotChunk(followerId,snapshot.get()), - mapFollowerToSnapshot.get(followerId).incrementChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks(), - Optional.of(mapFollowerToSnapshot.get(followerId).getLastChunkHashCode()) + nextSnapshotChunk, + followerToSnapshot.incrementChunkIndex(), + followerToSnapshot.getTotalChunks(), + Optional.of(followerToSnapshot.getLastChunkHashCode()) ).toSerializable(), actor() ); LOG.info("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", context.getId(), followerActor.path(), - mapFollowerToSnapshot.get(followerId).getChunkIndex(), - mapFollowerToSnapshot.get(followerId).getTotalChunks()); + followerToSnapshot.getChunkIndex(), + followerToSnapshot.getTotalChunks()); } } catch (IOException e) { LOG.error(e, "{}: InstallSnapshot failed for Leader.", context.getId()); diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java index a782eda565..32ed85b281 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/AppendEntriesReply.java @@ -12,7 +12,7 @@ package org.opendaylight.controller.cluster.raft.messages; * Reply for the AppendEntriesRpc message */ public class AppendEntriesReply extends AbstractRaftRPC { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = -7487547356392536683L; // true if follower contained entry matching // prevLogIndex and prevLogTerm @@ -38,6 +38,7 @@ public class AppendEntriesReply extends AbstractRaftRPC { this.logLastTerm = logLastTerm; } + @Override public long getTerm() { return term; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java index 71e7ecc189..15621bf894 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/InstallSnapshotReply.java @@ -9,13 +9,13 @@ package org.opendaylight.controller.cluster.raft.messages; public class InstallSnapshotReply extends AbstractRaftRPC { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 642227896390779503L; // The followerId - this will be used to figure out which follower is // responding private final String followerId; private final int chunkIndex; - private boolean success; + private final boolean success; public InstallSnapshotReply(long term, String followerId, int chunkIndex, boolean success) { diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVote.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVote.java index 8321d0c25b..9ba5acb664 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVote.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVote.java @@ -12,7 +12,7 @@ package org.opendaylight.controller.cluster.raft.messages; * Invoked by candidates to gather votes (§5.2). */ public class RequestVote extends AbstractRaftRPC { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = -6967509186297108657L; // candidate requesting vote private String candidateId; @@ -35,6 +35,7 @@ public class RequestVote extends AbstractRaftRPC { public RequestVote() { } + @Override public long getTerm() { return term; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java index da3ba5c39f..b3c95d6eca 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/messages/RequestVoteReply.java @@ -9,7 +9,7 @@ package org.opendaylight.controller.cluster.raft.messages; public class RequestVoteReply extends AbstractRaftRPC { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 8427899326488775660L; // true means candidate received vot private final boolean voteGranted; @@ -19,6 +19,7 @@ public class RequestVoteReply extends AbstractRaftRPC { this.voteGranted = voteGranted; } + @Override public long getTerm() { return term; } diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java index b31cb621b3..63f94828eb 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java @@ -86,6 +86,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { actorContext.setPeerAddresses(peerAddresses); Leader leader = new Leader(actorContext); + leader.markFollowerActive(followerActor.path().toString()); + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); leader.handleMessage(senderActor, new SendHeartBeat()); final String out = @@ -133,6 +136,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { actorContext.setPeerAddresses(peerAddresses); Leader leader = new Leader(actorContext); + leader.markFollowerActive(followerActor.path().toString()); + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); RaftActorBehavior raftBehavior = leader .handleMessage(senderActor, new Replicate(null, null, new MockRaftActorContext.MockReplicatedLogEntry(1, @@ -270,6 +276,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.getFollowerToSnapshot().getNextChunk(); leader.getFollowerToSnapshot().incrementChunkIndex(); + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); AppendEntries aeproto = (AppendEntries)MessageCollectorActor.getFirstMatching( @@ -344,6 +353,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { //update follower timestamp leader.markFollowerActive(followerActor.path().toString()); + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); + // this should invoke a sendinstallsnapshot as followersLastIndex < snapshotIndex RaftActorBehavior raftBehavior = leader.handleMessage( senderActor, new Replicate(null, "state-id", entry)); @@ -578,7 +590,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { } @Test - public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception { + public void testHandleInstallSnapshotReplyWithInvalidChunkIndex() throws Exception{ new JavaTestKit(getSystem()) {{ TestActorRef followerActor = @@ -632,11 +644,15 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { assertEquals(3, installSnapshot.getTotalChunks()); - leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), followerActor.path().toString(), -1, false)); + leader.handleMessage(followerActor, new InstallSnapshotReply(actorContext.getTermInformation().getCurrentTerm(), + followerActor.path().toString(), -1, false)); + + Uninterruptibles.sleepUninterruptibly(actorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); leader.handleMessage(leaderActor, new SendHeartBeat()); - o = MessageCollectorActor.getAllMessages(followerActor).get(1); + o = MessageCollectorActor.getAllMatching(followerActor,InstallSnapshotMessages.InstallSnapshot.class).get(1); assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); @@ -655,7 +671,7 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { { TestActorRef followerActor = - TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower"); + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class), "follower-chunk"); Map peerAddresses = new HashMap<>(); peerAddresses.put(followerActor.path().toString(), @@ -709,10 +725,10 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { leader.handleMessage(followerActor, new InstallSnapshotReply(installSnapshot.getTerm(),followerActor.path().toString(),1,true )); - leader.handleMessage(leaderActor, new SendHeartBeat()); - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); + o = MessageCollectorActor.getAllMessages(followerActor).get(1); assertTrue(o instanceof InstallSnapshotMessages.InstallSnapshot); @@ -874,6 +890,9 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { Leader leader = new Leader(leaderActorContext); leader.markFollowerActive(followerActor.path().toString()); + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); + leader.handleMessage(leaderActor, new SendHeartBeat()); AppendEntries appendEntries = (AppendEntries) MessageCollectorActor @@ -942,6 +961,8 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { Leader leader = new Leader(leaderActorContext); leader.markFollowerActive(followerActor.path().toString()); + Thread.sleep(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis()); + leader.handleMessage(leaderActor, new SendHeartBeat()); AppendEntries appendEntries = (AppendEntries) MessageCollectorActor @@ -1170,6 +1191,85 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest { }}; } + + @Test + public void testAppendEntryCallAtEndofAppendEntryReply() throws Exception { + new JavaTestKit(getSystem()) {{ + + ActorRef leaderActor = getSystem().actorOf(Props.create(MessageCollectorActor.class)); + + MockRaftActorContext leaderActorContext = + new MockRaftActorContext("leader", getSystem(), leaderActor); + + DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl(); + configParams.setHeartBeatInterval(new FiniteDuration(9, TimeUnit.SECONDS)); + configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(10, TimeUnit.SECONDS)); + + leaderActorContext.setConfigParams(configParams); + + ActorRef followerActor = getSystem().actorOf(Props.create(ForwardMessageToBehaviorActor.class)); + + MockRaftActorContext followerActorContext = + new MockRaftActorContext("follower-reply", getSystem(), followerActor); + + followerActorContext.setConfigParams(configParams); + + Follower follower = new Follower(followerActorContext); + + ForwardMessageToBehaviorActor.setBehavior(follower); + + Map peerAddresses = new HashMap<>(); + peerAddresses.put("follower-reply", + followerActor.path().toString()); + + leaderActorContext.setPeerAddresses(peerAddresses); + + leaderActorContext.getReplicatedLog().removeFrom(0); + + //create 3 entries + leaderActorContext.setReplicatedLog( + new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build()); + + leaderActorContext.setCommitIndex(1); + + Leader leader = new Leader(leaderActorContext); + leader.markFollowerActive("follower-reply"); + + Uninterruptibles.sleepUninterruptibly(leaderActorContext.getConfigParams().getHeartBeatInterval().toMillis(), + TimeUnit.MILLISECONDS); + + leader.handleMessage(leaderActor, new SendHeartBeat()); + + AppendEntries appendEntries = (AppendEntries) ForwardMessageToBehaviorActor + .getFirstMatching(followerActor, AppendEntries.class); + + assertNotNull(appendEntries); + + assertEquals(1, appendEntries.getLeaderCommit()); + assertEquals(1, appendEntries.getEntries().get(0).getIndex()); + assertEquals(0, appendEntries.getPrevLogIndex()); + + AppendEntriesReply appendEntriesReply = + (AppendEntriesReply)ForwardMessageToBehaviorActor.getFirstMatching(leaderActor, AppendEntriesReply.class); + + assertNotNull(appendEntriesReply); + + leader.handleAppendEntriesReply(followerActor, appendEntriesReply); + + List entries = ForwardMessageToBehaviorActor + .getAllMatching(followerActor, AppendEntries.class); + + assertEquals("AppendEntries count should be 2 ", 2, entries.size()); + + AppendEntries appendEntriesSecond = (AppendEntries) entries.get(1); + + assertEquals(1, appendEntriesSecond.getLeaderCommit()); + assertEquals(2, appendEntriesSecond.getEntries().get(0).getIndex()); + assertEquals(1, appendEntriesSecond.getPrevLogIndex()); + + }}; + } + class MockLeader extends Leader { FollowerToSnapshot fts; diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java new file mode 100644 index 0000000000..da60496a22 --- /dev/null +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/InvalidNormalizedNodeStreamException.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.node.utils.stream; + +import java.io.IOException; + +/** + * Exception thrown from NormalizedNodeInputStreamReader when the input stream does not contain + * valid serialized data. + * + * @author Thomas Pantelis + */ +public class InvalidNormalizedNodeStreamException extends IOException { + private static final long serialVersionUID = 1L; + + public InvalidNormalizedNodeStreamException(String message) { + super(message); + } +} diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java index cde338179b..bb2f5d41d9 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java @@ -69,6 +69,8 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead private final StringBuilder reusableStringBuilder = new StringBuilder(50); + private boolean readSignatureMarker = true; + public NormalizedNodeInputStreamReader(InputStream stream) throws IOException { Preconditions.checkNotNull(stream); input = new DataInputStream(stream); @@ -80,6 +82,25 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead @Override public NormalizedNode readNormalizedNode() throws IOException { + readSignatureMarkerAndVersionIfNeeded(); + return readNormalizedNodeInternal(); + } + + private void readSignatureMarkerAndVersionIfNeeded() throws IOException { + if(readSignatureMarker) { + readSignatureMarker = false; + + byte marker = input.readByte(); + if(marker != NormalizedNodeOutputStreamWriter.SIGNATURE_MARKER) { + throw new InvalidNormalizedNodeStreamException(String.format( + "Invalid signature marker: %d", marker)); + } + + input.readShort(); // read the version - not currently used/needed. + } + } + + private NormalizedNode readNormalizedNodeInternal() throws IOException { // each node should start with a byte byte nodeType = input.readByte(); @@ -284,7 +305,7 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead return bytes; case ValueTypes.YANG_IDENTIFIER_TYPE : - return readYangInstanceIdentifier(); + return readYangInstanceIdentifierInternal(); default : return null; @@ -292,6 +313,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead } public YangInstanceIdentifier readYangInstanceIdentifier() throws IOException { + readSignatureMarkerAndVersionIfNeeded(); + return readYangInstanceIdentifierInternal(); + } + + private YangInstanceIdentifier readYangInstanceIdentifierInternal() throws IOException { int size = input.readInt(); List pathArguments = new ArrayList<>(size); @@ -342,11 +368,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead lastLeafSetQName = nodeType; - LeafSetEntryNode child = (LeafSetEntryNode)readNormalizedNode(); + LeafSetEntryNode child = (LeafSetEntryNode)readNormalizedNodeInternal(); while(child != null) { builder.withChild(child); - child = (LeafSetEntryNode)readNormalizedNode(); + child = (LeafSetEntryNode)readNormalizedNodeInternal(); } return builder; } @@ -356,11 +382,11 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead NormalizedNodeContainerBuilder builder) throws IOException { LOG.debug("Reading data container (leaf nodes) nodes"); - NormalizedNode child = readNormalizedNode(); + NormalizedNode child = readNormalizedNodeInternal(); while(child != null) { builder.addChild(child); - child = readNormalizedNode(); + child = readNormalizedNodeInternal(); } return builder; } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java index 088f4dfbe9..d4aab036be 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java @@ -46,6 +46,9 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri private static final Logger LOG = LoggerFactory.getLogger(NormalizedNodeOutputStreamWriter.class); + static final byte SIGNATURE_MARKER = (byte) 0xab; + static final short CURRENT_VERSION = (short) 1; + static final byte IS_CODE_VALUE = 1; static final byte IS_STRING_VALUE = 2; static final byte IS_NULL_VALUE = 3; @@ -56,6 +59,8 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri private NormalizedNodeWriter normalizedNodeWriter; + private boolean wroteSignatureMarker; + public NormalizedNodeOutputStreamWriter(OutputStream stream) throws IOException { Preconditions.checkNotNull(stream); output = new DataOutputStream(stream); @@ -74,9 +79,18 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri } public void writeNormalizedNode(NormalizedNode node) throws IOException { + writeSignatureMarkerAndVersionIfNeeded(); normalizedNodeWriter().write(node); } + private void writeSignatureMarkerAndVersionIfNeeded() throws IOException { + if(!wroteSignatureMarker) { + output.writeByte(SIGNATURE_MARKER); + output.writeShort(CURRENT_VERSION); + wroteSignatureMarker = true; + } + } + @Override public void leafNode(YangInstanceIdentifier.NodeIdentifier name, Object value) throws IOException, IllegalArgumentException { Preconditions.checkNotNull(name, "Node identifier should not be null"); @@ -201,6 +215,9 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri private void startNode(final QName qName, byte nodeType) throws IOException { Preconditions.checkNotNull(qName, "QName of node identifier should not be null."); + + writeSignatureMarkerAndVersionIfNeeded(); + // First write the type of node output.writeByte(nodeType); // Write Start Tag @@ -247,6 +264,11 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri } public void writeYangInstanceIdentifier(YangInstanceIdentifier identifier) throws IOException { + writeSignatureMarkerAndVersionIfNeeded(); + writeYangInstanceIdentifierInternal(identifier); + } + + private void writeYangInstanceIdentifierInternal(YangInstanceIdentifier identifier) throws IOException { Iterable pathArguments = identifier.getPathArguments(); int size = Iterables.size(pathArguments); output.writeInt(size); @@ -363,7 +385,7 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri output.write(bytes); break; case ValueTypes.YANG_IDENTIFIER_TYPE: - writeYangInstanceIdentifier((YangInstanceIdentifier) value); + writeYangInstanceIdentifierInternal((YangInstanceIdentifier) value); break; case ValueTypes.NULL_TYPE : break; diff --git a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReaderWriterTest.java b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReaderWriterTest.java index 6528f2e4d2..67a342b440 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReaderWriterTest.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/test/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeStreamReaderWriterTest.java @@ -15,15 +15,16 @@ import java.io.IOException; import org.apache.commons.lang.SerializationUtils; import org.junit.Assert; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils; import org.opendaylight.controller.cluster.datastore.util.TestModel; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter; -import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeWriter; import org.opendaylight.yangtools.yang.data.impl.schema.Builders; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder; @@ -33,9 +34,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; public class NormalizedNodeStreamReaderWriterTest { @Test - public void testNormalizedNodeStreamReaderWriter() throws IOException { + public void testNormalizedNodeStreaming() throws IOException { - testNormalizedNodeStreamReaderWriter(createTestContainer()); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream); + + NormalizedNode testContainer = createTestContainer(); + writer.writeNormalizedNode(testContainer); QName toaster = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","toaster"); QName darknessFactor = QName.create("http://netconfcentral.org/ns/toaster","2009-11-20","darknessFactor"); @@ -43,9 +48,21 @@ public class NormalizedNodeStreamReaderWriterTest { withNodeIdentifier(new NodeIdentifier(toaster)). withChild(ImmutableNodes.leafNode(darknessFactor, "1000")).build(); - testNormalizedNodeStreamReaderWriter(Builders.containerBuilder(). + ContainerNode toasterContainer = Builders.containerBuilder(). withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME)). - withChild(toasterNode).build()); + withChild(toasterNode).build(); + writer.writeNormalizedNode(toasterContainer); + + NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader( + new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); + + NormalizedNode node = reader.readNormalizedNode(); + Assert.assertEquals(testContainer, node); + + node = reader.readNormalizedNode(); + Assert.assertEquals(toasterContainer, node); + + writer.close(); } private NormalizedNode createTestContainer() { @@ -76,24 +93,75 @@ public class NormalizedNodeStreamReaderWriterTest { build(); } - private void testNormalizedNodeStreamReaderWriter(NormalizedNode input) throws IOException { + @Test + public void testYangInstanceIdentifierStreaming() throws IOException { + YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH). + node(TestModel.OUTER_LIST_QNAME).nodeWithKey( + TestModel.INNER_LIST_QNAME, TestModel.ID_QNAME, 10).build(); - byte[] byteData = null; + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + NormalizedNodeOutputStreamWriter writer = + new NormalizedNodeOutputStreamWriter(byteArrayOutputStream); + writer.writeYangInstanceIdentifier(path); + + NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader( + new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); + + YangInstanceIdentifier newPath = reader.readYangInstanceIdentifier(); + Assert.assertEquals(path, newPath); + + writer.close(); + } + + @Test + public void testNormalizedNodeAndYangInstanceIdentifierStreaming() throws IOException { - try(ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - NormalizedNodeStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream)) { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(byteArrayOutputStream); - NormalizedNodeWriter normalizedNodeWriter = NormalizedNodeWriter.forStreamWriter(writer); - normalizedNodeWriter.write(input); - byteData = byteArrayOutputStream.toByteArray(); + NormalizedNode testContainer = TestModel.createBaseTestContainerBuilder().build(); + writer.writeNormalizedNode(testContainer); - } + YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH). + node(TestModel.OUTER_LIST_QNAME).nodeWithKey( + TestModel.INNER_LIST_QNAME, TestModel.ID_QNAME, 10).build(); + + writer.writeYangInstanceIdentifier(path); NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader( - new ByteArrayInputStream(byteData)); + new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); NormalizedNode node = reader.readNormalizedNode(); - Assert.assertEquals(input, node); + Assert.assertEquals(testContainer, node); + + YangInstanceIdentifier newPath = reader.readYangInstanceIdentifier(); + Assert.assertEquals(path, newPath); + + writer.close(); + } + + @Test(expected=InvalidNormalizedNodeStreamException.class, timeout=10000) + public void testInvalidNormalizedNodeStream() throws IOException { + byte[] protobufBytes = new NormalizedNodeToNodeCodec(null).encode( + TestModel.createBaseTestContainerBuilder().build()).getNormalizedNode().toByteArray(); + + NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader( + new ByteArrayInputStream(protobufBytes)); + + reader.readNormalizedNode(); + } + + @Test(expected=InvalidNormalizedNodeStreamException.class, timeout=10000) + public void testInvalidYangInstanceIdentifierStream() throws IOException { + YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.TEST_PATH).build(); + + byte[] protobufBytes = ShardTransactionMessages.DeleteData.newBuilder().setInstanceIdentifierPathArguments( + InstanceIdentifierUtils.toSerializable(path)).build().toByteArray(); + + NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader( + new ByteArrayInputStream(protobufBytes)); + + reader.readYangInstanceIdentifier(); } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 10876045ae..22e2dbd47d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -27,6 +27,15 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; @@ -46,15 +55,6 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; /** * The ShardManager has the following jobs, @@ -555,7 +555,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } static class SchemaContextModules implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = -8884620101025936590L; + private final Set modules; SchemaContextModules(Set modules){ diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java index 6f8d0567d9..2e66ef918e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java @@ -96,4 +96,9 @@ public class ShardReadTransaction extends ShardTransaction { protected DOMStoreTransaction getDOMStoreTransaction() { return transaction; } + + @Override + protected boolean returnCloseTransactionReply() { + return false; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index 678b781569..8a37dfee4d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -39,12 +39,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction *

*

- * Even though the DOMStore and the DOMStoreTransactionChain implement multiple types of transactions - * the ShardTransaction Actor only works with read-write transactions. This is just to keep the logic simple. At this - * time there are no known advantages for creating a read-only or write-only transaction which may change over time - * at which point we can optimize things in the distributed store as well. - *

- *

* Handles Messages
* ----------------
*

  • {@link org.opendaylight.controller.cluster.datastore.messages.ReadData} @@ -114,10 +108,14 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering } } + protected boolean returnCloseTransactionReply() { + return true; + } + private void closeTransaction(boolean sendReply) { getDOMStoreTransaction().close(); - if(sendReply) { + if(sendReply && returnCloseTransactionReply()) { getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java index 5854932a6f..bf9f8d803a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/SerializationUtils.java @@ -17,6 +17,7 @@ import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.InvalidNormalizedNodeStreamException; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader; import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; @@ -93,15 +94,19 @@ public final class SerializationUtils { } public static NormalizedNode deserializeNormalizedNode(DataInput in) { - try { - boolean present = in.readBoolean(); - if(present) { - NormalizedNodeInputStreamReader streamReader = streamReader(in); - return streamReader.readNormalizedNode(); - } - } catch (IOException e) { - throw new IllegalArgumentException("Error deserializing NormalizedNode", e); - } + try { + return tryDeserializeNormalizedNode(in); + } catch (IOException e) { + throw new IllegalArgumentException("Error deserializing NormalizedNode", e); + } + } + + private static NormalizedNode tryDeserializeNormalizedNode(DataInput in) throws IOException { + boolean present = in.readBoolean(); + if(present) { + NormalizedNodeInputStreamReader streamReader = streamReader(in); + return streamReader.readNormalizedNode(); + } return null; } @@ -109,18 +114,17 @@ public final class SerializationUtils { public static NormalizedNode deserializeNormalizedNode(byte [] bytes) { NormalizedNode node = null; try { - node = deserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes))); - } catch(Exception e) { - } - - if(node == null) { - // Must be from legacy protobuf serialization - try that. + node = tryDeserializeNormalizedNode(new DataInputStream(new ByteArrayInputStream(bytes))); + } catch(InvalidNormalizedNodeStreamException e) { + // Probably from legacy protobuf serialization - try that. try { NormalizedNodeMessages.Node serializedNode = NormalizedNodeMessages.Node.parseFrom(bytes); node = new NormalizedNodeToNodeCodec(null).decode(serializedNode); - } catch (InvalidProtocolBufferException e) { + } catch (InvalidProtocolBufferException e2) { throw new IllegalArgumentException("Error deserializing NormalizedNode", e); } + } catch (IOException e) { + throw new IllegalArgumentException("Error deserializing NormalizedNode", e); } return node; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 69dd706f37..851fb0114b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -412,10 +412,10 @@ public class ShardTransactionTest extends AbstractActorTest { } @Test - public void testOnReceiveCloseTransaction() throws Exception { + public void testReadWriteTxOnReceiveCloseTransaction() throws Exception { new JavaTestKit(getSystem()) {{ final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(), - "testCloseTransaction"); + "testReadWriteTxOnReceiveCloseTransaction"); watch(transaction); @@ -426,6 +426,35 @@ public class ShardTransactionTest extends AbstractActorTest { }}; } + @Test + public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(), + "testWriteTxOnReceiveCloseTransaction"); + + watch(transaction); + + transaction.tell(new CloseTransaction().toSerializable(), getRef()); + + expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS); + expectTerminated(duration("3 seconds"), transaction); + }}; + } + + @Test + public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception { + new JavaTestKit(getSystem()) {{ + final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(), + "testReadOnlyTxOnReceiveCloseTransaction"); + + watch(transaction); + + transaction.tell(new CloseTransaction().toSerializable(), getRef()); + + expectMsgClass(duration("3 seconds"), Terminated.class); + }}; + } + @Test(expected=UnknownMessageException.class) public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception { final ActorRef shard = createShard(); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java index 4bb5258b40..5d780be641 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/ExecuteRpc.java @@ -9,29 +9,28 @@ package org.opendaylight.controller.remote.rpc.messages; import com.google.common.base.Preconditions; -import org.opendaylight.yangtools.yang.common.QName; - import java.io.Serializable; +import org.opendaylight.yangtools.yang.common.QName; public class ExecuteRpc implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1128904894827335676L; - private final String inputCompositeNode; - private final QName rpc; + private final String inputCompositeNode; + private final QName rpc; - public ExecuteRpc(final String inputCompositeNode, final QName rpc) { - Preconditions.checkNotNull(inputCompositeNode, "Composite Node input string should be present"); - Preconditions.checkNotNull(rpc, "rpc Qname should not be null"); + public ExecuteRpc(final String inputCompositeNode, final QName rpc) { + Preconditions.checkNotNull(inputCompositeNode, "Composite Node input string should be present"); + Preconditions.checkNotNull(rpc, "rpc Qname should not be null"); - this.inputCompositeNode = inputCompositeNode; - this.rpc = rpc; - } + this.inputCompositeNode = inputCompositeNode; + this.rpc = rpc; + } - public String getInputCompositeNode() { - return inputCompositeNode; - } + public String getInputCompositeNode() { + return inputCompositeNode; + } - public QName getRpc() { - return rpc; - } + public QName getRpc() { + return rpc; + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java index 652569b7ba..9c40dbfc58 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/InvokeRpc.java @@ -8,37 +8,36 @@ package org.opendaylight.controller.remote.rpc.messages; import com.google.common.base.Preconditions; +import java.io.Serializable; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import java.io.Serializable; - public class InvokeRpc implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = -2813459607858108953L; - private final QName rpc; - private final YangInstanceIdentifier identifier; - private final CompositeNode input; + private final QName rpc; + private final YangInstanceIdentifier identifier; + private final CompositeNode input; - public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final CompositeNode input) { - Preconditions.checkNotNull(rpc, "rpc qname should not be null"); - Preconditions.checkNotNull(input, "rpc input should not be null"); + public InvokeRpc(final QName rpc, final YangInstanceIdentifier identifier, final CompositeNode input) { + Preconditions.checkNotNull(rpc, "rpc qname should not be null"); + Preconditions.checkNotNull(input, "rpc input should not be null"); - this.rpc = rpc; - this.identifier = identifier; - this.input = input; - } + this.rpc = rpc; + this.identifier = identifier; + this.input = input; + } - public QName getRpc() { - return rpc; - } + public QName getRpc() { + return rpc; + } - public YangInstanceIdentifier getIdentifier() { - return identifier; - } + public YangInstanceIdentifier getIdentifier() { + return identifier; + } - public CompositeNode getInput() { - return input; - } + public CompositeNode getInput() { + return input; + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java index 387cb90112..e6b208cb6f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/messages/RpcResponse.java @@ -10,14 +10,15 @@ package org.opendaylight.controller.remote.rpc.messages; import java.io.Serializable; public class RpcResponse implements Serializable { - private static final long serialVersionUID = 1L; - private final String resultCompositeNode; + private static final long serialVersionUID = -4211279498688989245L; - public RpcResponse(final String resultCompositeNode) { - this.resultCompositeNode = resultCompositeNode; - } + private final String resultCompositeNode; - public String getResultCompositeNode() { - return resultCompositeNode; - } + public RpcResponse(final String resultCompositeNode) { + this.resultCompositeNode = resultCompositeNode; + } + + public String getResultCompositeNode() { + return resultCompositeNode; + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java index 52b1106c87..f67657f692 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/RoutingTable.java @@ -17,7 +17,7 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Copier; import org.opendaylight.controller.sal.connector.api.RpcRouter; public class RoutingTable implements Copier, Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 5592610415175278760L; private final Map, Long> table = new HashMap<>(); private ActorRef router; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java index b81175e9a2..4c4573d909 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketImpl.java @@ -10,7 +10,7 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; import java.io.Serializable; public class BucketImpl> implements Bucket, Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 294779770032719196L; private Long version = System.currentTimeMillis(); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java index b05bd7d0f6..00437e7e56 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Messages.java @@ -46,7 +46,8 @@ public class Messages { } public static class ContainsBuckets implements Serializable{ - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = -4940160367495308286L; + private final Map buckets; public ContainsBuckets(Map buckets){ @@ -87,7 +88,8 @@ public class Messages { } public static class ContainsBucketVersions implements Serializable{ - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = -8172148925383801613L; + Map versions; public ContainsBucketVersions(Map versions) { @@ -119,15 +121,16 @@ public class Messages { public static class GossiperMessages{ public static class Tick implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = -4770935099506366773L; } public static final class GossipTick extends Tick { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 5803354404380026143L; } public static final class GossipStatus extends ContainsBucketVersions implements Serializable{ - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = -593037395143883265L; + private final Address from; public GossipStatus(Address from, Map versions) { @@ -141,7 +144,8 @@ public class Messages { } public static final class GossipEnvelope extends ContainsBuckets implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 8346634072582438818L; + private final Address from; private final Address to;