X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fexample%2FExampleActor.java;h=6c65021d86158b22483ada73704f68ba9f83d84c;hp=6dfa4afd6b6951a351790d0fb87679b40d19dc90;hb=a469dbcec569cc972df0cd57cf725a2173d2604a;hpb=443d331f01bcc9e3aa8442f60b84211b2f175967 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 6dfa4afd6b..6c65021d86 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -25,6 +25,7 @@ import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; import org.opendaylight.controller.cluster.example.messages.PrintState; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.RaftState; @@ -37,22 +38,23 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa */ public class ExampleActor extends RaftActor { - private final Map state = new HashMap<>(); + private final Map state = new HashMap(); private final DataPersistenceProvider dataPersistenceProvider; private long persistIdentifier = 1; + private Optional roleChangeNotifier; - public ExampleActor(final String id, final Map peerAddresses, - final Optional configParams) { + public ExampleActor(String id, Map peerAddresses, + Optional configParams) { super(id, peerAddresses, configParams); this.dataPersistenceProvider = new PersistentDataProvider(); + roleChangeNotifier = createRoleChangeNotifier(id); } public static Props props(final String id, final Map peerAddresses, final Optional configParams){ return Props.create(new Creator(){ - private static final long serialVersionUID = 1L; @Override public ExampleActor create() throws Exception { return new ExampleActor(id, peerAddresses, configParams); @@ -60,7 +62,7 @@ public class ExampleActor extends RaftActor { }); } - @Override public void onReceiveCommand(final Object message) throws Exception{ + @Override public void onReceiveCommand(Object message) throws Exception{ if(message instanceof KeyValue){ if(isLeader()) { String persistId = Long.toString(persistIdentifier++); @@ -82,9 +84,11 @@ public class ExampleActor extends RaftActor { String followers = ""; if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) { followers = ((Leader)this.getCurrentBehavior()).printFollowerStates(); - LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers); + LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), + getRaftActorContext().getPeerAddresses().keySet(), followers); } else { - LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers()); + LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), + getRaftActorContext().getPeerAddresses().keySet()); } @@ -95,6 +99,23 @@ public class ExampleActor extends RaftActor { } } + protected String getReplicatedLogState() { + return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex() + + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm() + + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size(); + } + + public Optional createRoleChangeNotifier(String actorId) { + ActorRef exampleRoleChangeNotifier = this.getContext().actorOf( + RoleChangeNotifier.getProps(actorId), actorId + "-notifier"); + return Optional.of(exampleRoleChangeNotifier); + } + + @Override + protected Optional getRoleChangeNotifier() { + return roleChangeNotifier; + } + @Override protected void applyState(final ActorRef clientActor, final String identifier, final Object data) { if(data instanceof KeyValue){ @@ -116,19 +137,19 @@ public class ExampleActor extends RaftActor { getSelf().tell(new CaptureSnapshotReply(bs), null); } - @Override protected void applySnapshot(final ByteString snapshot) { + @Override protected void applySnapshot(ByteString snapshot) { state.clear(); try { - state.putAll((Map) toObject(snapshot)); + state.putAll((HashMap) toObject(snapshot)); } catch (Exception e) { LOG.error(e, "Exception in applying snapshot"); } if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot applied to state : {}", ((Map) state).size()); + LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size()); } } - private ByteString fromObject(final Object snapshot) throws Exception { + private ByteString fromObject(Object snapshot) throws Exception { ByteArrayOutputStream b = null; ObjectOutputStream o = null; try { @@ -148,7 +169,7 @@ public class ExampleActor extends RaftActor { } } - private Object toObject(final ByteString bs) throws ClassNotFoundException, IOException { + private Object toObject(ByteString bs) throws ClassNotFoundException, IOException { Object obj = null; ByteArrayInputStream bis = null; ObjectInputStream ois = null; @@ -176,7 +197,7 @@ public class ExampleActor extends RaftActor { return dataPersistenceProvider; } - @Override public void onReceiveRecover(final Object message)throws Exception { + @Override public void onReceiveRecover(Object message)throws Exception { super.onReceiveRecover(message); } @@ -185,11 +206,11 @@ public class ExampleActor extends RaftActor { } @Override - protected void startLogRecoveryBatch(final int maxBatchSize) { + protected void startLogRecoveryBatch(int maxBatchSize) { } @Override - protected void appendRecoveredLogEntry(final Payload data) { + protected void appendRecoveredLogEntry(Payload data) { } @Override @@ -201,6 +222,6 @@ public class ExampleActor extends RaftActor { } @Override - protected void applyRecoverySnapshot(final ByteString snapshot) { + protected void applyRecoverySnapshot(ByteString snapshot) { } }