Bug 7521: Convert DatastoreSnapshot.ShardSnapshot to store Snapshot
[controller.git] / opendaylight / md-sal / sal-akka-raft-example / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
index 5ab3f69bea994d60644cca47b9694205ae2ab039..5cd4c14ee40783cb3b00507896d769621293469c 100644 (file)
@@ -11,15 +11,15 @@ package org.opendaylight.controller.cluster.example;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import com.google.common.base.Optional;
-import com.google.protobuf.ByteString;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteSource;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nonnull;
+import org.apache.commons.lang3.SerializationUtils;
 import org.opendaylight.controller.cluster.example.messages.KeyValue;
 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
 import org.opendaylight.controller.cluster.example.messages.PrintRole;
@@ -32,12 +32,22 @@ import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.util.AbstractStringIdentifier;
 
 /**
  * A sample actor showing how the RaftActor is to be extended
  */
 public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
+    private static final class PayloadIdentifier extends AbstractStringIdentifier<PayloadIdentifier> {
+        private static final long serialVersionUID = 1L;
+
+        PayloadIdentifier(final long identifier) {
+            super(String.valueOf(identifier));
+        }
+    }
 
     private final Map<String, String> state = new HashMap<>();
 
@@ -47,7 +57,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
 
     public ExampleActor(String id, Map<String, String> peerAddresses,
         Optional<ConfigParams> configParams) {
-        super(id, peerAddresses, configParams);
+        super(id, peerAddresses, configParams, (short)0);
         setPersistence(true);
         roleChangeNotifier = createRoleChangeNotifier(id);
     }
@@ -57,11 +67,11 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
         return Props.create(ExampleActor.class, id, peerAddresses, configParams);
     }
 
-    @Override public void onReceiveCommand(Object message) throws Exception{
+    @Override
+    protected void handleNonRaftCommand(Object message) {
         if(message instanceof KeyValue){
             if(isLeader()) {
-                String persistId = Long.toString(persistIdentifier++);
-                persistData(getSender(), persistId, (Payload) message);
+                persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false);
             } else {
                 if(getLeader() != null) {
                     getLeader().forward(message, getContext());
@@ -79,17 +89,17 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
                     final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
-                        getRaftActorContext().getPeerAddresses().keySet(), followers);
+                        getRaftActorContext().getPeerIds(), followers);
                 } else {
                     LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
-                        getRaftActorContext().getPeerAddresses().keySet());
+                        getRaftActorContext().getPeerIds());
                 }
 
 
             }
 
         } else {
-            super.onReceiveCommand(message);
+            super.handleNonRaftCommand(message);
         }
     }
 
@@ -110,8 +120,8 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
         return roleChangeNotifier;
     }
 
-    @Override protected void applyState(final ActorRef clientActor, final String identifier,
-        final Object data) {
+    @Override
+    protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
         if(data instanceof KeyValue){
             KeyValue kv = (KeyValue) data;
             state.put(kv.getKey(), kv.getValue());
@@ -122,21 +132,23 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
     }
 
     @Override
-    public void createSnapshot(ActorRef actorRef) {
-        ByteString bs = null;
+    public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
         try {
-            bs = fromObject(state);
+            if (installSnapshotStream.isPresent()) {
+                SerializationUtils.serialize((Serializable) state, installSnapshotStream.get());
+            }
         } catch (Exception e) {
             LOG.error("Exception in creating snapshot", e);
         }
-        getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
+
+        getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null);
     }
 
     @Override
-    public void applySnapshot(byte [] snapshot) {
+    public void applySnapshot(Snapshot.State snapshotState) {
         state.clear();
         try {
-            state.putAll((HashMap<String, String>) toObject(snapshot));
+            state.putAll(((MapState)snapshotState).state);
         } catch (Exception e) {
            LOG.error("Exception in applying snapshot", e);
         }
@@ -145,53 +157,10 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
         }
     }
 
-    private ByteString fromObject(Object snapshot) throws Exception {
-        ByteArrayOutputStream b = null;
-        ObjectOutputStream o = null;
-        try {
-            b = new ByteArrayOutputStream();
-            o = new ObjectOutputStream(b);
-            o.writeObject(snapshot);
-            byte[] snapshotBytes = b.toByteArray();
-            return ByteString.copyFrom(snapshotBytes);
-        } finally {
-            if (o != null) {
-                o.flush();
-                o.close();
-            }
-            if (b != null) {
-                b.close();
-            }
-        }
-    }
-
-    private Object toObject(byte [] bs) throws ClassNotFoundException, IOException {
-        Object obj = null;
-        ByteArrayInputStream bis = null;
-        ObjectInputStream ois = null;
-        try {
-            bis = new ByteArrayInputStream(bs);
-            ois = new ObjectInputStream(bis);
-            obj = ois.readObject();
-        } finally {
-            if (bis != null) {
-                bis.close();
-            }
-            if (ois != null) {
-                ois.close();
-            }
-        }
-        return obj;
-    }
-
     @Override protected void onStateChanged() {
 
     }
 
-    @Override public void onReceiveRecover(Object message)throws Exception {
-        super.onReceiveRecover(message);
-    }
-
     @Override public String persistenceId() {
         return getId();
     }
@@ -219,11 +188,41 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort,
     }
 
     @Override
-    public void applyRecoverySnapshot(byte[] snapshot) {
+    public void applyRecoverySnapshot(Snapshot.State snapshotState) {
     }
 
     @Override
     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
         return this;
     }
+
+    @Override
+    public Snapshot getRestoreFromSnapshot() {
+        return null;
+    }
+
+    @Override
+    public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
+        try {
+            return deserializePreCarbonSnapshot(snapshotBytes.read());
+        } catch (IOException e) {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Snapshot.State deserializePreCarbonSnapshot(byte[] from) {
+        return new MapState((Map<String, String>) SerializationUtils.deserialize(from));
+    }
+
+    private static class MapState implements Snapshot.State {
+        private static final long serialVersionUID = 1L;
+
+        Map<String, String> state;
+
+        MapState(Map<String, String> state) {
+            this.state = state;
+        }
+    }
 }