2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorRef;
13 import akka.actor.Cancellable;
14 import akka.actor.Status.Failure;
15 import java.io.Closeable;
16 import java.util.LinkedHashSet;
18 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21 import scala.concurrent.duration.FiniteDuration;
24 * Supporting class for Shard that maintains state for retrying transaction messages when there is no leader.
26 * @author Thomas Pantelis
28 @Deprecated(since = "9.0.0", forRemoval = true)
29 class ShardTransactionMessageRetrySupport implements Closeable {
// Class logger.
30 private static final Logger LOG = LoggerFactory.getLogger(ShardTransactionMessageRetrySupport.class);
// Type of the message the retry timer sends to the shard when it fires (a MessageInfo instance).
// Presumably exposed so the shard can recognise timer messages although MessageInfo is private - confirm.
32 static final Class<?> TIMER_MESSAGE_CLASS = MessageInfo.class;
// Messages waiting to be retried; a LinkedHashSet so they are iterated in the order they were added.
34 private final Set<MessageInfo> messagesToRetry = new LinkedHashSet<>();
// The shard used for log identification, scheduling and as the target of retried and timer messages.
35 private final Shard shard;
// Creates the retry support for the given shard.
37 ShardTransactionMessageRetrySupport(final Shard shard) {
41 void addMessageToRetry(final Object message, final ActorRef replyTo, final String failureMessage) {
42 LOG.debug("{}: Adding message {} to retry", shard.persistenceId(), message);
44 MessageInfo messageInfo = new MessageInfo(message, replyTo, failureMessage);
46 FiniteDuration period = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
47 messageInfo.timer = shard.getContext().system().scheduler().scheduleOnce(period, shard.getSelf(),
48 messageInfo, shard.getContext().dispatcher(), ActorRef.noSender());
50 messagesToRetry.add(messageInfo);
/**
 * Processes every message currently queued for retry, leaving the queue empty.
 */
53 void retryMessages() {
// Nothing queued.
54 if (messagesToRetry.isEmpty()) {
// Snapshot the queued entries and empty the set before iterating, so the loop below
// works on a private copy rather than on the live collection.
58 MessageInfo[] copy = messagesToRetry.toArray(new MessageInfo[messagesToRetry.size()]);
59 messagesToRetry.clear();
// Entries are visited in the order they were queued (LinkedHashSet iteration order).
61 for (MessageInfo info: copy) {
62 LOG.debug("{}: Retrying message {}", shard.persistenceId(), info.message);
67 void onTimerMessage(final Object message) {
68 MessageInfo messageInfo = (MessageInfo)message;
70 LOG.debug("{}: Timer expired for message {}", shard.persistenceId(), messageInfo.message);
72 messagesToRetry.remove(messageInfo);
73 messageInfo.timedOut(shard);
78 for (MessageInfo info: messagesToRetry) {
82 messagesToRetry.clear();
85 private static final class MessageInfo {
87 final ActorRef replyTo;
88 final String failureMessage;
91 MessageInfo(final Object message, final ActorRef replyTo, final String failureMessage) {
92 this.message = message;
93 this.replyTo = replyTo;
94 this.failureMessage = requireNonNull(failureMessage);
97 void retry(final Shard shard) {
99 shard.getSelf().tell(message, replyTo);
102 void timedOut(final Shard shard) {
103 replyTo.tell(new Failure(new NoShardLeaderException(failureMessage, shard.persistenceId())),