X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fraft%2FRaftActor.java;h=353d0b4a241f844f1338435e712202211263e3d7;hb=04cc3d44be860365d124a56c731d5cf2ffc5b509;hp=64fa7496042466e58bd51cf0a488c265898866da;hpb=05a8052a457b2e53f06233f1a0b056d162118566;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java index 64fa749604..43a954756c 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java @@ -1,5 +1,6 @@ /* * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -10,38 +11,47 @@ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; -import akka.event.Logging; -import akka.event.LoggingAdapter; +import akka.actor.PoisonPill; import akka.japi.Procedure; -import akka.persistence.RecoveryCompleted; -import akka.persistence.SaveSnapshotFailure; -import akka.persistence.SaveSnapshotSuccess; -import akka.persistence.SnapshotOffer; -import akka.persistence.SnapshotSelectionCriteria; -import akka.persistence.UntypedPersistentActor; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; import com.google.common.base.Optional; -import com.google.common.base.Stopwatch; -import com.google.protobuf.ByteString; -import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; -import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot; +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; +import java.io.Serializable; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.opendaylight.controller.cluster.DataPersistenceProvider; +import org.opendaylight.controller.cluster.DelegatingPersistentDataProvider; +import org.opendaylight.controller.cluster.NonPersistentDataProvider; +import org.opendaylight.controller.cluster.PersistentDataProvider; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor; +import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; +import org.opendaylight.controller.cluster.notifications.RoleChanged; +import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.base.messages.InitiateCaptureSnapshot; +import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning; import org.opendaylight.controller.cluster.raft.base.messages.Replicate; -import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat; -import org.opendaylight.controller.cluster.raft.behaviors.Candidate; +import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractLeader; +import org.opendaylight.controller.cluster.raft.behaviors.AbstractRaftActorBehavior; import org.opendaylight.controller.cluster.raft.behaviors.Follower; -import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior; -import org.opendaylight.controller.cluster.raft.client.messages.AddRaftPeer; import org.opendaylight.controller.cluster.raft.client.messages.FindLeader; import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply; -import org.opendaylight.controller.cluster.raft.client.messages.RemoveRaftPeer; -import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.client.messages.FollowerInfo; +import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; +import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages; -import java.io.Serializable; -import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * RaftActor encapsulates a state machine that needs to be kept synchronized @@ -83,305 +93,386 @@ import java.util.Map; *
+ * The default implementation immediately runs the operation.
+ *
+ * @param operation the operation to run
+ */
+ protected void pauseLeader(Runnable operation) {
+ operation.run();
}
- private void trimPersistentData(long sequenceNumber) {
- // Trim akka snapshots
- // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
- // For now guessing that it is ANDed.
- deleteSnapshots(new SnapshotSelectionCriteria(
- sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
+ protected void onLeaderChanged(String oldLeader, String newLeader) {
- // Trim akka journal
- deleteMessages(sequenceNumber);
- }
+ };
private String getLeaderAddress(){
if(isLeader()){
return getSelf().path().toString();
}
- String leaderId = currentBehavior.getLeaderId();
+ String leaderId = getLeaderId();
if (leaderId == null) {
return null;
}
String peerAddress = context.getPeerAddress(leaderId);
if(LOG.isDebugEnabled()) {
- LOG.debug("getLeaderAddress leaderId = " + leaderId + " peerAddress = "
- + peerAddress);
+ LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}",
+ persistenceId(), leaderId, peerAddress);
}
return peerAddress;
}
- private void handleCaptureSnapshotReply(ByteString stateInBytes) {
- // create a snapshot object from the state provided and save it
- // when snapshot is saved async, SaveSnapshotSuccess is raised.
-
- Snapshot sn = Snapshot.create(stateInBytes.toByteArray(),
- context.getReplicatedLog().getFrom(captureSnapshot.getLastAppliedIndex() + 1),
- captureSnapshot.getLastIndex(), captureSnapshot.getLastTerm(),
- captureSnapshot.getLastAppliedIndex(), captureSnapshot.getLastAppliedTerm());
-
- saveSnapshot(sn);
-
- LOG.info("Persisting of snapshot done:{}", sn.getLogMessage());
-
- //be greedy and remove entries from in-mem journal which are in the snapshot
- // and update snapshotIndex and snapshotTerm without waiting for the success,
-
- context.getReplicatedLog().snapshotPreCommit(stateInBytes,
- captureSnapshot.getLastAppliedIndex(),
- captureSnapshot.getLastAppliedTerm());
-
- LOG.info("Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
- "and term:{}", captureSnapshot.getLastAppliedIndex(),
- captureSnapshot.getLastAppliedTerm());
-
- captureSnapshot = null;
- hasSnapshotCaptureInitiated = false;
+ protected boolean hasFollowers(){
+ return getRaftActorContext().hasFollowers();
}
+ private void captureSnapshot() {
+ SnapshotManager snapshotManager = context.getSnapshotManager();
- private class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
-
- public ReplicatedLogImpl(Snapshot snapshot) {
- super(ByteString.copyFrom(snapshot.getState()),
- snapshot.getLastAppliedIndex(), snapshot.getLastAppliedTerm(),
- snapshot.getUnAppliedEntries());
- }
+ if (!snapshotManager.isCapturing()) {
+ final long idx = getCurrentBehavior().getReplicatedToAllIndex();
+ LOG.debug("Take a snapshot of current state. lastReplicatedLog is {} and replicatedToAllIndex is {}",
+ replicatedLog().last(), idx);
- public ReplicatedLogImpl() {
- super();
+ snapshotManager.capture(replicatedLog().last(), idx);
}
+ }
- @Override public void removeFromAndPersist(long logEntryIndex) {
- int adjustedIndex = adjustedIndex(logEntryIndex);
-
- if (adjustedIndex < 0) {
- return;
- }
-
- // FIXME: Maybe this should be done after the command is saved
- journal.subList(adjustedIndex , journal.size()).clear();
-
- persist(new DeleteEntries(adjustedIndex), new Procedure