import akka.actor.ActorRef;
import akka.actor.Props;
-import akka.japi.Creator;
import com.google.common.base.Optional;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
-import org.opendaylight.controller.cluster.DataPersistenceProvider;
+import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
import org.opendaylight.controller.cluster.example.messages.PrintRole;
import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.ConfigParams;
import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
/**
* A sample actor showing how the RaftActor is to be extended
*/
-public class ExampleActor extends RaftActor {
+public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
- private final Map<String, String> state = new HashMap<>();
- private final DataPersistenceProvider dataPersistenceProvider;
+ private final Map<String, String> state = new HashMap();
private long persistIdentifier = 1;
+ private final Optional<ActorRef> roleChangeNotifier;
- public ExampleActor(final String id, final Map<String, String> peerAddresses,
- final Optional<ConfigParams> configParams) {
+ public ExampleActor(String id, Map<String, String> peerAddresses,
+ Optional<ConfigParams> configParams) {
super(id, peerAddresses, configParams);
- this.dataPersistenceProvider = new PersistentDataProvider();
+ setPersistence(true);
+ roleChangeNotifier = createRoleChangeNotifier(id);
}
public static Props props(final String id, final Map<String, String> peerAddresses,
- final Optional<ConfigParams> configParams){
- return Props.create(new Creator<ExampleActor>(){
- private static final long serialVersionUID = 1L;
-
- @Override public ExampleActor create() throws Exception {
- return new ExampleActor(id, peerAddresses, configParams);
- }
- });
+ final Optional<ConfigParams> configParams) {
+ return Props.create(ExampleActor.class, id, peerAddresses, configParams);
}
- @Override public void onReceiveCommand(final Object message) throws Exception{
+ @Override public void onReceiveCommand(Object message) throws Exception{
if(message instanceof KeyValue){
if(isLeader()) {
String persistId = Long.toString(persistIdentifier++);
} else if (message instanceof PrintRole) {
if(LOG.isDebugEnabled()) {
- String followers = "";
if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
- followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
- LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers);
+ final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
+ LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
+ getRaftActorContext().getPeerAddresses().keySet(), followers);
} else {
- LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
+ LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
+ getRaftActorContext().getPeerAddresses().keySet());
}
}
}
+ protected String getReplicatedLogState() {
+ return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
+ + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
+ + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
+ }
+
+ public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
+ ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
+ RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
+ return Optional.<ActorRef>of(exampleRoleChangeNotifier);
+ }
+
+ @Override
+ protected Optional<ActorRef> getRoleChangeNotifier() {
+ return roleChangeNotifier;
+ }
+
@Override protected void applyState(final ActorRef clientActor, final String identifier,
final Object data) {
if(data instanceof KeyValue){
}
}
- @Override protected void createSnapshot() {
+ @Override
+ public void createSnapshot(ActorRef actorRef) {
ByteString bs = null;
try {
bs = fromObject(state);
} catch (Exception e) {
- LOG.error(e, "Exception in creating snapshot");
+ LOG.error("Exception in creating snapshot", e);
}
- getSelf().tell(new CaptureSnapshotReply(bs), null);
+ getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
}
- @Override protected void applySnapshot(final ByteString snapshot) {
+ @Override
+ public void applySnapshot(byte [] snapshot) {
state.clear();
try {
- state.putAll((Map<String, String>) toObject(snapshot));
+ state.putAll((HashMap) toObject(snapshot));
} catch (Exception e) {
- LOG.error(e, "Exception in applying snapshot");
+ LOG.error("Exception in applying snapshot", e);
}
if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot applied to state : {}", ((Map<?, ?>) state).size());
+ LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
}
}
- private ByteString fromObject(final Object snapshot) throws Exception {
+ private ByteString fromObject(Object snapshot) throws Exception {
ByteArrayOutputStream b = null;
ObjectOutputStream o = null;
try {
}
}
- private Object toObject(final ByteString bs) throws ClassNotFoundException, IOException {
+ private Object toObject(byte [] bs) throws ClassNotFoundException, IOException {
Object obj = null;
ByteArrayInputStream bis = null;
ObjectInputStream ois = null;
try {
- bis = new ByteArrayInputStream(bs.toByteArray());
+ bis = new ByteArrayInputStream(bs);
ois = new ObjectInputStream(bis);
obj = ois.readObject();
} finally {
}
- @Override
- protected DataPersistenceProvider persistence() {
- return dataPersistenceProvider;
- }
-
- @Override public void onReceiveRecover(final Object message)throws Exception {
+ @Override public void onReceiveRecover(Object message)throws Exception {
super.onReceiveRecover(message);
}
}
@Override
- protected void startLogRecoveryBatch(final int maxBatchSize) {
+ @Nonnull
+ protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+ return this;
+ }
+
+ @Override
+ public void startLogRecoveryBatch(int maxBatchSize) {
+ }
+
+ @Override
+ public void appendRecoveredLogEntry(Payload data) {
}
@Override
- protected void appendRecoveredLogEntry(final Payload data) {
+ public void applyCurrentLogRecoveryBatch() {
}
@Override
- protected void applyCurrentLogRecoveryBatch() {
+ public void onRecoveryComplete() {
}
@Override
- protected void onRecoveryComplete() {
+ public void applyRecoverySnapshot(byte[] snapshot) {
}
@Override
- protected void applyRecoverySnapshot(final ByteString snapshot) {
+ protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+ return this;
}
}