Further Guava Optional cleanups
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorContextImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorContext;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Props;
17 import akka.cluster.Cluster;
18 import com.google.common.annotations.VisibleForTesting;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Optional;
26 import java.util.Set;
27 import java.util.concurrent.Executor;
28 import java.util.function.Consumer;
29 import java.util.function.LongSupplier;
30 import org.eclipse.jdt.annotation.NonNull;
31 import org.opendaylight.controller.cluster.DataPersistenceProvider;
32 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
33 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
34 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
35 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
36 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
37 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
38 import org.slf4j.Logger;
39
40 /**
41  * Implementation of the RaftActorContext interface.
42  *
43  * @author Moiz Raja
44  * @author Thomas Pantelis
45  */
46 public class RaftActorContextImpl implements RaftActorContext {
47     private static final LongSupplier JVM_MEMORY_RETRIEVER = () -> Runtime.getRuntime().maxMemory();
48
49     private final ActorRef actor;
50
51     private final ActorContext context;
52
53     private final @NonNull Executor executor;
54
55     private final String id;
56
57     private final ElectionTerm termInformation;
58
59     private long commitIndex;
60
61     private long lastApplied;
62
63     private ReplicatedLog replicatedLog;
64
65     private final Map<String, PeerInfo> peerInfoMap = new HashMap<>();
66
67     private final Logger log;
68
69     private ConfigParams configParams;
70
71     private boolean dynamicServerConfiguration = false;
72
73     @VisibleForTesting
74     private LongSupplier totalMemoryRetriever = JVM_MEMORY_RETRIEVER;
75
76     // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
77     // be passed to it in the constructor
78     private SnapshotManager snapshotManager;
79
80     private final DataPersistenceProvider persistenceProvider;
81
82     private short payloadVersion;
83
84     private boolean votingMember = true;
85
86     private RaftActorBehavior currentBehavior;
87
88     private int numVotingPeers = -1;
89
90     private Optional<Cluster> cluster;
91
92     private final Consumer<ApplyState> applyStateConsumer;
93
94     private final FileBackedOutputStreamFactory fileBackedOutputStreamFactory;
95
96     private RaftActorLeadershipTransferCohort leadershipTransferCohort;
97
98     public RaftActorContextImpl(final ActorRef actor, final ActorContext context, final String id,
99             final @NonNull ElectionTerm termInformation, final long commitIndex, final long lastApplied,
100             final @NonNull Map<String, String> peerAddresses,
101             final @NonNull ConfigParams configParams, final @NonNull DataPersistenceProvider persistenceProvider,
102             final @NonNull Consumer<ApplyState> applyStateConsumer, final @NonNull Logger logger,
103             final @NonNull Executor executor) {
104         this.actor = actor;
105         this.context = context;
106         this.id = id;
107         this.termInformation = requireNonNull(termInformation);
108         this.executor = requireNonNull(executor);
109         this.commitIndex = commitIndex;
110         this.lastApplied = lastApplied;
111         this.configParams = requireNonNull(configParams);
112         this.persistenceProvider = requireNonNull(persistenceProvider);
113         this.log = requireNonNull(logger);
114         this.applyStateConsumer = requireNonNull(applyStateConsumer);
115
116         fileBackedOutputStreamFactory = new FileBackedOutputStreamFactory(
117                 configParams.getFileBackedStreamingThreshold(), configParams.getTempFileDirectory());
118
119         for (Map.Entry<String, String> e : requireNonNull(peerAddresses).entrySet()) {
120             peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
121         }
122     }
123
124     @VisibleForTesting
125     public void setPayloadVersion(final short payloadVersion) {
126         this.payloadVersion = payloadVersion;
127     }
128
129     @Override
130     public short getPayloadVersion() {
131         return payloadVersion;
132     }
133
134     public void setConfigParams(final ConfigParams configParams) {
135         this.configParams = configParams;
136     }
137
138     @Override
139     public ActorRef actorOf(final Props props) {
140         return context.actorOf(props);
141     }
142
143     @Override
144     public ActorSelection actorSelection(final String path) {
145         return context.actorSelection(path);
146     }
147
148     @Override
149     public String getId() {
150         return id;
151     }
152
153     @Override
154     public ActorRef getActor() {
155         return actor;
156     }
157
158     @Override
159     public final Executor getExecutor() {
160         return executor;
161     }
162
163     @Override
164     @SuppressWarnings("checkstyle:IllegalCatch")
165     public Optional<Cluster> getCluster() {
166         if (cluster == null) {
167             try {
168                 cluster = Optional.of(Cluster.get(getActorSystem()));
169             } catch (Exception e) {
170                 // An exception means there's no cluster configured. This will only happen in unit tests.
171                 log.debug("{}: Could not obtain Cluster", getId(), e);
172                 cluster = Optional.empty();
173             }
174         }
175
176         return cluster;
177     }
178
179     @Override
180     public ElectionTerm getTermInformation() {
181         return termInformation;
182     }
183
184     @Override
185     public long getCommitIndex() {
186         return commitIndex;
187     }
188
189     @Override public void setCommitIndex(final long commitIndex) {
190         this.commitIndex = commitIndex;
191     }
192
193     @Override
194     public long getLastApplied() {
195         return lastApplied;
196     }
197
198     @Override
199     public void setLastApplied(final long lastApplied) {
200         final Throwable stackTrace = log.isTraceEnabled() ? new Throwable() : null;
201         log.debug("{}: Moving last applied index from {} to {}", id, this.lastApplied, lastApplied, stackTrace);
202         this.lastApplied = lastApplied;
203     }
204
205     @Override
206     public void setReplicatedLog(final ReplicatedLog replicatedLog) {
207         this.replicatedLog = replicatedLog;
208     }
209
210     @Override
211     public ReplicatedLog getReplicatedLog() {
212         return replicatedLog;
213     }
214
215     @Override
216     public ActorSystem getActorSystem() {
217         return context.system();
218     }
219
220     @Override
221     public Logger getLogger() {
222         return this.log;
223     }
224
225     @Override
226     public Collection<String> getPeerIds() {
227         return peerInfoMap.keySet();
228     }
229
230     @Override
231     public Collection<PeerInfo> getPeers() {
232         return peerInfoMap.values();
233     }
234
235     @Override
236     public PeerInfo getPeerInfo(final String peerId) {
237         return peerInfoMap.get(peerId);
238     }
239
240     @Override
241     public String getPeerAddress(final String peerId) {
242         String peerAddress;
243         PeerInfo peerInfo = peerInfoMap.get(peerId);
244         if (peerInfo != null) {
245             peerAddress = peerInfo.getAddress();
246             if (peerAddress == null) {
247                 peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
248                 peerInfo.setAddress(peerAddress);
249             }
250         } else {
251             peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
252         }
253
254         return peerAddress;
255     }
256
257     @Override
258     public void updatePeerIds(final ServerConfigurationPayload serverConfig) {
259         votingMember = true;
260         boolean foundSelf = false;
261         Set<String> currentPeers = new HashSet<>(this.getPeerIds());
262         for (ServerInfo server : serverConfig.getServerConfig()) {
263             if (getId().equals(server.getId())) {
264                 foundSelf = true;
265                 if (!server.isVoting()) {
266                     votingMember = false;
267                 }
268             } else {
269                 VotingState votingState = server.isVoting() ? VotingState.VOTING : VotingState.NON_VOTING;
270                 if (!currentPeers.contains(server.getId())) {
271                     this.addToPeers(server.getId(), null, votingState);
272                 } else {
273                     this.getPeerInfo(server.getId()).setVotingState(votingState);
274                     currentPeers.remove(server.getId());
275                 }
276             }
277         }
278
279         for (String peerIdToRemove : currentPeers) {
280             this.removePeer(peerIdToRemove);
281         }
282
283         if (!foundSelf) {
284             votingMember = false;
285         }
286
287         log.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values());
288
289         setDynamicServerConfigurationInUse();
290     }
291
292     @Override public ConfigParams getConfigParams() {
293         return configParams;
294     }
295
296     @Override
297     public void addToPeers(final String peerId, final String address, final VotingState votingState) {
298         peerInfoMap.put(peerId, new PeerInfo(peerId, address, votingState));
299         numVotingPeers = -1;
300     }
301
302     @Override
303     public void removePeer(final String name) {
304         if (getId().equals(name)) {
305             votingMember = false;
306         } else {
307             peerInfoMap.remove(name);
308             numVotingPeers = -1;
309         }
310     }
311
312     @Override public ActorSelection getPeerActorSelection(final String peerId) {
313         String peerAddress = getPeerAddress(peerId);
314         if (peerAddress != null) {
315             return actorSelection(peerAddress);
316         }
317         return null;
318     }
319
320     @Override
321     public void setPeerAddress(final String peerId, final String peerAddress) {
322         PeerInfo peerInfo = peerInfoMap.get(peerId);
323         if (peerInfo != null) {
324             log.info("Peer address for peer {} set to {}", peerId, peerAddress);
325             peerInfo.setAddress(peerAddress);
326         }
327     }
328
329     @Override
330     public SnapshotManager getSnapshotManager() {
331         if (snapshotManager == null) {
332             snapshotManager = new SnapshotManager(this, log);
333         }
334         return snapshotManager;
335     }
336
337     @Override
338     public long getTotalMemory() {
339         return totalMemoryRetriever.getAsLong();
340     }
341
342     @Override
343     public void setTotalMemoryRetriever(final LongSupplier retriever) {
344         totalMemoryRetriever = retriever == null ? JVM_MEMORY_RETRIEVER : retriever;
345     }
346
347     @Override
348     public boolean hasFollowers() {
349         return !getPeerIds().isEmpty();
350     }
351
352     @Override
353     public DataPersistenceProvider getPersistenceProvider() {
354         return persistenceProvider;
355     }
356
357
358     @Override
359     public RaftPolicy getRaftPolicy() {
360         return configParams.getRaftPolicy();
361     }
362
363     @Override
364     public boolean isDynamicServerConfigurationInUse() {
365         return dynamicServerConfiguration;
366     }
367
368     @Override
369     public void setDynamicServerConfigurationInUse() {
370         this.dynamicServerConfiguration = true;
371     }
372
373     @Override
374     public ServerConfigurationPayload getPeerServerInfo(final boolean includeSelf) {
375         if (!isDynamicServerConfigurationInUse()) {
376             return null;
377         }
378         Collection<PeerInfo> peers = getPeers();
379         List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
380         for (PeerInfo peer: peers) {
381             newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
382         }
383
384         if (includeSelf) {
385             newConfig.add(new ServerInfo(getId(), votingMember));
386         }
387
388         return new ServerConfigurationPayload(newConfig);
389     }
390
391     @Override
392     public boolean isVotingMember() {
393         return votingMember;
394     }
395
396     @Override
397     public boolean anyVotingPeers() {
398         if (numVotingPeers < 0) {
399             numVotingPeers = 0;
400             for (PeerInfo info: getPeers()) {
401                 if (info.isVoting()) {
402                     numVotingPeers++;
403                 }
404             }
405         }
406
407         return numVotingPeers > 0;
408     }
409
410     @Override
411     public RaftActorBehavior getCurrentBehavior() {
412         return currentBehavior;
413     }
414
415     void setCurrentBehavior(final RaftActorBehavior behavior) {
416         this.currentBehavior = requireNonNull(behavior);
417     }
418
419     @Override
420     public Consumer<ApplyState> getApplyStateConsumer() {
421         return applyStateConsumer;
422     }
423
424     @Override
425     public FileBackedOutputStreamFactory getFileBackedOutputStreamFactory() {
426         return fileBackedOutputStreamFactory;
427     }
428
429     @SuppressWarnings("checkstyle:IllegalCatch")
430     void close() {
431         if (currentBehavior != null) {
432             try {
433                 currentBehavior.close();
434             } catch (Exception e) {
435                 log.debug("{}: Error closing behavior {}", getId(), currentBehavior.state(), e);
436             }
437         }
438     }
439
440     @Override
441     public RaftActorLeadershipTransferCohort getRaftActorLeadershipTransferCohort() {
442         return leadershipTransferCohort;
443     }
444
445     @Override
446     @SuppressWarnings("checkstyle:hiddenField")
447     public void setRaftActorLeadershipTransferCohort(final RaftActorLeadershipTransferCohort leadershipTransferCohort) {
448         this.leadershipTransferCohort = leadershipTransferCohort;
449     }
450 }