private volatile long commitIndex;
private final NavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
- private final Collection<SegmentedJournalReader> readers = Sets.newConcurrentHashSet();
+ private final Collection<SegmentedJournalReader<E>> readers = Sets.newConcurrentHashSet();
private JournalSegment<E> currentSegment;
private volatile boolean open = true;
assertOpen();
assertDiskSpace();
- JournalSegment lastSegment = getLastSegment();
+ JournalSegment<E> lastSegment = getLastSegment();
JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder()
.withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
.withIndex(currentSegment.lastIndex() + 1)
*
* @param segment The segment to remove.
*/
- synchronized void removeSegment(JournalSegment segment) {
+ synchronized void removeSegment(JournalSegment<E> segment) {
segments.remove(segment.index());
segment.close();
segment.delete();
* @param index The index at which to reset readers.
*/
void resetHead(long index) {
- for (SegmentedJournalReader reader : readers) {
+ for (SegmentedJournalReader<E> reader : readers) {
if (reader.getNextIndex() < index) {
reader.reset(index);
}
* @param index The index at which to reset readers.
*/
void resetTail(long index) {
- for (SegmentedJournalReader reader : readers) {
+ for (SegmentedJournalReader<E> reader : readers) {
if (reader.getNextIndex() >= index) {
reader.reset(index);
}
}
}
- void closeReader(SegmentedJournalReader reader) {
+ void closeReader(SegmentedJournalReader<E> reader) {
readers.remove(reader);
}
SortedMap<Long, JournalSegment<E>> compactSegments = segments.headMap(segmentEntry.getValue().index());
if (!compactSegments.isEmpty()) {
log.debug("{} - Compacting {} segment(s)", name, compactSegments.size());
- for (JournalSegment segment : compactSegments.values()) {
+ for (JournalSegment<E> segment : compactSegments.values()) {
log.trace("Deleting segment: {}", segment);
segment.close();
segment.delete();
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import static org.junit.Assert.assertEquals;
protected abstract StorageLevel storageLevel();
@Parameterized.Parameters
- public static Collection primeNumbers() {
+ public static List<Object[]> primeNumbers() {
List<Object[]> runs = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
for (int j = 1; j <= 10; j++) {
}
@Test
- @SuppressWarnings("unchecked")
public void testWriteRead() throws Exception {
try (Journal<TestEntry> journal = createJournal()) {
JournalWriter<TestEntry> writer = journal.writer();
// Test reading an entry
Indexed<TestEntry> entry1;
reader.reset();
- entry1 = (Indexed) reader.next();
+ entry1 = reader.next();
assertEquals(1, entry1.index());
assertEquals(entry1, reader.getCurrentEntry());
assertEquals(1, reader.getCurrentIndex());
Indexed<TestEntry> entry2;
assertTrue(reader.hasNext());
assertEquals(2, reader.getNextIndex());
- entry2 = (Indexed) reader.next();
+ entry2 = reader.next();
assertEquals(2, entry2.index());
assertEquals(entry2, reader.getCurrentEntry());
assertEquals(2, reader.getCurrentIndex());
// Test opening a new reader and reading from the journal.
reader = journal.openReader(1);
assertTrue(reader.hasNext());
- entry1 = (Indexed) reader.next();
+ entry1 = reader.next();
assertEquals(1, entry1.index());
assertEquals(entry1, reader.getCurrentEntry());
assertEquals(1, reader.getCurrentIndex());
assertTrue(reader.hasNext());
assertEquals(2, reader.getNextIndex());
- entry2 = (Indexed) reader.next();
+ entry2 = reader.next();
assertEquals(2, entry2.index());
assertEquals(entry2, reader.getCurrentEntry());
assertEquals(2, reader.getCurrentIndex());
// Test opening a new reader and reading from the journal.
reader = journal.openReader(1);
assertTrue(reader.hasNext());
- entry1 = (Indexed) reader.next();
+ entry1 = reader.next();
assertEquals(1, entry1.index());
assertEquals(entry1, reader.getCurrentEntry());
assertEquals(1, reader.getCurrentIndex());
assertTrue(reader.hasNext());
assertEquals(2, reader.getNextIndex());
- entry2 = (Indexed) reader.next();
+ entry2 = reader.next();
assertEquals(2, entry2.index());
assertEquals(entry2, reader.getCurrentEntry());
assertEquals(2, reader.getCurrentIndex());
assertEquals(1, reader.getCurrentEntry().index());
assertTrue(reader.hasNext());
assertEquals(2, reader.getNextIndex());
- entry2 = (Indexed) reader.next();
+ entry2 = reader.next();
assertEquals(2, entry2.index());
assertEquals(entry2, reader.getCurrentEntry());
assertEquals(2, reader.getCurrentIndex());
}
@Test
- @SuppressWarnings("unchecked")
public void testWriteReadEntries() throws Exception {
try (Journal<TestEntry> journal = createJournal()) {
JournalWriter<TestEntry> writer = journal.writer();
writer.append(ENTRY);
assertTrue(reader.hasNext());
Indexed<TestEntry> entry;
- entry = (Indexed) reader.next();
+ entry = reader.next();
assertEquals(i, entry.index());
assertEquals(32, entry.entry().bytes().length);
reader.reset(i);
- entry = (Indexed) reader.next();
+ entry = reader.next();
assertEquals(i, entry.index());
assertEquals(32, entry.entry().bytes().length);
assertTrue(reader.hasNext());
reader.reset(i);
assertTrue(reader.hasNext());
- entry = (Indexed) reader.next();
+ entry = reader.next();
assertEquals(i, entry.index());
assertEquals(32, entry.entry().bytes().length);
}
}
@Test
- @SuppressWarnings("unchecked")
public void testWriteReadCommittedEntries() throws Exception {
try (Journal<TestEntry> journal = createJournal()) {
JournalWriter<TestEntry> writer = journal.writer();
writer.commit(i);
assertTrue(reader.hasNext());
Indexed<TestEntry> entry;
- entry = (Indexed) reader.next();
+ entry = reader.next();
assertEquals(i, entry.index());
assertEquals(32, entry.entry().bytes().length);
reader.reset(i);
- entry = (Indexed) reader.next();
+ entry = reader.next();
assertEquals(i, entry.index());
assertEquals(32, entry.entry().bytes().length);
}