*/
package org.opendaylight.controller.cluster.access.client;
+import static java.util.Objects.requireNonNull;
+
import akka.persistence.RecoveryCompleted;
import akka.persistence.SnapshotOffer;
-import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
import org.slf4j.Logger;
*/
final class RecoveringClientActorBehavior extends AbstractClientActorBehavior<InitialClientActorContext> {
private static final Logger LOG = LoggerFactory.getLogger(RecoveringClientActorBehavior.class);
+
+ /*
+ * Base for the property name which overrides the initial generation when we fail to find anything from persistence.
+ * The actual property name has the frontend type name appended.
+ */
+ private static final String GENERATION_OVERRIDE_PROP_BASE =
+ "org.opendaylight.controller.cluster.access.client.initial.generation.";
+
private final FrontendIdentifier currentFrontend;
private ClientIdentifier lastId = null;
- RecoveringClientActorBehavior(final InitialClientActorContext context, final FrontendIdentifier frontendId) {
- super(context);
- currentFrontend = Preconditions.checkNotNull(frontendId);
+ RecoveringClientActorBehavior(final AbstractClientActor actor, final FrontendIdentifier frontendId) {
+ super(new InitialClientActorContext(actor, frontendId.toPersistentId()));
+ currentFrontend = requireNonNull(frontendId);
}
@Override
@Override
AbstractClientActorBehavior<?> onReceiveRecover(final Object recover) {
- if (recover instanceof RecoveryCompleted) {
- final ClientIdentifier nextId;
- if (lastId != null) {
- if (!currentFrontend.equals(lastId.getFrontendId())) {
- LOG.error("{}: Mismatched frontend identifier, shutting down. Current: {} Saved: {}",
- persistenceId(), currentFrontend, lastId.getFrontendId());
- return null;
- }
+ if (recover instanceof RecoveryCompleted msg) {
+ return onRecoveryCompleted(msg);
+ } else if (recover instanceof SnapshotOffer snapshotOffer) {
+ onSnapshotOffer(snapshotOffer);
+ } else {
+ LOG.warn("{}: ignoring recovery message {}", persistenceId(), recover);
+ }
+ return this;
+ }
+
+ private void onSnapshotOffer(final SnapshotOffer snapshotOffer) {
+ lastId = (ClientIdentifier) snapshotOffer.snapshot();
+ LOG.debug("{}: recovered identifier {}", persistenceId(), lastId);
+ }
- nextId = ClientIdentifier.create(currentFrontend, lastId.getGeneration() + 1);
- } else {
- nextId = ClientIdentifier.create(currentFrontend, 0);
+ private SavingClientActorBehavior onRecoveryCompleted(final RecoveryCompleted msg) {
+ final ClientIdentifier nextId;
+ if (lastId != null) {
+ if (!currentFrontend.equals(lastId.getFrontendId())) {
+ LOG.error("{}: Mismatched frontend identifier, shutting down. Current: {} Saved: {}",
+ persistenceId(), currentFrontend, lastId.getFrontendId());
+ return null;
}
- LOG.debug("{}: persisting new identifier {}", persistenceId(), nextId);
- context().saveSnapshot(nextId);
- return new SavingClientActorBehavior(context(), nextId);
- } else if (recover instanceof SnapshotOffer) {
- lastId = (ClientIdentifier) ((SnapshotOffer)recover).snapshot();
- LOG.debug("{}: recovered identifier {}", persistenceId(), lastId);
+ nextId = ClientIdentifier.create(currentFrontend, lastId.getGeneration() + 1);
} else {
- LOG.warn("{}: ignoring recovery message {}", persistenceId(), recover);
+ nextId = ClientIdentifier.create(currentFrontend, initialGeneration());
}
- return this;
+ LOG.debug("{}: persisting new identifier {}", persistenceId(), nextId);
+ context().saveSnapshot(nextId);
+ return new SavingClientActorBehavior(context(), nextId);
+ }
+
+ private long initialGeneration() {
+ final String propName = GENERATION_OVERRIDE_PROP_BASE + currentFrontend.getClientType().getName();
+ final String propValue = System.getProperty(propName);
+ if (propValue == null) {
+ LOG.debug("{}: no initial generation override, starting from 0", persistenceId());
+ return 0;
+ }
+
+ final long ret;
+ try {
+ ret = Long.parseUnsignedLong(propValue);
+ } catch (NumberFormatException e) {
+ LOG.warn("{}: failed to parse initial generation override '{}', starting from 0", persistenceId(),
+ propValue, e);
+ return 0;
+ }
+
+ LOG.info("{}: initial generation set to {}", persistenceId(), ret);
+ return ret;
}
-}
\ No newline at end of file
+}