X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=6cbeda6098d2026225767d1564aae76109a1fa00;hb=4ecb8ecaf04594b3312a44d801423f515ea445b3;hp=285be39c0b3286c2abbdaf3c08e09c1f85c0224f;hpb=e3998d55e33da9f6ecb69da75ecc71a047b6362b;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java old mode 100644 new mode 100755 index 285be39c0b..6cbeda6098 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -1,5 +1,6 @@ /* * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -10,44 +11,61 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import akka.japi.Procedure; -import akka.persistence.RecoveryCompleted; -import akka.persistence.SaveSnapshotFailure; -import akka.persistence.SaveSnapshotSuccess; -import akka.persistence.SnapshotOffer; -import akka.persistence.SnapshotSelectionCriteria; +import akka.actor.PoisonPill; +import akka.actor.Status; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import com.google.common.base.Stopwatch; -import com.google.protobuf.ByteString; -import java.io.Serializable; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.apache.commons.lang3.time.DurationFormatUtils; import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; +import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.PersistentDataProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RoleChanged; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; -import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; -import org.opendaylight.controller.cluster.raft.base.messages.SendInstallSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; +import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.RequestLeadership; +import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; +import org.opendaylight.controller.cluster.raft.persisted.NoopPayload; +import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload; +import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.concepts.Immutable; /** * RaftActor encapsulates a state machine that needs to be kept synchronized * in a cluster. It implements the RAFT algorithm as described in the paper * * In Search of an Understandable Consensus Algorithm - *
+ * + ** RaftActor has 3 states and each state has a certain behavior associated * with it. A Raft actor can behave as, *
* A RaftActor MUST be a Leader in order to accept requests from clients to * change the state of it's encapsulated state machine. Once a RaftActor becomes * a Leader it is also responsible for ensuring that all followers ultimately * have the same log and therefore the same state machine as itself. - *
- * + * + ** The current behavior of a RaftActor determines how election for leadership * is initiated and how peer RaftActors react to request for votes. - *
- * + * + ** Each RaftActor also needs to know the current election term. It uses this * information for a couple of things. One is to simply figure out who it * voted for in the last election. Another is to figure out if the message * it received to update it's state is stale. - *
- * + * + ** The RaftActor uses akka-persistence to store it's replicated log. * Furthermore through it's behaviors a Raft Actor determines - *
** This is to account for situations where a we know that a peer * exists but we do not know an address up-front. This may also be used in * situations where a known peer starts off in a different location and we * need to change it's address + * *
* Note that if the peerId does not match the list of peers passed to * this actor during construction an IllegalStateException will be thrown. - * - * @param peerId - * @param peerAddress */ - protected void setPeerAddress(String peerId, String peerAddress){ + protected void setPeerAddress(final String peerId, final String peerAddress) { context.setPeerAddress(peerId, peerAddress); } - protected void commitSnapshot(long sequenceNumber) { - context.getReplicatedLog().snapshotCommit(); - - // TODO: Not sure if we want to be this aggressive with trimming stuff - trimPersistentData(sequenceNumber); - } - /** * The applyState method will be called by the RaftActor when some data - * needs to be applied to the actor's state + * needs to be applied to the actor's state. * * @param clientActor A reference to the client who sent this message. This * is the same reference that was passed to persistData @@ -553,409 +809,234 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor { * @param data A piece of data that was persisted by the persistData call. * This should NEVER be null. */ - protected abstract void applyState(ActorRef clientActor, String identifier, - Object data); + protected abstract void applyState(ActorRef clientActor, Identifier identifier, Object data); /** - * This method is called during recovery at the start of a batch of state entries. Derived - * classes should perform any initialization needed to start a batch. + * Returns the RaftActorRecoveryCohort to participate in persistence recovery. */ - protected abstract void startLogRecoveryBatch(int maxBatchSize); + @Nonnull + protected abstract RaftActorRecoveryCohort getRaftActorRecoveryCohort(); /** - * This method is called during recovery to append state data to the current batch. This method - * is called 1 or more times after {@link #startLogRecoveryBatch}. - * - * @param data the state data - */ - protected abstract void appendRecoveredLogEntry(Payload data); - - /** - * This method is called during recovery to reconstruct the state of the actor. - * - * @param snapshotBytes A snapshot of the state of the actor + * This method is called when recovery is complete. */ - protected abstract void applyRecoverySnapshot(byte[] snapshotBytes); + protected abstract void onRecoveryComplete(); /** - * This method is called during recovery at the end of a batch to apply the current batched - * log entries. This method is called after {@link #appendRecoveredLogEntry}. + * Returns the RaftActorSnapshotCohort to participate in snapshot captures. */ - protected abstract void applyCurrentLogRecoveryBatch(); + @Nonnull + protected abstract RaftActorSnapshotCohort getRaftActorSnapshotCohort(); /** - * This method is called when recovery is complete. + * This method will be called by the RaftActor when the state of the + * RaftActor changes. The derived actor can then use methods like + * isLeader or getLeader to do something useful */ - protected abstract void onRecoveryComplete(); + protected abstract void onStateChanged(); /** - * This method will be called by the RaftActor when a snapshot needs to be - * created. The derived actor should respond with its current state. - *
- * During recovery the state that is returned by the derived actor will - * be passed back to it by calling the applySnapshot method + * Notifier Actor for this RaftActor to notify when a role change happens. * - * @return The current state of the actor + * @return ActorRef - ActorRef of the notifier or Optional.absent if none. */ - protected abstract void createSnapshot(); + protected abstract Optional+ * The default implementation immediately runs the operation. + * + * @param operation the operation to run */ - protected abstract void onStateChanged(); - - protected abstract DataPersistenceProvider persistence(); + protected void pauseLeader(final Runnable operation) { + operation.run(); + } /** - * Notifier Actor for this RaftActor to notify when a role change happens - * @return ActorRef - ActorRef of the notifier or Optional.absent if none. + * This method is invoked when the actions hooked to the leader becoming paused failed to execute and the leader + * should resume normal operations. + * + *
+ * Note this method can be invoked even before the operation supplied to {@link #pauseLeader(Runnable)} is invoked.
*/
- protected abstract Optional
- *
- * @param o
- */
- @Override
- public void saveSnapshot(Object o) {
- // Make saving Snapshot successful
- commitSnapshot(-1L);
+ return new SimpleBehaviorState(lastValidLeaderId, lastLeaderId, behavior);
}
}
-
- @VisibleForTesting
- void setCurrentBehavior(AbstractRaftActorBehavior behavior) {
- currentBehavior = behavior;
- }
-
- protected RaftActorBehavior getCurrentBehavior() {
- return currentBehavior;
- }
-
}