X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-segmented-journal%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fakka%2Fsegjournal%2FDataJournalV0.java;h=243a064b80fea81b9fed2a522dacb9f0105aabc4;hb=HEAD;hp=20761c32416558d43f28eb32cf05d28edea92301;hpb=7e629cabf95cf30562f10a658c8ebdd724d1b18d;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java index 20761c3241..935ded32e2 100644 --- a/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java +++ b/opendaylight/md-sal/sal-akka-segmented-journal/src/main/java/org/opendaylight/controller/akka/segjournal/DataJournalV0.java @@ -11,18 +11,21 @@ import akka.actor.ActorSystem; import akka.persistence.PersistentRepr; import com.codahale.metrics.Histogram; import com.google.common.base.VerifyException; +import io.atomix.storage.journal.JournalReader; import io.atomix.storage.journal.JournalSerdes; +import io.atomix.storage.journal.JournalWriter; +import io.atomix.storage.journal.SegmentedByteBufJournal; import io.atomix.storage.journal.SegmentedJournal; -import io.atomix.storage.journal.SegmentedJournalReader; -import io.atomix.storage.journal.SegmentedJournalWriter; import io.atomix.storage.journal.StorageLevel; import java.io.File; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; import org.opendaylight.controller.akka.segjournal.DataJournalEntry.FromPersistence; import org.opendaylight.controller.akka.segjournal.DataJournalEntry.ToPersistence; import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.ReplayMessages; import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WriteMessages; +import org.opendaylight.controller.akka.segjournal.SegmentedJournalActor.WrittenMessages; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.jdk.javaapi.CollectionConverters; @@ -38,18 +41,23 @@ final class DataJournalV0 extends DataJournal { DataJournalV0(final String persistenceId, final Histogram messageSize, final ActorSystem system, final StorageLevel storage, final File directory, final int maxEntrySize, final int maxSegmentSize) { super(persistenceId, messageSize); - entries = SegmentedJournal.builder() - .withStorageLevel(storage).withDirectory(directory).withName("data") - .withNamespace(JournalSerdes.builder() - .register(new DataJournalEntrySerializer(system), FromPersistence.class, ToPersistence.class) - .build()) - .withMaxEntrySize(maxEntrySize).withMaxSegmentSize(maxSegmentSize) - .build(); + + final var serdes = JournalSerdes.builder() + .register(new DataJournalEntrySerdes(system), FromPersistence.class, ToPersistence.class) + .build(); + + entries = new SegmentedJournal<>(SegmentedByteBufJournal.builder() + .withDirectory(directory) + .withName("data") + .withStorageLevel(storage) + .withMaxEntrySize(maxEntrySize) + .withMaxSegmentSize(maxSegmentSize) + .build(), serdes.toReadMapper(), serdes.toWriteMapper()); } @Override long lastWrittenSequenceNr() { - return entries.writer().getLastIndex(); + return entries.lastIndex(); } @Override @@ -64,9 +72,15 @@ final class DataJournalV0 extends DataJournal { @Override void close() { + flush(); entries.close(); } + @Override + void flush() { + entries.writer().flush(); + } + @Override @SuppressWarnings("checkstyle:illegalCatch") void handleReplayMessages(final ReplayMessages message, final long fromSequenceNr) { @@ -80,59 +94,59 @@ final class DataJournalV0 extends DataJournal { } } - private void handleReplayMessages(final SegmentedJournalReader reader, - final ReplayMessages message) { + private void handleReplayMessages(final JournalReader reader, final ReplayMessages message) { int count = 0; - while (reader.hasNext() && count < message.max) { - final var next = reader.next(); - if (next.index() > message.toSequenceNr) { + while (count < message.max && reader.getNextIndex() <= message.toSequenceNr) { + final var repr = reader.tryNext((index, entry, size) -> { + LOG.trace("{}: replay index={} entry={}", persistenceId, index, entry); + updateLargestSize(size); + if (entry instanceof FromPersistence fromPersistence) { + return fromPersistence.toRepr(persistenceId, index); + } + throw new VerifyException("Unexpected entry " + entry); + }); + + if (repr == null) { break; } - LOG.trace("{}: replay {}", persistenceId, next); - updateLargestSize(next.size()); - final var entry = next.entry(); - if (entry instanceof FromPersistence fromPersistence) { - final var repr = fromPersistence.toRepr(persistenceId, next.index()); - LOG.debug("{}: replaying {}", persistenceId, repr); - message.replayCallback.accept(repr); - count++; - } else { - throw new VerifyException("Unexpected entry " + entry); - } + LOG.debug("{}: replaying {}", persistenceId, repr); + message.replayCallback.accept(repr); + count++; } LOG.debug("{}: successfully replayed {} entries", persistenceId, count); } @Override @SuppressWarnings("checkstyle:illegalCatch") - long handleWriteMessages(final WriteMessages message) { + WrittenMessages handleWriteMessages(final WriteMessages message) { final int count = message.size(); + final var responses = new ArrayList<>(); final var writer = entries.writer(); - long bytes = 0; + long writtenBytes = 0; for (int i = 0; i < count; ++i) { - final long mark = writer.getLastIndex(); + final long prevNextIndex = writer.getNextIndex(); final var request = message.getRequest(i); final var reprs = CollectionConverters.asJava(request.payload()); - LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), mark); + LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), prevNextIndex); try { - bytes += writePayload(writer, reprs); + writtenBytes += writePayload(writer, reprs); } catch (Exception e) { - LOG.warn("{}: failed to write out request {}/{} reverting to {}", persistenceId, i, count, mark, e); - message.setFailure(i, e); - writer.truncate(mark); + LOG.warn("{}: failed to write out request {}/{} reverting to {}", persistenceId, i, count, + prevNextIndex, e); + responses.add(e); + writer.reset(prevNextIndex); continue; } - - message.setSuccess(i); + responses.add(null); } - writer.flush(); - return bytes; + + return new WrittenMessages(message, responses, writtenBytes); } - private long writePayload(final SegmentedJournalWriter writer, final List reprs) { + private long writePayload(final JournalWriter writer, final List reprs) { long bytes = 0; for (var repr : reprs) { final Object payload = repr.payload();