Position is a simple record
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / storage / journal / MappableJournalSegmentWriter.java
1 /*
2  * Copyright 2018-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import io.atomix.storage.journal.index.JournalIndex;
19 import io.atomix.utils.serializer.Namespace;
20
21 import java.io.IOException;
22 import java.nio.MappedByteBuffer;
23 import java.nio.channels.FileChannel;
24
25 /**
26  * Mappable log segment writer.
27  */
28 class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
29   private final FileChannel channel;
30   private final JournalSegment<E> segment;
31   private final int maxEntrySize;
32   private final JournalIndex index;
33   private final Namespace namespace;
34   private JournalWriter<E> writer;
35
36   MappableJournalSegmentWriter(
37       FileChannel channel,
38       JournalSegment<E> segment,
39       int maxEntrySize,
40       JournalIndex index,
41       Namespace namespace) {
42     this.channel = channel;
43     this.segment = segment;
44     this.maxEntrySize = maxEntrySize;
45     this.index = index;
46     this.namespace = namespace;
47     this.writer = new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
48   }
49
50   /**
51    * Maps the segment writer into memory, returning the mapped buffer.
52    *
53    * @return the buffer that was mapped into memory
54    */
55   MappedByteBuffer map() {
56     if (writer instanceof MappedJournalSegmentWriter) {
57       return ((MappedJournalSegmentWriter<E>) writer).buffer();
58     }
59
60     try {
61       JournalWriter<E> writer = this.writer;
62       MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, segment.descriptor().maxSegmentSize());
63       this.writer = new MappedJournalSegmentWriter<>(buffer, segment, maxEntrySize, index, namespace);
64       writer.close();
65       return buffer;
66     } catch (IOException e) {
67       throw new StorageException(e);
68     }
69   }
70
71   /**
72    * Unmaps the mapped buffer.
73    */
74   void unmap() {
75     if (writer instanceof MappedJournalSegmentWriter) {
76       JournalWriter<E> writer = this.writer;
77       this.writer = new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
78       writer.close();
79     }
80   }
81
82   MappedByteBuffer buffer() {
83     JournalWriter<E> writer = this.writer;
84     if (writer instanceof MappedJournalSegmentWriter) {
85       return ((MappedJournalSegmentWriter<E>) writer).buffer();
86     }
87     return null;
88   }
89
90   /**
91    * Returns the writer's first index.
92    *
93    * @return the writer's first index
94    */
95   public long firstIndex() {
96     return segment.index();
97   }
98
99   /**
100    * Returns the size of the segment.
101    *
102    * @return the size of the segment
103    */
104   public int size() {
105     try {
106       return (int) channel.size();
107     } catch (IOException e) {
108       throw new StorageException(e);
109     }
110   }
111
112   @Override
113   public long getLastIndex() {
114     return writer.getLastIndex();
115   }
116
117   @Override
118   public Indexed<E> getLastEntry() {
119     return writer.getLastEntry();
120   }
121
122   @Override
123   public long getNextIndex() {
124     return writer.getNextIndex();
125   }
126
127   @Override
128   public <T extends E> Indexed<T> append(T entry) {
129     return writer.append(entry);
130   }
131
132   @Override
133   public void append(Indexed<E> entry) {
134     writer.append(entry);
135   }
136
137   @Override
138   public void commit(long index) {
139     writer.commit(index);
140   }
141
142   @Override
143   public void reset(long index) {
144     writer.reset(index);
145   }
146
147   @Override
148   public void truncate(long index) {
149     writer.truncate(index);
150   }
151
152   @Override
153   public void flush() {
154     writer.flush();
155   }
156
157   @Override
158   public void close() {
159     writer.close();
160     try {
161       channel.close();
162     } catch (IOException e) {
163       throw new StorageException(e);
164     }
165   }
166 }