X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fexample%2FExampleActor.java;h=c4ff108611d9fbdb177f2ef4ace98bb030d69991;hb=f1855847c4f995e8124a324bc43825db5c200957;hp=8d4d5e48c81c6a91d2c089b0275f561575ee0baf;hpb=02bdbc1c781abc0b0b1d12dbfc1a19c316bebb98;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 8d4d5e48c8..8f416b3abc 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -10,15 +10,27 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.Props; -import akka.japi.Creator; +import com.google.common.base.Optional; +import com.google.protobuf.ByteString; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.HashMap; +import java.util.Map; +import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; import org.opendaylight.controller.cluster.example.messages.PrintState; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; +import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.RaftActor; - -import java.util.HashMap; -import java.util.Map; +import org.opendaylight.controller.cluster.raft.RaftState; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; +import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; /** * A sample actor showing how the RaftActor is to be extended @@ -26,29 +38,29 @@ import java.util.Map; public class ExampleActor extends RaftActor { private final Map state = new HashMap(); + private final DataPersistenceProvider dataPersistenceProvider; private long persistIdentifier = 1; + private Optional roleChangeNotifier; - public ExampleActor(String id, Map peerAddresses) { - super(id, peerAddresses); + public ExampleActor(String id, Map peerAddresses, + Optional configParams) { + super(id, peerAddresses, configParams); + this.dataPersistenceProvider = new PersistentDataProvider(); + roleChangeNotifier = createRoleChangeNotifier(id); } - public static Props props(final String id, final Map peerAddresses){ - return Props.create(new Creator(){ - - @Override public ExampleActor create() throws Exception { - return new ExampleActor(id, peerAddresses); - } - }); + public static Props props(final String id, final Map peerAddresses, + final Optional configParams) { + return Props.create(ExampleActor.class, id, peerAddresses, configParams); } - @Override public void onReceiveCommand(Object message){ + @Override public void onReceiveCommand(Object message) throws Exception{ if(message instanceof KeyValue){ - if(isLeader()) { String persistId = Long.toString(persistIdentifier++); - persistData(getSender(), persistId, message); + persistData(getSender(), persistId, (Payload) message); } else { if(getLeader() != null) { getLeader().forward(message, getContext()); @@ -56,16 +68,49 @@ public class ExampleActor extends RaftActor { } } else if (message instanceof PrintState) { - LOG.debug("State of the node:"+getId() + " is="+state.size()); + if(LOG.isDebugEnabled()) { + LOG.debug("State of the node:{} has entries={}, {}", + getId(), state.size(), getReplicatedLogState()); + } } else if (message instanceof PrintRole) { - LOG.debug(getId() + " = " + getRaftState()); + if(LOG.isDebugEnabled()) { + if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) { + final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates(); + LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), + getRaftActorContext().getPeerAddresses().keySet(), followers); + } else { + LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), + getRaftActorContext().getPeerAddresses().keySet()); + } + + + } + + } else { + super.onReceiveCommand(message); } - super.onReceiveCommand(message); } - @Override protected void applyState(ActorRef clientActor, String identifier, - Object data) { + protected String getReplicatedLogState() { + return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex() + + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm() + + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size(); + } + + public Optional createRoleChangeNotifier(String actorId) { + ActorRef exampleRoleChangeNotifier = this.getContext().actorOf( + RoleChangeNotifier.getProps(actorId), actorId + "-notifier"); + return Optional.of(exampleRoleChangeNotifier); + } + + @Override + protected Optional getRoleChangeNotifier() { + return roleChangeNotifier; + } + + @Override protected void applyState(final ActorRef clientActor, final String identifier, + final Object data) { if(data instanceof KeyValue){ KeyValue kv = (KeyValue) data; state.put(kv.getKey(), kv.getValue()); @@ -75,20 +120,101 @@ public class ExampleActor extends RaftActor { } } - @Override protected Object createSnapshot() { - return state; + @Override protected void createSnapshot() { + ByteString bs = null; + try { + bs = fromObject(state); + } catch (Exception e) { + LOG.error(e, "Exception in creating snapshot"); + } + getSelf().tell(new CaptureSnapshotReply(bs), null); } - @Override protected void applySnapshot(Object snapshot) { + @Override protected void applySnapshot(ByteString snapshot) { state.clear(); - state.putAll((HashMap) snapshot); + try { + state.putAll((HashMap) toObject(snapshot)); + } catch (Exception e) { + LOG.error(e, "Exception in applying snapshot"); + } + if(LOG.isDebugEnabled()) { + LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size()); + } + } + + private ByteString fromObject(Object snapshot) throws Exception { + ByteArrayOutputStream b = null; + ObjectOutputStream o = null; + try { + b = new ByteArrayOutputStream(); + o = new ObjectOutputStream(b); + o.writeObject(snapshot); + byte[] snapshotBytes = b.toByteArray(); + return ByteString.copyFrom(snapshotBytes); + } finally { + if (o != null) { + o.flush(); + o.close(); + } + if (b != null) { + b.close(); + } + } + } + + private Object toObject(ByteString bs) throws ClassNotFoundException, IOException { + Object obj = null; + ByteArrayInputStream bis = null; + ObjectInputStream ois = null; + try { + bis = new ByteArrayInputStream(bs.toByteArray()); + ois = new ObjectInputStream(bis); + obj = ois.readObject(); + } finally { + if (bis != null) { + bis.close(); + } + if (ois != null) { + ois.close(); + } + } + return obj; + } + + @Override protected void onStateChanged() { + } - @Override public void onReceiveRecover(Object message) { + @Override + protected DataPersistenceProvider persistence() { + return dataPersistenceProvider; + } + + @Override public void onReceiveRecover(Object message)throws Exception { super.onReceiveRecover(message); } @Override public String persistenceId() { return getId(); } + + @Override + protected void startLogRecoveryBatch(int maxBatchSize) { + } + + @Override + protected void appendRecoveredLogEntry(Payload data) { + } + + @Override + protected void applyCurrentLogRecoveryBatch() { + } + + @Override + protected void onRecoveryComplete() { + } + + @Override + protected void applyRecoverySnapshot(ByteString snapshot) { + } }