Disconnect JournalSegmentReader from JournalReader
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / SegmentedJournalWriter.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import java.nio.BufferOverflowException;
19
20 /**
21  * Raft log writer.
22  */
23 public final class SegmentedJournalWriter<E> implements JournalWriter<E> {
24   private final SegmentedJournal<E> journal;
25   private JournalSegment<E> currentSegment;
26   private MappableJournalSegmentWriter<E> currentWriter;
27
28   SegmentedJournalWriter(SegmentedJournal<E> journal) {
29     this.journal = journal;
30     this.currentSegment = journal.getLastSegment();
31     currentSegment.acquire();
32     this.currentWriter = currentSegment.writer();
33   }
34
35   @Override
36   public long getLastIndex() {
37     return currentWriter.getLastIndex();
38   }
39
40   @Override
41   public Indexed<E> getLastEntry() {
42     return currentWriter.getLastEntry();
43   }
44
45   @Override
46   public long getNextIndex() {
47     return currentWriter.getNextIndex();
48   }
49
50   @Override
51   public void reset(long index) {
52     if (index > currentSegment.index()) {
53       currentSegment.release();
54       currentSegment = journal.resetSegments(index);
55       currentSegment.acquire();
56       currentWriter = currentSegment.writer();
57     } else {
58       truncate(index - 1);
59     }
60     journal.resetHead(index);
61   }
62
63   @Override
64   public void commit(long index) {
65     if (index > journal.getCommitIndex()) {
66       journal.setCommitIndex(index);
67       if (journal.isFlushOnCommit()) {
68         flush();
69       }
70     }
71   }
72
73   @Override
74   public <T extends E> Indexed<T> append(T entry) {
75     try {
76       return currentWriter.append(entry);
77     } catch (BufferOverflowException e) {
78       if (currentSegment.index() == currentWriter.getNextIndex()) {
79         throw e;
80       }
81     }
82
83     moveToNextSegment();
84     return currentWriter.append(entry);
85   }
86
87   @Override
88   public void append(Indexed<E> entry) {
89     try {
90       currentWriter.append(entry);
91       return;
92     } catch (BufferOverflowException e) {
93       if (currentSegment.index() == currentWriter.getNextIndex()) {
94         throw e;
95       }
96     }
97
98     moveToNextSegment();
99     currentWriter.append(entry);
100   }
101
102   private void moveToNextSegment() {
103     currentWriter.flush();
104     currentSegment.release();
105     currentSegment = journal.getNextSegment();
106     currentSegment.acquire();
107     currentWriter = currentSegment.writer();
108   }
109
110   @Override
111   public void truncate(long index) {
112     if (index < journal.getCommitIndex()) {
113       throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index);
114     }
115
116     // Delete all segments with first indexes greater than the given index.
117     while (index < currentSegment.index() && currentSegment != journal.getFirstSegment()) {
118       currentSegment.release();
119       journal.removeSegment(currentSegment);
120       currentSegment = journal.getLastSegment();
121       currentSegment.acquire();
122       currentWriter = currentSegment.writer();
123     }
124
125     // Truncate the current index.
126     currentWriter.truncate(index);
127
128     // Reset segment readers.
129     journal.resetTail(index + 1);
130   }
131
132   @Override
133   public void flush() {
134     currentWriter.flush();
135   }
136
137   @Override
138   public void close() {
139     currentWriter.close();
140   }
141 }