import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
import static java.util.Objects.requireNonNull;
+import io.atomix.storage.journal.JournalSegment.Inactive;
import io.atomix.storage.journal.StorageException.TooLarge;
import io.atomix.storage.journal.index.JournalIndex;
import io.netty.buffer.Unpooled;
import java.io.EOFException;
import java.io.IOException;
-import java.nio.MappedByteBuffer;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.slf4j.Logger;
private final FileWriter fileWriter;
final @NonNull JournalSegment segment;
private final @NonNull JournalIndex journalIndex;
- final int maxSegmentSize;
- final int maxEntrySize;
private int currentPosition;
- JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final int maxEntrySize,
- final JournalIndex journalIndex, final int currentPosition) {
+ JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final JournalIndex journalIndex,
+ final int currentPosition) {
this.fileWriter = requireNonNull(fileWriter);
this.segment = requireNonNull(segment);
this.journalIndex = requireNonNull(journalIndex);
- this.maxEntrySize = maxEntrySize;
this.currentPosition = currentPosition;
- maxSegmentSize = segment.file().maxSize();
}
- JournalSegmentWriter(final JournalSegmentWriter previous, final FileWriter fileWriter) {
- segment = previous.segment;
- journalIndex = previous.journalIndex;
- maxSegmentSize = previous.maxSegmentSize;
- maxEntrySize = previous.maxEntrySize;
- currentPosition = previous.currentPosition;
+ JournalSegmentWriter(final FileWriter fileWriter, final JournalSegment segment, final JournalIndex journalIndex,
+ final Inactive segmentState) {
this.fileWriter = requireNonNull(fileWriter);
+ this.segment = requireNonNull(segment);
+ this.journalIndex = requireNonNull(journalIndex);
+ currentPosition = segmentState.position();
+ }
+
+ int currentPosition() {
+ return currentPosition;
}
/**
// Map the entry carefully: we may not have enough segment space to satisfy maxEntrySize, but most entries are
// way smaller than that.
final int bodyPosition = position + HEADER_BYTES;
- final int avail = maxSegmentSize - bodyPosition;
+ final int avail = segment.file().maxSize() - bodyPosition;
if (avail <= 0) {
// we do not have enough space for the header and a byte: signal a retry
LOG.trace("Not enough space for {} at {}", index, position);
}
// Entry must not exceed maxEntrySize
+ final var maxEntrySize = fileWriter.maxEntrySize();
final var writeLimit = Math.min(avail, maxEntrySize);
// Allocate entry space
currentPosition = index < segment.firstIndex() ? JournalSegmentDescriptor.BYTES
// recover position and last written
- : JournalSegment.indexEntries(fileWriter, segment, maxEntrySize, journalIndex, index, nearest);
+ : JournalSegment.indexEntries(fileWriter, segment, journalIndex, index, nearest);
// Zero the entry header at current channel position.
fileWriter.writeEmptyHeader(currentPosition);
* Flushes written entries to disk.
*/
void flush() {
- fileWriter.flush();
- }
-
- /**
- * Closes this writer.
- */
- void close() {
- fileWriter.close();
- }
-
- /**
- * Returns the mapped buffer underlying the segment writer, or {@code null} if the writer does not have such a
- * buffer.
- *
- * @return the mapped buffer underlying the segment writer, or {@code null}.
- */
- @Nullable MappedByteBuffer buffer() {
- return fileWriter.buffer();
- }
-
- @NonNull JournalSegmentWriter toMapped() {
- final var newWriter = fileWriter.toMapped();
- return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
- }
-
- @NonNull JournalSegmentWriter toFileChannel() {
- final var newWriter = fileWriter.toDisk();
- return newWriter == null ? this : new JournalSegmentWriter(this, newWriter);
+ try {
+ fileWriter.flush();
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
}
}