import akka.actor.ActorRef;
import akka.actor.Props;
import com.google.common.base.Optional;
-import com.google.protobuf.ByteString;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
+import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
import org.opendaylight.controller.cluster.example.messages.PrintRole;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.util.AbstractStringIdentifier;
}
@Override
- public void createSnapshot(ActorRef actorRef) {
- ByteString bs = null;
+ public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
try {
- bs = fromObject(state);
+ if (installSnapshotStream.isPresent()) {
+ SerializationUtils.serialize((Serializable) state, installSnapshotStream.get());
+ }
} catch (Exception e) {
LOG.error("Exception in creating snapshot", e);
}
- getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
+
+ getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null);
}
@Override
- public void applySnapshot(byte [] snapshot) {
+ public void applySnapshot(Snapshot.State snapshotState) {
state.clear();
try {
- state.putAll((HashMap<String, String>) toObject(snapshot));
+ state.putAll(((MapState)snapshotState).state);
} catch (Exception e) {
LOG.error("Exception in applying snapshot", e);
}
}
}
- private static ByteString fromObject(Object snapshot) throws Exception {
- ByteArrayOutputStream b = null;
- ObjectOutputStream o = null;
- try {
- b = new ByteArrayOutputStream();
- o = new ObjectOutputStream(b);
- o.writeObject(snapshot);
- byte[] snapshotBytes = b.toByteArray();
- return ByteString.copyFrom(snapshotBytes);
- } finally {
- if (o != null) {
- o.flush();
- o.close();
- }
- if (b != null) {
- b.close();
- }
- }
- }
-
- private static Object toObject(byte [] bs) throws ClassNotFoundException, IOException {
- Object obj = null;
- ByteArrayInputStream bis = null;
- ObjectInputStream ois = null;
- try {
- bis = new ByteArrayInputStream(bs);
- ois = new ObjectInputStream(bis);
- obj = ois.readObject();
- } finally {
- if (bis != null) {
- bis.close();
- }
- if (ois != null) {
- ois.close();
- }
- }
- return obj;
- }
-
@Override protected void onStateChanged() {
}
}
@Override
- public void applyRecoverySnapshot(byte[] snapshot) {
+ public void applyRecoverySnapshot(Snapshot.State snapshotState) {
}
@Override
public byte[] getRestoreFromSnapshot() {
return null;
}
+
+ @Override
+ public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
+ try {
+ return deserializePreCarbonSnapshot(snapshotBytes.read());
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Snapshot.State deserializePreCarbonSnapshot(byte[] from) {
+ return new MapState((Map<String, String>) SerializationUtils.deserialize(from));
+ }
+
+ private static class MapState implements Snapshot.State {
+ private static final long serialVersionUID = 1L;
+
+ Map<String, String> state;
+
+ MapState(Map<String, String> state) {
+ this.state = state;
+ }
+ }
}