Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorContextImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorContext;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Props;
17 import akka.cluster.Cluster;
18 import com.google.common.annotations.VisibleForTesting;
19 import com.google.common.collect.ImmutableList;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.Map;
24 import java.util.Optional;
25 import java.util.concurrent.Executor;
26 import java.util.function.Consumer;
27 import java.util.function.LongSupplier;
28 import org.eclipse.jdt.annotation.NonNull;
29 import org.opendaylight.controller.cluster.DataPersistenceProvider;
30 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
33 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
34 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
35 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
36 import org.slf4j.Logger;
37
38 /**
39  * Implementation of the RaftActorContext interface.
40  *
41  * @author Moiz Raja
42  * @author Thomas Pantelis
43  */
44 public class RaftActorContextImpl implements RaftActorContext {
45     private static final LongSupplier JVM_MEMORY_RETRIEVER = () -> Runtime.getRuntime().maxMemory();
46
47     private final ActorRef actor;
48
49     private final ActorContext context;
50
51     private final @NonNull Executor executor;
52
53     private final String id;
54
55     private final ElectionTerm termInformation;
56
57     private long commitIndex;
58
59     private long lastApplied;
60
61     private ReplicatedLog replicatedLog;
62
63     private final Map<String, PeerInfo> peerInfoMap = new HashMap<>();
64
65     private final Logger log;
66
67     private ConfigParams configParams;
68
69     private boolean dynamicServerConfiguration = false;
70
71     @VisibleForTesting
72     private LongSupplier totalMemoryRetriever = JVM_MEMORY_RETRIEVER;
73
74     // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
75     // be passed to it in the constructor
76     private SnapshotManager snapshotManager;
77
78     private final DataPersistenceProvider persistenceProvider;
79
80     private short payloadVersion;
81
82     private boolean votingMember = true;
83
84     private RaftActorBehavior currentBehavior;
85
86     private int numVotingPeers = -1;
87
88     private Optional<Cluster> cluster;
89
90     private final Consumer<ApplyState> applyStateConsumer;
91
92     private final FileBackedOutputStreamFactory fileBackedOutputStreamFactory;
93
94     private RaftActorLeadershipTransferCohort leadershipTransferCohort;
95
96     public RaftActorContextImpl(final ActorRef actor, final ActorContext context, final String id,
97             final @NonNull ElectionTerm termInformation, final long commitIndex, final long lastApplied,
98             final @NonNull Map<String, String> peerAddresses,
99             final @NonNull ConfigParams configParams, final @NonNull DataPersistenceProvider persistenceProvider,
100             final @NonNull Consumer<ApplyState> applyStateConsumer, final @NonNull Logger logger,
101             final @NonNull Executor executor) {
102         this.actor = actor;
103         this.context = context;
104         this.id = id;
105         this.termInformation = requireNonNull(termInformation);
106         this.executor = requireNonNull(executor);
107         this.commitIndex = commitIndex;
108         this.lastApplied = lastApplied;
109         this.configParams = requireNonNull(configParams);
110         this.persistenceProvider = requireNonNull(persistenceProvider);
111         log = requireNonNull(logger);
112         this.applyStateConsumer = requireNonNull(applyStateConsumer);
113
114         fileBackedOutputStreamFactory = new FileBackedOutputStreamFactory(
115                 configParams.getFileBackedStreamingThreshold(), configParams.getTempFileDirectory());
116
117         for (Map.Entry<String, String> e : requireNonNull(peerAddresses).entrySet()) {
118             peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
119         }
120     }
121
122     @VisibleForTesting
123     public void setPayloadVersion(final short payloadVersion) {
124         this.payloadVersion = payloadVersion;
125     }
126
127     @Override
128     public short getPayloadVersion() {
129         return payloadVersion;
130     }
131
132     public void setConfigParams(final ConfigParams configParams) {
133         this.configParams = configParams;
134     }
135
136     @Override
137     public ActorRef actorOf(final Props props) {
138         return context.actorOf(props);
139     }
140
141     @Override
142     public ActorSelection actorSelection(final String path) {
143         return context.actorSelection(path);
144     }
145
146     @Override
147     public String getId() {
148         return id;
149     }
150
151     @Override
152     public ActorRef getActor() {
153         return actor;
154     }
155
156     @Override
157     public final Executor getExecutor() {
158         return executor;
159     }
160
161     @Override
162     @SuppressWarnings("checkstyle:IllegalCatch")
163     public Optional<Cluster> getCluster() {
164         if (cluster == null) {
165             try {
166                 cluster = Optional.of(Cluster.get(getActorSystem()));
167             } catch (Exception e) {
168                 // An exception means there's no cluster configured. This will only happen in unit tests.
169                 log.debug("{}: Could not obtain Cluster", getId(), e);
170                 cluster = Optional.empty();
171             }
172         }
173
174         return cluster;
175     }
176
177     @Override
178     public ElectionTerm getTermInformation() {
179         return termInformation;
180     }
181
182     @Override
183     public long getCommitIndex() {
184         return commitIndex;
185     }
186
187     @Override public void setCommitIndex(final long commitIndex) {
188         this.commitIndex = commitIndex;
189     }
190
191     @Override
192     public long getLastApplied() {
193         return lastApplied;
194     }
195
196     @Override
197     public void setLastApplied(final long lastApplied) {
198         final Throwable stackTrace = log.isTraceEnabled() ? new Throwable() : null;
199         log.debug("{}: Moving last applied index from {} to {}", id, this.lastApplied, lastApplied, stackTrace);
200         this.lastApplied = lastApplied;
201     }
202
203     @Override
204     public void setReplicatedLog(final ReplicatedLog replicatedLog) {
205         this.replicatedLog = replicatedLog;
206     }
207
208     @Override
209     public ReplicatedLog getReplicatedLog() {
210         return replicatedLog;
211     }
212
213     @Override
214     public ActorSystem getActorSystem() {
215         return context.system();
216     }
217
218     @Override
219     public Logger getLogger() {
220         return log;
221     }
222
223     @Override
224     public Collection<String> getPeerIds() {
225         return peerInfoMap.keySet();
226     }
227
228     @Override
229     public Collection<PeerInfo> getPeers() {
230         return peerInfoMap.values();
231     }
232
233     @Override
234     public PeerInfo getPeerInfo(final String peerId) {
235         return peerInfoMap.get(peerId);
236     }
237
238     @Override
239     public String getPeerAddress(final String peerId) {
240         String peerAddress;
241         PeerInfo peerInfo = peerInfoMap.get(peerId);
242         if (peerInfo != null) {
243             peerAddress = peerInfo.getAddress();
244             if (peerAddress == null) {
245                 peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
246                 peerInfo.setAddress(peerAddress);
247             }
248         } else {
249             peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
250         }
251
252         return peerAddress;
253     }
254
255     @Override
256     public void updatePeerIds(final ServerConfigurationPayload serverConfig) {
257         boolean newVotingMember = false;
258         var currentPeers = new HashSet<>(getPeerIds());
259         for (var server : serverConfig.getServerConfig()) {
260             if (getId().equals(server.peerId())) {
261                 newVotingMember = server.isVoting();
262             } else {
263                 final var votingState = server.isVoting() ? VotingState.VOTING : VotingState.NON_VOTING;
264                 if (currentPeers.contains(server.peerId())) {
265                     getPeerInfo(server.peerId()).setVotingState(votingState);
266                     currentPeers.remove(server.peerId());
267                 } else {
268                     addToPeers(server.peerId(), null, votingState);
269                 }
270             }
271         }
272
273         for (String peerIdToRemove : currentPeers) {
274             removePeer(peerIdToRemove);
275         }
276
277         votingMember = newVotingMember;
278         log.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values());
279
280         setDynamicServerConfigurationInUse();
281     }
282
283     @Override public ConfigParams getConfigParams() {
284         return configParams;
285     }
286
287     @Override
288     public void addToPeers(final String peerId, final String address, final VotingState votingState) {
289         peerInfoMap.put(peerId, new PeerInfo(peerId, address, votingState));
290         numVotingPeers = -1;
291     }
292
293     @Override
294     public void removePeer(final String name) {
295         if (getId().equals(name)) {
296             votingMember = false;
297         } else {
298             peerInfoMap.remove(name);
299             numVotingPeers = -1;
300         }
301     }
302
303     @Override public ActorSelection getPeerActorSelection(final String peerId) {
304         String peerAddress = getPeerAddress(peerId);
305         if (peerAddress != null) {
306             return actorSelection(peerAddress);
307         }
308         return null;
309     }
310
311     @Override
312     public void setPeerAddress(final String peerId, final String peerAddress) {
313         PeerInfo peerInfo = peerInfoMap.get(peerId);
314         if (peerInfo != null) {
315             log.info("Peer address for peer {} set to {}", peerId, peerAddress);
316             peerInfo.setAddress(peerAddress);
317         }
318     }
319
320     @Override
321     public SnapshotManager getSnapshotManager() {
322         if (snapshotManager == null) {
323             snapshotManager = new SnapshotManager(this, log);
324         }
325         return snapshotManager;
326     }
327
328     @Override
329     public long getTotalMemory() {
330         return totalMemoryRetriever.getAsLong();
331     }
332
333     @Override
334     public void setTotalMemoryRetriever(final LongSupplier retriever) {
335         totalMemoryRetriever = retriever == null ? JVM_MEMORY_RETRIEVER : retriever;
336     }
337
338     @Override
339     public boolean hasFollowers() {
340         return !getPeerIds().isEmpty();
341     }
342
343     @Override
344     public DataPersistenceProvider getPersistenceProvider() {
345         return persistenceProvider;
346     }
347
348
349     @Override
350     public RaftPolicy getRaftPolicy() {
351         return configParams.getRaftPolicy();
352     }
353
354     @Override
355     public boolean isDynamicServerConfigurationInUse() {
356         return dynamicServerConfiguration;
357     }
358
359     @Override
360     public void setDynamicServerConfigurationInUse() {
361         dynamicServerConfiguration = true;
362     }
363
364     @Override
365     public ServerConfigurationPayload getPeerServerInfo(final boolean includeSelf) {
366         if (!isDynamicServerConfigurationInUse()) {
367             return null;
368         }
369         final var peers = getPeers();
370         final var newConfig = ImmutableList.<ServerInfo>builderWithExpectedSize(peers.size() + (includeSelf ? 1 : 0));
371         for (PeerInfo peer : peers) {
372             newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
373         }
374
375         if (includeSelf) {
376             newConfig.add(new ServerInfo(getId(), votingMember));
377         }
378
379         return new ServerConfigurationPayload(newConfig.build());
380     }
381
382     @Override
383     public boolean isVotingMember() {
384         return votingMember;
385     }
386
387     @Override
388     public boolean anyVotingPeers() {
389         if (numVotingPeers < 0) {
390             numVotingPeers = 0;
391             for (PeerInfo info: getPeers()) {
392                 if (info.isVoting()) {
393                     numVotingPeers++;
394                 }
395             }
396         }
397
398         return numVotingPeers > 0;
399     }
400
401     @Override
402     public RaftActorBehavior getCurrentBehavior() {
403         return currentBehavior;
404     }
405
406     void setCurrentBehavior(final RaftActorBehavior behavior) {
407         currentBehavior = requireNonNull(behavior);
408     }
409
410     @Override
411     public Consumer<ApplyState> getApplyStateConsumer() {
412         return applyStateConsumer;
413     }
414
415     @Override
416     public FileBackedOutputStreamFactory getFileBackedOutputStreamFactory() {
417         return fileBackedOutputStreamFactory;
418     }
419
420     @SuppressWarnings("checkstyle:IllegalCatch")
421     void close() {
422         if (currentBehavior != null) {
423             try {
424                 currentBehavior.close();
425             } catch (Exception e) {
426                 log.debug("{}: Error closing behavior {}", getId(), currentBehavior.state(), e);
427             }
428         }
429     }
430
431     @Override
432     public RaftActorLeadershipTransferCohort getRaftActorLeadershipTransferCohort() {
433         return leadershipTransferCohort;
434     }
435
436     @Override
437     @SuppressWarnings("checkstyle:hiddenField")
438     public void setRaftActorLeadershipTransferCohort(final RaftActorLeadershipTransferCohort leadershipTransferCohort) {
439         this.leadershipTransferCohort = leadershipTransferCohort;
440     }
441 }