Merge "Fix checkstyle warnings in opendaylight/config"
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.japi.Creator;
14 import com.google.common.base.Optional;
15 import com.google.protobuf.ByteString;
16 import java.io.ByteArrayInputStream;
17 import java.io.ByteArrayOutputStream;
18 import java.io.IOException;
19 import java.io.ObjectInputStream;
20 import java.io.ObjectOutputStream;
21 import java.util.HashMap;
22 import java.util.Map;
23 import org.opendaylight.controller.cluster.DataPersistenceProvider;
24 import org.opendaylight.controller.cluster.example.messages.KeyValue;
25 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
26 import org.opendaylight.controller.cluster.example.messages.PrintRole;
27 import org.opendaylight.controller.cluster.example.messages.PrintState;
28 import org.opendaylight.controller.cluster.raft.ConfigParams;
29 import org.opendaylight.controller.cluster.raft.RaftActor;
30 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
31 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
32
33 /**
34  * A sample actor showing how the RaftActor is to be extended
35  */
36 public class ExampleActor extends RaftActor {
37
38     private final Map<String, String> state = new HashMap<>();
39     private final DataPersistenceProvider dataPersistenceProvider;
40
41     private long persistIdentifier = 1;
42
43
44     public ExampleActor(final String id, final Map<String, String> peerAddresses,
45         final Optional<ConfigParams> configParams) {
46         super(id, peerAddresses, configParams);
47         this.dataPersistenceProvider = new PersistentDataProvider();
48     }
49
50     public static Props props(final String id, final Map<String, String> peerAddresses,
51         final Optional<ConfigParams> configParams){
52         return Props.create(new Creator<ExampleActor>(){
53
54             @Override public ExampleActor create() throws Exception {
55                 return new ExampleActor(id, peerAddresses, configParams);
56             }
57         });
58     }
59
60     @Override public void onReceiveCommand(final Object message) throws Exception{
61         if(message instanceof KeyValue){
62             if(isLeader()) {
63                 String persistId = Long.toString(persistIdentifier++);
64                 persistData(getSender(), persistId, (Payload) message);
65             } else {
66                 if(getLeader() != null) {
67                     getLeader().forward(message, getContext());
68                 }
69             }
70
71         } else if (message instanceof PrintState) {
72             if(LOG.isDebugEnabled()) {
73                 LOG.debug("State of the node:{} has entries={}, {}",
74                     getId(), state.size(), getReplicatedLogState());
75             }
76
77         } else if (message instanceof PrintRole) {
78             if(LOG.isDebugEnabled()) {
79                 LOG.debug("{} = {}, Peers={}", getId(), getRaftState(), getPeers());
80             }
81
82         } else {
83             super.onReceiveCommand(message);
84         }
85     }
86
87     @Override protected void applyState(final ActorRef clientActor, final String identifier,
88         final Object data) {
89         if(data instanceof KeyValue){
90             KeyValue kv = (KeyValue) data;
91             state.put(kv.getKey(), kv.getValue());
92             if(clientActor != null) {
93                 clientActor.tell(new KeyValueSaved(), getSelf());
94             }
95         }
96     }
97
98     @Override protected void createSnapshot() {
99         ByteString bs = null;
100         try {
101             bs = fromObject(state);
102         } catch (Exception e) {
103             LOG.error(e, "Exception in creating snapshot");
104         }
105         getSelf().tell(new CaptureSnapshotReply(bs), null);
106     }
107
108     @Override protected void applySnapshot(final ByteString snapshot) {
109         state.clear();
110         try {
111             state.putAll((HashMap) toObject(snapshot));
112         } catch (Exception e) {
113            LOG.error(e, "Exception in applying snapshot");
114         }
115         if(LOG.isDebugEnabled()) {
116             LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size());
117         }
118     }
119
120     private ByteString fromObject(final Object snapshot) throws Exception {
121         ByteArrayOutputStream b = null;
122         ObjectOutputStream o = null;
123         try {
124             b = new ByteArrayOutputStream();
125             o = new ObjectOutputStream(b);
126             o.writeObject(snapshot);
127             byte[] snapshotBytes = b.toByteArray();
128             return ByteString.copyFrom(snapshotBytes);
129         } finally {
130             if (o != null) {
131                 o.flush();
132                 o.close();
133             }
134             if (b != null) {
135                 b.close();
136             }
137         }
138     }
139
140     private Object toObject(final ByteString bs) throws ClassNotFoundException, IOException {
141         Object obj = null;
142         ByteArrayInputStream bis = null;
143         ObjectInputStream ois = null;
144         try {
145             bis = new ByteArrayInputStream(bs.toByteArray());
146             ois = new ObjectInputStream(bis);
147             obj = ois.readObject();
148         } finally {
149             if (bis != null) {
150                 bis.close();
151             }
152             if (ois != null) {
153                 ois.close();
154             }
155         }
156         return obj;
157     }
158
159     @Override protected void onStateChanged() {
160
161     }
162
163     @Override
164     protected DataPersistenceProvider persistence() {
165         return dataPersistenceProvider;
166     }
167
168     @Override public void onReceiveRecover(final Object message)throws Exception {
169         super.onReceiveRecover(message);
170     }
171
172     @Override public String persistenceId() {
173         return getId();
174     }
175
176     @Override
177     protected void startLogRecoveryBatch(final int maxBatchSize) {
178     }
179
180     @Override
181     protected void appendRecoveredLogEntry(final Payload data) {
182     }
183
184     @Override
185     protected void applyCurrentLogRecoveryBatch() {
186     }
187
188     @Override
189     protected void onRecoveryComplete() {
190     }
191
192     @Override
193     protected void applyRecoverySnapshot(final ByteString snapshot) {
194     }
195 }