Do not leak Kryo from atomix.storage
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / storage / journal / SegmentedJournalWriter.java
1 /*
2  * Copyright 2017-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import java.nio.BufferOverflowException;
19
20 /**
21  * Raft log writer.
22  */
23 public class SegmentedJournalWriter<E> implements JournalWriter<E> {
24   private final SegmentedJournal<E> journal;
25   private JournalSegment<E> currentSegment;
26   private MappableJournalSegmentWriter<E> currentWriter;
27
28   public SegmentedJournalWriter(SegmentedJournal<E> journal) {
29     this.journal = journal;
30     this.currentSegment = journal.getLastSegment();
31     currentSegment.acquire();
32     this.currentWriter = currentSegment.writer();
33   }
34
35   @Override
36   public long getLastIndex() {
37     return currentWriter.getLastIndex();
38   }
39
40   @Override
41   public Indexed<E> getLastEntry() {
42     return currentWriter.getLastEntry();
43   }
44
45   @Override
46   public long getNextIndex() {
47     return currentWriter.getNextIndex();
48   }
49
50   @Override
51   public void reset(long index) {
52     if (index > currentSegment.index()) {
53       currentSegment.release();
54       currentSegment = journal.resetSegments(index);
55       currentSegment.acquire();
56       currentWriter = currentSegment.writer();
57     } else {
58       truncate(index - 1);
59     }
60     journal.resetHead(index);
61   }
62
63   @Override
64   public void commit(long index) {
65     if (index > journal.getCommitIndex()) {
66       journal.setCommitIndex(index);
67       if (journal.isFlushOnCommit()) {
68         flush();
69       }
70     }
71   }
72
73   @Override
74   public <T extends E> Indexed<T> append(T entry) {
75     try {
76       return currentWriter.append(entry);
77     } catch (BufferOverflowException e) {
78       if (currentSegment.index() == currentWriter.getNextIndex()) {
79         throw e;
80       }
81       currentWriter.flush();
82       currentSegment.release();
83       currentSegment = journal.getNextSegment();
84       currentSegment.acquire();
85       currentWriter = currentSegment.writer();
86       return currentWriter.append(entry);
87     }
88   }
89
90   @Override
91   public void append(Indexed<E> entry) {
92     try {
93       currentWriter.append(entry);
94     } catch (BufferOverflowException e) {
95       if (currentSegment.index() == currentWriter.getNextIndex()) {
96         throw e;
97       }
98       currentWriter.flush();
99       currentSegment.release();
100       currentSegment = journal.getNextSegment();
101       currentSegment.acquire();
102       currentWriter = currentSegment.writer();
103       currentWriter.append(entry);
104     }
105   }
106
107   @Override
108   public void truncate(long index) {
109     if (index < journal.getCommitIndex()) {
110       throw new IndexOutOfBoundsException("Cannot truncate committed index: " + index);
111     }
112
113     // Delete all segments with first indexes greater than the given index.
114     while (index < currentSegment.index() && currentSegment != journal.getFirstSegment()) {
115       currentSegment.release();
116       journal.removeSegment(currentSegment);
117       currentSegment = journal.getLastSegment();
118       currentSegment.acquire();
119       currentWriter = currentSegment.writer();
120     }
121
122     // Truncate the current index.
123     currentWriter.truncate(index);
124
125     // Reset segment readers.
126     journal.resetTail(index + 1);
127   }
128
129   @Override
130   public void flush() {
131     currentWriter.flush();
132   }
133
134   @Override
135   public void close() {
136     currentWriter.close();
137   }
138 }