66059b5d62c1b756517cb691ab53f938b42006b0
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorContextImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.Props;
16
17 import com.google.common.annotations.VisibleForTesting;
18 import com.google.common.base.Supplier;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import org.opendaylight.controller.cluster.DataPersistenceProvider;
27 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
28 import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
29 import org.slf4j.Logger;
30
31 public class RaftActorContextImpl implements RaftActorContext {
32
33     private final ActorRef actor;
34
35     private final ActorContext context;
36
37     private final String id;
38
39     private final ElectionTerm termInformation;
40
41     private long commitIndex;
42
43     private long lastApplied;
44
45     private ReplicatedLog replicatedLog;
46
47     private final Map<String, PeerInfo> peerInfoMap = new HashMap<>();
48
49     private final Logger LOG;
50
51     private ConfigParams configParams;
52
53     private boolean dynamicServerConfiguration = false;
54
55     @VisibleForTesting
56     private Supplier<Long> totalMemoryRetriever;
57
58     // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
59     // be passed to it in the constructor
60     private SnapshotManager snapshotManager;
61
62     private final DataPersistenceProvider persistenceProvider;
63
64     private short payloadVersion;
65
66     public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
67             ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
68             ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
69         this.actor = actor;
70         this.context = context;
71         this.id = id;
72         this.termInformation = termInformation;
73         this.commitIndex = commitIndex;
74         this.lastApplied = lastApplied;
75         this.configParams = configParams;
76         this.persistenceProvider = persistenceProvider;
77         this.LOG = logger;
78
79         for(Map.Entry<String, String> e: peerAddresses.entrySet()) {
80             peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
81         }
82     }
83
84     public void setPayloadVersion(short payloadVersion) {
85         this.payloadVersion = payloadVersion;
86     }
87
88     @Override
89     public short getPayloadVersion() {
90         return payloadVersion;
91     }
92
93     public void setConfigParams(ConfigParams configParams) {
94         this.configParams = configParams;
95     }
96
97     @Override
98     public ActorRef actorOf(Props props){
99         return context.actorOf(props);
100     }
101
102     @Override
103     public ActorSelection actorSelection(String path){
104         return context.actorSelection(path);
105     }
106
107     @Override
108     public String getId() {
109         return id;
110     }
111
112     @Override
113     public ActorRef getActor() {
114         return actor;
115     }
116
117     @Override
118     public ElectionTerm getTermInformation() {
119         return termInformation;
120     }
121
122     @Override
123     public long getCommitIndex() {
124         return commitIndex;
125     }
126
127     @Override public void setCommitIndex(long commitIndex) {
128         this.commitIndex = commitIndex;
129     }
130
131     @Override
132     public long getLastApplied() {
133         return lastApplied;
134     }
135
136     @Override
137     public void setLastApplied(long lastApplied) {
138         this.lastApplied = lastApplied;
139     }
140
141     @Override
142     public void setReplicatedLog(ReplicatedLog replicatedLog) {
143         this.replicatedLog = replicatedLog;
144     }
145
146     @Override
147     public ReplicatedLog getReplicatedLog() {
148         return replicatedLog;
149     }
150
151     @Override public ActorSystem getActorSystem() {
152         return context.system();
153     }
154
155     @Override public Logger getLogger() {
156         return this.LOG;
157     }
158
159     @Override
160     public Collection<String> getPeerIds() {
161         return peerInfoMap.keySet();
162     }
163
164     @Override
165     public Collection<PeerInfo> getPeers() {
166         return peerInfoMap.values();
167     }
168
169     @Override
170     public PeerInfo getPeerInfo(String peerId) {
171         return peerInfoMap.get(peerId);
172     }
173
174     @Override
175     public String getPeerAddress(String peerId) {
176         String peerAddress = null;
177         PeerInfo peerInfo = peerInfoMap.get(peerId);
178         if(peerInfo != null) {
179             peerAddress = peerInfo.getAddress();
180             if(peerAddress == null) {
181                 peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
182                 peerInfo.setAddress(peerAddress);
183             }
184         }
185
186         return peerAddress;
187     }
188
189     @Override
190     public void updatePeerIds(ServerConfigurationPayload serverConfig){
191
192         Set<String> currentPeers = new HashSet<>(this.getPeerIds());
193         for(ServerInfo server: serverConfig.getServerConfig()) {
194             if(!getId().equals(server.getId())) {
195                 VotingState votingState = server.isVoting() ? VotingState.VOTING: VotingState.NON_VOTING;
196                 if(!currentPeers.contains(server.getId())) {
197                     this.addToPeers(server.getId(), null, votingState);
198                 } else {
199                     this.getPeerInfo(server.getId()).setVotingState(votingState);
200                     currentPeers.remove(server.getId());
201                 }
202             }
203         }
204
205         for(String peerIdToRemove: currentPeers) {
206             this.removePeer(peerIdToRemove);
207         }
208         setDynamicServerConfigurationInUse();
209     }
210
211     @Override public ConfigParams getConfigParams() {
212         return configParams;
213     }
214
215     @Override
216     public void addToPeers(String id, String address, VotingState votingState) {
217         peerInfoMap.put(id, new PeerInfo(id, address, votingState));
218     }
219
220     @Override public void removePeer(String name) {
221         peerInfoMap.remove(name);
222     }
223
224     @Override public ActorSelection getPeerActorSelection(String peerId) {
225         String peerAddress = getPeerAddress(peerId);
226         if(peerAddress != null){
227             return actorSelection(peerAddress);
228         }
229         return null;
230     }
231
232     @Override
233     public void setPeerAddress(String peerId, String peerAddress) {
234         PeerInfo peerInfo = peerInfoMap.get(peerId);
235         if(peerInfo != null) {
236             LOG.info("Peer address for peer {} set to {}", peerId, peerAddress);
237             peerInfo.setAddress(peerAddress);
238         }
239     }
240
241     @Override
242     public SnapshotManager getSnapshotManager() {
243         if(snapshotManager == null){
244             snapshotManager = new SnapshotManager(this, LOG);
245         }
246         return snapshotManager;
247     }
248
249     @Override
250     public long getTotalMemory() {
251         return totalMemoryRetriever != null ? totalMemoryRetriever.get() : Runtime.getRuntime().totalMemory();
252     }
253
254     @Override
255     public void setTotalMemoryRetriever(Supplier<Long> retriever) {
256         totalMemoryRetriever = retriever;
257     }
258
259     @Override
260     public boolean hasFollowers() {
261         return getPeerIds().size() > 0;
262     }
263
264     @Override
265     public DataPersistenceProvider getPersistenceProvider() {
266         return persistenceProvider;
267     }
268
269
270     @Override
271     public RaftPolicy getRaftPolicy() {
272         return configParams.getRaftPolicy();
273     }
274
275     @Override
276     public boolean isDynamicServerConfigurationInUse() {
277         return dynamicServerConfiguration;
278     }
279
280     @Override
281     public void setDynamicServerConfigurationInUse() {
282         this.dynamicServerConfiguration = true;
283     }
284
285     @Override
286     public ServerConfigurationPayload getPeerServerInfo() {
287         if (!isDynamicServerConfigurationInUse()) {
288             return null;
289         }
290         Collection<PeerInfo> peers = getPeers();
291         List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
292         for(PeerInfo peer: peers) {
293             newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
294         }
295
296         newConfig.add(new ServerInfo(getId(), true));
297         return (new ServerConfigurationPayload(newConfig));
298     }
299 }