2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
3 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.raft;
11 import static org.junit.Assert.assertEquals;
12 import static org.mockito.Mockito.mock;
14 import akka.actor.ActorRef;
15 import akka.actor.Props;
16 import com.google.common.base.Function;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Throwables;
19 import com.google.common.util.concurrent.Uninterruptibles;
20 import java.io.ByteArrayInputStream;
21 import java.io.IOException;
22 import java.io.ObjectInputStream;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.List;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import javax.annotation.Nonnull;
30 import org.opendaylight.controller.cluster.DataPersistenceProvider;
31 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
32 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
33 import org.opendaylight.yangtools.concepts.Identifier;
35 public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
// Payload version passed to the RaftActor super constructor.
36 public static final short PAYLOAD_VERSION = 5;
// Mockito mock that receives the RaftActor callbacks this class forwards
// (applyState, onRecoveryComplete, onStateChanged).
38 final RaftActor actorDelegate;
// Mockito mock that receives applyRecoverySnapshot calls.
39 final RaftActorRecoveryCohort recoveryCohortDelegate;
// Snapshot cohort taken from the builder, or a Mockito mock when the builder supplied none;
// createSnapshot and applySnapshot are forwarded to it.
40 volatile RaftActorSnapshotCohort snapshotCohortDelegate;
// Counted down in onRecoveryComplete(); awaited by waitForRecoveryComplete().
41 private final CountDownLatch recoveryComplete = new CountDownLatch(1);
// Receives the elements of deserialized snapshot lists in applySnapshotBytes().
// NOTE(review): other writers may exist on lines not visible in this extract.
42 private final List<Object> state;
// Notifier taken from the builder (may be null); exposed through getRoleChangeNotifier().
43 private final ActorRef roleChangeNotifier;
// Counted down once super.initializeBehavior() has returned.
44 protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
// Test-injected recovery support; when non-null it is returned by newRaftActorRecoverySupport().
45 private RaftActorRecoverySupport raftActorRecoverySupport;
// Snapshot message support from the builder, or created lazily in newRaftActorSnapshotMessageSupport().
46 private RaftActorSnapshotMessageSupport snapshotMessageSupport;
// Snapshot bytes taken from the builder; returned by getRestoreFromSnapshot().
47 private final byte[] restoreFromSnapshot;
// Counted down when handleCommand sees the COMMIT_SNAPSHOT message.
48 final CountDownLatch snapshotCommitted = new CountDownLatch(1);
// Optional hook from the builder; when non-null it replaces super.pauseLeader().
49 private final Function<Runnable, Void> pauseLeaderFunction;
51 protected MockRaftActor(AbstractBuilder<?, ?> builder) {
52 super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
53 Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
54 state = new ArrayList<>();
55 this.actorDelegate = mock(RaftActor.class);
56 this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
58 this.snapshotCohortDelegate = builder.snapshotCohort != null ? builder.snapshotCohort :
59 mock(RaftActorSnapshotCohort.class);
61 if (builder.dataPersistenceProvider == null) {
62 setPersistence(builder.persistent.isPresent() ? builder.persistent.get() : true);
64 setPersistence(builder.dataPersistenceProvider);
67 roleChangeNotifier = builder.roleChangeNotifier;
68 snapshotMessageSupport = builder.snapshotMessageSupport;
69 restoreFromSnapshot = builder.restoreFromSnapshot;
70 pauseLeaderFunction = builder.pauseLeaderFunction;
73 public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
74 raftActorRecoverySupport = support;
78 public RaftActorRecoverySupport newRaftActorRecoverySupport() {
79 return raftActorRecoverySupport != null ? raftActorRecoverySupport : super.newRaftActorRecoverySupport();
83 protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
84 return snapshotMessageSupport != null ? snapshotMessageSupport :
85 (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
88 public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
89 return snapshotMessageSupport;
92 public void waitForRecoveryComplete() {
94 assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
95 } catch (InterruptedException e) {
96 Throwables.propagate(e);
100 public void waitForInitializeBehaviorComplete() {
102 assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
103 } catch (InterruptedException e) {
104 Throwables.propagate(e);
// Runs a loop of at most 10 iterations, sleeping 100 ms (uninterruptibly) in each one.
// NOTE(review): the statements between the loop header and the sleep are not visible in this
// extract — presumably a leadership check that leaves the loop early; confirm against the full file.
109 public void waitUntilLeader() {
110 for (int i = 0; i < 10; i++) {
114 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
118 public List<Object> getState() {
// Forwards the applied state to the mock actorDelegate (so tests can verify the call) and logs it.
// NOTE(review): the remainder of this method is not visible in this extract; confirm whether
// further statements follow the log call.
123 protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
124 actorDelegate.applyState(clientActor, identifier, data);
125 LOG.info("{}: applyState called: {}", persistenceId(), data);
132 protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
137 protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
142 public void startLogRecoveryBatch(int maxBatchSize) {
146 public void appendRecoveredLogEntry(Payload data) {
151 public void applyCurrentLogRecoveryBatch() {
155 protected void onRecoveryComplete() {
156 actorDelegate.onRecoveryComplete();
157 recoveryComplete.countDown();
161 protected void initializeBehavior() {
162 super.initializeBehavior();
163 initializeBehaviorComplete.countDown();
167 public void applyRecoverySnapshot(byte[] bytes) {
168 recoveryCohortDelegate.applyRecoverySnapshot(bytes);
169 applySnapshotBytes(bytes);
// Deserializes the snapshot bytes with toObject and, when the result is a List, adds its
// elements to `state`. Checked deserialization failures (ClassNotFoundException, IOException)
// are rethrown through Throwables.propagate.
// NOTE(review): several lines of this method are not visible in this extract — what the
// empty-array branch does, and whether a statement precedes addAll, must be confirmed
// against the full file.
172 private void applySnapshotBytes(byte[] bytes) {
173 if (bytes.length == 0) {
178 Object data = toObject(bytes);
179 if (data instanceof List) {
181 state.addAll((List<?>) data);
183 } catch (ClassNotFoundException | IOException e) {
184 Throwables.propagate(e);
189 public void createSnapshot(ActorRef actorRef) {
190 LOG.info("{}: createSnapshot called", persistenceId());
191 snapshotCohortDelegate.createSnapshot(actorRef);
195 public void applySnapshot(byte [] snapshot) {
196 LOG.info("{}: applySnapshot called", persistenceId());
197 applySnapshotBytes(snapshot);
198 snapshotCohortDelegate.applySnapshot(snapshot);
202 protected void onStateChanged() {
203 actorDelegate.onStateChanged();
207 protected Optional<ActorRef> getRoleChangeNotifier() {
208 return Optional.fromNullable(roleChangeNotifier);
211 @Override public String persistenceId() {
215 protected void newBehavior(RaftActorBehavior newBehavior) {
216 self().tell(newBehavior, ActorRef.noSender());
220 protected void handleCommand(final Object message) {
221 if (message instanceof RaftActorBehavior) {
222 super.changeCurrentBehavior((RaftActorBehavior)message);
224 super.handleCommand(message);
226 if (RaftActorSnapshotMessageSupport.COMMIT_SNAPSHOT.equals(message)) {
227 snapshotCommitted.countDown();
233 protected void pauseLeader(Runnable operation) {
234 if (pauseLeaderFunction != null) {
235 pauseLeaderFunction.apply(operation);
237 super.pauseLeader(operation);
241 public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
243 ByteArrayInputStream bis = null;
244 ObjectInputStream ois = null;
246 bis = new ByteArrayInputStream(bs);
247 ois = new ObjectInputStream(bis);
248 obj = ois.readObject();
260 public ReplicatedLog getReplicatedLog() {
261 return this.getRaftActorContext().getReplicatedLog();
265 public byte[] getRestoreFromSnapshot() {
266 return restoreFromSnapshot;
269 public static Props props(final String id, final Map<String, String> peerAddresses, ConfigParams config) {
270 return builder().id(id).peerAddresses(peerAddresses).config(config).props();
273 public static Props props(final String id, final Map<String, String> peerAddresses,
274 ConfigParams config, DataPersistenceProvider dataPersistenceProvider) {
275 return builder().id(id).peerAddresses(peerAddresses).config(config)
276 .dataPersistenceProvider(dataPersistenceProvider).props();
279 public static Builder builder() {
280 return new Builder();
/**
 * Fluent builder base for MockRaftActor and its subclasses. The collected values are read
 * by the MockRaftActor constructor; props() passes this builder as the constructor argument.
 *
 * NOTE(review): some lines of this class (for example the setter return statements) are
 * not visible in this extract.
 *
 * @param <T> the concrete builder type returned by the fluent setters
 * @param <A> the actor class instantiated through props()
 */
283 public static class AbstractBuilder<T extends AbstractBuilder<T, A>, A extends MockRaftActor> {
// Defaults to an empty map; the MockRaftActor constructor also substitutes an empty map for null.
284 private Map<String, String> peerAddresses = Collections.emptyMap();
// May stay null; the MockRaftActor constructor wraps it with Optional.fromNullable.
286 private ConfigParams config;
// When null, the MockRaftActor constructor configures persistence from `persistent` instead.
287 private DataPersistenceProvider dataPersistenceProvider;
288 private ActorRef roleChangeNotifier;
289 private RaftActorSnapshotMessageSupport snapshotMessageSupport;
290 private byte[] restoreFromSnapshot;
// Absent means the MockRaftActor constructor enables persistence (treated as true).
291 private Optional<Boolean> persistent = Optional.absent();
// Actor class handed to Props.create in props().
292 private final Class<A> actorClass;
// When null, MockRaftActor.pauseLeader falls back to the superclass implementation.
293 private Function<Runnable, Void> pauseLeaderFunction;
// When null, the MockRaftActor constructor substitutes a Mockito mock.
294 private RaftActorSnapshotCohort snapshotCohort;
// Records the actor class that props() will instantiate.
296 protected AbstractBuilder(Class<A> actorClass) {
297 this.actorClass = actorClass;
// NOTE(review): the member this annotation applies to is not visible in this extract.
300 @SuppressWarnings("unchecked")
// Sets the actor id (read as builder.id by the MockRaftActor constructor).
305 public T id(String newId) {
// Sets the peer addresses passed to the RaftActor super constructor.
310 public T peerAddresses(Map<String, String> newPeerAddresses) {
311 this.peerAddresses = newPeerAddresses;
// Sets the raft config passed to the RaftActor super constructor.
315 public T config(ConfigParams newConfig) {
316 this.config = newConfig;
// Sets an explicit persistence provider; takes precedence over persistent(...).
320 public T dataPersistenceProvider(DataPersistenceProvider newDataPersistenceProvider) {
321 this.dataPersistenceProvider = newDataPersistenceProvider;
// Sets the notifier later returned by MockRaftActor.getRoleChangeNotifier().
325 public T roleChangeNotifier(ActorRef newRoleChangeNotifier) {
326 this.roleChangeNotifier = newRoleChangeNotifier;
// Sets the snapshot message support the actor uses instead of creating its own.
330 public T snapshotMessageSupport(RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
331 this.snapshotMessageSupport = newSnapshotMessageSupport;
// Sets the snapshot bytes later returned by MockRaftActor.getRestoreFromSnapshot().
335 public T restoreFromSnapshot(byte[] newRestoreFromSnapshot) {
336 this.restoreFromSnapshot = newRestoreFromSnapshot;
// Sets the persistence flag used only when no DataPersistenceProvider is supplied.
340 public T persistent(Optional<Boolean> newPersistent) {
341 this.persistent = newPersistent;
// Sets the hook that replaces the superclass's pauseLeader.
345 public T pauseLeaderFunction(Function<Runnable, Void> newPauseLeaderFunction) {
346 this.pauseLeaderFunction = newPauseLeaderFunction;
// Sets the snapshot cohort that createSnapshot/applySnapshot are forwarded to.
350 public T snapshotCohort(RaftActorSnapshotCohort newSnapshotCohort) {
351 this.snapshotCohort = newSnapshotCohort;
// Creates Props for actorClass, passing this builder as the constructor argument.
355 public Props props() {
356 return Props.create(actorClass, this);
/**
 * Concrete builder for plain MockRaftActor instances; it passes MockRaftActor.class to the
 * AbstractBuilder constructor as the actor class.
 */
360 public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
362 super(MockRaftActor.class);