2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import static java.util.Objects.requireNonNull;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Props;
17 import com.google.common.io.ByteSource;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.io.IOException;
20 import java.io.OutputStream;
21 import java.io.Serializable;
22 import java.util.HashMap;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.function.Consumer;
27 import org.opendaylight.controller.cluster.DataPersistenceProvider;
28 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
29 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
30 import org.opendaylight.controller.cluster.raft.messages.Payload;
31 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
32 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
33 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
34 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
// RaftActorContextImpl subclass whose collaborators can be swapped out through the setters below.
38 public class MockRaftActorContext extends RaftActorContextImpl {
// Logger handed to the superclass constructor by both constructors of this class.
39 private static final Logger LOG = LoggerFactory.getLogger(MockRaftActorContext.class);
// Actor system that actorOf() and actorSelection() delegate to.
41 private ActorSystem system;
// Optional policy override; getRaftPolicy() falls back to the superclass value while this is null.
42 private RaftPolicy raftPolicy;
// Callback installed on the SnapshotManager by getSnapshotManager(); defaults to a no-op.
43 private Consumer<Optional<OutputStream>> createSnapshotProcedure = out -> { };
// Creates the ElectionTerm passed to the superclass by both constructors. The anonymous
// implementation keeps the term and the vote in plain fields, starting at term 1 with an empty vote.
45 private static ElectionTerm newElectionTerm() {
46 return new ElectionTerm() {
47 private long currentTerm = 1;
48 private String votedFor = "";
// NOTE(review): the bodies of the two accessors are not visible in this view; presumably each
// returns the matching field - confirm against the full file.
51 public long getCurrentTerm() {
56 public String getVotedFor() {
// Overwrites both in-memory fields with the supplied values.
61 public void update(final long newTerm, final String newVotedFor) {
62 currentTerm = newTerm;
63 votedFor = newVotedFor;
65 // TODO : Write to some persistent state
// Delegates to update(); no separate persistence step appears in the visible lines.
68 @Override public void updateAndPersist(final long newTerm, final String newVotedFor) {
69 update(newTerm, newVotedFor);
74 private static DataPersistenceProvider createProvider() {
75 return new NonPersistentDataProvider(Runnable::run);
78 public MockRaftActorContext() {
79 super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
80 new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG,
81 MoreExecutors.directExecutor());
82 setReplicatedLog(new MockReplicatedLogBuilder().build());
// Creates a context for the given id backed by the supplied actor: the actor is passed as the first
// superclass argument, and the apply-state callback sends each applyState message to that actor
// with the actor itself as the sender.
// NOTE(review): the statements that follow the super(...) call are not visible in this view, so how
// the 'system' parameter is stored cannot be confirmed from here.
85 public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
86 super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
87 new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG,
88 MoreExecutors.directExecutor());
96 public void initReplicatedLog() {
97 SimpleReplicatedLog replicatedLog = new SimpleReplicatedLog();
98 long term = getTermInformation().getCurrentTerm();
99 replicatedLog.append(new SimpleReplicatedLogEntry(0, term, new MockPayload("1")));
100 replicatedLog.append(new SimpleReplicatedLogEntry(1, term, new MockPayload("2")));
101 setReplicatedLog(replicatedLog);
102 setCommitIndex(replicatedLog.lastIndex());
103 setLastApplied(replicatedLog.lastIndex());
106 @Override public ActorRef actorOf(final Props props) {
107 return system.actorOf(props);
110 @Override public ActorSelection actorSelection(final String path) {
111 return system.actorSelection(path);
// Accessor for the actor system behind this context.
// NOTE(review): the body is not visible in this view; presumably it returns the 'system' field - confirm.
114 @Override public ActorSystem getActorSystem() {
// Resolves a peer id to an ActorSelection: looks up the peer's address and, when one is found,
// resolves that address through actorSelection().
// NOTE(review): the path taken when getPeerAddress() returns null is not visible in this view.
118 @Override public ActorSelection getPeerActorSelection(final String peerId) {
119 String peerAddress = getPeerAddress(peerId);
120 if (peerAddress != null) {
121 return actorSelection(peerAddress);
// Registers peers from the given map: each entry's key and value are passed to addToPeers()
// together with VotingState.VOTING.
// NOTE(review): the body of the first loop over getPeerIds() is not visible in this view;
// presumably it removes each existing peer before the new ones are added - confirm.
126 public void setPeerAddresses(final Map<String, String> peerAddresses) {
127 for (String id: getPeerIds()) {
131 for (Map.Entry<String, String> e: peerAddresses.entrySet()) {
132 addToPeers(e.getKey(), e.getValue(), VotingState.VOTING);
// Returns the superclass SnapshotManager after installing on it, on every call, the current
// createSnapshotProcedure as the create-snapshot consumer and a new anonymous RaftActorSnapshotCohort.
137 public SnapshotManager getSnapshotManager() {
138 SnapshotManager snapshotManager = super.getSnapshotManager();
139 snapshotManager.setCreateSnapshotConsumer(createSnapshotProcedure);
141 snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
// Reads the whole byte source and wraps the bytes in a ByteState.
143 public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
144 return ByteState.of(snapshotBytes.read());
// NOTE(review): the bodies of createSnapshot and applySnapshot are not visible in this view.
148 public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
152 public void applySnapshot(final State snapshotState) {
156 return snapshotManager;
159 public void setCreateSnapshotProcedure(final Consumer<Optional<OutputStream>> createSnapshotProcedure) {
160 this.createSnapshotProcedure = createSnapshotProcedure;
164 public RaftPolicy getRaftPolicy() {
165 return raftPolicy != null ? raftPolicy : super.getRaftPolicy();
168 public void setRaftPolicy(final RaftPolicy raftPolicy) {
169 this.raftPolicy = raftPolicy;
// Log implementation extending AbstractReplicatedLogImpl; instantiated by initReplicatedLog() and
// by MockReplicatedLogBuilder.
172 public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
// NOTE(review): the bodies of dataSize, captureSnapshotIfReady and shouldCaptureSnapshot are not
// visible in this view.
174 public int dataSize() {
179 public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
183 public boolean shouldCaptureSnapshot(final long logIndex) {
// Calls removeFrom(index) and reports true when that call returns a non-negative value.
188 public boolean removeFromAndPersist(final long index) {
189 return removeFrom(index) >= 0;
// Appends the entry and, when a callback was supplied, invokes it immediately with the same entry.
// The doAsync flag is not read in the visible lines.
// NOTE(review): no try/catch is visible in this method here, so the IllegalCatch suppression looks
// stale - confirm against the full file. The return statement is also not visible in this view.
193 @SuppressWarnings("checkstyle:IllegalCatch")
194 public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
195 final Consumer<ReplicatedLogEntry> callback, final boolean doAsync) {
196 append(replicatedLogEntry);
198 if (callback != null) {
199 callback.accept(replicatedLogEntry);
// Payload carrying a string and an explicit size value.
206 public static final class MockPayload extends Payload {
207 private static final long serialVersionUID = 3121380393130864247L;
209 private final String data;
210 private final int size;
// NOTE(review): the body of the no-argument constructor is not visible in this view.
212 public MockPayload() {
// Uses the length of the string as the size.
216 public MockPayload(final String data) {
217 this(data, data.length());
// Rejects null data via requireNonNull.
// NOTE(review): the assignment of 'size' is not visible in this view.
220 public MockPayload(final String data, final int size) {
221 this.data = requireNonNull(data);
// NOTE(review): the bodies of serializedSize and toString are not visible in this view.
231 public int serializedSize() {
236 public String toString() {
// The hash is derived from 'data' only; equals() below also compares 'size', so equal objects
// still share a hash code.
241 public int hashCode() {
242 return data.hashCode();
// Equal when obj is this instance, or another MockPayload with equal data and equal size.
246 public boolean equals(final Object obj) {
247 return this == obj || obj instanceof MockPayload other && Objects.equals(data, other.data)
248 && size == other.size;
// Returns a MockPayloadProxy built from data and size (the writeReplace name matches the Java
// serialization substitution hook).
252 protected Object writeReplace() {
253 return new MockPayloadProxy(data, size);
// Serializable stand-in created by MockPayload.writeReplace(); holds a string value and a size.
257 private static final class MockPayloadProxy implements Serializable {
258 private static final long serialVersionUID = 1L;
260 private final String value;
261 private final int size;
// NOTE(review): the constructor body is not visible in this view; presumably it assigns both
// fields - confirm.
263 MockPayloadProxy(String value, int size) {
// Rebuilds a MockPayload from the stored value and size (the readResolve name matches the Java
// serialization resolution hook).
268 Object readResolve() {
269 return new MockPayload(value, size);
// Builder that accumulates entries in a SimpleReplicatedLog.
273 public static class MockReplicatedLogBuilder {
274 private final ReplicatedLog mockLog = new SimpleReplicatedLog();
// Appends one entry for each index i with start <= i < end, using the given term and a MockPayload
// whose data is the decimal string of i.
// NOTE(review): the return statement is not visible in this view.
276 public MockReplicatedLogBuilder createEntries(final int start, final int end, final int term) {
277 for (int i = start; i < end; i++) {
278 mockLog.append(new SimpleReplicatedLogEntry(i, term,
279 new MockRaftActorContext.MockPayload(Integer.toString(i))));
// Appends a single entry with the given index, term and payload.
// NOTE(review): the return statement is not visible in this view.
284 public MockReplicatedLogBuilder addEntry(final int index, final int term, final MockPayload payload) {
285 mockLog.append(new SimpleReplicatedLogEntry(index, term, payload));
// NOTE(review): the body is not visible in this view; presumably it returns mockLog - confirm.
289 public ReplicatedLog build() {
// Public method that forwards the behavior to super.setCurrentBehavior().
// NOTE(review): the end of this method lies past the last visible line, so any further statements
// cannot be confirmed from here.
295 public void setCurrentBehavior(final RaftActorBehavior behavior) {
296 super.setCurrentBehavior(behavior);