import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.stream.StreamSource;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
import org.opendaylight.controller.config.persist.api.Persister;
import org.opendaylight.controller.config.persist.storage.file.xml.model.ConfigSnapshot;
public static ConfigSnapshotHolder loadLastConfig(final File file) throws JAXBException {
JAXBContext jaxbContext = JAXBContext.newInstance(ConfigSnapshot.class);
Unmarshaller um = jaxbContext.createUnmarshaller();
-
- return asHolder((ConfigSnapshot) um.unmarshal(file));
+ XMLInputFactory xif = XMLInputFactory.newFactory();
+ xif.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
+ xif.setProperty(XMLInputFactory.SUPPORT_DTD, false);
+ try {
+ XMLStreamReader xsr = xif.createXMLStreamReader(new StreamSource(file));
+ return asHolder((ConfigSnapshot) um.unmarshal(xsr));
+ } catch (final XMLStreamException e) {
+ throw new JAXBException(e);
+ }
}
private static ConfigSnapshotHolder asHolder(final ConfigSnapshot unmarshalled) {
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.stream.StreamSource;
import org.apache.karaf.features.ConfigFileInfo;
import org.apache.karaf.features.Feature;
import org.opendaylight.controller.config.persist.api.ConfigSnapshotHolder;
Preconditions.checkNotNull(feature);
this.fileInfo = fileInfo;
this.featureChain.add(feature);
+ // TODO extract utility method for umarshalling config snapshots
JAXBContext jaxbContext = JAXBContext.newInstance(ConfigSnapshot.class);
Unmarshaller um = jaxbContext.createUnmarshaller();
- File file = new File(fileInfo.getFinalname());
- unmarshalled = ((ConfigSnapshot) um.unmarshal(file));
+ XMLInputFactory xif = XMLInputFactory.newFactory();
+ xif.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
+ xif.setProperty(XMLInputFactory.SUPPORT_DTD, false);
+ try {
+ XMLStreamReader xsr = xif.createXMLStreamReader(new StreamSource(new File(fileInfo.getFinalname())));
+ unmarshalled = ((ConfigSnapshot) um.unmarshal(xsr));
+ } catch (final XMLStreamException e) {
+ throw new JAXBException(e);
+ }
}
/*
* (non-Javadoc)
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.stream.StreamSource;
import org.apache.commons.lang3.StringUtils;
@XmlRootElement(name = "persisted-snapshots")
try {
JAXBContext jaxbContext = JAXBContext.newInstance(Config.class);
Unmarshaller um = jaxbContext.createUnmarshaller();
-
- return (Config) um.unmarshal(from);
- } catch (JAXBException e) {
+ XMLInputFactory xif = XMLInputFactory.newFactory();
+ xif.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
+ xif.setProperty(XMLInputFactory.SUPPORT_DTD, false);
+ XMLStreamReader xsr = xif.createXMLStreamReader(new StreamSource(from));
+ return ((Config) um.unmarshal(xsr));
+ } catch (JAXBException | XMLStreamException e) {
throw new PersistException("Unable to restore configuration", e);
}
}
scheduleElection(electionDuration());
}
+ private boolean isLogEntryPresent(long index){
+ if(index == context.getReplicatedLog().getSnapshotIndex()){
+ return true;
+ }
+
+ ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+ .get(index);
+
+ return previousEntry != null;
+
+ }
+
+ private long getLogEntryTerm(long index){
+ if(index == context.getReplicatedLog().getSnapshotIndex()){
+ return context.getReplicatedLog().getSnapshotTerm();
+ }
+
+ ReplicatedLogEntry previousEntry = context.getReplicatedLog()
+ .get(index);
+
+ if(previousEntry != null){
+ return previousEntry.getTerm();
+ }
+
+ return -1;
+ }
+
@Override protected RaftActorBehavior handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries) {
+ AppendEntries appendEntries) {
if(appendEntries.getEntries() != null && appendEntries.getEntries().size() > 0) {
if(LOG.isDebugEnabled()) {
// 2. Reply false if log doesn’t contain an entry at prevLogIndex
// whose term matches prevLogTerm (§5.3)
- ReplicatedLogEntry previousEntry = context.getReplicatedLog()
- .get(appendEntries.getPrevLogIndex());
+ long prevLogTerm = getLogEntryTerm(appendEntries.getPrevLogIndex());
+ boolean prevEntryPresent = isLogEntryPresent(appendEntries.getPrevLogIndex());
boolean outOfSync = true;
// First check if the logs are in sync or not
if (lastIndex() == -1
- && appendEntries.getPrevLogIndex() != -1) {
+ && appendEntries.getPrevLogIndex() != -1) {
// The follower's log is out of sync because the leader does have
// an entry at prevLogIndex and this follower has no entries in
if(LOG.isDebugEnabled()) {
LOG.debug("The followers log is empty and the senders prevLogIndex is {}",
- appendEntries.getPrevLogIndex());
+ appendEntries.getPrevLogIndex());
}
} else if (lastIndex() > -1
- && appendEntries.getPrevLogIndex() != -1
- && previousEntry == null) {
+ && appendEntries.getPrevLogIndex() != -1
+ && !prevEntryPresent) {
// The follower's log is out of sync because the Leader's
// prevLogIndex entry was not found in it's log
if(LOG.isDebugEnabled()) {
LOG.debug("The log is not empty but the prevLogIndex {} was not found in it",
- appendEntries.getPrevLogIndex());
+ appendEntries.getPrevLogIndex());
}
} else if (lastIndex() > -1
- && previousEntry != null
- && previousEntry.getTerm()!= appendEntries.getPrevLogTerm()) {
+ && prevEntryPresent
+ && prevLogTerm != appendEntries.getPrevLogTerm()) {
// The follower's log is out of sync because the Leader's
// prevLogIndex entry does exist in the follower's log but it has
// a different term in it
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug(
- "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
- , previousEntry.getTerm()
- , appendEntries.getPrevLogTerm());
+ "Cannot append entries because previous entry term {} is not equal to append entries prevLogTerm {}"
+ , prevLogTerm
+ , appendEntries.getPrevLogTerm());
}
} else {
outOfSync = false;
// We found that the log was out of sync so just send a negative
// reply and return
if(LOG.isDebugEnabled()) {
- LOG.debug("Follower is out-of-sync, " +
+ LOG.debug("Follower ({}) is out-of-sync, " +
"so sending negative reply, lastIndex():{}, lastTerm():{}",
- lastIndex(), lastTerm()
+ context.getId(), lastIndex(), lastTerm()
);
}
sender.tell(
}};
}
+ @Test
+ public void testHandleAppendEntriesPreviousLogEntryMissing(){
+ new JavaTestKit(getSystem()) {{
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext();
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log =
+ new MockRaftActorContext.SimpleReplicatedLog();
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 0, new MockRaftActorContext.MockPayload("zero")));
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 1, new MockRaftActorContext.MockPayload("one")));
+ log.append(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 2, new MockRaftActorContext.MockPayload("two")));
+
+ context.setReplicatedLog(log);
+
+ // Prepare the entries to be sent with AppendEntries
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
+
+ AppendEntries appendEntries =
+ new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+
+ RaftActorBehavior behavior = createBehavior(context);
+
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+
+ RaftActorBehavior raftBehavior =
+ behavior.handleMessage(getRef(), appendEntries);
+
+ assertEquals(expected, raftBehavior);
+
+ // Also expect an AppendEntriesReply to be sent where success is false
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+ "AppendEntriesReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof AppendEntriesReply) {
+ AppendEntriesReply reply = (AppendEntriesReply) in;
+ return reply.isSuccess();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(false, out);
+
+ }};
+
+ }
+
+ @Test
+ public void testHandleAppendAfterInstallingSnapshot(){
+ new JavaTestKit(getSystem()) {{
+
+ MockRaftActorContext context = (MockRaftActorContext)
+ createActorContext();
+
+
+ // Prepare the receivers log
+ MockRaftActorContext.SimpleReplicatedLog log =
+ new MockRaftActorContext.SimpleReplicatedLog();
+
+ // Set up a log as if it has been snapshotted
+ log.setSnapshotIndex(3);
+ log.setSnapshotTerm(1);
+
+ context.setReplicatedLog(log);
+
+ // Prepare the entries to be sent with AppendEntries
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+ entries.add(
+ new MockRaftActorContext.MockReplicatedLogEntry(1, 4, new MockRaftActorContext.MockPayload("two-1")));
+
+ AppendEntries appendEntries =
+ new AppendEntries(1, "leader-1", 3, 1, entries, 4);
+
+ RaftActorBehavior behavior = createBehavior(context);
+
+ // Send an unknown message so that the state of the RaftActor remains unchanged
+ RaftActorBehavior expected = behavior.handleMessage(getRef(), "unknown");
+
+ RaftActorBehavior raftBehavior =
+ behavior.handleMessage(getRef(), appendEntries);
+
+ assertEquals(expected, raftBehavior);
+
+ // Also expect an AppendEntriesReply to be sent where success is false
+ final Boolean out = new ExpectMsg<Boolean>(duration("1 seconds"),
+ "AppendEntriesReply") {
+ // do not put code outside this method, will run afterwards
+ protected Boolean match(Object in) {
+ if (in instanceof AppendEntriesReply) {
+ AppendEntriesReply reply = (AppendEntriesReply) in;
+ return reply.isSuccess();
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get();
+
+ assertEquals(true, out);
+
+ }};
+
+ }
+
/**
* This test verifies that when InstallSnapshot is received by
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.protobuff.client.messages;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.UnknownFieldSet;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.ref.SoftReference;
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompositeModificationByteStringPayload extends Payload implements
+ Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private ByteString byteString;
+ private SoftReference<PersistentMessages.CompositeModification> modificationReference;
+ private static final Logger LOG = LoggerFactory.getLogger(CompositeModificationByteStringPayload.class);
+
+ public CompositeModificationByteStringPayload(){
+ byteString = null;
+ }
+ public CompositeModificationByteStringPayload(Object modification){
+ this(((PersistentMessages.CompositeModification) modification).toByteString());
+ this.modificationReference = new SoftReference<>((PersistentMessages.CompositeModification) modification);
+ }
+
+ private CompositeModificationByteStringPayload(ByteString byteString){
+ this.byteString = Preconditions.checkNotNull(byteString, "byteString should not be null");
+ }
+
+
+ @Override
+ public Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> encode() {
+ Preconditions.checkState(byteString!=null);
+ Map<GeneratedMessage.GeneratedExtension, PersistentMessages.CompositeModification> map = new HashMap<>();
+ map.put(org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification,
+ getModificationInternal());
+ return map;
+ }
+
+ @Override
+ public Payload decode(
+ AppendEntriesMessages.AppendEntries.ReplicatedLogEntry.Payload payload) {
+ PersistentMessages.CompositeModification modification = payload
+ .getExtension(
+ org.opendaylight.controller.protobuff.messages.shard.CompositeModificationPayload.modification);
+
+ // The extension was put in the unknown field.
+ // This is because extensions need to be registered
+ // see org.opendaylight.controller.mdsal.CompositeModificationPayload.registerAllExtensions
+ // also see https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/ExtensionRegistry
+ // If that is not done then on the other end the extension shows up as an unknown field
+ // Need to figure out a better way to do this
+ if(payload.getUnknownFields().hasField(2)){
+ UnknownFieldSet.Field field =
+ payload.getUnknownFields().getField(2);
+
+ return new CompositeModificationByteStringPayload(field.getLengthDelimitedList().get(0));
+ }
+
+ return new CompositeModificationByteStringPayload(modification);
+ }
+
+ public Object getModification(){
+ return getModificationInternal();
+ }
+
+ private PersistentMessages.CompositeModification getModificationInternal(){
+ if(this.modificationReference != null && this.modificationReference.get() != null){
+ return this.modificationReference.get();
+ }
+ try {
+ PersistentMessages.CompositeModification compositeModification = PersistentMessages.CompositeModification.parseFrom(this.byteString);
+ this.modificationReference = new SoftReference<>(compositeModification);
+ return compositeModification;
+ } catch (InvalidProtocolBufferException e) {
+ LOG.error("Unexpected exception occurred when parsing byteString to CompositeModification", e);
+ }
+
+ return null;
+ }
+
+ public int size(){
+ return byteString.size();
+ }
+
+ private void writeObject(java.io.ObjectOutputStream stream)
+ throws IOException {
+ byteString.writeTo(stream);
+ }
+
+ private void readObject(java.io.ObjectInputStream stream)
+ throws IOException, ClassNotFoundException {
+ byteString = ByteString.readFrom(stream);
+ }
+
+ @VisibleForTesting
+ public void clearModificationReference(){
+ if(this.modificationReference != null) {
+ this.modificationReference.clear();
+ }
+ }
+}
\ No newline at end of file
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
cohortEntry.getCohort().preCommit().get();
Shard.this.persistData(getSender(), transactionID,
- new CompositeModificationPayload(cohortEntry.getModification().toSerializable()));
+ new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable()));
} catch (InterruptedException | ExecutionException e) {
LOG.error(e, "An exception occurred while preCommitting transaction {}",
cohortEntry.getTransactionID());
protected void appendRecoveredLogEntry(final Payload data) {
if (data instanceof CompositeModificationPayload) {
currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
+ } else if (data instanceof CompositeModificationByteStringPayload) {
+ currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
} else {
LOG.error("Unknown state received {} during recovery", data);
}
if (data instanceof CompositeModificationPayload) {
Object modification = ((CompositeModificationPayload) data).getModification();
- if(modification == null) {
- LOG.error(
- "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
- identifier, clientActor != null ? clientActor.path().toString() : null);
- } else if(clientActor == null) {
- // There's no clientActor to which to send a commit reply so we must be applying
- // replicated state from the leader.
- commitWithNewTransaction(MutableCompositeModification.fromSerializable(
- modification, schemaContext));
- } else {
- // This must be the OK to commit after replication consensus.
- finishCommit(clientActor, identifier);
- }
+ applyModificationToState(clientActor, identifier, modification);
+ } else if(data instanceof CompositeModificationByteStringPayload ){
+ Object modification = ((CompositeModificationByteStringPayload) data).getModification();
+
+ applyModificationToState(clientActor, identifier, modification);
+
} else {
LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
data, data.getClass().getClassLoader(),
}
+ private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
+ if(modification == null) {
+ LOG.error(
+ "modification is null - this is very unexpected, clientActor = {}, identifier = {}",
+ identifier, clientActor != null ? clientActor.path().toString() : null);
+ } else if(clientActor == null) {
+ // There's no clientActor to which to send a commit reply so we must be applying
+ // replicated state from the leader.
+ commitWithNewTransaction(MutableCompositeModification.fromSerializable(
+ modification, schemaContext));
+ } else {
+ // This must be the OK to commit after replication consensus.
+ finishCommit(clientActor, identifier);
+ }
+ }
+
private void updateJournalStats() {
ReplicatedLogEntry lastLogEntry = getLastLogEntry();
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class CompositeModificationByteStringPayloadTest {
+
+ private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
+
+ @Test
+ public void testSerialization(){
+ WriteModification writeModification =
+ new WriteModification(TestModel.TEST_PATH, ImmutableNodes
+ .containerNode(TestModel.TEST_QNAME),
+ TestModel.createTestContext());
+
+ MutableCompositeModification compositeModification =
+ new MutableCompositeModification();
+
+ compositeModification.addModification(writeModification);
+
+ CompositeModificationByteStringPayload compositeModificationByteStringPayload
+ = new CompositeModificationByteStringPayload(compositeModification.toSerializable());
+
+ byte[] bytes = SerializationUtils.serialize(compositeModificationByteStringPayload);
+
+ Object deserialize = SerializationUtils.deserialize(bytes);
+
+ assertTrue(deserialize instanceof CompositeModificationByteStringPayload);
+
+ }
+
+ @Test
+ public void testAppendEntries(){
+ List<ReplicatedLogEntry> entries = new ArrayList<>();
+
+ CompositeModificationByteStringPayload payload = newByteStringPayload(
+ new WriteModification(TestModel.OUTER_LIST_PATH,
+ ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
+ SCHEMA_CONTEXT));
+
+ payload.clearModificationReference();
+
+ entries.add(new ReplicatedLogImplEntry(0, 1, payload));
+
+
+ assertNotNull(new AppendEntries(10, "foobar", 10, 10, entries, 10).toSerializable());
+ }
+
+
+
+ private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
+ MutableCompositeModification compMod = new MutableCompositeModification();
+ for(Modification mod: mods) {
+ compMod.addModification(mod);
+ }
+
+ return new CompositeModificationByteStringPayload(compMod.toSerializable());
+ }
+
+}
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
SCHEMA_CONTEXT))));
- int nListEntries = 11;
+ int nListEntries = 16;
Set<Integer> listEntryKeys = new HashSet<>();
- for(int i = 1; i <= nListEntries; i++) {
+ for(int i = 1; i <= nListEntries-5; i++) {
listEntryKeys.add(Integer.valueOf(i));
YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
newPayload(mod)));
}
+ // Add some of the new CompositeModificationByteStringPayload
+ for(int i = 11; i <= nListEntries; i++) {
+ listEntryKeys.add(Integer.valueOf(i));
+ YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+ Modification mod = new MergeModification(path,
+ ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
+ SCHEMA_CONTEXT);
+ InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+ newByteStringPayload(mod)));
+ }
+
+
InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
new ApplyLogEntries(nListEntries));
return new CompositeModificationPayload(compMod.toSerializable());
}
+ private CompositeModificationByteStringPayload newByteStringPayload(final Modification... mods) {
+ MutableCompositeModification compMod = new MutableCompositeModification();
+ for(Modification mod: mods) {
+ compMod.addModification(mod);
+ }
+
+ return new CompositeModificationByteStringPayload(compMod.toSerializable());
+ }
+
+
private DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
final MutableCompositeModification modification) {
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * A single YANG notification.
+ */
+public interface DOMNotification {
+ /**
+ * Return the type of this notification.
+ *
+ * @return Notification type.
+ */
+ @Nonnull SchemaPath getType();
+
+ /**
+ * Return the body of this notification.
+ *
+ * @return Notification body.
+ */
+ @Nonnull ContainerNode getBody();
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import java.util.EventListener;
+import javax.annotation.Nonnull;
+
+/**
+ * Interface implemented by listeners interested in {@link DOMNotification}s.
+ */
+public interface DOMNotificationListener extends EventListener {
+ /**
+ * Invoked whenever a {@link DOMNotification} matching the subscription
+ * criteria is received.
+ *
+ * @param notification Received notification
+ */
+ void onNotification(@Nonnull DOMNotification notification);
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * A registration of a {@link DOMNotificationListener}. Invoking {@link #close()} will prevent further
+ * delivery of events to the listener.
+ */
+public interface DOMNotificationListenerRegistration extends ListenerRegistration<DOMNotificationListener> {
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+/**
+ * A {@link DOMService} which allows its user to send {@link DOMNotification}s. It
+ * provides two styles of initiating the notification delivery, similar to
+ * {@link java.util.concurrent.BlockingQueue}:
+ * - a put-style method which waits until the implementation can accept the notification
+ * for delivery, and
+ * - an offer-style method, which attempts to enqueue the notification, but allows
+ * the caller to specify that it should never wait, or put an upper bound on how
+ * long it is going to wait.
+ */
+public interface DOMNotificationPublishService extends DOMService {
+ /**
+ * Well-known value indicating that the implementation is currently not
+ * able to accept a notification.
+ */
+ ListenableFuture<Object> REJECTED = Futures.immediateFailedFuture(new Throwable("Unacceptable blocking conditions encountered"));
+
+ /**
+ * Publish a notification. The result of this method is a {@link ListenableFuture}
+ * which will complete once the notification has been delivered to all immediate
+ * registrants. The type of the object resulting from the future is not defined
+ * and implementations may use it to convey additional information related to the
+ * publishing process.
+ *
+ * Abstract subclasses can refine the return type as returning a promise of a
+ * more specific type, e.g.:
+ *
+ * public interface DeliveryStatus { int getListenerCount(); }
+ * ListenableFuture<? extends DeliveryStatus> putNotification(DOMNotification notification);
+ *
+ * Once the Future succeeds, the resulting object can be queried for traits using
+ * instanceof, e.g:
+ *
+ * // Can block when (for example) the implemention's ThreadPool queue is full
+ * Object o = service.putNotification(notif).get();
+ * if (o instanceof DeliveryStatus) {
+ * DeliveryStatus ds = (DeliveryStatus)o;
+ * LOG.debug("Notification was received by {} listeners", ds.getListenerCount(););
+ * }
+ * }
+ *
+ * In case an implementation is running out of resources, it can block the calling
+ * thread until enough resources become available to accept the notification for
+ * processing, or it is interrupted.
+ *
+ * Caution: completion here means that the implementation has completed processing
+ * of the notification. This does not mean that all existing registrants
+ * have seen the notification. Most importantly, the delivery process at
+ * other cluster nodes may have not begun yet.
+ *
+ * @param notification Notification to be published.
+ * @return A listenable future which will report completion when the service
+ * has finished propagating the notification to its immediate registrants.
+ * @throws InterruptedException if interrupted while waiting
+ * @throws NullPointerException if notification is null.
+ */
+ @Nonnull ListenableFuture<? extends Object> putNotification(@Nonnull DOMNotification notification) throws InterruptedException;
+
+ /**
+ * Attempt to publish a notification. The result of this method is a {@link ListenableFuture}
+ * which will complete once the notification has been delivered to all immediate
+ * registrants. The type of the object resulting from the future is not defined
+ * and implementations may use it to convey additional information related to the
+ * publishing process. Unlike {@link #putNotification(DOMNotification)}, this method
+ * is guaranteed not to block if the underlying implementation encounters contention.
+ *
+ * @param notification Notification to be published.
+ * @return A listenable future which will report completion when the service
+ * has finished propagating the notification to its immediate registrants,
+ * or {@value #REJECTED} if resource constraints prevent
+ * the implementation from accepting the notification for delivery.
+ * @throws NullPointerException if notification is null.
+ */
+ @Nonnull ListenableFuture<? extends Object> offerNotification(@Nonnull DOMNotification notification);
+
+ /**
+ * Attempt to publish a notification. The result of this method is a {@link ListenableFuture}
+ * which will complete once the notification has been delivered to all immediate
+ * registrants. The type of the object resulting from the future is not defined
+ * and implementations may use it to convey additional information related to the
+ * publishing process. Unlike {@link #putNotification(DOMNotification)}, this method
+ * is guaranteed to block more than the specified timeout.
+ *
+ * @param notification Notification to be published.
+ * @param timeout how long to wait before giving up, in units of unit
+ * @param unit a TimeUnit determining how to interpret the timeout parameter
+ * @return A listenable future which will report completion when the service
+ * has finished propagating the notification to its immediate registrants,
+ * or {@value #REJECTED} if resource constraints prevent
+ * the implementation from accepting the notification for delivery.
+ * @throws InterruptedException if interrupted while waiting
+ * @throws NullPointerException if notification or unit is null.
+ * @throws IllegalArgumentException if timeout is negative.
+ */
+ @Nonnull ListenableFuture<? extends Object> offerNotification(@Nonnull DOMNotification notification,
+ @Nonnegative long timeout, @Nonnull TimeUnit unit) throws InterruptedException;
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.api;
+
+import java.util.Collection;
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * A {@link DOMService} which allows its users to subscribe to receive
+ * {@link DOMNotification}s.
+ */
+public interface DOMNotificationService {
+ /**
+ * Register a {@link DOMNotificationListener} to receive a set of notifications. As with
+ * other ListenerRegistration-based interfaces, registering an instance multiple times
+ * results in notifications being delivered for each registration.
+ *
+ * @param listener Notification instance to register
+ * @param types Notification types which should be delivered to the listener. Duplicate
+ * entries are processed only once, null entries are ignored.
+ * @return Registration handle. Invoking {@link DOMNotificationListenerRegistration#close()}
+ * will stop the delivery of notifications to the listener
+ * @throws IllegalArgumentException if types is empty or contains an invalid element, such as
+ * null or a SchemaPath which does not represent a valid {@link DOMNotification} type.
+ * @throws NullPointerException if either of the arguments is null
+ */
+ DOMNotificationListenerRegistration registerNotificationListener(@Nonnull DOMNotificationListener listener, @Nonnull Collection<SchemaPath> types);
+
+ /**
+ * Register a {@link DOMNotificationListener} to receive a set of notifications. As with
+ * other ListenerRegistration-based interfaces, registering an instance multiple times
+ * results in notifications being delivered for each registration.
+ *
+ * @param listener Notification instance to register
+ * @param types Notification types which should be delivered to the listener. Duplicate
+ * entries are processed only once, null entries are ignored.
+ * @return Registration handle. Invoking {@link DOMNotificationListenerRegistration#close()}
+ * will stop the delivery of notifications to the listener
+ * @throws IllegalArgumentException if types is empty or contains an invalid element, such as
+ * null or a SchemaPath which does not represent a valid {@link DOMNotification} type.
+ * @throws NullPointerException if listener is null
+ */
+ // FIXME: Java 8: provide a default implementation of this method.
+ DOMNotificationListenerRegistration registerNotificationListener(@Nonnull DOMNotificationListener listener, SchemaPath... types);
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html.
*/
-
package org.opendaylight.controller.md.sal.dom.api;
+/**
+ * Marker interface for services which can be obtained from a {@link DOMMountPoint}
+ * instance. No further semantics are implied.
+ */
public interface DOMService {
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.spi;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListenerRegistration;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+
+/**
+ * Utility base class for {@link DOMNotificationListenerRegistration}
+ * implementations.
+ */
+public abstract class AbstractDOMNotificationListenerRegistration extends AbstractListenerRegistration<DOMNotificationListener> implements DOMNotificationListenerRegistration {
+ /**
+ * Default constructor. Subclasses need to invoke it from their
+ * constructor(s).
+ *
+ * @param listener {@link DOMNotificationListener} instance which is
+ * being held by this registration. May not be null.
+ */
+ protected AbstractDOMNotificationListenerRegistration(final @Nonnull DOMNotificationListener listener) {
+ super(listener);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.spi;
+
+import com.google.common.collect.ForwardingObject;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+
+/**
+ * Utility implementations of {@link DOMNotificationPublishService} which forwards
+ * all requests to a delegate instance.
+ */
+public abstract class ForwardingDOMNotificationPublishService extends ForwardingObject implements DOMNotificationPublishService {
+ @Override
+ protected abstract DOMNotificationPublishService delegate();
+
+ @Override
+ public ListenableFuture<? extends Object> putNotification(final DOMNotification notification) throws InterruptedException {
+ return delegate().putNotification(notification);
+ }
+
+ @Override
+ public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification) {
+ return delegate().offerNotification(notification);
+ }
+
+ @Override
+ public ListenableFuture<? extends Object> offerNotification(final DOMNotification notification, final long timeout,
+ final TimeUnit unit) throws InterruptedException {
+ return delegate().offerNotification(notification, timeout, unit);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.spi;
+
+import com.google.common.collect.ForwardingObject;
+import java.util.Collection;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationListenerRegistration;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+/**
+ * Utility implementation of a {@link DOMNotificationService} which forwards all requests
+ * to a delegate instance.
+ */
+public abstract class ForwardingDOMNotificationService extends ForwardingObject implements DOMNotificationService {
+ @Override
+ protected abstract DOMNotificationService delegate();
+
+ @Override
+ public DOMNotificationListenerRegistration registerNotificationListener(final DOMNotificationListener listener,
+ final Collection<SchemaPath> types) {
+ return delegate().registerNotificationListener(listener, types);
+ }
+
+ @Override
+ public DOMNotificationListenerRegistration registerNotificationListener(final DOMNotificationListener listener,
+ final SchemaPath... types) {
+ return delegate().registerNotificationListener(listener, types);
+ }
+}