2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.Props;
16 import akka.cluster.Cluster;
17 import com.google.common.annotations.VisibleForTesting;
18 import com.google.common.base.Preconditions;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.HashMap;
22 import java.util.HashSet;
23 import java.util.List;
25 import java.util.Optional;
27 import java.util.function.LongSupplier;
28 import org.opendaylight.controller.cluster.DataPersistenceProvider;
29 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
30 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
31 import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
32 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
33 import org.slf4j.Logger;
35 public class RaftActorContextImpl implements RaftActorContext {
36 private static final LongSupplier JVM_MEMORY_RETRIEVER = () -> Runtime.getRuntime().maxMemory();
38 private final ActorRef actor;
40 private final ActorContext context;
42 private final String id;
44 private final ElectionTerm termInformation;
46 private long commitIndex;
48 private long lastApplied;
50 private ReplicatedLog replicatedLog;
52 private final Map<String, PeerInfo> peerInfoMap = new HashMap<>();
54 private final Logger LOG;
56 private ConfigParams configParams;
58 private boolean dynamicServerConfiguration = false;
61 private LongSupplier totalMemoryRetriever = JVM_MEMORY_RETRIEVER;
63 // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
64 // be passed to it in the constructor
65 private SnapshotManager snapshotManager;
67 private final DataPersistenceProvider persistenceProvider;
69 private short payloadVersion;
71 private boolean votingMember = true;
73 private RaftActorBehavior currentBehavior;
75 private int numVotingPeers = -1;
77 private Optional<Cluster> cluster;
79 public RaftActorContextImpl(ActorRef actor, ActorContext context, String id,
80 ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
81 ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
83 this.context = context;
85 this.termInformation = termInformation;
86 this.commitIndex = commitIndex;
87 this.lastApplied = lastApplied;
88 this.configParams = configParams;
89 this.persistenceProvider = persistenceProvider;
92 for(Map.Entry<String, String> e: peerAddresses.entrySet()) {
93 peerInfoMap.put(e.getKey(), new PeerInfo(e.getKey(), e.getValue(), VotingState.VOTING));
98 public void setPayloadVersion(short payloadVersion) {
99 this.payloadVersion = payloadVersion;
103 public short getPayloadVersion() {
104 return payloadVersion;
107 public void setConfigParams(ConfigParams configParams) {
108 this.configParams = configParams;
112 public ActorRef actorOf(Props props){
113 return context.actorOf(props);
117 public ActorSelection actorSelection(String path){
118 return context.actorSelection(path);
122 public String getId() {
127 public ActorRef getActor() {
132 public Optional<Cluster> getCluster() {
133 if(cluster == null) {
135 cluster = Optional.of(Cluster.get(getActorSystem()));
136 } catch(Exception e) {
137 // An exception means there's no cluster configured. This will only happen in unit tests.
138 LOG.debug("{}: Could not obtain Cluster: {}", getId(), e);
139 cluster = Optional.empty();
147 public ElectionTerm getTermInformation() {
148 return termInformation;
152 public long getCommitIndex() {
156 @Override public void setCommitIndex(long commitIndex) {
157 this.commitIndex = commitIndex;
161 public long getLastApplied() {
166 public void setLastApplied(long lastApplied) {
167 this.lastApplied = lastApplied;
171 public void setReplicatedLog(ReplicatedLog replicatedLog) {
172 this.replicatedLog = replicatedLog;
176 public ReplicatedLog getReplicatedLog() {
177 return replicatedLog;
180 @Override public ActorSystem getActorSystem() {
181 return context.system();
184 @Override public Logger getLogger() {
189 public Collection<String> getPeerIds() {
190 return peerInfoMap.keySet();
194 public Collection<PeerInfo> getPeers() {
195 return peerInfoMap.values();
199 public PeerInfo getPeerInfo(String peerId) {
200 return peerInfoMap.get(peerId);
204 public String getPeerAddress(String peerId) {
205 String peerAddress = null;
206 PeerInfo peerInfo = peerInfoMap.get(peerId);
207 if(peerInfo != null) {
208 peerAddress = peerInfo.getAddress();
209 if(peerAddress == null) {
210 peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
211 peerInfo.setAddress(peerAddress);
214 peerAddress = configParams.getPeerAddressResolver().resolve(peerId);
221 public void updatePeerIds(ServerConfigurationPayload serverConfig){
223 boolean foundSelf = false;
224 Set<String> currentPeers = new HashSet<>(this.getPeerIds());
225 for(ServerInfo server: serverConfig.getServerConfig()) {
226 if(getId().equals(server.getId())) {
228 if(!server.isVoting()) {
229 votingMember = false;
232 VotingState votingState = server.isVoting() ? VotingState.VOTING: VotingState.NON_VOTING;
233 if(!currentPeers.contains(server.getId())) {
234 this.addToPeers(server.getId(), null, votingState);
236 this.getPeerInfo(server.getId()).setVotingState(votingState);
237 currentPeers.remove(server.getId());
242 for(String peerIdToRemove: currentPeers) {
243 this.removePeer(peerIdToRemove);
247 votingMember = false;
250 LOG.debug("{}: Updated server config: isVoting: {}, peers: {}", id, votingMember, peerInfoMap.values());
252 setDynamicServerConfigurationInUse();
255 @Override public ConfigParams getConfigParams() {
260 public void addToPeers(String id, String address, VotingState votingState) {
261 peerInfoMap.put(id, new PeerInfo(id, address, votingState));
266 public void removePeer(String name) {
267 if(getId().equals(name)) {
268 votingMember = false;
270 peerInfoMap.remove(name);
275 @Override public ActorSelection getPeerActorSelection(String peerId) {
276 String peerAddress = getPeerAddress(peerId);
277 if(peerAddress != null){
278 return actorSelection(peerAddress);
284 public void setPeerAddress(String peerId, String peerAddress) {
285 PeerInfo peerInfo = peerInfoMap.get(peerId);
286 if(peerInfo != null) {
287 LOG.info("Peer address for peer {} set to {}", peerId, peerAddress);
288 peerInfo.setAddress(peerAddress);
293 public SnapshotManager getSnapshotManager() {
294 if(snapshotManager == null){
295 snapshotManager = new SnapshotManager(this, LOG);
297 return snapshotManager;
301 public long getTotalMemory() {
302 return totalMemoryRetriever.getAsLong();
306 public void setTotalMemoryRetriever(LongSupplier retriever) {
307 totalMemoryRetriever = retriever == null ? JVM_MEMORY_RETRIEVER : retriever;
311 public boolean hasFollowers() {
312 return getPeerIds().size() > 0;
316 public DataPersistenceProvider getPersistenceProvider() {
317 return persistenceProvider;
322 public RaftPolicy getRaftPolicy() {
323 return configParams.getRaftPolicy();
327 public boolean isDynamicServerConfigurationInUse() {
328 return dynamicServerConfiguration;
332 public void setDynamicServerConfigurationInUse() {
333 this.dynamicServerConfiguration = true;
337 public ServerConfigurationPayload getPeerServerInfo(boolean includeSelf) {
338 if (!isDynamicServerConfigurationInUse()) {
341 Collection<PeerInfo> peers = getPeers();
342 List<ServerInfo> newConfig = new ArrayList<>(peers.size() + 1);
343 for(PeerInfo peer: peers) {
344 newConfig.add(new ServerInfo(peer.getId(), peer.isVoting()));
348 newConfig.add(new ServerInfo(getId(), votingMember));
351 return (new ServerConfigurationPayload(newConfig));
355 public boolean isVotingMember() {
360 public boolean anyVotingPeers() {
361 if(numVotingPeers < 0) {
363 for(PeerInfo info: getPeers()) {
364 if(info.isVoting()) {
370 return numVotingPeers > 0;
374 public RaftActorBehavior getCurrentBehavior() {
375 return currentBehavior;
378 void setCurrentBehavior(final RaftActorBehavior behavior) {
379 this.currentBehavior = Preconditions.checkNotNull(behavior);
383 if (currentBehavior != null) {
385 currentBehavior.close();
386 } catch (Exception e) {
387 LOG.debug("{}: Error closing behavior {}", getId(), currentBehavior.state());