X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=4a2ab9936f5e4327c7d6eeb35df10416b2ee807f;hb=c50fd31278f5f24ed487a6f6021f6065bc2fe93f;hp=b74259d4851153659df0c2866f6323b9234eff06;hpb=3c82a8f501a71ec8a40b170fc7ef12f8683c1842;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index b74259d485..7ec9f2743c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -1,5 +1,6 @@ /* * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -10,35 +11,35 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import akka.japi.Procedure; -import akka.persistence.RecoveryCompleted; -import akka.persistence.SaveSnapshotFailure; -import akka.persistence.SaveSnapshotSuccess; -import akka.persistence.SnapshotOffer; -import akka.persistence.SnapshotSelectionCriteria; +import akka.actor.PoisonPill; +import akka.actor.Status; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; import com.google.common.base.Optional; -import com.google.common.base.Stopwatch; -import com.google.common.collect.ImmutableMap; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import com.google.common.collect.Lists; -import java.io.Serializable; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.apache.commons.lang3.time.DurationFormatUtils; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; +import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; -import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; +import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; @@ -48,16 +49,23 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.concepts.Immutable; /** * RaftActor encapsulates a state machine that needs to be kept synchronized * in a cluster. It implements the RAFT algorithm as described in the paper * * In Search of an Understandable Consensus Algorithm - *
+ * + ** RaftActor has 3 states and each state has a certain behavior associated * with it. A Raft actor can behave as, *
* A RaftActor MUST be a Leader in order to accept requests from clients to * change the state of it's encapsulated state machine. Once a RaftActor becomes * a Leader it is also responsible for ensuring that all followers ultimately * have the same log and therefore the same state machine as itself. - *
- * + * + ** The current behavior of a RaftActor determines how election for leadership * is initiated and how peer RaftActors react to request for votes. - *
- * + * + ** Each RaftActor also needs to know the current election term. It uses this * information for a couple of things. One is to simply figure out who it * voted for in the last election. Another is to figure out if the message * it received to update it's state is stale. - *
- * + * + ** The RaftActor uses akka-persistence to store it's replicated log. * Furthermore through it's behaviors a Raft Actor determines - *
** This is to account for situations where a we know that a peer * exists but we do not know an address up-front. This may also be used in * situations where a known peer starts off in a different location and we * need to change it's address + * *
* Note that if the peerId does not match the list of peers passed to * this actor during construction an IllegalStateException will be thrown. - * - * @param peerId - * @param peerAddress */ - protected void setPeerAddress(String peerId, String peerAddress){ + protected void setPeerAddress(String peerId, String peerAddress) { context.setPeerAddress(peerId, peerAddress); } - protected void commitSnapshot(long sequenceNumber) { - context.getSnapshotManager().commit(persistence(), sequenceNumber); - } - /** * The applyState method will be called by the RaftActor when some data - * needs to be applied to the actor's state + * needs to be applied to the actor's state. * * @param clientActor A reference to the client who sent this message. This * is the same reference that was passed to persistData @@ -626,35 +807,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * @param data A piece of data that was persisted by the persistData call. * This should NEVER be null. */ - protected abstract void applyState(ActorRef clientActor, String identifier, - Object data); - - /** - * This method is called during recovery at the start of a batch of state entries. Derived - * classes should perform any initialization needed to start a batch. - */ - protected abstract void startLogRecoveryBatch(int maxBatchSize); + protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data); /** - * This method is called during recovery to append state data to the current batch. This method - * is called 1 or more times after {@link #startLogRecoveryBatch}. - * - * @param data the state data + * Returns the RaftActorRecoveryCohort to participate in persistence recovery. */ - protected abstract void appendRecoveredLogEntry(Payload data); - - /** - * This method is called during recovery to reconstruct the state of the actor. - * - * @param snapshotBytes A snapshot of the state of the actor - */ - protected abstract void applyRecoverySnapshot(byte[] snapshotBytes); - - /** - * This method is called during recovery at the end of a batch to apply the current batched - * log entries. This method is called after {@link #appendRecoveredLogEntry}. - */ - protected abstract void applyCurrentLogRecoveryBatch(); + @Nonnull + protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort(); /** * This method is called when recovery is complete. @@ -662,24 +821,10 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { protected abstract void onRecoveryComplete(); /** - * This method will be called by the RaftActor when a snapshot needs to be - * created. The derived actor should respond with its current state. - *
- * During recovery the state that is returned by the derived actor will - * be passed back to it by calling the applySnapshot method - * - * @return The current state of the actor - */ - protected abstract void createSnapshot(); - - /** - * This method can be called at any other point during normal - * operations when the derived actor is out of sync with it's peers - * and the only way to bring it in sync is by applying a snapshot - * - * @param snapshotBytes A snapshot of the state of the actor + * Returns the RaftActorSnapshotCohort to participate in snapshot captures. */ - protected abstract void applySnapshot(byte[] snapshotBytes); + @Nonnull + protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort(); /** * This method will be called by the RaftActor when the state of the @@ -688,289 +833,192 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { */ protected abstract void onStateChanged(); - protected abstract DataPersistenceProvider persistence(); - /** - * Notifier Actor for this RaftActor to notify when a role change happens + * Notifier Actor for this RaftActor to notify when a role change happens. + * * @return ActorRef - ActorRef of the notifier or Optional.absent if none. */ protected abstract Optional
+ * The default implementation immediately runs the operation.
+ *
+ * @param operation the operation to run
+ */
+ protected void pauseLeader(Runnable operation) {
+ operation.run();
+ }
- private String getLeaderAddress(){
- if(isLeader()){
+ protected void onLeaderChanged(String oldLeader, String newLeader) {
+ }
+
+ private String getLeaderAddress() {
+ if (isLeader()) {
return getSelf().path().toString();
}
- String leaderId = currentBehavior.getLeaderId();
+ String leaderId = getLeaderId();
if (leaderId == null) {
return null;
}
String peerAddress = context.getPeerAddress(leaderId);
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
- persistenceId(), leaderId, peerAddress);
- }
+ LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", persistenceId(), leaderId, peerAddress);
return peerAddress;
}
- private void handleCaptureSnapshotReply(byte[] snapshotBytes) {
- LOG.debug("{}: CaptureSnapshotReply received by actor: snapshot size {}", persistenceId(), snapshotBytes.length);
-
- context.getSnapshotManager().persist(persistence(), snapshotBytes, currentBehavior, getTotalMemory());
+ protected boolean hasFollowers() {
+ return getRaftActorContext().hasFollowers();
}
- protected long getTotalMemory() {
- return Runtime.getRuntime().totalMemory();
- }
+ private void captureSnapshot() {
+ SnapshotManager snapshotManager = context.getSnapshotManager();
- protected boolean hasFollowers(){
- return getRaftActorContext().getPeerAddresses().keySet().size() > 0;
- }
-
- private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
- private static final int DATA_SIZE_DIVIDER = 5;
- private long dataSizeSinceLastSnapshot = 0L;
-
-
- public ReplicatedLogImpl(Snapshot snapshot) {
- super(snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
- snapshot.getUnAppliedEntries());
- }
+ if (!snapshotManager.isCapturing()) {
+ final long idx = getCurrentBehavior().getReplicatedToAllIndex();
+ LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
+ replicatedLog().last(), idx);
- public ReplicatedLogImpl() {
- super();
+ snapshotManager.capture(replicatedLog().last(), idx);
}
+ }
- @Override public void removeFromAndPersist(long logEntryIndex) {
- int adjustedIndex = adjustedIndex(logEntryIndex);
-
- if (adjustedIndex < 0) {
- return;
- }
-
- // FIXME: Maybe this should be done after the command is saved
- journal.subList(adjustedIndex , journal.size()).clear();
-
- persistence().persist(new DeleteEntries(adjustedIndex), new Procedure
- *
- * @param o
+ * A {@link BehaviorState} corresponding to null {@link RaftActorBehavior} state. Since null behavior is only
+ * allowed before we receive the first message, we know the leader ID to be null.
*/
- @Override
- public void saveSnapshot(Object o) {
- // Make saving Snapshot successful
- // Committing the snapshot here would end up calling commit in the creating state which would
- // be a state violation. That's why now we send a message to commit the snapshot.
- self().tell(COMMIT_SNAPSHOT, self());
- }
- }
-
-
- private class CreateSnapshotProcedure implements Procedure