2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static com.google.common.base.Verify.verifyNotNull;
20 import static java.util.Objects.requireNonNull;
22 import io.netty.buffer.ByteBuf;
25 * A {@link ByteBufWriter} implementation.
27 final class SegmentedByteBufWriter implements ByteBufWriter {
// The journal this writer operates on: it supplies the segments and holds the commit index.
28 private final SegmentedByteBufJournal journal;
// The segment currently being written. Reassigned when append() creates the next segment, when
// reset() asks the journal to reset its segments, and when checkedTruncate() removes segments.
30 private JournalSegment currentSegment;
// The writer acquired from currentSegment; re-acquired every time currentSegment is reassigned.
31 private JournalSegmentWriter currentWriter;
/**
 * Creates a writer positioned on the journal's last segment.
 *
 * @param journal the journal to write to, must not be null
 * @throws NullPointerException if {@code journal} is null
 */
33 SegmentedByteBufWriter(final SegmentedByteBufJournal journal) {
34 this.journal = requireNonNull(journal);
// Start on whatever the journal reports as its last segment and take that segment's writer.
35 currentSegment = journal.lastSegment();
36 currentWriter = currentSegment.acquireWriter();
/**
 * Returns the last index as reported by the current segment's writer.
 *
 * @return the value of {@code currentWriter.getLastIndex()}
 */
40 public long lastIndex() {
41 return currentWriter.getLastIndex();
/**
 * Returns the next index as reported by the current segment's writer.
 *
 * @return the value of {@code currentWriter.getNextIndex()}
 */
45 public long nextIndex() {
46 return currentWriter.getNextIndex();
/**
 * Advances the journal's commit index to {@code index}.
 *
 * <p>Nothing happens unless {@code index} is strictly greater than the journal's current commit index, so
 * the commit index is never moved backwards by this method.
 *
 * @param index the index to commit up to
 */
50 public void commit(final long index) {
51 if (index > journal.getCommitIndex()) {
52 journal.setCommitIndex(index);
// NOTE(review): the body of this branch is not visible in this view of the file; presumably it
// flushes the writer when the journal is configured to flush on commit -- confirm against the full source.
53 if (journal.isFlushOnCommit()) {
/**
 * Appends the contents of {@code buf} as an entry and returns the index it was written at.
 *
 * <p>The entry is first offered to the current segment's writer. The slow path below moves on to a freshly
 * created segment and retries there.
 *
 * @param buf the bytes to append
 * @return the index of the appended entry
 */
60 public long append(final ByteBuf buf) {
// NOTE(review): the check that separates this first attempt from the slow path is not visible in
// this view; presumably a successful (non-null) result is returned right here -- confirm.
61 var index = currentWriter.append(buf);
65 // Slow path: we do not have enough capacity
// Flush the current writer before giving it back to its segment.
66 currentWriter.flush();
67 currentSegment.releaseWriter();
// Switch to a newly created segment and take its writer.
68 currentSegment = journal.createNextSegment();
69 currentWriter = currentSegment.acquireWriter();
// The retry is required to produce an index: verifyNotNull rejects a null result instead of returning it.
70 return verifyNotNull(currentWriter.append(buf));
/**
 * Resets the writer so that {@code index} becomes the journal head.
 *
 * @param index the index to reset to, must be strictly greater than the journal's commit index
 * @throws IndexOutOfBoundsException if {@code index} is less than or equal to the commit index
 */
74 public void reset(final long index) {
75 final long commitIndex = journal.getCommitIndex();
76 if (index <= commitIndex) {
77 // also catches index == 0, which is not a valid next index
78 throw new IndexOutOfBoundsException("Cannot reset to: " + index + ", committed index: " + commitIndex);
// The target lies beyond the start of the current segment: give back the writer, let the journal
// reset its segments for the index, and continue on the segment it hands back.
81 if (index > currentSegment.firstIndex()) {
82 currentSegment.releaseWriter();
83 currentSegment = journal.resetSegments(index);
84 currentWriter = currentSegment.acquireWriter();
// NOTE(review): the brace/else structure between the statements above and the truncation below is
// not visible in this view; presumably the truncation is the alternative branch -- confirm.
// Truncating to index - 1 discards the entry at index and everything after it.
86 checkedTruncate(index - 1);
// In the visible code this call is reached after the steps above.
88 journal.resetHead(index);
/**
 * Truncates the journal to {@code index} after checking it against the commit index.
 *
 * <p>The comparison is strict, so truncating to exactly the commit index is accepted.
 *
 * @param index the index to truncate to
 * @throws IndexOutOfBoundsException if {@code index} is lower than the journal's commit index
 */
92 public void truncate(final long index) {
93 if (index < journal.getCommitIndex()) {
94 throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index);
// Bounds are fine: hand over to the unchecked implementation.
96 checkedTruncate(index);
/**
 * Truncates to {@code index} without consulting the commit index; the callers visible in this class,
 * {@code reset} and {@code truncate}, perform that check first.
 *
 * @param index the index to truncate to
 */
99 private void checkedTruncate(final long index) {
100 // Delete all segments with first indexes greater than the given index.
// The journal's first segment is never removed, even when index precedes its first index.
101 while (index < currentSegment.firstIndex() && currentSegment != journal.firstSegment()) {
102 currentSegment.releaseWriter();
103 journal.removeSegment(currentSegment);
// After the removal, continue with whatever the journal now reports as its last segment.
104 currentSegment = journal.lastSegment();
105 currentWriter = currentSegment.acquireWriter();
108 // Truncate the current index.
109 currentWriter.truncate(index);
111 // Reset segment readers.
// index + 1 is the first position after the truncation point.
112 journal.resetTail(index + 1);
/**
 * Flushes the current segment's writer. Only the writer of the current segment is flushed here.
 */
116 public void flush() {
117 currentWriter.flush();