X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=1c057d755369d50a2147cf7c3e2922783ae56fc8;hb=aafb8cb044e992dd784d1f4f66508599cc4cd588;hp=e3fa649ab15294ae16d87ba9b368375020f27e91;hpb=a81d98f692b80c45bce3fe6a87e731abfb012a9f;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index e3fa649ab1..1c057d7553 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -34,6 +34,7 @@ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersisten import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; +import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; @@ -50,6 +51,8 @@ import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftStat import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.concepts.Immutable; @@ -130,7 +133,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { this.getContext(), id, new ElectionTermImpl(persistentProvider, id, LOG), -1, -1, peerAddresses, configParams.isPresent() ? configParams.get() : new DefaultConfigParamsImpl(), - delegatingPersistenceProvider, LOG); + delegatingPersistenceProvider, this::handleApplyState, LOG); context.setPayloadVersion(payloadVersion); context.setReplicatedLog(ReplicatedLogImpl.newInstance(context)); @@ -222,29 +225,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { if (snapshotSupport.handleSnapshotMessage(message, getSender())) { return; } - if (message instanceof ApplyState) { ApplyState applyState = (ApplyState) message; - long startTime = System.nanoTime(); - - if (LOG.isDebugEnabled()) { - LOG.debug("{}: Applying state for log index {} data {}", - persistenceId(), applyState.getReplicatedLogEntry().getIndex(), - applyState.getReplicatedLogEntry().getData()); - } - - if (!(applyState.getReplicatedLogEntry().getData() instanceof NoopPayload)) { - applyState(applyState.getClientActor(), applyState.getIdentifier(), - applyState.getReplicatedLogEntry().getData()); - } - - long elapsedTime = System.nanoTime() - startTime; - if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) { - LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", - TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState); - } - if (!hasFollowers()) { // for single node, the capture should happen after the apply state // as we delete messages from the persistent journal which have made it to the snapshot @@ -255,14 +238,12 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { context.getSnapshotManager().trimLog(context.getLastApplied()); } - // Send it to the current behavior - some behaviors like PreLeader need to be notified of ApplyState. possiblyHandleBehaviorMessage(message); - } else if (message instanceof ApplyJournalEntries) { ApplyJournalEntries applyEntries = (ApplyJournalEntries) message; LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex()); - persistence().persist(applyEntries, NoopProcedure.instance()); + persistence().persistAsync(applyEntries, NoopProcedure.instance()); } else if (message instanceof FindLeader) { getSender().tell( @@ -276,13 +257,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } else if (message instanceof SwitchBehavior) { switchBehavior((SwitchBehavior) message); } else if (message instanceof LeaderTransitioning) { - onLeaderTransitioning(); + onLeaderTransitioning((LeaderTransitioning)message); } else if (message instanceof Shutdown) { onShutDown(); } else if (message instanceof Runnable) { ((Runnable)message).run(); } else if (message instanceof NoopPayload) { - persistData(null, null, (NoopPayload)message); + persistData(null, null, (NoopPayload)message, false); } else if (!possiblyHandleBehaviorMessage(message)) { handleNonRaftCommand(message); } @@ -374,10 +355,11 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } - private void onLeaderTransitioning() { - LOG.debug("{}: onLeaderTransitioning", persistenceId()); + private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) { + LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning); Optional roleChangeNotifier = getRoleChangeNotifier(); - if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent()) { + if (getRaftState() == RaftState.Follower && roleChangeNotifier.isPresent() + && leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) { roleChangeNotifier.get().tell(newLeaderStateChanged(getId(), null, getCurrentBehavior().getLeaderPayloadVersion()), getSelf()); } @@ -386,7 +368,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { private void switchBehavior(SwitchBehavior message) { if (!getRaftActorContext().getRaftPolicy().automaticElectionsEnabled()) { RaftState newState = message.getNewState(); - if ( newState == RaftState.Leader || newState == RaftState.Follower) { + if (newState == RaftState.Leader || newState == RaftState.Follower) { switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()), AbstractRaftActorBehavior.createBehavior(context, message.getNewState())); getRaftActorContext().getTermInformation().updateAndPersist(message.getNewTerm(), ""); @@ -417,7 +399,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } final RaftActorBehavior currentBehavior = context.getCurrentBehavior(); - OnDemandRaftState.Builder builder = OnDemandRaftState.builder() + OnDemandRaftState.AbstractBuilder builder = newOnDemandRaftStateBuilder() .commitIndex(context.getCommitIndex()) .currentTerm(context.getTermInformation().getCurrentTerm()) .inMemoryJournalDataSize(replicatedLog().dataSize()) @@ -461,6 +443,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } + protected OnDemandRaftState.AbstractBuilder newOnDemandRaftStateBuilder() { + return OnDemandRaftState.builder(); + } + private void handleBehaviorChange(BehaviorState oldBehaviorState, RaftActorBehavior currentBehavior) { RaftActorBehavior oldBehavior = oldBehaviorState.getBehavior(); @@ -497,6 +483,29 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } } + private void handleApplyState(ApplyState applyState) { + long startTime = System.nanoTime(); + + Payload payload = applyState.getReplicatedLogEntry().getData(); + if (LOG.isDebugEnabled()) { + LOG.debug("{}: Applying state for log index {} data {}", + persistenceId(), applyState.getReplicatedLogEntry().getIndex(), payload); + } + + if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) { + applyState(applyState.getClientActor(), applyState.getIdentifier(), payload); + } + + long elapsedTime = System.nanoTime() - startTime; + if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) { + LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", + TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState); + } + + // Send the ApplyState message back to self to handle further processing asynchronously. + self().tell(applyState, self()); + } + protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) { return new LeaderStateChanged(memberId, leaderId, leaderPayloadVersion); } @@ -512,40 +521,57 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { } /** - * When a derived RaftActor needs to persist something it must call - * persistData. + * Persists the given Payload in the journal and replicates to any followers. After successful completion, + * {@link #applyState(ActorRef, Identifier, Object)} is notified. + * + * @param clientActor optional ActorRef that is provided via the applyState callback + * @param identifier the payload identifier + * @param data the payload data to persist + * @param batchHint if true, an attempt is made to delay immediate replication and batch the payload with + * subsequent payloads for efficiency. Otherwise the payload is immediately replicated. */ - protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data) { - - ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry( + protected final void persistData(final ActorRef clientActor, final Identifier identifier, final Payload data, + final boolean batchHint) { + ReplicatedLogEntry replicatedLogEntry = new SimpleReplicatedLogEntry( context.getReplicatedLog().lastIndex() + 1, context.getTermInformation().getCurrentTerm(), data); + replicatedLogEntry.setPersistencePending(true); LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry); final RaftActorContext raftContext = getRaftActorContext(); - replicatedLog().appendAndPersist(replicatedLogEntry, replicatedLogEntry1 -> { + boolean wasAppended = replicatedLog().appendAndPersist(replicatedLogEntry, persistedLogEntry -> { + // Clear the persistence pending flag in the log entry. + persistedLogEntry.setPersistencePending(false); + if (!hasFollowers()) { // Increment the Commit Index and the Last Applied values - raftContext.setCommitIndex(replicatedLogEntry1.getIndex()); - raftContext.setLastApplied(replicatedLogEntry1.getIndex()); + raftContext.setCommitIndex(persistedLogEntry.getIndex()); + raftContext.setLastApplied(persistedLogEntry.getIndex()); // Apply the state immediately. - self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry1), self()); + handleApplyState(new ApplyState(clientActor, identifier, persistedLogEntry)); // Send a ApplyJournalEntries message so that we write the fact that we applied // the state to durable storage - self().tell(new ApplyJournalEntries(replicatedLogEntry1.getIndex()), self()); + self().tell(new ApplyJournalEntries(persistedLogEntry.getIndex()), self()); } else { - context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry1); + context.getReplicatedLog().captureSnapshotIfReady(replicatedLogEntry); - // Send message for replication - getCurrentBehavior().handleMessage(getSelf(), - new Replicate(clientActor, identifier, replicatedLogEntry1)); + // Local persistence is complete so send the CheckConsensusReached message to the behavior (which + // normally should still be the leader) to check if consensus has now been reached in conjunction with + // follower replication. + getCurrentBehavior().handleMessage(getSelf(), CheckConsensusReached.INSTANCE); } - }); + }, true); + + if (wasAppended && hasFollowers()) { + // Send log entry for replication. + getCurrentBehavior().handleMessage(getSelf(), new Replicate(clientActor, identifier, replicatedLogEntry, + !batchHint)); + } } private ReplicatedLog replicatedLog() { @@ -740,7 +766,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected abstract void onRecoveryComplete(); /** - * Returns the RaftActorSnapshotCohort to participate in persistence recovery. + * Returns the RaftActorSnapshotCohort to participate in snapshot captures. */ @Nonnull protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort();