- MockAkkaJournal.clearJournal();
- MockSnapshotStore.setMockSnapshot(null);
- }
-
- public static class MockRaftActor extends RaftActor {
-
- private final DataPersistenceProvider dataPersistenceProvider;
- private final RaftActor delegate;
- private final CountDownLatch recoveryComplete = new CountDownLatch(1);
- private final List<Object> state;
- private ActorRef roleChangeNotifier;
- private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
-
- public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
- private static final long serialVersionUID = 1L;
- private final Map<String, String> peerAddresses;
- private final String id;
- private final Optional<ConfigParams> config;
- private final DataPersistenceProvider dataPersistenceProvider;
- private final ActorRef roleChangeNotifier;
-
- private MockRaftActorCreator(Map<String, String> peerAddresses, String id,
- Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider,
- ActorRef roleChangeNotifier) {
- this.peerAddresses = peerAddresses;
- this.id = id;
- this.config = config;
- this.dataPersistenceProvider = dataPersistenceProvider;
- this.roleChangeNotifier = roleChangeNotifier;
- }
-
- @Override
- public MockRaftActor create() throws Exception {
- MockRaftActor mockRaftActor = new MockRaftActor(id, peerAddresses, config,
- dataPersistenceProvider);
- mockRaftActor.roleChangeNotifier = this.roleChangeNotifier;
- return mockRaftActor;
- }
- }
-
- public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
- DataPersistenceProvider dataPersistenceProvider) {
- super(id, peerAddresses, config);
- state = new ArrayList<>();
- this.delegate = mock(RaftActor.class);
- if(dataPersistenceProvider == null){
- this.dataPersistenceProvider = new PersistentDataProvider();
- } else {
- this.dataPersistenceProvider = dataPersistenceProvider;
- }
- }
-
- public void waitForRecoveryComplete() {
- try {
- assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- public void waitForInitializeBehaviorComplete() {
- try {
- assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- public List<Object> getState() {
- return state;
- }
-
- public static Props props(final String id, final Map<String, String> peerAddresses,
- Optional<ConfigParams> config){
- return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, null));
- }
-
- public static Props props(final String id, final Map<String, String> peerAddresses,
- Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider){
- return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, null));
- }
-
- public static Props props(final String id, final Map<String, String> peerAddresses,
- Optional<ConfigParams> config, ActorRef roleChangeNotifier){
- return Props.create(new MockRaftActorCreator(peerAddresses, id, config, null, roleChangeNotifier));
- }
-
- @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
- delegate.applyState(clientActor, identifier, data);
- LOG.info("applyState called");
- }
-
- @Override
- protected void startLogRecoveryBatch(int maxBatchSize) {
- }
-
- @Override
- protected void appendRecoveredLogEntry(Payload data) {
- state.add(data);
- }
-
- @Override
- protected void applyCurrentLogRecoveryBatch() {
- }
-
- @Override
- protected void onRecoveryComplete() {
- delegate.onRecoveryComplete();
- recoveryComplete.countDown();
- }
-
- @Override
- protected void initializeBehavior() {
- super.initializeBehavior();
- initializeBehaviorComplete.countDown();
- }
-
- @Override
- protected void applyRecoverySnapshot(byte[] bytes) {
- delegate.applyRecoverySnapshot(bytes);
- try {
- Object data = toObject(bytes);
- if (data instanceof List) {
- state.addAll((List<?>) data);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- @Override protected void createSnapshot() {
- delegate.createSnapshot();
- }
-
- @Override protected void applySnapshot(byte [] snapshot) {
- delegate.applySnapshot(snapshot);
- }
-
- @Override protected void onStateChanged() {
- delegate.onStateChanged();
- }
-
- @Override
- protected DataPersistenceProvider persistence() {
- return this.dataPersistenceProvider;
- }
-
- @Override
- protected Optional<ActorRef> getRoleChangeNotifier() {
- return Optional.fromNullable(roleChangeNotifier);
- }
-
- @Override public String persistenceId() {
- return this.getId();
- }
-
- private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
- Object obj = null;
- ByteArrayInputStream bis = null;
- ObjectInputStream ois = null;
- try {
- bis = new ByteArrayInputStream(bs);
- ois = new ObjectInputStream(bis);
- obj = ois.readObject();
- } finally {
- if (bis != null) {
- bis.close();
- }
- if (ois != null) {
- ois.close();
- }
- }
- return obj;
- }
-
- public ReplicatedLog getReplicatedLog(){
- return this.getRaftActorContext().getReplicatedLog();
- }
-