2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.ActorSystem;
14 import akka.actor.Props;
15 import akka.japi.Procedure;
16 import com.google.common.io.ByteSource;
17 import java.io.IOException;
18 import java.io.OutputStream;
19 import java.io.Serializable;
20 import java.util.HashMap;
22 import java.util.Optional;
23 import java.util.function.Consumer;
24 import org.opendaylight.controller.cluster.DataPersistenceProvider;
25 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
26 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
27 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
28 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
29 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
30 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
31 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
35 public class MockRaftActorContext extends RaftActorContextImpl {
36 private static final Logger LOG = LoggerFactory.getLogger(MockRaftActorContext.class);
38 private ActorSystem system;
39 private RaftPolicy raftPolicy;
40 private Consumer<Optional<OutputStream>> createSnapshotProcedure = out -> { };
42 private static ElectionTerm newElectionTerm() {
43 return new ElectionTerm() {
44 private long currentTerm = 1;
45 private String votedFor = "";
48 public long getCurrentTerm() {
53 public String getVotedFor() {
58 public void update(final long newTerm, final String newVotedFor) {
59 this.currentTerm = newTerm;
60 this.votedFor = newVotedFor;
62 // TODO : Write to some persistent state
65 @Override public void updateAndPersist(final long newTerm, final String newVotedFor) {
66 update(newTerm, newVotedFor);
71 private static DataPersistenceProvider createProvider() {
72 return new NonPersistentDataProvider(Runnable::run);
75 public MockRaftActorContext() {
76 super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
77 new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG);
78 setReplicatedLog(new MockReplicatedLogBuilder().build());
81 public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
82 super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
83 new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG);
91 public void initReplicatedLog() {
92 SimpleReplicatedLog replicatedLog = new SimpleReplicatedLog();
93 long term = getTermInformation().getCurrentTerm();
94 replicatedLog.append(new SimpleReplicatedLogEntry(0, term, new MockPayload("1")));
95 replicatedLog.append(new SimpleReplicatedLogEntry(1, term, new MockPayload("2")));
96 setReplicatedLog(replicatedLog);
97 setCommitIndex(replicatedLog.lastIndex());
98 setLastApplied(replicatedLog.lastIndex());
101 @Override public ActorRef actorOf(final Props props) {
102 return system.actorOf(props);
105 @Override public ActorSelection actorSelection(final String path) {
106 return system.actorSelection(path);
109 @Override public ActorSystem getActorSystem() {
113 @Override public ActorSelection getPeerActorSelection(final String peerId) {
114 String peerAddress = getPeerAddress(peerId);
115 if (peerAddress != null) {
116 return actorSelection(peerAddress);
121 public void setPeerAddresses(final Map<String, String> peerAddresses) {
122 for (String id: getPeerIds()) {
126 for (Map.Entry<String, String> e: peerAddresses.entrySet()) {
127 addToPeers(e.getKey(), e.getValue(), VotingState.VOTING);
132 public SnapshotManager getSnapshotManager() {
133 SnapshotManager snapshotManager = super.getSnapshotManager();
134 snapshotManager.setCreateSnapshotConsumer(createSnapshotProcedure);
136 snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
138 public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
139 return ByteState.of(snapshotBytes.read());
143 public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
147 public void applySnapshot(final State snapshotState) {
151 return snapshotManager;
154 public void setCreateSnapshotProcedure(final Consumer<Optional<OutputStream>> createSnapshotProcedure) {
155 this.createSnapshotProcedure = createSnapshotProcedure;
159 public RaftPolicy getRaftPolicy() {
160 return raftPolicy != null ? raftPolicy : super.getRaftPolicy();
163 public void setRaftPolicy(final RaftPolicy raftPolicy) {
164 this.raftPolicy = raftPolicy;
167 public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
169 public int dataSize() {
174 public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
178 public boolean shouldCaptureSnapshot(final long logIndex) {
183 public boolean removeFromAndPersist(final long index) {
184 return removeFrom(index) >= 0;
188 @SuppressWarnings("checkstyle:IllegalCatch")
189 public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
190 final Procedure<ReplicatedLogEntry> callback, final boolean doAsync) {
191 append(replicatedLogEntry);
193 if (callback != null) {
195 callback.apply(replicatedLogEntry);
196 } catch (RuntimeException e) {
198 } catch (Exception e) {
199 throw new RuntimeException(e);
207 public static class MockPayload extends Payload implements Serializable {
208 private static final long serialVersionUID = 3121380393130864247L;
209 private String value = "";
212 public MockPayload() {
215 public MockPayload(final String data) {
217 size = value.length();
220 public MockPayload(final String data, final int size) {
231 public String toString() {
236 public int hashCode() {
237 final int prime = 31;
239 result = prime * result + (value == null ? 0 : value.hashCode());
244 public boolean equals(final Object obj) {
251 if (getClass() != obj.getClass()) {
254 MockPayload other = (MockPayload) obj;
256 if (other.value != null) {
259 } else if (!value.equals(other.value)) {
266 public static class MockReplicatedLogBuilder {
267 private final ReplicatedLog mockLog = new SimpleReplicatedLog();
269 public MockReplicatedLogBuilder createEntries(final int start, final int end, final int term) {
270 for (int i = start; i < end; i++) {
271 this.mockLog.append(new SimpleReplicatedLogEntry(i, term,
272 new MockRaftActorContext.MockPayload(Integer.toString(i))));
277 public MockReplicatedLogBuilder addEntry(final int index, final int term, final MockPayload payload) {
278 this.mockLog.append(new SimpleReplicatedLogEntry(index, term, payload));
282 public ReplicatedLog build() {
288 public void setCurrentBehavior(final RaftActorBehavior behavior) {
289 super.setCurrentBehavior(behavior);