2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static java.util.Objects.requireNonNull;
12 import akka.actor.ActorRef;
13 import akka.actor.Cancellable;
14 import akka.actor.Status.Failure;
15 import java.io.Closeable;
16 import java.util.LinkedHashSet;
18 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21 import scala.concurrent.duration.FiniteDuration;
24 * Supporting class for Shard that maintains state for retrying transaction messages when there is no leader.
26 * @author Thomas Pantelis
28 class ShardTransactionMessageRetrySupport implements Closeable {
29 private static final Logger LOG = LoggerFactory.getLogger(ShardTransactionMessageRetrySupport.class);
31 static final Class<?> TIMER_MESSAGE_CLASS = MessageInfo.class;
33 private final Set<MessageInfo> messagesToRetry = new LinkedHashSet<>();
34 private final Shard shard;
36 ShardTransactionMessageRetrySupport(final Shard shard) {
40 void addMessageToRetry(final Object message, final ActorRef replyTo, final String failureMessage) {
41 LOG.debug("{}: Adding message {} to retry", shard.persistenceId(), message);
43 MessageInfo messageInfo = new MessageInfo(message, replyTo, failureMessage);
45 FiniteDuration period = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
46 messageInfo.timer = shard.getContext().system().scheduler().scheduleOnce(period, shard.getSelf(),
47 messageInfo, shard.getContext().dispatcher(), ActorRef.noSender());
49 messagesToRetry.add(messageInfo);
52 void retryMessages() {
53 if (messagesToRetry.isEmpty()) {
57 MessageInfo[] copy = messagesToRetry.toArray(new MessageInfo[messagesToRetry.size()]);
58 messagesToRetry.clear();
60 for (MessageInfo info: copy) {
61 LOG.debug("{}: Retrying message {}", shard.persistenceId(), info.message);
66 void onTimerMessage(final Object message) {
67 MessageInfo messageInfo = (MessageInfo)message;
69 LOG.debug("{}: Timer expired for message {}", shard.persistenceId(), messageInfo.message);
71 messagesToRetry.remove(messageInfo);
72 messageInfo.timedOut(shard);
77 for (MessageInfo info: messagesToRetry) {
81 messagesToRetry.clear();
84 private static class MessageInfo {
86 final ActorRef replyTo;
87 final String failureMessage;
90 MessageInfo(final Object message, final ActorRef replyTo, final String failureMessage) {
91 this.message = message;
92 this.replyTo = replyTo;
93 this.failureMessage = requireNonNull(failureMessage);
96 void retry(final Shard shard) {
98 shard.getSelf().tell(message, replyTo);
101 void timedOut(final Shard shard) {
102 replyTo.tell(new Failure(new NoShardLeaderException(failureMessage, shard.persistenceId())),