Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Simplify isolated leader check
[controller.git]
/
opendaylight
/
md-sal
/
sal-akka-raft
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
raft
/
RaftActor.java
diff --git
a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
index 304d58beb526f9ebedaf4ca37119013793a4100b..fdd4b2395bff7b3e9b8ef746450761a7ec41f323 100644
(file)
--- a/
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
+++ b/
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
@@
-41,7
+41,6
@@
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader;
import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior;
-import org.opendaylight.controller.cluster.raft.behaviors.DelegatingRaftActorBehavior;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@
-100,12
+99,6
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
protected final Logger LOG = LoggerFactory.getLogger(getClass());
protected final Logger LOG = LoggerFactory.getLogger(getClass());
- /**
- * The current state determines the current behavior of a RaftActor
- * A Raft Actor always starts off in the Follower State
- */
- private final DelegatingRaftActorBehavior currentBehavior = new DelegatingRaftActorBehavior();
-
/**
* This context should NOT be passed directly to any other actor it is
* only to be consumed by the RaftActorBehaviors
/**
* This context should NOT be passed directly to any other actor it is
* only to be consumed by the RaftActorBehaviors
@@
-143,7
+136,7
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
delegatingPersistenceProvider, LOG);
context.setPayloadVersion(payloadVersion);
delegatingPersistenceProvider, LOG);
context.setPayloadVersion(payloadVersion);
- context.setReplicatedLog(ReplicatedLogImpl.newInstance(context
, currentBehavior
));
+ context.setReplicatedLog(ReplicatedLogImpl.newInstance(context));
}
@Override
}
@Override
@@
-159,14
+152,7
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
@Override
public void postStop() {
@Override
public void postStop() {
- if(currentBehavior.getDelegate() != null) {
- try {
- currentBehavior.close();
- } catch (Exception e) {
- LOG.debug("{}: Error closing behavior {}", persistenceId(), currentBehavior.state());
- }
- }
-
+ context.close();
super.postStop();
}
super.postStop();
}
@@
-194,14
+180,24
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
}
protected RaftActorRecoverySupport newRaftActorRecoverySupport() {
}
protected RaftActorRecoverySupport newRaftActorRecoverySupport() {
- return new RaftActorRecoverySupport(context,
currentBehavior,
getRaftActorRecoveryCohort());
+ return new RaftActorRecoverySupport(context, getRaftActorRecoveryCohort());
}
}
- protected void initializeBehavior(){
+ @VisibleForTesting
+ void initializeBehavior(){
changeCurrentBehavior(new Follower(context));
}
changeCurrentBehavior(new Follower(context));
}
+ @VisibleForTesting
protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
protected void changeCurrentBehavior(RaftActorBehavior newBehavior){
+ if(getCurrentBehavior() != null) {
+ try {
+ getCurrentBehavior().close();
+ } catch(Exception e) {
+ LOG.warn("{}: Error closing behavior {}", persistence(), getCurrentBehavior(), e);
+ }
+ }
+
reusableBehaviorStateHolder.init(getCurrentBehavior());
setCurrentBehavior(newBehavior);
handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
reusableBehaviorStateHolder.init(getCurrentBehavior());
setCurrentBehavior(newBehavior);
handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
@@
-241,13
+237,13
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
// and recovery shows data missing
context.getReplicatedLog().captureSnapshotIfReady(applyState.getReplicatedLogEntry());
// and recovery shows data missing
context.getReplicatedLog().captureSnapshotIfReady(applyState.getReplicatedLogEntry());
- context.getSnapshotManager().trimLog(context.getLastApplied()
, currentBehavior
);
+ context.getSnapshotManager().trimLog(context.getLastApplied());
}
} else if (message instanceof ApplyJournalEntries) {
ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
if(LOG.isDebugEnabled()) {
}
} else if (message instanceof ApplyJournalEntries) {
ApplyJournalEntries applyEntries = (ApplyJournalEntries) message;
if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Persisting Apply
Log
Entries with index={}", persistenceId(), applyEntries.getToIndex());
+ LOG.debug("{}: Persisting Apply
Journal
Entries with index={}", persistenceId(), applyEntries.getToIndex());
}
persistence().persist(applyEntries, NoopProcedure.instance());
}
persistence().persist(applyEntries, NoopProcedure.instance());
@@
-307,6
+303,8
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
}
shuttingDown = true;
}
shuttingDown = true;
+
+ final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
if(currentBehavior.state() == RaftState.Leader && context.hasFollowers()) {
initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
if(currentBehavior.state() == RaftState.Leader && context.hasFollowers()) {
initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
@@
-341,9
+339,9
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
private void onLeaderTransitioning() {
LOG.debug("{}: onLeaderTransitioning", persistenceId());
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
private void onLeaderTransitioning() {
LOG.debug("{}: onLeaderTransitioning", persistenceId());
Optional<ActorRef> roleChangeNotifier = getRoleChangeNotifier();
- if(
currentBehavior.s
tate() == RaftState.Follower && roleChangeNotifier.isPresent()) {
+ if(
getRaftS
tate() == RaftState.Follower && roleChangeNotifier.isPresent()) {
roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null,
-
currentBehavior
.getLeaderPayloadVersion()), getSelf());
+
getCurrentBehavior()
.getLeaderPayloadVersion()), getSelf());
}
}
}
}
@@
-367,9
+365,9
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
}
handleBehaviorChange(reusableBehaviorStateHolder, getCurrentBehavior());
}
- protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
- return new RaftActorSnapshotMessageSupport(context, currentBehavior,
-
getRaftActorSnapshotCohort());
+ @VisibleForTesting
+ RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
+
return new RaftActorSnapshotMessageSupport(context,
getRaftActorSnapshotCohort());
}
private void onGetOnDemandRaftStats() {
}
private void onGetOnDemandRaftStats() {
@@
-380,6
+378,7
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
peerAddresses.put(peerId, context.getPeerAddress(peerId));
}
peerAddresses.put(peerId, context.getPeerAddress(peerId));
}
+ final RaftActorBehavior currentBehavior = context.getCurrentBehavior();
OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
.commitIndex(context.getCommitIndex())
.currentTerm(context.getTermInformation().getCurrentTerm())
OnDemandRaftState.Builder builder = OnDemandRaftState.builder()
.commitIndex(context.getCommitIndex())
.currentTerm(context.getTermInformation().getCurrentTerm())
@@
-398,7
+397,7
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
.peerAddresses(peerAddresses)
.customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
.peerAddresses(peerAddresses)
.customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
- ReplicatedLogEntry lastLogEntry =
getLastLogEntry
();
+ ReplicatedLogEntry lastLogEntry =
replicatedLog().last
();
if (lastLogEntry != null) {
builder.lastLogIndex(lastLogEntry.getIndex());
builder.lastLogTerm(lastLogEntry.getTerm());
if (lastLogEntry != null) {
builder.lastLogIndex(lastLogEntry.getIndex());
builder.lastLogTerm(lastLogEntry.getTerm());
@@
-445,6
+444,8
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
if(leadershipTransferInProgress != null) {
leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
}
if(leadershipTransferInProgress != null) {
leadershipTransferInProgress.onNewLeader(currentBehavior.getLeaderId());
}
+
+ serverConfigurationSupport.onNewLeader(currentBehavior.getLeaderId());
}
if (roleChangeNotifier.isPresent() &&
}
if (roleChangeNotifier.isPresent() &&
@@
-508,7
+509,7
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
// Send message for replication
context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry);
// Send message for replication
-
currentBehavior
.handleMessage(getSelf(),
+
getCurrentBehavior()
.handleMessage(getSelf(),
new Replicate(clientActor, identifier, replicatedLogEntry));
}
}
new Replicate(clientActor, identifier, replicatedLogEntry));
}
}
@@
-525,11
+526,11
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
@VisibleForTesting
void setCurrentBehavior(RaftActorBehavior behavior) {
@VisibleForTesting
void setCurrentBehavior(RaftActorBehavior behavior) {
- c
urrentBehavior.setDelegate
(behavior);
+ c
ontext.setCurrentBehavior
(behavior);
}
protected RaftActorBehavior getCurrentBehavior() {
}
protected RaftActorBehavior getCurrentBehavior() {
- return c
urrentBehavior.getDelegate
();
+ return c
ontext.getCurrentBehavior
();
}
/**
}
/**
@@
-539,11
+540,11
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
* @return true it this RaftActor is a Leader false otherwise
*/
protected boolean isLeader() {
* @return true it this RaftActor is a Leader false otherwise
*/
protected boolean isLeader() {
- return context.getId().equals(
currentBehavior
.getLeaderId());
+ return context.getId().equals(
getCurrentBehavior()
.getLeaderId());
}
}
- protected boolean isLeaderActive() {
- return
currentBehavior.s
tate() != RaftState.IsolatedLeader && !shuttingDown &&
+ protected
final
boolean isLeaderActive() {
+ return
getRaftS
tate() != RaftState.IsolatedLeader && !shuttingDown &&
!isLeadershipTransferInProgress();
}
!isLeadershipTransferInProgress();
}
@@
-572,30
+573,19
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
*
* @return the current leader's id
*/
*
* @return the current leader's id
*/
- protected String getLeaderId(){
- return currentBehavior.getLeaderId();
- }
-
- protected RaftState getRaftState() {
- return currentBehavior.state();
+ protected final String getLeaderId(){
+ return getCurrentBehavior().getLeaderId();
}
}
- protected ReplicatedLogEntry getLastLogEntry() {
- return replicatedLog().last();
+ @VisibleForTesting
+ protected final RaftState getRaftState() {
+ return getCurrentBehavior().state();
}
protected Long getCurrentTerm(){
return context.getTermInformation().getCurrentTerm();
}
}
protected Long getCurrentTerm(){
return context.getTermInformation().getCurrentTerm();
}
- protected Long getCommitIndex(){
- return context.getCommitIndex();
- }
-
- protected Long getLastApplied(){
- return context.getLastApplied();
- }
-
protected RaftActorContext getRaftActorContext() {
return context;
}
protected RaftActorContext getRaftActorContext() {
return context;
}
@@
-615,7
+605,7
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
// The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower
// but transfer the previous leaderId so it doesn't immediately try to schedule an election. This
// avoids potential disruption. Otherwise, switch to Follower normally.
// The RaftPolicy was modified. If the current behavior is Follower then re-initialize to Follower
// but transfer the previous leaderId so it doesn't immediately try to schedule an election. This
// avoids potential disruption. Otherwise, switch to Follower normally.
- RaftActorBehavior behavior =
currentBehavior.getDelegate
();
+ RaftActorBehavior behavior =
getCurrentBehavior
();
if(behavior instanceof Follower) {
String previousLeaderId = ((Follower)behavior).getLeaderId();
short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
if(behavior instanceof Follower) {
String previousLeaderId = ((Follower)behavior).getLeaderId();
short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
@@
-747,13
+737,15
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
operation.run();
}
operation.run();
}
- protected void onLeaderChanged(String oldLeader, String newLeader){};
+ protected void onLeaderChanged(String oldLeader, String newLeader) {
+
+ };
private String getLeaderAddress(){
if(isLeader()){
return getSelf().path().toString();
}
private String getLeaderAddress(){
if(isLeader()){
return getSelf().path().toString();
}
- String leaderId =
currentBehavior.
getLeaderId();
+ String leaderId = getLeaderId();
if (leaderId == null) {
return null;
}
if (leaderId == null) {
return null;
}
@@
-773,11
+765,12
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
private void captureSnapshot() {
SnapshotManager snapshotManager = context.getSnapshotManager();
private void captureSnapshot() {
SnapshotManager snapshotManager = context.getSnapshotManager();
- if(!snapshotManager.isCapturing()) {
+ if (!snapshotManager.isCapturing()) {
+ final long idx = getCurrentBehavior().getReplicatedToAllIndex();
LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
- replicatedLog().last(),
currentBehavior.getReplicatedToAllIndex()
);
+ replicatedLog().last(),
idx
);
- snapshotManager.capture(replicatedLog().last(),
currentBehavior.getReplicatedToAllIndex()
);
+ snapshotManager.capture(replicatedLog().last(),
idx
);
}
}
}
}
@@
-869,7
+862,7
@@
public abstract class RaftActor extends AbstractUntypedPersistentActor {
if(this.message instanceof SwitchBehavior){
return AbstractRaftActorBehavior.createBehavior(context, ((SwitchBehavior) message).getNewState());
}
if(this.message instanceof SwitchBehavior){
return AbstractRaftActorBehavior.createBehavior(context, ((SwitchBehavior) message).getNewState());
}
- return
currentBehavior
.handleMessage(sender, message);
+ return
getCurrentBehavior()
.handleMessage(sender, message);
}
}
}
}
}
}