Merge "BUG-1953: fix SAL compatility layer"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import com.google.common.base.Optional;
15 import com.google.protobuf.ByteString;
16 import org.opendaylight.controller.cluster.DataPersistenceProvider;
17 import org.opendaylight.controller.cluster.example.messages.KeyValue;
18 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
19 import org.opendaylight.controller.cluster.example.messages.PrintRole;
20 import org.opendaylight.controller.cluster.example.messages.PrintState;
21 import org.opendaylight.controller.cluster.raft.ConfigParams;
22 import org.opendaylight.controller.cluster.raft.RaftActor;
23 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
24 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
25
26 import java.io.ByteArrayInputStream;
27 import java.io.ByteArrayOutputStream;
28 import java.io.IOException;
29 import java.io.ObjectInputStream;
30 import java.io.ObjectOutputStream;
31 import java.util.HashMap;
32 import java.util.Map;
33
34 /**
35  * A sample actor showing how the RaftActor is to be extended
36  */
37 public class ExampleActor extends RaftActor {
38
39     private final Map<String, String> state = new HashMap();
40     private final DataPersistenceProvider dataPersistenceProvider;
41
42     private long persistIdentifier = 1;
43
44
45     public ExampleActor(String id, Map<String, String> peerAddresses,
46         Optional<ConfigParams> configParams) {
47         super(id, peerAddresses, configParams);
48         this.dataPersistenceProvider = new PersistentDataProvider();
49     }
50
51     public static Props props(final String id, final Map<String, String> peerAddresses,
52         final Optional<ConfigParams> configParams){
53         return Props.create(new Creator<ExampleActor>(){
54
55             @Override public ExampleActor create() throws Exception {
56                 return new ExampleActor(id, peerAddresses, configParams);
57             }
58         });
59     }
60
61     @Override public void onReceiveCommand(Object message) throws Exception{
62         if(message instanceof KeyValue){
63             if(isLeader()) {
64                 String persistId = Long.toString(persistIdentifier++);
65                 persistData(getSender(), persistId, (Payload) message);
66             } else {
67                 if(getLeader() != null) {
68                     getLeader().forward(message, getContext());
69                 }
70             }
71
72         } else if (message instanceof PrintState) {
73             if(LOG.isDebugEnabled()) {
74                 LOG.debug("State of the node:{} has entries={}, {}",
75                     getId(), state.size(), getReplicatedLogState());
76             }
77
78         } else if (message instanceof PrintRole) {
79             if(LOG.isDebugEnabled()) {
80                 LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
81             }
82
83         } else {
84             super.onReceiveCommand(message);
85         }
86     }
87
88     @Override protected void applyState(ActorRef clientActor, String identifier,
89         Object data) {
90         if(data instanceof KeyValue){
91             KeyValue kv = (KeyValue) data;
92             state.put(kv.getKey(), kv.getValue());
93             if(clientActor != null) {
94                 clientActor.tell(new KeyValueSaved(), getSelf());
95             }
96         }
97     }
98
99     @Override protected void createSnapshot() {
100         ByteString bs = null;
101         try {
102             bs = fromObject(state);
103         } catch (Exception e) {
104             LOG.error(e, "Exception in creating snapshot");
105         }
106         getSelf().tell(new CaptureSnapshotReply(bs), null);
107     }
108
109     @Override protected void applySnapshot(ByteString snapshot) {
110         state.clear();
111         try {
112             state.putAll((HashMap) toObject(snapshot));
113         } catch (Exception e) {
114            LOG.error(e, "Exception in applying snapshot");
115         }
116         if(LOG.isDebugEnabled()) {
117             LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
118         }
119     }
120
121     private ByteString fromObject(Object snapshot) throws Exception {
122         ByteArrayOutputStream b = null;
123         ObjectOutputStream o = null;
124         try {
125             b = new ByteArrayOutputStream();
126             o = new ObjectOutputStream(b);
127             o.writeObject(snapshot);
128             byte[] snapshotBytes = b.toByteArray();
129             return ByteString.copyFrom(snapshotBytes);
130         } finally {
131             if (o != null) {
132                 o.flush();
133                 o.close();
134             }
135             if (b != null) {
136                 b.close();
137             }
138         }
139     }
140
141     private Object toObject(ByteString bs) throws ClassNotFoundException, IOException {
142         Object obj = null;
143         ByteArrayInputStream bis = null;
144         ObjectInputStream ois = null;
145         try {
146             bis = new ByteArrayInputStream(bs.toByteArray());
147             ois = new ObjectInputStream(bis);
148             obj = ois.readObject();
149         } finally {
150             if (bis != null) {
151                 bis.close();
152             }
153             if (ois != null) {
154                 ois.close();
155             }
156         }
157         return obj;
158     }
159
160     @Override protected void onStateChanged() {
161
162     }
163
164     @Override
165     protected DataPersistenceProvider persistence() {
166         return dataPersistenceProvider;
167     }
168
169     @Override public void onReceiveRecover(Object message)throws Exception {
170         super.onReceiveRecover(message);
171     }
172
173     @Override public String persistenceId() {
174         return getId();
175     }
176
177     @Override
178     protected void startLogRecoveryBatch(int maxBatchSize) {
179     }
180
181     @Override
182     protected void appendRecoveredLogEntry(Payload data) {
183     }
184
185     @Override
186     protected void applyCurrentLogRecoveryBatch() {
187     }
188
189     @Override
190     protected void onRecoveryComplete() {
191     }
192
193     @Override
194     protected void applyRecoverySnapshot(ByteString snapshot) {
195     }
196 }