X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2Fbehaviors%2FAbstractLeader.java;h=12cbcc0df66aa635315436ab04a4cd5e00bfa5cd;hp=548b920fe771f183b904e2d5e2e65d7001cb746e;hb=5fd4213b5bfaf2db21f1b37139f6b98535a872c0;hpb=2faf656bf68dd3843fd59520b27a7ec2abbdcc68 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java index 548b920fe7..12cbcc0df6 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/AbstractLeader.java @@ -14,8 +14,8 @@ import akka.actor.Cancellable; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.io.ByteSource; -import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -789,35 +789,39 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { followerLogInfo.setLeaderInstallSnapshotState(installSnapshotState); } - // Ensure the snapshot bytes are set - this is a no-op. - installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes()); - - byte[] nextSnapshotChunk = installSnapshotState.getNextChunk(); + try { + // Ensure the snapshot bytes are set - this is a no-op. + installSnapshotState.setSnapshotBytes(snapshotHolder.get().getSnapshotBytes()); - log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(), - nextSnapshotChunk.length); + byte[] nextSnapshotChunk = installSnapshotState.getNextChunk(); - int nextChunkIndex = installSnapshotState.incrementChunkIndex(); - Optional serverConfig = Optional.absent(); - if (installSnapshotState.isLastChunk(nextChunkIndex)) { - serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); - } + log.debug("{}: next snapshot chunk size for follower {}: {}", logName(), followerLogInfo.getId(), + nextSnapshotChunk.length); - followerActor.tell( - new InstallSnapshot(currentTerm(), context.getId(), - snapshotHolder.get().getLastIncludedIndex(), - snapshotHolder.get().getLastIncludedTerm(), - nextSnapshotChunk, - nextChunkIndex, - installSnapshotState.getTotalChunks(), - Optional.of(installSnapshotState.getLastChunkHashCode()), - serverConfig - ).toSerializable(followerLogInfo.getRaftVersion()), - actor() - ); + int nextChunkIndex = installSnapshotState.incrementChunkIndex(); + Optional serverConfig = Optional.absent(); + if (installSnapshotState.isLastChunk(nextChunkIndex)) { + serverConfig = Optional.fromNullable(context.getPeerServerInfo(true)); + } - log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(), - installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks()); + followerActor.tell( + new InstallSnapshot(currentTerm(), context.getId(), + snapshotHolder.get().getLastIncludedIndex(), + snapshotHolder.get().getLastIncludedTerm(), + nextSnapshotChunk, + nextChunkIndex, + installSnapshotState.getTotalChunks(), + Optional.of(installSnapshotState.getLastChunkHashCode()), + serverConfig + ).toSerializable(followerLogInfo.getRaftVersion()), + actor() + ); + + log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(), + installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks()); + } catch (IOException e) { + throw Throwables.propagate(e); + } } } @@ -912,16 +916,12 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { static class SnapshotHolder { private final long lastIncludedTerm; private final long lastIncludedIndex; - private final ByteString snapshotBytes; + private final ByteSource snapshotBytes; SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) { this.lastIncludedTerm = snapshot.getLastAppliedTerm(); this.lastIncludedIndex = snapshot.getLastAppliedIndex(); - try { - this.snapshotBytes = ByteString.copyFrom(snapshotBytes.read()); - } catch (IOException e) { - throw new RuntimeException("Error reading state", e); - } + this.snapshotBytes = snapshotBytes; } long getLastIncludedTerm() { @@ -932,7 +932,7 @@ public abstract class AbstractLeader extends AbstractRaftActorBehavior { return lastIncludedIndex; } - ByteString getSnapshotBytes() { + ByteSource getSnapshotBytes() { return snapshotBytes; } }