Reduce JSR305 proliferation
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorContextImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorContext;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Props;
17 import akka.cluster.Cluster;
18 import com.google.common.annotations.VisibleForTesting;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Optional;
26 import java.util.Set;
27 import java.util.function.Consumer;
28 import java.util.function.LongSupplier;
29 import org.eclipse.jdt.annotation.NonNull;
30 import org.opendaylight.controller.cluster.DataPersistenceProvider;
31 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
34 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
35 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
36 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
37 import org.slf4j.Logger;
38
39 /**
40  * Implementation of the RaftActorContext interface.
41  *
42  * @author Moiz Raja
43  * @author Thomas Pantelis
44  */
45 public class RaftActorContextImpl implements RaftActorContext {
46     private static final LongSupplier JVM_MEMORY_RETRIEVER = () -> Runtime.getRuntime().maxMemory();
47
48     private final ActorRef actor;
49
50     private final ActorContext context;
51
52     private final String id;
53
54     private final ElectionTerm termInformation;
55
56     private long commitIndex;
57
58     private long lastApplied;
59
60     private ReplicatedLog replicatedLog;
61
62     private final Map<String, PeerInfo> peerInfoMap = new HashMap<>();
63
64     private final Logger log;
65
66     private ConfigParams configParams;
67
68     private boolean dynamicServerConfiguration = false;
69
70     @VisibleForTesting
71     private LongSupplier totalMemoryRetriever = JVM_MEMORY_RETRIEVER;
72
73     // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
74     // be passed to it in the constructor
75     private SnapshotManager snapshotManager;
76
77     private final DataPersistenceProvider persistenceProvider;
78
79     private short payloadVersion;
80
81     private boolean votingMember = true;
82
83     private RaftActorBehavior currentBehavior;
84
85     private int numVotingPeers = -1;
86
87     private Optional<Cluster> cluster;
88
89     private final Consumer<ApplyState> applyStateConsumer;
90
91     private final FileBackedOutputStreamFactory fileBackedOutputStreamFactory;
92
93     private RaftActorLeadershipTransferCohort leadershipTransferCohort;
94
95     public RaftActorContextImpl(final ActorRef actor, final ActorContext context, final String id,
96             final @NonNull ElectionTerm termInformation, final long commitIndex, final long lastApplied,
97             final @NonNull Map<String, String> peerAddresses,
98             final @NonNull ConfigParams configParams, final @NonNull DataPersistenceProvider persistenceProvider,
99             final @NonNull Consumer<ApplyState> applyStateConsumer, final @NonNull Logger logger) {
100         this.actor = actor;
101         this.context = context;
102         this.id = id;
103         this.termInformation = requireNonNull(termInformation);
104         this.commitIndex = commitIndex;
105         this.lastApplied = lastApplied;
106         this.configParams = requireNonNull(configParams);
107         this.persistenceProvider = requireNonNull(persistenceProvider);
108         this.log = requireNonNull(logger);
109         this.applyStateConsumer = requireNonNull(applyStateConsumer);
110
111         fileBackedOutputStreamFactory = new FileBackedOutputStreamFactory(
112                 configParams.getFileBackedStreamingThreshold(), configParams.getTempFileDirectory());
113
114         for (Map.Entry<String, String> e : requireNonNull(peerAddresses).entrySet()) {
115             peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
116         }
117     }
118
119     @VisibleForTesting
120     public void setPayloadVersion(final short payloadVersion) {
121         this.payloadVersion = payloadVersion;
122     }
123
124     @Override
125     public short getPayloadVersion() {
126         return payloadVersion;
127     }
128
129     public void setConfigParams(final ConfigParams configParams) {
130         this.configParams = configParams;
131     }
132
133     @Override
134     public ActorRef actorOf(final Props props) {
135         return context.actorOf(props);
136     }
137
138     @Override
139     public ActorSelection actorSelection(final String path) {
140         return context.actorSelection(path);
141     }
142
143     @Override
144     public String getId() {
145         return id;
146     }
147
148     @Override
149     public ActorRef getActor() {
150         return actor;
151     }
152
153     @Override
154     @SuppressWarnings("checkstyle:IllegalCatch")
155     public Optional<Cluster> getCluster() {
156         if (cluster == null) {
157             try {
158                 cluster = Optional.of(Cluster.get(getActorSystem()));
159             } catch (Exception e) {
160                 // An exception means there's no cluster configured. This will only happen in unit tests.
161                 log.debug("{}: Could not obtain Cluster", getId(), e);
162                 cluster = Optional.empty();
163             }
164         }
165
166         return cluster;
167     }
168
169     @Override
170     public ElectionTerm getTermInformation() {
171         return termInformation;
172     }
173
174     @Override
175     public long getCommitIndex() {
176         return commitIndex;
177     }
178
179     @Override public void setCommitIndex(final long commitIndex) {
180         this.commitIndex = commitIndex;
181     }
182
183     @Override
184     public long getLastApplied() {
185         return lastApplied;
186     }
187
188     @Override
189     public void setLastApplied(final long lastApplied) {
190         final Throwable stackTrace = log.isTraceEnabled() ? new Throwable() : null;
191         log.debug("{}: Moving last applied index from {} to {}", id, this.lastApplied, lastApplied, stackTrace);
192         this.lastApplied = lastApplied;
193     }
194
195     @Override
196     public void setReplicatedLog(final ReplicatedLog replicatedLog) {
197         this.replicatedLog = replicatedLog;
198     }
199
200     @Override
201     public ReplicatedLog getReplicatedLog() {
202         return replicatedLog;
203     }
204
205     @Override
206     public ActorSystem getActorSystem() {
207         return context.system();
208     }
209
210     @Override
211     public Logger getLogger() {
212         return this.log;
213     }
214
215     @Override
216     public Collection<String> getPeerIds() {
217         return peerInfoMap.keySet();
218     }
219
220     @Override
221     public Collection<PeerInfo> getPeers() {
222         return peerInfoMap.values();
223     }
224
225     @Override
226     public PeerInfo getPeerInfo(final String peerId) {
227         return peerInfoMap.get(peerId);
228     }
229
230     @Override
231     public String getPeerAddress(final String peerId) {
232         String peerAddress;
233         PeerInfo peerInfo = peerInfoMap.get(peerId);
234         if (peerInfo != null) {
235             peerAddress = peerInfo.getAddress();
236             if (peerAddress == null) {
237                 peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
238                 peerInfo.setAddress(peerAddress);
239             }
240         } else {
241             peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
242         }
243
244         return peerAddress;
245     }
246
247     @Override
248     public void updatePeerIds(final ServerConfigurationPayload serverConfig) {
249         votingMember = true;
250         boolean foundSelf = false;
251         Set<String> currentPeers = new HashSet<>(this.getPeerIds());
252         for (ServerInfo server : serverConfig.getServerConfig()) {
253             if (getId().equals(server.getId())) {
254                 foundSelf = true;
255                 if (!server.isVoting()) {
256                     votingMember = false;
257                 }
258             } else {
259                 VotingState votingState = server.isVoting() ? VotingState.VOTING : VotingState.NON_VOTING;
260                 if (!currentPeers.contains(server.getId())) {
261                     this.addToPeers(server.getId(), null, votingState);
262                 } else {
263                     this.getPeerInfo(server.getId()).setVotingState(votingState);
264                     currentPeers.remove(server.getId());
265                 }
266             }
267         }
268
269         for (String peerIdToRemove : currentPeers) {
270             this.removePeer(peerIdToRemove);
271         }
272
273         if (!foundSelf) {
274             votingMember = false;
275         }
276
277         log.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values());
278
279         setDynamicServerConfigurationInUse();
280     }
281
282     @Override public ConfigParams getConfigParams() {
283         return configParams;
284     }
285
286     @Override
287     public void addToPeers(final String peerId, final String address, final VotingState votingState) {
288         peerInfoMap.put(peerId, new PeerInfo(peerId, address, votingState));
289         numVotingPeers = -1;
290     }
291
292     @Override
293     public void removePeer(final String name) {
294         if (getId().equals(name)) {
295             votingMember = false;
296         } else {
297             peerInfoMap.remove(name);
298             numVotingPeers = -1;
299         }
300     }
301
302     @Override public ActorSelection getPeerActorSelection(final String peerId) {
303         String peerAddress = getPeerAddress(peerId);
304         if (peerAddress != null) {
305             return actorSelection(peerAddress);
306         }
307         return null;
308     }
309
310     @Override
311     public void setPeerAddress(final String peerId, final String peerAddress) {
312         PeerInfo peerInfo = peerInfoMap.get(peerId);
313         if (peerInfo != null) {
314             log.info("Peer address for peer {} set to {}", peerId, peerAddress);
315             peerInfo.setAddress(peerAddress);
316         }
317     }
318
319     @Override
320     public SnapshotManager getSnapshotManager() {
321         if (snapshotManager == null) {
322             snapshotManager = new SnapshotManager(this, log);
323         }
324         return snapshotManager;
325     }
326
327     @Override
328     public long getTotalMemory() {
329         return totalMemoryRetriever.getAsLong();
330     }
331
332     @Override
333     public void setTotalMemoryRetriever(final LongSupplier retriever) {
334         totalMemoryRetriever = retriever == null ? JVM_MEMORY_RETRIEVER : retriever;
335     }
336
337     @Override
338     public boolean hasFollowers() {
339         return !getPeerIds().isEmpty();
340     }
341
342     @Override
343     public DataPersistenceProvider getPersistenceProvider() {
344         return persistenceProvider;
345     }
346
347
348     @Override
349     public RaftPolicy getRaftPolicy() {
350         return configParams.getRaftPolicy();
351     }
352
353     @Override
354     public boolean isDynamicServerConfigurationInUse() {
355         return dynamicServerConfiguration;
356     }
357
358     @Override
359     public void setDynamicServerConfigurationInUse() {
360         this.dynamicServerConfiguration = true;
361     }
362
363     @Override
364     public ServerConfigurationPayload getPeerServerInfo(final boolean includeSelf) {
365         if (!isDynamicServerConfigurationInUse()) {
366             return null;
367         }
368         Collection<PeerInfo> peers = getPeers();
369         List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
370         for (PeerInfo peer: peers) {
371             newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
372         }
373
374         if (includeSelf) {
375             newConfig.add(new ServerInfo(getId(), votingMember));
376         }
377
378         return new ServerConfigurationPayload(newConfig);
379     }
380
381     @Override
382     public boolean isVotingMember() {
383         return votingMember;
384     }
385
386     @Override
387     public boolean anyVotingPeers() {
388         if (numVotingPeers < 0) {
389             numVotingPeers = 0;
390             for (PeerInfo info: getPeers()) {
391                 if (info.isVoting()) {
392                     numVotingPeers++;
393                 }
394             }
395         }
396
397         return numVotingPeers > 0;
398     }
399
400     @Override
401     public RaftActorBehavior getCurrentBehavior() {
402         return currentBehavior;
403     }
404
405     void setCurrentBehavior(final RaftActorBehavior behavior) {
406         this.currentBehavior = requireNonNull(behavior);
407     }
408
409     @Override
410     public Consumer<ApplyState> getApplyStateConsumer() {
411         return applyStateConsumer;
412     }
413
414     @Override
415     public FileBackedOutputStreamFactory getFileBackedOutputStreamFactory() {
416         return fileBackedOutputStreamFactory;
417     }
418
419     @SuppressWarnings("checkstyle:IllegalCatch")
420     void close() {
421         if (currentBehavior != null) {
422             try {
423                 currentBehavior.close();
424             } catch (Exception e) {
425                 log.debug("{}: Error closing behavior {}", getId(), currentBehavior.state(), e);
426             }
427         }
428     }
429
430     @Override
431     public RaftActorLeadershipTransferCohort getRaftActorLeadershipTransferCohort() {
432         return leadershipTransferCohort;
433     }
434
435     @Override
436     @SuppressWarnings("checkstyle:hiddenField")
437     public void setRaftActorLeadershipTransferCohort(final RaftActorLeadershipTransferCohort leadershipTransferCohort) {
438         this.leadershipTransferCohort = leadershipTransferCohort;
439     }
440 }