X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fexample%2FExampleActor.java;h=5ab3f69bea994d60644cca47b9694205ae2ab039;hb=9f1061c46af5220ad95d8d0b94411ba2fd832a50;hp=6c65021d86158b22483ada73704f68ba9f83d84c;hpb=a469dbcec569cc972df0cd57cf725a2173d2604a;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 6c65021d86..5ab3f69bea 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.Props; -import akka.japi.Creator; import com.google.common.base.Optional; import com.google.protobuf.ByteString; import java.io.ByteArrayInputStream; @@ -20,7 +19,7 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.HashMap; import java.util.Map; -import org.opendaylight.controller.cluster.DataPersistenceProvider; +import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; import org.opendaylight.controller.cluster.example.messages.PrintRole; @@ -28,6 +27,8 @@ import org.opendaylight.controller.cluster.example.messages.PrintState; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.ConfigParams; import org.opendaylight.controller.cluster.raft.RaftActor; +import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; +import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.behaviors.Leader; @@ -36,30 +37,24 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payloa /** * A sample actor showing how the RaftActor is to be extended */ -public class ExampleActor extends RaftActor { +public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { - private final Map state = new HashMap(); - private final DataPersistenceProvider dataPersistenceProvider; + private final Map state = new HashMap<>(); private long persistIdentifier = 1; - private Optional roleChangeNotifier; + private final Optional roleChangeNotifier; public ExampleActor(String id, Map peerAddresses, Optional configParams) { super(id, peerAddresses, configParams); - this.dataPersistenceProvider = new PersistentDataProvider(); + setPersistence(true); roleChangeNotifier = createRoleChangeNotifier(id); } public static Props props(final String id, final Map peerAddresses, - final Optional configParams){ - return Props.create(new Creator(){ - - @Override public ExampleActor create() throws Exception { - return new ExampleActor(id, peerAddresses, configParams); - } - }); + final Optional configParams) { + return Props.create(ExampleActor.class, id, peerAddresses, configParams); } @Override public void onReceiveCommand(Object message) throws Exception{ @@ -81,9 +76,8 @@ public class ExampleActor extends RaftActor { } else if (message instanceof PrintRole) { if(LOG.isDebugEnabled()) { - String followers = ""; if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) { - followers = ((Leader)this.getCurrentBehavior()).printFollowerStates(); + final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates(); LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getRaftActorContext().getPeerAddresses().keySet(), followers); } else { @@ -127,25 +121,27 @@ public class ExampleActor extends RaftActor { } } - @Override protected void createSnapshot() { + @Override + public void createSnapshot(ActorRef actorRef) { ByteString bs = null; try { bs = fromObject(state); } catch (Exception e) { - LOG.error(e, "Exception in creating snapshot"); + LOG.error("Exception in creating snapshot", e); } - getSelf().tell(new CaptureSnapshotReply(bs), null); + getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null); } - @Override protected void applySnapshot(ByteString snapshot) { + @Override + public void applySnapshot(byte [] snapshot) { state.clear(); try { - state.putAll((HashMap) toObject(snapshot)); + state.putAll((HashMap) toObject(snapshot)); } catch (Exception e) { - LOG.error(e, "Exception in applying snapshot"); + LOG.error("Exception in applying snapshot", e); } if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size()); + LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size()); } } @@ -169,12 +165,12 @@ public class ExampleActor extends RaftActor { } } - private Object toObject(ByteString bs) throws ClassNotFoundException, IOException { + private Object toObject(byte [] bs) throws ClassNotFoundException, IOException { Object obj = null; ByteArrayInputStream bis = null; ObjectInputStream ois = null; try { - bis = new ByteArrayInputStream(bs.toByteArray()); + bis = new ByteArrayInputStream(bs); ois = new ObjectInputStream(bis); obj = ois.readObject(); } finally { @@ -192,11 +188,6 @@ public class ExampleActor extends RaftActor { } - @Override - protected DataPersistenceProvider persistence() { - return dataPersistenceProvider; - } - @Override public void onReceiveRecover(Object message)throws Exception { super.onReceiveRecover(message); } @@ -206,22 +197,33 @@ public class ExampleActor extends RaftActor { } @Override - protected void startLogRecoveryBatch(int maxBatchSize) { + @Nonnull + protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { + return this; + } + + @Override + public void startLogRecoveryBatch(int maxBatchSize) { + } + + @Override + public void appendRecoveredLogEntry(Payload data) { } @Override - protected void appendRecoveredLogEntry(Payload data) { + public void applyCurrentLogRecoveryBatch() { } @Override - protected void applyCurrentLogRecoveryBatch() { + public void onRecoveryComplete() { } @Override - protected void onRecoveryComplete() { + public void applyRecoverySnapshot(byte[] snapshot) { } @Override - protected void applyRecoverySnapshot(ByteString snapshot) { + protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() { + return this; } }