import akka.event.LoggingAdapter;
import akka.japi.Procedure;
import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotFailure;
+import akka.persistence.SaveSnapshotSuccess;
+import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
import akka.persistence.UntypedPersistentActor;
import org.opendaylight.controller.cluster.raft.behaviors.Candidate;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.internal.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
* RaftActor encapsulates a state machine that needs to be kept synchronized
* in a cluster. It implements the RAFT algorithm as described in the paper
* <a href='https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf'>
- * In Search of an Understandable Consensus Algorithm</a>
- * <p>
+ * In Search of an Understandable Consensus Algorithm</a>
+ * <p/>
* RaftActor has 3 states and each state has a certain behavior associated
* with it. A Raft actor can behave as,
* <ul>
- * <li> A Leader </li>
- * <li> A Follower (or) </li>
- * <li> A Candidate </li>
+ * <li> A Leader </li>
+ * <li> A Follower (or) </li>
+ * <li> A Candidate </li>
* </ul>
- *
- * <p>
+ * <p/>
+ * <p/>
* A RaftActor MUST be a Leader in order to accept requests from clients to
* change the state of it's encapsulated state machine. Once a RaftActor becomes
* a Leader it is also responsible for ensuring that all followers ultimately
* have the same log and therefore the same state machine as itself.
- *
- * <p>
+ * <p/>
+ * <p/>
* The current behavior of a RaftActor determines how election for leadership
* is initiated and how peer RaftActors react to request for votes.
- *
- * <p>
+ * <p/>
+ * <p/>
* Each RaftActor also needs to know the current election term. It uses this
* information for a couple of things. One is to simply figure out who it
* voted for in the last election. Another is to figure out if the message
* it received to update it's state is stale.
- *
- * <p>
+ * <p/>
+ * <p/>
* The RaftActor uses akka-persistence to store it's replicated log.
* Furthermore through it's behaviors a Raft Actor determines
- *
+ * <p/>
* <ul>
* <li> when a log entry should be persisted </li>
* <li> when a log entry should be applied to the state machine (and) </li>
* <li> when a snapshot should be saved </li>
* </ul>
- *
- * <a href="http://doc.akka.io/api/akka/2.3.3/index.html#akka.persistence.UntypedEventsourcedProcessor">UntypeEventSourceProcessor</a>
*/
public abstract class RaftActor extends UntypedPersistentActor {
protected final LoggingAdapter LOG =
Logging.getLogger(getContext().system(), this);
/**
- * The current state determines the current behavior of a RaftActor
+ * The current state determines the current behavior of a RaftActor
* A Raft Actor always starts off in the Follower State
*/
private RaftActorBehavior currentBehavior;
private ReplicatedLogImpl replicatedLog = new ReplicatedLogImpl();
-
- public RaftActor(String id, Map<String, String> peerAddresses){
+ public RaftActor(String id, Map<String, String> peerAddresses) {
context = new RaftActorContextImpl(this.getSelf(),
this.getContext(),
id, new ElectionTermImpl(getSelf().path().toString()),
-1, -1, replicatedLog, peerAddresses, LOG);
- currentBehavior = switchBehavior(RaftState.Follower);
}
@Override public void onReceiveRecover(Object message) {
- if(message instanceof ReplicatedLogEntry) {
+ if (message instanceof SnapshotOffer) {
+ SnapshotOffer offer = (SnapshotOffer) message;
+ Snapshot snapshot = (Snapshot) offer.snapshot();
+
+ // Create a replicated log with the snapshot information
+ // The replicated log can be used later on to retrieve this snapshot
+ // when we need to install it on a peer
+ replicatedLog = new ReplicatedLogImpl(snapshot);
+
+ // Apply the snapshot to the actors state
+ applySnapshot(snapshot.getState());
+
+ } else if (message instanceof ReplicatedLogEntry) {
replicatedLog.append((ReplicatedLogEntry) message);
- } else if(message instanceof RecoveryCompleted){
- LOG.debug("Log now has messages to index : " + replicatedLog.lastIndex());
+ } else if (message instanceof RecoveryCompleted) {
+ LOG.debug(
+ "Last index in log : " + replicatedLog.lastIndex());
+ currentBehavior = switchBehavior(RaftState.Follower);
}
}
@Override public void onReceiveCommand(Object message) {
- if(message instanceof ApplyState){
+ if (message instanceof ApplyState) {
- ApplyState applyState = (ApplyState) message;
+ ApplyState applyState = (ApplyState) message;
- LOG.debug("Applying state for log index {}", applyState.getReplicatedLogEntry().getIndex());
+ LOG.debug("Applying state for log index {}",
+ applyState.getReplicatedLogEntry().getIndex());
applyState(applyState.getClientActor(), applyState.getIdentifier(),
applyState.getReplicatedLogEntry().getData());
- } else if(message instanceof FindLeader){
- getSender().tell(new FindLeaderReply(
- context.getPeerAddress(currentBehavior.getLeaderId())),
- getSelf());
+ } else if(message instanceof ApplySnapshot ) {
+ applySnapshot(((ApplySnapshot) message).getSnapshot());
+ } else if (message instanceof FindLeader) {
+ getSender().tell(
+ new FindLeaderReply(
+ context.getPeerAddress(currentBehavior.getLeaderId())),
+ getSelf()
+ );
+ } else if (message instanceof SaveSnapshotSuccess) {
+ SaveSnapshotSuccess success = (SaveSnapshotSuccess) message;
+
+ // TODO: Not sure if we want to be this aggressive with trimming stuff
+ trimPersistentData(success.metadata().sequenceNr());
+
+ } else if (message instanceof SaveSnapshotFailure) {
+ // TODO: Handle failure in saving the snapshot
} else {
RaftState state =
currentBehavior.handleMessage(getSender(), message);
}
}
- private RaftActorBehavior switchBehavior(RaftState state){
- if(currentBehavior != null) {
- if (currentBehavior.state() == state) {
- return currentBehavior;
- }
- LOG.info("Switching from state " + currentBehavior.state() + " to "
- + state);
-
- try {
- currentBehavior.close();
- } catch (Exception e) {
- LOG.error(e, "Failed to close behavior : " + currentBehavior.state());
- }
- } else {
- LOG.info("Switching behavior to " + state);
- }
- RaftActorBehavior behavior = null;
- if(state == RaftState.Candidate){
- behavior = new Candidate(context);
- } else if(state == RaftState.Follower){
- behavior = new Follower(context);
- } else {
- behavior = new Leader(context);
- }
- return behavior;
- }
/**
* When a derived RaftActor needs to persist something it must call
* @param identifier
* @param data
*/
- protected void persistData(ActorRef clientActor, String identifier, Object data){
+ protected void persistData(ActorRef clientActor, String identifier,
+ Object data) {
LOG.debug("Persist data " + identifier);
ReplicatedLogEntry replicatedLogEntry = new ReplicatedLogImplEntry(
context.getReplicatedLog().lastIndex() + 1,
context.getTermInformation().getCurrentTerm(), data);
- replicatedLog.appendAndPersist(clientActor, identifier, replicatedLogEntry);
+ replicatedLog
+ .appendAndPersist(clientActor, identifier, replicatedLogEntry);
}
- protected abstract void applyState(ActorRef clientActor, String identifier, Object data);
-
- protected String getId(){
+ protected String getId() {
return context.getId();
}
- protected boolean isLeader(){
+ /**
+ * Derived actors can call the isLeader method to check if the current
+ * RaftActor is the Leader or not
+ *
+ * @return true it this RaftActor is a Leader false otherwise
+ */
+ protected boolean isLeader() {
return context.getId().equals(currentBehavior.getLeaderId());
}
- protected ActorSelection getLeader(){
+ /**
+ * Derived actor can call getLeader if they need a reference to the Leader.
+ * This would be useful for example in forwarding a request to an actor
+ * which is the leader
+ *
+ * @return A reference to the leader if known, null otherwise
+ */
+ protected ActorSelection getLeader() {
String leaderId = currentBehavior.getLeaderId();
+ if (leaderId == null) {
+ return null;
+ }
String peerAddress = context.getPeerAddress(leaderId);
- LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = " + peerAddress);
+ LOG.debug("getLeader leaderId = " + leaderId + " peerAddress = "
+ + peerAddress);
return context.actorSelection(peerAddress);
}
+ /**
+ * The applyState method will be called by the RaftActor when some data
+ * needs to be applied to the actor's state
+ *
+ * @param clientActor A reference to the client who sent this message. This
+ * is the same reference that was passed to persistData
+ * by the derived actor. clientActor may be null when
+ * the RaftActor is behaving as a follower or during
+ * recovery.
+ * @param identifier The identifier of the persisted data. This is also
+ * the same identifier that was passed to persistData by
+ * the derived actor. identifier may be null when
+ * the RaftActor is behaving as a follower or during
+ * recovery
+ * @param data A piece of data that was persisted by the persistData call.
+ * This should NEVER be null.
+ */
+ protected abstract void applyState(ActorRef clientActor, String identifier,
+ Object data);
+
+ /**
+ * This method will be called by the RaftActor when a snapshot needs to be
+ * created. The derived actor should respond with its current state.
+ * <p/>
+ * During recovery the state that is returned by the derived actor will
+ * be passed back to it by calling the applySnapshot method
+ *
+ * @return The current state of the actor
+ */
+ protected abstract Object createSnapshot();
+
+ /**
+ * This method will be called by the RaftActor during recovery to
+ * reconstruct the state of the actor.
+ * <p/>
+ * This method may also be called at any other point during normal
+ * operations when the derived actor is out of sync with it's peers
+ * and the only way to bring it in sync is by applying a snapshot
+ *
+ * @param snapshot A snapshot of the state of the actor
+ */
+ protected abstract void applySnapshot(Object snapshot);
+
+ private RaftActorBehavior switchBehavior(RaftState state) {
+ if (currentBehavior != null) {
+ if (currentBehavior.state() == state) {
+ return currentBehavior;
+ }
+ LOG.info("Switching from state " + currentBehavior.state() + " to "
+ + state);
+
+ try {
+ currentBehavior.close();
+ } catch (Exception e) {
+ LOG.error(e,
+ "Failed to close behavior : " + currentBehavior.state());
+ }
+
+ } else {
+ LOG.info("Switching behavior to " + state);
+ }
+ RaftActorBehavior behavior = null;
+ if (state == RaftState.Candidate) {
+ behavior = new Candidate(context);
+ } else if (state == RaftState.Follower) {
+ behavior = new Follower(context);
+ } else {
+ behavior = new Leader(context);
+ }
+ return behavior;
+ }
+
+ private void trimPersistentData(long sequenceNumber) {
+ // Trim snapshots
+ // FIXME : Not sure how exactly the SnapshotSelectionCriteria is applied
+ // For now guessing that it is ANDed.
+ deleteSnapshots(new SnapshotSelectionCriteria(
+ sequenceNumber - 100000, 43200000));
+
+ // Trim journal
+ deleteMessages(sequenceNumber);
+ }
+
private class ReplicatedLogImpl implements ReplicatedLog {
- private final List<ReplicatedLogEntry> journal = new ArrayList();
- private long snapshotIndex = 0;
- private Object snapShot = null;
+ private final List<ReplicatedLogEntry> journal;
+ private final Object snapshot;
+ private long snapshotIndex = -1;
+ private long snapshotTerm = -1;
+
+ public ReplicatedLogImpl(Snapshot snapshot) {
+ this.snapshot = snapshot.getState();
+ this.snapshotIndex = snapshot.getLastAppliedIndex();
+ this.snapshotTerm = snapshot.getLastAppliedTerm();
+
+ this.journal = new ArrayList<>(snapshot.getUnAppliedEntries());
+ }
+ public ReplicatedLogImpl() {
+ this.snapshot = null;
+ this.journal = new ArrayList<>();
+ }
@Override public ReplicatedLogEntry get(long index) {
- if(index < 0 || index >= journal.size()){
+ int adjustedIndex = adjustedIndex(index);
+
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
return null;
}
- return journal.get((int) (index - snapshotIndex));
+ return journal.get(adjustedIndex);
}
@Override public ReplicatedLogEntry last() {
- if(journal.size() == 0){
+ if (journal.size() == 0) {
return null;
}
return get(journal.size() - 1);
}
@Override public long lastIndex() {
- if(journal.size() == 0){
+ if (journal.size() == 0) {
return -1;
}
}
@Override public long lastTerm() {
- if(journal.size() == 0){
+ if (journal.size() == 0) {
return -1;
}
@Override public void removeFrom(long index) {
- if(index < 0 || index >= journal.size()){
+ int adjustedIndex = adjustedIndex(index);
+
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
return;
}
- for(int i= (int) (index - snapshotIndex) ; i < journal.size() ; i++){
+ for (int i = adjustedIndex;
+ i < journal.size(); i++) {
deleteMessage(i);
journal.remove(i);
}
}
- @Override public void append(final ReplicatedLogEntry replicatedLogEntry) {
+ @Override public void append(
+ final ReplicatedLogEntry replicatedLogEntry) {
journal.add(replicatedLogEntry);
}
@Override public List<ReplicatedLogEntry> getFrom(long index) {
+ int adjustedIndex = adjustedIndex(index);
+
List<ReplicatedLogEntry> entries = new ArrayList<>(100);
- if(index < 0 || index >= journal.size()){
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
return entries;
}
- for(int i= (int) (index - snapshotIndex); i < journal.size() ; i++){
+ for (int i = adjustedIndex;
+ i < journal.size(); i++) {
entries.add(journal.get(i));
}
return entries;
}
- @Override public void appendAndPersist(final ReplicatedLogEntry replicatedLogEntry){
+ @Override public void appendAndPersist(
+ final ReplicatedLogEntry replicatedLogEntry) {
appendAndPersist(null, null, replicatedLogEntry);
}
- public void appendAndPersist(final ActorRef clientActor, final String identifier, final ReplicatedLogEntry replicatedLogEntry){
- context.getLogger().debug("Append log entry and persist" + replicatedLogEntry.getIndex());
+ public void appendAndPersist(final ActorRef clientActor,
+ final String identifier,
+ final ReplicatedLogEntry replicatedLogEntry) {
+ context.getLogger().debug(
+ "Append log entry and persist " + replicatedLogEntry.getIndex());
// FIXME : By adding the replicated log entry to the in-memory journal we are not truly ensuring durability of the logs
journal.add(replicatedLogEntry);
+
+ // When persisting events with persist it is guaranteed that the
+ // persistent actor will not receive further commands between the
+ // persist call and the execution(s) of the associated event
+ // handler. This also holds for multiple persist calls in context
+ // of a single command.
persist(replicatedLogEntry,
new Procedure<ReplicatedLogEntry>() {
public void apply(ReplicatedLogEntry evt) throws Exception {
+ // FIXME : Tentatively create a snapshot every hundred thousand entries. To be tuned.
+ if (size() > 100000) {
+ ReplicatedLogEntry lastAppliedEntry =
+ get(context.getLastApplied());
+ long lastAppliedIndex = -1;
+ long lastAppliedTerm = -1;
+ if (lastAppliedEntry != null) {
+ lastAppliedIndex = lastAppliedEntry.getIndex();
+ lastAppliedTerm = lastAppliedEntry.getTerm();
+ }
+
+ saveSnapshot(Snapshot.create(createSnapshot(),
+ getFrom(context.getLastApplied() + 1),
+ lastIndex(), lastTerm(), lastAppliedIndex,
+ lastAppliedTerm));
+ }
// Send message for replication
- if(clientActor != null) {
+ if (clientActor != null) {
currentBehavior.handleMessage(getSelf(),
new Replicate(clientActor, identifier,
- replicatedLogEntry));
+ replicatedLogEntry)
+ );
}
}
- });
+ }
+ );
}
@Override public long size() {
return journal.size() + snapshotIndex;
}
+
+ @Override public boolean isPresent(long index) {
+ int adjustedIndex = adjustedIndex(index);
+
+ if (adjustedIndex < 0 || adjustedIndex >= journal.size()) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override public boolean isInSnapshot(long index) {
+ return index <= snapshotIndex;
+ }
+
+ @Override public Object getSnapshot() {
+ return snapshot;
+ }
+
+ @Override public long getSnapshotIndex() {
+ return snapshotIndex;
+ }
+
+ @Override public long getSnapshotTerm() {
+ return snapshotTerm;
+ }
+
+ private int adjustedIndex(long index) {
+ if(snapshotIndex < 0){
+ return (int) index;
+ }
+ return (int) (index - snapshotIndex);
+ }
}
+
private static class ReplicatedLogImplEntry implements ReplicatedLogEntry,
Serializable {
private final long term;
private final Object payload;
- public ReplicatedLogImplEntry(long index, long term, Object payload){
+ public ReplicatedLogImplEntry(long index, long term, Object payload) {
this.index = index;
this.term = term;
}
+ private static class Snapshot implements Serializable {
+ private final Object state;
+ private final List<ReplicatedLogEntry> unAppliedEntries;
+ private final long lastIndex;
+ private final long lastTerm;
+ private final long lastAppliedIndex;
+ private final long lastAppliedTerm;
+
+ private Snapshot(Object state,
+ List<ReplicatedLogEntry> unAppliedEntries, long lastIndex,
+ long lastTerm, long lastAppliedIndex, long lastAppliedTerm) {
+ this.state = state;
+ this.unAppliedEntries = unAppliedEntries;
+ this.lastIndex = lastIndex;
+ this.lastTerm = lastTerm;
+ this.lastAppliedIndex = lastAppliedIndex;
+ this.lastAppliedTerm = lastAppliedTerm;
+ }
+
+
+ public static Snapshot create(Object state,
+ List<ReplicatedLogEntry> entries, long lastIndex, long lastTerm,
+ long lastAppliedIndex, long lastAppliedTerm) {
+ return new Snapshot(state, entries, lastIndex, lastTerm,
+ lastAppliedIndex, lastAppliedTerm);
+ }
+
+ public Object getState() {
+ return state;
+ }
+
+ public List<ReplicatedLogEntry> getUnAppliedEntries() {
+ return unAppliedEntries;
+ }
+
+ public long getLastTerm() {
+ return lastTerm;
+ }
+
+ public long getLastAppliedIndex() {
+ return lastAppliedIndex;
+ }
+
+ public long getLastAppliedTerm() {
+ return lastAppliedTerm;
+ }
+ }
+
+
}
import org.opendaylight.controller.cluster.raft.internal.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.internal.messages.Replicate;
import org.opendaylight.controller.cluster.raft.internal.messages.SendHeartBeat;
+import org.opendaylight.controller.cluster.raft.internal.messages.SendInstallSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
+import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
private final Map<String, ActorSelection> followerToActor = new HashMap<>();
- private Cancellable heartbeatCancel = null;
+ private Cancellable heartbeatSchedule = null;
+ private Cancellable appendEntriesSchedule = null;
+ private Cancellable installSnapshotSchedule = null;
private List<ClientRequestTracker> trackerList = new ArrayList<>();
// prevent election timeouts (§5.2)
scheduleHeartBeat(new FiniteDuration(0, TimeUnit.SECONDS));
+ scheduleInstallSnapshotCheck(
+ new FiniteDuration(HEART_BEAT_INTERVAL.length() * 1000,
+ HEART_BEAT_INTERVAL.unit())
+ );
}
try {
if (message instanceof SendHeartBeat) {
return sendHeartBeat();
+ } else if(message instanceof SendInstallSnapshot) {
+ installSnapshotIfNeeded();
} else if (message instanceof Replicate) {
replicate((Replicate) message);
+ } else if (message instanceof InstallSnapshotReply){
+ // FIXME : Should I be checking the term here too?
+ handleInstallSnapshotReply(
+ (InstallSnapshotReply) message);
}
} finally {
scheduleHeartBeat(HEART_BEAT_INTERVAL);
return super.handleMessage(sender, message);
}
+ private void handleInstallSnapshotReply(InstallSnapshotReply message) {
+ InstallSnapshotReply reply = message;
+ String followerId = reply.getFollowerId();
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ followerLogInformation
+ .setMatchIndex(context.getReplicatedLog().getSnapshotIndex());
+ followerLogInformation
+ .setNextIndex(context.getReplicatedLog().getSnapshotIndex() + 1);
+ }
+
private void replicate(Replicate replicate) {
long logIndex = replicate.getReplicatedLogEntry().getIndex();
long nextIndex = followerLogInformation.getNextIndex().get();
- List<ReplicatedLogEntry> entries =
- context.getReplicatedLog().getFrom(nextIndex);
+ List<ReplicatedLogEntry> entries = Collections.emptyList();
+
+ if(context.getReplicatedLog().isPresent(nextIndex)){
+ entries =
+ context.getReplicatedLog().getFrom(nextIndex);
+ }
followerActor.tell(
new AppendEntries(currentTerm(), context.getId(),
}
}
+ private void installSnapshotIfNeeded(){
+ for (String followerId : followerToActor.keySet()) {
+ ActorSelection followerActor =
+ followerToActor.get(followerId);
+
+ FollowerLogInformation followerLogInformation =
+ followerToLog.get(followerId);
+
+ long nextIndex = followerLogInformation.getNextIndex().get();
+
+ if(!context.getReplicatedLog().isPresent(nextIndex) && context.getReplicatedLog().isInSnapshot(nextIndex)){
+ followerActor.tell(
+ new InstallSnapshot(currentTerm(), context.getId(),
+ context.getReplicatedLog().getSnapshotIndex(),
+ context.getReplicatedLog().getSnapshotTerm(),
+ context.getReplicatedLog().getSnapshot()
+ ),
+ actor()
+ );
+ }
+ }
+ }
+
private RaftState sendHeartBeat() {
if (followerToActor.size() > 0) {
sendAppendEntries();
}
private void stopHeartBeat() {
- if (heartbeatCancel != null && !heartbeatCancel.isCancelled()) {
- heartbeatCancel.cancel();
+ if (heartbeatSchedule != null && !heartbeatSchedule.isCancelled()) {
+ heartbeatSchedule.cancel();
+ }
+ }
+
+ private void stopInstallSnapshotSchedule() {
+ if (installSnapshotSchedule != null && !installSnapshotSchedule.isCancelled()) {
+ installSnapshotSchedule.cancel();
}
}
// Scheduling the heartbeat only once here because heartbeats do not
// need to be sent if there are other messages being sent to the remote
// actor.
- heartbeatCancel =
+ heartbeatSchedule =
context.getActorSystem().scheduler().scheduleOnce(
interval,
context.getActor(), new SendHeartBeat(),
context.getActorSystem().dispatcher(), context.getActor());
}
+
+ private void scheduleInstallSnapshotCheck(FiniteDuration interval) {
+ if(followerToActor.keySet().size() == 0){
+ // Optimization - do not bother scheduling a heartbeat as there are
+ // no followers
+ return;
+ }
+
+ stopInstallSnapshotSchedule();
+
+ // Schedule a message to send append entries to followers that can
+ // accept an append entries with some data in it
+ installSnapshotSchedule =
+ context.getActorSystem().scheduler().scheduleOnce(
+ interval,
+ context.getActor(), new SendInstallSnapshot(),
+ context.getActorSystem().dispatcher(), context.getActor());
+ }
+
+
+
@Override public void close() throws Exception {
stopHeartBeat();
}