2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.ActorSystem;
14 import akka.actor.Props;
15 import akka.japi.Procedure;
16 import com.google.common.base.Preconditions;
17 import com.google.common.base.Supplier;
18 import com.google.protobuf.GeneratedMessage;
19 import java.io.Serializable;
20 import java.util.HashMap;
22 import org.opendaylight.controller.cluster.DataPersistenceProvider;
23 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
24 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
25 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
26 import org.opendaylight.controller.protobuff.messages.cluster.raft.test.MockPayloadMessages;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
30 public class MockRaftActorContext implements RaftActorContext {
33 private ActorSystem system;
34 private ActorRef actor;
35 private long index = 0;
36 private long lastApplied = 0;
37 private final ElectionTerm electionTerm;
38 private ReplicatedLog replicatedLog;
39 private Map<String, String> peerAddresses = new HashMap<>();
40 private ConfigParams configParams;
41 private boolean snapshotCaptureInitiated;
42 private SnapshotManager snapshotManager;
43 private DataPersistenceProvider persistenceProvider = new NonPersistentDataProvider();
44 private short payloadVersion;
46 public MockRaftActorContext(){
47 electionTerm = new ElectionTerm() {
48 private long currentTerm = 1;
49 private String votedFor = "";
52 public long getCurrentTerm() {
57 public String getVotedFor() {
62 public void update(long currentTerm, String votedFor){
63 this.currentTerm = currentTerm;
64 this.votedFor = votedFor;
66 // TODO : Write to some persistent state
69 @Override public void updateAndPersist(long currentTerm,
71 update(currentTerm, votedFor);
75 configParams = new DefaultConfigParamsImpl();
78 public MockRaftActorContext(String id, ActorSystem system, ActorRef actor){
88 public void initReplicatedLog(){
89 this.replicatedLog = new SimpleReplicatedLog();
90 long term = getTermInformation().getCurrentTerm();
91 this.replicatedLog.append(new MockReplicatedLogEntry(term, 0, new MockPayload("1")));
92 this.replicatedLog.append(new MockReplicatedLogEntry(term, 1, new MockPayload("2")));
95 @Override public ActorRef actorOf(Props props) {
96 return system.actorOf(props);
99 @Override public ActorSelection actorSelection(String path) {
100 return system.actorSelection(path);
103 @Override public String getId() {
107 @Override public ActorRef getActor() {
111 @Override public ElectionTerm getTermInformation() {
115 public void setIndex(long index){
119 @Override public long getCommitIndex() {
123 @Override public void setCommitIndex(long commitIndex) {
124 this.index = commitIndex;
127 @Override public void setLastApplied(long lastApplied){
128 this.lastApplied = lastApplied;
131 @Override public long getLastApplied() {
136 // FIXME : A lot of tests try to manipulate the replicated log by setting it using this method
137 // This is OK to do if the underlyingActor is not RafActor or a derived class. If not then you should not
138 // used this way to manipulate the log because the RaftActor actually has a field replicatedLog
139 // which it creates internally and sets on the RaftActorContext
140 // The only right way to manipulate the replicated log therefore is to get it from either the RaftActor
141 // or the RaftActorContext and modify the entries in there instead of trying to replace it by using this setter
142 // Simple assertion that will fail if you do so
143 // ReplicatedLog log = new ReplicatedLogImpl();
144 // raftActor.underlyingActor().getRaftActorContext().setReplicatedLog(log);
145 // assertEquals(log, raftActor.underlyingActor().getReplicatedLog())
146 public void setReplicatedLog(ReplicatedLog replicatedLog) {
147 this.replicatedLog = replicatedLog;
150 @Override public ReplicatedLog getReplicatedLog() {
151 return replicatedLog;
154 @Override public ActorSystem getActorSystem() {
158 @Override public Logger getLogger() {
159 return LoggerFactory.getLogger(getClass());
162 @Override public Map<String, String> getPeerAddresses() {
163 return peerAddresses;
166 @Override public String getPeerAddress(String peerId) {
167 return peerAddresses.get(peerId);
170 @Override public void addToPeers(String name, String address) {
171 peerAddresses.put(name, address);
174 @Override public void removePeer(String name) {
175 peerAddresses.remove(name);
178 @Override public ActorSelection getPeerActorSelection(String peerId) {
179 String peerAddress = getPeerAddress(peerId);
180 if(peerAddress != null){
181 return actorSelection(peerAddress);
186 @Override public void setPeerAddress(String peerId, String peerAddress) {
187 Preconditions.checkState(peerAddresses.containsKey(peerId));
188 peerAddresses.put(peerId, peerAddress);
191 public void setPeerAddresses(Map<String, String> peerAddresses) {
192 this.peerAddresses = peerAddresses;
196 public ConfigParams getConfigParams() {
201 public SnapshotManager getSnapshotManager() {
202 if(this.snapshotManager == null){
203 this.snapshotManager = new SnapshotManager(this, getLogger());
204 this.snapshotManager.setCreateSnapshotCallable(NoopProcedure.<Void>instance());
206 return this.snapshotManager;
209 public void setConfigParams(ConfigParams configParams) {
210 this.configParams = configParams;
214 public long getTotalMemory() {
215 return Runtime.getRuntime().totalMemory();
219 public void setTotalMemoryRetriever(Supplier<Long> retriever) {
223 public boolean hasFollowers() {
224 return getPeerAddresses().keySet().size() > 0;
228 public DataPersistenceProvider getPersistenceProvider() {
229 return persistenceProvider;
232 public void setPersistenceProvider(DataPersistenceProvider persistenceProvider) {
233 this.persistenceProvider = persistenceProvider;
237 public short getPayloadVersion() {
238 return payloadVersion;
241 public void setPayloadVersion(short payloadVersion) {
242 this.payloadVersion = payloadVersion;
245 public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
246 @Override public void appendAndPersist(
247 ReplicatedLogEntry replicatedLogEntry) {
248 append(replicatedLogEntry);
252 public int dataSize() {
256 @Override public void removeFromAndPersist(long index) {
261 public void appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback) {
262 append(replicatedLogEntry);
264 if(callback != null) {
266 callback.apply(replicatedLogEntry);
267 } catch (Exception e) {
274 public static class MockPayload extends Payload implements Serializable {
275 private static final long serialVersionUID = 3121380393130864247L;
276 private String value = "";
279 public MockPayload() {
282 public MockPayload(String s) {
284 size = value.length();
287 public MockPayload(String s, int size) {
292 @Override public Map<GeneratedMessage.GeneratedExtension<?, ?>, String> encode() {
293 Map<GeneratedMessage.GeneratedExtension<?, ?>, String> map = new HashMap<>();
294 map.put(MockPayloadMessages.value, value);
298 @Override public Payload decode(
299 AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payloadProtoBuff) {
300 String value = payloadProtoBuff.getExtension(MockPayloadMessages.value);
310 @Override public String getClientPayloadClassName() {
311 return MockPayload.class.getName();
315 public String toString() {
320 public int hashCode() {
321 final int prime = 31;
323 result = prime * result + ((value == null) ? 0 : value.hashCode());
328 public boolean equals(Object obj) {
335 if (getClass() != obj.getClass()) {
338 MockPayload other = (MockPayload) obj;
340 if (other.value != null) {
343 } else if (!value.equals(other.value)) {
350 public static class MockReplicatedLogEntry implements ReplicatedLogEntry, Serializable {
351 private static final long serialVersionUID = 1L;
353 private final long term;
354 private final long index;
355 private final Payload data;
357 public MockReplicatedLogEntry(long term, long index, Payload data){
364 @Override public Payload getData() {
368 @Override public long getTerm() {
372 @Override public long getIndex() {
378 return getData().size();
382 public int hashCode() {
383 final int prime = 31;
385 result = prime * result + ((data == null) ? 0 : data.hashCode());
386 result = prime * result + (int) (index ^ (index >>> 32));
387 result = prime * result + (int) (term ^ (term >>> 32));
392 public boolean equals(Object obj) {
399 if (getClass() != obj.getClass()) {
402 MockReplicatedLogEntry other = (MockReplicatedLogEntry) obj;
404 if (other.data != null) {
407 } else if (!data.equals(other.data)) {
410 if (index != other.index) {
413 if (term != other.term) {
420 public String toString() {
421 StringBuilder builder = new StringBuilder();
422 builder.append("MockReplicatedLogEntry [term=").append(term).append(", index=").append(index)
423 .append(", data=").append(data).append("]");
424 return builder.toString();
428 public static class MockReplicatedLogBuilder {
429 private final ReplicatedLog mockLog = new SimpleReplicatedLog();
431 public MockReplicatedLogBuilder createEntries(int start, int end, int term) {
432 for (int i=start; i<end; i++) {
433 this.mockLog.append(new ReplicatedLogImplEntry(i, term, new MockRaftActorContext.MockPayload("foo" + i)));
438 public MockReplicatedLogBuilder addEntry(int index, int term, MockPayload payload) {
439 this.mockLog.append(new ReplicatedLogImplEntry(index, term, payload));
443 public ReplicatedLog build() {