import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
* </ul>
*/
public abstract class RaftActor extends AbstractUntypedPersistentActor {
+
+ private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
+
protected final Logger LOG = LoggerFactory.getLogger(getClass());
/**
if (message instanceof ApplyState){
ApplyState applyState = (ApplyState) message;
+ long elapsedTime = (System.nanoTime() - applyState.getStartTime());
+ if(elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS){
+ LOG.warn("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
+ TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
+ }
+
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Applying state for log index {} data {}",
persistenceId(), applyState.getReplicatedLogEntry().getIndex(),
context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getLastAppliedIndex(),
captureSnapshot.getLastAppliedTerm());
- } else {
+ getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ } else if(captureSnapshot.getReplicatedToAllIndex() != -1){
// clear the log based on replicatedToAllIndex
context.getReplicatedLog().snapshotPreCommit(captureSnapshot.getReplicatedToAllIndex(),
captureSnapshot.getReplicatedToAllTerm());
+
+ getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+ } else {
+ // The replicatedToAllIndex was not found in the log
+ // This means that replicatedToAllIndex never moved beyond -1 or that it is already in the snapshot.
+ // In this scenario we may need to save the snapshot to the akka persistence
+ // snapshot for recovery but we do not need to do the replicated log trimming.
+ context.getReplicatedLog().snapshotPreCommit(replicatedLog.getSnapshotIndex(),
+ replicatedLog.getSnapshotTerm());
}
- getCurrentBehavior().setReplicatedToAllIndex(captureSnapshot.getReplicatedToAllIndex());
+
LOG.info("{}: Removed in-memory snapshotted entries, adjusted snaphsotIndex:{} " +
"and term:{}", persistenceId(), captureSnapshot.getLastAppliedIndex(),