Do not break actor containment
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.ActorSystem;
14 import akka.actor.Props;
15 import akka.japi.Procedure;
16 import com.google.common.io.ByteSource;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.io.IOException;
19 import java.io.OutputStream;
20 import java.io.Serializable;
21 import java.util.HashMap;
22 import java.util.Map;
23 import java.util.Optional;
24 import java.util.function.Consumer;
25 import org.opendaylight.controller.cluster.DataPersistenceProvider;
26 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
27 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
28 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
29 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
30 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
31 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
32 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 public class MockRaftActorContext extends RaftActorContextImpl {
37     private static final Logger LOG = LoggerFactory.getLogger(MockRaftActorContext.class);
38
39     private ActorSystem system;
40     private RaftPolicy raftPolicy;
41     private Consumer<Optional<OutputStream>> createSnapshotProcedure = out -> { };
42
43     private static ElectionTerm newElectionTerm() {
44         return new ElectionTerm() {
45             private long currentTerm = 1;
46             private String votedFor = "";
47
48             @Override
49             public long getCurrentTerm() {
50                 return currentTerm;
51             }
52
53             @Override
54             public String getVotedFor() {
55                 return votedFor;
56             }
57
58             @Override
59             public void update(final long newTerm, final String newVotedFor) {
60                 this.currentTerm = newTerm;
61                 this.votedFor = newVotedFor;
62
63                 // TODO : Write to some persistent state
64             }
65
66             @Override public void updateAndPersist(final long newTerm, final String newVotedFor) {
67                 update(newTerm, newVotedFor);
68             }
69         };
70     }
71
72     private static DataPersistenceProvider createProvider() {
73         return new NonPersistentDataProvider(Runnable::run);
74     }
75
76     public MockRaftActorContext() {
77         super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
78                 new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG,
79                 MoreExecutors.directExecutor());
80         setReplicatedLog(new MockReplicatedLogBuilder().build());
81     }
82
83     public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
84         super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
85             new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG,
86             MoreExecutors.directExecutor());
87
88         this.system = system;
89
90         initReplicatedLog();
91     }
92
93
94     public void initReplicatedLog() {
95         SimpleReplicatedLog replicatedLog = new SimpleReplicatedLog();
96         long term = getTermInformation().getCurrentTerm();
97         replicatedLog.append(new SimpleReplicatedLogEntry(0, term, new MockPayload("1")));
98         replicatedLog.append(new SimpleReplicatedLogEntry(1, term, new MockPayload("2")));
99         setReplicatedLog(replicatedLog);
100         setCommitIndex(replicatedLog.lastIndex());
101         setLastApplied(replicatedLog.lastIndex());
102     }
103
104     @Override public ActorRef actorOf(final Props props) {
105         return system.actorOf(props);
106     }
107
108     @Override public ActorSelection actorSelection(final String path) {
109         return system.actorSelection(path);
110     }
111
112     @Override public ActorSystem getActorSystem() {
113         return this.system;
114     }
115
116     @Override public ActorSelection getPeerActorSelection(final String peerId) {
117         String peerAddress = getPeerAddress(peerId);
118         if (peerAddress != null) {
119             return actorSelection(peerAddress);
120         }
121         return null;
122     }
123
124     public void setPeerAddresses(final Map<String, String> peerAddresses) {
125         for (String id: getPeerIds()) {
126             removePeer(id);
127         }
128
129         for (Map.Entry<String, String> e: peerAddresses.entrySet()) {
130             addToPeers(e.getKey(), e.getValue(), VotingState.VOTING);
131         }
132     }
133
134     @Override
135     public SnapshotManager getSnapshotManager() {
136         SnapshotManager snapshotManager = super.getSnapshotManager();
137         snapshotManager.setCreateSnapshotConsumer(createSnapshotProcedure);
138
139         snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
140             @Override
141             public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
142                 return ByteState.of(snapshotBytes.read());
143             }
144
145             @Override
146             public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
147             }
148
149             @Override
150             public void applySnapshot(final State snapshotState) {
151             }
152         });
153
154         return snapshotManager;
155     }
156
157     public void setCreateSnapshotProcedure(final Consumer<Optional<OutputStream>> createSnapshotProcedure) {
158         this.createSnapshotProcedure = createSnapshotProcedure;
159     }
160
161     @Override
162     public RaftPolicy getRaftPolicy() {
163         return raftPolicy != null ? raftPolicy : super.getRaftPolicy();
164     }
165
166     public void setRaftPolicy(final RaftPolicy raftPolicy) {
167         this.raftPolicy = raftPolicy;
168     }
169
170     public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
171         @Override
172         public int dataSize() {
173             return -1;
174         }
175
176         @Override
177         public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
178         }
179
180         @Override
181         public boolean shouldCaptureSnapshot(final long logIndex) {
182             return false;
183         }
184
185         @Override
186         public boolean removeFromAndPersist(final long index) {
187             return removeFrom(index) >= 0;
188         }
189
190         @Override
191         @SuppressWarnings("checkstyle:IllegalCatch")
192         public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
193                 final Procedure<ReplicatedLogEntry> callback, final boolean doAsync) {
194             append(replicatedLogEntry);
195
196             if (callback != null) {
197                 try {
198                     callback.apply(replicatedLogEntry);
199                 } catch (RuntimeException e) {
200                     throw e;
201                 } catch (Exception e) {
202                     throw new RuntimeException(e);
203                 }
204             }
205
206             return true;
207         }
208     }
209
210     public static class MockPayload extends Payload implements Serializable {
211         private static final long serialVersionUID = 3121380393130864247L;
212         private String value = "";
213         private int size;
214
215         public MockPayload() {
216         }
217
218         public MockPayload(final String data) {
219             this.value = data;
220             size = value.length();
221         }
222
223         public MockPayload(final String data, final int size) {
224             this(data);
225             this.size = size;
226         }
227
228         @Override
229         public int size() {
230             return size;
231         }
232
233         @Override
234         public String toString() {
235             return value;
236         }
237
238         @Override
239         public int hashCode() {
240             final int prime = 31;
241             int result = 1;
242             result = prime * result + (value == null ? 0 : value.hashCode());
243             return result;
244         }
245
246         @Override
247         public boolean equals(final Object obj) {
248             if (this == obj) {
249                 return true;
250             }
251             if (obj == null) {
252                 return false;
253             }
254             if (getClass() != obj.getClass()) {
255                 return false;
256             }
257             MockPayload other = (MockPayload) obj;
258             if (value == null) {
259                 if (other.value != null) {
260                     return false;
261                 }
262             } else if (!value.equals(other.value)) {
263                 return false;
264             }
265             return true;
266         }
267     }
268
269     public static class MockReplicatedLogBuilder {
270         private final ReplicatedLog mockLog = new SimpleReplicatedLog();
271
272         public  MockReplicatedLogBuilder createEntries(final int start, final int end, final int term) {
273             for (int i = start; i < end; i++) {
274                 this.mockLog.append(new SimpleReplicatedLogEntry(i, term,
275                         new MockRaftActorContext.MockPayload(Integer.toString(i))));
276             }
277             return this;
278         }
279
280         public  MockReplicatedLogBuilder addEntry(final int index, final int term, final MockPayload payload) {
281             this.mockLog.append(new SimpleReplicatedLogEntry(index, term, payload));
282             return this;
283         }
284
285         public ReplicatedLog build() {
286             return this.mockLog;
287         }
288     }
289
290     @Override
291     public void setCurrentBehavior(final RaftActorBehavior behavior) {
292         super.setCurrentBehavior(behavior);
293     }
294 }