import com.esotericsoftware.kryo.KryoException;
import io.atomix.storage.journal.index.JournalIndex;
-import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.zip.CRC32;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
abstract sealed class JournalSegmentWriter<E> permits DiskJournalSegmentWriter, MappedJournalSegmentWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(JournalSegmentWriter.class);
+
final @NonNull FileChannel channel;
final @NonNull JournalSegment<E> segment;
private final @NonNull JournalIndex index;
}
/**
- * Appends an entry to the journal.
+ * Tries to append an entry to the journal.
*
* @param entry The entry to append.
- * @return The appended indexed entry.
+ * @return The appended indexed entry, or {@code null} if there is not enough space available
*/
- final <T extends E> Indexed<T> append(final T entry) {
+ final <T extends E> @Nullable Indexed<T> append(final T entry) {
// Store the entry index.
final long index = getNextIndex();
final int position = currentPosition;
final int bodyPosition = position + HEADER_BYTES;
final int avail = maxSegmentSize - bodyPosition;
if (avail < 0) {
- throw new BufferOverflowException();
+ LOG.trace("Not enough space for {} at {}", index, position);
+ return null;
}
final var writeLimit = Math.min(avail, maxEntrySize);
} catch (KryoException e) {
if (writeLimit != maxEntrySize) {
// We have not provided enough capacity, signal to roll to next segment
- throw new BufferOverflowException();
+ LOG.trace("Tail serialization with {} bytes available failed", writeLimit, e);
+ return null;
}
// Just reset the buffer. There's no need to zero the bytes since we haven't written the length or checksum.
- throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
+ throw new StorageException.TooLarge("Entry size exceeds maximum allowed bytes (" + maxEntrySize + ")", e);
}
final int length = diskEntry.position() - HEADER_BYTES;
*/
package io.atomix.storage.journal;
+import org.eclipse.jdt.annotation.NonNull;
+
/**
* Log writer.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public interface JournalWriter<E> {
- /**
- * Returns the last written index.
- *
- * @return The last written index.
- */
- long getLastIndex();
+ /**
+ * Returns the last written index.
+ *
+ * @return The last written index.
+ */
+ long getLastIndex();
- /**
- * Returns the last entry written.
- *
- * @return The last entry written.
- */
- Indexed<E> getLastEntry();
+ /**
+ * Returns the last entry written.
+ *
+ * @return The last entry written.
+ */
+ Indexed<E> getLastEntry();
- /**
- * Returns the next index to be written.
- *
- * @return The next index to be written.
- */
- long getNextIndex();
+ /**
+ * Returns the next index to be written.
+ *
+ * @return The next index to be written.
+ */
+ long getNextIndex();
- /**
- * Appends an entry to the journal.
- *
- * @param entry The entry to append.
- * @return The appended indexed entry.
- */
- <T extends E> Indexed<T> append(T entry);
+ /**
+ * Appends an entry to the journal.
+ *
+ * @param entry The entry to append.
+ * @return The appended indexed entry.
+ */
+ <T extends E> @NonNull Indexed<T> append(T entry);
- /**
- * Commits entries up to the given index.
- *
- * @param index The index up to which to commit entries.
- */
- void commit(long index);
+ /**
+ * Commits entries up to the given index.
+ *
+ * @param index The index up to which to commit entries.
+ */
+ void commit(long index);
- /**
- * Resets the head of the journal to the given index.
- *
- * @param index the index to which to reset the head of the journal
- */
- void reset(long index);
+ /**
+ * Resets the head of the journal to the given index.
+ *
+ * @param index the index to which to reset the head of the journal
+ */
+ void reset(long index);
- /**
- * Truncates the log to the given index.
- *
- * @param index The index to which to truncate the log.
- */
- void truncate(long index);
+ /**
+ * Truncates the log to the given index.
+ *
+ * @param index The index to which to truncate the log.
+ */
+ void truncate(long index);
- /**
- * Flushes written entries to disk.
- */
- void flush();
+ /**
+ * Flushes written entries to disk.
+ */
+ void flush();
}
*/
package io.atomix.storage.journal;
-import java.nio.BufferOverflowException;
+import static com.google.common.base.Verify.verifyNotNull;
/**
* Raft log writer.
@Override
public <T extends E> Indexed<T> append(T entry) {
- try {
- return currentWriter.append(entry);
- } catch (BufferOverflowException e) {
- if (currentSegment.firstIndex() == currentWriter.getNextIndex()) {
- throw e;
- }
+ var indexed = currentWriter.append(entry);
+ if (indexed != null) {
+ return indexed;
}
+ // Slow path: we do not have enough capacity
currentWriter.flush();
currentSegment.releaseWriter();
currentSegment = journal.getNextSegment();
currentWriter = currentSegment.acquireWriter();
- return currentWriter.append(entry);
+ return verifyNotNull(currentWriter.append(entry));
}
@Override
public TooLarge(final String message) {
super(message);
}
+
+ public TooLarge(final String message, final Throwable cause) {
+ super(message, cause);
+ }
}
/**