f19783a9c49901c4619c445fda0d67967485ddda
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / storage / journal / MappableJournalSegmentWriter.java
1 /*
2  * Copyright 2018-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import io.atomix.storage.journal.index.JournalIndex;
19 import java.io.IOException;
20 import java.nio.MappedByteBuffer;
21 import java.nio.channels.FileChannel;
22
23 /**
24  * Mappable log segment writer.
25  */
26 class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
27   private final FileChannel channel;
28   private final JournalSegment<E> segment;
29   private final int maxEntrySize;
30   private final JournalIndex index;
31   private final JournalSerdes namespace;
32   private JournalWriter<E> writer;
33
34   MappableJournalSegmentWriter(
35       FileChannel channel,
36       JournalSegment<E> segment,
37       int maxEntrySize,
38       JournalIndex index,
39       JournalSerdes namespace) {
40     this.channel = channel;
41     this.segment = segment;
42     this.maxEntrySize = maxEntrySize;
43     this.index = index;
44     this.namespace = namespace;
45     this.writer = new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
46   }
47
48   /**
49    * Maps the segment writer into memory, returning the mapped buffer.
50    *
51    * @return the buffer that was mapped into memory
52    */
53   MappedByteBuffer map() {
54     if (writer instanceof MappedJournalSegmentWriter) {
55       return ((MappedJournalSegmentWriter<E>) writer).buffer();
56     }
57
58     try {
59       JournalWriter<E> writer = this.writer;
60       MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, segment.descriptor().maxSegmentSize());
61       this.writer = new MappedJournalSegmentWriter<>(buffer, segment, maxEntrySize, index, namespace);
62       writer.close();
63       return buffer;
64     } catch (IOException e) {
65       throw new StorageException(e);
66     }
67   }
68
69   /**
70    * Unmaps the mapped buffer.
71    */
72   void unmap() {
73     if (writer instanceof MappedJournalSegmentWriter) {
74       JournalWriter<E> writer = this.writer;
75       this.writer = new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
76       writer.close();
77     }
78   }
79
80   MappedByteBuffer buffer() {
81     JournalWriter<E> writer = this.writer;
82     if (writer instanceof MappedJournalSegmentWriter) {
83       return ((MappedJournalSegmentWriter<E>) writer).buffer();
84     }
85     return null;
86   }
87
88   /**
89    * Returns the writer's first index.
90    *
91    * @return the writer's first index
92    */
93   public long firstIndex() {
94     return segment.index();
95   }
96
97   /**
98    * Returns the size of the segment.
99    *
100    * @return the size of the segment
101    */
102   public int size() {
103     try {
104       return (int) channel.size();
105     } catch (IOException e) {
106       throw new StorageException(e);
107     }
108   }
109
110   @Override
111   public long getLastIndex() {
112     return writer.getLastIndex();
113   }
114
115   @Override
116   public Indexed<E> getLastEntry() {
117     return writer.getLastEntry();
118   }
119
120   @Override
121   public long getNextIndex() {
122     return writer.getNextIndex();
123   }
124
125   @Override
126   public <T extends E> Indexed<T> append(T entry) {
127     return writer.append(entry);
128   }
129
130   @Override
131   public void append(Indexed<E> entry) {
132     writer.append(entry);
133   }
134
135   @Override
136   public void commit(long index) {
137     writer.commit(index);
138   }
139
140   @Override
141   public void reset(long index) {
142     writer.reset(index);
143   }
144
145   @Override
146   public void truncate(long index) {
147     writer.truncate(index);
148   }
149
150   @Override
151   public void flush() {
152     writer.flush();
153   }
154
155   @Override
156   public void close() {
157     writer.close();
158     try {
159       channel.close();
160     } catch (IOException e) {
161       throw new StorageException(e);
162     }
163   }
164 }