private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
- private static final Procedure<ApplyJournalEntries> APPLY_JOURNAL_ENTRIES_PERSIST_CALLBACK =
- new Procedure<ApplyJournalEntries>() {
- @Override
- public void apply(ApplyJournalEntries param) throws Exception {
- }
- };
private static final String COMMIT_SNAPSHOT = "commit_snapshot";
protected final Logger LOG = LoggerFactory.getLogger(getClass());
Optional<ConfigParams> configParams) {
context = new RaftActorContextImpl(this.getSelf(),
- this.getContext(), id, new ElectionTermImpl(),
+ this.getContext(), id, new ElectionTermImpl(delegatingPersistenceProvider, id, LOG),
-1, -1, replicatedLog, peerAddresses,
(configParams.isPresent() ? configParams.get(): new DefaultConfigParamsImpl()),
LOG);
LOG.debug("{}: Persisting ApplyLogEntries with index={}", persistenceId(), applyEntries.getToIndex());
}
- persistence().persist(applyEntries, APPLY_JOURNAL_ENTRIES_PERSIST_CALLBACK);
+ persistence().persist(applyEntries, NoopProcedure.instance());
} else if(message instanceof ApplySnapshot ) {
Snapshot snapshot = ((ApplySnapshot) message).getSnapshot();
}
}
-
- private class ElectionTermImpl implements ElectionTerm {
- /**
- * Identifier of the actor whose election term information this is
- */
- private long currentTerm = 0;
- private String votedFor = null;
-
- @Override
- public long getCurrentTerm() {
- return currentTerm;
- }
-
- @Override
- public String getVotedFor() {
- return votedFor;
- }
-
- @Override public void update(long currentTerm, String votedFor) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Set currentTerm={}, votedFor={}", persistenceId(), currentTerm, votedFor);
- }
- this.currentTerm = currentTerm;
- this.votedFor = votedFor;
- }
-
- @Override
- public void updateAndPersist(long currentTerm, String votedFor){
- update(currentTerm, votedFor);
- // FIXME : Maybe first persist then update the state
- persistence().persist(new UpdateElectionTerm(this.currentTerm, this.votedFor), new Procedure<UpdateElectionTerm>(){
-
- @Override public void apply(UpdateElectionTerm param)
- throws Exception {
-
- }
- });
- }
- }
-
static class UpdateElectionTerm implements Serializable {
private static final long serialVersionUID = 1L;
private final long currentTerm;