-
- public ReplicatedLogImpl() {
- super();
- }
-
- @Override public void removeFromAndPersist(long logEntryIndex) {
- int adjustedIndex = adjustedIndex(logEntryIndex);
-
- if (adjustedIndex < 0) {
- return;
- }
-
- // FIXME: Maybe this should be done after the command is saved
- journal.subList(adjustedIndex , journal.size()).clear();
-
- persistence().persist(new DeleteEntries(adjustedIndex), new Procedure<DeleteEntries>() {
-
- @Override
- public void apply(DeleteEntries param)
- throws Exception {
- //FIXME : Doing nothing for now
- dataSize = 0;
- for (ReplicatedLogEntry entry : journal) {
- dataSize += entry.size();
- }
- }
- });
- }
-
- @Override public void appendAndPersist(
- final ReplicatedLogEntry replicatedLogEntry) {
- appendAndPersist(replicatedLogEntry, null);
- }
-
- public void appendAndPersist(
- final ReplicatedLogEntry replicatedLogEntry,
- final Procedure<ReplicatedLogEntry> callback) {
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Append log entry and persist {} ", persistenceId(), replicatedLogEntry);
- }
-
- // FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
- journal.add(replicatedLogEntry);
-
- // When persisting events with persist it is guaranteed that the
- // persistent actor will not receive further commands between the
- // persist call and the execution(s) of the associated event
- // handler. This also holds for multiple persist calls in context
- // of a single command.
- persistence().persist(replicatedLogEntry,
- new Procedure<ReplicatedLogEntry>() {
- @Override
- public void apply(ReplicatedLogEntry evt) throws Exception {
- int logEntrySize = replicatedLogEntry.size();
-
- dataSize += logEntrySize;
- long dataSizeForCheck = dataSize;
-
- dataSizeSinceLastSnapshot += logEntrySize;
- long journalSize = lastIndex() + 1;
-
- if(!hasFollowers()) {
- // When we do not have followers we do not maintain an in-memory log
- // due to this the journalSize will never become anything close to the
- // snapshot batch count. In fact will mostly be 1.
- // Similarly since the journal's dataSize depends on the entries in the
- // journal the journal's dataSize will never reach a value close to the
- // memory threshold.
- // By maintaining the dataSize outside the journal we are tracking essentially
- // what we have written to the disk however since we no longer are in
- // need of doing a snapshot just for the sake of freeing up memory we adjust
- // the real size of data by the DATA_SIZE_DIVIDER so that we do not snapshot as often
- // as if we were maintaining a real snapshot
- dataSizeForCheck = dataSizeSinceLastSnapshot / DATA_SIZE_DIVIDER;
- }
-
- long dataThreshold = Runtime.getRuntime().totalMemory() *
- getRaftActorContext().getConfigParams().getSnapshotDataThresholdPercentage() / 100;
-
- // when a snaphsot is being taken, captureSnapshot != null
- if (!context.isSnapshotCaptureInitiated() &&
- ( journalSize % context.getConfigParams().getSnapshotBatchCount() == 0 ||
- dataSizeForCheck > dataThreshold)) {
-
- dataSizeSinceLastSnapshot = 0;
-
- LOG.info("{}: Initiating Snapshot Capture..", persistenceId());
- long lastAppliedIndex = -1;
- long lastAppliedTerm = -1;
-
- ReplicatedLogEntry lastAppliedEntry = get(context.getLastApplied());
- if (!hasFollowers()) {
- lastAppliedIndex = replicatedLogEntry.getIndex();
- lastAppliedTerm = replicatedLogEntry.getTerm();
- } else if (lastAppliedEntry != null) {
- lastAppliedIndex = lastAppliedEntry.getIndex();
- lastAppliedTerm = lastAppliedEntry.getTerm();
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Snapshot Capture logSize: {}", persistenceId(), journal.size());
- LOG.debug("{}: Snapshot Capture lastApplied:{} ",
- persistenceId(), context.getLastApplied());
- LOG.debug("{}: Snapshot Capture lastAppliedIndex:{}", persistenceId(),
- lastAppliedIndex);
- LOG.debug("{}: Snapshot Capture lastAppliedTerm:{}", persistenceId(),
- lastAppliedTerm);
- }
-
- // send a CaptureSnapshot to self to make the expensive operation async.
- long replicatedToAllIndex = getCurrentBehavior().getReplicatedToAllIndex();
- ReplicatedLogEntry replicatedToAllEntry = context.getReplicatedLog().get(replicatedToAllIndex);
- getSelf().tell(new CaptureSnapshot(lastIndex(), lastTerm(), lastAppliedIndex, lastAppliedTerm,
- (replicatedToAllEntry != null ? replicatedToAllEntry.getIndex() : -1),
- (replicatedToAllEntry != null ? replicatedToAllEntry.getTerm() : -1)),
- null);
- context.setSnapshotCaptureInitiated(true);
- }
- if (callback != null){
- callback.apply(replicatedLogEntry);
- }
- }
- }
- );
- }
-