import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
+import org.opendaylight.controller.cluster.access.client.ClientActorContext;
import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection;
import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
@GuardedBy("lock")
@Override
- void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
+ void replayRequests(final Collection<ConnectionEntry> previousEntries) {
// First look for our Create message
Iterator<ConnectionEntry> it = previousEntries.iterator();
while (it.hasNext()) {
if (identifier.equals(req.getTarget())) {
Verify.verify(req instanceof LocalHistoryRequest);
if (req instanceof CreateLocalHistoryRequest) {
- successor.connection.sendRequest(req, e.getCallback());
+ successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
it.remove();
break;
}
}
for (AbstractProxyTransaction t : proxies.values()) {
- LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
- final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(),
- t.isSnapshotOnly());
- LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
- t.replayMessages(newProxy, previousEntries);
+ LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor);
+ t.replayMessages(successor, previousEntries);
}
// Now look for any finalizing messages
if (identifier.equals(req.getTarget())) {
Verify.verify(req instanceof LocalHistoryRequest);
if (req instanceof DestroyLocalHistoryRequest) {
- successor.connection.sendRequest(req, e.getCallback());
+ successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks());
it.remove();
break;
}
return identifier;
}
+ final ClientActorContext context() {
+ return connection.context();
+ }
+
final long currentTime() {
return connection.currentTime();
}
return parent;
}
- final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
+ AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId,
final boolean snapshotOnly) {
lock.lock();
try {
final void abortTransaction(final AbstractProxyTransaction tx) {
lock.lock();
try {
- proxies.remove(tx.getIdentifier());
- LOG.debug("Proxy {} aborting transaction {}", this, tx);
+ // Removal will be completed once purge completes
+ LOG.debug("Proxy {} aborted transaction {}", this, tx);
onTransactionAborted(tx);
} finally {
lock.unlock();