Bug 7362: Notify applyState synchronously
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / raft / RaftActorContextImpl.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.Props;
16 import akka.cluster.Cluster;
17 import com.google.common.annotations.VisibleForTesting;
18 import com.google.common.base.Preconditions;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Optional;
26 import java.util.Set;
27 import java.util.function.Consumer;
28 import java.util.function.LongSupplier;
29 import javax.annotation.Nonnull;
30 import org.opendaylight.controller.cluster.DataPersistenceProvider;
31 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
32 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
33 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
34 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
35 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
36 import org.slf4j.Logger;
37
38 /**
39  * Implementation of the RaftActorContext interface.
40  *
41  * @author Moiz Raja
42  * @author Thomas Pantelis
43  */
44 public class RaftActorContextImpl implements RaftActorContext {
45     private static final LongSupplier JVM_MEMORY_RETRIEVER = () -> Runtime.getRuntime().maxMemory();
46
47     private final ActorRef actor;
48
49     private final ActorContext context;
50
51     private final String id;
52
53     private final ElectionTerm termInformation;
54
55     private long commitIndex;
56
57     private long lastApplied;
58
59     private ReplicatedLog replicatedLog;
60
61     private final Map<String, PeerInfo> peerInfoMap = new HashMap<>();
62
63     private final Logger log;
64
65     private ConfigParams configParams;
66
67     private boolean dynamicServerConfiguration = false;
68
69     @VisibleForTesting
70     private LongSupplier totalMemoryRetriever = JVM_MEMORY_RETRIEVER;
71
72     // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
73     // be passed to it in the constructor
74     private SnapshotManager snapshotManager;
75
76     private final DataPersistenceProvider persistenceProvider;
77
78     private short payloadVersion;
79
80     private boolean votingMember = true;
81
82     private RaftActorBehavior currentBehavior;
83
84     private int numVotingPeers = -1;
85
86     private Optional<Cluster> cluster;
87
88     private final Consumer<ApplyState> applyStateConsumer;
89
90     public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
91             @Nonnull ElectionTerm termInformation, long commitIndex, long lastApplied,
92             @Nonnull Map<String, String> peerAddresses,
93             @Nonnull ConfigParams configParams, @Nonnull DataPersistenceProvider persistenceProvider,
94             @Nonnull Consumer<ApplyState> applyStateConsumer, @Nonnull Logger logger) {
95         this.actor = actor;
96         this.context = context;
97         this.id = id;
98         this.termInformation = Preconditions.checkNotNull(termInformation);
99         this.commitIndex = commitIndex;
100         this.lastApplied = lastApplied;
101         this.configParams = Preconditions.checkNotNull(configParams);
102         this.persistenceProvider = Preconditions.checkNotNull(persistenceProvider);
103         this.log = Preconditions.checkNotNull(logger);
104         this.applyStateConsumer = Preconditions.checkNotNull(applyStateConsumer);
105
106         for (Map.Entry<String, String> e: Preconditions.checkNotNull(peerAddresses).entrySet()) {
107             peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
108         }
109     }
110
111     @VisibleForTesting
112     public void setPayloadVersion(short payloadVersion) {
113         this.payloadVersion = payloadVersion;
114     }
115
116     @Override
117     public short getPayloadVersion() {
118         return payloadVersion;
119     }
120
121     public void setConfigParams(ConfigParams configParams) {
122         this.configParams = configParams;
123     }
124
125     @Override
126     public ActorRef actorOf(Props props) {
127         return context.actorOf(props);
128     }
129
130     @Override
131     public ActorSelection actorSelection(String path) {
132         return context.actorSelection(path);
133     }
134
135     @Override
136     public String getId() {
137         return id;
138     }
139
140     @Override
141     public ActorRef getActor() {
142         return actor;
143     }
144
145     @Override
146     @SuppressWarnings("checkstyle:IllegalCatch")
147     public Optional<Cluster> getCluster() {
148         if (cluster == null) {
149             try {
150                 cluster = Optional.of(Cluster.get(getActorSystem()));
151             } catch (Exception e) {
152                 // An exception means there's no cluster configured. This will only happen in unit tests.
153                 log.debug("{}: Could not obtain Cluster: {}", getId(), e);
154                 cluster = Optional.empty();
155             }
156         }
157
158         return cluster;
159     }
160
161     @Override
162     public ElectionTerm getTermInformation() {
163         return termInformation;
164     }
165
166     @Override
167     public long getCommitIndex() {
168         return commitIndex;
169     }
170
171     @Override public void setCommitIndex(long commitIndex) {
172         this.commitIndex = commitIndex;
173     }
174
175     @Override
176     public long getLastApplied() {
177         return lastApplied;
178     }
179
180     @Override
181     public void setLastApplied(long lastApplied) {
182         this.lastApplied = lastApplied;
183     }
184
185     @Override
186     public void setReplicatedLog(ReplicatedLog replicatedLog) {
187         this.replicatedLog = replicatedLog;
188     }
189
190     @Override
191     public ReplicatedLog getReplicatedLog() {
192         return replicatedLog;
193     }
194
195     @Override public ActorSystem getActorSystem() {
196         return context.system();
197     }
198
199     @Override public Logger getLogger() {
200         return this.log;
201     }
202
203     @Override
204     public Collection<String> getPeerIds() {
205         return peerInfoMap.keySet();
206     }
207
208     @Override
209     public Collection<PeerInfo> getPeers() {
210         return peerInfoMap.values();
211     }
212
213     @Override
214     public PeerInfo getPeerInfo(String peerId) {
215         return peerInfoMap.get(peerId);
216     }
217
218     @Override
219     public String getPeerAddress(String peerId) {
220         String peerAddress;
221         PeerInfo peerInfo = peerInfoMap.get(peerId);
222         if (peerInfo != null) {
223             peerAddress = peerInfo.getAddress();
224             if (peerAddress == null) {
225                 peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
226                 peerInfo.setAddress(peerAddress);
227             }
228         } else {
229             peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
230         }
231
232         return peerAddress;
233     }
234
235     @Override
236     public void updatePeerIds(ServerConfigurationPayload serverConfig) {
237         votingMember = true;
238         boolean foundSelf = false;
239         Set<String> currentPeers = new HashSet<>(this.getPeerIds());
240         for (ServerInfo server : serverConfig.getServerConfig()) {
241             if (getId().equals(server.getId())) {
242                 foundSelf = true;
243                 if (!server.isVoting()) {
244                     votingMember = false;
245                 }
246             } else {
247                 VotingState votingState = server.isVoting() ? VotingState.VOTING : VotingState.NON_VOTING;
248                 if (!currentPeers.contains(server.getId())) {
249                     this.addToPeers(server.getId(), null, votingState);
250                 } else {
251                     this.getPeerInfo(server.getId()).setVotingState(votingState);
252                     currentPeers.remove(server.getId());
253                 }
254             }
255         }
256
257         for (String peerIdToRemove : currentPeers) {
258             this.removePeer(peerIdToRemove);
259         }
260
261         if (!foundSelf) {
262             votingMember = false;
263         }
264
265         log.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values());
266
267         setDynamicServerConfigurationInUse();
268     }
269
270     @Override public ConfigParams getConfigParams() {
271         return configParams;
272     }
273
274     @Override
275     public void addToPeers(String peerId, String address, VotingState votingState) {
276         peerInfoMap.put(peerId, new PeerInfo(peerId, address, votingState));
277         numVotingPeers = -1;
278     }
279
280     @Override
281     public void removePeer(String name) {
282         if (getId().equals(name)) {
283             votingMember = false;
284         } else {
285             peerInfoMap.remove(name);
286             numVotingPeers = -1;
287         }
288     }
289
290     @Override public ActorSelection getPeerActorSelection(String peerId) {
291         String peerAddress = getPeerAddress(peerId);
292         if (peerAddress != null) {
293             return actorSelection(peerAddress);
294         }
295         return null;
296     }
297
298     @Override
299     public void setPeerAddress(String peerId, String peerAddress) {
300         PeerInfo peerInfo = peerInfoMap.get(peerId);
301         if (peerInfo != null) {
302             log.info("Peer address for peer {} set to {}", peerId, peerAddress);
303             peerInfo.setAddress(peerAddress);
304         }
305     }
306
307     @Override
308     public SnapshotManager getSnapshotManager() {
309         if (snapshotManager == null) {
310             snapshotManager = new SnapshotManager(this, log);
311         }
312         return snapshotManager;
313     }
314
315     @Override
316     public long getTotalMemory() {
317         return totalMemoryRetriever.getAsLong();
318     }
319
320     @Override
321     public void setTotalMemoryRetriever(LongSupplier retriever) {
322         totalMemoryRetriever = retriever == null ? JVM_MEMORY_RETRIEVER : retriever;
323     }
324
325     @Override
326     public boolean hasFollowers() {
327         return !getPeerIds().isEmpty();
328     }
329
330     @Override
331     public DataPersistenceProvider getPersistenceProvider() {
332         return persistenceProvider;
333     }
334
335
336     @Override
337     public RaftPolicy getRaftPolicy() {
338         return configParams.getRaftPolicy();
339     }
340
341     @Override
342     public boolean isDynamicServerConfigurationInUse() {
343         return dynamicServerConfiguration;
344     }
345
346     @Override
347     public void setDynamicServerConfigurationInUse() {
348         this.dynamicServerConfiguration = true;
349     }
350
351     @Override
352     public ServerConfigurationPayload getPeerServerInfo(boolean includeSelf) {
353         if (!isDynamicServerConfigurationInUse()) {
354             return null;
355         }
356         Collection<PeerInfo> peers = getPeers();
357         List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
358         for (PeerInfo peer: peers) {
359             newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
360         }
361
362         if (includeSelf) {
363             newConfig.add(new ServerInfo(getId(), votingMember));
364         }
365
366         return new ServerConfigurationPayload(newConfig);
367     }
368
369     @Override
370     public boolean isVotingMember() {
371         return votingMember;
372     }
373
374     @Override
375     public boolean anyVotingPeers() {
376         if (numVotingPeers < 0) {
377             numVotingPeers = 0;
378             for (PeerInfo info: getPeers()) {
379                 if (info.isVoting()) {
380                     numVotingPeers++;
381                 }
382             }
383         }
384
385         return numVotingPeers > 0;
386     }
387
388     @Override
389     public RaftActorBehavior getCurrentBehavior() {
390         return currentBehavior;
391     }
392
393     void setCurrentBehavior(final RaftActorBehavior behavior) {
394         this.currentBehavior = Preconditions.checkNotNull(behavior);
395     }
396
397     @Override
398     public Consumer<ApplyState> getApplyStateConsumer() {
399         return applyStateConsumer;
400     }
401
402     @SuppressWarnings("checkstyle:IllegalCatch")
403     void close() {
404         if (currentBehavior != null) {
405             try {
406                 currentBehavior.close();
407             } catch (Exception e) {
408                 log.debug("{}: Error closing behavior {}", getId(), currentBehavior.state(), e);
409             }
410         }
411     }
412 }