2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.ActorSystem;
14 import akka.actor.Props;
15 import akka.japi.Procedure;
16 import com.google.common.io.ByteSource;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.io.IOException;
19 import java.io.OutputStream;
20 import java.io.Serializable;
21 import java.util.HashMap;
23 import java.util.Optional;
24 import java.util.function.Consumer;
25 import org.opendaylight.controller.cluster.DataPersistenceProvider;
26 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
27 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
28 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
29 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
30 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
31 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
32 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
36 public class MockRaftActorContext extends RaftActorContextImpl {
37 private static final Logger LOG = LoggerFactory.getLogger(MockRaftActorContext.class);
39 private ActorSystem system;
40 private RaftPolicy raftPolicy;
41 private Consumer<Optional<OutputStream>> createSnapshotProcedure = out -> { };
43 private static ElectionTerm newElectionTerm() {
44 return new ElectionTerm() {
45 private long currentTerm = 1;
46 private String votedFor = "";
49 public long getCurrentTerm() {
54 public String getVotedFor() {
59 public void update(final long newTerm, final String newVotedFor) {
60 this.currentTerm = newTerm;
61 this.votedFor = newVotedFor;
63 // TODO : Write to some persistent state
66 @Override public void updateAndPersist(final long newTerm, final String newVotedFor) {
67 update(newTerm, newVotedFor);
72 private static DataPersistenceProvider createProvider() {
73 return new NonPersistentDataProvider(Runnable::run);
76 public MockRaftActorContext() {
77 super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
78 new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG,
79 MoreExecutors.directExecutor());
80 setReplicatedLog(new MockReplicatedLogBuilder().build());
83 public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
84 super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
85 new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG,
86 MoreExecutors.directExecutor());
94 public void initReplicatedLog() {
95 SimpleReplicatedLog replicatedLog = new SimpleReplicatedLog();
96 long term = getTermInformation().getCurrentTerm();
97 replicatedLog.append(new SimpleReplicatedLogEntry(0, term, new MockPayload("1")));
98 replicatedLog.append(new SimpleReplicatedLogEntry(1, term, new MockPayload("2")));
99 setReplicatedLog(replicatedLog);
100 setCommitIndex(replicatedLog.lastIndex());
101 setLastApplied(replicatedLog.lastIndex());
104 @Override public ActorRef actorOf(final Props props) {
105 return system.actorOf(props);
108 @Override public ActorSelection actorSelection(final String path) {
109 return system.actorSelection(path);
112 @Override public ActorSystem getActorSystem() {
116 @Override public ActorSelection getPeerActorSelection(final String peerId) {
117 String peerAddress = getPeerAddress(peerId);
118 if (peerAddress != null) {
119 return actorSelection(peerAddress);
124 public void setPeerAddresses(final Map<String, String> peerAddresses) {
125 for (String id: getPeerIds()) {
129 for (Map.Entry<String, String> e: peerAddresses.entrySet()) {
130 addToPeers(e.getKey(), e.getValue(), VotingState.VOTING);
135 public SnapshotManager getSnapshotManager() {
136 SnapshotManager snapshotManager = super.getSnapshotManager();
137 snapshotManager.setCreateSnapshotConsumer(createSnapshotProcedure);
139 snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
141 public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
142 return ByteState.of(snapshotBytes.read());
146 public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
150 public void applySnapshot(final State snapshotState) {
154 return snapshotManager;
157 public void setCreateSnapshotProcedure(final Consumer<Optional<OutputStream>> createSnapshotProcedure) {
158 this.createSnapshotProcedure = createSnapshotProcedure;
162 public RaftPolicy getRaftPolicy() {
163 return raftPolicy != null ? raftPolicy : super.getRaftPolicy();
166 public void setRaftPolicy(final RaftPolicy raftPolicy) {
167 this.raftPolicy = raftPolicy;
170 public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
172 public int dataSize() {
177 public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
181 public boolean shouldCaptureSnapshot(final long logIndex) {
186 public boolean removeFromAndPersist(final long index) {
187 return removeFrom(index) >= 0;
191 @SuppressWarnings("checkstyle:IllegalCatch")
192 public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
193 final Procedure<ReplicatedLogEntry> callback, final boolean doAsync) {
194 append(replicatedLogEntry);
196 if (callback != null) {
198 callback.apply(replicatedLogEntry);
199 } catch (RuntimeException e) {
201 } catch (Exception e) {
202 throw new RuntimeException(e);
210 public static class MockPayload extends Payload implements Serializable {
211 private static final long serialVersionUID = 3121380393130864247L;
212 private String value = "";
215 public MockPayload() {
218 public MockPayload(final String data) {
220 size = value.length();
223 public MockPayload(final String data, final int size) {
234 public String toString() {
239 public int hashCode() {
240 final int prime = 31;
242 result = prime * result + (value == null ? 0 : value.hashCode());
247 public boolean equals(final Object obj) {
254 if (getClass() != obj.getClass()) {
257 MockPayload other = (MockPayload) obj;
259 if (other.value != null) {
262 } else if (!value.equals(other.value)) {
269 public static class MockReplicatedLogBuilder {
270 private final ReplicatedLog mockLog = new SimpleReplicatedLog();
272 public MockReplicatedLogBuilder createEntries(final int start, final int end, final int term) {
273 for (int i = start; i < end; i++) {
274 this.mockLog.append(new SimpleReplicatedLogEntry(i, term,
275 new MockRaftActorContext.MockPayload(Integer.toString(i))));
280 public MockReplicatedLogBuilder addEntry(final int index, final int term, final MockPayload payload) {
281 this.mockLog.append(new SimpleReplicatedLogEntry(index, term, payload));
285 public ReplicatedLog build() {
291 public void setCurrentBehavior(final RaftActorBehavior behavior) {
292 super.setCurrentBehavior(behavior);