X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fexample%2FExampleActor.java;h=c5ae4c41b2f4822a04ca9da300171ae2e618d52e;hp=06538fd2ae157618a167ef3096f2777981f7bae3;hb=4b207b5356775c4b4d231ae979f9f2134f617dd1;hpb=cd81eb73b7abf677571b2366425ccbc8d794f4b6 diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 06538fd2ae..c5ae4c41b2 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -10,27 +10,28 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.Props; -import akka.japi.Creator; import com.google.common.base.Optional; import com.google.protobuf.ByteString; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.HashMap; +import java.util.Map; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; import org.opendaylight.controller.cluster.example.messages.PrintState; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.RaftActor; +import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.behaviors.Leader; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.HashMap; -import java.util.Map; - /** * A sample actor showing how the RaftActor is to be extended */ @@ -40,22 +41,19 @@ public class ExampleActor extends RaftActor { private final DataPersistenceProvider dataPersistenceProvider; private long persistIdentifier = 1; + private final Optional roleChangeNotifier; public ExampleActor(String id, Map peerAddresses, Optional configParams) { super(id, peerAddresses, configParams); this.dataPersistenceProvider = new PersistentDataProvider(); + roleChangeNotifier = createRoleChangeNotifier(id); } public static Props props(final String id, final Map peerAddresses, - final Optional configParams){ - return Props.create(new Creator(){ - - @Override public ExampleActor create() throws Exception { - return new ExampleActor(id, peerAddresses, configParams); - } - }); + final Optional configParams) { + return Props.create(ExampleActor.class, id, peerAddresses, configParams); } @Override public void onReceiveCommand(Object message) throws Exception{ @@ -77,7 +75,16 @@ public class ExampleActor extends RaftActor { } else if (message instanceof PrintRole) { if(LOG.isDebugEnabled()) { - LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers()); + if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) { + final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates(); + LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), + getRaftActorContext().getPeerAddresses().keySet(), followers); + } else { + LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), + getRaftActorContext().getPeerAddresses().keySet()); + } + + } } else { @@ -85,8 +92,25 @@ public class ExampleActor extends RaftActor { } } - @Override protected void applyState(ActorRef clientActor, String identifier, - Object data) { + protected String getReplicatedLogState() { + return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex() + + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm() + + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size(); + } + + public Optional createRoleChangeNotifier(String actorId) { + ActorRef exampleRoleChangeNotifier = this.getContext().actorOf( + RoleChangeNotifier.getProps(actorId), actorId + "-notifier"); + return Optional.of(exampleRoleChangeNotifier); + } + + @Override + protected Optional getRoleChangeNotifier() { + return roleChangeNotifier; + } + + @Override protected void applyState(final ActorRef clientActor, final String identifier, + final Object data) { if(data instanceof KeyValue){ KeyValue kv = (KeyValue) data; state.put(kv.getKey(), kv.getValue()); @@ -101,17 +125,17 @@ public class ExampleActor extends RaftActor { try { bs = fromObject(state); } catch (Exception e) { - LOG.error(e, "Exception in creating snapshot"); + LOG.error("Exception in creating snapshot", e); } - getSelf().tell(new CaptureSnapshotReply(bs), null); + getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null); } - @Override protected void applySnapshot(ByteString snapshot) { + @Override protected void applySnapshot(byte [] snapshot) { state.clear(); try { state.putAll((HashMap) toObject(snapshot)); } catch (Exception e) { - LOG.error(e, "Exception in applying snapshot"); + LOG.error("Exception in applying snapshot", e); } if(LOG.isDebugEnabled()) { LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size()); @@ -138,12 +162,12 @@ public class ExampleActor extends RaftActor { } } - private Object toObject(ByteString bs) throws ClassNotFoundException, IOException { + private Object toObject(byte [] bs) throws ClassNotFoundException, IOException { Object obj = null; ByteArrayInputStream bis = null; ObjectInputStream ois = null; try { - bis = new ByteArrayInputStream(bs.toByteArray()); + bis = new ByteArrayInputStream(bs); ois = new ObjectInputStream(bis); obj = ois.readObject(); } finally { @@ -191,6 +215,6 @@ public class ExampleActor extends RaftActor { } @Override - protected void applyRecoverySnapshot(ByteString snapshot) { + protected void applyRecoverySnapshot(byte[] snapshot) { } }