BUG-8618: refactor SyncStatusTracker state
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorContextImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.Props;
16 import akka.cluster.Cluster;
17 import com.google.common.annotations.VisibleForTesting;
18 import com.google.common.base.Preconditions;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Optional;
26 import java.util.Set;
27 import java.util.function.Consumer;
28 import java.util.function.LongSupplier;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.cluster.DataPersistenceProvider;
32 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
33 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
34 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
35 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
36 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
37 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
38 import org.slf4j.Logger;
39
40 /**
41  * Implementation of the RaftActorContext interface.
42  *
43  * @author Moiz Raja
44  * @author Thomas Pantelis
45  */
46 public class RaftActorContextImpl implements RaftActorContext {
47     private static final LongSupplier JVM_MEMORY_RETRIEVER = () -> Runtime.getRuntime().maxMemory();
48
49     private final ActorRef actor;
50
51     private final ActorContext context;
52
53     private final String id;
54
55     private final ElectionTerm termInformation;
56
57     private long commitIndex;
58
59     private long lastApplied;
60
61     private ReplicatedLog replicatedLog;
62
63     private final Map<String, PeerInfo> peerInfoMap = new HashMap<>();
64
65     private final Logger log;
66
67     private ConfigParams configParams;
68
69     private boolean dynamicServerConfiguration = false;
70
71     @VisibleForTesting
72     private LongSupplier totalMemoryRetriever = JVM_MEMORY_RETRIEVER;
73
74     // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
75     // be passed to it in the constructor
76     private SnapshotManager snapshotManager;
77
78     private final DataPersistenceProvider persistenceProvider;
79
80     private short payloadVersion;
81
82     private boolean votingMember = true;
83
84     private RaftActorBehavior currentBehavior;
85
86     private int numVotingPeers = -1;
87
88     private Optional<Cluster> cluster;
89
90     private final Consumer<ApplyState> applyStateConsumer;
91
92     private RaftActorLeadershipTransferCohort leadershipTransferCohort;
93
94     public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
95             @Nonnull ElectionTerm termInformation, long commitIndex, long lastApplied,
96             @Nonnull Map<String, String> peerAddresses,
97             @Nonnull ConfigParams configParams, @Nonnull DataPersistenceProvider persistenceProvider,
98             @Nonnull Consumer<ApplyState> applyStateConsumer, @Nonnull Logger logger) {
99         this.actor = actor;
100         this.context = context;
101         this.id = id;
102         this.termInformation = Preconditions.checkNotNull(termInformation);
103         this.commitIndex = commitIndex;
104         this.lastApplied = lastApplied;
105         this.configParams = Preconditions.checkNotNull(configParams);
106         this.persistenceProvider = Preconditions.checkNotNull(persistenceProvider);
107         this.log = Preconditions.checkNotNull(logger);
108         this.applyStateConsumer = Preconditions.checkNotNull(applyStateConsumer);
109
110         for (Map.Entry<String, String> e: Preconditions.checkNotNull(peerAddresses).entrySet()) {
111             peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
112         }
113     }
114
115     @VisibleForTesting
116     public void setPayloadVersion(short payloadVersion) {
117         this.payloadVersion = payloadVersion;
118     }
119
120     @Override
121     public short getPayloadVersion() {
122         return payloadVersion;
123     }
124
125     public void setConfigParams(ConfigParams configParams) {
126         this.configParams = configParams;
127     }
128
129     @Override
130     public ActorRef actorOf(Props props) {
131         return context.actorOf(props);
132     }
133
134     @Override
135     public ActorSelection actorSelection(String path) {
136         return context.actorSelection(path);
137     }
138
139     @Override
140     public String getId() {
141         return id;
142     }
143
144     @Override
145     public ActorRef getActor() {
146         return actor;
147     }
148
149     @Override
150     @SuppressWarnings("checkstyle:IllegalCatch")
151     public Optional<Cluster> getCluster() {
152         if (cluster == null) {
153             try {
154                 cluster = Optional.of(Cluster.get(getActorSystem()));
155             } catch (Exception e) {
156                 // An exception means there's no cluster configured. This will only happen in unit tests.
157                 log.debug("{}: Could not obtain Cluster: {}", getId(), e);
158                 cluster = Optional.empty();
159             }
160         }
161
162         return cluster;
163     }
164
165     @Override
166     public ElectionTerm getTermInformation() {
167         return termInformation;
168     }
169
170     @Override
171     public long getCommitIndex() {
172         return commitIndex;
173     }
174
175     @Override public void setCommitIndex(long commitIndex) {
176         this.commitIndex = commitIndex;
177     }
178
179     @Override
180     public long getLastApplied() {
181         return lastApplied;
182     }
183
184     @Override
185     public void setLastApplied(long lastApplied) {
186         this.lastApplied = lastApplied;
187     }
188
189     @Override
190     public void setReplicatedLog(ReplicatedLog replicatedLog) {
191         this.replicatedLog = replicatedLog;
192     }
193
194     @Override
195     public ReplicatedLog getReplicatedLog() {
196         return replicatedLog;
197     }
198
199     @Override public ActorSystem getActorSystem() {
200         return context.system();
201     }
202
203     @Override public Logger getLogger() {
204         return this.log;
205     }
206
207     @Override
208     public Collection<String> getPeerIds() {
209         return peerInfoMap.keySet();
210     }
211
212     @Override
213     public Collection<PeerInfo> getPeers() {
214         return peerInfoMap.values();
215     }
216
217     @Override
218     public PeerInfo getPeerInfo(String peerId) {
219         return peerInfoMap.get(peerId);
220     }
221
222     @Override
223     public String getPeerAddress(String peerId) {
224         String peerAddress;
225         PeerInfo peerInfo = peerInfoMap.get(peerId);
226         if (peerInfo != null) {
227             peerAddress = peerInfo.getAddress();
228             if (peerAddress == null) {
229                 peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
230                 peerInfo.setAddress(peerAddress);
231             }
232         } else {
233             peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
234         }
235
236         return peerAddress;
237     }
238
239     @Override
240     public void updatePeerIds(ServerConfigurationPayload serverConfig) {
241         votingMember = true;
242         boolean foundSelf = false;
243         Set<String> currentPeers = new HashSet<>(this.getPeerIds());
244         for (ServerInfo server : serverConfig.getServerConfig()) {
245             if (getId().equals(server.getId())) {
246                 foundSelf = true;
247                 if (!server.isVoting()) {
248                     votingMember = false;
249                 }
250             } else {
251                 VotingState votingState = server.isVoting() ? VotingState.VOTING : VotingState.NON_VOTING;
252                 if (!currentPeers.contains(server.getId())) {
253                     this.addToPeers(server.getId(), null, votingState);
254                 } else {
255                     this.getPeerInfo(server.getId()).setVotingState(votingState);
256                     currentPeers.remove(server.getId());
257                 }
258             }
259         }
260
261         for (String peerIdToRemove : currentPeers) {
262             this.removePeer(peerIdToRemove);
263         }
264
265         if (!foundSelf) {
266             votingMember = false;
267         }
268
269         log.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values());
270
271         setDynamicServerConfigurationInUse();
272     }
273
274     @Override public ConfigParams getConfigParams() {
275         return configParams;
276     }
277
278     @Override
279     public void addToPeers(String peerId, String address, VotingState votingState) {
280         peerInfoMap.put(peerId, new PeerInfo(peerId, address, votingState));
281         numVotingPeers = -1;
282     }
283
284     @Override
285     public void removePeer(String name) {
286         if (getId().equals(name)) {
287             votingMember = false;
288         } else {
289             peerInfoMap.remove(name);
290             numVotingPeers = -1;
291         }
292     }
293
294     @Override public ActorSelection getPeerActorSelection(String peerId) {
295         String peerAddress = getPeerAddress(peerId);
296         if (peerAddress != null) {
297             return actorSelection(peerAddress);
298         }
299         return null;
300     }
301
302     @Override
303     public void setPeerAddress(String peerId, String peerAddress) {
304         PeerInfo peerInfo = peerInfoMap.get(peerId);
305         if (peerInfo != null) {
306             log.info("Peer address for peer {} set to {}", peerId, peerAddress);
307             peerInfo.setAddress(peerAddress);
308         }
309     }
310
311     @Override
312     public SnapshotManager getSnapshotManager() {
313         if (snapshotManager == null) {
314             snapshotManager = new SnapshotManager(this, log);
315         }
316         return snapshotManager;
317     }
318
319     @Override
320     public long getTotalMemory() {
321         return totalMemoryRetriever.getAsLong();
322     }
323
324     @Override
325     public void setTotalMemoryRetriever(LongSupplier retriever) {
326         totalMemoryRetriever = retriever == null ? JVM_MEMORY_RETRIEVER : retriever;
327     }
328
329     @Override
330     public boolean hasFollowers() {
331         return !getPeerIds().isEmpty();
332     }
333
334     @Override
335     public DataPersistenceProvider getPersistenceProvider() {
336         return persistenceProvider;
337     }
338
339
340     @Override
341     public RaftPolicy getRaftPolicy() {
342         return configParams.getRaftPolicy();
343     }
344
345     @Override
346     public boolean isDynamicServerConfigurationInUse() {
347         return dynamicServerConfiguration;
348     }
349
350     @Override
351     public void setDynamicServerConfigurationInUse() {
352         this.dynamicServerConfiguration = true;
353     }
354
355     @Override
356     public ServerConfigurationPayload getPeerServerInfo(boolean includeSelf) {
357         if (!isDynamicServerConfigurationInUse()) {
358             return null;
359         }
360         Collection<PeerInfo> peers = getPeers();
361         List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
362         for (PeerInfo peer: peers) {
363             newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
364         }
365
366         if (includeSelf) {
367             newConfig.add(new ServerInfo(getId(), votingMember));
368         }
369
370         return new ServerConfigurationPayload(newConfig);
371     }
372
373     @Override
374     public boolean isVotingMember() {
375         return votingMember;
376     }
377
378     @Override
379     public boolean anyVotingPeers() {
380         if (numVotingPeers < 0) {
381             numVotingPeers = 0;
382             for (PeerInfo info: getPeers()) {
383                 if (info.isVoting()) {
384                     numVotingPeers++;
385                 }
386             }
387         }
388
389         return numVotingPeers > 0;
390     }
391
392     @Override
393     public RaftActorBehavior getCurrentBehavior() {
394         return currentBehavior;
395     }
396
397     void setCurrentBehavior(final RaftActorBehavior behavior) {
398         this.currentBehavior = Preconditions.checkNotNull(behavior);
399     }
400
401     @Override
402     public Consumer<ApplyState> getApplyStateConsumer() {
403         return applyStateConsumer;
404     }
405
406     @Override
407     public FileBackedOutputStream newFileBackedOutputStream() {
408         return new FileBackedOutputStream(configParams.getFileBackedStreamingThreshold(),
409                 configParams.getTempFileDirectory());
410     }
411
412     @SuppressWarnings("checkstyle:IllegalCatch")
413     void close() {
414         if (currentBehavior != null) {
415             try {
416                 currentBehavior.close();
417             } catch (Exception e) {
418                 log.debug("{}: Error closing behavior {}", getId(), currentBehavior.state(), e);
419             }
420         }
421     }
422
423     @Override
424     @Nullable
425     public RaftActorLeadershipTransferCohort getRaftActorLeadershipTransferCohort() {
426         return leadershipTransferCohort;
427     }
428
429     @Override
430     public void setRaftActorLeadershipTransferCohort(
431             @Nullable RaftActorLeadershipTransferCohort leadershipTransferCohort) {
432         this.leadershipTransferCohort = leadershipTransferCohort;
433     }
434 }