Move entry tracking to SegmentedJournalReader
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / MappedJournalSegmentReader.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  * Copyright (c) 2024 PANTHEON.tech, s.r.o.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 package io.atomix.storage.journal;
18
19 import java.nio.BufferUnderflowException;
20 import java.nio.ByteBuffer;
21 import java.util.zip.CRC32;
22
23 /**
24  * Log segment reader.
25  *
26  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
27  */
28 final class MappedJournalSegmentReader<E> extends JournalSegmentReader<E> {
29   private final ByteBuffer buffer;
30
31   MappedJournalSegmentReader(
32       ByteBuffer buffer,
33       JournalSegment<E> segment,
34       int maxEntrySize,
35       JournalSerdes namespace) {
36     super(segment, maxEntrySize, namespace);
37     this.buffer = buffer.slice();
38   }
39
40   @Override
41   void setPosition(int position) {
42     buffer.position(position);
43   }
44
45   @Override
46   Indexed<E> readEntry(final long index) {
47     // Mark the buffer so it can be reset if necessary.
48     buffer.mark();
49
50     try {
51       // Read the length of the entry.
52       final int length = buffer.getInt();
53
54       // If the buffer length is zero then return.
55       if (length <= 0 || length > maxEntrySize) {
56         buffer.reset();
57         return null;
58       }
59
60       // Read the checksum of the entry.
61       long checksum = buffer.getInt() & 0xFFFFFFFFL;
62
63       // Compute the checksum for the entry bytes.
64       final CRC32 crc32 = new CRC32();
65       ByteBuffer slice = buffer.slice();
66       slice.limit(length);
67       crc32.update(slice);
68
69       // If the stored checksum equals the computed checksum, return the entry.
70       if (checksum == crc32.getValue()) {
71         slice.rewind();
72         E entry = namespace.deserialize(slice);
73         buffer.position(buffer.position() + length);
74         return new Indexed<>(index, entry, length);
75       } else {
76         buffer.reset();
77         return null;
78       }
79     } catch (BufferUnderflowException e) {
80       buffer.reset();
81       return null;
82     }
83   }
84 }