import java.io.IOException;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.raft.journal.ToByteBufMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import org.opendaylight.controller.raft.journal.FromByteBufMapper;
+import org.opendaylight.controller.raft.journal.ToByteBufMapper;
/**
* Support for serialization of {@link Journal} entries.
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.raft.journal.EntryReader;
+import org.opendaylight.controller.raft.journal.EntryWriter;
+import org.opendaylight.controller.raft.journal.RaftJournal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A {@link ByteBufJournal} Implementation.
+ * A {@link RaftJournal} Implementation.
*/
-public final class SegmentedByteBufJournal implements ByteBufJournal {
+public final class SegmentedByteBufJournal implements RaftJournal {
private static final Logger LOG = LoggerFactory.getLogger(SegmentedByteBufJournal.class);
private static final int SEGMENT_BUFFER_FACTOR = 3;
private final ConcurrentNavigableMap<Long, JournalSegment> segments = new ConcurrentSkipListMap<>();
- private final Collection<ByteBufReader> readers = ConcurrentHashMap.newKeySet();
+ private final Collection<EntryReader> readers = ConcurrentHashMap.newKeySet();
private final @NonNull ByteBufAllocator allocator;
private final @NonNull StorageLevel storageLevel;
private final @NonNull File directory;
private final @NonNull String name;
- private final @NonNull ByteBufWriter writer;
+ private final @NonNull EntryWriter writer;
private final int maxSegmentSize;
private final int maxEntrySize;
@Deprecated(forRemoval = true)
}
@Override
- public ByteBufWriter writer() {
+ public EntryWriter writer() {
return writer;
}
@Override
- public ByteBufReader openReader(final long index) {
+ public EntryReader openReader(final long index) {
return openReader(index, SegmentedByteBufReader::new);
}
@NonNullByDefault
- private ByteBufReader openReader(final long index,
- final BiFunction<SegmentedByteBufJournal, JournalSegment, ByteBufReader> constructor) {
+ private EntryReader openReader(final long index,
+ final BiFunction<SegmentedByteBufJournal, JournalSegment, EntryReader> constructor) {
final var reader = constructor.apply(this, segment(index));
reader.reset(index);
readers.add(reader);
}
@Override
- public ByteBufReader openCommitsReader(final long index) {
+ public EntryReader openCommitsReader(final long index) {
return openReader(index, SegmentedCommitsByteBufReader::new);
}
import io.netty.buffer.ByteBuf;
import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.raft.journal.EntryReader;
+import org.opendaylight.controller.raft.journal.FromByteBufMapper;
/**
- * A {@link ByteBufReader} implementation.
+ * A {@link EntryReader} implementation.
*/
-sealed class SegmentedByteBufReader implements ByteBufReader permits SegmentedCommitsByteBufReader {
+sealed class SegmentedByteBufReader implements EntryReader permits SegmentedCommitsByteBufReader {
final @NonNull SegmentedByteBufJournal journal;
private JournalSegment currentSegment;
import static com.google.common.base.Verify.verifyNotNull;
import static java.util.Objects.requireNonNull;
+import org.opendaylight.controller.raft.journal.EntryWriter;
+import org.opendaylight.controller.raft.journal.ToByteBufMapper;
+
/**
- * A {@link ByteBufWriter} implementation.
+ * A {@link EntryWriter} implementation.
*/
-final class SegmentedByteBufWriter implements ByteBufWriter {
+final class SegmentedByteBufWriter implements EntryWriter {
private final SegmentedByteBufJournal journal;
private JournalSegment currentSegment;
package io.atomix.storage.journal;
import io.netty.buffer.ByteBuf;
+import org.opendaylight.controller.raft.journal.EntryReader;
/**
- * A {@link ByteBufReader} traversing only committed entries.
+ * A {@link EntryReader} traversing only committed entries.
*/
final class SegmentedCommitsByteBufReader extends SegmentedByteBufReader {
SegmentedCommitsByteBufReader(final SegmentedByteBufJournal journal, final JournalSegment segment) {
import com.google.common.base.MoreObjects;
import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.raft.journal.FromByteBufMapper;
+import org.opendaylight.controller.raft.journal.RaftJournal;
+import org.opendaylight.controller.raft.journal.ToByteBufMapper;
/**
- * A {@link Journal} implementation based on a {@link ByteBufJournal}.
+ * A {@link Journal} implementation based on a {@link RaftJournal}.
*/
public final class SegmentedJournal<E> implements Journal<E> {
private final @NonNull SegmentedJournalWriter<E> writer;
private final @NonNull FromByteBufMapper<E> readMapper;
- private final @NonNull ByteBufJournal journal;
+ private final @NonNull RaftJournal journal;
- public SegmentedJournal(final ByteBufJournal journal, final FromByteBufMapper<E> readMapper,
+ public SegmentedJournal(final RaftJournal journal, final FromByteBufMapper<E> readMapper,
final ToByteBufMapper<E> writeMapper) {
this.journal = requireNonNull(journal, "journal is required");
this.readMapper = requireNonNull(readMapper, "readMapper cannot be null");
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
+import org.opendaylight.controller.raft.journal.EntryReader;
+import org.opendaylight.controller.raft.journal.FromByteBufMapper;
/**
- * A {@link JournalReader} backed by a {@link ByteBufReader}.
+ * A {@link JournalReader} backed by a {@link EntryReader}.
*/
@NonNullByDefault
final class SegmentedJournalReader<E> implements JournalReader<E> {
private final FromByteBufMapper<E> mapper;
- private final ByteBufReader reader;
+ private final EntryReader reader;
- SegmentedJournalReader(final ByteBufReader reader, final FromByteBufMapper<E> mapper) {
+ SegmentedJournalReader(final EntryReader reader, final FromByteBufMapper<E> mapper) {
this.reader = requireNonNull(reader);
this.mapper = requireNonNull(mapper);
}
import static java.util.Objects.requireNonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
+import org.opendaylight.controller.raft.journal.EntryWriter;
+import org.opendaylight.controller.raft.journal.ToByteBufMapper;
/**
- * A {@link JournalWriter} backed by a {@link ByteBufWriter}.
+ * A {@link JournalWriter} backed by a {@link EntryWriter}.
*/
@NonNullByDefault
final class SegmentedJournalWriter<E> implements JournalWriter<E> {
private final ToByteBufMapper<E> mapper;
- private final ByteBufWriter writer;
+ private final EntryWriter writer;
- SegmentedJournalWriter(final ByteBufWriter writer, final ToByteBufMapper<E> mapper) {
+ SegmentedJournalWriter(final EntryWriter writer, final ToByteBufMapper<E> mapper) {
this.writer = requireNonNull(writer);
this.mapper = requireNonNull(mapper);
}
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.atomix.storage.journal;
+package org.opendaylight.controller.raft.journal;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
/**
- * A reader of {@link ByteBufJournal} entries.
+ * A reader of {@link RaftJournal} entries.
*/
@NonNullByDefault
-public interface ByteBufReader extends AutoCloseable {
+public interface EntryReader extends AutoCloseable {
/**
* Returns the next reader index.
*
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.atomix.storage.journal;
+package org.opendaylight.controller.raft.journal;
import org.eclipse.jdt.annotation.NonNullByDefault;
/**
- * A writer of {@link ByteBufJournal} entries.
+ * A writer of {@link RaftJournal} entries.
*/
@NonNullByDefault
-public interface ByteBufWriter {
+public interface EntryWriter {
/**
* Returns the next index to be written.
*
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.atomix.storage.journal;
+package org.opendaylight.controller.raft.journal;
import io.netty.buffer.ByteBuf;
import org.eclipse.jdt.annotation.NonNullByDefault;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.atomix.storage.journal;
+package org.opendaylight.controller.raft.journal;
import org.eclipse.jdt.annotation.NonNullByDefault;
/**
- * A journal of byte arrays. Provides the ability to write modify entries via {@link ByteBufWriter} and read them
- * back via {@link ByteBufReader}.
+ * A journal of byte arrays. Provides the ability to write modify entries via {@link EntryWriter} and read them
+ * back via {@link EntryReader}.
*/
@NonNullByDefault
-public interface ByteBufJournal extends AutoCloseable {
+public interface RaftJournal extends AutoCloseable {
/**
* Return the index of the first entry in the journal.
*
*
* @return The journal writer.
*/
- ByteBufWriter writer();
+ EntryWriter writer();
/**
- * Opens a new {@link ByteBufReader} reading all entries.
+ * Opens a new {@link EntryReader} reading all entries.
*
* @param index The index at which to start the reader.
* @return A new journal reader.
*/
- ByteBufReader openReader(long index);
+ EntryReader openReader(long index);
/**
- * Opens a new {@link ByteBufReader} reading only committed entries.
+ * Opens a new {@link EntryReader} reading only committed entries.
*
* @param index The index at which to start the reader.
* @return A new journal reader.
*/
- ByteBufReader openCommitsReader(long index);
+ EntryReader openCommitsReader(long index);
/**
* Compacts the journal up to the given index.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.atomix.storage.journal;
+package org.opendaylight.controller.raft.journal;
import io.netty.buffer.ByteBuf;
import java.io.EOFException;
--- /dev/null
+/*
+ * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Storage primitives for a RAFT journal.
+ */
+package org.opendaylight.controller.raft.journal;
import com.codahale.metrics.Timer;
import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;
-import io.atomix.storage.journal.FromByteBufMapper;
import io.atomix.storage.journal.Indexed;
import io.atomix.storage.journal.JournalSerdes;
import io.atomix.storage.journal.SegmentedByteBufJournal;
import io.atomix.storage.journal.SegmentedJournal;
import io.atomix.storage.journal.StorageLevel;
-import io.atomix.storage.journal.ToByteBufMapper;
import java.io.File;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.function.Consumer;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.controller.raft.journal.FromByteBufMapper;
+import org.opendaylight.controller.raft.journal.ToByteBufMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;