* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* The state of the followers log as known by the Leader
*/
* Increment the value of the nextIndex
* @return
*/
- public long incrNextIndex();
+ long incrNextIndex();
/**
* Decrement the value of the nextIndex
* @return
*/
- public long decrNextIndex();
+ long decrNextIndex();
/**
*
* Increment the value of the matchIndex
* @return
*/
- public long incrMatchIndex();
+ long incrMatchIndex();
- public void setMatchIndex(long matchIndex);
+ void setMatchIndex(long matchIndex);
/**
* The identifier of the follower
* This could simply be the url of the remote actor
*/
- public String getId();
+ String getId();
/**
* for each server, index of the next log entry
* to send to that server (initialized to leader
* last log index + 1)
*/
- public AtomicLong getNextIndex();
+ long getNextIndex();
/**
* for each server, index of highest log entry
* known to be replicated on server
* (initialized to 0, increases monotonically)
*/
- public AtomicLong getMatchIndex();
+ long getMatchIndex();
/**
* Checks if the follower is active by comparing the last updated with the duration
* @return boolean
*/
- public boolean isFollowerActive();
+ boolean isFollowerActive();
/**
* restarts the timeout clock of the follower
*/
- public void markFollowerActive();
+ void markFollowerActive();
/**
* This will stop the timeout clock
*/
- public void markFollowerInActive();
-
-
+ void markFollowerInActive();
}
package org.opendaylight.controller.cluster.raft;
import com.google.common.base.Stopwatch;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import scala.concurrent.duration.FiniteDuration;
public class FollowerLogInformationImpl implements FollowerLogInformation {
this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
}
+ @Override
public long incrNextIndex(){
return nextIndex.incrementAndGet();
}
this.nextIndex.set(nextIndex);
}
+ @Override
public long incrMatchIndex(){
return matchIndex.incrementAndGet();
}
this.matchIndex.set(matchIndex);
}
+ @Override
public String getId() {
return id;
}
- public AtomicLong getNextIndex() {
- return nextIndex;
+ @Override
+ public long getNextIndex() {
+ return nextIndex.get();
}
- public AtomicLong getMatchIndex() {
- return matchIndex;
+ @Override
+ public long getMatchIndex() {
+ return matchIndex.get();
}
@Override
int replicatedCount = 1;
for (FollowerLogInformation info : followerToLog.values()) {
- if (info.getMatchIndex().get() >= N) {
+ if (info.getMatchIndex() >= N) {
replicatedCount++;
}
}
return this;
}
+ @Override
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
return toRemove;
}
+ @Override
protected ClientRequestTracker findClientRequestTracker(long logIndex) {
for (ClientRequestTracker tracker : trackerList) {
if (tracker.getIndex() == logIndex) {
mapFollowerToSnapshot.remove(followerId);
if(LOG.isDebugEnabled()) {
- LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
- followerToLog.get(followerId).getNextIndex().get());
+ LOG.debug("followerToLog.get(followerId).getNextIndex()=" +
+ followerToLog.get(followerId).getNextIndex());
}
if (mapFollowerToSnapshot.isEmpty()) {
if (followerActor != null) {
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long followerNextIndex = followerLogInformation.getNextIndex().get();
+ long followerNextIndex = followerLogInformation.getNextIndex();
boolean isFollowerActive = followerLogInformation.isFollowerActive();
List<ReplicatedLogEntry> entries = null;
final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
if (followerActor != null) {
- long nextIndex = e.getValue().getNextIndex().get();
+ long nextIndex = e.getValue().getNextIndex();
if (!context.getReplicatedLog().isPresent(nextIndex) &&
context.getReplicatedLog().isInSnapshot(nextIndex)) {
ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
if (followerActor != null) {
- long nextIndex = e.getValue().getNextIndex().get();
+ long nextIndex = e.getValue().getNextIndex();
if (!context.getReplicatedLog().isPresent(nextIndex) &&
context.getReplicatedLog().isInSnapshot(nextIndex)) {
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef followerActor = getTestActor();
final String out =
new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
Object msg = fromSerializableMessage(in);
if (msg instanceof AppendEntries) {
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef followerActor = getTestActor();
final String out =
new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
Object msg = fromSerializableMessage(in);
if (msg instanceof AppendEntries) {
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef raftActor = getTestActor();
new ExpectMsg<String>(duration("1 seconds"),
"match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in instanceof ApplyState) {
if (((ApplyState) in).getIdentifier().equals("state-id")) {
final String out =
new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
InstallSnapshot is = (InstallSnapshot)
assertEquals(1, leader.followerLogSize());
assertNotNull(leader.getFollower(followerActor.path().toString()));
FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
- assertEquals(snapshotIndex, fli.getMatchIndex().get());
- assertEquals(snapshotIndex, fli.getMatchIndex().get());
- assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
+ assertEquals(snapshotIndex, fli.getMatchIndex());
+ assertEquals(snapshotIndex, fli.getMatchIndex());
+ assertEquals(snapshotIndex + 1, fli.getNextIndex());
}};
}
return createActorContext(leaderActor);
}
+ @Override
protected RaftActorContext createActorContext(ActorRef actorRef) {
return new MockRaftActorContext("test", getSystem(), actorRef);
}