} else if (message instanceof PrintRole) {
if(LOG.isDebugEnabled()) {
- String followers = "";
if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
- followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
+ final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
getRaftActorContext().getPeerAddresses().keySet(), followers);
} else {
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* The state of the followers log as known by the Leader
*/
* Increment the value of the nextIndex
* @return
*/
- public long incrNextIndex();
+ long incrNextIndex();
/**
* Decrement the value of the nextIndex
* @return
*/
- public long decrNextIndex();
+ long decrNextIndex();
/**
*
* Increment the value of the matchIndex
* @return
*/
- public long incrMatchIndex();
+ long incrMatchIndex();
- public void setMatchIndex(long matchIndex);
+ void setMatchIndex(long matchIndex);
/**
* The identifier of the follower
* This could simply be the url of the remote actor
*/
- public String getId();
+ String getId();
/**
* for each server, index of the next log entry
* to send to that server (initialized to leader
* last log index + 1)
*/
- public AtomicLong getNextIndex();
+ long getNextIndex();
/**
* for each server, index of highest log entry
* known to be replicated on server
* (initialized to 0, increases monotonically)
*/
- public AtomicLong getMatchIndex();
+ long getMatchIndex();
/**
* Checks if the follower is active by comparing the last updated with the duration
* @return boolean
*/
- public boolean isFollowerActive();
+ boolean isFollowerActive();
/**
* restarts the timeout clock of the follower
*/
- public void markFollowerActive();
+ void markFollowerActive();
/**
* This will stop the timeout clock
*/
- public void markFollowerInActive();
-
-
+ void markFollowerInActive();
}
package org.opendaylight.controller.cluster.raft;
import com.google.common.base.Stopwatch;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import scala.concurrent.duration.FiniteDuration;
-public class FollowerLogInformationImpl implements FollowerLogInformation{
+public class FollowerLogInformationImpl implements FollowerLogInformation {
private final String id;
private final long followerTimeoutMillis;
- public FollowerLogInformationImpl(String id, AtomicLong nextIndex,
- AtomicLong matchIndex, FiniteDuration followerTimeoutDuration) {
+ public FollowerLogInformationImpl(String id, long nextIndex,
+ long matchIndex, FiniteDuration followerTimeoutDuration) {
this.id = id;
- this.nextIndex = nextIndex;
- this.matchIndex = matchIndex;
+ this.nextIndex = new AtomicLong(nextIndex);
+ this.matchIndex = new AtomicLong(matchIndex);
this.stopwatch = new Stopwatch();
this.followerTimeoutMillis = followerTimeoutDuration.toMillis();
}
+ @Override
public long incrNextIndex(){
return nextIndex.incrementAndGet();
}
- @Override public long decrNextIndex() {
+ @Override
+ public long decrNextIndex() {
return nextIndex.decrementAndGet();
}
- @Override public void setNextIndex(long nextIndex) {
+ @Override
+ public void setNextIndex(long nextIndex) {
this.nextIndex.set(nextIndex);
}
+ @Override
public long incrMatchIndex(){
return matchIndex.incrementAndGet();
}
- @Override public void setMatchIndex(long matchIndex) {
+ @Override
+ public void setMatchIndex(long matchIndex) {
this.matchIndex.set(matchIndex);
}
+ @Override
public String getId() {
return id;
}
- public AtomicLong getNextIndex() {
- return nextIndex;
+ @Override
+ public long getNextIndex() {
+ return nextIndex.get();
}
- public AtomicLong getMatchIndex() {
- return matchIndex;
+ @Override
+ public long getMatchIndex() {
+ return matchIndex.get();
}
@Override
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.ClientRequestTrackerImpl;
import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
// This would be passed as the hash code of the last chunk when sending the first chunk
public static final int INITIAL_LAST_CHUNK_HASH_CODE = -1;
- protected final Map<String, FollowerLogInformation> followerToLog = new HashMap<>();
- protected final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
-
- protected final Set<String> followers;
+ private final Map<String, FollowerLogInformation> followerToLog;
+ private final Map<String, FollowerToSnapshot> mapFollowerToSnapshot = new HashMap<>();
private Cancellable heartbeatSchedule = null;
public AbstractLeader(RaftActorContext context) {
super(context);
- followers = context.getPeerAddresses().keySet();
-
- for (String followerId : followers) {
+ final Builder<String, FollowerLogInformation> ftlBuilder = ImmutableMap.builder();
+ for (String followerId : context.getPeerAddresses().keySet()) {
FollowerLogInformation followerLogInformation =
new FollowerLogInformationImpl(followerId,
- new AtomicLong(context.getCommitIndex()),
- new AtomicLong(-1),
+ context.getCommitIndex(), -1,
context.getConfigParams().getElectionTimeOutInterval());
- followerToLog.put(followerId, followerLogInformation);
+ ftlBuilder.put(followerId, followerLogInformation);
}
+ followerToLog = ftlBuilder.build();
leaderId = context.getId();
- if(LOG.isDebugEnabled()) {
- LOG.debug("Election:Leader has following peers: {}", followers);
- }
+ LOG.debug("Election:Leader has following peers: {}", getFollowerIds());
- minReplicationCount = getMajorityVoteCount(followers.size());
+ minReplicationCount = getMajorityVoteCount(getFollowerIds().size());
// the isolated Leader peer count will be 1 less than the majority vote count.
// this is because the vote count has the self vote counted in it
scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
}
+ /**
+ * Return an immutable collection of follower identifiers.
+ *
+ * @return Collection of follower IDs
+ */
+ protected final Collection<String> getFollowerIds() {
+ return followerToLog.keySet();
+ }
+
private Optional<ByteString> getSnapshot() {
return snapshot;
}
int replicatedCount = 1;
for (FollowerLogInformation info : followerToLog.values()) {
- if (info.getMatchIndex().get() >= N) {
+ if (info.getMatchIndex() >= N) {
replicatedCount++;
}
}
return this;
}
+ @Override
protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
ClientRequestTracker toRemove = findClientRequestTracker(logIndex);
return toRemove;
}
+ @Override
protected ClientRequestTracker findClientRequestTracker(long logIndex) {
for (ClientRequestTracker tracker : trackerList) {
if (tracker.getIndex() == logIndex) {
mapFollowerToSnapshot.remove(followerId);
if(LOG.isDebugEnabled()) {
- LOG.debug("followerToLog.get(followerId).getNextIndex().get()=" +
- followerToLog.get(followerId).getNextIndex().get());
+ LOG.debug("followerToLog.get(followerId).getNextIndex()=" +
+ followerToLog.get(followerId).getNextIndex());
}
if (mapFollowerToSnapshot.isEmpty()) {
logIndex)
);
- if (followers.size() == 0) {
+ if (followerToLog.isEmpty()) {
context.setCommitIndex(logIndex);
applyLogToStateMachine(logIndex);
} else {
private void sendAppendEntries() {
// Send an AppendEntries to all followers
- for (String followerId : followers) {
+ for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+ final String followerId = e.getKey();
ActorSelection followerActor = context.getPeerActorSelection(followerId);
if (followerActor != null) {
FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long followerNextIndex = followerLogInformation.getNextIndex().get();
+ long followerNextIndex = followerLogInformation.getNextIndex();
boolean isFollowerActive = followerLogInformation.isFollowerActive();
List<ReplicatedLogEntry> entries = null;
*
*/
private void installSnapshotIfNeeded() {
- for (String followerId : followers) {
- ActorSelection followerActor =
- context.getPeerActorSelection(followerId);
+ for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+ final ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
- if(followerActor != null) {
- FollowerLogInformation followerLogInformation =
- followerToLog.get(followerId);
-
- long nextIndex = followerLogInformation.getNextIndex().get();
+ if (followerActor != null) {
+ long nextIndex = e.getValue().getNextIndex();
if (!context.getReplicatedLog().isPresent(nextIndex) &&
context.getReplicatedLog().isInSnapshot(nextIndex)) {
- LOG.info("{} follower needs a snapshot install", followerId);
+ LOG.info("{} follower needs a snapshot install", e.getKey());
if (snapshot.isPresent()) {
// if a snapshot is present in the memory, most likely another install is in progress
// no need to capture snapshot
- sendSnapshotChunk(followerActor, followerId);
+ sendSnapshotChunk(followerActor, e.getKey());
} else {
initiateCaptureSnapshot();
private void sendInstallSnapshot() {
- for (String followerId : followers) {
- ActorSelection followerActor = context.getPeerActorSelection(followerId);
+ for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
+ ActorSelection followerActor = context.getPeerActorSelection(e.getKey());
- if(followerActor != null) {
- FollowerLogInformation followerLogInformation = followerToLog.get(followerId);
- long nextIndex = followerLogInformation.getNextIndex().get();
+ if (followerActor != null) {
+ long nextIndex = e.getValue().getNextIndex();
if (!context.getReplicatedLog().isPresent(nextIndex) &&
context.getReplicatedLog().isInSnapshot(nextIndex)) {
- sendSnapshotChunk(followerActor, followerId);
+ sendSnapshotChunk(followerActor, e.getKey());
}
}
}
}
private void sendHeartBeat() {
- if (followers.size() > 0) {
+ if (!followerToLog.isEmpty()) {
sendAppendEntries();
}
}
}
private void scheduleHeartBeat(FiniteDuration interval) {
- if(followers.size() == 0){
+ if (followerToLog.isEmpty()) {
// Optimization - do not bother scheduling a heartbeat as there are
// no followers
return;
// called from example-actor for printing the follower-states
public String printFollowerStates() {
- StringBuilder sb = new StringBuilder();
- for(FollowerLogInformation followerLogInformation : followerToLog.values()) {
- boolean isFollowerActive = followerLogInformation.isFollowerActive();
- sb.append("{"+followerLogInformation.getId() + " state:" + isFollowerActive + "},");
+ final StringBuilder sb = new StringBuilder();
+ sb.append('[');
+ for (FollowerLogInformation followerLogInformation : followerToLog.values()) {
+ sb.append('{');
+ sb.append(followerLogInformation.getId());
+ sb.append(" state:");
+ sb.append(followerLogInformation.isFollowerActive());
+ sb.append("},");
}
- return "[" + sb.toString() + "]";
+ sb.append(']');
+
+ return sb.toString();
+ }
+
+ @VisibleForTesting
+ public FollowerLogInformation getFollower(String followerId) {
+ return followerToLog.get(followerId);
+ }
+
+ @VisibleForTesting
+ protected void setFollowerSnapshot(String followerId, FollowerToSnapshot snapshot) {
+ mapFollowerToSnapshot.put(followerId, snapshot);
+ }
+
+ @VisibleForTesting
+ public int followerSnapshotSize() {
+ return mapFollowerToSnapshot.size();
}
@VisibleForTesting
- void markFollowerActive(String followerId) {
- followerToLog.get(followerId).markFollowerActive();
+ public int followerLogSize() {
+ return followerToLog.size();
}
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft.behaviors;
import akka.actor.ActorRef;
}
protected void scheduleInstallSnapshotCheck(FiniteDuration interval) {
- if(followers.size() == 0){
+ if (getFollowerIds().isEmpty()) {
// Optimization - do not bother scheduling a heartbeat as there are
// no followers
return;
context.getActorSystem().dispatcher(), context.getActor());
}
- @Override public void close() throws Exception {
+ @Override
+ public void close() throws Exception {
stopInstallSnapshotSchedule();
stopIsolatedLeaderCheckSchedule();
super.close();
@VisibleForTesting
void markFollowerActive(String followerId) {
- followerToLog.get(followerId).markFollowerActive();
+ getFollower(followerId).markFollowerActive();
}
@VisibleForTesting
void markFollowerInActive(String followerId) {
- followerToLog.get(followerId).markFollowerInActive();
+ getFollower(followerId).markFollowerInActive();
}
}
*/
package org.opendaylight.controller.cluster.raft;
-
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Test;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
FollowerLogInformation followerLogInformation =
new FollowerLogInformationImpl(
- "follower1", new AtomicLong(10), new AtomicLong(9), timeoutDuration);
+ "follower1", 10, 9, timeoutDuration);
}};
}
-
@Test
public void testThatLeaderSendsAHeartbeatMessageToAllFollowers() {
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef followerActor = getTestActor();
final String out =
new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
Object msg = fromSerializableMessage(in);
if (msg instanceof AppendEntries) {
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef followerActor = getTestActor();
final String out =
new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
Object msg = fromSerializableMessage(in);
if (msg instanceof AppendEntries) {
new JavaTestKit(getSystem()) {{
new Within(duration("1 seconds")) {
+ @Override
protected void run() {
ActorRef raftActor = getTestActor();
new ExpectMsg<String>(duration("1 seconds"),
"match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in instanceof ApplyState) {
if (((ApplyState) in).getIdentifier().equals("state-id")) {
final String out =
new ExpectMsg<String>(duration("1 seconds"), "match hint") {
// do not put code outside this method, will run afterwards
+ @Override
protected String match(Object in) {
if (in instanceof InstallSnapshotMessages.InstallSnapshot) {
InstallSnapshot is = (InstallSnapshot)
assertTrue(raftBehavior instanceof Leader);
- assertEquals(leader.mapFollowerToSnapshot.size(), 0);
- assertEquals(leader.followerToLog.size(), 1);
- assertNotNull(leader.followerToLog.get(followerActor.path().toString()));
- FollowerLogInformation fli = leader.followerToLog.get(followerActor.path().toString());
- assertEquals(snapshotIndex, fli.getMatchIndex().get());
- assertEquals(snapshotIndex, fli.getMatchIndex().get());
- assertEquals(snapshotIndex + 1, fli.getNextIndex().get());
+ assertEquals(0, leader.followerSnapshotSize());
+ assertEquals(1, leader.followerLogSize());
+ assertNotNull(leader.getFollower(followerActor.path().toString()));
+ FollowerLogInformation fli = leader.getFollower(followerActor.path().toString());
+ assertEquals(snapshotIndex, fli.getMatchIndex());
+ assertEquals(snapshotIndex, fli.getMatchIndex());
+ assertEquals(snapshotIndex + 1, fli.getNextIndex());
}};
}
return createActorContext(leaderActor);
}
+ @Override
protected RaftActorContext createActorContext(ActorRef actorRef) {
return new MockRaftActorContext("test", getSystem(), actorRef);
}
public void createFollowerToSnapshot(String followerId, ByteString bs ) {
fts = new FollowerToSnapshot(bs);
- mapFollowerToSnapshot.put(followerId, fts);
-
+ setFollowerSnapshot(followerId, fts);
}
}
package org.opendaylight.controller.sal.binding.impl.connect.dom;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.concurrent.Future;
-
import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-
/*
* RPC's can have both input, output, one or the other, or neither.
*
*
*/
public class RpcInvocationStrategy {
+ private final Function<RpcResult<CompositeNode>, RpcResult<?>> transformationFunction = new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
+ @SuppressWarnings("rawtypes")
+ @Override
+ public RpcResult<?> apply(final RpcResult<CompositeNode> result) {
+ final Object output;
+ if (getOutputClass() != null && result.getResult() != null) {
+ output = mappingService.dataObjectFromDataDom(getOutputClass().get(), result.getResult());
+ } else {
+ output = null;
+ }
+
+ return RpcResultBuilder.from( (RpcResult)result ).withResult( output ).build();
+ }
+ };
private final BindingIndependentMappingService mappingService;
private final RpcProvisionRegistry biRpcRegistry;
final Method targetMethod,
final BindingIndependentMappingService mappingService,
final RpcProvisionRegistry biRpcRegistry ) {
-
+ this.mappingService = mappingService;
+ this.biRpcRegistry = biRpcRegistry;
this.targetMethod = targetMethod;
this.rpc = rpc;
- Optional<Class<?>> outputClassOption = BindingReflections.resolveRpcOutputClass(targetMethod);
- Optional<Class<? extends DataContainer>> inputClassOption = BindingReflections.resolveRpcInputClass(targetMethod);
-
- if ( outputClassOption != null && outputClassOption.isPresent() ) {
- this.outputClass = new WeakReference(outputClassOption.get() ) ;
+ final Optional<Class<?>> outputClassOption = BindingReflections.resolveRpcOutputClass(targetMethod);
+ if (outputClassOption.isPresent()) {
+ this.outputClass = new WeakReference(outputClassOption.get());
} else {
- this.outputClass = null ;
+ this.outputClass = null;
}
- if ( inputClassOption != null && inputClassOption.isPresent() ) {
- this.inputClass = new WeakReference(inputClassOption.get() ) ;
+
+ final Optional<Class<? extends DataContainer>> inputClassOption = BindingReflections.resolveRpcInputClass(targetMethod);
+ if (inputClassOption.isPresent() ) {
+ this.inputClass = new WeakReference(inputClassOption.get());
} else {
- this.inputClass = null ;
+ this.inputClass = null;
}
-
- this.mappingService = mappingService;
- this.biRpcRegistry = biRpcRegistry;
}
@SuppressWarnings({ "unchecked" })
inputXml = ImmutableCompositeNode.create( rpc, Collections.<Node<?>>emptyList() );
}
- Function<RpcResult<CompositeNode>, RpcResult<?>> transformationFunction =
- new Function<RpcResult<CompositeNode>, RpcResult<?>>() {
- @SuppressWarnings("rawtypes")
- @Override
- public RpcResult<?> apply(RpcResult<CompositeNode> result) {
-
- Object output = null;
-
- if( getOutputClass() != null ) {
- if (result.getResult() != null) {
- output = mappingService.dataObjectFromDataDom(getOutputClass().get(),
- result.getResult());
- }
- }
-
- return RpcResultBuilder.from( (RpcResult)result ).withResult( output ).build();
- }
- };
-
return Futures.transform(biRpcRegistry.invokeRpc(rpc, inputXml), transformationFunction);
}
}
@SuppressWarnings("rawtypes")
- public WeakReference<Class> getOutputClass() {
+ @VisibleForTesting
+ WeakReference<Class> getOutputClass() {
return outputClass;
}
}