2296eabe27a9ed818031af09360130a9815511e2
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorContextImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.Props;
16 import akka.cluster.Cluster;
17 import com.google.common.annotations.VisibleForTesting;
18 import com.google.common.base.Preconditions;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Optional;
26 import java.util.Set;
27 import java.util.function.Consumer;
28 import java.util.function.LongSupplier;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 import org.opendaylight.controller.cluster.DataPersistenceProvider;
32 import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
33 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
34 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
35 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
36 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
37 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
38 import org.slf4j.Logger;
39
40 /**
41  * Implementation of the RaftActorContext interface.
42  *
43  * @author Moiz Raja
44  * @author Thomas Pantelis
45  */
46 public class RaftActorContextImpl implements RaftActorContext {
47     private static final LongSupplier JVM_MEMORY_RETRIEVER = () -> Runtime.getRuntime().maxMemory();
48
49     private final ActorRef actor;
50
51     private final ActorContext context;
52
53     private final String id;
54
55     private final ElectionTerm termInformation;
56
57     private long commitIndex;
58
59     private long lastApplied;
60
61     private ReplicatedLog replicatedLog;
62
63     private final Map<String, PeerInfo> peerInfoMap = new HashMap<>();
64
65     private final Logger log;
66
67     private ConfigParams configParams;
68
69     private boolean dynamicServerConfiguration = false;
70
71     @VisibleForTesting
72     private LongSupplier totalMemoryRetriever = JVM_MEMORY_RETRIEVER;
73
74     // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
75     // be passed to it in the constructor
76     private SnapshotManager snapshotManager;
77
78     private final DataPersistenceProvider persistenceProvider;
79
80     private short payloadVersion;
81
82     private boolean votingMember = true;
83
84     private RaftActorBehavior currentBehavior;
85
86     private int numVotingPeers = -1;
87
88     private Optional<Cluster> cluster;
89
90     private final Consumer<ApplyState> applyStateConsumer;
91
92     private final FileBackedOutputStreamFactory fileBackedOutputStreamFactory;
93
94     private RaftActorLeadershipTransferCohort leadershipTransferCohort;
95
96     public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
97             @Nonnull ElectionTerm termInformation, long commitIndex, long lastApplied,
98             @Nonnull Map<String, String> peerAddresses,
99             @Nonnull ConfigParams configParams, @Nonnull DataPersistenceProvider persistenceProvider,
100             @Nonnull Consumer<ApplyState> applyStateConsumer, @Nonnull Logger logger) {
101         this.actor = actor;
102         this.context = context;
103         this.id = id;
104         this.termInformation = Preconditions.checkNotNull(termInformation);
105         this.commitIndex = commitIndex;
106         this.lastApplied = lastApplied;
107         this.configParams = Preconditions.checkNotNull(configParams);
108         this.persistenceProvider = Preconditions.checkNotNull(persistenceProvider);
109         this.log = Preconditions.checkNotNull(logger);
110         this.applyStateConsumer = Preconditions.checkNotNull(applyStateConsumer);
111
112         fileBackedOutputStreamFactory = new FileBackedOutputStreamFactory(
113                 configParams.getFileBackedStreamingThreshold(), configParams.getTempFileDirectory());
114
115         for (Map.Entry<String, String> e: Preconditions.checkNotNull(peerAddresses).entrySet()) {
116             peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
117         }
118     }
119
120     @VisibleForTesting
121     public void setPayloadVersion(short payloadVersion) {
122         this.payloadVersion = payloadVersion;
123     }
124
125     @Override
126     public short getPayloadVersion() {
127         return payloadVersion;
128     }
129
130     public void setConfigParams(ConfigParams configParams) {
131         this.configParams = configParams;
132     }
133
134     @Override
135     public ActorRef actorOf(Props props) {
136         return context.actorOf(props);
137     }
138
139     @Override
140     public ActorSelection actorSelection(String path) {
141         return context.actorSelection(path);
142     }
143
144     @Override
145     public String getId() {
146         return id;
147     }
148
149     @Override
150     public ActorRef getActor() {
151         return actor;
152     }
153
154     @Override
155     @SuppressWarnings("checkstyle:IllegalCatch")
156     public Optional<Cluster> getCluster() {
157         if (cluster == null) {
158             try {
159                 cluster = Optional.of(Cluster.get(getActorSystem()));
160             } catch (Exception e) {
161                 // An exception means there's no cluster configured. This will only happen in unit tests.
162                 log.debug("{}: Could not obtain Cluster: {}", getId(), e);
163                 cluster = Optional.empty();
164             }
165         }
166
167         return cluster;
168     }
169
170     @Override
171     public ElectionTerm getTermInformation() {
172         return termInformation;
173     }
174
175     @Override
176     public long getCommitIndex() {
177         return commitIndex;
178     }
179
180     @Override public void setCommitIndex(long commitIndex) {
181         this.commitIndex = commitIndex;
182     }
183
184     @Override
185     public long getLastApplied() {
186         return lastApplied;
187     }
188
189     @Override
190     public void setLastApplied(long lastApplied) {
191         final Throwable stackTrace = log.isTraceEnabled() ? new Throwable() : null;
192         log.debug("{}: Moving last applied index from {} to {}", id, this.lastApplied, lastApplied, stackTrace);
193         this.lastApplied = lastApplied;
194     }
195
196     @Override
197     public void setReplicatedLog(ReplicatedLog replicatedLog) {
198         this.replicatedLog = replicatedLog;
199     }
200
201     @Override
202     public ReplicatedLog getReplicatedLog() {
203         return replicatedLog;
204     }
205
206     @Override
207     public ActorSystem getActorSystem() {
208         return context.system();
209     }
210
211     @Override
212     public Logger getLogger() {
213         return this.log;
214     }
215
216     @Override
217     public Collection<String> getPeerIds() {
218         return peerInfoMap.keySet();
219     }
220
221     @Override
222     public Collection<PeerInfo> getPeers() {
223         return peerInfoMap.values();
224     }
225
226     @Override
227     public PeerInfo getPeerInfo(String peerId) {
228         return peerInfoMap.get(peerId);
229     }
230
231     @Override
232     public String getPeerAddress(String peerId) {
233         String peerAddress;
234         PeerInfo peerInfo = peerInfoMap.get(peerId);
235         if (peerInfo != null) {
236             peerAddress = peerInfo.getAddress();
237             if (peerAddress == null) {
238                 peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
239                 peerInfo.setAddress(peerAddress);
240             }
241         } else {
242             peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
243         }
244
245         return peerAddress;
246     }
247
248     @Override
249     public void updatePeerIds(ServerConfigurationPayload serverConfig) {
250         votingMember = true;
251         boolean foundSelf = false;
252         Set<String> currentPeers = new HashSet<>(this.getPeerIds());
253         for (ServerInfo server : serverConfig.getServerConfig()) {
254             if (getId().equals(server.getId())) {
255                 foundSelf = true;
256                 if (!server.isVoting()) {
257                     votingMember = false;
258                 }
259             } else {
260                 VotingState votingState = server.isVoting() ? VotingState.VOTING : VotingState.NON_VOTING;
261                 if (!currentPeers.contains(server.getId())) {
262                     this.addToPeers(server.getId(), null, votingState);
263                 } else {
264                     this.getPeerInfo(server.getId()).setVotingState(votingState);
265                     currentPeers.remove(server.getId());
266                 }
267             }
268         }
269
270         for (String peerIdToRemove : currentPeers) {
271             this.removePeer(peerIdToRemove);
272         }
273
274         if (!foundSelf) {
275             votingMember = false;
276         }
277
278         log.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values());
279
280         setDynamicServerConfigurationInUse();
281     }
282
283     @Override public ConfigParams getConfigParams() {
284         return configParams;
285     }
286
287     @Override
288     public void addToPeers(String peerId, String address, VotingState votingState) {
289         peerInfoMap.put(peerId, new PeerInfo(peerId, address, votingState));
290         numVotingPeers = -1;
291     }
292
293     @Override
294     public void removePeer(String name) {
295         if (getId().equals(name)) {
296             votingMember = false;
297         } else {
298             peerInfoMap.remove(name);
299             numVotingPeers = -1;
300         }
301     }
302
303     @Override public ActorSelection getPeerActorSelection(String peerId) {
304         String peerAddress = getPeerAddress(peerId);
305         if (peerAddress != null) {
306             return actorSelection(peerAddress);
307         }
308         return null;
309     }
310
311     @Override
312     public void setPeerAddress(String peerId, String peerAddress) {
313         PeerInfo peerInfo = peerInfoMap.get(peerId);
314         if (peerInfo != null) {
315             log.info("Peer address for peer {} set to {}", peerId, peerAddress);
316             peerInfo.setAddress(peerAddress);
317         }
318     }
319
320     @Override
321     public SnapshotManager getSnapshotManager() {
322         if (snapshotManager == null) {
323             snapshotManager = new SnapshotManager(this, log);
324         }
325         return snapshotManager;
326     }
327
328     @Override
329     public long getTotalMemory() {
330         return totalMemoryRetriever.getAsLong();
331     }
332
333     @Override
334     public void setTotalMemoryRetriever(LongSupplier retriever) {
335         totalMemoryRetriever = retriever == null ? JVM_MEMORY_RETRIEVER : retriever;
336     }
337
338     @Override
339     public boolean hasFollowers() {
340         return !getPeerIds().isEmpty();
341     }
342
343     @Override
344     public DataPersistenceProvider getPersistenceProvider() {
345         return persistenceProvider;
346     }
347
348
349     @Override
350     public RaftPolicy getRaftPolicy() {
351         return configParams.getRaftPolicy();
352     }
353
354     @Override
355     public boolean isDynamicServerConfigurationInUse() {
356         return dynamicServerConfiguration;
357     }
358
359     @Override
360     public void setDynamicServerConfigurationInUse() {
361         this.dynamicServerConfiguration = true;
362     }
363
364     @Override
365     public ServerConfigurationPayload getPeerServerInfo(boolean includeSelf) {
366         if (!isDynamicServerConfigurationInUse()) {
367             return null;
368         }
369         Collection<PeerInfo> peers = getPeers();
370         List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
371         for (PeerInfo peer: peers) {
372             newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
373         }
374
375         if (includeSelf) {
376             newConfig.add(new ServerInfo(getId(), votingMember));
377         }
378
379         return new ServerConfigurationPayload(newConfig);
380     }
381
382     @Override
383     public boolean isVotingMember() {
384         return votingMember;
385     }
386
387     @Override
388     public boolean anyVotingPeers() {
389         if (numVotingPeers < 0) {
390             numVotingPeers = 0;
391             for (PeerInfo info: getPeers()) {
392                 if (info.isVoting()) {
393                     numVotingPeers++;
394                 }
395             }
396         }
397
398         return numVotingPeers > 0;
399     }
400
401     @Override
402     public RaftActorBehavior getCurrentBehavior() {
403         return currentBehavior;
404     }
405
406     void setCurrentBehavior(final RaftActorBehavior behavior) {
407         this.currentBehavior = Preconditions.checkNotNull(behavior);
408     }
409
410     @Override
411     public Consumer<ApplyState> getApplyStateConsumer() {
412         return applyStateConsumer;
413     }
414
415     @Override
416     public FileBackedOutputStreamFactory getFileBackedOutputStreamFactory() {
417         return fileBackedOutputStreamFactory;
418     }
419
420     @SuppressWarnings("checkstyle:IllegalCatch")
421     void close() {
422         if (currentBehavior != null) {
423             try {
424                 currentBehavior.close();
425             } catch (Exception e) {
426                 log.debug("{}: Error closing behavior {}", getId(), currentBehavior.state(), e);
427             }
428         }
429     }
430
431     @Override
432     @Nullable
433     public RaftActorLeadershipTransferCohort getRaftActorLeadershipTransferCohort() {
434         return leadershipTransferCohort;
435     }
436
437     @Override
438     public void setRaftActorLeadershipTransferCohort(
439             @Nullable RaftActorLeadershipTransferCohort leadershipTransferCohort) {
440         this.leadershipTransferCohort = leadershipTransferCohort;
441     }
442 }