summary |
shortlog |
log |
commit | commitdiff |
review |
tree
raw |
patch |
inline | side by side (from parent 1:
348a37f)
This is an unused method, remove it. Also add some documentation
around trackers and simplify tracker-based messages.
Change-Id: Ia0afca836bd56b75dcdb74d6c6b309a9eeaa75ee
Signed-off-by: Robert Varga <rovarga@cisco.com>
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
private final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
- private Cancellable heartbeatSchedule = null;
-
- private final Collection<ClientRequestTracker> trackerList = new LinkedList<>();
-
- private int minReplicationCount;
+ /**
+ * Lookup table for request contexts based on journal index. We could use a {@link Map} here, but we really
+ * expect the entries to be modified in sequence, hence we open-code the lookup.
+ *
+ * TODO: Evaluate the use of ArrayDeque(), as that has lower memory overhead. Non-head removals are more costly,
+ * but we already expect those to be far from frequent.
+ */
+ private final Queue<ClientRequestTracker> trackers = new LinkedList<>();
+ private Cancellable heartbeatSchedule = null;
private Optional<SnapshotHolder> snapshot;
private Optional<SnapshotHolder> snapshot;
+ private int minReplicationCount;
protected AbstractLeader(RaftActorContext context, RaftState state) {
super(context, state);
protected AbstractLeader(RaftActorContext context, RaftState state) {
super(context, state);
@Override
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
@Override
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
- final Iterator<ClientRequestTracker> it = trackerList.iterator();
+ final Iterator<ClientRequestTracker> it = trackers.iterator();
while (it.hasNext()) {
final ClientRequestTracker t = it.next();
if (t.getIndex() == logIndex) {
while (it.hasNext()) {
final ClientRequestTracker t = it.next();
if (t.getIndex() == logIndex) {
- @Override
- protected ClientRequestTracker findClientRequestTracker(long logIndex) {
- for (ClientRequestTracker tracker : trackerList) {
- if (tracker.getIndex() == logIndex) {
- return tracker;
- }
- }
- return null;
- }
-
@Override
protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
@Override
protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
RequestVoteReply requestVoteReply) {
// Create a tracker entry we will use this later to notify the
// client actor
// Create a tracker entry we will use this later to notify the
// client actor
new ClientRequestTrackerImpl(replicate.getClientActor(),
replicate.getIdentifier(),
logIndex)
new ClientRequestTrackerImpl(replicate.getClientActor(),
replicate.getIdentifier(),
logIndex)
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import org.opendaylight.yangtools.concepts.Identifier;
import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;
import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;
return context.getReplicatedLog().lastIndex();
}
return context.getReplicatedLog().lastIndex();
}
- /**
- * @param logIndex
- * @return the client request tracker for the specified logIndex
- */
- protected ClientRequestTracker findClientRequestTracker(long logIndex) {
- return null;
- }
-
/**
* @param logIndex
* @return the client request tracker for the specified logIndex
/**
* @param logIndex
* @return the client request tracker for the specified logIndex
/**
*
* @return log index from the previous to last entry in the log
/**
*
* @return log index from the previous to last entry in the log
long newLastApplied = context.getLastApplied();
// Now maybe we apply to the state machine
for (long i = context.getLastApplied() + 1; i < index + 1; i++) {
long newLastApplied = context.getLastApplied();
// Now maybe we apply to the state machine
for (long i = context.getLastApplied() + 1; i < index + 1; i++) {
- final ActorRef clientActor;
- final Identifier identifier;
- final ClientRequestTracker tracker = removeClientRequestTracker(i);
- if (tracker != null) {
- clientActor = tracker.getClientActor();
- identifier = tracker.getIdentifier();
- } else {
- clientActor = null;
- identifier = null;
- }
ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(i);
if (replicatedLogEntry != null) {
// Send a local message to the local RaftActor (it's derived class to be
// specific to apply the log to it's index)
ReplicatedLogEntry replicatedLogEntry = context.getReplicatedLog().get(i);
if (replicatedLogEntry != null) {
// Send a local message to the local RaftActor (it's derived class to be
// specific to apply the log to it's index)
- actor().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), actor());
+
+ final ApplyState msg;
+ final ClientRequestTracker tracker = removeClientRequestTracker(i);
+ if (tracker != null) {
+ msg = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
+ } else {
+ msg = new ApplyState(null, null, replicatedLogEntry);
+ }
+
+ actor().tell(msg, actor());
newLastApplied = i;
} else {
//if one index is not present in the log, no point in looping
newLastApplied = i;
} else {
//if one index is not present in the log, no point in looping