2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.example;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import com.google.common.base.Optional;
15 import com.google.protobuf.ByteString;
16 import org.opendaylight.controller.cluster.example.messages.KeyValue;
17 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
18 import org.opendaylight.controller.cluster.example.messages.PrintRole;
19 import org.opendaylight.controller.cluster.example.messages.PrintState;
20 import org.opendaylight.controller.cluster.raft.ConfigParams;
21 import org.opendaylight.controller.cluster.raft.RaftActor;
22 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
23 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
25 import java.io.ByteArrayInputStream;
26 import java.io.ByteArrayOutputStream;
27 import java.io.IOException;
28 import java.io.ObjectInputStream;
29 import java.io.ObjectOutputStream;
30 import java.util.HashMap;
34 * A sample actor showing how the RaftActor is to be extended
36 public class ExampleActor extends RaftActor {
38 private final Map<String, String> state = new HashMap();
40 private long persistIdentifier = 1;
43 public ExampleActor(String id, Map<String, String> peerAddresses,
44 Optional<ConfigParams> configParams) {
45 super(id, peerAddresses, configParams);
48 public static Props props(final String id, final Map<String, String> peerAddresses,
49 final Optional<ConfigParams> configParams){
50 return Props.create(new Creator<ExampleActor>(){
52 @Override public ExampleActor create() throws Exception {
53 return new ExampleActor(id, peerAddresses, configParams);
58 @Override public void onReceiveCommand(Object message){
59 if(message instanceof KeyValue){
61 String persistId = Long.toString(persistIdentifier++);
62 persistData(getSender(), persistId, (Payload) message);
64 if(getLeader() != null) {
65 getLeader().forward(message, getContext());
69 } else if (message instanceof PrintState) {
70 if(LOG.isDebugEnabled()) {
71 LOG.debug("State of the node:{} has entries={}, {}",
72 getId(), state.size(), getReplicatedLogState());
75 } else if (message instanceof PrintRole) {
76 if(LOG.isDebugEnabled()) {
77 LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
81 super.onReceiveCommand(message);
85 @Override protected void applyState(ActorRef clientActor, String identifier,
87 if(data instanceof KeyValue){
88 KeyValue kv = (KeyValue) data;
89 state.put(kv.getKey(), kv.getValue());
90 if(clientActor != null) {
91 clientActor.tell(new KeyValueSaved(), getSelf());
96 @Override protected void createSnapshot() {
99 bs = fromObject(state);
100 } catch (Exception e) {
101 LOG.error("Exception in creating snapshot", e);
103 getSelf().tell(new CaptureSnapshotReply(bs), null);
106 @Override protected void applySnapshot(ByteString snapshot) {
109 state.putAll((HashMap) toObject(snapshot));
110 } catch (Exception e) {
111 LOG.error("Exception in applying snapshot", e);
113 if(LOG.isDebugEnabled()) {
114 LOG.debug("Snapshot applied to state :" + ((HashMap) state).size());
118 private ByteString fromObject(Object snapshot) throws Exception {
119 ByteArrayOutputStream b = null;
120 ObjectOutputStream o = null;
122 b = new ByteArrayOutputStream();
123 o = new ObjectOutputStream(b);
124 o.writeObject(snapshot);
125 byte[] snapshotBytes = b.toByteArray();
126 return ByteString.copyFrom(snapshotBytes);
138 private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
140 ByteArrayInputStream bis = null;
141 ObjectInputStream ois = null;
143 bis = new ByteArrayInputStream(bs.toByteArray());
144 ois = new ObjectInputStream(bis);
145 obj = ois.readObject();
157 @Override protected void onStateChanged() {
161 @Override public void onReceiveRecover(Object message) {
162 super.onReceiveRecover(message);
165 @Override public String persistenceId() {