Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Added LeaderElectionScenariosTest class
[controller.git]
/
opendaylight
/
md-sal
/
sal-akka-raft
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
raft
/
RaftActor.java
diff --git
a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
index aa7b4533b7758f7a3f457ea48b1ae72cc6d94162..3dc6ae469a932474effca186c956a7a6f4ec8631 100644
(file)
--- a/
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
+++ b/
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
@@
-10,8
+10,6
@@
package org.opendaylight.controller.cluster.raft;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.japi.Procedure;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import akka.japi.Procedure;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
@@
-43,6
+41,8
@@
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* RaftActor encapsulates a state machine that needs to be kept synchronized
/**
* RaftActor encapsulates a state machine that needs to be kept synchronized
@@
-85,8
+85,7
@@
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntries
* </ul>
*/
public abstract class RaftActor extends AbstractUntypedPersistentActor {
* </ul>
*/
public abstract class RaftActor extends AbstractUntypedPersistentActor {
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
/**
* The current state determines the current behavior of a RaftActor
/**
* The current state determines the current behavior of a RaftActor
@@
-107,14
+106,10
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
private CaptureSnapshot captureSnapshot = null;
private CaptureSnapshot captureSnapshot = null;
- private volatile boolean hasSnapshotCaptureInitiated = false;
-
private Stopwatch recoveryTimer;
private int currentRecoveryBatchCount;
private Stopwatch recoveryTimer;
private int currentRecoveryBatchCount;
-
-
public RaftActor(String id, Map<String, String> peerAddresses) {
this(id, peerAddresses, Optional.<ConfigParams>absent());
}
public RaftActor(String id, Map<String, String> peerAddresses) {
this(id, peerAddresses, Optional.<ConfigParams>absent());
}
@@
-342,8
+337,8
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
} else if (message instanceof SaveSnapshotFailure) {
SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
} else if (message instanceof SaveSnapshotFailure) {
SaveSnapshotFailure saveSnapshotFailure = (SaveSnapshotFailure) message;
- LOG.error(
saveSnapshotFailure.cause(),
"{}: SaveSnapshotFailure received for snapshot Cause:",
- persistenceId());
+ LOG.error("{}: SaveSnapshotFailure received for snapshot Cause:",
+ persistenceId()
, saveSnapshotFailure.cause()
);
context.getReplicatedLog().snapshotRollback();
context.getReplicatedLog().snapshotRollback();
@@
-436,7
+431,7
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
// Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
self().tell(new ApplyLogEntries((int) replicatedLogEntry.getIndex()), self());
// Check if the "real" snapshot capture has been initiated. If no then do the fake snapshot
- if(!
hasSnapshotCaptureInitiated
){
+ if(!
context.isSnapshotCaptureInitiated()
){
raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
raftContext.getTermInformation().getCurrentTerm());
raftContext.getReplicatedLog().snapshotCommit();
raftContext.getReplicatedLog().snapshotPreCommit(raftContext.getLastApplied(),
raftContext.getTermInformation().getCurrentTerm());
raftContext.getReplicatedLog().snapshotCommit();
@@
-693,7
+688,7
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
}
captureSnapshot = null;
}
captureSnapshot = null;
-
hasSnapshotCaptureInitiated = false
;
+
context.setSnapshotCaptureInitiated(false)
;
}
protected boolean hasFollowers(){
}
protected boolean hasFollowers(){
@@
-794,7
+789,7
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
// when a snaphsot is being taken, captureSnapshot != null
getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
// when a snaphsot is being taken, captureSnapshot != null
- if (
hasSnapshotCaptureInitiated == false
&&
+ if (
!context.isSnapshotCaptureInitiated()
&&
( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
dataSizeForCheck > dataThreshold)) {
( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
dataSizeForCheck > dataThreshold)) {
@@
-827,7
+822,7
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
getSelf().tell(new CaptureSnapshot(
lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
null);
getSelf().tell(new CaptureSnapshot(
lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm),
null);
-
hasSnapshotCaptureInitiated = true
;
+
context.setSnapshotCaptureInitiated(true)
;
}
if(callback != null){
callback.apply(replicatedLogEntry);
}
if(callback != null){
callback.apply(replicatedLogEntry);