X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-akka-raft-example%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fexample%2FExampleActor.java;h=10ccb579380e1ef466f939006067dcbb7d95576b;hb=534bf6f83465cc8a575b097c1e28fbb1f34d110a;hp=5cd4c14ee40783cb3b00507896d769621293469c;hpb=aafb8cb044e992dd784d1f4f66508599cc4cd588;p=controller.git diff --git a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java index 5cd4c14ee4..10ccb57938 100644 --- a/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java +++ b/opendaylight/md-sal/sal-akka-raft-example/src/main/java/org/opendaylight/controller/cluster/example/ExampleActor.java @@ -5,20 +5,17 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.example; import akka.actor.ActorRef; import akka.actor.Props; -import com.google.common.base.Optional; -import com.google.common.base.Throwables; import com.google.common.io.ByteSource; import java.io.IOException; import java.io.OutputStream; import java.io.Serializable; import java.util.HashMap; import java.util.Map; -import javax.annotation.Nonnull; +import java.util.Optional; import org.apache.commons.lang3.SerializationUtils; import org.opendaylight.controller.cluster.example.messages.KeyValue; import org.opendaylight.controller.cluster.example.messages.KeyValueSaved; @@ -32,13 +29,13 @@ import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply; import org.opendaylight.controller.cluster.raft.behaviors.Leader; +import org.opendaylight.controller.cluster.raft.messages.Payload; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; -import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.util.AbstractStringIdentifier; /** - * A sample actor showing how the RaftActor is to be extended + * A sample actor showing how the RaftActor is to be extended. */ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, RaftActorSnapshotCohort { private static final class PayloadIdentifier extends AbstractStringIdentifier { @@ -50,13 +47,12 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, } private final Map state = new HashMap<>(); - - private long persistIdentifier = 1; private final Optional roleChangeNotifier; + private long persistIdentifier = 1; - public ExampleActor(String id, Map peerAddresses, - Optional configParams) { + public ExampleActor(final String id, final Map peerAddresses, + final Optional configParams) { super(id, peerAddresses, configParams, (short)0); setPersistence(true); roleChangeNotifier = createRoleChangeNotifier(id); @@ -68,26 +64,24 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - protected void handleNonRaftCommand(Object message) { - if(message instanceof KeyValue){ - if(isLeader()) { + protected void handleNonRaftCommand(final Object message) { + if (message instanceof KeyValue) { + if (isLeader()) { persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false); - } else { - if(getLeader() != null) { - getLeader().forward(message, getContext()); - } + } else if (getLeader() != null) { + getLeader().forward(message, getContext()); } } else if (message instanceof PrintState) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("State of the node:{} has entries={}, {}", getId(), state.size(), getReplicatedLogState()); } } else if (message instanceof PrintRole) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { if (getRaftState() == RaftState.Leader || getRaftState() == RaftState.IsolatedLeader) { - final String followers = ((Leader)this.getCurrentBehavior()).printFollowerStates(); + final String followers = ((Leader)getCurrentBehavior()).printFollowerStates(); LOG.debug("{} = {}, Peers={}, followers={}", getId(), getRaftState(), getRaftActorContext().getPeerIds(), followers); } else { @@ -109,8 +103,8 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, + ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size(); } - public Optional createRoleChangeNotifier(String actorId) { - ActorRef exampleRoleChangeNotifier = this.getContext().actorOf( + public Optional createRoleChangeNotifier(final String actorId) { + ActorRef exampleRoleChangeNotifier = getContext().actorOf( RoleChangeNotifier.getProps(actorId), actorId + "-notifier"); return Optional.of(exampleRoleChangeNotifier); } @@ -122,22 +116,22 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, @Override protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { - if(data instanceof KeyValue){ - KeyValue kv = (KeyValue) data; + if (data instanceof KeyValue kv) { state.put(kv.getKey(), kv.getValue()); - if(clientActor != null) { + if (clientActor != null) { clientActor.tell(new KeyValueSaved(), getSelf()); } } } @Override - public void createSnapshot(ActorRef actorRef, java.util.Optional installSnapshotStream) { + @SuppressWarnings("checkstyle:IllegalCatch") + public void createSnapshot(final ActorRef actorRef, final Optional installSnapshotStream) { try { if (installSnapshotStream.isPresent()) { SerializationUtils.serialize((Serializable) state, installSnapshotStream.get()); } - } catch (Exception e) { + } catch (RuntimeException e) { LOG.error("Exception in creating snapshot", e); } @@ -145,38 +139,36 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - public void applySnapshot(Snapshot.State snapshotState) { + public void applySnapshot(final Snapshot.State snapshotState) { state.clear(); - try { - state.putAll(((MapState)snapshotState).state); - } catch (Exception e) { - LOG.error("Exception in applying snapshot", e); - } - if(LOG.isDebugEnabled()) { - LOG.debug("Snapshot applied to state : {}", ((HashMap) state).size()); + state.putAll(((MapState)snapshotState).state); + + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshot applied to state : {}", state.size()); } } - @Override protected void onStateChanged() { + @Override + protected void onStateChanged() { } - @Override public String persistenceId() { + @Override + public String persistenceId() { return getId(); } @Override - @Nonnull protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() { return this; } @Override - public void startLogRecoveryBatch(int maxBatchSize) { + public void startLogRecoveryBatch(final int maxBatchSize) { } @Override - public void appendRecoveredLogEntry(Payload data) { + public void appendRecoveredLogEntry(final Payload data) { } @Override @@ -188,7 +180,7 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, } @Override - public void applyRecoverySnapshot(Snapshot.State snapshotState) { + public void applyRecoverySnapshot(final Snapshot.State snapshotState) { } @Override @@ -201,27 +193,22 @@ public class ExampleActor extends RaftActor implements RaftActorRecoveryCohort, return null; } + @SuppressWarnings("unchecked") @Override - public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) { + public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) { try { - return deserializePreCarbonSnapshot(snapshotBytes.read()); + return new MapState((Map) SerializationUtils.deserialize(snapshotBytes.read())); } catch (IOException e) { - throw Throwables.propagate(e); + throw new IllegalStateException(e); } } - @SuppressWarnings("unchecked") - @Override - public Snapshot.State deserializePreCarbonSnapshot(byte[] from) { - return new MapState((Map) SerializationUtils.deserialize(from)); - } - private static class MapState implements Snapshot.State { private static final long serialVersionUID = 1L; Map state; - MapState(Map state) { + MapState(final Map state) { this.state = state; } }