X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft-example%2Fsrc%2Fmain%2Fjava%2Fopendaylight%2Fcontroller%2Fcluster%2Fexample%2FExampleActor.java;fp=opendaylight%2Fmd-sal%2Fsal-akka-raft-example%2Fsrc%2Fmain%2Fjava%2Fopendaylight%2Fcontroller%2Fcluster%2Fexample%2FExampleActor.java;h=0000000000000000000000000000000000000000;hb=abf88e6a9973fe2f0e2c59af9b6543359772ced5;hp=5ab3f69bea994d60644cca47b9694205ae2ab039;hpb=d8acce7ba659501f61371eb07a86f391adde87d1;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/opendaylight/controller/cluster/example/ExampleActor.java deleted file mode 100644 index 5ab3f69bea..0000000000 --- a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/opendaylight/controller/cluster/example/ExampleActor.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.cluster.example; - -import akka.actor.ActorRef; -import akka.actor.Props; -import com.google.common.base.Optional; -import com.google.protobuf.ByteString; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.HashMap; -import java.util.Map; -import javax.annotation.Nonnull; -import org.opendaylight.controller.cluster.example.messages.KeyValue; -import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; -import org.opendaylight.controller.cluster.example.messages.PrintRole; -import org.opendaylight.controller.cluster.example.messages.PrintState; -import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; -import org.opendaylight.controller.cluster.raft.ConfigParams; -import org.opendaylight.controller.cluster.raft.RaftActor; -import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; -import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; -import org.opendaylight.controller.cluster.raft.RaftState; -import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; -import org.opendaylight.controller.cluster.raft.behaviors.Leader; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; - -/** - * A sample actor showing how the RaftActor is to be extended - */ -public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { - - private final Map state = new HashMap<>(); - - private long persistIdentifier = 1; - private final Optional roleChangeNotifier; - - - public ExampleActor(String id, Map peerAddresses, - Optional configParams) { - super(id, peerAddresses, configParams); - setPersistence(true); - roleChangeNotifier = createRoleChangeNotifier(id); - } - - public static Props props(final String id, final Map peerAddresses, - final Optional configParams) { - return Props.create(ExampleActor.class, id, peerAddresses, configParams); - } - - @Override public void onReceiveCommand(Object message) throws Exception{ - if(message instanceof KeyValue){ - if(isLeader()) { - String persistId = Long.toString(persistIdentifier++); - persistData(getSender(), persistId, (Payload) message); - } else { - if(getLeader() != null) { - getLeader().forward(message, getContext()); - } - } - - } else if (message instanceof PrintState) { - if(LOG.isDebugEnabled()) { - LOG.debug("State of the node:{} has entries={}, {}", - getId(), state.size(), getReplicatedLogState()); - } - - } else if (message instanceof PrintRole) { - if(LOG.isDebugEnabled()) { - if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) { - final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates(); - LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), - getRaftActorContext().getPeerAddresses().keySet(), followers); - } else { - LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), - getRaftActorContext().getPeerAddresses().keySet()); - } - - - } - - } else { - super.onReceiveCommand(message); - } - } - - protected String getReplicatedLogState() { - return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex() - + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm() - + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size(); - } - - public Optional createRoleChangeNotifier(String actorId) { - ActorRef exampleRoleChangeNotifier = this.getContext().actorOf( - RoleChangeNotifier.getProps(actorId), actorId + "-notifier"); - return Optional.of(exampleRoleChangeNotifier); - } - - @Override - protected Optional getRoleChangeNotifier() { - return roleChangeNotifier; - } - - @Override protected void applyState(final ActorRef clientActor, final String identifier, - final Object data) { - if(data instanceof KeyValue){ - KeyValue kv = (KeyValue) data; - state.put(kv.getKey(), kv.getValue()); - if(clientActor != null) { - clientActor.tell(new KeyValueSaved(), getSelf()); - } - } - } - - @Override - public void createSnapshot(ActorRef actorRef) { - ByteString bs = null; - try { - bs = fromObject(state); - } catch (Exception e) { - LOG.error("Exception in creating snapshot", e); - } - getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null); - } - - @Override - public void applySnapshot(byte [] snapshot) { - state.clear(); - try { - state.putAll((HashMap) toObject(snapshot)); - } catch (Exception e) { - LOG.error("Exception in applying snapshot", e); - } - if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size()); - } - } - - private ByteString fromObject(Object snapshot) throws Exception { - ByteArrayOutputStream b = null; - ObjectOutputStream o = null; - try { - b = new ByteArrayOutputStream(); - o = new ObjectOutputStream(b); - o.writeObject(snapshot); - byte[] snapshotBytes = b.toByteArray(); - return ByteString.copyFrom(snapshotBytes); - } finally { - if (o != null) { - o.flush(); - o.close(); - } - if (b != null) { - b.close(); - } - } - } - - private Object toObject(byte [] bs) throws ClassNotFoundException, IOException { - Object obj = null; - ByteArrayInputStream bis = null; - ObjectInputStream ois = null; - try { - bis = new ByteArrayInputStream(bs); - ois = new ObjectInputStream(bis); - obj = ois.readObject(); - } finally { - if (bis != null) { - bis.close(); - } - if (ois != null) { - ois.close(); - } - } - return obj; - } - - @Override protected void onStateChanged() { - - } - - @Override public void onReceiveRecover(Object message)throws Exception { - super.onReceiveRecover(message); - } - - @Override public String persistenceId() { - return getId(); - } - - @Override - @Nonnull - protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { - return this; - } - - @Override - public void startLogRecoveryBatch(int maxBatchSize) { - } - - @Override - public void appendRecoveredLogEntry(Payload data) { - } - - @Override - public void applyCurrentLogRecoveryBatch() { - } - - @Override - public void onRecoveryComplete() { - } - - @Override - public void applyRecoverySnapshot(byte[] snapshot) { - } - - @Override - protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() { - return this; - } -}