Move entry serialization back to ByteBufWriter
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedByteBufWriter.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static com.google.common.base.Verify.verifyNotNull;
20 import static java.util.Objects.requireNonNull;
21
22 /**
23  * A {@link ByteBufWriter} implementation.
24  */
25 final class SegmentedByteBufWriter implements ByteBufWriter {
26     private final SegmentedByteBufJournal journal;
27
28     private JournalSegment currentSegment;
29     private JournalSegmentWriter currentWriter;
30
31     SegmentedByteBufWriter(final SegmentedByteBufJournal journal) {
32         this.journal = requireNonNull(journal);
33         currentSegment = journal.lastSegment();
34         currentWriter = currentSegment.acquireWriter();
35     }
36
37     @Override
38     public long nextIndex() {
39         return currentWriter.nextIndex();
40     }
41
42     @Override
43     public void commit(final long index) {
44         if (index > journal.getCommitIndex()) {
45             journal.setCommitIndex(index);
46             if (journal.isFlushOnCommit()) {
47                 flush();
48             }
49         }
50     }
51
52     @Override
53     public <T> int append(final ByteBufMapper<T> mapper, final T entry) {
54         final var size = currentWriter.append(mapper, entry);
55         return size != null ? size : appendToNextSegment(mapper, entry);
56     }
57
58     //  Slow path: we do not have enough capacity
59     private <T> int appendToNextSegment(final ByteBufMapper<T> mapper, final T entry) {
60         currentWriter.flush();
61         currentSegment.releaseWriter();
62         currentSegment = journal.createNextSegment();
63         currentWriter = currentSegment.acquireWriter();
64         return verifyNotNull(currentWriter.append(mapper, entry));
65     }
66
67     @Override
68     public void reset(final long index) {
69         final var commitIndex = journal.getCommitIndex();
70         if (index <= commitIndex) {
71             // also catches index == 0, which is not a valid next index
72             throw new IndexOutOfBoundsException("Cannot reset to: " + index + ", committed index: " + commitIndex);
73         }
74
75         final var lastIndex = currentSegment.lastIndex();
76         final var prevIndex = index - 1;
77         if (prevIndex == lastIndex) {
78             // already at the correct position: no-op
79             return;
80         }
81         if (prevIndex > lastIndex) {
82             // cannot seek past last written entry
83             throw new IndexOutOfBoundsException("Cannot reset to: " + index + ", lastIndex: " + lastIndex);
84         }
85
86         // move back:
87         // 1. delete all segments with first indexes greater than the given index.
88         while (prevIndex < currentSegment.firstIndex() && currentSegment != journal.firstSegment()) {
89             currentSegment.releaseWriter();
90             journal.removeSegment(currentSegment);
91             currentSegment = journal.lastSegment();
92             currentWriter = currentSegment.acquireWriter();
93         }
94         // 2. truncate the current index.
95         currentWriter.truncate(prevIndex);
96
97         // 3. reset segment readers.
98         journal.resetTail(index);
99         journal.resetHead(index);
100     }
101
102     @Override
103     public void flush() {
104         currentWriter.flush();
105     }
106 }