Make Raft messages serializable
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
index 35a2c98bd420ed56e18319a7388c477ab63bed13..641ec0582c3790cac20a6b7102ad5e12134e72e1 100644 (file)
@@ -13,7 +13,10 @@ import akka.actor.Props;
 import akka.japi.Creator;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
+import org.opendaylight.controller.cluster.example.messages.PrintRole;
+import org.opendaylight.controller.cluster.example.messages.PrintState;
 import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -43,15 +46,23 @@ public class ExampleActor extends RaftActor {
 
     @Override public void onReceiveCommand(Object message){
         if(message instanceof KeyValue){
-
             if(isLeader()) {
                 String persistId = Long.toString(persistIdentifier++);
-                persistData(getSender(), persistId, message);
+                persistData(getSender(), persistId, (Payload) message);
             } else {
-                getLeader().forward(message, getContext());
+                if(getLeader() != null) {
+                    getLeader().forward(message, getContext());
+                }
             }
+
+        } else if (message instanceof PrintState) {
+            LOG.debug("State of the node:"+getId() + " has = "+state.size() + " entries");
+
+        } else if (message instanceof PrintRole) {
+            LOG.debug(getId() + " = " + getRaftState());
+        } else {
+            super.onReceiveCommand(message);
         }
-        super.onReceiveCommand(message);
     }
 
     @Override protected void applyState(ActorRef clientActor, String identifier,
@@ -65,6 +76,15 @@ public class ExampleActor extends RaftActor {
         }
     }
 
+    @Override protected Object createSnapshot() {
+        return state;
+    }
+
+    @Override protected void applySnapshot(Object snapshot) {
+        state.clear();
+        state.putAll((HashMap) snapshot);
+    }
+
     @Override public void onReceiveRecover(Object message) {
         super.onReceiveRecover(message);
     }