package org.opendaylight.controller.cluster.raft;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/**
// We define this as ArrayList so we can use ensureCapacity.
protected ArrayList<ReplicatedLogEntry> journal;
- protected long snapshotIndex = -1;
- protected long snapshotTerm = -1;
+ private long snapshotIndex = -1;
+ private long snapshotTerm = -1;
// to be used for rollback during save snapshot failure
- protected ArrayList<ReplicatedLogEntry> snapshottedJournal;
- protected long previousSnapshotIndex = -1;
- protected long previousSnapshotTerm = -1;
+ private ArrayList<ReplicatedLogEntry> snapshottedJournal;
+ private long previousSnapshotIndex = -1;
+ private long previousSnapshotTerm = -1;
protected int dataSize = 0;
public AbstractReplicatedLogImpl(long snapshotIndex,
}
public AbstractReplicatedLogImpl() {
- this.journal = new ArrayList<>();
+ this(-1L, -1L, Collections.<ReplicatedLogEntry>emptyList());
}
protected int adjustedIndex(long logEntryIndex) {
public List<ReplicatedLogEntry> getFrom(long logEntryIndex, int max) {
int adjustedIndex = adjustedIndex(logEntryIndex);
int size = journal.size();
- List<ReplicatedLogEntry> entries = new ArrayList<>(100);
if (adjustedIndex >= 0 && adjustedIndex < size) {
// physical index should be less than list size and >= 0
int maxIndex = adjustedIndex + max;
if(maxIndex > size){
maxIndex = size;
}
- entries.addAll(journal.subList(adjustedIndex, maxIndex));
+ return new ArrayList<>(journal.subList(adjustedIndex, maxIndex));
+ } else {
+ return Collections.emptyList();
}
- return entries;
}
-
@Override
public long size() {
return journal.size();
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import scala.concurrent.duration.FiniteDuration;
public class FollowerLogInformationImpl implements FollowerLogInformation {
+ private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> NEXT_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "nextIndex");
+ private static final AtomicLongFieldUpdater<FollowerLogInformationImpl> MATCH_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(FollowerLogInformationImpl.class, "matchIndex");
private final String id;
- private final AtomicLong nextIndex;
+ private final Stopwatch stopwatch = new Stopwatch();
- private final AtomicLong matchIndex;
+ private final long followerTimeoutMillis;
- private final Stopwatch stopwatch;
+ private volatile long nextIndex;
- private final long followerTimeoutMillis;
+ private volatile long matchIndex;
public FollowerLogInformationImpl(String id, long nextIndex,
long matchIndex, FiniteDuration followerTimeoutDuration) {
this.id = id;
- this.nextIndex = new AtomicLong(nextIndex);
- this.matchIndex = new AtomicLong(matchIndex);
- this.stopwatch = new Stopwatch();
+ this.nextIndex = nextIndex;
+ this.matchIndex = matchIndex;
this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
}
@Override
public long incrNextIndex(){
- return nextIndex.incrementAndGet();
+ return NEXT_INDEX_UPDATER.incrementAndGet(this);
}
@Override
public long decrNextIndex() {
- return nextIndex.decrementAndGet();
+ return NEXT_INDEX_UPDATER.decrementAndGet(this);
}
@Override
public void setNextIndex(long nextIndex) {
- this.nextIndex.set(nextIndex);
+ this.nextIndex = nextIndex;
}
@Override
public long incrMatchIndex(){
- return matchIndex.incrementAndGet();
+ return MATCH_INDEX_UPDATER.incrementAndGet(this);
}
@Override
public void setMatchIndex(long matchIndex) {
- this.matchIndex.set(matchIndex);
+ this.matchIndex = matchIndex;
}
@Override
@Override
public long getNextIndex() {
- return nextIndex.get();
+ return nextIndex;
}
@Override
public long getMatchIndex() {
- return matchIndex.get();
+ return matchIndex;
}
@Override
timer.stop();
LOG.info("Recovery snapshot applied for {} in {}: snapshotIndex={}, snapshotTerm={}, journal-size=" +
replicatedLog.size(), persistenceId(), timer.toString(),
- replicatedLog.snapshotIndex, replicatedLog.snapshotTerm);
+ replicatedLog.getSnapshotIndex(), replicatedLog.getSnapshotTerm());
}
private void onRecoveredJournalLogEntry(ReplicatedLogEntry logEntry) {
"Persistence Id = " + persistenceId() +
" Last index in log={}, snapshotIndex={}, snapshotTerm={}, " +
"journal-size={}",
- replicatedLog.lastIndex(), replicatedLog.snapshotIndex,
- replicatedLog.snapshotTerm, replicatedLog.size());
+ replicatedLog.lastIndex(), replicatedLog.getSnapshotIndex(),
+ replicatedLog.getSnapshotTerm(), replicatedLog.size());
initializeBehavior();
}
import com.google.common.collect.ImmutableMap.Builder;
import com.google.protobuf.ByteString;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
private Cancellable heartbeatSchedule = null;
- private List<ClientRequestTracker> trackerList = new ArrayList<>();
+ private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
protected final int minReplicationCount;
@Override
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
-
- ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
- if(toRemove != null) {
- trackerList.remove(toRemove);
+ final Iterator<ClientRequestTracker> it = trackerList.iterator();
+ while (it.hasNext()) {
+ final ClientRequestTracker t = it.next();
+ if (t.getIndex() == logIndex) {
+ it.remove();
+ return t;
+ }
}
- return toRemove;
+ return null;
}
@Override
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
long followerNextIndex = followerLogInformation.getNextIndex();
boolean isFollowerActive = followerLogInformation.isFollowerActive();
- List<ReplicatedLogEntry> entries = null;
if (mapFollowerToSnapshot.get(followerId) != null) {
// if install snapshot is in process , then sent next chunk if possible
} else {
long leaderLastIndex = context.getReplicatedLog().lastIndex();
long leaderSnapShotIndex = context.getReplicatedLog().getSnapshotIndex();
+ final List<ReplicatedLogEntry> entries;
if (isFollowerActive &&
context.getReplicatedLog().isPresent(followerNextIndex)) {
public void removeFromAndPersist(final long index) {
}
- @Override
- public void setSnapshotIndex(final long snapshotIndex) {
- this.snapshotIndex = snapshotIndex;
- }
-
- @Override
- public void setSnapshotTerm(final long snapshotTerm) {
- this.snapshotTerm = snapshotTerm;
- }
-
@Override
public int dataSize() {
return -1;