0a099885e4df083d5f2dff95faf5882de0b7a23c
[controller.git] / opendaylight / md-sal / sal-akka-raft-example / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import com.google.common.base.Optional;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayInputStream;
16 import java.io.ByteArrayOutputStream;
17 import java.io.IOException;
18 import java.io.ObjectInputStream;
19 import java.io.ObjectOutputStream;
20 import java.util.HashMap;
21 import java.util.Map;
22 import javax.annotation.Nonnull;
23 import org.opendaylight.controller.cluster.example.messages.KeyValue;
24 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
25 import org.opendaylight.controller.cluster.example.messages.PrintRole;
26 import org.opendaylight.controller.cluster.example.messages.PrintState;
27 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
28 import org.opendaylight.controller.cluster.raft.ConfigParams;
29 import org.opendaylight.controller.cluster.raft.RaftActor;
30 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
31 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
32 import org.opendaylight.controller.cluster.raft.RaftState;
33 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
34 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
35 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
36
37 /**
38  * A sample actor showing how the RaftActor is to be extended
39  */
40 public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
41
42     private final Map<String, String> state = new HashMap<>();
43
44     private long persistIdentifier = 1;
45     private final Optional<ActorRef> roleChangeNotifier;
46
47
48     public ExampleActor(String id, Map<String, String> peerAddresses,
49         Optional<ConfigParams> configParams) {
50         super(id, peerAddresses, configParams, (short)0);
51         setPersistence(true);
52         roleChangeNotifier = createRoleChangeNotifier(id);
53     }
54
55     public static Props props(final String id, final Map<String, String> peerAddresses,
56             final Optional<ConfigParams> configParams) {
57         return Props.create(ExampleActor.class, id, peerAddresses, configParams);
58     }
59
60     @Override
61     protected void handleCommand(Object message) {
62         if(message instanceof KeyValue){
63             if(isLeader()) {
64                 String persistId = Long.toString(persistIdentifier++);
65                 persistData(getSender(), persistId, (Payload) message);
66             } else {
67                 if(getLeader() != null) {
68                     getLeader().forward(message, getContext());
69                 }
70             }
71
72         } else if (message instanceof PrintState) {
73             if(LOG.isDebugEnabled()) {
74                 LOG.debug("State of the node:{} has entries={}, {}",
75                     getId(), state.size(), getReplicatedLogState());
76             }
77
78         } else if (message instanceof PrintRole) {
79             if(LOG.isDebugEnabled()) {
80                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
81                     final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
82                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
83                         getRaftActorContext().getPeerIds(), followers);
84                 } else {
85                     LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
86                         getRaftActorContext().getPeerIds());
87                 }
88
89
90             }
91
92         } else {
93             super.handleCommand(message);
94         }
95     }
96
97     protected String getReplicatedLogState() {
98         return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
99             + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
100             + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
101     }
102
103     public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
104         ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
105             RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
106         return Optional.<ActorRef>of(exampleRoleChangeNotifier);
107     }
108
109     @Override
110     protected Optional<ActorRef> getRoleChangeNotifier() {
111         return roleChangeNotifier;
112     }
113
114     @Override protected void applyState(final ActorRef clientActor, final String identifier,
115         final Object data) {
116         if(data instanceof KeyValue){
117             KeyValue kv = (KeyValue) data;
118             state.put(kv.getKey(), kv.getValue());
119             if(clientActor != null) {
120                 clientActor.tell(new KeyValueSaved(), getSelf());
121             }
122         }
123     }
124
125     @Override
126     public void createSnapshot(ActorRef actorRef) {
127         ByteString bs = null;
128         try {
129             bs = fromObject(state);
130         } catch (Exception e) {
131             LOG.error("Exception in creating snapshot", e);
132         }
133         getSelf().tell(new CaptureSnapshotReply(bs.toByteArray()), null);
134     }
135
136     @Override
137     public void applySnapshot(byte [] snapshot) {
138         state.clear();
139         try {
140             state.putAll((HashMap<String, String>) toObject(snapshot));
141         } catch (Exception e) {
142            LOG.error("Exception in applying snapshot", e);
143         }
144         if(LOG.isDebugEnabled()) {
145             LOG.debug("Snapshot applied to state : {}", ((HashMap<?, ?>) state).size());
146         }
147     }
148
149     private static ByteString fromObject(Object snapshot) throws Exception {
150         ByteArrayOutputStream b = null;
151         ObjectOutputStream o = null;
152         try {
153             b = new ByteArrayOutputStream();
154             o = new ObjectOutputStream(b);
155             o.writeObject(snapshot);
156             byte[] snapshotBytes = b.toByteArray();
157             return ByteString.copyFrom(snapshotBytes);
158         } finally {
159             if (o != null) {
160                 o.flush();
161                 o.close();
162             }
163             if (b != null) {
164                 b.close();
165             }
166         }
167     }
168
169     private static Object toObject(byte [] bs) throws ClassNotFoundException, IOException {
170         Object obj = null;
171         ByteArrayInputStream bis = null;
172         ObjectInputStream ois = null;
173         try {
174             bis = new ByteArrayInputStream(bs);
175             ois = new ObjectInputStream(bis);
176             obj = ois.readObject();
177         } finally {
178             if (bis != null) {
179                 bis.close();
180             }
181             if (ois != null) {
182                 ois.close();
183             }
184         }
185         return obj;
186     }
187
188     @Override protected void onStateChanged() {
189
190     }
191
192     @Override public String persistenceId() {
193         return getId();
194     }
195
196     @Override
197     @Nonnull
198     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
199         return this;
200     }
201
202     @Override
203     public void startLogRecoveryBatch(int maxBatchSize) {
204     }
205
206     @Override
207     public void appendRecoveredLogEntry(Payload data) {
208     }
209
210     @Override
211     public void applyCurrentLogRecoveryBatch() {
212     }
213
214     @Override
215     public void onRecoveryComplete() {
216     }
217
218     @Override
219     public void applyRecoverySnapshot(byte[] snapshot) {
220     }
221
222     @Override
223     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
224         return this;
225     }
226
227     @Override
228     public byte[] getRestoreFromSnapshot() {
229         return null;
230     }
231 }