Bug 7521: Add FileBackedOutputStream and use for snapshot chunking
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorContextImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.Props;
16 import akka.cluster.Cluster;
17 import com.google.common.annotations.VisibleForTesting;
18 import com.google.common.base.Preconditions;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Optional;
26 import java.util.Set;
27 import java.util.function.Consumer;
28 import java.util.function.LongSupplier;
29 import javax.annotation.Nonnull;
30 import org.opendaylight.controller.cluster.DataPersistenceProvider;
31 import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
32 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
33 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
34 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
35 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
36 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
37 import org.slf4j.Logger;
38
39 /**
40  * Implementation of the RaftActorContext interface.
41  *
42  * @author Moiz Raja
43  * @author Thomas Pantelis
44  */
45 public class RaftActorContextImpl implements RaftActorContext {
46     private static final LongSupplier JVM_MEMORY_RETRIEVER = () -> Runtime.getRuntime().maxMemory();
47
48     private final ActorRef actor;
49
50     private final ActorContext context;
51
52     private final String id;
53
54     private final ElectionTerm termInformation;
55
56     private long commitIndex;
57
58     private long lastApplied;
59
60     private ReplicatedLog replicatedLog;
61
62     private final Map<String, PeerInfo> peerInfoMap = new HashMap<>();
63
64     private final Logger log;
65
66     private ConfigParams configParams;
67
68     private boolean dynamicServerConfiguration = false;
69
70     @VisibleForTesting
71     private LongSupplier totalMemoryRetriever = JVM_MEMORY_RETRIEVER;
72
73     // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
74     // be passed to it in the constructor
75     private SnapshotManager snapshotManager;
76
77     private final DataPersistenceProvider persistenceProvider;
78
79     private short payloadVersion;
80
81     private boolean votingMember = true;
82
83     private RaftActorBehavior currentBehavior;
84
85     private int numVotingPeers = -1;
86
87     private Optional<Cluster> cluster;
88
89     private final Consumer<ApplyState> applyStateConsumer;
90
91     public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
92             @Nonnull ElectionTerm termInformation, long commitIndex, long lastApplied,
93             @Nonnull Map<String, String> peerAddresses,
94             @Nonnull ConfigParams configParams, @Nonnull DataPersistenceProvider persistenceProvider,
95             @Nonnull Consumer<ApplyState> applyStateConsumer, @Nonnull Logger logger) {
96         this.actor = actor;
97         this.context = context;
98         this.id = id;
99         this.termInformation = Preconditions.checkNotNull(termInformation);
100         this.commitIndex = commitIndex;
101         this.lastApplied = lastApplied;
102         this.configParams = Preconditions.checkNotNull(configParams);
103         this.persistenceProvider = Preconditions.checkNotNull(persistenceProvider);
104         this.log = Preconditions.checkNotNull(logger);
105         this.applyStateConsumer = Preconditions.checkNotNull(applyStateConsumer);
106
107         for (Map.Entry<String, String> e: Preconditions.checkNotNull(peerAddresses).entrySet()) {
108             peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
109         }
110     }
111
112     @VisibleForTesting
113     public void setPayloadVersion(short payloadVersion) {
114         this.payloadVersion = payloadVersion;
115     }
116
117     @Override
118     public short getPayloadVersion() {
119         return payloadVersion;
120     }
121
122     public void setConfigParams(ConfigParams configParams) {
123         this.configParams = configParams;
124     }
125
126     @Override
127     public ActorRef actorOf(Props props) {
128         return context.actorOf(props);
129     }
130
131     @Override
132     public ActorSelection actorSelection(String path) {
133         return context.actorSelection(path);
134     }
135
136     @Override
137     public String getId() {
138         return id;
139     }
140
141     @Override
142     public ActorRef getActor() {
143         return actor;
144     }
145
146     @Override
147     @SuppressWarnings("checkstyle:IllegalCatch")
148     public Optional<Cluster> getCluster() {
149         if (cluster == null) {
150             try {
151                 cluster = Optional.of(Cluster.get(getActorSystem()));
152             } catch (Exception e) {
153                 // An exception means there's no cluster configured. This will only happen in unit tests.
154                 log.debug("{}: Could not obtain Cluster: {}", getId(), e);
155                 cluster = Optional.empty();
156             }
157         }
158
159         return cluster;
160     }
161
162     @Override
163     public ElectionTerm getTermInformation() {
164         return termInformation;
165     }
166
167     @Override
168     public long getCommitIndex() {
169         return commitIndex;
170     }
171
172     @Override public void setCommitIndex(long commitIndex) {
173         this.commitIndex = commitIndex;
174     }
175
176     @Override
177     public long getLastApplied() {
178         return lastApplied;
179     }
180
181     @Override
182     public void setLastApplied(long lastApplied) {
183         this.lastApplied = lastApplied;
184     }
185
186     @Override
187     public void setReplicatedLog(ReplicatedLog replicatedLog) {
188         this.replicatedLog = replicatedLog;
189     }
190
191     @Override
192     public ReplicatedLog getReplicatedLog() {
193         return replicatedLog;
194     }
195
196     @Override public ActorSystem getActorSystem() {
197         return context.system();
198     }
199
200     @Override public Logger getLogger() {
201         return this.log;
202     }
203
204     @Override
205     public Collection<String> getPeerIds() {
206         return peerInfoMap.keySet();
207     }
208
209     @Override
210     public Collection<PeerInfo> getPeers() {
211         return peerInfoMap.values();
212     }
213
214     @Override
215     public PeerInfo getPeerInfo(String peerId) {
216         return peerInfoMap.get(peerId);
217     }
218
219     @Override
220     public String getPeerAddress(String peerId) {
221         String peerAddress;
222         PeerInfo peerInfo = peerInfoMap.get(peerId);
223         if (peerInfo != null) {
224             peerAddress = peerInfo.getAddress();
225             if (peerAddress == null) {
226                 peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
227                 peerInfo.setAddress(peerAddress);
228             }
229         } else {
230             peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
231         }
232
233         return peerAddress;
234     }
235
236     @Override
237     public void updatePeerIds(ServerConfigurationPayload serverConfig) {
238         votingMember = true;
239         boolean foundSelf = false;
240         Set<String> currentPeers = new HashSet<>(this.getPeerIds());
241         for (ServerInfo server : serverConfig.getServerConfig()) {
242             if (getId().equals(server.getId())) {
243                 foundSelf = true;
244                 if (!server.isVoting()) {
245                     votingMember = false;
246                 }
247             } else {
248                 VotingState votingState = server.isVoting() ? VotingState.VOTING : VotingState.NON_VOTING;
249                 if (!currentPeers.contains(server.getId())) {
250                     this.addToPeers(server.getId(), null, votingState);
251                 } else {
252                     this.getPeerInfo(server.getId()).setVotingState(votingState);
253                     currentPeers.remove(server.getId());
254                 }
255             }
256         }
257
258         for (String peerIdToRemove : currentPeers) {
259             this.removePeer(peerIdToRemove);
260         }
261
262         if (!foundSelf) {
263             votingMember = false;
264         }
265
266         log.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values());
267
268         setDynamicServerConfigurationInUse();
269     }
270
271     @Override public ConfigParams getConfigParams() {
272         return configParams;
273     }
274
275     @Override
276     public void addToPeers(String peerId, String address, VotingState votingState) {
277         peerInfoMap.put(peerId, new PeerInfo(peerId, address, votingState));
278         numVotingPeers = -1;
279     }
280
281     @Override
282     public void removePeer(String name) {
283         if (getId().equals(name)) {
284             votingMember = false;
285         } else {
286             peerInfoMap.remove(name);
287             numVotingPeers = -1;
288         }
289     }
290
291     @Override public ActorSelection getPeerActorSelection(String peerId) {
292         String peerAddress = getPeerAddress(peerId);
293         if (peerAddress != null) {
294             return actorSelection(peerAddress);
295         }
296         return null;
297     }
298
299     @Override
300     public void setPeerAddress(String peerId, String peerAddress) {
301         PeerInfo peerInfo = peerInfoMap.get(peerId);
302         if (peerInfo != null) {
303             log.info("Peer address for peer {} set to {}", peerId, peerAddress);
304             peerInfo.setAddress(peerAddress);
305         }
306     }
307
308     @Override
309     public SnapshotManager getSnapshotManager() {
310         if (snapshotManager == null) {
311             snapshotManager = new SnapshotManager(this, log);
312         }
313         return snapshotManager;
314     }
315
316     @Override
317     public long getTotalMemory() {
318         return totalMemoryRetriever.getAsLong();
319     }
320
321     @Override
322     public void setTotalMemoryRetriever(LongSupplier retriever) {
323         totalMemoryRetriever = retriever == null ? JVM_MEMORY_RETRIEVER : retriever;
324     }
325
326     @Override
327     public boolean hasFollowers() {
328         return !getPeerIds().isEmpty();
329     }
330
331     @Override
332     public DataPersistenceProvider getPersistenceProvider() {
333         return persistenceProvider;
334     }
335
336
337     @Override
338     public RaftPolicy getRaftPolicy() {
339         return configParams.getRaftPolicy();
340     }
341
342     @Override
343     public boolean isDynamicServerConfigurationInUse() {
344         return dynamicServerConfiguration;
345     }
346
347     @Override
348     public void setDynamicServerConfigurationInUse() {
349         this.dynamicServerConfiguration = true;
350     }
351
352     @Override
353     public ServerConfigurationPayload getPeerServerInfo(boolean includeSelf) {
354         if (!isDynamicServerConfigurationInUse()) {
355             return null;
356         }
357         Collection<PeerInfo> peers = getPeers();
358         List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
359         for (PeerInfo peer: peers) {
360             newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
361         }
362
363         if (includeSelf) {
364             newConfig.add(new ServerInfo(getId(), votingMember));
365         }
366
367         return new ServerConfigurationPayload(newConfig);
368     }
369
370     @Override
371     public boolean isVotingMember() {
372         return votingMember;
373     }
374
375     @Override
376     public boolean anyVotingPeers() {
377         if (numVotingPeers < 0) {
378             numVotingPeers = 0;
379             for (PeerInfo info: getPeers()) {
380                 if (info.isVoting()) {
381                     numVotingPeers++;
382                 }
383             }
384         }
385
386         return numVotingPeers > 0;
387     }
388
389     @Override
390     public RaftActorBehavior getCurrentBehavior() {
391         return currentBehavior;
392     }
393
394     void setCurrentBehavior(final RaftActorBehavior behavior) {
395         this.currentBehavior = Preconditions.checkNotNull(behavior);
396     }
397
398     @Override
399     public Consumer<ApplyState> getApplyStateConsumer() {
400         return applyStateConsumer;
401     }
402
403     @Override
404     public FileBackedOutputStream newFileBackedOutputStream() {
405         return new FileBackedOutputStream(configParams.getFileBackedStreamingThreshold(),
406                 configParams.getTempFileDirectory());
407     }
408
409     @SuppressWarnings("checkstyle:IllegalCatch")
410     void close() {
411         if (currentBehavior != null) {
412             try {
413                 currentBehavior.close();
414             } catch (Exception e) {
415                 log.debug("{}: Error closing behavior {}", getId(), currentBehavior.state(), e);
416             }
417         }
418     }
419 }