X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fexample%2FExampleActor.java;h=97b912ef74dcb1a78073999a06a8957c63c8eb47;hp=cbd7ca2d70f5dc090a1e842b75c200cb0c1976b9;hb=73e969cf365dd78772596c71e940ae44fe2f22d3;hpb=3b4aea96013998f11450f5b83e4527bebd27af6a diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index cbd7ca2d70..97b912ef74 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -11,15 +11,24 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.Props; import akka.japi.Creator; + import com.google.common.base.Optional; +import com.google.protobuf.ByteString; + import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; import org.opendaylight.controller.cluster.example.messages.PrintState; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.RaftActor; +import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.util.HashMap; import java.util.Map; @@ -60,11 +69,15 @@ public class ExampleActor extends RaftActor { } } else if (message instanceof PrintState) { - LOG.debug("State of the node:{} has entries={}, {}", - getId(), state.size(), getReplicatedLogState()); + if(LOG.isDebugEnabled()) { + LOG.debug("State of the node:{} has entries={}, {}", + getId(), state.size(), getReplicatedLogState()); + } } else if (message instanceof PrintRole) { - LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),getPeers()); + if(LOG.isDebugEnabled()) { + LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers()); + } } else { super.onReceiveCommand(message); @@ -82,14 +95,65 @@ public class ExampleActor extends RaftActor { } } - @Override protected Object createSnapshot() { - return state; + @Override protected void createSnapshot() { + ByteString bs = null; + try { + bs = fromObject(state); + } catch (Exception e) { + LOG.error(e, "Exception in creating snapshot"); + } + getSelf().tell(new CaptureSnapshotReply(bs), null); } - @Override protected void applySnapshot(Object snapshot) { + @Override protected void applySnapshot(ByteString snapshot) { state.clear(); - state.putAll((HashMap) snapshot); - LOG.debug("Snapshot applied to state :" + ((HashMap) snapshot).size()); + try { + state.putAll((HashMap) toObject(snapshot)); + } catch (Exception e) { + LOG.error(e, "Exception in applying snapshot"); + } + if(LOG.isDebugEnabled()) { + LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size()); + } + } + + private ByteString fromObject(Object snapshot) throws Exception { + ByteArrayOutputStream b = null; + ObjectOutputStream o = null; + try { + b = new ByteArrayOutputStream(); + o = new ObjectOutputStream(b); + o.writeObject(snapshot); + byte[] snapshotBytes = b.toByteArray(); + return ByteString.copyFrom(snapshotBytes); + } finally { + if (o != null) { + o.flush(); + o.close(); + } + if (b != null) { + b.close(); + } + } + } + + private Object toObject(ByteString bs) throws ClassNotFoundException, IOException { + Object obj = null; + ByteArrayInputStream bis = null; + ObjectInputStream ois = null; + try { + bis = new ByteArrayInputStream(bs.toByteArray()); + ois = new ObjectInputStream(bis); + obj = ois.readObject(); + } finally { + if (bis != null) { + bis.close(); + } + if (ois != null) { + ois.close(); + } + } + return obj; } @Override protected void onStateChanged() { @@ -103,4 +167,24 @@ public class ExampleActor extends RaftActor { @Override public String persistenceId() { return getId(); } + + @Override + protected void startLogRecoveryBatch(int maxBatchSize) { + } + + @Override + protected void appendRecoveredLogEntry(Payload data) { + } + + @Override + protected void applyCurrentLogRecoveryBatch() { + } + + @Override + protected void onRecoveryComplete() { + } + + @Override + protected void applyRecoverySnapshot(ByteString snapshot) { + } }