import io.atomix.storage.journal.Indexed;
import io.atomix.storage.journal.JournalSerdes;
import io.atomix.storage.journal.SegmentedJournal;
-import io.atomix.storage.journal.SegmentedJournalWriter;
import io.atomix.storage.journal.StorageLevel;
import java.io.File;
import java.util.ArrayList;
* <p>
* Split-file approach allows us to treat sequence numbers and indices as equivalent, without maintaining any explicit
* mapping information. The only additional information we need to maintain is the last deleted sequence number.
- *
- * @author Robert Varga
*/
final class SegmentedJournalActor extends AbstractActor {
abstract static class AsyncMessage<T> {
private final List<Promise<Optional<Exception>>> results = new ArrayList<>();
Future<Optional<Exception>> add(final AtomicWrite write) {
- final Promise<Optional<Exception>> promise = Promise.apply();
+ final var promise = Promise.<Optional<Exception>>apply();
requests.add(write);
results.add(promise);
return promise.future();
LOG.debug("{}: actor starting", persistenceId);
super.preStart();
- final MetricRegistry registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
- final String actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
+ final var registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
+ final var actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
batchWriteTime = registry.timer(MetricRegistry.name(actorName, "batchWriteTime"));
messageWriteCount = registry.meter(MetricRegistry.name(actorName, "messageWriteCount"));
LOG.debug("{}: deleting entries up to {}", persistenceId, to);
lastDelete = to;
- final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
- final Indexed<Long> entry = deleteWriter.append(lastDelete);
+ final var deleteWriter = deleteJournal.writer();
+ final var entry = deleteWriter.append(lastDelete);
deleteWriter.commit(entry.index());
dataJournal.deleteTo(lastDelete);
private void handleWriteMessages(final WriteMessages message) {
ensureOpen();
- final Stopwatch sw = Stopwatch.createStarted();
+ final var sw = Stopwatch.createStarted();
final long start = dataJournal.lastWrittenSequenceNr();
final long bytes = dataJournal.handleWriteMessages(message);
sw.stop();
return;
}
- final Stopwatch sw = Stopwatch.createStarted();
+ final var sw = Stopwatch.createStarted();
deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
.withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
- final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
+ final var lastEntry = deleteJournal.writer().getLastEntry();
lastDelete = lastEntry == null ? 0 : lastEntry.entry();
dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,