Migrate away from Throwables.propagate()
[controller.git] / opendaylight / md-sal / sal-akka-raft-example / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.example;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import com.google.common.io.ByteSource;
13 import java.io.IOException;
14 import java.io.OutputStream;
15 import java.io.Serializable;
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.Optional;
19 import org.apache.commons.lang3.SerializationUtils;
20 import org.opendaylight.controller.cluster.example.messages.KeyValue;
21 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
22 import org.opendaylight.controller.cluster.example.messages.PrintRole;
23 import org.opendaylight.controller.cluster.example.messages.PrintState;
24 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
25 import org.opendaylight.controller.cluster.raft.ConfigParams;
26 import org.opendaylight.controller.cluster.raft.RaftActor;
27 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
28 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
29 import org.opendaylight.controller.cluster.raft.RaftState;
30 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
31 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
32 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
33 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
34 import org.opendaylight.yangtools.concepts.Identifier;
35 import org.opendaylight.yangtools.util.AbstractStringIdentifier;
36
37 /**
38  * A sample actor showing how the RaftActor is to be extended.
39  */
40 public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
41     private static final class PayloadIdentifier extends AbstractStringIdentifier<PayloadIdentifier> {
42         private static final long serialVersionUID = 1L;
43
44         PayloadIdentifier(final long identifier) {
45             super(String.valueOf(identifier));
46         }
47     }
48
49     private final Map<String, String> state = new HashMap<>();
50     private final Optional<ActorRef> roleChangeNotifier;
51
52     private long persistIdentifier = 1;
53
54     public ExampleActor(final String id, final Map<String, String> peerAddresses,
55         final Optional<ConfigParams> configParams) {
56         super(id, peerAddresses, configParams, (short)0);
57         setPersistence(true);
58         roleChangeNotifier = createRoleChangeNotifier(id);
59     }
60
61     public static Props props(final String id, final Map<String, String> peerAddresses,
62             final Optional<ConfigParams> configParams) {
63         return Props.create(ExampleActor.class, id, peerAddresses, configParams);
64     }
65
66     @Override
67     protected void handleNonRaftCommand(final Object message) {
68         if (message instanceof KeyValue) {
69             if (isLeader()) {
70                 persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false);
71             } else {
72                 if (getLeader() != null) {
73                     getLeader().forward(message, getContext());
74                 }
75             }
76
77         } else if (message instanceof PrintState) {
78             if (LOG.isDebugEnabled()) {
79                 LOG.debug("State of the node:{} has entries={}, {}",
80                     getId(), state.size(), getReplicatedLogState());
81             }
82
83         } else if (message instanceof PrintRole) {
84             if (LOG.isDebugEnabled()) {
85                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
86                     final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates();
87                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
88                         getRaftActorContext().getPeerIds(), followers);
89                 } else {
90                     LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
91                         getRaftActorContext().getPeerIds());
92                 }
93
94
95             }
96
97         } else {
98             super.handleNonRaftCommand(message);
99         }
100     }
101
102     protected String getReplicatedLogState() {
103         return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
104             + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
105             + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
106     }
107
108     public Optional<ActorRef> createRoleChangeNotifier(final String actorId) {
109         ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
110             RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
111         return Optional.<ActorRef>of(exampleRoleChangeNotifier);
112     }
113
114     @Override
115     protected Optional<ActorRef> getRoleChangeNotifier() {
116         return roleChangeNotifier;
117     }
118
119     @Override
120     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
121         if (data instanceof KeyValue) {
122             KeyValue kv = (KeyValue) data;
123             state.put(kv.getKey(), kv.getValue());
124             if (clientActor != null) {
125                 clientActor.tell(new KeyValueSaved(), getSelf());
126             }
127         }
128     }
129
130     @Override
131     @SuppressWarnings("checkstyle:IllegalCatch")
132     public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
133         try {
134             if (installSnapshotStream.isPresent()) {
135                 SerializationUtils.serialize((Serializable) state, installSnapshotStream.get());
136             }
137         } catch (RuntimeException e) {
138             LOG.error("Exception in creating snapshot", e);
139         }
140
141         getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null);
142     }
143
144     @Override
145     public void applySnapshot(final Snapshot.State snapshotState) {
146         state.clear();
147         state.putAll(((MapState)snapshotState).state);
148
149         if (LOG.isDebugEnabled()) {
150             LOG.debug("Snapshot applied to state : {}", state.size());
151         }
152     }
153
154     @Override
155     protected void onStateChanged() {
156
157     }
158
159     @Override
160     public String persistenceId() {
161         return getId();
162     }
163
164     @Override
165     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
166         return this;
167     }
168
169     @Override
170     public void startLogRecoveryBatch(final int maxBatchSize) {
171     }
172
173     @Override
174     public void appendRecoveredLogEntry(final Payload data) {
175     }
176
177     @Override
178     public void applyCurrentLogRecoveryBatch() {
179     }
180
181     @Override
182     public void onRecoveryComplete() {
183     }
184
185     @Override
186     public void applyRecoverySnapshot(final Snapshot.State snapshotState) {
187     }
188
189     @Override
190     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
191         return this;
192     }
193
194     @Override
195     public Snapshot getRestoreFromSnapshot() {
196         return null;
197     }
198
199     @SuppressWarnings("unchecked")
200     @Override
201     public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) {
202         try {
203             return new MapState((Map<String, String>) SerializationUtils.deserialize(snapshotBytes.read()));
204         } catch (IOException e) {
205             throw new RuntimeException(e);
206         }
207     }
208
209     private static class MapState implements Snapshot.State {
210         private static final long serialVersionUID = 1L;
211
212         Map<String, String> state;
213
214         MapState(final Map<String, String> state) {
215             this.state = state;
216         }
217     }
218 }