import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import java.util.Map;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
-import java.io.Serializable;
-import java.util.Map;
-
/**
* RaftActor encapsulates a state machine that needs to be kept synchronized
* in a cluster. It implements the RAFT algorithm as described in the paper
deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), scala.Long.MaxValue()));
onRecoveryComplete();
+
+ RaftActorBehavior oldBehavior = currentBehavior;
currentBehavior = new Follower(context);
- onStateChanged();
+ handleBehaviorChange(oldBehavior, currentBehavior);
}
}
}
replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
replicatedLog.snapshotTerm, replicatedLog.size());
+ RaftActorBehavior oldBehavior = currentBehavior;
currentBehavior = new Follower(context);
- onStateChanged();
+ handleBehaviorChange(oldBehavior, currentBehavior);
}
@Override public void handleCommand(Object message) {
RaftActorBehavior oldBehavior = currentBehavior;
currentBehavior = currentBehavior.handleMessage(getSender(), message);
- if(oldBehavior != currentBehavior){
- onStateChanged();
- }
-
- onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
+ handleBehaviorChange(oldBehavior, currentBehavior);
}
}
- public java.util.Set<String> getPeers() {
-
- return context.getPeerAddresses().keySet();
- }
+ private void handleBehaviorChange(RaftActorBehavior oldBehavior, RaftActorBehavior currentBehavior) {
+ if (oldBehavior != currentBehavior){
+ onStateChanged();
+ }
+ if (oldBehavior != null) {
+ // it can happen that the state has not changed but the leader has changed.
+ onLeaderChanged(oldBehavior.getLeaderId(), currentBehavior.getLeaderId());
- protected String getReplicatedLogState() {
- return "snapshotIndex=" + context.getReplicatedLog().getSnapshotIndex()
- + ", snapshotTerm=" + context.getReplicatedLog().getSnapshotTerm()
- + ", im-mem journal size=" + context.getReplicatedLog().size();
+ if (getRoleChangeNotifier().isPresent() && oldBehavior.state() != currentBehavior.state()) {
+ // we do not want to notify when the behavior/role is set for the first time (i.e follower)
+ getRoleChangeNotifier().get().tell(new RoleChanged(getId(), oldBehavior.state().name(),
+ currentBehavior.state().name()), getSelf());
+ }
+ }
}
-
/**
* When a derived RaftActor needs to persist something it must call
* persistData.
protected abstract DataPersistenceProvider persistence();
+ /**
+ * Notifier Actor for this RaftActor to notify when a role change happens
+ * @return ActorRef - ActorRef of the notifier or Optional.absent if none.
+ */
+ protected abstract Optional<ActorRef> getRoleChangeNotifier();
+
protected void onLeaderChanged(String oldLeader, String newLeader){};
private void trimPersistentData(long sequenceNumber) {
@Override public void apply(DeleteEntries param)
throws Exception {
//FIXME : Doing nothing for now
+ dataSize = 0;
+ for(ReplicatedLogEntry entry : journal){
+ dataSize += entry.size();
+ }
}
});
}
appendAndPersist(null, null, replicatedLogEntry);
}
+ @Override
+ public int dataSize() {
+ return dataSize;
+ }
+
public void appendAndPersist(final ActorRef clientActor,
final String identifier,
final ReplicatedLogEntry replicatedLogEntry) {
new Procedure<ReplicatedLogEntry>() {
@Override
public void apply(ReplicatedLogEntry evt) throws Exception {
+ dataSize += replicatedLogEntry.size();
+
+ long dataThreshold = Runtime.getRuntime().totalMemory() *
+ getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
+
// when a snaphsot is being taken, captureSnapshot != null
if (hasSnapshotCaptureInitiated == false &&
- journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0) {
+ ( journal.size() % context.getConfigParams().getSnapshotBatchCount() == 0 ||
+ dataSize > dataThreshold)) {
LOG.info("Initiating Snapshot Capture..");
long lastAppliedIndex = -1;
protected RaftActorBehavior getCurrentBehavior() {
return currentBehavior;
}
+
}