import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jdt.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Log segment.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
-final class JournalSegment implements AutoCloseable {
+final class JournalSegment {
+ private static final Logger LOG = LoggerFactory.getLogger(JournalSegment.class);
+
private final JournalSegmentFile file;
private final StorageLevel storageLevel;
private final int maxEntrySize;
*
* @param reader the closed segment reader
*/
- void closeReader(JournalSegmentReader reader) {
+ void closeReader(final JournalSegmentReader reader) {
if (readers.remove(reader)) {
release();
}
*
* @return indicates whether the segment is open
*/
- public boolean isOpen() {
+ boolean isOpen() {
return open;
}
/**
* Closes the segment.
*/
- @Override
- public void close() {
+ void close() {
if (!open) {
return;
}
+ LOG.debug("Closing segment: {}", this);
open = false;
readers.forEach(JournalSegmentReader::close);
if (references.get() == 0) {
* Deletes the segment.
*/
void delete() {
+ close();
+ LOG.debug("Deleting segment: {}", this);
try {
Files.deleteIfExists(file.path());
} catch (IOException e) {
assertOpen();
// If the index already equals the first segment index, skip the reset.
- JournalSegment firstSegment = getFirstSegment();
+ final var firstSegment = getFirstSegment();
if (index == firstSegment.firstIndex()) {
return firstSegment;
}
- for (JournalSegment segment : segments.values()) {
- segment.close();
- segment.delete();
- }
+ segments.values().forEach(JournalSegment::delete);
segments.clear();
currentSegment = createSegment(1, index);
*/
synchronized void removeSegment(JournalSegment segment) {
segments.remove(segment.firstIndex());
- segment.close();
segment.delete();
resetCurrentSegment();
}
corrupted = true;
}
if (corrupted) {
- segment.close();
segment.delete();
iterator.remove();
}
final var compactSegments = segments.headMap(segmentEntry.getValue().firstIndex());
if (!compactSegments.isEmpty()) {
LOG.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
- for (JournalSegment segment : compactSegments.values()) {
- LOG.trace("Deleting segment: {}", segment);
- segment.close();
- segment.delete();
- }
+ compactSegments.values().forEach(JournalSegment::delete);
compactSegments.clear();
resetHead(segmentEntry.getValue().firstIndex());
}
@Override
public void close() {
- segments.values().forEach(segment -> {
- LOG.debug("Closing segment: {}", segment);
- segment.close();
- });
+ segments.values().forEach(JournalSegment::close);
currentSegment = null;
open = false;
}