* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.example;
import akka.actor.ActorRef;
import akka.actor.Props;
-import com.google.common.base.Optional;
-import com.google.protobuf.ByteString;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import com.google.common.io.ByteSource;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
-import javax.annotation.Nonnull;
+import java.util.Optional;
+import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
import org.opendaylight.controller.cluster.example.messages.PrintRole;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.messages.Payload;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.AbstractStringIdentifier;
/**
- * A sample actor showing how the RaftActor is to be extended
+ * A sample actor showing how the RaftActor is to be extended.
*/
public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
+ private static final class PayloadIdentifier extends AbstractStringIdentifier<PayloadIdentifier> {
+ private static final long serialVersionUID = 1L;
- private final Map<String, String> state = new HashMap<>();
+ PayloadIdentifier(final long identifier) {
+ super(String.valueOf(identifier));
+ }
+ }
- private long persistIdentifier = 1;
+ private final Map<String, String> state = new HashMap<>();
private final Optional<ActorRef> roleChangeNotifier;
+ private long persistIdentifier = 1;
- public ExampleActor(String id, Map<String, String> peerAddresses,
- Optional<ConfigParams> configParams) {
- super(id, peerAddresses, configParams);
+ public ExampleActor(final String id, final Map<String, String> peerAddresses,
+ final Optional<ConfigParams> configParams) {
+ super(id, peerAddresses, configParams, (short)0);
setPersistence(true);
roleChangeNotifier = createRoleChangeNotifier(id);
}
return Props.create(ExampleActor.class, id, peerAddresses, configParams);
}
- @Override public void onReceiveCommand(Object message) throws Exception{
- if(message instanceof KeyValue){
- if(isLeader()) {
- String persistId = Long.toString(persistIdentifier++);
- persistData(getSender(), persistId, (Payload) message);
- } else {
- if(getLeader() != null) {
- getLeader().forward(message, getContext());
- }
+ @Override
+ protected void handleNonRaftCommand(final Object message) {
+ if (message instanceof KeyValue) {
+ if (isLeader()) {
+ persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false);
+ } else if (getLeader() != null) {
+ getLeader().forward(message, getContext());
}
} else if (message instanceof PrintState) {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("State of the node:{} has entries={}, {}",
getId(), state.size(), getReplicatedLogState());
}
} else if (message instanceof PrintRole) {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
- final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
+ final String followers = ((Leader)getCurrentBehavior()).printFollowerStates();
LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
- getRaftActorContext().getPeerAddresses().keySet(), followers);
+ getRaftActorContext().getPeerIds(), followers);
} else {
LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
- getRaftActorContext().getPeerAddresses().keySet());
+ getRaftActorContext().getPeerIds());
}
}
} else {
- super.onReceiveCommand(message);
+ super.handleNonRaftCommand(message);
}
}
+ ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
}
- public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
- ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
+ public Optional<ActorRef> createRoleChangeNotifier(final String actorId) {
+ ActorRef exampleRoleChangeNotifier = getContext().actorOf(
RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
return Optional.<ActorRef>of(exampleRoleChangeNotifier);
}
return roleChangeNotifier;
}
- @Override protected void applyState(final ActorRef clientActor, final String identifier,
- final Object data) {
- if(data instanceof KeyValue){
- KeyValue kv = (KeyValue) data;
+ @Override
+ protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
+ if (data instanceof KeyValue kv) {
state.put(kv.getKey(), kv.getValue());
- if(clientActor != null) {
+ if (clientActor != null) {
clientActor.tell(new KeyValueSaved(), getSelf());
}
}
}
@Override
- public void createSnapshot(ActorRef actorRef) {
- ByteString bs = null;
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
try {
- bs = fromObject(state);
- } catch (Exception e) {
+ if (installSnapshotStream.isPresent()) {
+ SerializationUtils.serialize((Serializable) state, installSnapshotStream.get());
+ }
+ } catch (RuntimeException e) {
LOG.error("Exception in creating snapshot", e);
}
- getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
+
+ getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null);
}
@Override
- public void applySnapshot(byte [] snapshot) {
+ public void applySnapshot(final Snapshot.State snapshotState) {
state.clear();
- try {
- state.putAll((HashMap<String, String>) toObject(snapshot));
- } catch (Exception e) {
- LOG.error("Exception in applying snapshot", e);
- }
- if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot applied to state : {}", ((HashMap<?, ?>) state).size());
- }
- }
-
- private ByteString fromObject(Object snapshot) throws Exception {
- ByteArrayOutputStream b = null;
- ObjectOutputStream o = null;
- try {
- b = new ByteArrayOutputStream();
- o = new ObjectOutputStream(b);
- o.writeObject(snapshot);
- byte[] snapshotBytes = b.toByteArray();
- return ByteString.copyFrom(snapshotBytes);
- } finally {
- if (o != null) {
- o.flush();
- o.close();
- }
- if (b != null) {
- b.close();
- }
- }
- }
+ state.putAll(((MapState)snapshotState).state);
- private Object toObject(byte [] bs) throws ClassNotFoundException, IOException {
- Object obj = null;
- ByteArrayInputStream bis = null;
- ObjectInputStream ois = null;
- try {
- bis = new ByteArrayInputStream(bs);
- ois = new ObjectInputStream(bis);
- obj = ois.readObject();
- } finally {
- if (bis != null) {
- bis.close();
- }
- if (ois != null) {
- ois.close();
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Snapshot applied to state : {}", state.size());
}
- return obj;
}
- @Override protected void onStateChanged() {
-
- }
+ @Override
+ protected void onStateChanged() {
- @Override public void onReceiveRecover(Object message)throws Exception {
- super.onReceiveRecover(message);
}
- @Override public String persistenceId() {
+ @Override
+ public String persistenceId() {
return getId();
}
@Override
- @Nonnull
protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
return this;
}
@Override
- public void startLogRecoveryBatch(int maxBatchSize) {
+ public void startLogRecoveryBatch(final int maxBatchSize) {
}
@Override
- public void appendRecoveredLogEntry(Payload data) {
+ public void appendRecoveredLogEntry(final Payload data) {
}
@Override
}
@Override
- public void applyRecoverySnapshot(byte[] snapshot) {
+ public void applyRecoverySnapshot(final Snapshot.State snapshotState) {
}
@Override
protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
return this;
}
+
+ @Override
+ public Snapshot getRestoreFromSnapshot() {
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) {
+ try {
+ return new MapState((Map<String, String>) SerializationUtils.deserialize(snapshotBytes.read()));
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private static class MapState implements Snapshot.State {
+ private static final long serialVersionUID = 1L;
+
+ Map<String, String> state;
+
+ MapState(final Map<String, String> state) {
+ this.state = state;
+ }
+ }
}