atomic-storage: remove type dependency at segment level I/O
[controller.git] / opendaylight / md-sal / sal-akka-raft-example / src / main / java / org / opendaylight / controller / cluster / example / ExampleActor.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.example;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import com.google.common.io.ByteSource;
13 import java.io.IOException;
14 import java.io.OutputStream;
15 import java.io.Serializable;
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.Optional;
19 import org.apache.commons.lang3.SerializationUtils;
20 import org.opendaylight.controller.cluster.example.messages.KeyValue;
21 import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
22 import org.opendaylight.controller.cluster.example.messages.PrintRole;
23 import org.opendaylight.controller.cluster.example.messages.PrintState;
24 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
25 import org.opendaylight.controller.cluster.raft.ConfigParams;
26 import org.opendaylight.controller.cluster.raft.RaftActor;
27 import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
28 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
29 import org.opendaylight.controller.cluster.raft.RaftState;
30 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
31 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
32 import org.opendaylight.controller.cluster.raft.messages.Payload;
33 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
34 import org.opendaylight.yangtools.concepts.Identifier;
35 import org.opendaylight.yangtools.util.AbstractStringIdentifier;
36
37 /**
38  * A sample actor showing how the RaftActor is to be extended.
39  */
40 public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort {
41     private static final class PayloadIdentifier extends AbstractStringIdentifier<PayloadIdentifier> {
42         private static final long serialVersionUID = 1L;
43
44         PayloadIdentifier(final long identifier) {
45             super(String.valueOf(identifier));
46         }
47     }
48
49     private final Map<String, String> state = new HashMap<>();
50     private final Optional<ActorRef> roleChangeNotifier;
51
52     private long persistIdentifier = 1;
53
54     public ExampleActor(final String id, final Map<String, String> peerAddresses,
55         final Optional<ConfigParams> configParams) {
56         super(id, peerAddresses, configParams, (short)0);
57         setPersistence(true);
58         roleChangeNotifier = createRoleChangeNotifier(id);
59     }
60
61     public static Props props(final String id, final Map<String, String> peerAddresses,
62             final Optional<ConfigParams> configParams) {
63         return Props.create(ExampleActor.class, id, peerAddresses, configParams);
64     }
65
66     @Override
67     protected void handleNonRaftCommand(final Object message) {
68         if (message instanceof KeyValue) {
69             if (isLeader()) {
70                 persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false);
71             } else if (getLeader() != null) {
72                 getLeader().forward(message, getContext());
73             }
74
75         } else if (message instanceof PrintState) {
76             if (LOG.isDebugEnabled()) {
77                 LOG.debug("State of the node:{} has entries={}, {}",
78                     getId(), state.size(), getReplicatedLogState());
79             }
80
81         } else if (message instanceof PrintRole) {
82             if (LOG.isDebugEnabled()) {
83                 if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) {
84                     final String followers = ((Leader)getCurrentBehavior()).printFollowerStates();
85                     LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(),
86                         getRaftActorContext().getPeerIds(), followers);
87                 } else {
88                     LOG.debug("{} = {}, Peers={}", getId(), getRaftState(),
89                         getRaftActorContext().getPeerIds());
90                 }
91
92
93             }
94
95         } else {
96             super.handleNonRaftCommand(message);
97         }
98     }
99
100     protected String getReplicatedLogState() {
101         return "snapshotIndex=" + getRaftActorContext().getReplicatedLog().getSnapshotIndex()
102             + ", snapshotTerm=" + getRaftActorContext().getReplicatedLog().getSnapshotTerm()
103             + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
104     }
105
106     public Optional<ActorRef> createRoleChangeNotifier(final String actorId) {
107         ActorRef exampleRoleChangeNotifier = getContext().actorOf(
108             RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
109         return Optional.<ActorRef>of(exampleRoleChangeNotifier);
110     }
111
112     @Override
113     protected Optional<ActorRef> getRoleChangeNotifier() {
114         return roleChangeNotifier;
115     }
116
117     @Override
118     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
119         if (data instanceof KeyValue kv) {
120             state.put(kv.getKey(), kv.getValue());
121             if (clientActor != null) {
122                 clientActor.tell(new KeyValueSaved(), getSelf());
123             }
124         }
125     }
126
127     @Override
128     @SuppressWarnings("checkstyle:IllegalCatch")
129     public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
130         try {
131             if (installSnapshotStream.isPresent()) {
132                 SerializationUtils.serialize((Serializable) state, installSnapshotStream.orElseThrow());
133             }
134         } catch (RuntimeException e) {
135             LOG.error("Exception in creating snapshot", e);
136         }
137
138         getSelf().tell(new CaptureSnapshotReply(new MapState(state), installSnapshotStream), null);
139     }
140
141     @Override
142     public void applySnapshot(final Snapshot.State snapshotState) {
143         state.clear();
144         state.putAll(((MapState)snapshotState).state);
145
146         if (LOG.isDebugEnabled()) {
147             LOG.debug("Snapshot applied to state : {}", state.size());
148         }
149     }
150
151     @Override
152     protected void onStateChanged() {
153
154     }
155
156     @Override
157     public String persistenceId() {
158         return getId();
159     }
160
161     @Override
162     protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
163         return this;
164     }
165
166     @Override
167     public void startLogRecoveryBatch(final int maxBatchSize) {
168     }
169
170     @Override
171     public void appendRecoveredLogEntry(final Payload data) {
172     }
173
174     @Override
175     public void applyCurrentLogRecoveryBatch() {
176     }
177
178     @Override
179     public void onRecoveryComplete() {
180     }
181
182     @Override
183     public void applyRecoverySnapshot(final Snapshot.State snapshotState) {
184     }
185
186     @Override
187     protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
188         return this;
189     }
190
191     @Override
192     public Snapshot getRestoreFromSnapshot() {
193         return null;
194     }
195
196     @SuppressWarnings("unchecked")
197     @Override
198     public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) {
199         try {
200             return new MapState((Map<String, String>) SerializationUtils.deserialize(snapshotBytes.read()));
201         } catch (IOException e) {
202             throw new IllegalStateException(e);
203         }
204     }
205
206     private static class MapState implements Snapshot.State {
207         private static final long serialVersionUID = 1L;
208
209         Map<String, String> state;
210
211         MapState(final Map<String, String> state) {
212             this.state = state;
213         }
214     }
215 }