X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=1c057d755369d50a2147cf7c3e2922783ae56fc8;hb=refs%2Fchanges%2F83%2F51583%2F6;hp=1c08b20f2df27842c5050a6d7bda9c83e9370a91;hpb=b8da9f6fa8bf4284805349f4638ebdadf169ff5f;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 1c08b20f2d..1c057d7553 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -51,6 +51,7 @@ import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftStat import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; @@ -132,7 +133,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(), - delegatingPersistenceProvider, LOG); + delegatingPersistenceProvider, this::handleApplyState, LOG); context.setPayloadVersion(payloadVersion); context.setReplicatedLog(ReplicatedLogImpl.newInstance(context)); @@ -224,29 +225,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (snapshotSupport.handleSnapshotMessage(message, getSender())) { return; } - if (message instanceof ApplyState) { ApplyState applyState = (ApplyState) message; - long startTime = System.nanoTime(); - - if (LOG.isDebugEnabled()) { - LOG.debug("{}: Applying state for log index {} data {}", - persistenceId(), applyState.getReplicatedLogEntry().getIndex(), - applyState.getReplicatedLogEntry().getData()); - } - - if (!(applyState.getReplicatedLogEntry().getData() instanceof NoopPayload)) { - applyState(applyState.getClientActor(), applyState.getIdentifier(), - applyState.getReplicatedLogEntry().getData()); - } - - long elapsedTime = System.nanoTime() - startTime; - if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) { - LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", - TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState); - } - if (!hasFollowers()) { // for single node, the capture should happen after the apply state // as we delete messages from the persistent journal which have made it to the snapshot @@ -257,9 +238,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getSnapshotManager().trimLog(context.getLastApplied()); } - // Send it to the current behavior - some behaviors like PreLeader need to be notified of ApplyState. possiblyHandleBehaviorMessage(message); - } else if (message instanceof ApplyJournalEntries) { ApplyJournalEntries applyEntries = (ApplyJournalEntries) message; LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); @@ -278,7 +257,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof SwitchBehavior) { switchBehavior((SwitchBehavior) message); } else if (message instanceof LeaderTransitioning) { - onLeaderTransitioning(); + onLeaderTransitioning((LeaderTransitioning)message); } else if (message instanceof Shutdown) { onShutDown(); } else if (message instanceof Runnable) { @@ -376,10 +355,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - private void onLeaderTransitioning() { - LOG.debug("{}: onLeaderTransitioning", persistenceId()); + private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) { + LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning); Optional roleChangeNotifier = getRoleChangeNotifier(); - if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) { + if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent() + && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) { roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null, getCurrentBehavior().getLeaderPayloadVersion()), getSelf()); } @@ -388,7 +368,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void switchBehavior(SwitchBehavior message) { if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) { RaftState newState = message.getNewState(); - if ( newState == RaftState.Leader || newState == RaftState.Follower) { + if (newState == RaftState.Leader || newState == RaftState.Follower) { switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()), AbstractRaftActorBehavior.createBehavior(context, message.getNewState())); getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), ""); @@ -419,7 +399,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - OnDemandRaftState.Builder builder = OnDemandRaftState.builder() + OnDemandRaftState.AbstractBuilder builder = newOnDemandRaftStateBuilder() .commitIndex(context.getCommitIndex()) .currentTerm(context.getTermInformation().getCurrentTerm()) .inMemoryJournalDataSize(replicatedLog().dataSize()) @@ -463,6 +443,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } + protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + return OnDemandRaftState.builder(); + } + private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) { RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior(); @@ -499,6 +483,29 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } + private void handleApplyState(ApplyState applyState) { + long startTime = System.nanoTime(); + + Payload payload = applyState.getReplicatedLogEntry().getData(); + if (LOG.isDebugEnabled()) { + LOG.debug("{}: Applying state for log index {} data {}", + persistenceId(), applyState.getReplicatedLogEntry().getIndex(), payload); + } + + if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) { + applyState(applyState.getClientActor(), applyState.getIdentifier(), payload); + } + + long elapsedTime = System.nanoTime() - startTime; + if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) { + LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", + TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState); + } + + // Send the ApplyState message back to self to handle further processing asynchronously. + self().tell(applyState, self()); + } + protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) { return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion); } @@ -544,7 +551,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { raftContext.setLastApplied(persistedLogEntry.getIndex()); // Apply the state immediately. - self().tell(new ApplyState(clientActor, identifier, persistedLogEntry), self()); + handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry)); // Send a ApplyJournalEntries message so that we write the fact that we applied // the state to durable storage @@ -759,7 +766,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected abstract void onRecoveryComplete(); /** - * Returns the RaftActorSnapshotCohort to participate in persistence recovery. + * Returns the RaftActorSnapshotCohort to participate in snapshot captures. */ @Nonnull protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();