import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
-import javax.annotation.Nonnull;
import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
import org.opendaylight.yangtools.util.AbstractStringIdentifier;
/**
- * A sample actor showing how the RaftActor is to be extended
+ * A sample actor showing how the RaftActor is to be extended.
*/
public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
private static final class PayloadIdentifier extends AbstractStringIdentifier<PayloadIdentifier> {
}
private final Map<String, String> state = new HashMap<>();
-
- private long persistIdentifier = 1;
private final Optional<ActorRef> roleChangeNotifier;
+ private long persistIdentifier = 1;
public ExampleActor(String id, Map<String, String> peerAddresses,
Optional<ConfigParams> configParams) {
@Override
protected void handleNonRaftCommand(Object message) {
- if(message instanceof KeyValue){
- if(isLeader()) {
+ if (message instanceof KeyValue) {
+ if (isLeader()) {
persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false);
} else {
- if(getLeader() != null) {
+ if (getLeader() != null) {
getLeader().forward(message, getContext());
}
}
} else if (message instanceof PrintState) {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("State of the node:{} has entries={}, {}",
getId(), state.size(), getReplicatedLogState());
}
} else if (message instanceof PrintRole) {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
@Override
protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
- if(data instanceof KeyValue){
+ if (data instanceof KeyValue) {
KeyValue kv = (KeyValue) data;
state.put(kv.getKey(), kv.getValue());
- if(clientActor != null) {
+ if (clientActor != null) {
clientActor.tell(new KeyValueSaved(), getSelf());
}
}
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
try {
if (installSnapshotStream.isPresent()) {
SerializationUtils.serialize((Serializable) state, installSnapshotStream.get());
}
- } catch (Exception e) {
+ } catch (RuntimeException e) {
LOG.error("Exception in creating snapshot", e);
}
@Override
public void applySnapshot(Snapshot.State snapshotState) {
state.clear();
- try {
- state.putAll(((MapState)snapshotState).state);
- } catch (Exception e) {
- LOG.error("Exception in applying snapshot", e);
- }
- if(LOG.isDebugEnabled()) {
- LOG.debug("Snapshot applied to state : {}", ((HashMap<?, ?>) state).size());
+ state.putAll(((MapState)snapshotState).state);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Snapshot applied to state : {}", state.size());
}
}
- @Override protected void onStateChanged() {
+ @Override
+ protected void onStateChanged() {
}
- @Override public String persistenceId() {
+ @Override
+ public String persistenceId() {
return getId();
}
@Override
- @Nonnull
protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
return this;
}
return null;
}
+ @SuppressWarnings("unchecked")
@Override
public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
try {
- return deserializePreCarbonSnapshot(snapshotBytes.read());
+ return new MapState((Map<String, String>) SerializationUtils.deserialize(snapshotBytes.read()));
} catch (IOException e) {
throw Throwables.propagate(e);
}
}
- @SuppressWarnings("unchecked")
- @Override
- public Snapshot.State deserializePreCarbonSnapshot(byte[] from) {
- return new MapState((Map<String, String>) SerializationUtils.deserialize(from));
- }
-
private static class MapState implements Snapshot.State {
private static final long serialVersionUID = 1L;