Improve segmented journal actor metrics
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / MappedJournalSegmentWriter.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import io.atomix.storage.journal.index.JournalIndex;
20 import java.io.IOException;
21 import java.nio.ByteBuffer;
22 import java.nio.MappedByteBuffer;
23 import java.nio.channels.FileChannel;
24 import org.eclipse.jdt.annotation.NonNull;
25
26 /**
27  * Segment writer.
28  * <p>
29  * The format of an entry in the log is as follows:
30  * <ul>
31  * <li>64-bit index</li>
32  * <li>8-bit boolean indicating whether a term change is contained in the entry</li>
33  * <li>64-bit optional term</li>
34  * <li>32-bit signed entry length, including the entry type ID</li>
35  * <li>8-bit signed entry type ID</li>
36  * <li>n-bit entry bytes</li>
37  * </ul>
38  *
39  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
40  */
41 final class MappedJournalSegmentWriter<E> extends JournalSegmentWriter<E> {
42     private final @NonNull MappedByteBuffer mappedBuffer;
43     private final JournalSegmentReader<E> reader;
44     private final ByteBuffer buffer;
45
46     MappedJournalSegmentWriter(final FileChannel channel, final JournalSegment<E> segment, final int maxEntrySize,
47         final JournalIndex index, final JournalSerdes namespace) {
48         super(channel, segment, maxEntrySize, index, namespace);
49
50         mappedBuffer = mapBuffer(channel, maxSegmentSize);
51         buffer = mappedBuffer.slice();
52         reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
53             maxEntrySize, namespace);
54         reset(0);
55     }
56
57     MappedJournalSegmentWriter(final JournalSegmentWriter<E> previous) {
58         super(previous);
59
60         mappedBuffer = mapBuffer(channel, maxSegmentSize);
61         buffer = mappedBuffer.slice();
62         reader = new JournalSegmentReader<>(segment, new MappedFileReader(segment.file().file().toPath(), mappedBuffer),
63             maxEntrySize, namespace);
64     }
65
66     private static @NonNull MappedByteBuffer mapBuffer(final FileChannel channel, final int maxSegmentSize) {
67         try {
68             return channel.map(FileChannel.MapMode.READ_WRITE, 0, maxSegmentSize);
69         } catch (IOException e) {
70             throw new StorageException(e);
71         }
72     }
73
74     @Override
75     @NonNull MappedByteBuffer buffer() {
76         return mappedBuffer;
77     }
78
79     @Override
80     MappedJournalSegmentWriter<E> toMapped() {
81         return this;
82     }
83
84     @Override
85     DiskJournalSegmentWriter<E> toFileChannel() {
86         close();
87         return new DiskJournalSegmentWriter<>(this);
88     }
89
90     @Override
91     JournalSegmentReader<E> reader() {
92         return reader;
93     }
94
95     @Override
96     ByteBuffer startWrite(final int position, final int size) {
97         return buffer.slice(position, size);
98     }
99
100     @Override
101     void commitWrite(final int position, final ByteBuffer entry) {
102         // No-op, buffer is write-through
103     }
104
105     @Override
106     void writeEmptyHeader(final int position) {
107         // Note: we issue a single putLong() instead of two putInt()s.
108         buffer.putLong(position, 0L);
109     }
110
111     @Override
112     void flush() {
113         mappedBuffer.force();
114     }
115
116     @Override
117     void close() {
118         flush();
119         try {
120             BufferCleaner.freeBuffer(mappedBuffer);
121         } catch (IOException e) {
122             throw new StorageException(e);
123         }
124     }
125 }