X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-akka-raft-example%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fexample%2FExampleActor.java;h=a2bcc8a724f68ec05f2c9c57440084fb8368dcb6;hb=b4bf55727093657662d8c16a50fa85f87978a586;hp=ff57bfd1d5ae21db8f8ae1646c9a8f6e1f08b3a5;hpb=348a37f613ef444b10a0e65b400390396552fc48;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index ff57bfd1d5..a2bcc8a724 100644 --- a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -11,15 +11,14 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.Props; import com.google.common.base.Optional; -import com.google.protobuf.ByteString; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; +import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; -import javax.annotation.Nonnull; +import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; @@ -32,20 +31,27 @@ import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; -import org.opendaylight.yangtools.util.StringIdentifier; +import org.opendaylight.yangtools.util.AbstractStringIdentifier; /** - * A sample actor showing how the RaftActor is to be extended + * A sample actor showing how the RaftActor is to be extended. */ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { + private static final class PayloadIdentifier extends AbstractStringIdentifier { + private static final long serialVersionUID = 1L; - private final Map state = new HashMap<>(); + PayloadIdentifier(final long identifier) { + super(String.valueOf(identifier)); + } + } - private long persistIdentifier = 1; + private final Map state = new HashMap<>(); private final Optional roleChangeNotifier; + private long persistIdentifier = 1; public ExampleActor(String id, Map peerAddresses, Optional configParams) { @@ -61,23 +67,23 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, @Override protected void handleNonRaftCommand(Object message) { - if(message instanceof KeyValue){ - if(isLeader()) { - persistData(getSender(), new StringIdentifier(String.valueOf(persistIdentifier++)), (Payload) message); + if (message instanceof KeyValue) { + if (isLeader()) { + persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false); } else { - if(getLeader() != null) { + if (getLeader() != null) { getLeader().forward(message, getContext()); } } } else if (message instanceof PrintState) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("State of the node:{} has entries={}, {}", getId(), state.size(), getReplicatedLogState()); } } else if (message instanceof PrintRole) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) { final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates(); LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), @@ -114,88 +120,50 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, @Override protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { - if(data instanceof KeyValue){ + if (data instanceof KeyValue) { KeyValue kv = (KeyValue) data; state.put(kv.getKey(), kv.getValue()); - if(clientActor != null) { + if (clientActor != null) { clientActor.tell(new KeyValueSaved(), getSelf()); } } } @Override - public void createSnapshot(ActorRef actorRef) { - ByteString bs = null; + @SuppressWarnings("checkstyle:IllegalCatch") + public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { try { - bs = fromObject(state); - } catch (Exception e) { + if (installSnapshotStream.isPresent()) { + SerializationUtils.serialize((Serializable) state, installSnapshotStream.get()); + } + } catch (RuntimeException e) { LOG.error("Exception in creating snapshot", e); } - getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null); + + getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null); } @Override - public void applySnapshot(byte [] snapshot) { + public void applySnapshot(Snapshot.State snapshotState) { state.clear(); - try { - state.putAll((HashMap) toObject(snapshot)); - } catch (Exception e) { - LOG.error("Exception in applying snapshot", e); - } - if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size()); - } - } + state.putAll(((MapState)snapshotState).state); - private static ByteString fromObject(Object snapshot) throws Exception { - ByteArrayOutputStream b = null; - ObjectOutputStream o = null; - try { - b = new ByteArrayOutputStream(); - o = new ObjectOutputStream(b); - o.writeObject(snapshot); - byte[] snapshotBytes = b.toByteArray(); - return ByteString.copyFrom(snapshotBytes); - } finally { - if (o != null) { - o.flush(); - o.close(); - } - if (b != null) { - b.close(); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshot applied to state : {}", state.size()); } } - private static Object toObject(byte [] bs) throws ClassNotFoundException, IOException { - Object obj = null; - ByteArrayInputStream bis = null; - ObjectInputStream ois = null; - try { - bis = new ByteArrayInputStream(bs); - ois = new ObjectInputStream(bis); - obj = ois.readObject(); - } finally { - if (bis != null) { - bis.close(); - } - if (ois != null) { - ois.close(); - } - } - return obj; - } - - @Override protected void onStateChanged() { + @Override + protected void onStateChanged() { } - @Override public String persistenceId() { + @Override + public String persistenceId() { return getId(); } @Override - @Nonnull protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { return this; } @@ -217,7 +185,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - public void applyRecoverySnapshot(byte[] snapshot) { + public void applyRecoverySnapshot(Snapshot.State snapshotState) { } @Override @@ -226,7 +194,27 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - public byte[] getRestoreFromSnapshot() { + public Snapshot getRestoreFromSnapshot() { return null; } + + @SuppressWarnings("unchecked") + @Override + public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) { + try { + return new MapState((Map) SerializationUtils.deserialize(snapshotBytes.read())); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + + private static class MapState implements Snapshot.State { + private static final long serialVersionUID = 1L; + + Map state; + + MapState(Map state) { + this.state = state; + } + } }