X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft-example%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fexample%2FExampleActor.java;h=663d400b53b3e3f7a589a08b819f930572d4c753;hb=refs%2Fchanges%2F72%2F50572%2F9;hp=5ab3f69bea994d60644cca47b9694205ae2ab039;hpb=412db94945c5db5d2da918f5e23bd3abcecc4d10;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 5ab3f69bea..663d400b53 100644 --- a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -11,15 +11,15 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.Props; import com.google.common.base.Optional; -import com.google.protobuf.ByteString; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; +import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; import javax.annotation.Nonnull; +import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; @@ -32,12 +32,22 @@ import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.util.AbstractStringIdentifier; /** * A sample actor showing how the RaftActor is to be extended */ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { + private static final class PayloadIdentifier extends AbstractStringIdentifier { + private static final long serialVersionUID = 1L; + + PayloadIdentifier(final long identifier) { + super(String.valueOf(identifier)); + } + } private final Map state = new HashMap<>(); @@ -47,7 +57,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, public ExampleActor(String id, Map peerAddresses, Optional configParams) { - super(id, peerAddresses, configParams); + super(id, peerAddresses, configParams, (short)0); setPersistence(true); roleChangeNotifier = createRoleChangeNotifier(id); } @@ -57,11 +67,11 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, return Props.create(ExampleActor.class, id, peerAddresses, configParams); } - @Override public void onReceiveCommand(Object message) throws Exception{ + @Override + protected void handleNonRaftCommand(Object message) { if(message instanceof KeyValue){ if(isLeader()) { - String persistId = Long.toString(persistIdentifier++); - persistData(getSender(), persistId, (Payload) message); + persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false); } else { if(getLeader() != null) { getLeader().forward(message, getContext()); @@ -79,17 +89,17 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) { final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates(); LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), - getRaftActorContext().getPeerAddresses().keySet(), followers); + getRaftActorContext().getPeerIds(), followers); } else { LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), - getRaftActorContext().getPeerAddresses().keySet()); + getRaftActorContext().getPeerIds()); } } } else { - super.onReceiveCommand(message); + super.handleNonRaftCommand(message); } } @@ -110,8 +120,8 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, return roleChangeNotifier; } - @Override protected void applyState(final ActorRef clientActor, final String identifier, - final Object data) { + @Override + protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { if(data instanceof KeyValue){ KeyValue kv = (KeyValue) data; state.put(kv.getKey(), kv.getValue()); @@ -122,21 +132,23 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - public void createSnapshot(ActorRef actorRef) { - ByteString bs = null; + public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { try { - bs = fromObject(state); + if (installSnapshotStream.isPresent()) { + SerializationUtils.serialize((Serializable) state, installSnapshotStream.get()); + } } catch (Exception e) { LOG.error("Exception in creating snapshot", e); } - getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null); + + getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null); } @Override - public void applySnapshot(byte [] snapshot) { + public void applySnapshot(Snapshot.State snapshotState) { state.clear(); try { - state.putAll((HashMap) toObject(snapshot)); + state.putAll(((MapState)snapshotState).state); } catch (Exception e) { LOG.error("Exception in applying snapshot", e); } @@ -145,53 +157,10 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, } } - private ByteString fromObject(Object snapshot) throws Exception { - ByteArrayOutputStream b = null; - ObjectOutputStream o = null; - try { - b = new ByteArrayOutputStream(); - o = new ObjectOutputStream(b); - o.writeObject(snapshot); - byte[] snapshotBytes = b.toByteArray(); - return ByteString.copyFrom(snapshotBytes); - } finally { - if (o != null) { - o.flush(); - o.close(); - } - if (b != null) { - b.close(); - } - } - } - - private Object toObject(byte [] bs) throws ClassNotFoundException, IOException { - Object obj = null; - ByteArrayInputStream bis = null; - ObjectInputStream ois = null; - try { - bis = new ByteArrayInputStream(bs); - ois = new ObjectInputStream(bis); - obj = ois.readObject(); - } finally { - if (bis != null) { - bis.close(); - } - if (ois != null) { - ois.close(); - } - } - return obj; - } - @Override protected void onStateChanged() { } - @Override public void onReceiveRecover(Object message)throws Exception { - super.onReceiveRecover(message); - } - @Override public String persistenceId() { return getId(); } @@ -219,11 +188,41 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - public void applyRecoverySnapshot(byte[] snapshot) { + public void applyRecoverySnapshot(Snapshot.State snapshotState) { } @Override protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() { return this; } + + @Override + public byte[] getRestoreFromSnapshot() { + return null; + } + + @Override + public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) { + try { + return deserializePreCarbonSnapshot(snapshotBytes.read()); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + + @SuppressWarnings("unchecked") + @Override + public Snapshot.State deserializePreCarbonSnapshot(byte[] from) { + return new MapState((Map) SerializationUtils.deserialize(from)); + } + + private static class MapState implements Snapshot.State { + private static final long serialVersionUID = 1L; + + Map state; + + MapState(Map state) { + this.state = state; + } + } }