/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.raft;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.japi.Creator;
17 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
18 import scala.concurrent.duration.FiniteDuration;
20 import java.util.concurrent.TimeUnit;
23 * A RaftReplicator is responsible for replicating messages to any one follower.
24 * Once it gets a message for replication it should keep trying to replicate it
25 * to the remote follower indefinitely.
27 * Any new messages that are sent to this actor while it is replicating a
28 * message may need to be stashed till the current message has been successfully
29 * replicated. When a message is successfully replicated the RaftReplicator
30 * needs to inform the RaftActor of it.
32 public class RaftReplicator extends UntypedActor {
35 * The interval at which a heart beat message will be sent to the remote
38 * Since this is set to 100 milliseconds the Election timeout should be
39 * at least 200 milliseconds
42 private static final FiniteDuration HEART_BEAT_INTERVAL =
43 new FiniteDuration(100, TimeUnit.MILLISECONDS);
46 * The state of the follower as known to this replicator
48 private final FollowerLogInformation followerLogInformation;
51 * The local RaftActor that created this replicator so that it could
52 * replicate messages to the follower
54 private final ActorRef leader;
58 * The remote RaftActor to whom the messages need to be replicated
60 private ActorSelection follower;
62 private Cancellable heartbeatCancel = null;
64 public RaftReplicator(FollowerLogInformation followerLogInformation,
67 this.followerLogInformation = followerLogInformation;
69 this.follower = getContext().actorSelection(followerLogInformation.getId());
71 // Immediately schedule a heartbeat
72 // Upon election: send initial empty AppendEntries RPCs
73 // (heartbeat) to each server; repeat during idle periods to
74 // prevent election timeouts (ยง5.2)
75 scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
78 private void scheduleHeartBeat(FiniteDuration interval) {
79 if(heartbeatCancel != null && ! heartbeatCancel.isCancelled()){
80 heartbeatCancel.cancel();
83 // Schedule a heartbeat. When the scheduler triggers the replicator
84 // will let the RaftActor (leader) know that a new heartbeat needs to be sent
85 // Scheduling the heartbeat only once here because heartbeats do not
86 // need to be sent if there are other messages being sent to the remote
89 getContext().system().scheduler().scheduleOnce(interval,
90 leader, new SendHeartBeat(), getContext().dispatcher(), getSelf());
95 @Override public void onReceive(Object message) throws Exception {
96 scheduleHeartBeat(HEART_BEAT_INTERVAL);
97 follower.forward(message, getContext());
100 public static Props props(final FollowerLogInformation followerLogInformation,
101 final ActorRef leader) {
102 return Props.create(new Creator<RaftReplicator>() {
104 @Override public RaftReplicator create() throws Exception {
105 return new RaftReplicator(followerLogInformation, leader);