Normalized copyright header
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / MappedJournalSegmentReader.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import io.atomix.storage.journal.index.JournalIndex;
19 import java.nio.BufferUnderflowException;
20 import java.nio.ByteBuffer;
21 import java.util.zip.CRC32;
22
23 /**
24  * Log segment reader.
25  *
26  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
27  */
28 final class MappedJournalSegmentReader<E> extends JournalSegmentReader<E> {
29   private final ByteBuffer buffer;
30
31   MappedJournalSegmentReader(
32       ByteBuffer buffer,
33       JournalSegment<E> segment,
34       int maxEntrySize,
35       JournalIndex index,
36       JournalSerdes namespace) {
37     super(segment, maxEntrySize, index, namespace);
38     this.buffer = buffer.slice();
39     reset();
40   }
41
42   @Override
43   void setPosition(int position) {
44     buffer.position(position);
45   }
46
47   @Override
48   Indexed<E> readEntry(final long index) {
49     // Mark the buffer so it can be reset if necessary.
50     buffer.mark();
51
52     try {
53       // Read the length of the entry.
54       final int length = buffer.getInt();
55
56       // If the buffer length is zero then return.
57       if (length <= 0 || length > maxEntrySize) {
58         buffer.reset();
59         return null;
60       }
61
62       // Read the checksum of the entry.
63       long checksum = buffer.getInt() & 0xFFFFFFFFL;
64
65       // Compute the checksum for the entry bytes.
66       final CRC32 crc32 = new CRC32();
67       ByteBuffer slice = buffer.slice();
68       slice.limit(length);
69       crc32.update(slice);
70
71       // If the stored checksum equals the computed checksum, return the entry.
72       if (checksum == crc32.getValue()) {
73         slice.rewind();
74         E entry = namespace.deserialize(slice);
75         buffer.position(buffer.position() + length);
76         return new Indexed<>(index, entry, length);
77       } else {
78         buffer.reset();
79         return null;
80       }
81     } catch (BufferUnderflowException e) {
82       buffer.reset();
83       return null;
84     }
85   }
86 }