X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft-example%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fexample%2FExampleActor.java;h=5d2d17122516ae5bb3a108951aed7331d99c8cdf;hp=5ab3f69bea994d60644cca47b9694205ae2ab039;hb=20733d0406fb31b701e32ee74ee13dc8769a256c;hpb=2727bea09c83646b6cbd2ef9672d0b7f6cf3b22f diff --git a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 5ab3f69bea..5d2d171225 100644 --- a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -5,21 +5,19 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.Props; -import com.google.common.base.Optional; -import com.google.protobuf.ByteString; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; +import com.google.common.base.Throwables; +import com.google.common.io.ByteSource; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; -import javax.annotation.Nonnull; +import java.util.Optional; +import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; @@ -32,22 +30,31 @@ import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.persisted.Snapshot; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yangtools.concepts.Identifier; +import org.opendaylight.yangtools.util.AbstractStringIdentifier; /** - * A sample actor showing how the RaftActor is to be extended + * A sample actor showing how the RaftActor is to be extended. */ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { + private static final class PayloadIdentifier extends AbstractStringIdentifier { + private static final long serialVersionUID = 1L; - private final Map state = new HashMap<>(); + PayloadIdentifier(final long identifier) { + super(String.valueOf(identifier)); + } + } - private long persistIdentifier = 1; + private final Map state = new HashMap<>(); private final Optional roleChangeNotifier; + private long persistIdentifier = 1; - public ExampleActor(String id, Map peerAddresses, - Optional configParams) { - super(id, peerAddresses, configParams); + public ExampleActor(final String id, final Map peerAddresses, + final Optional configParams) { + super(id, peerAddresses, configParams, (short)0); setPersistence(true); roleChangeNotifier = createRoleChangeNotifier(id); } @@ -57,39 +64,39 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, return Props.create(ExampleActor.class, id, peerAddresses, configParams); } - @Override public void onReceiveCommand(Object message) throws Exception{ - if(message instanceof KeyValue){ - if(isLeader()) { - String persistId = Long.toString(persistIdentifier++); - persistData(getSender(), persistId, (Payload) message); + @Override + protected void handleNonRaftCommand(final Object message) { + if (message instanceof KeyValue) { + if (isLeader()) { + persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false); } else { - if(getLeader() != null) { + if (getLeader() != null) { getLeader().forward(message, getContext()); } } } else if (message instanceof PrintState) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("State of the node:{} has entries={}, {}", getId(), state.size(), getReplicatedLogState()); } } else if (message instanceof PrintRole) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) { final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates(); LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), - getRaftActorContext().getPeerAddresses().keySet(), followers); + getRaftActorContext().getPeerIds(), followers); } else { LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), - getRaftActorContext().getPeerAddresses().keySet()); + getRaftActorContext().getPeerIds()); } } } else { - super.onReceiveCommand(message); + super.handleNonRaftCommand(message); } } @@ -99,7 +106,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size(); } - public Optional createRoleChangeNotifier(String actorId) { + public Optional createRoleChangeNotifier(final String actorId) { ActorRef exampleRoleChangeNotifier = this.getContext().actorOf( RoleChangeNotifier.getProps(actorId), actorId + "-notifier"); return Optional.of(exampleRoleChangeNotifier); @@ -110,104 +117,62 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, return roleChangeNotifier; } - @Override protected void applyState(final ActorRef clientActor, final String identifier, - final Object data) { - if(data instanceof KeyValue){ + @Override + protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { + if (data instanceof KeyValue) { KeyValue kv = (KeyValue) data; state.put(kv.getKey(), kv.getValue()); - if(clientActor != null) { + if (clientActor != null) { clientActor.tell(new KeyValueSaved(), getSelf()); } } } @Override - public void createSnapshot(ActorRef actorRef) { - ByteString bs = null; + @SuppressWarnings("checkstyle:IllegalCatch") + public void createSnapshot(final ActorRef actorRef, final Optional installSnapshotStream) { try { - bs = fromObject(state); - } catch (Exception e) { + if (installSnapshotStream.isPresent()) { + SerializationUtils.serialize((Serializable) state, installSnapshotStream.get()); + } + } catch (RuntimeException e) { LOG.error("Exception in creating snapshot", e); } - getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null); + + getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null); } @Override - public void applySnapshot(byte [] snapshot) { + public void applySnapshot(final Snapshot.State snapshotState) { state.clear(); - try { - state.putAll((HashMap) toObject(snapshot)); - } catch (Exception e) { - LOG.error("Exception in applying snapshot", e); - } - if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size()); - } - } + state.putAll(((MapState)snapshotState).state); - private ByteString fromObject(Object snapshot) throws Exception { - ByteArrayOutputStream b = null; - ObjectOutputStream o = null; - try { - b = new ByteArrayOutputStream(); - o = new ObjectOutputStream(b); - o.writeObject(snapshot); - byte[] snapshotBytes = b.toByteArray(); - return ByteString.copyFrom(snapshotBytes); - } finally { - if (o != null) { - o.flush(); - o.close(); - } - if (b != null) { - b.close(); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshot applied to state : {}", state.size()); } } - private Object toObject(byte [] bs) throws ClassNotFoundException, IOException { - Object obj = null; - ByteArrayInputStream bis = null; - ObjectInputStream ois = null; - try { - bis = new ByteArrayInputStream(bs); - ois = new ObjectInputStream(bis); - obj = ois.readObject(); - } finally { - if (bis != null) { - bis.close(); - } - if (ois != null) { - ois.close(); - } - } - return obj; - } - - @Override protected void onStateChanged() { - - } + @Override + protected void onStateChanged() { - @Override public void onReceiveRecover(Object message)throws Exception { - super.onReceiveRecover(message); } - @Override public String persistenceId() { + @Override + public String persistenceId() { return getId(); } @Override - @Nonnull protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { return this; } @Override - public void startLogRecoveryBatch(int maxBatchSize) { + public void startLogRecoveryBatch(final int maxBatchSize) { } @Override - public void appendRecoveredLogEntry(Payload data) { + public void appendRecoveredLogEntry(final Payload data) { } @Override @@ -219,11 +184,36 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - public void applyRecoverySnapshot(byte[] snapshot) { + public void applyRecoverySnapshot(final Snapshot.State snapshotState) { } @Override protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() { return this; } + + @Override + public Snapshot getRestoreFromSnapshot() { + return null; + } + + @SuppressWarnings("unchecked") + @Override + public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) { + try { + return new MapState((Map) SerializationUtils.deserialize(snapshotBytes.read())); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + + private static class MapState implements Snapshot.State { + private static final long serialVersionUID = 1L; + + Map state; + + MapState(final Map state) { + this.state = state; + } + } }