X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=2fe3bfdc8764aca78f7cdf942f3ba5dbba96e589;hp=31ee9d2a7a61d3e2fa10fc6eb65544dbf7907b86;hb=a8000ee3b6071fa3b83500a39fc60ab3a9c5f085;hpb=5a8f765f5a6a24019d3ff6121d40a6594ba08f19 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 31ee9d2a7a..2fe3bfdc87 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -16,6 +16,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -24,6 +25,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Queue; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.raft.ClientRequestTracker; import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl; @@ -83,26 +85,27 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private final Map followerToLog = new HashMap<>(); private final Map mapFollowerToSnapshot = new HashMap<>(); - private Cancellable heartbeatSchedule = null; - - private final Collection trackerList = new LinkedList<>(); - - private int minReplicationCount; + /** + * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really + * expect the entries to be modified in sequence, hence we open-code the lookup. + * + * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly, + * but we already expect those to be far from frequent. + */ + private final Queue trackers = new LinkedList<>(); + private Cancellable heartbeatSchedule = null; private Optional snapshot; + private int minReplicationCount; - public AbstractLeader(RaftActorContext context) { - super(context, RaftState.Leader); - - setLeaderPayloadVersion(context.getPayloadVersion()); + protected AbstractLeader(RaftActorContext context, RaftState state) { + super(context, state); for(PeerInfo peerInfo: context.getPeers()) { FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(peerInfo, -1, context); followerToLog.put(peerInfo.getId(), followerLogInformation); } - leaderId = context.getId(); - LOG.debug("{}: Election: Leader has following peers: {}", logName(), getFollowerIds()); updateMinReplicaCount(); @@ -140,6 +143,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { public void removeFollower(String followerId) { followerToLog.remove(followerId); + mapFollowerToSnapshot.remove(followerId); } public void updateMinReplicaCount() { @@ -210,6 +214,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInformation.markFollowerActive(); followerLogInformation.setPayloadVersion(appendEntriesReply.getPayloadVersion()); + followerLogInformation.setRaftVersion(appendEntriesReply.getRaftVersion()); boolean updated = false; if (appendEntriesReply.isSuccess()) { @@ -254,7 +259,8 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { int replicatedCount = 1; for (FollowerLogInformation info : followerToLog.values()) { - if ((info.getMatchIndex() >= N) && (context.getPeerInfo(followerId).isVoting())) { + final PeerInfo peerInfo = context.getPeerInfo(info.getId()); + if(info.getMatchIndex() >= N && (peerInfo != null && peerInfo.isVoting())) { replicatedCount++; } } @@ -325,7 +331,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { @Override protected ClientRequestTracker removeClientRequestTracker(long logIndex) { - final Iterator it = trackerList.iterator(); + final Iterator it = trackers.iterator(); while (it.hasNext()) { final ClientRequestTracker t = it.next(); if (t.getIndex() == logIndex) { @@ -337,16 +343,6 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return null; } - @Override - protected ClientRequestTracker findClientRequestTracker(long logIndex) { - for (ClientRequestTracker tracker : trackerList) { - if (tracker.getIndex() == logIndex) { - return tracker; - } - } - return null; - } - @Override protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) { @@ -380,23 +376,19 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { beforeSendHeartbeat(); sendHeartBeat(); scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval()); - return this; - } else if(message instanceof SendInstallSnapshot) { // received from RaftActor setSnapshot(((SendInstallSnapshot) message).getSnapshot()); sendInstallSnapshot(); - } else if (message instanceof Replicate) { replicate((Replicate) message); - - } else if (message instanceof InstallSnapshotReply){ + } else if (message instanceof InstallSnapshotReply) { handleInstallSnapshotReply((InstallSnapshotReply) message); - + } else { + return super.handleMessage(sender, message); } - - return super.handleMessage(sender, message); + return this; } private void handleInstallSnapshotReply(InstallSnapshotReply reply) { @@ -497,7 +489,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // Create a tracker entry we will use this later to notify the // client actor - trackerList.add( + trackers.add( new ClientRequestTrackerImpl(replicate.getClientActor(), replicate.getIdentifier(), logIndex) @@ -622,7 +614,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { appendEntries); } - followerActor.tell(appendEntries.toSerializable(), actor()); + followerActor.tell(appendEntries, actor()); } /** @@ -687,7 +679,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { private void sendSnapshotChunk(ActorSelection followerActor, String followerId) { try { if (snapshot.isPresent()) { - ByteString nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes()); + byte[] nextSnapshotChunk = getNextSnapshotChunk(followerId, snapshot.get().getSnapshotBytes()); // Note: the previous call to getNextSnapshotChunk has the side-effect of adding // followerId to the followerToSnapshot map. @@ -701,7 +693,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerToSnapshot.incrementChunkIndex(), followerToSnapshot.getTotalChunks(), Optional.of(followerToSnapshot.getLastChunkHashCode()) - ).toSerializable(), + ).toSerializable(followerToLog.get(followerId).getRaftVersion()), actor() ); @@ -720,15 +712,15 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { * Acccepts snaphot as ByteString, enters into map for future chunks * creates and return a ByteString chunk */ - private ByteString getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { + private byte[] getNextSnapshotChunk(String followerId, ByteString snapshotBytes) throws IOException { FollowerToSnapshot followerToSnapshot = mapFollowerToSnapshot.get(followerId); if (followerToSnapshot == null) { followerToSnapshot = new FollowerToSnapshot(snapshotBytes); mapFollowerToSnapshot.put(followerId, followerToSnapshot); } - ByteString nextChunk = followerToSnapshot.getNextChunk(); + byte[] nextChunk = followerToSnapshot.getNextChunk(); - LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.size()); + LOG.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerId, nextChunk.length); return nextChunk; } @@ -761,27 +753,33 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { // need to be sent if there are other messages being sent to the remote // actor. heartbeatSchedule = context.getActorSystem().scheduler().scheduleOnce( - interval, context.getActor(), new SendHeartBeat(), + interval, context.getActor(), SendHeartBeat.INSTANCE, context.getActorSystem().dispatcher(), context.getActor()); } @Override - public void close() throws Exception { + public void close() { stopHeartBeat(); } @Override - public String getLeaderId() { + public final String getLeaderId() { return context.getId(); } + @Override + public final short getLeaderPayloadVersion() { + return context.getPayloadVersion(); + } + protected boolean isLeaderIsolated() { int minPresent = getMinIsolatedLeaderPeerCount(); for (FollowerLogInformation followerLogInformation : followerToLog.values()) { - if (followerLogInformation.isFollowerActive()) { + final PeerInfo peerInfo = context.getPeerInfo(followerLogInformation.getId()); + if(peerInfo != null && peerInfo.isVoting() && followerLogInformation.isFollowerActive()) { --minPresent; if (minPresent == 0) { - break; + return false; } } } @@ -867,25 +865,23 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { } } - public ByteString getNextChunk() { + public byte[] getNextChunk() { int snapshotLength = getSnapshotBytes().size(); int start = incrementOffset(); int size = context.getConfigParams().getSnapshotChunkSize(); if (context.getConfigParams().getSnapshotChunkSize() > snapshotLength) { size = snapshotLength; - } else { - if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { - size = snapshotLength - start; - } + } else if ((start + context.getConfigParams().getSnapshotChunkSize()) > snapshotLength) { + size = snapshotLength - start; } + byte[] nextChunk = new byte[size]; + getSnapshotBytes().copyTo(nextChunk, start, 0, size); + nextChunkHashCode = Arrays.hashCode(nextChunk); - LOG.debug("{}: Next chunk: length={}, offset={},size={}", logName(), - snapshotLength, start, size); - - ByteString substring = getSnapshotBytes().substring(start, start + size); - nextChunkHashCode = substring.hashCode(); - return substring; + LOG.debug("{}: Next chunk: total length={}, offset={}, size={}, hashCode={}", logName(), + snapshotLength, start, size, nextChunkHashCode); + return nextChunk; } /**