import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
public class MockRaftActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
+ public static final short PAYLOAD_VERSION = 5;
+
final RaftActor actorDelegate;
final RaftActorRecoveryCohort recoveryCohortDelegate;
- final RaftActorSnapshotCohort snapshotCohortDelegate;
+ volatile RaftActorSnapshotCohort snapshotCohortDelegate;
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final List<Object> state;
private ActorRef roleChangeNotifier;
- private final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
+ protected final CountDownLatch initializeBehaviorComplete = new CountDownLatch(1);
private RaftActorRecoverySupport raftActorRecoverySupport;
private RaftActorSnapshotMessageSupport snapshotMessageSupport;
public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
DataPersistenceProvider dataPersistenceProvider) {
- super(id, peerAddresses, config);
+ super(id, peerAddresses, config, PAYLOAD_VERSION);
state = new ArrayList<>();
this.actorDelegate = mock(RaftActor.class);
this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
@Override
protected RaftActorSnapshotMessageSupport newRaftActorSnapshotMessageSupport() {
- return snapshotMessageSupport != null ? snapshotMessageSupport : super.newRaftActorSnapshotMessageSupport();
+ return snapshotMessageSupport != null ? snapshotMessageSupport :
+ (snapshotMessageSupport = super.newRaftActorSnapshotMessageSupport());
+ }
+
+ public RaftActorSnapshotMessageSupport getSnapshotMessageSupport() {
+ return snapshotMessageSupport;
}
public void waitForRecoveryComplete() {
return Props.create(new MockRaftActorCreator(peerAddresses, id, config, dataPersistenceProvider, roleChangeNotifier));
}
-
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
actorDelegate.applyState(clientActor, identifier, data);
- LOG.info("{}: applyState called", persistenceId());
+ LOG.info("{}: applyState called: {}", persistenceId(), data);
+
+ state.add(data);
}
@Override
@Override
public void applyRecoverySnapshot(byte[] bytes) {
recoveryCohortDelegate.applyRecoverySnapshot(bytes);
+ applySnapshotBytes(bytes);
+ }
+
+ private void applySnapshotBytes(byte[] bytes) {
try {
Object data = toObject(bytes);
if (data instanceof List) {
public void applySnapshot(byte [] snapshot) {
LOG.info("{}: applySnapshot called", persistenceId());
snapshotCohortDelegate.applySnapshot(snapshot);
+ applySnapshotBytes(snapshot);
}
@Override
return this.getId();
}
- private Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
+ protected void newBehavior(RaftActorBehavior newBehavior) {
+ self().tell(newBehavior, ActorRef.noSender());
+ }
+
+ @Override
+ public void handleCommand(final Object message) {
+ if(message instanceof RaftActorBehavior) {
+ super.changeCurrentBehavior((RaftActorBehavior)message);
+ } else {
+ super.handleCommand(message);
+ }
+ }
+
+ public static Object toObject(byte[] bs) throws ClassNotFoundException, IOException {
Object obj = null;
ByteArrayInputStream bis = null;
ObjectInputStream ois = null;
public ReplicatedLog getReplicatedLog(){
return this.getRaftActorContext().getReplicatedLog();
}
-}
\ No newline at end of file
+}