*/
package io.atomix.storage.journal;
-import io.netty.buffer.ByteBuf;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
*/
@NonNullByDefault
public interface ByteBufReader extends AutoCloseable {
- /**
- * A journal entry processor. Responsible for transforming bytes into their internal representation.
- *
- * @param <T> Internal representation type
- */
- @FunctionalInterface
- interface EntryMapper<T> {
- /**
- * Process an entry.
- *
- * @param index entry index
- * @param bytes entry bytes
- * @return resulting internal representation
- */
- T mapEntry(long index, ByteBuf bytes);
- }
-
/**
* Returns the next reader index.
*
/**
* Try to move to the next binary data block.
*
- * @param entryMapper callback to be invoked on binary data
+ * @param mapper callback to be invoked on binary data
* @return processed binary data, or {@code null}
*/
- <T> @Nullable T tryNext(EntryMapper<T> entryMapper);
+ <T> @Nullable T tryNext(FromByteBufMapper<T> mapper);
/**
* Resets the reader to the start.
/**
* Appends an entry to the journal.
*
- * @param mapper a {@link ByteBufMapper} to use with entry
+ * @param mapper a {@link ToByteBufMapper} to use with entry
* @param entry entry to append
* @return the on-disk size of the entry
*/
// FIXME: throws IOException
- <T> int append(ByteBufMapper<T> mapper, T entry);
+ <T> int append(ToByteBufMapper<T> mapper, T entry);
/**
* Commits entries up to the given index.
--- /dev/null
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.atomix.storage.journal;
+
+import io.netty.buffer.ByteBuf;
+import org.eclipse.jdt.annotation.NonNullByDefault;
+
+/**
+ * Interface for transforming bytes into their internal representation.
+ *
+ * @param <T> Internal representation type
+ */
+@NonNullByDefault
+@FunctionalInterface
+public interface FromByteBufMapper<T> {
+ /**
+ * Converts the contents of a {@link ByteBuf} to an object.
+ *
+ * @param index entry index
+ * @param bytes entry bytes
+ * @return resulting object
+ */
+ T bytesToObject(long index, ByteBuf bytes);
+}
* @param entry the entry
* @return the entry size, or {@code null} if segment has no space
*/
- <T> @Nullable Integer append(final ByteBufMapper<T> mapper, final T entry) {
+ <T> @Nullable Integer append(final ToByteBufMapper<T> mapper, final T entry) {
// we are appending at this index and position
final long index = nextIndex();
final int position = currentPosition;
import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import io.atomix.utils.serializer.KryoJournalSerdesBuilder;
-import io.netty.buffer.ByteBuf;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
/**
* Support for serialization of {@link Journal} entries.
*
- * @deprecated due to dependency on outdated Kryo library, {@link ByteBufMapper} to be used instead.
+ * @deprecated due to dependency on outdated Kryo library, {@link FromByteBufMapper} to be used instead.
*/
@Deprecated(forRemoval = true, since = "9.0.3")
public interface JournalSerdes {
*/
<T> T deserialize(InputStream stream, int bufferSize);
+
/**
- * Returns a {@link ByteBufMapper} backed by this object.
+ * Returns a {@link FromByteBufMapper} backed by this object.
*
- * @return a {@link ByteBufMapper} backed by this object
+ * @return a {@link FromByteBufMapper} backed by this object
*/
- default <T> ByteBufMapper<T> toMapper() {
- return new ByteBufMapper<>() {
- @Override
- public void objectToBytes(final T obj, final ByteBuf bytes) throws IOException {
- final var buffer = bytes.nioBuffer();
- try {
- serialize(obj, buffer);
- } catch (KryoException e) {
- throw newIOException(e);
- } finally {
- // adjust writerIndex so that readableBytes() the bytes written
- bytes.writerIndex(bytes.readerIndex() + buffer.position());
- }
- }
+ default <T> FromByteBufMapper<T> toReadMapper() {
+ return (index, bytes) -> deserialize(bytes.nioBuffer());
+ }
- @Override
- public T bytesToObject(final long index, final ByteBuf bytes) {
- return deserialize(bytes.nioBuffer());
+ /**
+ * Returns a {@link ToByteBufMapper} backed by this object.
+ *
+ * @return a {@link ToByteBufMapper} backed by this object
+ */
+ default <T> ToByteBufMapper<T> toWriteMapper() {
+ return (obj, buf) -> {
+ final var nio = buf.nioBuffer();
+ try {
+ serialize(obj, nio);
+ } catch (KryoException e) {
+ throw newIOException(e);
+ } finally {
+ // adjust writerIndex so that readableBytes() the bytes written
+ buf.writerIndex(buf.readerIndex() + nio.position());
}
+ };
+ }
- private static IOException newIOException(final KryoException cause) {
- // We may have multiple nested KryoExceptions, intertwined with others, like IOExceptions. Let's find
- // the deepest one.
- var rootKryo = cause;
- for (var nextCause = rootKryo.getCause(); nextCause != null; nextCause = nextCause.getCause()) {
- if (nextCause instanceof KryoException kryo) {
- rootKryo = kryo;
- }
- }
- // It would be nice to have a better way of discerning these, but alas it is what it is.
- if (rootKryo.getMessage().startsWith("Buffer overflow.")) {
- final var ex = new EOFException();
- ex.initCause(cause);
- return ex;
- }
- return new IOException(rootKryo);
+ private static IOException newIOException(final KryoException cause) {
+ // We may have multiple nested KryoExceptions, intertwined with others, like IOExceptions. Let's find
+ // the deepest one.
+ var rootKryo = cause;
+ for (var nextCause = rootKryo.getCause(); nextCause != null; nextCause = nextCause.getCause()) {
+ if (nextCause instanceof KryoException kryo) {
+ rootKryo = kryo;
}
- };
+ }
+ // It would be nice to have a better way of discerning these, but alas it is what it is.
+ if (rootKryo.getMessage().startsWith("Buffer overflow.")) {
+ final var ex = new EOFException();
+ ex.initCause(cause);
+ return ex;
+ }
+ return new IOException(rootKryo);
}
/**
}
@Override
- public final <T> T tryNext(final EntryMapper<T> entryMapper) {
+ public final <T> T tryNext(final FromByteBufMapper<T> mapper) {
final var index = nextIndex;
final var bytes = tryAdvance(index);
- return bytes == null ? null : entryMapper.mapEntry(index, bytes);
+ return bytes == null ? null : mapper.bytesToObject(index, bytes);
}
/**
}
@Override
- public <T> int append(final ByteBufMapper<T> mapper, final T entry) {
+ public <T> int append(final ToByteBufMapper<T> mapper, final T entry) {
final var size = currentWriter.append(mapper, entry);
return size != null ? size : appendToNextSegment(mapper, entry);
}
// Slow path: we do not have enough capacity
- private <T> int appendToNextSegment(final ByteBufMapper<T> mapper, final T entry) {
+ private <T> int appendToNextSegment(final ToByteBufMapper<T> mapper, final T entry) {
currentWriter.flush();
currentSegment.releaseWriter();
currentSegment = journal.createNextSegment();
import static java.util.Objects.requireNonNull;
import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNull;
/**
* A {@link Journal} implementation based on a {@link ByteBufJournal}.
*/
public final class SegmentedJournal<E> implements Journal<E> {
- private final SegmentedJournalWriter<E> writer;
- private final ByteBufMapper<E> mapper;
- private final ByteBufJournal journal;
+ private final @NonNull SegmentedJournalWriter<E> writer;
+ private final @NonNull FromByteBufMapper<E> readMapper;
+ private final @NonNull ByteBufJournal journal;
- public SegmentedJournal(final ByteBufJournal journal, final ByteBufMapper<E> mapper) {
+ public SegmentedJournal(final ByteBufJournal journal, final FromByteBufMapper<E> readMapper,
+ final ToByteBufMapper<E> writeMapper) {
this.journal = requireNonNull(journal, "journal is required");
- this.mapper = requireNonNull(mapper, "mapper cannot be null");
- writer = new SegmentedJournalWriter<>(journal.writer(), mapper);
+ this.readMapper = requireNonNull(readMapper, "readMapper cannot be null");
+ writer = new SegmentedJournalWriter<>(journal.writer(),
+ requireNonNull(writeMapper, "writeMapper cannot be null"));
}
@Override
case ALL -> journal.openReader(index);
case COMMITS -> journal.openCommitsReader(index);
};
- return new SegmentedJournalReader<>(byteReader, mapper);
+ return new SegmentedJournalReader<>(byteReader, readMapper);
}
@Override
*/
@NonNullByDefault
final class SegmentedJournalReader<E> implements JournalReader<E> {
- private final ByteBufMapper<E> mapper;
+ private final FromByteBufMapper<E> mapper;
private final ByteBufReader reader;
- SegmentedJournalReader(final ByteBufReader reader, final ByteBufMapper<E> mapper) {
+ SegmentedJournalReader(final ByteBufReader reader, final FromByteBufMapper<E> mapper) {
this.reader = requireNonNull(reader);
this.mapper = requireNonNull(mapper);
}
*/
@NonNullByDefault
final class SegmentedJournalWriter<E> implements JournalWriter<E> {
- private final ByteBufMapper<E> mapper;
+ private final ToByteBufMapper<E> mapper;
private final ByteBufWriter writer;
- SegmentedJournalWriter(final ByteBufWriter writer, final ByteBufMapper<E> mapper) {
+ SegmentedJournalWriter(final ByteBufWriter writer, final ToByteBufMapper<E> mapper) {
this.writer = requireNonNull(writer);
this.mapper = requireNonNull(mapper);
}
import org.eclipse.jdt.annotation.NonNullByDefault;
/**
- * Support for mapping of {@link ByteBufJournal} entries to and from {@link ByteBuf}s.
+ * Interface for transforming internal represetation to bytes.
+ *
+ * @param <T> Internal representation type
*/
@NonNullByDefault
-public interface ByteBufMapper<T> {
- /**
- * Converts the contents of a {@link ByteBuf} to an object.
- *
- * @param index entry index
- * @param bytes entry bytes
- * @return resulting object
- */
- T bytesToObject(long index, ByteBuf bytes);
-
+@FunctionalInterface
+public interface ToByteBufMapper<T> {
/**
* Converts an object into a series of bytes in the specified {@link ByteBuf}.
*
.withStorageLevel(storageLevel)
.withMaxSegmentSize(maxSegmentSize)
.withIndexDensity(.2)
- .build(), NAMESPACE.toMapper());
+ .build(), NAMESPACE.toReadMapper(), NAMESPACE.toWriteMapper());
}
@Test
DataJournalV0(final String persistenceId, final Histogram messageSize, final ActorSystem system,
final StorageLevel storage, final File directory, final int maxEntrySize, final int maxSegmentSize) {
super(persistenceId, messageSize);
+
+ final var serdes = JournalSerdes.builder()
+ .register(new DataJournalEntrySerdes(system), FromPersistence.class, ToPersistence.class)
+ .build();
+
entries = new SegmentedJournal<>(SegmentedByteBufJournal.builder()
.withDirectory(directory)
.withName("data")
.withStorageLevel(storage)
.withMaxEntrySize(maxEntrySize)
.withMaxSegmentSize(maxSegmentSize)
- .build(),
- JournalSerdes.builder()
- .register(new DataJournalEntrySerdes(system), FromPersistence.class, ToPersistence.class)
- .build().toMapper());
+ .build(), serdes.toReadMapper(), serdes.toWriteMapper());
}
@Override
import com.codahale.metrics.Timer;
import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;
+import io.atomix.storage.journal.FromByteBufMapper;
import io.atomix.storage.journal.Indexed;
import io.atomix.storage.journal.JournalSerdes;
import io.atomix.storage.journal.SegmentedByteBufJournal;
import io.atomix.storage.journal.SegmentedJournal;
import io.atomix.storage.journal.StorageLevel;
+import io.atomix.storage.journal.ToByteBufMapper;
import java.io.File;
import java.util.ArrayDeque;
import java.util.ArrayList;
}
private static final Logger LOG = LoggerFactory.getLogger(SegmentedJournalActor.class);
- private static final JournalSerdes DELETE_NAMESPACE = JournalSerdes.builder()
- .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
- .build();
private static final int DELETE_SEGMENT_SIZE = 64 * 1024;
+ private static final FromByteBufMapper<Long> READ_MAPPER;
+ private static final ToByteBufMapper<Long> WRITE_MAPPER;
+
+ static {
+ final var namespace = JournalSerdes.builder()
+ .register(LongEntrySerdes.LONG_ENTRY_SERDES, Long.class)
+ .build();
+
+ READ_MAPPER = namespace.toReadMapper();
+ WRITE_MAPPER = namespace.toWriteMapper();
+ }
private final String persistenceId;
private final StorageLevel storage;
.withDirectory(directory)
.withName("delete")
.withMaxSegmentSize(DELETE_SEGMENT_SIZE)
- .build(), DELETE_NAMESPACE.toMapper());
+ .build(), READ_MAPPER, WRITE_MAPPER);
final var lastDeleteRecovered = deleteJournal.openReader(deleteJournal.lastIndex())
.tryNext((index, value, length) -> value);
lastDelete = lastDeleteRecovered == null ? 0 : lastDeleteRecovered;