Remove RaftReplicator and move hearbeat logic to the leader
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / behaviors / Leader.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft.behaviors;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Cancellable;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
16 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
17 import org.opendaylight.controller.cluster.raft.RaftActorContext;
18 import org.opendaylight.controller.cluster.raft.RaftState;
19 import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
20 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
21 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
22 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
23 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
24 import scala.concurrent.duration.FiniteDuration;
25
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicLong;
32
33 /**
34  * The behavior of a RaftActor when it is in the Leader state
35  * <p/>
36  * Leaders:
37  * <ul>
38  * <li> Upon election: send initial empty AppendEntries RPCs
39  * (heartbeat) to each server; repeat during idle periods to
40  * prevent election timeouts (§5.2)
41  * <li> If command received from client: append entry to local log,
42  * respond after entry applied to state machine (§5.3)
43  * <li> If last log index ≥ nextIndex for a follower: send
44  * AppendEntries RPC with log entries starting at nextIndex
45  * <ul>
46  * <li> If successful: update nextIndex and matchIndex for
47  * follower (§5.3)
48  * <li> If AppendEntries fails because of log inconsistency:
49  * decrement nextIndex and retry (§5.3)
50  * </ul>
51  * <li> If there exists an N such that N > commitIndex, a majority
52  * of matchIndex[i] ≥ N, and log[N].term == currentTerm:
53  * set commitIndex = N (§5.3, §5.4).
54  */
55 public class Leader extends AbstractRaftActorBehavior {
56
57     /**
58      * The interval at which a heart beat message will be sent to the remote
59      * RaftActor
60      * <p/>
61      * Since this is set to 100 milliseconds the Election timeout should be
62      * at least 200 milliseconds
63      */
64     private static final FiniteDuration HEART_BEAT_INTERVAL =
65         new FiniteDuration(100, TimeUnit.MILLISECONDS);
66
67     private final Map<String, ActorRef> followerToReplicator = new HashMap<>();
68
69     private final Map<String, FollowerLogInformation> followerToLog =
70         new HashMap();
71
72     private final Map<String, ActorSelection> followerToActor = new HashMap<>();
73
74     private Cancellable heartbeatCancel = null;
75
76     public Leader(RaftActorContext context, List<String> followers) {
77         super(context);
78
79         for (String follower : followers) {
80
81             FollowerLogInformation followerLogInformation =
82                 new FollowerLogInformationImpl(follower,
83                     new AtomicLong(0),
84                     new AtomicLong(0));
85
86             followerToActor.put(follower,
87                 context.actorSelection(followerLogInformation.getId()));
88             followerToLog.put(follower, followerLogInformation);
89
90         }
91
92         // Immediately schedule a heartbeat
93         // Upon election: send initial empty AppendEntries RPCs
94         // (heartbeat) to each server; repeat during idle periods to
95         // prevent election timeouts (§5.2)
96         scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
97
98
99     }
100
101     @Override protected RaftState handleAppendEntries(ActorRef sender,
102         AppendEntries appendEntries, RaftState suggestedState) {
103         return suggestedState;
104     }
105
106     @Override protected RaftState handleAppendEntriesReply(ActorRef sender,
107         AppendEntriesReply appendEntriesReply, RaftState suggestedState) {
108         return suggestedState;
109     }
110
111     @Override protected RaftState handleRequestVote(ActorRef sender,
112         RequestVote requestVote, RaftState suggestedState) {
113         return suggestedState;
114     }
115
116     @Override protected RaftState handleRequestVoteReply(ActorRef sender,
117         RequestVoteReply requestVoteReply, RaftState suggestedState) {
118         return suggestedState;
119     }
120
121     @Override protected RaftState state() {
122         return RaftState.Leader;
123     }
124
125     @Override public RaftState handleMessage(ActorRef sender, Object message) {
126         Preconditions.checkNotNull(sender, "sender should not be null");
127
128         scheduleHeartBeat(HEART_BEAT_INTERVAL);
129
130         if (message instanceof SendHeartBeat) {
131             for (ActorSelection follower : followerToActor.values()) {
132                 follower.tell(new AppendEntries(
133                     context.getTermInformation().getCurrentTerm().get(),
134                     context.getId(),
135                     context.getReplicatedLog().last().getIndex(),
136                     context.getReplicatedLog().last().getTerm(),
137                     Collections.EMPTY_LIST, context.getCommitIndex().get()),
138                     context.getActor());
139             }
140             return state();
141         }
142         return super.handleMessage(sender, message);
143     }
144
145     private void scheduleHeartBeat(FiniteDuration interval) {
146         if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) {
147             heartbeatCancel.cancel();
148         }
149
150         // Schedule a heartbeat. When the scheduler triggers the replicator
151         // will let the RaftActor (leader) know that a new heartbeat needs to be sent
152         // Scheduling the heartbeat only once here because heartbeats do not
153         // need to be sent if there are other messages being sent to the remote
154         // actor.
155         heartbeatCancel =
156             context.getActorSystem().scheduler().scheduleOnce(interval,
157                 context.getActor(), new SendHeartBeat(),
158                 context.getActorSystem().dispatcher(), context.getActor());
159     }
160
161 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.