Merge "BUG-2627: do not duplicate descriptions"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
index 2437bb4b7d381ff34a58138a14a7723ee7289a64..c5ae4c41b2f4822a04ca9da300171ae2e618d52e 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.example;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import akka.japi.Creator;
 import com.google.common.base.Optional;
 import com.google.protobuf.ByteString;
 import java.io.ByteArrayInputStream;
@@ -25,6 +24,7 @@ import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
 import org.opendaylight.controller.cluster.example.messages.PrintState;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.ConfigParams;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.RaftState;
@@ -37,29 +37,26 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa
  */
 public class ExampleActor extends RaftActor {
 
-    private final Map<String, String> state = new HashMap<>();
+    private final Map<String, String> state = new HashMap();
     private final DataPersistenceProvider dataPersistenceProvider;
 
     private long persistIdentifier = 1;
+    private final Optional<ActorRef> roleChangeNotifier;
 
 
-    public ExampleActor(final String id, final Map<String, String> peerAddresses,
-        final Optional<ConfigParams> configParams) {
+    public ExampleActor(String id, Map<String, String> peerAddresses,
+        Optional<ConfigParams> configParams) {
         super(id, peerAddresses, configParams);
         this.dataPersistenceProvider = new PersistentDataProvider();
+        roleChangeNotifier = createRoleChangeNotifier(id);
     }
 
     public static Props props(final String id, final Map<String, String> peerAddresses,
-        final Optional<ConfigParams> configParams){
-        return Props.create(new Creator<ExampleActor>(){
-
-            @Override public ExampleActor create() throws Exception {
-                return new ExampleActor(id, peerAddresses, configParams);
-            }
-        });
+            final Optional<ConfigParams> configParams) {
+        return Props.create(ExampleActor.class, id, peerAddresses, configParams);
     }
 
-    @Override public void onReceiveCommand(final Object message) throws Exception{
+    @Override public void onReceiveCommand(Object message) throws Exception{
         if(message instanceof KeyValue){
             if(isLeader()) {
                 String persistId = Long.toString(persistIdentifier++);
@@ -78,12 +75,13 @@ public class ExampleActor extends RaftActor {
 
         } else if (message instanceof PrintRole) {
             if(LOG.isDebugEnabled()) {
-                String followers = "";
-                if (getRaftState() == RaftState.Leader) {
-                    followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
-                    LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers);
+                if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
+                    final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
+                    LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
+                        getRaftActorContext().getPeerAddresses().keySet(), followers);
                 } else {
-                    LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
+                    LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
+                        getRaftActorContext().getPeerAddresses().keySet());
                 }
 
 
@@ -94,6 +92,23 @@ public class ExampleActor extends RaftActor {
         }
     }
 
+    protected String getReplicatedLogState() {
+        return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
+            + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
+            + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
+    }
+
+    public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
+        ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
+            RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
+        return Optional.<ActorRef>of(exampleRoleChangeNotifier);
+    }
+
+    @Override
+    protected Optional<ActorRef> getRoleChangeNotifier() {
+        return roleChangeNotifier;
+    }
+
     @Override protected void applyState(final ActorRef clientActor, final String identifier,
         final Object data) {
         if(data instanceof KeyValue){
@@ -110,24 +125,24 @@ public class ExampleActor extends RaftActor {
         try {
             bs = fromObject(state);
         } catch (Exception e) {
-            LOG.error(e, "Exception in creating snapshot");
+            LOG.error("Exception in creating snapshot", e);
         }
-        getSelf().tell(new CaptureSnapshotReply(bs), null);
+        getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
     }
 
-    @Override protected void applySnapshot(final ByteString snapshot) {
+    @Override protected void applySnapshot(byte [] snapshot) {
         state.clear();
         try {
             state.putAll((HashMap) toObject(snapshot));
         } catch (Exception e) {
-           LOG.error(e, "Exception in applying snapshot");
+           LOG.error("Exception in applying snapshot", e);
         }
         if(LOG.isDebugEnabled()) {
             LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
         }
     }
 
-    private ByteString fromObject(final Object snapshot) throws Exception {
+    private ByteString fromObject(Object snapshot) throws Exception {
         ByteArrayOutputStream b = null;
         ObjectOutputStream o = null;
         try {
@@ -147,12 +162,12 @@ public class ExampleActor extends RaftActor {
         }
     }
 
-    private Object toObject(final ByteString bs) throws ClassNotFoundException, IOException {
+    private Object toObject(byte [] bs) throws ClassNotFoundException, IOException {
         Object obj = null;
         ByteArrayInputStream bis = null;
         ObjectInputStream ois = null;
         try {
-            bis = new ByteArrayInputStream(bs.toByteArray());
+            bis = new ByteArrayInputStream(bs);
             ois = new ObjectInputStream(bis);
             obj = ois.readObject();
         } finally {
@@ -175,7 +190,7 @@ public class ExampleActor extends RaftActor {
         return dataPersistenceProvider;
     }
 
-    @Override public void onReceiveRecover(final Object message)throws Exception {
+    @Override public void onReceiveRecover(Object message)throws Exception {
         super.onReceiveRecover(message);
     }
 
@@ -184,11 +199,11 @@ public class ExampleActor extends RaftActor {
     }
 
     @Override
-    protected void startLogRecoveryBatch(final int maxBatchSize) {
+    protected void startLogRecoveryBatch(int maxBatchSize) {
     }
 
     @Override
-    protected void appendRecoveredLogEntry(final Payload data) {
+    protected void appendRecoveredLogEntry(Payload data) {
     }
 
     @Override
@@ -200,6 +215,6 @@ public class ExampleActor extends RaftActor {
     }
 
     @Override
-    protected void applyRecoverySnapshot(final ByteString snapshot) {
+    protected void applyRecoverySnapshot(byte[] snapshot) {
     }
 }