Fix Akka exception: cannot use non-static local Creator to create actors
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import com.google.common.base.Optional;
14 import com.google.protobuf.ByteString;
15 import java.io.ByteArrayInputStream;
16 import java.io.ByteArrayOutputStream;
17 import java.io.IOException;
18 import java.io.ObjectInputStream;
19 import java.io.ObjectOutputStream;
20 import java.util.HashMap;
21 import java.util.Map;
22 import org.opendaylight.controller.cluster.DataPersistenceProvider;
23 import org.opendaylight.controller.cluster.example.messages.KeyValue;
24 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
25 import org.opendaylight.controller.cluster.example.messages.PrintRole;
26 import org.opendaylight.controller.cluster.example.messages.PrintState;
27 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
28 import org.opendaylight.controller.cluster.raft.ConfigParams;
29 import org.opendaylight.controller.cluster.raft.RaftActor;
30 import org.opendaylight.controller.cluster.raft.RaftState;
31 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
32 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
33 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
34
35 /**
36  * A sample actor showing how the RaftActor is to be extended
37  */
38 public class ExampleActor extends RaftActor {
39
40     private final Map<String, String> state = new HashMap();
41     private final DataPersistenceProvider dataPersistenceProvider;
42
43     private long persistIdentifier = 1;
44     private Optional<ActorRef> roleChangeNotifier;
45
46
47     public ExampleActor(String id, Map<String, String> peerAddresses,
48         Optional<ConfigParams> configParams) {
49         super(id, peerAddresses, configParams);
50         this.dataPersistenceProvider = new PersistentDataProvider();
51         roleChangeNotifier = createRoleChangeNotifier(id);
52     }
53
54     public static Props props(final String id, final Map<String, String> peerAddresses,
55             final Optional<ConfigParams> configParams) {
56         return Props.create(ExampleActor.class, id, peerAddresses, configParams);
57     }
58
59     @Override public void onReceiveCommand(Object message) throws Exception{
60         if(message instanceof KeyValue){
61             if(isLeader()) {
62                 String persistId = Long.toString(persistIdentifier++);
63                 persistData(getSender(), persistId, (Payload) message);
64             } else {
65                 if(getLeader() != null) {
66                     getLeader().forward(message, getContext());
67                 }
68             }
69
70         } else if (message instanceof PrintState) {
71             if(LOG.isDebugEnabled()) {
72                 LOG.debug("State of the node:{} has entries={}, {}",
73                     getId(), state.size(), getReplicatedLogState());
74             }
75
76         } else if (message instanceof PrintRole) {
77             if(LOG.isDebugEnabled()) {
78                 String followers = "";
79                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
80                     followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
81                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
82                         getRaftActorContext().getPeerAddresses().keySet(), followers);
83                 } else {
84                     LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
85                         getRaftActorContext().getPeerAddresses().keySet());
86                 }
87
88
89             }
90
91         } else {
92             super.onReceiveCommand(message);
93         }
94     }
95
96     protected String getReplicatedLogState() {
97         return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
98             + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
99             + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
100     }
101
102     public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
103         ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
104             RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
105         return Optional.<ActorRef>of(exampleRoleChangeNotifier);
106     }
107
108     @Override
109     protected Optional<ActorRef> getRoleChangeNotifier() {
110         return roleChangeNotifier;
111     }
112
113     @Override protected void applyState(final ActorRef clientActor, final String identifier,
114         final Object data) {
115         if(data instanceof KeyValue){
116             KeyValue kv = (KeyValue) data;
117             state.put(kv.getKey(), kv.getValue());
118             if(clientActor != null) {
119                 clientActor.tell(new KeyValueSaved(), getSelf());
120             }
121         }
122     }
123
124     @Override protected void createSnapshot() {
125         ByteString bs = null;
126         try {
127             bs = fromObject(state);
128         } catch (Exception e) {
129             LOG.error(e, "Exception in creating snapshot");
130         }
131         getSelf().tell(new CaptureSnapshotReply(bs), null);
132     }
133
134     @Override protected void applySnapshot(ByteString snapshot) {
135         state.clear();
136         try {
137             state.putAll((HashMap) toObject(snapshot));
138         } catch (Exception e) {
139            LOG.error(e, "Exception in applying snapshot");
140         }
141         if(LOG.isDebugEnabled()) {
142             LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
143         }
144     }
145
146     private ByteString fromObject(Object snapshot) throws Exception {
147         ByteArrayOutputStream b = null;
148         ObjectOutputStream o = null;
149         try {
150             b = new ByteArrayOutputStream();
151             o = new ObjectOutputStream(b);
152             o.writeObject(snapshot);
153             byte[] snapshotBytes = b.toByteArray();
154             return ByteString.copyFrom(snapshotBytes);
155         } finally {
156             if (o != null) {
157                 o.flush();
158                 o.close();
159             }
160             if (b != null) {
161                 b.close();
162             }
163         }
164     }
165
166     private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
167         Object obj = null;
168         ByteArrayInputStream bis = null;
169         ObjectInputStream ois = null;
170         try {
171             bis = new ByteArrayInputStream(bs.toByteArray());
172             ois = new ObjectInputStream(bis);
173             obj = ois.readObject();
174         } finally {
175             if (bis != null) {
176                 bis.close();
177             }
178             if (ois != null) {
179                 ois.close();
180             }
181         }
182         return obj;
183     }
184
185     @Override protected void onStateChanged() {
186
187     }
188
189     @Override
190     protected DataPersistenceProvider persistence() {
191         return dataPersistenceProvider;
192     }
193
194     @Override public void onReceiveRecover(Object message)throws Exception {
195         super.onReceiveRecover(message);
196     }
197
198     @Override public String persistenceId() {
199         return getId();
200     }
201
202     @Override
203     protected void startLogRecoveryBatch(int maxBatchSize) {
204     }
205
206     @Override
207     protected void appendRecoveredLogEntry(Payload data) {
208     }
209
210     @Override
211     protected void applyCurrentLogRecoveryBatch() {
212     }
213
214     @Override
215     protected void onRecoveryComplete() {
216     }
217
218     @Override
219     protected void applyRecoverySnapshot(ByteString snapshot) {
220     }
221 }