2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorRef;
11 import akka.actor.Cancellable;
12 import akka.actor.Status.Failure;
13 import java.io.Closeable;
14 import java.util.LinkedHashSet;
16 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19 import scala.concurrent.duration.FiniteDuration;
22 * Supporting class for Shard that maintains state for retrying transaction messages when there is no leader.
24 * @author Thomas Pantelis
26 class ShardTransactionMessageRetrySupport implements Closeable {
27 private static final Logger LOG = LoggerFactory.getLogger(ShardTransactionMessageRetrySupport.class);
29 static final Class<?> TIMER_MESSAGE_CLASS = MessageInfo.class;
31 private final Set<MessageInfo> messagesToRetry = new LinkedHashSet<>();
32 private final Shard shard;
34 ShardTransactionMessageRetrySupport(Shard shard) {
38 void addMessageToRetry(Object message, ActorRef replyTo, String failureMessage) {
39 LOG.debug("{}: Adding message {} to retry", shard.persistenceId(), message);
41 MessageInfo messageInfo = new MessageInfo(message, replyTo, failureMessage);
43 FiniteDuration period = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
44 messageInfo.timer = shard.getContext().system().scheduler().scheduleOnce(period, shard.getSelf(),
45 messageInfo, shard.getContext().dispatcher(), ActorRef.noSender());
47 messagesToRetry.add(messageInfo);
50 void retryMessages() {
51 if (messagesToRetry.isEmpty()) {
55 MessageInfo[] copy = messagesToRetry.toArray(new MessageInfo[messagesToRetry.size()]);
56 messagesToRetry.clear();
58 for (MessageInfo info: copy) {
59 LOG.debug("{}: Retrying message {}", shard.persistenceId(), info.message);
64 void onTimerMessage(Object message) {
65 MessageInfo messageInfo = (MessageInfo)message;
67 LOG.debug("{}: Timer expired for message {}", shard.persistenceId(), messageInfo.message);
69 messagesToRetry.remove(messageInfo);
70 messageInfo.timedOut(shard);
75 for (MessageInfo info: messagesToRetry) {
79 messagesToRetry.clear();
82 private static class MessageInfo {
84 final ActorRef replyTo;
85 final String failureMessage;
88 MessageInfo(Object message, ActorRef replyTo, String failureMessage) {
89 this.message = message;
90 this.replyTo = replyTo;
91 this.failureMessage = failureMessage;
94 void retry(Shard shard) {
96 shard.getSelf().tell(message, replyTo);
99 void timedOut(Shard shard) {
100 replyTo.tell(new Failure(new NoShardLeaderException(failureMessage, shard.persistenceId())),