Normalized copyright header
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / FileChannelJournalSegmentReader.java
1 /*
2  * Copyright 2017-2022 Open Networking Foundation and others.  All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import io.atomix.storage.journal.index.JournalIndex;
19 import java.io.IOException;
20 import java.nio.BufferUnderflowException;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.FileChannel;
23 import java.util.zip.CRC32;
24
25 /**
26  * Log segment reader.
27  *
28  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
29  */
30 final class FileChannelJournalSegmentReader<E> extends JournalSegmentReader<E> {
31   private final FileChannel channel;
32   private final ByteBuffer memory;
33   private long currentPosition;
34
35   FileChannelJournalSegmentReader(
36       FileChannel channel,
37       JournalSegment<E> segment,
38       int maxEntrySize,
39       JournalIndex index,
40       JournalSerdes namespace) {
41     super(segment, maxEntrySize, index, namespace);
42     this.channel = channel;
43     this.memory = ByteBuffer.allocate((maxEntrySize + JournalSegmentWriter.ENTRY_HEADER_BYTES) * 2);
44     reset();
45   }
46
47   @Override
48   void setPosition(int position) {
49     currentPosition = position;
50     memory.clear().flip();
51   }
52
53   @Override
54   Indexed<E> readEntry(final long index) {
55     try {
56       // Read more bytes from the segment if necessary.
57       if (memory.remaining() < maxEntrySize) {
58         long position = currentPosition + memory.position();
59         channel.read(memory.clear(), position);
60         currentPosition = position;
61         memory.flip();
62       }
63
64       // Mark the buffer so it can be reset if necessary.
65       memory.mark();
66
67       try {
68         // Read the length of the entry.
69         final int length = memory.getInt();
70
71         // If the buffer length is zero then return.
72         if (length <= 0 || length > maxEntrySize) {
73           memory.reset().limit(memory.position());
74           return null;
75         }
76
77         // Read the checksum of the entry.
78         long checksum = memory.getInt() & 0xFFFFFFFFL;
79
80         // Compute the checksum for the entry bytes.
81         final CRC32 crc32 = new CRC32();
82         crc32.update(memory.array(), memory.position(), length);
83
84         // If the stored checksum equals the computed checksum, return the entry.
85         if (checksum == crc32.getValue()) {
86           int limit = memory.limit();
87           memory.limit(memory.position() + length);
88           E entry = namespace.deserialize(memory);
89           memory.limit(limit);
90           return new Indexed<>(index, entry, length);
91         } else {
92           memory.reset().limit(memory.position());
93           return null;
94         }
95       } catch (BufferUnderflowException e) {
96         memory.reset().limit(memory.position());
97         return null;
98       }
99     } catch (IOException e) {
100       throw new StorageException(e);
101     }
102   }
103 }