2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.example;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import com.google.common.base.Optional;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayInputStream;
16 import java.io.ByteArrayOutputStream;
17 import java.io.IOException;
18 import java.io.ObjectInputStream;
19 import java.io.ObjectOutputStream;
20 import java.util.HashMap;
22 import org.opendaylight.controller.cluster.example.messages.KeyValue;
23 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
24 import org.opendaylight.controller.cluster.example.messages.PrintRole;
25 import org.opendaylight.controller.cluster.example.messages.PrintState;
26 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
27 import org.opendaylight.controller.cluster.raft.ConfigParams;
28 import org.opendaylight.controller.cluster.raft.RaftActor;
29 import org.opendaylight.controller.cluster.raft.RaftState;
30 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
31 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
32 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
35 * A sample actor showing how the RaftActor is to be extended
37 public class ExampleActor extends RaftActor {
39 private final Map<String, String> state = new HashMap();
41 private long persistIdentifier = 1;
42 private final Optional<ActorRef> roleChangeNotifier;
45 public ExampleActor(String id, Map<String, String> peerAddresses,
46 Optional<ConfigParams> configParams) {
47 super(id, peerAddresses, configParams);
49 roleChangeNotifier = createRoleChangeNotifier(id);
52 public static Props props(final String id, final Map<String, String> peerAddresses,
53 final Optional<ConfigParams> configParams) {
54 return Props.create(ExampleActor.class, id, peerAddresses, configParams);
57 @Override public void onReceiveCommand(Object message) throws Exception{
58 if(message instanceof KeyValue){
60 String persistId = Long.toString(persistIdentifier++);
61 persistData(getSender(), persistId, (Payload) message);
63 if(getLeader() != null) {
64 getLeader().forward(message, getContext());
68 } else if (message instanceof PrintState) {
69 if(LOG.isDebugEnabled()) {
70 LOG.debug("State of the node:{} has entries={}, {}",
71 getId(), state.size(), getReplicatedLogState());
74 } else if (message instanceof PrintRole) {
75 if(LOG.isDebugEnabled()) {
76 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
77 final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
78 LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
79 getRaftActorContext().getPeerAddresses().keySet(), followers);
81 LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
82 getRaftActorContext().getPeerAddresses().keySet());
89 super.onReceiveCommand(message);
93 protected String getReplicatedLogState() {
94 return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
95 + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
96 + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
99 public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
100 ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
101 RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
102 return Optional.<ActorRef>of(exampleRoleChangeNotifier);
106 protected Optional<ActorRef> getRoleChangeNotifier() {
107 return roleChangeNotifier;
110 @Override protected void applyState(final ActorRef clientActor, final String identifier,
112 if(data instanceof KeyValue){
113 KeyValue kv = (KeyValue) data;
114 state.put(kv.getKey(), kv.getValue());
115 if(clientActor != null) {
116 clientActor.tell(new KeyValueSaved(), getSelf());
121 @Override protected void createSnapshot() {
122 ByteString bs = null;
124 bs = fromObject(state);
125 } catch (Exception e) {
126 LOG.error("Exception in creating snapshot", e);
128 getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
131 @Override protected void applySnapshot(byte [] snapshot) {
134 state.putAll((HashMap) toObject(snapshot));
135 } catch (Exception e) {
136 LOG.error("Exception in applying snapshot", e);
138 if(LOG.isDebugEnabled()) {
139 LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
143 private ByteString fromObject(Object snapshot) throws Exception {
144 ByteArrayOutputStream b = null;
145 ObjectOutputStream o = null;
147 b = new ByteArrayOutputStream();
148 o = new ObjectOutputStream(b);
149 o.writeObject(snapshot);
150 byte[] snapshotBytes = b.toByteArray();
151 return ByteString.copyFrom(snapshotBytes);
163 private Object toObject(byte [] bs) throws ClassNotFoundException, IOException {
165 ByteArrayInputStream bis = null;
166 ObjectInputStream ois = null;
168 bis = new ByteArrayInputStream(bs);
169 ois = new ObjectInputStream(bis);
170 obj = ois.readObject();
182 @Override protected void onStateChanged() {
186 @Override public void onReceiveRecover(Object message)throws Exception {
187 super.onReceiveRecover(message);
190 @Override public String persistenceId() {
195 protected void startLogRecoveryBatch(int maxBatchSize) {
199 protected void appendRecoveredLogEntry(Payload data) {
203 protected void applyCurrentLogRecoveryBatch() {
207 protected void onRecoveryComplete() {
211 protected void applyRecoverySnapshot(byte[] snapshot) {