/* * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.raft; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import akka.actor.Props; import akka.actor.UntypedActor; import akka.japi.Creator; import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat; import scala.concurrent.duration.FiniteDuration; import java.util.concurrent.TimeUnit; /** * A RaftReplicator is responsible for replicating messages to any one follower. * Once it gets a message for replication it should keep trying to replicate it * to the remote follower indefinitely. *
* Any new messages that are sent to this actor while it is replicating a
* message may need to be stashed till the current message has been successfully
* replicated. When a message is successfully replicated the RaftReplicator
* needs to inform the RaftActor of it.
*/
public class RaftReplicator extends UntypedActor {
/**
* The delay used to re-arm the heartbeat timer each time this replicator
* receives a message. If no further message arrives within this interval
* the scheduler delivers a SendHeartBeat message to the leader, which is
* how the leader learns that a heartbeat is due for the remote RaftActor.
*
* Since this is set to 100 milliseconds the Election timeout should be
* at least 200 milliseconds
*
*/
private static final FiniteDuration HEART_BEAT_INTERVAL =
new FiniteDuration(100, TimeUnit.MILLISECONDS);
/**
* The state of the follower as known to this replicator. Its id is used
* by the constructor to look up the follower actor.
*/
private final FollowerLogInformation followerLogInformation;
/**
* The local RaftActor that created this replicator so that it could
* replicate messages to the follower. It is also the recipient of the
* scheduled SendHeartBeat messages.
*/
private final ActorRef leader;
/**
* The remote RaftActor to whom the messages need to be replicated.
* Every message received by this replicator is forwarded to it.
*/
private ActorSelection follower;
/**
* Handle of the most recently scheduled SendHeartBeat; null until the
* first heartbeat has been scheduled. It is cancelled (if not already
* cancelled) and replaced whenever a new heartbeat is scheduled.
*/
private Cancellable heartbeatCancel = null;
public RaftReplicator(FollowerLogInformation followerLogInformation,
ActorRef leader) {
this.followerLogInformation = followerLogInformation;
this.leader = leader;
this.follower = getContext().actorSelection(followerLogInformation.getId());
// Immediately schedule a heartbeat
// Upon election: send initial empty AppendEntries RPCs
// (heartbeat) to each server; repeat during idle periods to
// prevent election timeouts (ยง5.2)
scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
}
/**
 * Replaces the current heartbeat timer with a new one-shot timer.
 *
 * When the timer fires, a SendHeartBeat message is delivered to the leader
 * with this replicator as the sender. The timer is deliberately one-shot:
 * heartbeats do not need to be sent while other messages are being sent
 * to the remote actor, so each handled message simply re-arms it.
 *
 * @param interval how long to wait before the SendHeartBeat is delivered
 */
private void scheduleHeartBeat(FiniteDuration interval) {
    // Only one timer may be outstanding; drop the previous one unless it
    // has already been cancelled.
    final boolean needsCancel =
        heartbeatCancel != null && !heartbeatCancel.isCancelled();
    if (needsCancel) {
        heartbeatCancel.cancel();
    }

    heartbeatCancel = getContext().system().scheduler().scheduleOnce(
        interval,
        leader,
        new SendHeartBeat(),
        getContext().dispatcher(),
        getSelf());
}
/**
 * Handles every message sent to this replicator: re-arms the heartbeat
 * timer with HEART_BEAT_INTERVAL and then forwards the message to the
 * follower.
 *
 * @param message the message to pass on to the follower
 * @throws Exception declared by the actor contract
 */
@Override
public void onReceive(Object message) throws Exception {
    // Traffic to the follower is about to happen, so push the next
    // heartbeat out by a full interval before forwarding.
    scheduleHeartBeat(HEART_BEAT_INTERVAL);
    this.follower.forward(message, getContext());
}
public static Props props(final FollowerLogInformation followerLogInformation,
final ActorRef leader) {
return Props.create(new Creator