Make Raft messages serializable
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
index 3c8e12b8b11e0c5fd977ceedbda2fa65ffb1b71a..641ec0582c3790cac20a6b7102ad5e12134e72e1 100644 (file)
@@ -8,27 +8,88 @@
 
 package org.opendaylight.controller.cluster.example;
 
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.Creator;
+import org.opendaylight.controller.cluster.example.messages.KeyValue;
+import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
+import org.opendaylight.controller.cluster.example.messages.PrintRole;
+import org.opendaylight.controller.cluster.example.messages.PrintState;
 import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * A sample actor showing how the RaftActor is to be extended
  */
 public class ExampleActor extends RaftActor {
-    public ExampleActor(String id) {
-        super(id);
+
+    private final Map<String, String> state = new HashMap();
+
+    private long persistIdentifier = 1;
+
+
+    public ExampleActor(String id, Map<String, String> peerAddresses) {
+        super(id, peerAddresses);
+    }
+
+    public static Props props(final String id, final Map<String, String> peerAddresses){
+        return Props.create(new Creator<ExampleActor>(){
+
+            @Override public ExampleActor create() throws Exception {
+                return new ExampleActor(id, peerAddresses);
+            }
+        });
     }
 
     @Override public void onReceiveCommand(Object message){
-        /*
-            Here the extended class does whatever it needs to do.
-            If it cannot handle a message then it passes it on to the super
-            class for handling
-         */
-        super.onReceiveCommand(message);
+        if(message instanceof KeyValue){
+            if(isLeader()) {
+                String persistId = Long.toString(persistIdentifier++);
+                persistData(getSender(), persistId, (Payload) message);
+            } else {
+                if(getLeader() != null) {
+                    getLeader().forward(message, getContext());
+                }
+            }
+
+        } else if (message instanceof PrintState) {
+            LOG.debug("State of the node:"+getId() + " has = "+state.size() + " entries");
+
+        } else if (message instanceof PrintRole) {
+            LOG.debug(getId() + " = " + getRaftState());
+        } else {
+            super.onReceiveCommand(message);
+        }
+    }
+
+    @Override protected void applyState(ActorRef clientActor, String identifier,
+        Object data) {
+        if(data instanceof KeyValue){
+            KeyValue kv = (KeyValue) data;
+            state.put(kv.getKey(), kv.getValue());
+            if(clientActor != null) {
+                clientActor.tell(new KeyValueSaved(), getSelf());
+            }
+        }
+    }
+
+    @Override protected Object createSnapshot() {
+        return state;
+    }
+
+    @Override protected void applySnapshot(Object snapshot) {
+        state.clear();
+        state.putAll((HashMap) snapshot);
     }
 
     @Override public void onReceiveRecover(Object message) {
         super.onReceiveRecover(message);
     }
 
+    @Override public String persistenceId() {
+        return getId();
+    }
 }