Initial code/design for an Akka Raft implementation
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftReplicator.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.japi.Creator;
17 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
18 import scala.concurrent.duration.FiniteDuration;
19
20 import java.util.concurrent.TimeUnit;
21
22 /**
23  * A RaftReplicator is responsible for replicating messages to any one follower.
24  * Once it gets a message for replication it should keep trying to replicate it
25  * to the remote follower indefinitely.
26  * <p>
27  * Any new messages that are sent to this actor while it is replicating a
28  * message may need to be stashed till the current message has been successfully
29  * replicated. When a message is successfully replicated the RaftReplicator
30  * needs to inform the RaftActor of it.
31  */
32 public class RaftReplicator extends UntypedActor {
33
34     /**
35      * The interval at which a heart beat message will be sent to the remote
36      * RaftActor
37      *
38      * Since this is set to 100 milliseconds the Election timeout should be
39      * at least 200 milliseconds
40      *
41      */
42     private static final FiniteDuration HEART_BEAT_INTERVAL =
43         new FiniteDuration(100, TimeUnit.MILLISECONDS);
44
45     /**
46      * The state of the follower as known to this replicator
47      */
48     private final FollowerLogInformation followerLogInformation;
49
50     /**
51      * The local RaftActor that created this replicator so that it could
52      * replicate messages to the follower
53      */
54     private final ActorRef leader;
55
56
57     /**
58      * The remote RaftActor to whom the messages need to be replicated
59      */
60     private ActorSelection follower;
61
62     private Cancellable heartbeatCancel = null;
63
64     public RaftReplicator(FollowerLogInformation followerLogInformation,
65         ActorRef leader) {
66
67         this.followerLogInformation = followerLogInformation;
68         this.leader = leader;
69         this.follower = getContext().actorSelection(followerLogInformation.getId());
70
71         // Immediately schedule a heartbeat
72         // Upon election: send initial empty AppendEntries RPCs
73         // (heartbeat) to each server; repeat during idle periods to
74         // prevent election timeouts (ยง5.2)
75         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
76     }
77
78     private void scheduleHeartBeat(FiniteDuration interval) {
79         if(heartbeatCancel != null && ! heartbeatCancel.isCancelled()){
80             heartbeatCancel.cancel();
81         }
82
83         // Schedule a heartbeat. When the scheduler triggers the replicator
84         // will let the RaftActor (leader) know that a new heartbeat needs to be sent
85         // Scheduling the heartbeat only once here because heartbeats do not
86         // need to be sent if there are other messages being sent to the remote
87         // actor.
88         heartbeatCancel =
89             getContext().system().scheduler().scheduleOnce(interval,
90                 leader, new SendHeartBeat(), getContext().dispatcher(), getSelf());
91     }
92
93
94
95     @Override public void onReceive(Object message) throws Exception {
96         scheduleHeartBeat(HEART_BEAT_INTERVAL);
97         follower.forward(message, getContext());
98     }
99
100     public static Props props(final FollowerLogInformation followerLogInformation,
101         final ActorRef leader) {
102         return Props.create(new Creator<RaftReplicator>() {
103
104             @Override public RaftReplicator create() throws Exception {
105                 return new RaftReplicator(followerLogInformation, leader);
106             }
107         });
108     }
109 }