+
+ private final Map<String, String> state = new HashMap();
+ private final DataPersistenceProvider dataPersistenceProvider;
+
+ private long persistIdentifier = 1;
+
+
+ public ExampleActor(String id, Map<String, String> peerAddresses,
+ Optional<ConfigParams> configParams) {
+ super(id, peerAddresses, configParams);
+ this.dataPersistenceProvider = new PersistentDataProvider();
+ }
+
+ public static Props props(final String id, final Map<String, String> peerAddresses,
+ final Optional<ConfigParams> configParams){
+ return Props.create(new Creator<ExampleActor>(){
+
+ @Override public ExampleActor create() throws Exception {
+ return new ExampleActor(id, peerAddresses, configParams);
+ }
+ });
+ }
+
+ @Override public void onReceiveCommand(Object message) throws Exception{
+ if(message instanceof KeyValue){
+ if(isLeader()) {
+ String persistId = Long.toString(persistIdentifier++);
+ persistData(getSender(), persistId, (Payload) message);
+ } else {
+ if(getLeader() != null) {
+ getLeader().forward(message, getContext());
+ }
+ }
+
+ } else if (message instanceof PrintState) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("State of the node:{} has entries={}, {}",
+ getId(), state.size(), getReplicatedLogState());
+ }
+
+ } else if (message instanceof PrintRole) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
+ }
+
+ } else {
+ super.onReceiveCommand(message);
+ }
+ }
+
+ @Override protected void applyState(ActorRef clientActor, String identifier,
+ Object data) {
+ if(data instanceof KeyValue){
+ KeyValue kv = (KeyValue) data;
+ state.put(kv.getKey(), kv.getValue());
+ if(clientActor != null) {
+ clientActor.tell(new KeyValueSaved(), getSelf());
+ }
+ }
+ }
+
+ @Override protected void createSnapshot() {
+ ByteString bs = null;
+ try {
+ bs = fromObject(state);
+ } catch (Exception e) {
+ LOG.error(e, "Exception in creating snapshot");
+ }
+ getSelf().tell(new CaptureSnapshotReply(bs), null);
+ }
+
+ @Override protected void applySnapshot(ByteString snapshot) {
+ state.clear();
+ try {
+ state.putAll((HashMap) toObject(snapshot));
+ } catch (Exception e) {
+ LOG.error(e, "Exception in applying snapshot");
+ }
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
+ }