package org.opendaylight.controller.cluster.raft;
+import static com.google.common.base.Preconditions.checkState;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActorContext;
-import akka.event.LoggingAdapter;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
import java.util.Map;
+import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.slf4j.Logger;
-public class RaftActorContextImpl implements RaftActorContext{
+public class RaftActorContextImpl implements RaftActorContext {
private final ActorRef actor;
private long lastApplied;
- private final ReplicatedLog replicatedLog;
+ private ReplicatedLog replicatedLog;
private final Map<String, String> peerAddresses;
- private final LoggingAdapter LOG;
+ private final Logger LOG;
+
+ private ConfigParams configParams;
+
+ @VisibleForTesting
+ private Supplier<Long> totalMemoryRetriever;
+
+ // Snapshot manager will need to be created on demand as it needs raft actor context which cannot
+ // be passed to it in the constructor
+ private SnapshotManager snapshotManager;
+ private final DataPersistenceProvider persistenceProvider;
- public RaftActorContextImpl(ActorRef actor, UntypedActorContext context,
- String id,
- ElectionTerm termInformation, long commitIndex,
- long lastApplied, ReplicatedLog replicatedLog, Map<String, String> peerAddresses, LoggingAdapter logger) {
+ public RaftActorContextImpl(ActorRef actor, UntypedActorContext context, String id,
+ ElectionTerm termInformation, long commitIndex, long lastApplied, Map<String, String> peerAddresses,
+ ConfigParams configParams, DataPersistenceProvider persistenceProvider, Logger logger) {
this.actor = actor;
this.context = context;
this.id = id;
this.termInformation = termInformation;
this.commitIndex = commitIndex;
this.lastApplied = lastApplied;
- this.replicatedLog = replicatedLog;
this.peerAddresses = peerAddresses;
+ this.configParams = configParams;
+ this.persistenceProvider = persistenceProvider;
this.LOG = logger;
}
+ void setConfigParams(ConfigParams configParams) {
+ this.configParams = configParams;
+ }
+
+ @Override
public ActorRef actorOf(Props props){
return context.actorOf(props);
}
+ @Override
public ActorSelection actorSelection(String path){
return context.actorSelection(path);
}
+ @Override
public String getId() {
return id;
}
+ @Override
public ActorRef getActor() {
return actor;
}
+ @Override
public ElectionTerm getTermInformation() {
return termInformation;
}
+ @Override
public long getCommitIndex() {
return commitIndex;
}
this.commitIndex = commitIndex;
}
+ @Override
public long getLastApplied() {
return lastApplied;
}
this.lastApplied = lastApplied;
}
+ @Override public void setReplicatedLog(ReplicatedLog replicatedLog) {
+ this.replicatedLog = replicatedLog;
+ }
+
@Override public ReplicatedLog getReplicatedLog() {
return replicatedLog;
}
return context.system();
}
- @Override public LoggingAdapter getLogger() {
+ @Override public Logger getLogger() {
return this.LOG;
}
return peerAddresses.get(peerId);
}
+ @Override public ConfigParams getConfigParams() {
+ return configParams;
+ }
+
@Override public void addToPeers(String name, String address) {
- LOG.debug("Kamal--> addToPeer for:"+name);
peerAddresses.put(name, address);
}
@Override public void removePeer(String name) {
- LOG.debug("Kamal--> removePeer for:"+name);
peerAddresses.remove(name);
}
+
+ @Override public ActorSelection getPeerActorSelection(String peerId) {
+ String peerAddress = getPeerAddress(peerId);
+ if(peerAddress != null){
+ return actorSelection(peerAddress);
+ }
+ return null;
+ }
+
+ @Override public void setPeerAddress(String peerId, String peerAddress) {
+ LOG.info("Peer address for peer {} set to {}", peerId, peerAddress);
+ checkState(peerAddresses.containsKey(peerId), peerId + " is unknown");
+
+ peerAddresses.put(peerId, peerAddress);
+ }
+
+ @Override
+ public SnapshotManager getSnapshotManager() {
+ if(snapshotManager == null){
+ snapshotManager = new SnapshotManager(this, LOG);
+ }
+ return snapshotManager;
+ }
+
+ @Override
+ public long getTotalMemory() {
+ return totalMemoryRetriever != null ? totalMemoryRetriever.get() : Runtime.getRuntime().totalMemory();
+ }
+
+ @Override
+ public void setTotalMemoryRetriever(Supplier<Long> retriever) {
+ totalMemoryRetriever = retriever;
+ }
+
+ @Override
+ public boolean hasFollowers() {
+ return getPeerAddresses().keySet().size() > 0;
+ }
+
+ @Override
+ public DataPersistenceProvider getPersistenceProvider() {
+ return persistenceProvider;
+ }
}