Add Payload.serializedSize()
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / MockRaftActorContext.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.raft;
10
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.Props;
17 import com.google.common.io.ByteSource;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import java.io.IOException;
20 import java.io.OutputStream;
21 import java.io.Serializable;
22 import java.util.HashMap;
23 import java.util.Map;
24 import java.util.Objects;
25 import java.util.Optional;
26 import java.util.function.Consumer;
27 import org.opendaylight.controller.cluster.DataPersistenceProvider;
28 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
29 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
30 import org.opendaylight.controller.cluster.raft.messages.Payload;
31 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
32 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
33 import org.opendaylight.controller.cluster.raft.persisted.Snapshot.State;
34 import org.opendaylight.controller.cluster.raft.policy.RaftPolicy;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 public class MockRaftActorContext extends RaftActorContextImpl {
39     private static final Logger LOG = LoggerFactory.getLogger(MockRaftActorContext.class);
40
41     private ActorSystem system;
42     private RaftPolicy raftPolicy;
43     private Consumer<Optional<OutputStream>> createSnapshotProcedure = out -> { };
44
45     private static ElectionTerm newElectionTerm() {
46         return new ElectionTerm() {
47             private long currentTerm = 1;
48             private String votedFor = "";
49
50             @Override
51             public long getCurrentTerm() {
52                 return currentTerm;
53             }
54
55             @Override
56             public String getVotedFor() {
57                 return votedFor;
58             }
59
60             @Override
61             public void update(final long newTerm, final String newVotedFor) {
62                 currentTerm = newTerm;
63                 votedFor = newVotedFor;
64
65                 // TODO : Write to some persistent state
66             }
67
68             @Override public void updateAndPersist(final long newTerm, final String newVotedFor) {
69                 update(newTerm, newVotedFor);
70             }
71         };
72     }
73
74     private static DataPersistenceProvider createProvider() {
75         return new NonPersistentDataProvider(Runnable::run);
76     }
77
78     public MockRaftActorContext() {
79         super(null, null, "test", newElectionTerm(), -1, -1, new HashMap<>(),
80                 new DefaultConfigParamsImpl(), createProvider(), applyState -> { }, LOG,
81                 MoreExecutors.directExecutor());
82         setReplicatedLog(new MockReplicatedLogBuilder().build());
83     }
84
85     public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
86         super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
87             new DefaultConfigParamsImpl(), createProvider(), applyState -> actor.tell(applyState, actor), LOG,
88             MoreExecutors.directExecutor());
89
90         this.system = system;
91
92         initReplicatedLog();
93     }
94
95
96     public void initReplicatedLog() {
97         SimpleReplicatedLog replicatedLog = new SimpleReplicatedLog();
98         long term = getTermInformation().getCurrentTerm();
99         replicatedLog.append(new SimpleReplicatedLogEntry(0, term, new MockPayload("1")));
100         replicatedLog.append(new SimpleReplicatedLogEntry(1, term, new MockPayload("2")));
101         setReplicatedLog(replicatedLog);
102         setCommitIndex(replicatedLog.lastIndex());
103         setLastApplied(replicatedLog.lastIndex());
104     }
105
106     @Override public ActorRef actorOf(final Props props) {
107         return system.actorOf(props);
108     }
109
110     @Override public ActorSelection actorSelection(final String path) {
111         return system.actorSelection(path);
112     }
113
114     @Override public ActorSystem getActorSystem() {
115         return system;
116     }
117
118     @Override public ActorSelection getPeerActorSelection(final String peerId) {
119         String peerAddress = getPeerAddress(peerId);
120         if (peerAddress != null) {
121             return actorSelection(peerAddress);
122         }
123         return null;
124     }
125
126     public void setPeerAddresses(final Map<String, String> peerAddresses) {
127         for (String id: getPeerIds()) {
128             removePeer(id);
129         }
130
131         for (Map.Entry<String, String> e: peerAddresses.entrySet()) {
132             addToPeers(e.getKey(), e.getValue(), VotingState.VOTING);
133         }
134     }
135
136     @Override
137     public SnapshotManager getSnapshotManager() {
138         SnapshotManager snapshotManager = super.getSnapshotManager();
139         snapshotManager.setCreateSnapshotConsumer(createSnapshotProcedure);
140
141         snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
142             @Override
143             public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
144                 return ByteState.of(snapshotBytes.read());
145             }
146
147             @Override
148             public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
149             }
150
151             @Override
152             public void applySnapshot(final State snapshotState) {
153             }
154         });
155
156         return snapshotManager;
157     }
158
159     public void setCreateSnapshotProcedure(final Consumer<Optional<OutputStream>> createSnapshotProcedure) {
160         this.createSnapshotProcedure = createSnapshotProcedure;
161     }
162
163     @Override
164     public RaftPolicy getRaftPolicy() {
165         return raftPolicy != null ? raftPolicy : super.getRaftPolicy();
166     }
167
168     public void setRaftPolicy(final RaftPolicy raftPolicy) {
169         this.raftPolicy = raftPolicy;
170     }
171
172     public static class SimpleReplicatedLog extends AbstractReplicatedLogImpl {
173         @Override
174         public int dataSize() {
175             return -1;
176         }
177
178         @Override
179         public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
180         }
181
182         @Override
183         public boolean shouldCaptureSnapshot(final long logIndex) {
184             return false;
185         }
186
187         @Override
188         public boolean removeFromAndPersist(final long index) {
189             return removeFrom(index) >= 0;
190         }
191
192         @Override
193         @SuppressWarnings("checkstyle:IllegalCatch")
194         public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
195                 final Consumer<ReplicatedLogEntry> callback, final boolean doAsync) {
196             append(replicatedLogEntry);
197
198             if (callback != null) {
199                 callback.accept(replicatedLogEntry);
200             }
201
202             return true;
203         }
204     }
205
206     public static final class MockPayload extends Payload {
207         private static final long serialVersionUID = 3121380393130864247L;
208
209         private final String data;
210         private final int size;
211
212         public MockPayload() {
213             this("");
214         }
215
216         public MockPayload(final String data) {
217             this(data, data.length());
218         }
219
220         public MockPayload(final String data, final int size) {
221             this.data = requireNonNull(data);
222             this.size = size;
223         }
224
225         @Override
226         public int size() {
227             return size;
228         }
229
230         @Override
231         public int serializedSize() {
232             return size;
233         }
234
235         @Override
236         public String toString() {
237             return data;
238         }
239
240         @Override
241         public int hashCode() {
242             return data.hashCode();
243         }
244
245         @Override
246         public boolean equals(final Object obj) {
247             return this == obj || obj instanceof MockPayload other && Objects.equals(data, other.data)
248                 && size == other.size;
249         }
250
251         @Override
252         protected Object writeReplace() {
253             return new MockPayloadProxy(data, size);
254         }
255     }
256
257     private static final class MockPayloadProxy implements Serializable {
258         private static final long serialVersionUID = 1L;
259
260         private final String value;
261         private final int size;
262
263         MockPayloadProxy(String value, int size) {
264             this.value = value;
265             this.size = size;
266         }
267
268         Object readResolve() {
269             return new MockPayload(value, size);
270         }
271     }
272
273     public static class MockReplicatedLogBuilder {
274         private final ReplicatedLog mockLog = new SimpleReplicatedLog();
275
276         public  MockReplicatedLogBuilder createEntries(final int start, final int end, final int term) {
277             for (int i = start; i < end; i++) {
278                 mockLog.append(new SimpleReplicatedLogEntry(i, term,
279                         new MockRaftActorContext.MockPayload(Integer.toString(i))));
280             }
281             return this;
282         }
283
284         public  MockReplicatedLogBuilder addEntry(final int index, final int term, final MockPayload payload) {
285             mockLog.append(new SimpleReplicatedLogEntry(index, term, payload));
286             return this;
287         }
288
289         public ReplicatedLog build() {
290             return mockLog;
291         }
292     }
293
294     @Override
295     public void setCurrentBehavior(final RaftActorBehavior behavior) {
296         super.setCurrentBehavior(behavior);
297     }
298 }