2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.ActorSystem;
14 import akka.actor.Props;
15 import akka.japi.Procedure;
16 import com.google.common.base.Throwables;
17 import com.google.common.io.ByteSource;
18 import java.io.IOException;
19 import java.io.OutputStream;
20 import java.io.Serializable;
21 import java.util.HashMap;
23 import java.util.Optional;
24 import java.util.function.Consumer;
25 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
26 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
27 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
28 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
30 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
31 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
35 public class MockRaftActorContext extends RaftActorContextImpl {
36 private static final Logger LOG = LoggerFactory.getLogger(MockRaftActorContext.class);
38 private ActorSystem system;
39 private RaftPolicy raftPolicy;
40 private Consumer<Optional<OutputStream>> createSnapshotProcedure = out -> { };
42 private static ElectionTerm newElectionTerm() {
43 return new ElectionTerm() {
44 private long currentTerm = 1;
45 private String votedFor = "";
48 public long getCurrentTerm() {
53 public String getVotedFor() {
58 public void update(long newTerm, String newVotedFor) {
59 this.currentTerm = newTerm;
60 this.votedFor = newVotedFor;
62 // TODO : Write to some persistent state
65 @Override public void updateAndPersist(long newTerm, String newVotedFor) {
66 update(newTerm, newVotedFor);
71 public MockRaftActorContext() {
72 super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
73 new DefaultConfigParamsImpl(), new NonPersistentDataProvider(), applyState -> { }, LOG);
74 setReplicatedLog(new MockReplicatedLogBuilder().build());
77 public MockRaftActorContext(String id, ActorSystem system, ActorRef actor) {
78 super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
79 new DefaultConfigParamsImpl(), new NonPersistentDataProvider(),
80 applyState -> actor.tell(applyState, actor), LOG);
88 public void initReplicatedLog() {
89 SimpleReplicatedLog replicatedLog = new SimpleReplicatedLog();
90 long term = getTermInformation().getCurrentTerm();
91 replicatedLog.append(new SimpleReplicatedLogEntry(0, term, new MockPayload("1")));
92 replicatedLog.append(new SimpleReplicatedLogEntry(1, term, new MockPayload("2")));
93 setReplicatedLog(replicatedLog);
94 setCommitIndex(replicatedLog.lastIndex());
95 setLastApplied(replicatedLog.lastIndex());
98 @Override public ActorRef actorOf(Props props) {
99 return system.actorOf(props);
102 @Override public ActorSelection actorSelection(String path) {
103 return system.actorSelection(path);
106 @Override public ActorSystem getActorSystem() {
110 @Override public ActorSelection getPeerActorSelection(String peerId) {
111 String peerAddress = getPeerAddress(peerId);
112 if (peerAddress != null) {
113 return actorSelection(peerAddress);
118 public void setPeerAddresses(Map<String, String> peerAddresses) {
119 for (String id: getPeerIds()) {
123 for (Map.Entry<String, String> e: peerAddresses.entrySet()) {
124 addToPeers(e.getKey(), e.getValue(), VotingState.VOTING);
129 public SnapshotManager getSnapshotManager() {
130 SnapshotManager snapshotManager = super.getSnapshotManager();
131 snapshotManager.setCreateSnapshotConsumer(createSnapshotProcedure);
133 snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
135 public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
136 return ByteState.of(snapshotBytes.read());
140 public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
144 public void applySnapshot(State snapshotState) {
148 return snapshotManager;
151 public void setCreateSnapshotProcedure(Consumer<Optional<OutputStream>> createSnapshotProcedure) {
152 this.createSnapshotProcedure = createSnapshotProcedure;
156 public RaftPolicy getRaftPolicy() {
157 return raftPolicy != null ? raftPolicy : super.getRaftPolicy();
160 public void setRaftPolicy(RaftPolicy raftPolicy) {
161 this.raftPolicy = raftPolicy;
164 public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
166 public int dataSize() {
171 public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
175 public boolean shouldCaptureSnapshot(long logIndex) {
180 public boolean removeFromAndPersist(long index) {
181 return removeFrom(index) >= 0;
185 @SuppressWarnings("checkstyle:IllegalCatch")
186 public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback,
188 append(replicatedLogEntry);
190 if (callback != null) {
192 callback.apply(replicatedLogEntry);
193 } catch (Exception e) {
194 Throwables.propagate(e);
202 public static class MockPayload extends Payload implements Serializable {
203 private static final long serialVersionUID = 3121380393130864247L;
204 private String value = "";
207 public MockPayload() {
210 public MockPayload(String data) {
212 size = value.length();
215 public MockPayload(String data, int size) {
226 public String toString() {
231 public int hashCode() {
232 final int prime = 31;
234 result = prime * result + (value == null ? 0 : value.hashCode());
239 public boolean equals(Object obj) {
246 if (getClass() != obj.getClass()) {
249 MockPayload other = (MockPayload) obj;
251 if (other.value != null) {
254 } else if (!value.equals(other.value)) {
261 public static class MockReplicatedLogBuilder {
262 private final ReplicatedLog mockLog = new SimpleReplicatedLog();
264 public MockReplicatedLogBuilder createEntries(int start, int end, int term) {
265 for (int i = start; i < end; i++) {
266 this.mockLog.append(new SimpleReplicatedLogEntry(i, term,
267 new MockRaftActorContext.MockPayload(Integer.toString(i))));
272 public MockReplicatedLogBuilder addEntry(int index, int term, MockPayload payload) {
273 this.mockLog.append(new SimpleReplicatedLogEntry(index, term, payload));
277 public ReplicatedLog build() {
283 public void setCurrentBehavior(final RaftActorBehavior behavior) {
284 super.setCurrentBehavior(behavior);