Reduce JSR305 proliferation
[controller.git] / opendaylight / md-sal / sal-akka-raft-example / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.example;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import com.google.common.base.Optional;
14 import com.google.common.base.Throwables;
15 import com.google.common.io.ByteSource;
16 import java.io.IOException;
17 import java.io.OutputStream;
18 import java.io.Serializable;
19 import java.util.HashMap;
20 import java.util.Map;
21 import org.apache.commons.lang3.SerializationUtils;
22 import org.opendaylight.controller.cluster.example.messages.KeyValue;
23 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
24 import org.opendaylight.controller.cluster.example.messages.PrintRole;
25 import org.opendaylight.controller.cluster.example.messages.PrintState;
26 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
27 import org.opendaylight.controller.cluster.raft.ConfigParams;
28 import org.opendaylight.controller.cluster.raft.RaftActor;
29 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
30 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
31 import org.opendaylight.controller.cluster.raft.RaftState;
32 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
33 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
34 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
35 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
36 import org.opendaylight.yangtools.concepts.Identifier;
37 import org.opendaylight.yangtools.util.AbstractStringIdentifier;
38
39 /**
40  * A sample actor showing how the RaftActor is to be extended.
41  */
42 public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
43     private static final class PayloadIdentifier extends AbstractStringIdentifier<PayloadIdentifier> {
44         private static final long serialVersionUID = 1L;
45
46         PayloadIdentifier(final long identifier) {
47             super(String.valueOf(identifier));
48         }
49     }
50
51     private final Map<String, String> state = new HashMap<>();
52     private final Optional<ActorRef> roleChangeNotifier;
53
54     private long persistIdentifier = 1;
55
56     public ExampleActor(String id, Map<String, String> peerAddresses,
57         Optional<ConfigParams> configParams) {
58         super(id, peerAddresses, configParams, (short)0);
59         setPersistence(true);
60         roleChangeNotifier = createRoleChangeNotifier(id);
61     }
62
63     public static Props props(final String id, final Map<String, String> peerAddresses,
64             final Optional<ConfigParams> configParams) {
65         return Props.create(ExampleActor.class, id, peerAddresses, configParams);
66     }
67
68     @Override
69     protected void handleNonRaftCommand(Object message) {
70         if (message instanceof KeyValue) {
71             if (isLeader()) {
72                 persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false);
73             } else {
74                 if (getLeader() != null) {
75                     getLeader().forward(message, getContext());
76                 }
77             }
78
79         } else if (message instanceof PrintState) {
80             if (LOG.isDebugEnabled()) {
81                 LOG.debug("State of the node:{} has entries={}, {}",
82                     getId(), state.size(), getReplicatedLogState());
83             }
84
85         } else if (message instanceof PrintRole) {
86             if (LOG.isDebugEnabled()) {
87                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
88                     final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
89                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
90                         getRaftActorContext().getPeerIds(), followers);
91                 } else {
92                     LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
93                         getRaftActorContext().getPeerIds());
94                 }
95
96
97             }
98
99         } else {
100             super.handleNonRaftCommand(message);
101         }
102     }
103
104     protected String getReplicatedLogState() {
105         return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
106             + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
107             + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
108     }
109
110     public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
111         ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
112             RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
113         return Optional.<ActorRef>of(exampleRoleChangeNotifier);
114     }
115
116     @Override
117     protected Optional<ActorRef> getRoleChangeNotifier() {
118         return roleChangeNotifier;
119     }
120
121     @Override
122     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
123         if (data instanceof KeyValue) {
124             KeyValue kv = (KeyValue) data;
125             state.put(kv.getKey(), kv.getValue());
126             if (clientActor != null) {
127                 clientActor.tell(new KeyValueSaved(), getSelf());
128             }
129         }
130     }
131
132     @Override
133     @SuppressWarnings("checkstyle:IllegalCatch")
134     public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
135         try {
136             if (installSnapshotStream.isPresent()) {
137                 SerializationUtils.serialize((Serializable) state, installSnapshotStream.get());
138             }
139         } catch (RuntimeException e) {
140             LOG.error("Exception in creating snapshot", e);
141         }
142
143         getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null);
144     }
145
146     @Override
147     public void applySnapshot(Snapshot.State snapshotState) {
148         state.clear();
149         state.putAll(((MapState)snapshotState).state);
150
151         if (LOG.isDebugEnabled()) {
152             LOG.debug("Snapshot applied to state : {}", state.size());
153         }
154     }
155
156     @Override
157     protected void onStateChanged() {
158
159     }
160
161     @Override
162     public String persistenceId() {
163         return getId();
164     }
165
166     @Override
167     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
168         return this;
169     }
170
171     @Override
172     public void startLogRecoveryBatch(int maxBatchSize) {
173     }
174
175     @Override
176     public void appendRecoveredLogEntry(Payload data) {
177     }
178
179     @Override
180     public void applyCurrentLogRecoveryBatch() {
181     }
182
183     @Override
184     public void onRecoveryComplete() {
185     }
186
187     @Override
188     public void applyRecoverySnapshot(Snapshot.State snapshotState) {
189     }
190
191     @Override
192     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
193         return this;
194     }
195
196     @Override
197     public Snapshot getRestoreFromSnapshot() {
198         return null;
199     }
200
201     @SuppressWarnings("unchecked")
202     @Override
203     public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
204         try {
205             return new MapState((Map<String, String>) SerializationUtils.deserialize(snapshotBytes.read()));
206         } catch (IOException e) {
207             throw Throwables.propagate(e);
208         }
209     }
210
211     private static class MapState implements Snapshot.State {
212         private static final long serialVersionUID = 1L;
213
214         Map<String, String> state;
215
216         MapState(Map<String, String> state) {
217             this.state = state;
218         }
219     }
220 }