Remove legacy RaftRPC proxies
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorContextImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.raft;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorContext;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Props;
17 import akka.cluster.Cluster;
18 import com.google.common.annotations.VisibleForTesting;
19 import com.google.common.collect.ImmutableList;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.Map;
24 import java.util.Optional;
25 import java.util.Set;
26 import java.util.concurrent.Executor;
27 import java.util.function.Consumer;
28 import java.util.function.LongSupplier;
29 import org.eclipse.jdt.annotation.NonNull;
30 import org.opendaylight.controller.cluster.DataPersistenceProvider;
31 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
34 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
35 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
36 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
37 import org.slf4j.Logger;
38
39 /**
40  * Implementation of the RaftActorContext interface.
41  *
42  * @author Moiz Raja
43  * @author Thomas Pantelis
44  */
45 public class RaftActorContextImpl implements RaftActorContext {
46     private static final LongSupplier JVM_MEMORY_RETRIEVER = () -> Runtime.getRuntime().maxMemory();
47
48     private final ActorRef actor;
49
50     private final ActorContext context;
51
52     private final @NonNull Executor executor;
53
54     private final String id;
55
56     private final ElectionTerm termInformation;
57
58     private long commitIndex;
59
60     private long lastApplied;
61
62     private ReplicatedLog replicatedLog;
63
64     private final Map<String, PeerInfo> peerInfoMap = new HashMap<>();
65
66     private final Logger log;
67
68     private ConfigParams configParams;
69
70     private boolean dynamicServerConfiguration = false;
71
72     @VisibleForTesting
73     private LongSupplier totalMemoryRetriever = JVM_MEMORY_RETRIEVER;
74
75     // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
76     // be passed to it in the constructor
77     private SnapshotManager snapshotManager;
78
79     private final DataPersistenceProvider persistenceProvider;
80
81     private short payloadVersion;
82
83     private boolean votingMember = true;
84
85     private RaftActorBehavior currentBehavior;
86
87     private int numVotingPeers = -1;
88
89     private Optional<Cluster> cluster;
90
91     private final Consumer<ApplyState> applyStateConsumer;
92
93     private final FileBackedOutputStreamFactory fileBackedOutputStreamFactory;
94
95     private RaftActorLeadershipTransferCohort leadershipTransferCohort;
96
97     public RaftActorContextImpl(final ActorRef actor, final ActorContext context, final String id,
98             final @NonNull ElectionTerm termInformation, final long commitIndex, final long lastApplied,
99             final @NonNull Map<String, String> peerAddresses,
100             final @NonNull ConfigParams configParams, final @NonNull DataPersistenceProvider persistenceProvider,
101             final @NonNull Consumer<ApplyState> applyStateConsumer, final @NonNull Logger logger,
102             final @NonNull Executor executor) {
103         this.actor = actor;
104         this.context = context;
105         this.id = id;
106         this.termInformation = requireNonNull(termInformation);
107         this.executor = requireNonNull(executor);
108         this.commitIndex = commitIndex;
109         this.lastApplied = lastApplied;
110         this.configParams = requireNonNull(configParams);
111         this.persistenceProvider = requireNonNull(persistenceProvider);
112         log = requireNonNull(logger);
113         this.applyStateConsumer = requireNonNull(applyStateConsumer);
114
115         fileBackedOutputStreamFactory = new FileBackedOutputStreamFactory(
116                 configParams.getFileBackedStreamingThreshold(), configParams.getTempFileDirectory());
117
118         for (Map.Entry<String, String> e : requireNonNull(peerAddresses).entrySet()) {
119             peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
120         }
121     }
122
123     @VisibleForTesting
124     public void setPayloadVersion(final short payloadVersion) {
125         this.payloadVersion = payloadVersion;
126     }
127
128     @Override
129     public short getPayloadVersion() {
130         return payloadVersion;
131     }
132
133     public void setConfigParams(final ConfigParams configParams) {
134         this.configParams = configParams;
135     }
136
137     @Override
138     public ActorRef actorOf(final Props props) {
139         return context.actorOf(props);
140     }
141
142     @Override
143     public ActorSelection actorSelection(final String path) {
144         return context.actorSelection(path);
145     }
146
147     @Override
148     public String getId() {
149         return id;
150     }
151
152     @Override
153     public ActorRef getActor() {
154         return actor;
155     }
156
157     @Override
158     public final Executor getExecutor() {
159         return executor;
160     }
161
162     @Override
163     @SuppressWarnings("checkstyle:IllegalCatch")
164     public Optional<Cluster> getCluster() {
165         if (cluster == null) {
166             try {
167                 cluster = Optional.of(Cluster.get(getActorSystem()));
168             } catch (Exception e) {
169                 // An exception means there's no cluster configured. This will only happen in unit tests.
170                 log.debug("{}: Could not obtain Cluster", getId(), e);
171                 cluster = Optional.empty();
172             }
173         }
174
175         return cluster;
176     }
177
178     @Override
179     public ElectionTerm getTermInformation() {
180         return termInformation;
181     }
182
183     @Override
184     public long getCommitIndex() {
185         return commitIndex;
186     }
187
188     @Override public void setCommitIndex(final long commitIndex) {
189         this.commitIndex = commitIndex;
190     }
191
192     @Override
193     public long getLastApplied() {
194         return lastApplied;
195     }
196
197     @Override
198     public void setLastApplied(final long lastApplied) {
199         final Throwable stackTrace = log.isTraceEnabled() ? new Throwable() : null;
200         log.debug("{}: Moving last applied index from {} to {}", id, this.lastApplied, lastApplied, stackTrace);
201         this.lastApplied = lastApplied;
202     }
203
204     @Override
205     public void setReplicatedLog(final ReplicatedLog replicatedLog) {
206         this.replicatedLog = replicatedLog;
207     }
208
209     @Override
210     public ReplicatedLog getReplicatedLog() {
211         return replicatedLog;
212     }
213
214     @Override
215     public ActorSystem getActorSystem() {
216         return context.system();
217     }
218
219     @Override
220     public Logger getLogger() {
221         return log;
222     }
223
224     @Override
225     public Collection<String> getPeerIds() {
226         return peerInfoMap.keySet();
227     }
228
229     @Override
230     public Collection<PeerInfo> getPeers() {
231         return peerInfoMap.values();
232     }
233
234     @Override
235     public PeerInfo getPeerInfo(final String peerId) {
236         return peerInfoMap.get(peerId);
237     }
238
239     @Override
240     public String getPeerAddress(final String peerId) {
241         String peerAddress;
242         PeerInfo peerInfo = peerInfoMap.get(peerId);
243         if (peerInfo != null) {
244             peerAddress = peerInfo.getAddress();
245             if (peerAddress == null) {
246                 peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
247                 peerInfo.setAddress(peerAddress);
248             }
249         } else {
250             peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
251         }
252
253         return peerAddress;
254     }
255
256     @Override
257     public void updatePeerIds(final ServerConfigurationPayload serverConfig) {
258         votingMember = true;
259         boolean foundSelf = false;
260         Set<String> currentPeers = new HashSet<>(getPeerIds());
261         for (ServerInfo server : serverConfig.getServerConfig()) {
262             if (getId().equals(server.getId())) {
263                 foundSelf = true;
264                 if (!server.isVoting()) {
265                     votingMember = false;
266                 }
267             } else {
268                 VotingState votingState = server.isVoting() ? VotingState.VOTING : VotingState.NON_VOTING;
269                 if (!currentPeers.contains(server.getId())) {
270                     addToPeers(server.getId(), null, votingState);
271                 } else {
272                     getPeerInfo(server.getId()).setVotingState(votingState);
273                     currentPeers.remove(server.getId());
274                 }
275             }
276         }
277
278         for (String peerIdToRemove : currentPeers) {
279             removePeer(peerIdToRemove);
280         }
281
282         if (!foundSelf) {
283             votingMember = false;
284         }
285
286         log.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values());
287
288         setDynamicServerConfigurationInUse();
289     }
290
291     @Override public ConfigParams getConfigParams() {
292         return configParams;
293     }
294
295     @Override
296     public void addToPeers(final String peerId, final String address, final VotingState votingState) {
297         peerInfoMap.put(peerId, new PeerInfo(peerId, address, votingState));
298         numVotingPeers = -1;
299     }
300
301     @Override
302     public void removePeer(final String name) {
303         if (getId().equals(name)) {
304             votingMember = false;
305         } else {
306             peerInfoMap.remove(name);
307             numVotingPeers = -1;
308         }
309     }
310
311     @Override public ActorSelection getPeerActorSelection(final String peerId) {
312         String peerAddress = getPeerAddress(peerId);
313         if (peerAddress != null) {
314             return actorSelection(peerAddress);
315         }
316         return null;
317     }
318
319     @Override
320     public void setPeerAddress(final String peerId, final String peerAddress) {
321         PeerInfo peerInfo = peerInfoMap.get(peerId);
322         if (peerInfo != null) {
323             log.info("Peer address for peer {} set to {}", peerId, peerAddress);
324             peerInfo.setAddress(peerAddress);
325         }
326     }
327
328     @Override
329     public SnapshotManager getSnapshotManager() {
330         if (snapshotManager == null) {
331             snapshotManager = new SnapshotManager(this, log);
332         }
333         return snapshotManager;
334     }
335
336     @Override
337     public long getTotalMemory() {
338         return totalMemoryRetriever.getAsLong();
339     }
340
341     @Override
342     public void setTotalMemoryRetriever(final LongSupplier retriever) {
343         totalMemoryRetriever = retriever == null ? JVM_MEMORY_RETRIEVER : retriever;
344     }
345
346     @Override
347     public boolean hasFollowers() {
348         return !getPeerIds().isEmpty();
349     }
350
351     @Override
352     public DataPersistenceProvider getPersistenceProvider() {
353         return persistenceProvider;
354     }
355
356
357     @Override
358     public RaftPolicy getRaftPolicy() {
359         return configParams.getRaftPolicy();
360     }
361
362     @Override
363     public boolean isDynamicServerConfigurationInUse() {
364         return dynamicServerConfiguration;
365     }
366
367     @Override
368     public void setDynamicServerConfigurationInUse() {
369         dynamicServerConfiguration = true;
370     }
371
372     @Override
373     public ServerConfigurationPayload getPeerServerInfo(final boolean includeSelf) {
374         if (!isDynamicServerConfigurationInUse()) {
375             return null;
376         }
377         final var peers = getPeers();
378         final var newConfig = ImmutableList.<ServerInfo>builderWithExpectedSize(peers.size() + (includeSelf ? 1 : 0));
379         for (PeerInfo peer : peers) {
380             newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
381         }
382
383         if (includeSelf) {
384             newConfig.add(new ServerInfo(getId(), votingMember));
385         }
386
387         return new ServerConfigurationPayload(newConfig.build());
388     }
389
390     @Override
391     public boolean isVotingMember() {
392         return votingMember;
393     }
394
395     @Override
396     public boolean anyVotingPeers() {
397         if (numVotingPeers < 0) {
398             numVotingPeers = 0;
399             for (PeerInfo info: getPeers()) {
400                 if (info.isVoting()) {
401                     numVotingPeers++;
402                 }
403             }
404         }
405
406         return numVotingPeers > 0;
407     }
408
409     @Override
410     public RaftActorBehavior getCurrentBehavior() {
411         return currentBehavior;
412     }
413
414     void setCurrentBehavior(final RaftActorBehavior behavior) {
415         currentBehavior = requireNonNull(behavior);
416     }
417
418     @Override
419     public Consumer<ApplyState> getApplyStateConsumer() {
420         return applyStateConsumer;
421     }
422
423     @Override
424     public FileBackedOutputStreamFactory getFileBackedOutputStreamFactory() {
425         return fileBackedOutputStreamFactory;
426     }
427
428     @SuppressWarnings("checkstyle:IllegalCatch")
429     void close() {
430         if (currentBehavior != null) {
431             try {
432                 currentBehavior.close();
433             } catch (Exception e) {
434                 log.debug("{}: Error closing behavior {}", getId(), currentBehavior.state(), e);
435             }
436         }
437     }
438
439     @Override
440     public RaftActorLeadershipTransferCohort getRaftActorLeadershipTransferCohort() {
441         return leadershipTransferCohort;
442     }
443
444     @Override
445     @SuppressWarnings("checkstyle:hiddenField")
446     public void setRaftActorLeadershipTransferCohort(final RaftActorLeadershipTransferCohort leadershipTransferCohort) {
447         this.leadershipTransferCohort = leadershipTransferCohort;
448     }
449 }