Bug-2397:Provide a mechanism for stakeholders to get notifications on Raft state...
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import com.google.common.base.Optional;
15 import com.google.protobuf.ByteString;
16 import java.io.ByteArrayInputStream;
17 import java.io.ByteArrayOutputStream;
18 import java.io.IOException;
19 import java.io.ObjectInputStream;
20 import java.io.ObjectOutputStream;
21 import java.util.HashMap;
22 import java.util.Map;
23 import org.opendaylight.controller.cluster.DataPersistenceProvider;
24 import org.opendaylight.controller.cluster.example.messages.KeyValue;
25 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
26 import org.opendaylight.controller.cluster.example.messages.PrintRole;
27 import org.opendaylight.controller.cluster.example.messages.PrintState;
28 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
29 import org.opendaylight.controller.cluster.raft.ConfigParams;
30 import org.opendaylight.controller.cluster.raft.RaftActor;
31 import org.opendaylight.controller.cluster.raft.RaftState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
33 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
34 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
35
36 /**
37  * A sample actor showing how the RaftActor is to be extended
38  */
39 public class ExampleActor extends RaftActor {
40
41     private final Map<String, String> state = new HashMap();
42     private final DataPersistenceProvider dataPersistenceProvider;
43
44     private long persistIdentifier = 1;
45     private Optional<ActorRef> roleChangeNotifier;
46
47
48     public ExampleActor(String id, Map<String, String> peerAddresses,
49         Optional<ConfigParams> configParams) {
50         super(id, peerAddresses, configParams);
51         this.dataPersistenceProvider = new PersistentDataProvider();
52         roleChangeNotifier = createRoleChangeNotifier(id);
53     }
54
55     public static Props props(final String id, final Map<String, String> peerAddresses,
56         final Optional<ConfigParams> configParams){
57         return Props.create(new Creator<ExampleActor>(){
58
59             @Override public ExampleActor create() throws Exception {
60                 return new ExampleActor(id, peerAddresses, configParams);
61             }
62         });
63     }
64
65     @Override public void onReceiveCommand(Object message) throws Exception{
66         if(message instanceof KeyValue){
67             if(isLeader()) {
68                 String persistId = Long.toString(persistIdentifier++);
69                 persistData(getSender(), persistId, (Payload) message);
70             } else {
71                 if(getLeader() != null) {
72                     getLeader().forward(message, getContext());
73                 }
74             }
75
76         } else if (message instanceof PrintState) {
77             if(LOG.isDebugEnabled()) {
78                 LOG.debug("State of the node:{} has entries={}, {}",
79                     getId(), state.size(), getReplicatedLogState());
80             }
81
82         } else if (message instanceof PrintRole) {
83             if(LOG.isDebugEnabled()) {
84                 String followers = "";
85                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
86                     followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
87                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
88                         getRaftActorContext().getPeerAddresses().keySet(), followers);
89                 } else {
90                     LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
91                         getRaftActorContext().getPeerAddresses().keySet());
92                 }
93
94
95             }
96
97         } else {
98             super.onReceiveCommand(message);
99         }
100     }
101
102     protected String getReplicatedLogState() {
103         return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
104             + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
105             + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
106     }
107
108     public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
109         ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
110             RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
111         return Optional.<ActorRef>of(exampleRoleChangeNotifier);
112     }
113
114     @Override
115     protected Optional<ActorRef> getRoleChangeNotifier() {
116         return roleChangeNotifier;
117     }
118
119     @Override protected void applyState(final ActorRef clientActor, final String identifier,
120         final Object data) {
121         if(data instanceof KeyValue){
122             KeyValue kv = (KeyValue) data;
123             state.put(kv.getKey(), kv.getValue());
124             if(clientActor != null) {
125                 clientActor.tell(new KeyValueSaved(), getSelf());
126             }
127         }
128     }
129
130     @Override protected void createSnapshot() {
131         ByteString bs = null;
132         try {
133             bs = fromObject(state);
134         } catch (Exception e) {
135             LOG.error(e, "Exception in creating snapshot");
136         }
137         getSelf().tell(new CaptureSnapshotReply(bs), null);
138     }
139
140     @Override protected void applySnapshot(ByteString snapshot) {
141         state.clear();
142         try {
143             state.putAll((HashMap) toObject(snapshot));
144         } catch (Exception e) {
145            LOG.error(e, "Exception in applying snapshot");
146         }
147         if(LOG.isDebugEnabled()) {
148             LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
149         }
150     }
151
152     private ByteString fromObject(Object snapshot) throws Exception {
153         ByteArrayOutputStream b = null;
154         ObjectOutputStream o = null;
155         try {
156             b = new ByteArrayOutputStream();
157             o = new ObjectOutputStream(b);
158             o.writeObject(snapshot);
159             byte[] snapshotBytes = b.toByteArray();
160             return ByteString.copyFrom(snapshotBytes);
161         } finally {
162             if (o != null) {
163                 o.flush();
164                 o.close();
165             }
166             if (b != null) {
167                 b.close();
168             }
169         }
170     }
171
172     private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
173         Object obj = null;
174         ByteArrayInputStream bis = null;
175         ObjectInputStream ois = null;
176         try {
177             bis = new ByteArrayInputStream(bs.toByteArray());
178             ois = new ObjectInputStream(bis);
179             obj = ois.readObject();
180         } finally {
181             if (bis != null) {
182                 bis.close();
183             }
184             if (ois != null) {
185                 ois.close();
186             }
187         }
188         return obj;
189     }
190
191     @Override protected void onStateChanged() {
192
193     }
194
195     @Override
196     protected DataPersistenceProvider persistence() {
197         return dataPersistenceProvider;
198     }
199
200     @Override public void onReceiveRecover(Object message)throws Exception {
201         super.onReceiveRecover(message);
202     }
203
204     @Override public String persistenceId() {
205         return getId();
206     }
207
208     @Override
209     protected void startLogRecoveryBatch(int maxBatchSize) {
210     }
211
212     @Override
213     protected void appendRecoveredLogEntry(Payload data) {
214     }
215
216     @Override
217     protected void applyCurrentLogRecoveryBatch() {
218     }
219
220     @Override
221     protected void onRecoveryComplete() {
222     }
223
224     @Override
225     protected void applyRecoverySnapshot(ByteString snapshot) {
226     }
227 }