2 * Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
3 * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 package io.atomix.storage.journal;
19 import static com.google.common.base.Verify.verifyNotNull;
20 import static java.util.Objects.requireNonNull;
23 * A {@link ByteBufWriter} implementation.
25 final class SegmentedByteBufWriter implements ByteBufWriter {
26 private final SegmentedByteBufJournal journal;
28 private JournalSegment currentSegment;
29 private JournalSegmentWriter currentWriter;
31 SegmentedByteBufWriter(final SegmentedByteBufJournal journal) {
32 this.journal = requireNonNull(journal);
33 currentSegment = journal.lastSegment();
34 currentWriter = currentSegment.acquireWriter();
38 public long nextIndex() {
39 return currentWriter.nextIndex();
43 public void commit(final long index) {
44 if (index > journal.getCommitIndex()) {
45 journal.setCommitIndex(index);
46 if (journal.isFlushOnCommit()) {
53 public <T> int append(final ByteBufMapper<T> mapper, final T entry) {
54 final var size = currentWriter.append(mapper, entry);
55 return size != null ? size : appendToNextSegment(mapper, entry);
58 // Slow path: we do not have enough capacity
59 private <T> int appendToNextSegment(final ByteBufMapper<T> mapper, final T entry) {
60 currentWriter.flush();
61 currentSegment.releaseWriter();
62 currentSegment = journal.createNextSegment();
63 currentWriter = currentSegment.acquireWriter();
64 return verifyNotNull(currentWriter.append(mapper, entry));
68 public void reset(final long index) {
69 final var commitIndex = journal.getCommitIndex();
70 if (index <= commitIndex) {
71 // also catches index == 0, which is not a valid next index
72 throw new IndexOutOfBoundsException("Cannot reset to: " + index + ", committed index: " + commitIndex);
75 final var lastIndex = currentSegment.lastIndex();
76 final var prevIndex = index - 1;
77 if (prevIndex == lastIndex) {
78 // already at the correct position: no-op
81 if (prevIndex > lastIndex) {
82 // cannot seek past last written entry
83 throw new IndexOutOfBoundsException("Cannot reset to: " + index + ", lastIndex: " + lastIndex);
87 // 1. delete all segments with first indexes greater than the given index.
88 while (prevIndex < currentSegment.firstIndex() && currentSegment != journal.firstSegment()) {
89 currentSegment.releaseWriter();
90 journal.removeSegment(currentSegment);
91 currentSegment = journal.lastSegment();
92 currentWriter = currentSegment.acquireWriter();
94 // 2. truncate the current index.
95 currentWriter.truncate(prevIndex);
97 // 3. reset segment readers.
98 journal.resetTail(index);
99 journal.resetHead(index);
103 public void flush() {
104 currentWriter.flush();