266320127b214056942380d9d5f9a73437242557
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / DiskJournalSegmentWriter.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
20
21 import io.atomix.storage.journal.index.JournalIndex;
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.nio.MappedByteBuffer;
25 import java.nio.channels.FileChannel;
26
27 /**
28  * Segment writer.
29  * <p>
30  * The format of an entry in the log is as follows:
31  * <ul>
32  * <li>64-bit index</li>
33  * <li>8-bit boolean indicating whether a term change is contained in the entry</li>
34  * <li>64-bit optional term</li>
35  * <li>32-bit signed entry length, including the entry type ID</li>
36  * <li>8-bit signed entry type ID</li>
37  * <li>n-bit entry bytes</li>
38  * </ul>
39  *
40  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
41  */
42 final class DiskJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
43     private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
44
45     private final JournalSegmentReader<E> reader;
46     private final ByteBuffer buffer;
47
48     DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
49         final JournalIndex index, final JournalSerdes namespace) {
50         super(channel, segment, maxEntrySize, index, namespace);
51
52         buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
53         reader = new JournalSegmentReader<>(segment,
54             new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
55         reset(0);
56     }
57
58     DiskJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
59         super(previous);
60
61         buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
62         reader = new JournalSegmentReader<>(segment,
63             new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize, namespace);
64     }
65
66     @Override
67     MappedByteBuffer buffer() {
68         return null;
69     }
70
71     @Override
72     MappedJournalSegmentWriter<E> toMapped() {
73         return new MappedJournalSegmentWriter<>(this);
74     }
75
76     @Override
77     DiskJournalSegmentWriter<E> toFileChannel() {
78         return this;
79     }
80
81     @Override
82     JournalSegmentReader<E> reader() {
83         return reader;
84     }
85
86     @Override
87     ByteBuffer startWrite(final int position, final int size) {
88         return buffer.clear().slice(0, size);
89     }
90
91     @Override
92     void commitWrite(final int position, final ByteBuffer entry) {
93         try {
94             channel.write(entry, position);
95         } catch (IOException e) {
96             throw new StorageException(e);
97         }
98     }
99
100     @Override
101     void writeEmptyHeader(final int position) {
102         try {
103             channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position);
104         } catch (IOException e) {
105             throw new StorageException(e);
106         }
107     }
108
109     @Override
110     void flush() {
111         try {
112             if (channel.isOpen()) {
113                 channel.force(true);
114             }
115         } catch (IOException e) {
116             throw new StorageException(e);
117         }
118     }
119
120     @Override
121     void close() {
122         flush();
123     }
124 }