Import atomix/{storage,utils}
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / storage / journal / MappableJournalSegmentWriter.java
1 /*
2  * Copyright 2018-present Open Networking Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package io.atomix.storage.journal;
17
18 import io.atomix.storage.StorageException;
19 import io.atomix.storage.journal.index.JournalIndex;
20 import io.atomix.utils.serializer.Namespace;
21
22 import java.io.IOException;
23 import java.nio.MappedByteBuffer;
24 import java.nio.channels.FileChannel;
25
26 /**
27  * Mappable log segment writer.
28  */
29 class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
30   private final FileChannel channel;
31   private final JournalSegment<E> segment;
32   private final int maxEntrySize;
33   private final JournalIndex index;
34   private final Namespace namespace;
35   private JournalWriter<E> writer;
36
37   MappableJournalSegmentWriter(
38       FileChannel channel,
39       JournalSegment<E> segment,
40       int maxEntrySize,
41       JournalIndex index,
42       Namespace namespace) {
43     this.channel = channel;
44     this.segment = segment;
45     this.maxEntrySize = maxEntrySize;
46     this.index = index;
47     this.namespace = namespace;
48     this.writer = new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
49   }
50
51   /**
52    * Maps the segment writer into memory, returning the mapped buffer.
53    *
54    * @return the buffer that was mapped into memory
55    */
56   MappedByteBuffer map() {
57     if (writer instanceof MappedJournalSegmentWriter) {
58       return ((MappedJournalSegmentWriter<E>) writer).buffer();
59     }
60
61     try {
62       JournalWriter<E> writer = this.writer;
63       MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, segment.descriptor().maxSegmentSize());
64       this.writer = new MappedJournalSegmentWriter<>(buffer, segment, maxEntrySize, index, namespace);
65       writer.close();
66       return buffer;
67     } catch (IOException e) {
68       throw new StorageException(e);
69     }
70   }
71
72   /**
73    * Unmaps the mapped buffer.
74    */
75   void unmap() {
76     if (writer instanceof MappedJournalSegmentWriter) {
77       JournalWriter<E> writer = this.writer;
78       this.writer = new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
79       writer.close();
80     }
81   }
82
83   MappedByteBuffer buffer() {
84     JournalWriter<E> writer = this.writer;
85     if (writer instanceof MappedJournalSegmentWriter) {
86       return ((MappedJournalSegmentWriter<E>) writer).buffer();
87     }
88     return null;
89   }
90
91   /**
92    * Returns the writer's first index.
93    *
94    * @return the writer's first index
95    */
96   public long firstIndex() {
97     return segment.index();
98   }
99
100   /**
101    * Returns the size of the segment.
102    *
103    * @return the size of the segment
104    */
105   public int size() {
106     try {
107       return (int) channel.size();
108     } catch (IOException e) {
109       throw new StorageException(e);
110     }
111   }
112
113   @Override
114   public long getLastIndex() {
115     return writer.getLastIndex();
116   }
117
118   @Override
119   public Indexed<E> getLastEntry() {
120     return writer.getLastEntry();
121   }
122
123   @Override
124   public long getNextIndex() {
125     return writer.getNextIndex();
126   }
127
128   @Override
129   public <T extends E> Indexed<T> append(T entry) {
130     return writer.append(entry);
131   }
132
133   @Override
134   public void append(Indexed<E> entry) {
135     writer.append(entry);
136   }
137
138   @Override
139   public void commit(long index) {
140     writer.commit(index);
141   }
142
143   @Override
144   public void reset(long index) {
145     writer.reset(index);
146   }
147
148   @Override
149   public void truncate(long index) {
150     writer.truncate(index);
151   }
152
153   @Override
154   public void flush() {
155     writer.flush();
156   }
157
158   @Override
159   public void close() {
160     writer.close();
161     try {
162       channel.close();
163     } catch (IOException e) {
164       throw new StorageException(e);
165     }
166   }
167 }