2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static java.util.Objects.requireNonNull;
21 import io.netty.buffer.ByteBuf;
24 * A {@link ByteBufWriter} implementation.
26 final class SegmentedByteBufWriter implements ByteBufWriter {
27 private final SegmentedByteBufJournal journal;
29 private JournalSegment currentSegment;
30 private JournalSegmentWriter currentWriter;
32 SegmentedByteBufWriter(final SegmentedByteBufJournal journal) {
33 this.journal = requireNonNull(journal);
34 currentSegment = journal.lastSegment();
35 currentWriter = currentSegment.acquireWriter();
39 public long nextIndex() {
40 return currentWriter.nextIndex();
44 public void commit(final long index) {
45 if (index > journal.getCommitIndex()) {
46 journal.setCommitIndex(index);
47 if (journal.isFlushOnCommit()) {
54 public long append(final ByteBuf bytes) {
55 final var position = currentWriter.append(bytes);
56 return position != null ? position.index() : appendToNextSegment(bytes);
59 // Slow path: we do not have enough capacity
60 private long appendToNextSegment(final ByteBuf bytes) {
61 currentWriter.flush();
62 currentSegment.releaseWriter();
63 currentSegment = journal.createNextSegment();
64 currentWriter = currentSegment.acquireWriter();
65 return currentWriter.append(bytes).index();
69 public void reset(final long index) {
70 final var commitIndex = journal.getCommitIndex();
71 if (index <= commitIndex) {
72 // also catches index == 0, which is not a valid next index
73 throw new IndexOutOfBoundsException("Cannot reset to: " + index + ", committed index: " + commitIndex);
76 final var lastIndex = currentSegment.lastIndex();
77 final var prevIndex = index - 1;
78 if (prevIndex == lastIndex) {
79 // already at the correct position: no-op
80 } else if (prevIndex < lastIndex) {
82 checkedTruncate(prevIndex);
83 journal.resetHead(index);
85 // cannot seek past last written entry
86 throw new IndexOutOfBoundsException("Cannot reset to: " + index + ", lastIndex: " + lastIndex);
90 private void checkedTruncate(final long index) {
91 // Delete all segments with first indexes greater than the given index.
92 while (index < currentSegment.firstIndex() && currentSegment != journal.firstSegment()) {
93 currentSegment.releaseWriter();
94 journal.removeSegment(currentSegment);
95 currentSegment = journal.lastSegment();
96 currentWriter = currentSegment.acquireWriter();
99 // Truncate the current index.
100 currentWriter.truncate(index);
102 // Reset segment readers.
103 journal.resetTail(index + 1);
107 public void flush() {
108 currentWriter.flush();