import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
-import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
-
-import com.google.common.collect.Sets;
-import io.atomix.storage.StorageException;
-import io.atomix.storage.StorageLevel;
-import io.atomix.utils.serializer.Namespace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
/**
* Segmented journal.
*/
-public class SegmentedJournal<E> implements Journal<E> {
+public final class SegmentedJournal<E> implements Journal<E> {
/**
* Returns a new Raft log builder.
private final String name;
private final StorageLevel storageLevel;
private final File directory;
- private final Namespace namespace;
+ private final JournalSerdes namespace;
private final int maxSegmentSize;
private final int maxEntrySize;
private final int maxEntriesPerSegment;
private final SegmentedJournalWriter<E> writer;
private volatile long commitIndex;
- private final NavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
- private final Collection<SegmentedJournalReader> readers = Sets.newConcurrentHashSet();
+ private final ConcurrentNavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
+ private final Collection<SegmentedJournalReader<E>> readers = ConcurrentHashMap.newKeySet();
private JournalSegment<E> currentSegment;
private volatile boolean open = true;
String name,
StorageLevel storageLevel,
File directory,
- Namespace namespace,
+ JournalSerdes namespace,
int maxSegmentSize,
int maxEntrySize,
int maxEntriesPerSegment,
double indexDensity,
boolean flushOnCommit) {
- this.name = checkNotNull(name, "name cannot be null");
- this.storageLevel = checkNotNull(storageLevel, "storageLevel cannot be null");
- this.directory = checkNotNull(directory, "directory cannot be null");
- this.namespace = checkNotNull(namespace, "namespace cannot be null");
+ this.name = requireNonNull(name, "name cannot be null");
+ this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
+ this.directory = requireNonNull(directory, "directory cannot be null");
+ this.namespace = requireNonNull(namespace, "namespace cannot be null");
this.maxSegmentSize = maxSegmentSize;
this.maxEntrySize = maxEntrySize;
this.maxEntriesPerSegment = maxEntriesPerSegment;
/**
* Opens the segments.
*/
- private void open() {
+ private synchronized void open() {
// Load existing log segments from disk.
for (JournalSegment<E> segment : loadSegments()) {
segments.put(segment.descriptor().index(), segment);
assertOpen();
assertDiskSpace();
- JournalSegment lastSegment = getLastSegment();
+ JournalSegment<E> lastSegment = getLastSegment();
JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
.withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
.withIndex(currentSegment.lastIndex() + 1)
*
* @param segment The segment to remove.
*/
- synchronized void removeSegment(JournalSegment segment) {
+ synchronized void removeSegment(JournalSegment<E> segment) {
segments.remove(segment.index());
segment.close();
segment.delete();
* @param index The index at which to reset readers.
*/
void resetHead(long index) {
- for (SegmentedJournalReader reader : readers) {
+ for (SegmentedJournalReader<E> reader : readers) {
if (reader.getNextIndex() < index) {
reader.reset(index);
}
* @param index The index at which to reset readers.
*/
void resetTail(long index) {
- for (SegmentedJournalReader reader : readers) {
+ for (SegmentedJournalReader<E> reader : readers) {
if (reader.getNextIndex() >= index) {
reader.reset(index);
}
}
}
- void closeReader(SegmentedJournalReader reader) {
+ void closeReader(SegmentedJournalReader<E> reader) {
readers.remove(reader);
}
SortedMap<Long, JournalSegment<E>> compactSegments = segments.headMap(segmentEntry.getValue().index());
if (!compactSegments.isEmpty()) {
log.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
- for (JournalSegment segment : compactSegments.values()) {
+ for (JournalSegment<E> segment : compactSegments.values()) {
log.trace("Deleting segment: {}", segment);
segment.close();
segment.delete();
/**
* Raft log builder.
*/
- public static class Builder<E> implements io.atomix.utils.Builder<SegmentedJournal<E>> {
+ public static final class Builder<E> {
private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
private static final String DEFAULT_NAME = "atomix";
private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
private static final double DEFAULT_INDEX_DENSITY = .005;
- private static final int DEFAULT_CACHE_SIZE = 1024;
-
- protected String name = DEFAULT_NAME;
- protected StorageLevel storageLevel = StorageLevel.DISK;
- protected File directory = new File(DEFAULT_DIRECTORY);
- protected Namespace namespace;
- protected int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
- protected int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
- protected int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
- protected double indexDensity = DEFAULT_INDEX_DENSITY;
- protected int cacheSize = DEFAULT_CACHE_SIZE;
+
+ private String name = DEFAULT_NAME;
+ private StorageLevel storageLevel = StorageLevel.DISK;
+ private File directory = new File(DEFAULT_DIRECTORY);
+ private JournalSerdes namespace;
+ private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
+ private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
+ private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
+ private double indexDensity = DEFAULT_INDEX_DENSITY;
private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
protected Builder() {
* @return The storage builder.
*/
public Builder<E> withName(String name) {
- this.name = checkNotNull(name, "name cannot be null");
+ this.name = requireNonNull(name, "name cannot be null");
return this;
}
* @return The storage builder.
*/
public Builder<E> withStorageLevel(StorageLevel storageLevel) {
- this.storageLevel = checkNotNull(storageLevel, "storageLevel cannot be null");
+ this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null");
return this;
}
* @throws NullPointerException If the {@code directory} is {@code null}
*/
public Builder<E> withDirectory(String directory) {
- return withDirectory(new File(checkNotNull(directory, "directory cannot be null")));
+ return withDirectory(new File(requireNonNull(directory, "directory cannot be null")));
}
/**
* @throws NullPointerException If the {@code directory} is {@code null}
*/
public Builder<E> withDirectory(File directory) {
- this.directory = checkNotNull(directory, "directory cannot be null");
+ this.directory = requireNonNull(directory, "directory cannot be null");
return this;
}
* @param namespace The journal serializer.
* @return The journal builder.
*/
- public Builder<E> withNamespace(Namespace namespace) {
- this.namespace = checkNotNull(namespace, "namespace cannot be null");
+ public Builder<E> withNamespace(JournalSerdes namespace) {
+ this.namespace = requireNonNull(namespace, "namespace cannot be null");
return this;
}
return this;
}
- /**
- * Sets the journal cache size.
- *
- * @param cacheSize the journal cache size
- * @return the journal builder
- * @throws IllegalArgumentException if the cache size is not positive
- * @deprecated since 3.0.4
- */
- @Deprecated
- public Builder<E> withCacheSize(int cacheSize) {
- checkArgument(cacheSize >= 0, "cacheSize must be positive");
- this.cacheSize = cacheSize;
- return this;
- }
-
/**
* Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
* chaining.
return this;
}
- @Override
+ /**
+ * Build the {@link SegmentedJournal}.
+ *
+ * @return A new {@link SegmentedJournal}.
+ */
public SegmentedJournal<E> build() {
return new SegmentedJournal<>(
name,