Refactor SegmentedJournalWriter.reset()
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedByteBufWriter.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o. and others.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static com.google.common.base.Verify.verifyNotNull;
20 import static java.util.Objects.requireNonNull;
21
22 import io.netty.buffer.ByteBuf;
23
24 /**
25  * A {@link ByteBufWriter} implementation.
26  */
27 final class SegmentedByteBufWriter implements ByteBufWriter {
28     private final SegmentedByteBufJournal journal;
29
30     private JournalSegment currentSegment;
31     private JournalSegmentWriter currentWriter;
32
33     SegmentedByteBufWriter(final SegmentedByteBufJournal journal) {
34         this.journal = requireNonNull(journal);
35         currentSegment = journal.lastSegment();
36         currentWriter = currentSegment.acquireWriter();
37     }
38
39     @Override
40     public long lastIndex() {
41         return currentWriter.getLastIndex();
42     }
43
44     @Override
45     public long nextIndex() {
46         return currentWriter.getNextIndex();
47     }
48
49     @Override
50     public void commit(final long index) {
51         if (index > journal.getCommitIndex()) {
52             journal.setCommitIndex(index);
53             if (journal.isFlushOnCommit()) {
54                 flush();
55             }
56         }
57     }
58
59     @Override
60     public long append(final ByteBuf buf) {
61         var index = currentWriter.append(buf);
62         if (index != null) {
63             return index;
64         }
65         //  Slow path: we do not have enough capacity
66         currentWriter.flush();
67         currentSegment.releaseWriter();
68         currentSegment = journal.createNextSegment();
69         currentWriter = currentSegment.acquireWriter();
70         return verifyNotNull(currentWriter.append(buf));
71     }
72
73     @Override
74     public void reset(final long index) {
75         final long commitIndex = journal.getCommitIndex();
76         if (index <= commitIndex) {
77           // also catches index == 0, which is not a valid next index
78           throw new IndexOutOfBoundsException("Cannot reset to: " + index + ", committed index: " + commitIndex);
79         }
80
81         if (index > currentSegment.firstIndex()) {
82             currentSegment.releaseWriter();
83             currentSegment = journal.resetSegments(index);
84             currentWriter = currentSegment.acquireWriter();
85         } else {
86             checkedTruncate(index - 1);
87         }
88         journal.resetHead(index);
89     }
90
91     @Override
92     public void truncate(final long index) {
93         if (index < journal.getCommitIndex()) {
94             throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index);
95         }
96         checkedTruncate(index);
97     }
98
99     private void checkedTruncate(final long index) {
100         // Delete all segments with first indexes greater than the given index.
101         while (index < currentSegment.firstIndex() && currentSegment != journal.firstSegment()) {
102             currentSegment.releaseWriter();
103             journal.removeSegment(currentSegment);
104             currentSegment = journal.lastSegment();
105             currentWriter = currentSegment.acquireWriter();
106         }
107
108         // Truncate the current index.
109         currentWriter.truncate(index);
110
111         // Reset segment readers.
112         journal.resetTail(index + 1);
113     }
114
115     @Override
116     public void flush() {
117         currentWriter.flush();
118     }
119 }