Fix HashMap references
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import com.google.common.base.Optional;
15 import com.google.protobuf.ByteString;
16 import java.io.ByteArrayInputStream;
17 import java.io.ByteArrayOutputStream;
18 import java.io.IOException;
19 import java.io.ObjectInputStream;
20 import java.io.ObjectOutputStream;
21 import java.util.HashMap;
22 import java.util.Map;
23 import org.opendaylight.controller.cluster.DataPersistenceProvider;
24 import org.opendaylight.controller.cluster.example.messages.KeyValue;
25 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
26 import org.opendaylight.controller.cluster.example.messages.PrintRole;
27 import org.opendaylight.controller.cluster.example.messages.PrintState;
28 import org.opendaylight.controller.cluster.raft.ConfigParams;
29 import org.opendaylight.controller.cluster.raft.RaftActor;
30 import org.opendaylight.controller.cluster.raft.RaftState;
31 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
32 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
33 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
34
35 /**
36  * A sample actor showing how the RaftActor is to be extended
37  */
38 public class ExampleActor extends RaftActor {
39
40     private final Map<String, String> state = new HashMap<>();
41     private final DataPersistenceProvider dataPersistenceProvider;
42
43     private long persistIdentifier = 1;
44
45
46     public ExampleActor(final String id, final Map<String, String> peerAddresses,
47         final Optional<ConfigParams> configParams) {
48         super(id, peerAddresses, configParams);
49         this.dataPersistenceProvider = new PersistentDataProvider();
50     }
51
52     public static Props props(final String id, final Map<String, String> peerAddresses,
53         final Optional<ConfigParams> configParams){
54         return Props.create(new Creator<ExampleActor>(){
55
56             @Override public ExampleActor create() throws Exception {
57                 return new ExampleActor(id, peerAddresses, configParams);
58             }
59         });
60     }
61
62     @Override public void onReceiveCommand(final Object message) throws Exception{
63         if(message instanceof KeyValue){
64             if(isLeader()) {
65                 String persistId = Long.toString(persistIdentifier++);
66                 persistData(getSender(), persistId, (Payload) message);
67             } else {
68                 if(getLeader() != null) {
69                     getLeader().forward(message, getContext());
70                 }
71             }
72
73         } else if (message instanceof PrintState) {
74             if(LOG.isDebugEnabled()) {
75                 LOG.debug("State of the node:{} has entries={}, {}",
76                     getId(), state.size(), getReplicatedLogState());
77             }
78
79         } else if (message instanceof PrintRole) {
80             if(LOG.isDebugEnabled()) {
81                 String followers = "";
82                 if (getRaftState() == RaftState.Leader) {
83                     followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
84                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getPeers(), followers);
85                 } else {
86                     LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
87                 }
88
89
90             }
91
92         } else {
93             super.onReceiveCommand(message);
94         }
95     }
96
97     @Override protected void applyState(final ActorRef clientActor, final String identifier,
98         final Object data) {
99         if(data instanceof KeyValue){
100             KeyValue kv = (KeyValue) data;
101             state.put(kv.getKey(), kv.getValue());
102             if(clientActor != null) {
103                 clientActor.tell(new KeyValueSaved(), getSelf());
104             }
105         }
106     }
107
108     @Override protected void createSnapshot() {
109         ByteString bs = null;
110         try {
111             bs = fromObject(state);
112         } catch (Exception e) {
113             LOG.error(e, "Exception in creating snapshot");
114         }
115         getSelf().tell(new CaptureSnapshotReply(bs), null);
116     }
117
118     @Override protected void applySnapshot(final ByteString snapshot) {
119         state.clear();
120         try {
121             state.putAll((Map<String, String>) toObject(snapshot));
122         } catch (Exception e) {
123            LOG.error(e, "Exception in applying snapshot");
124         }
125         if(LOG.isDebugEnabled()) {
126             LOG.debug("Snapshot applied to state : {}", ((Map<?, ?>) state).size());
127         }
128     }
129
130     private ByteString fromObject(final Object snapshot) throws Exception {
131         ByteArrayOutputStream b = null;
132         ObjectOutputStream o = null;
133         try {
134             b = new ByteArrayOutputStream();
135             o = new ObjectOutputStream(b);
136             o.writeObject(snapshot);
137             byte[] snapshotBytes = b.toByteArray();
138             return ByteString.copyFrom(snapshotBytes);
139         } finally {
140             if (o != null) {
141                 o.flush();
142                 o.close();
143             }
144             if (b != null) {
145                 b.close();
146             }
147         }
148     }
149
150     private Object toObject(final ByteString bs) throws ClassNotFoundException, IOException {
151         Object obj = null;
152         ByteArrayInputStream bis = null;
153         ObjectInputStream ois = null;
154         try {
155             bis = new ByteArrayInputStream(bs.toByteArray());
156             ois = new ObjectInputStream(bis);
157             obj = ois.readObject();
158         } finally {
159             if (bis != null) {
160                 bis.close();
161             }
162             if (ois != null) {
163                 ois.close();
164             }
165         }
166         return obj;
167     }
168
169     @Override protected void onStateChanged() {
170
171     }
172
173     @Override
174     protected DataPersistenceProvider persistence() {
175         return dataPersistenceProvider;
176     }
177
178     @Override public void onReceiveRecover(final Object message)throws Exception {
179         super.onReceiveRecover(message);
180     }
181
182     @Override public String persistenceId() {
183         return getId();
184     }
185
186     @Override
187     protected void startLogRecoveryBatch(final int maxBatchSize) {
188     }
189
190     @Override
191     protected void appendRecoveredLogEntry(final Payload data) {
192     }
193
194     @Override
195     protected void applyCurrentLogRecoveryBatch() {
196     }
197
198     @Override
199     protected void onRecoveryComplete() {
200     }
201
202     @Override
203     protected void applyRecoverySnapshot(final ByteString snapshot) {
204     }
205 }