import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@GuardedBy("lock")
final void finishReplay(final ReconnectForwarder forwarder) {
- setForwarder(forwarder);
+ queue.setForwarder(forwarder);
lock.unlock();
}
@GuardedBy("lock")
final void setForwarder(final ReconnectForwarder forwarder) {
- queue.setForwarder(forwarder, currentTime());
+ final long now = currentTime();
+ final Iterator<ConnectionEntry> it = queue.asIterable().iterator();
+ while (it.hasNext()) {
+ final ConnectionEntry e = it.next();
+ forwarder.forwardEntry(e, now);
+ it.remove();
+ }
+
+ queue.setForwarder(forwarder);
}
@GuardedBy("lock")
poisonQueue(pending, cause);
}
- final void setForwarder(final ReconnectForwarder forwarder, final long now) {
+ final void setForwarder(final ReconnectForwarder forwarder) {
Verify.verify(successor == null, "Successor {} already set on connection {}", successor, this);
- successor = Preconditions.checkNotNull(forwarder);
- LOG.debug("Connection {} superseded by {}, splicing queue", this, successor);
-
- ConnectionEntry entry = inflight.poll();
- while (entry != null) {
- successor.forwardEntry(entry, now);
- entry = inflight.poll();
- }
+ Verify.verify(inflight.isEmpty(), "In-flight requests after replay: %s", inflight);
+ Verify.verify(pending.isEmpty(), "Pending requests after replay: %s", pending);
- entry = pending.poll();
- while (entry != null) {
- successor.forwardEntry(entry, now);
- entry = pending.poll();
- }
+ successor = Preconditions.checkNotNull(forwarder);
+ LOG.debug("Connection {} superseded by {}", this, successor);
}
final void remove(final long now) {
package org.opendaylight.controller.cluster.access.client;
import static org.hamcrest.CoreMatchers.hasItems;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
+import com.google.common.collect.Iterables;
import java.util.Optional;
import java.util.function.Consumer;
import org.junit.After;
connection.sendRequest(request2, callback);
final Iterable<ConnectionEntry> entries = connection.startReplay();
Assert.assertThat(entries, hasItems(entryWithRequest(request1), entryWithRequest(request2)));
+ Assert.assertEquals(2, Iterables.size(entries));
+ Iterables.removeIf(entries, e -> true);
final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
connection.finishReplay(forwarder);
- verify(forwarder).forwardEntry(argThat(entryWithRequest(request1)), anyLong());
- verify(forwarder).forwardEntry(argThat(entryWithRequest(request2)), anyLong());
}
@After
extends AbstractClientConnectionTest<ConnectedClientConnection<BackendInfo>, BackendInfo> {
@Override
- protected ConnectedClientConnection createConnection() {
+ protected ConnectedClientConnection<BackendInfo> createConnection() {
final BackendInfo backend = new BackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON, 10);
return new ConnectedClientConnection<>(context, 0L, backend);
}
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
final Consumer<Response<?, ?>> callback = createConsumerMock();
final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
- queue.enqueue(entry, ticker.read());
final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
- final long setForwarderNow = ticker.read();
- queue.setForwarder(forwarder, setForwarderNow);
- verify(forwarder).forwardEntry(isA(TransmittedConnectionEntry.class), eq(setForwarderNow));
+ queue.setForwarder(forwarder);
final long secondEnqueueNow = ticker.read();
queue.enqueue(entry, secondEnqueueNow);
verify(forwarder).forwardEntry(entry, secondEnqueueNow);
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
@Override
void replayRequests(final Iterable<ConnectionEntry> previousEntries) {
// First look for our Create message
- for (ConnectionEntry e : previousEntries) {
+ Iterator<ConnectionEntry> it = previousEntries.iterator();
+ while (it.hasNext()) {
+ final ConnectionEntry e = it.next();
final Request<?, ?> req = e.getRequest();
if (identifier.equals(req.getTarget())) {
Verify.verify(req instanceof LocalHistoryRequest);
if (req instanceof CreateLocalHistoryRequest) {
successor.connection.sendRequest(req, e.getCallback());
+ it.remove();
break;
}
}
}
// Now look for any finalizing messages
- for (ConnectionEntry e : previousEntries) {
+ it = previousEntries.iterator();
+ while (it.hasNext()) {
+ final ConnectionEntry e = it.next();
final Request<?, ?> req = e.getRequest();
if (identifier.equals(req.getTarget())) {
Verify.verify(req instanceof LocalHistoryRequest);
- successor.connection.sendRequest(req, e.getCallback());
+ if (req instanceof DestroyLocalHistoryRequest) {
+ successor.connection.sendRequest(req, e.getCallback());
+ it.remove();
+ break;
+ }
}
}
}
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
+import akka.actor.Props;
import akka.cluster.Cluster;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
import org.opendaylight.controller.cluster.datastore.AbstractTest;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer;
import org.opendaylight.controller.cluster.dom.api.CDSShardAccess;
}
@Test
- public void testSingleNodeWrites() throws Exception {
+ public void testSingleNodeWritesAndRead() throws Exception {
initEmptyDatastores();
final DistributedShardRegistration shardRegistration = waitOnAsyncTask(
verifyNoMoreInteractions(mockedDataTreeListener);
+ final String shardName = ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier());
+ LOG.debug("Creating distributed datastore client for shard {}", shardName);
+
+ final ActorContext actorContext = leaderDistributedDataStore.getActorContext();
+ final Props distributedDataStoreClientProps =
+ SimpleDataStoreClientActor.props(actorContext.getCurrentMemberName(),
+ "Shard-" + shardName, actorContext, shardName);
+
+ final ActorRef clientActor = leaderSystem.actorOf(distributedDataStoreClientProps);
+ final DataStoreClient distributedDataStoreClient = SimpleDataStoreClientActor
+ .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+
+ final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
+ final ClientTransaction tx2 = localHistory.createTransaction();
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>,
+ org.opendaylight.mdsal.common.api.ReadFailedException> read =
+ tx2.read(YangInstanceIdentifier.EMPTY);
+
+ final Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
+ tx2.abort();
+ localHistory.close();
+
shardRegistration.close().toCompletableFuture().get();
}