* A single entry in the data journal. We do not store {@code persistenceId} for each entry, as that is a
* journal-invariant, nor do we store {@code sequenceNr}, as that information is maintained by {@link JournalSegment}'s
* index.
- *
- * @author Robert Varga
*/
-abstract class DataJournalEntry {
+abstract sealed class DataJournalEntry {
/**
* A single data journal entry on its way to the backing file.
*/
*/
package org.opendaylight.controller.akka.segjournal;
-import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.persistence.PersistentRepr;
+import akka.serialization.JavaSerializer;
+import com.google.common.base.VerifyException;
import io.atomix.storage.journal.JournalSerdes.EntryInput;
import io.atomix.storage.journal.JournalSerdes.EntryOutput;
import io.atomix.storage.journal.JournalSerdes.EntrySerdes;
* {@link #write(EntryOutput, DataJournalEntry)} only accepts {@link ToPersistence} subclass, which is a wrapper
* around a {@link PersistentRepr}, while {@link #read(EntryInput)} produces an {@link FromPersistence}, which
* needs further processing to reconstruct a {@link PersistentRepr}.
- *
- * @author Robert Varga
*/
final class DataJournalEntrySerializer implements EntrySerdes<DataJournalEntry> {
private final ExtendedActorSystem actorSystem;
@Override
public void write(final EntryOutput output, final DataJournalEntry entry) throws IOException {
- verify(entry instanceof ToPersistence);
- final PersistentRepr repr = ((ToPersistence) entry).repr();
- output.writeString(repr.manifest());
- output.writeString(repr.writerUuid());
- output.writeObject(repr.payload());
+ if (entry instanceof ToPersistence toPersistence) {
+ final var repr = toPersistence.repr();
+ output.writeString(repr.manifest());
+ output.writeString(repr.writerUuid());
+ output.writeObject(repr.payload());
+ } else {
+ throw new VerifyException("Unexpected entry " + entry);
+ }
}
@Override
public DataJournalEntry read(final EntryInput input) throws IOException {
- final String manifest = input.readString();
- final String uuid = input.readString();
- final Object payload = akka.serialization.JavaSerializer.currentSystem().withValue(actorSystem,
- (Callable<Object>) input::readObject);
- return new FromPersistence(manifest, uuid, payload);
+ return new FromPersistence(input.readString(), input.readString(),
+ JavaSerializer.currentSystem().withValue(actorSystem, (Callable<Object>) input::readObject));
}
}
*/
package org.opendaylight.controller.akka.segjournal;
-import static com.google.common.base.Verify.verify;
-
import akka.actor.ActorSystem;
-import akka.persistence.AtomicWrite;
import akka.persistence.PersistentRepr;
import com.codahale.metrics.Histogram;
-import io.atomix.storage.journal.Indexed;
+import com.google.common.base.VerifyException;
import io.atomix.storage.journal.JournalSerdes;
import io.atomix.storage.journal.SegmentedJournal;
import io.atomix.storage.journal.SegmentedJournalReader;
/**
* Version 0 data journal, where every journal entry maps to exactly one segmented file entry.
- *
- * @author Robert Varga
*/
final class DataJournalV0 extends DataJournal {
private static final Logger LOG = LoggerFactory.getLogger(DataJournalV0.class);
@Override
@SuppressWarnings("checkstyle:illegalCatch")
void handleReplayMessages(final ReplayMessages message, final long fromSequenceNr) {
- try (SegmentedJournalReader<DataJournalEntry> reader = entries.openReader(fromSequenceNr)) {
- int count = 0;
- while (reader.hasNext() && count < message.max) {
- final Indexed<DataJournalEntry> next = reader.next();
- if (next.index() > message.toSequenceNr) {
- break;
- }
-
- LOG.trace("{}: replay {}", persistenceId, next);
- updateLargestSize(next.size());
- final DataJournalEntry entry = next.entry();
- verify(entry instanceof FromPersistence, "Unexpected entry %s", entry);
-
- final PersistentRepr repr = ((FromPersistence) entry).toRepr(persistenceId, next.index());
- LOG.debug("{}: replaying {}", persistenceId, repr);
- message.replayCallback.accept(repr);
- count++;
- }
- LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
+ try (var reader = entries.openReader(fromSequenceNr)) {
+ handleReplayMessages(reader, message);
} catch (Exception e) {
LOG.warn("{}: failed to replay messages for {}", persistenceId, message, e);
message.promise.failure(e);
}
}
+ private void handleReplayMessages(final SegmentedJournalReader<DataJournalEntry> reader,
+ final ReplayMessages message) {
+ int count = 0;
+ while (reader.hasNext() && count < message.max) {
+ final var next = reader.next();
+ if (next.index() > message.toSequenceNr) {
+ break;
+ }
+
+ LOG.trace("{}: replay {}", persistenceId, next);
+ updateLargestSize(next.size());
+ final var entry = next.entry();
+ if (entry instanceof FromPersistence fromPersistence) {
+ final var repr = fromPersistence.toRepr(persistenceId, next.index());
+ LOG.debug("{}: replaying {}", persistenceId, repr);
+ message.replayCallback.accept(repr);
+ count++;
+ } else {
+ throw new VerifyException("Unexpected entry " + entry);
+ }
+ }
+ LOG.debug("{}: successfully replayed {} entries", persistenceId, count);
+ }
+
@Override
@SuppressWarnings("checkstyle:illegalCatch")
long handleWriteMessages(final WriteMessages message) {
final int count = message.size();
- final SegmentedJournalWriter<DataJournalEntry> writer = entries.writer();
+ final var writer = entries.writer();
long bytes = 0;
for (int i = 0; i < count; ++i) {
final long mark = writer.getLastIndex();
- final AtomicWrite request = message.getRequest(i);
+ final var request = message.getRequest(i);
- final List<PersistentRepr> reprs = CollectionConverters.asJava(request.payload());
+ final var reprs = CollectionConverters.asJava(request.payload());
LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), mark);
try {
bytes += writePayload(writer, reprs);
private long writePayload(final SegmentedJournalWriter<DataJournalEntry> writer, final List<PersistentRepr> reprs) {
long bytes = 0;
- for (PersistentRepr repr : reprs) {
+ for (var repr : reprs) {
final Object payload = repr.payload();
if (!(payload instanceof Serializable)) {
throw new UnsupportedOperationException("Non-serializable payload encountered "
import akka.persistence.PersistentRepr;
import akka.persistence.journal.japi.AsyncWriteJournal;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigMemorySize;
import io.atomix.storage.journal.SegmentedJournal;
import io.atomix.storage.journal.StorageLevel;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
* An Akka persistence journal implementation on top of {@link SegmentedJournal}. This actor represents aggregation
* of multiple journals and performs a receptionist job between Akka and invidual per-persistenceId actors. See
* {@link SegmentedJournalActor} for details on how the persistence works.
- *
- * @author Robert Varga
*/
public class SegmentedFileJournal extends AsyncWriteJournal {
public static final String STORAGE_ROOT_DIRECTORY = "root-directory";
@Override
public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(final Iterable<AtomicWrite> messages) {
- final Map<ActorRef, WriteMessages> map = new HashMap<>();
- final List<Future<Optional<Exception>>> result = new ArrayList<>();
+ final var map = new HashMap<ActorRef, WriteMessages>();
+ final var result = new ArrayList<Future<Optional<Exception>>>();
- for (AtomicWrite message : messages) {
- final String persistenceId = message.persistenceId();
- final ActorRef handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
+ for (var message : messages) {
+ final var persistenceId = message.persistenceId();
+ final var handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
result.add(map.computeIfAbsent(handler, key -> new WriteMessages()).add(message));
}
}
private ActorRef createHandler(final String persistenceId) {
- final String directoryName = URLEncoder.encode(persistenceId, StandardCharsets.UTF_8);
- final File directory = new File(rootDir, directoryName);
+ final var directoryName = URLEncoder.encode(persistenceId, StandardCharsets.UTF_8);
+ final var directory = new File(rootDir, directoryName);
LOG.debug("Creating handler for {} in directory {}", persistenceId, directory);
- final ActorRef handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
+ final var handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
maxEntrySize, maxSegmentSize));
LOG.debug("Directory {} handled by {}", directory, handler);
return handler;
}
private <T> Future<T> delegateMessage(final String persistenceId, final AsyncMessage<T> message) {
- final ActorRef handler = handlers.get(persistenceId);
+ final var handler = handlers.get(persistenceId);
if (handler == null) {
return Futures.failed(new IllegalStateException("Cannot find handler for " + persistenceId));
}
if (!config.hasPath(path)) {
return defaultValue;
}
- final ConfigMemorySize value = config.getMemorySize(path);
+ final var value = config.getMemorySize(path);
final long result = value.toBytes();
checkArgument(result <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", Integer.MAX_VALUE);
return (int) result;
import io.atomix.storage.journal.Indexed;
import io.atomix.storage.journal.JournalSerdes;
import io.atomix.storage.journal.SegmentedJournal;
-import io.atomix.storage.journal.SegmentedJournalWriter;
import io.atomix.storage.journal.StorageLevel;
import java.io.File;
import java.util.ArrayList;
* <p>
* Split-file approach allows us to treat sequence numbers and indices as equivalent, without maintaining any explicit
* mapping information. The only additional information we need to maintain is the last deleted sequence number.
- *
- * @author Robert Varga
*/
final class SegmentedJournalActor extends AbstractActor {
abstract static class AsyncMessage<T> {
private final List<Promise<Optional<Exception>>> results = new ArrayList<>();
Future<Optional<Exception>> add(final AtomicWrite write) {
- final Promise<Optional<Exception>> promise = Promise.apply();
+ final var promise = Promise.<Optional<Exception>>apply();
requests.add(write);
results.add(promise);
return promise.future();
LOG.debug("{}: actor starting", persistenceId);
super.preStart();
- final MetricRegistry registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
- final String actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
+ final var registry = MetricsReporter.getInstance(MeteringBehavior.DOMAIN).getMetricsRegistry();
+ final var actorName = self().path().parent().toStringWithoutAddress() + '/' + directory.getName();
batchWriteTime = registry.timer(MetricRegistry.name(actorName, "batchWriteTime"));
messageWriteCount = registry.meter(MetricRegistry.name(actorName, "messageWriteCount"));
LOG.debug("{}: deleting entries up to {}", persistenceId, to);
lastDelete = to;
- final SegmentedJournalWriter<Long> deleteWriter = deleteJournal.writer();
- final Indexed<Long> entry = deleteWriter.append(lastDelete);
+ final var deleteWriter = deleteJournal.writer();
+ final var entry = deleteWriter.append(lastDelete);
deleteWriter.commit(entry.index());
dataJournal.deleteTo(lastDelete);
private void handleWriteMessages(final WriteMessages message) {
ensureOpen();
- final Stopwatch sw = Stopwatch.createStarted();
+ final var sw = Stopwatch.createStarted();
final long start = dataJournal.lastWrittenSequenceNr();
final long bytes = dataJournal.handleWriteMessages(message);
sw.stop();
return;
}
- final Stopwatch sw = Stopwatch.createStarted();
+ final var sw = Stopwatch.createStarted();
deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
.withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
- final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
+ final var lastEntry = deleteJournal.writer().getLastEntry();
lastDelete = lastEntry == null ? 0 : lastEntry.entry();
dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,