Introduce JournalSegmentWriter
[controller.git] / atomix-storage / src / main / java / io / atomix / storage / journal / MappableJournalSegmentReader.java
1 /*
2  * Copyright 2018-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import io.atomix.storage.journal.index.JournalIndex;
19 import java.io.IOException;
20 import java.nio.ByteBuffer;
21 import java.nio.channels.FileChannel;
22
23 /**
24  * Mappable log segment reader.
25  */
26 class MappableJournalSegmentReader<E> implements JournalReader<E> {
27   private final JournalSegment<E> segment;
28   private final FileChannel channel;
29   private final int maxEntrySize;
30   private final JournalIndex index;
31   private final JournalSerdes namespace;
32   private JournalReader<E> reader;
33
34   MappableJournalSegmentReader(
35       FileChannel channel,
36       JournalSegment<E> segment,
37       int maxEntrySize,
38       JournalIndex index,
39       JournalSerdes namespace) {
40     this.channel = channel;
41     this.segment = segment;
42     this.maxEntrySize = maxEntrySize;
43     this.index = index;
44     this.namespace = namespace;
45     this.reader = new FileChannelJournalSegmentReader<>(channel, segment, maxEntrySize, index, namespace);
46   }
47
48   /**
49    * Converts the reader to a mapped reader using the given buffer.
50    *
51    * @param buffer the mapped buffer
52    */
53   void map(ByteBuffer buffer) {
54     if (!(reader instanceof MappedJournalSegmentReader)) {
55       JournalReader<E> reader = this.reader;
56       this.reader = new MappedJournalSegmentReader<>(buffer, segment, maxEntrySize, index, namespace);
57       this.reader.reset(reader.getNextIndex());
58       reader.close();
59     }
60   }
61
62   /**
63    * Converts the reader to an unmapped reader.
64    */
65   void unmap() {
66     if (reader instanceof MappedJournalSegmentReader) {
67       JournalReader<E> reader = this.reader;
68       this.reader = new FileChannelJournalSegmentReader<>(channel, segment, maxEntrySize, index, namespace);
69       this.reader.reset(reader.getNextIndex());
70       reader.close();
71     }
72   }
73
74   @Override
75   public long getFirstIndex() {
76     return reader.getFirstIndex();
77   }
78
79   @Override
80   public long getCurrentIndex() {
81     return reader.getCurrentIndex();
82   }
83
84   @Override
85   public Indexed<E> getCurrentEntry() {
86     return reader.getCurrentEntry();
87   }
88
89   @Override
90   public long getNextIndex() {
91     return reader.getNextIndex();
92   }
93
94   @Override
95   public boolean hasNext() {
96     return reader.hasNext();
97   }
98
99   @Override
100   public Indexed<E> next() {
101     return reader.next();
102   }
103
104   @Override
105   public void reset() {
106     reader.reset();
107   }
108
109   @Override
110   public void reset(long index) {
111     reader.reset(index);
112   }
113
114   @Override
115   public void close() {
116     reader.close();
117     try {
118       channel.close();
119     } catch (IOException e) {
120       throw new StorageException(e);
121     } finally {
122       segment.closeReader(this);
123     }
124   }
125 }