package io.atomix.storage.journal;
import io.netty.buffer.ByteBuf;
+import java.io.IOException;
import org.eclipse.jdt.annotation.NonNullByDefault;
/**
- * Support for serialization of {@link ByteBufJournal} entries.
+ * Support for mapping of {@link ByteBufJournal} entries to and from {@link ByteBuf}s.
*/
@NonNullByDefault
public interface ByteBufMapper<T> {
/**
- * Converts an object into a series of bytes in a {@link ByteBuf}.
+ * Converts the contents of a {@link ByteBuf} to an object.
*
- * @param obj the object
- * @return resulting buffer
+ * @param index entry index
+ * @param bytes entry bytes
+ * @return resulting object
*/
- ByteBuf objectToBytes(T obj) ;
+ T bytesToObject(final long index, ByteBuf bytes);
/**
- * Converts the contents of a {@link ByteBuf} to an object.
+ * Converts an object into a series of bytes in the specified {@link ByteBuf}.
*
- * @param buf buffer to convert
- * @return resulting object
+ * @param obj the object
+ * @param buf target buffer
+ * @throws IOException if an I/O error occurs
*/
- T bytesToObject(ByteBuf buf);
+ void objectToBytes(T obj, ByteBuf buf) throws IOException;
}
*/
package io.atomix.storage.journal;
-import io.netty.buffer.ByteBuf;
import org.eclipse.jdt.annotation.NonNullByDefault;
/**
/**
* Appends an entry to the journal.
*
- * @param bytes Data block to append
- * @return The index of appended data block
+ * @param mapper a {@link ByteBufMapper} to use with entry
+ * @param entry entry to append
+ * @return the on-disk size of the entry
*/
// FIXME: throws IOException
- long append(ByteBuf bytes);
+ <T> int append(ByteBufMapper<T> mapper, T entry);
/**
* Commits entries up to the given index.
*/
abstract void writeEmptyHeader(int position);
+ /**
+ * Allocate file space. Note that the allocated space may be a buffer disconnected from the file. Any modifications
+ * to the returned buffer need to be committed via {@link #commitWrite(int, ByteBuffer)}.
+ *
+ * @param position position to start from
+ * @param size the size to allocate
+ * @return A {@link ByteBuffer} covering the allocated area
+ */
abstract ByteBuffer startWrite(int position, int size);
abstract void commitWrite(int position, ByteBuffer entry);
import io.atomix.storage.journal.StorageException.TooLarge;
import io.atomix.storage.journal.index.JournalIndex;
-import io.atomix.storage.journal.index.Position;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.IOException;
import java.nio.MappedByteBuffer;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
/**
* Tries to append a binary data to the journal.
*
- * @param buf binary data to append
- * @return The index of appended data, or {@code null} if segment has no space
+ * @param mapper the mapper to use
+ * @param entry the entry
+ * @return the entry size, or {@code null} if segment has no space
*/
- Position append(final ByteBuf buf) {
- final var length = buf.readableBytes();
- if (length > maxEntrySize) {
- throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")");
- }
-
- // Store the entry index.
+ <T> @Nullable Integer append(final ByteBufMapper<T> mapper, final T entry) {
+ // we are appending at this index and position
final long index = nextIndex();
final int position = currentPosition;
- // check space available
- final int nextPosition = position + HEADER_BYTES + length;
- if (nextPosition >= maxSegmentSize) {
+ // Map the entry carefully: we may not have enough segment space to satisfy maxEntrySize, but most entries are
+ // way smaller than that.
+ final int bodyPosition = position + HEADER_BYTES;
+ final int avail = maxSegmentSize - bodyPosition;
+ if (avail <= 0) {
+ // we do not have enough space for the header and a byte: signal a retry
LOG.trace("Not enough space for {} at {}", index, position);
return null;
}
- // allocate buffer and write data
- final var writeBuffer = fileWriter.startWrite(position, length + HEADER_BYTES);
- writeBuffer.put(HEADER_BYTES, buf.nioBuffer(), 0, length);
+ // Entry must not exceed maxEntrySize
+ final var writeLimit = Math.min(avail, maxEntrySize);
+
+ // Allocate entry space
+ final var diskEntry = fileWriter.startWrite(position, writeLimit + HEADER_BYTES);
+ // Create a ByteBuf covering the bytes. Note we do not use slice(), as Netty will do the equivalent.
+ final var bytes = Unpooled.wrappedBuffer(diskEntry.position(HEADER_BYTES));
+ try {
+ mapper.objectToBytes(entry, bytes);
+ } catch (IOException e) {
+ // We ran out of buffer space: let's decide who's fault it is:
+ if (writeLimit == maxEntrySize) {
+ // - it is the entry and/or mapper. This is not exactly accurate, as there may be other serialization
+ // fault. This is as good as it gets.
+ throw new TooLarge("Serialized entry size exceeds maximum allowed bytes (" + maxEntrySize + ")", e);
+ }
+
+ // - it is us, as we do not have the capacity to hold maxEntrySize bytes
+ LOG.trace("Tail serialization with {} bytes available failed", writeLimit, e);
+ return null;
+ }
- // Compute the checksum for the entry.
- final var checksum = SegmentEntry.computeChecksum(writeBuffer.slice(HEADER_BYTES, length));
+ // Determine length, trim distEntry and compute checksum. We are okay with computeChecksum() consuming
+ // the buffer, as we rewind() it back.
+ final var length = bytes.readableBytes();
+ final var checksum = SegmentEntry.computeChecksum(
+ diskEntry.limit(HEADER_BYTES + length).position(HEADER_BYTES));
- // Create a single byte[] in memory for the entire entry and write it as a batch to the underlying buffer.
- fileWriter.commitWrite(position, writeBuffer.putInt(0, length).putInt(Integer.BYTES, checksum));
+ // update the header and commit entry to file
+ fileWriter.commitWrite(position, diskEntry.rewind().putInt(0, length).putInt(Integer.BYTES, checksum));
// Update the last entry with the correct index/term/length.
- currentPosition = nextPosition;
- return journalIndex.index(index, position);
+ currentPosition = bodyPosition + length;
+ journalIndex.index(index, position);
+ return length;
}
/**
*/
package io.atomix.storage.journal;
+import com.esotericsoftware.kryo.KryoException;
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import io.atomix.utils.serializer.KryoJournalSerdesBuilder;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
default <T> ByteBufMapper<T> toMapper() {
return new ByteBufMapper<>() {
@Override
- public ByteBuf objectToBytes(final T obj) {
- return Unpooled.wrappedBuffer(serialize(obj));
+ public void objectToBytes(final T obj, final ByteBuf bytes) throws IOException {
+ final var buffer = bytes.nioBuffer();
+ try {
+ serialize(obj, buffer);
+ } catch (KryoException e) {
+ throw new IOException(e);
+ } finally {
+ // adjust writerIndex so that readableBytes() the bytes written
+ bytes.writerIndex(bytes.readerIndex() + buffer.position());
+ }
}
@Override
- public T bytesToObject(final ByteBuf buf) {
- return deserialize(buf.nioBuffer());
+ public T bytesToObject(final long index, final ByteBuf bytes) {
+ return deserialize(bytes.nioBuffer());
}
};
}
*/
package io.atomix.storage.journal;
+import static com.google.common.base.Verify.verifyNotNull;
import static java.util.Objects.requireNonNull;
-import io.netty.buffer.ByteBuf;
-
/**
* A {@link ByteBufWriter} implementation.
*/
}
@Override
- public long append(final ByteBuf bytes) {
- final var position = currentWriter.append(bytes);
- return position != null ? position.index() : appendToNextSegment(bytes);
+ public <T> int append(final ByteBufMapper<T> mapper, final T entry) {
+ final var size = currentWriter.append(mapper, entry);
+ return size != null ? size : appendToNextSegment(mapper, entry);
}
// Slow path: we do not have enough capacity
- private long appendToNextSegment(final ByteBuf bytes) {
+ private <T> int appendToNextSegment(final ByteBufMapper<T> mapper, final T entry) {
currentWriter.flush();
currentSegment.releaseWriter();
currentSegment = journal.createNextSegment();
currentWriter = currentSegment.acquireWriter();
- return currentWriter.append(bytes).index();
+ return verifyNotNull(currentWriter.append(mapper, entry));
}
@Override
import static java.util.Objects.requireNonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
* A {@link JournalReader} backed by a {@link ByteBufReader}.
*/
+@NonNullByDefault
final class SegmentedJournalReader<E> implements JournalReader<E> {
private final ByteBufMapper<E> mapper;
private final ByteBufReader reader;
@Override
public <T> @Nullable T tryNext(final EntryMapper<E, T> entryMapper) {
- return reader.tryNext(
- (index, buf) -> requireNonNull(entryMapper.mapEntry(index, mapper.bytesToObject(buf), buf.readableBytes()))
- );
+ return reader.tryNext((index, buf) -> {
+ final var size = buf.readableBytes();
+ return requireNonNull(entryMapper.mapEntry(index, mapper.bytesToObject(index, buf), size));
+ });
}
@Override
import static java.util.Objects.requireNonNull;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
/**
* A {@link JournalWriter} backed by a {@link ByteBufWriter}.
*/
+@NonNullByDefault
final class SegmentedJournalWriter<E> implements JournalWriter<E> {
private final ByteBufMapper<E> mapper;
private final ByteBufWriter writer;
@Override
public <T extends E> Indexed<T> append(final T entry) {
- final var buf = mapper.objectToBytes(entry);
- return new Indexed<>(writer.append(buf), entry, buf.readableBytes());
+ final var index = writer.nextIndex();
+ return new Indexed<>(index, entry, writer.append(mapper, entry));
}
@Override