Move JournalWriter.getLastIndex()
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedByteBufWriter.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static java.util.Objects.requireNonNull;
20
21 import io.netty.buffer.ByteBuf;
22
23 /**
24  * A {@link ByteBufWriter} implementation.
25  */
26 final class SegmentedByteBufWriter implements ByteBufWriter {
27     private final SegmentedByteBufJournal journal;
28
29     private JournalSegment currentSegment;
30     private JournalSegmentWriter currentWriter;
31
32     SegmentedByteBufWriter(final SegmentedByteBufJournal journal) {
33         this.journal = requireNonNull(journal);
34         currentSegment = journal.lastSegment();
35         currentWriter = currentSegment.acquireWriter();
36     }
37
38     @Override
39     public long nextIndex() {
40         return currentWriter.nextIndex();
41     }
42
43     @Override
44     public void commit(final long index) {
45         if (index > journal.getCommitIndex()) {
46             journal.setCommitIndex(index);
47             if (journal.isFlushOnCommit()) {
48                 flush();
49             }
50         }
51     }
52
53     @Override
54     public long append(final ByteBuf bytes) {
55         final var position = currentWriter.append(bytes);
56         return position != null ? position.index() : appendToNextSegment(bytes);
57     }
58
59     //  Slow path: we do not have enough capacity
60     private long appendToNextSegment(final ByteBuf bytes) {
61         currentWriter.flush();
62         currentSegment.releaseWriter();
63         currentSegment = journal.createNextSegment();
64         currentWriter = currentSegment.acquireWriter();
65         return currentWriter.append(bytes).index();
66     }
67
68     @Override
69     public void reset(final long index) {
70         final long commitIndex = journal.getCommitIndex();
71         if (index <= commitIndex) {
72           // also catches index == 0, which is not a valid next index
73           throw new IndexOutOfBoundsException("Cannot reset to: " + index + ", committed index: " + commitIndex);
74         }
75
76         if (index > currentSegment.firstIndex()) {
77             currentSegment.releaseWriter();
78             currentSegment = journal.resetSegments(index);
79             currentWriter = currentSegment.acquireWriter();
80         } else {
81             checkedTruncate(index - 1);
82         }
83         journal.resetHead(index);
84     }
85
86     @Override
87     public void truncate(final long index) {
88         if (index < journal.getCommitIndex()) {
89             throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index);
90         }
91         checkedTruncate(index);
92     }
93
94     private void checkedTruncate(final long index) {
95         // Delete all segments with first indexes greater than the given index.
96         while (index < currentSegment.firstIndex() && currentSegment != journal.firstSegment()) {
97             currentSegment.releaseWriter();
98             journal.removeSegment(currentSegment);
99             currentSegment = journal.lastSegment();
100             currentWriter = currentSegment.acquireWriter();
101         }
102
103         // Truncate the current index.
104         currentWriter.truncate(index);
105
106         // Reset segment readers.
107         journal.resetTail(index + 1);
108     }
109
110     @Override
111     public void flush() {
112         currentWriter.flush();
113     }
114 }