import akka.actor.Props;
import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.io.ObjectInputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
private RaftActorRecoverySupport raftActorRecoverySupport;
private RaftActorSnapshotMessageSupport snapshotMessageSupport;
- private final byte[] restoreFromSnapshot;
+ private final Snapshot restoreFromSnapshot;
final CountDownLatch snapshotCommitted = new CountDownLatch(1);
private final Function<Runnable, Void> pauseLeaderFunction;
- protected MockRaftActor(AbstractBuilder<?, ?> builder) {
+ protected MockRaftActor(final AbstractBuilder<?, ?> builder) {
super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
state = new ArrayList<>();
pauseLeaderFunction = builder.pauseLeaderFunction;
}
- public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
+ public void setRaftActorRecoverySupport(final RaftActorRecoverySupport support) {
raftActorRecoverySupport = support;
}
(snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
}
+ @Override
+ public RaftActorContext getRaftActorContext() {
+ return super.getRaftActorContext();
+ }
+
public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
return snapshotMessageSupport;
}
try {
assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
try {
assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
}
@Override
- protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
+ protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
actorDelegate.applyState(clientActor, identifier, data);
LOG.info("{}: applyState called: {}", persistenceId(), data);
}
@Override
- public void startLogRecoveryBatch(int maxBatchSize) {
+ public void startLogRecoveryBatch(final int maxBatchSize) {
}
@Override
- public void appendRecoveredLogEntry(Payload data) {
+ public void appendRecoveredLogEntry(final Payload data) {
state.add(data);
}
}
@Override
- public void applyRecoverySnapshot(byte[] bytes) {
- recoveryCohortDelegate.applyRecoverySnapshot(bytes);
- applySnapshotBytes(bytes);
+ public void applyRecoverySnapshot(final Snapshot.State newState) {
+ recoveryCohortDelegate.applyRecoverySnapshot(newState);
+ applySnapshotState(newState);
}
- private void applySnapshotBytes(byte[] bytes) {
- if (bytes.length == 0) {
- return;
- }
-
- try {
- Object data = toObject(bytes);
- if (data instanceof List) {
- state.clear();
- state.addAll((List<?>) data);
- }
- } catch (ClassNotFoundException | IOException e) {
- Throwables.propagate(e);
+ private void applySnapshotState(final Snapshot.State newState) {
+ if (newState instanceof MockSnapshotState) {
+ state.clear();
+ state.addAll(((MockSnapshotState)newState).getState());
}
}
@Override
- public void createSnapshot(ActorRef actorRef) {
+ public void createSnapshot(final ActorRef actorRef, final java.util.Optional<OutputStream> installSnapshotStream) {
LOG.info("{}: createSnapshot called", persistenceId());
- snapshotCohortDelegate.createSnapshot(actorRef);
+ snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream);
}
@Override
- public void applySnapshot(byte [] snapshot) {
+ public void applySnapshot(final Snapshot.State newState) {
LOG.info("{}: applySnapshot called", persistenceId());
- applySnapshotBytes(snapshot);
- snapshotCohortDelegate.applySnapshot(snapshot);
+ applySnapshotState(newState);
+ snapshotCohortDelegate.applySnapshot(newState);
+ }
+
+ @Override
+ public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) {
+ try {
+ return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read());
+ } catch (IOException e) {
+ throw new RuntimeException("Error deserializing state", e);
+ }
}
@Override
return this.getId();
}
- protected void newBehavior(RaftActorBehavior newBehavior) {
+ protected void newBehavior(final RaftActorBehavior newBehavior) {
self().tell(newBehavior, ActorRef.noSender());
}
}
@Override
- protected void pauseLeader(Runnable operation) {
+ protected void pauseLeader(final Runnable operation) {
if (pauseLeaderFunction != null) {
pauseLeaderFunction.apply(operation);
} else {
}
}
- public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
- Object obj = null;
- ByteArrayInputStream bis = null;
- ObjectInputStream ois = null;
- try {
- bis = new ByteArrayInputStream(bs);
- ois = new ObjectInputStream(bis);
- obj = ois.readObject();
- } finally {
- if (bis != null) {
- bis.close();
- }
- if (ois != null) {
- ois.close();
- }
+ public static List<Object> fromState(final Snapshot.State from) {
+ if (from instanceof MockSnapshotState) {
+ return ((MockSnapshotState)from).getState();
}
- return obj;
+
+ throw new IllegalStateException("Unexpected snapshot State: " + from);
}
public ReplicatedLog getReplicatedLog() {
}
@Override
- public byte[] getRestoreFromSnapshot() {
+ public Snapshot getRestoreFromSnapshot() {
return restoreFromSnapshot;
}
- public static Props props(final String id, final Map<String, String> peerAddresses, ConfigParams config) {
+ public static Props props(final String id, final Map<String, String> peerAddresses, final ConfigParams config) {
return builder().id(id).peerAddresses(peerAddresses).config(config).props();
}
public static Props props(final String id, final Map<String, String> peerAddresses,
- ConfigParams config, DataPersistenceProvider dataPersistenceProvider) {
+ final ConfigParams config, final DataPersistenceProvider dataPersistenceProvider) {
return builder().id(id).peerAddresses(peerAddresses).config(config)
.dataPersistenceProvider(dataPersistenceProvider).props();
}
private DataPersistenceProvider dataPersistenceProvider;
private ActorRef roleChangeNotifier;
private RaftActorSnapshotMessageSupport snapshotMessageSupport;
- private byte[] restoreFromSnapshot;
+ private Snapshot restoreFromSnapshot;
private Optional<Boolean> persistent = Optional.absent();
private final Class<A> actorClass;
private Function<Runnable, Void> pauseLeaderFunction;
private RaftActorSnapshotCohort snapshotCohort;
- protected AbstractBuilder(Class<A> actorClass) {
+ protected AbstractBuilder(final Class<A> actorClass) {
this.actorClass = actorClass;
}
return (T) this;
}
- public T id(String newId) {
+ public T id(final String newId) {
this.id = newId;
return self();
}
- public T peerAddresses(Map<String, String> newPeerAddresses) {
+ public T peerAddresses(final Map<String, String> newPeerAddresses) {
this.peerAddresses = newPeerAddresses;
return self();
}
- public T config(ConfigParams newConfig) {
+ public T config(final ConfigParams newConfig) {
this.config = newConfig;
return self();
}
- public T dataPersistenceProvider(DataPersistenceProvider newDataPersistenceProvider) {
+ public T dataPersistenceProvider(final DataPersistenceProvider newDataPersistenceProvider) {
this.dataPersistenceProvider = newDataPersistenceProvider;
return self();
}
- public T roleChangeNotifier(ActorRef newRoleChangeNotifier) {
+ public T roleChangeNotifier(final ActorRef newRoleChangeNotifier) {
this.roleChangeNotifier = newRoleChangeNotifier;
return self();
}
- public T snapshotMessageSupport(RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
+ public T snapshotMessageSupport(final RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
this.snapshotMessageSupport = newSnapshotMessageSupport;
return self();
}
- public T restoreFromSnapshot(byte[] newRestoreFromSnapshot) {
+ public T restoreFromSnapshot(final Snapshot newRestoreFromSnapshot) {
this.restoreFromSnapshot = newRestoreFromSnapshot;
return self();
}
- public T persistent(Optional<Boolean> newPersistent) {
+ public T persistent(final Optional<Boolean> newPersistent) {
this.persistent = newPersistent;
return self();
}
- public T pauseLeaderFunction(Function<Runnable, Void> newPauseLeaderFunction) {
+ public T pauseLeaderFunction(final Function<Runnable, Void> newPauseLeaderFunction) {
this.pauseLeaderFunction = newPauseLeaderFunction;
return self();
}
- public T snapshotCohort(RaftActorSnapshotCohort newSnapshotCohort) {
+ public T snapshotCohort(final RaftActorSnapshotCohort newSnapshotCohort) {
this.snapshotCohort = newSnapshotCohort;
return self();
}
}
public static class Builder extends AbstractBuilder<Builder, MockRaftActor> {
- private Builder() {
+ Builder() {
super(MockRaftActor.class);
}
}
+
+ public static class MockSnapshotState implements Snapshot.State {
+ private static final long serialVersionUID = 1L;
+
+ private final List<Object> state;
+
+ public MockSnapshotState(final List<Object> state) {
+ this.state = state;
+ }
+
+ public List<Object> getState() {
+ return state;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (state == null ? 0 : state.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ MockSnapshotState other = (MockSnapshotState) obj;
+ if (state == null) {
+ if (other.state != null) {
+ return false;
+ }
+ } else if (!state.equals(other.state)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "MockSnapshotState [state=" + state + "]";
+ }
+ }
}