From 4a21fa3b9e59736493c1e998b2a80f3d6f94041f Mon Sep 17 00:00:00 2001 From: Dana Kutenicsova Date: Fri, 7 Nov 2014 16:24:17 +0100 Subject: [PATCH] BUG-731 : more warnings down Change-Id: I83598fdb04d4357d3efc6ed26c1d5beac70686c1 Signed-off-by: Dana Kutenicsova --- .../ExtendedCommunitiesAttributeParser.java | 64 ++++--- .../provider/LinkstateTopologyBuilder.java | 114 ++++++------ .../bgp/topology/provider/UriBuilder.java | 19 +- .../bgp/util/BinaryBGPDumpFileParser.java | 5 +- .../bgp/util/HexDumpBGPFileParser.java | 6 +- .../stateful02/Stateful02LspObjectParser.java | 20 +-- .../CInitiated00LspObjectParser.java | 43 +++-- .../Stateful07ErrorMessageParser.java | 16 +- .../Stateful07LSPIdentifierIpv4TlvParser.java | 2 - .../stateful07/Stateful07LspObjectParser.java | 37 ++-- .../impl/AbstractPCEPSessionNegotiator.java | 165 ++++++++--------- .../AbstractPCEPSessionNegotiatorFactory.java | 134 +------------- .../protocol/pcep/impl/PCEPSessionImpl.java | 2 +- .../pcep/impl/PCEPSessionNegotiator.java | 167 ++++++++++++++++++ .../PCEPNotificationMessageParser.java | 32 ++-- .../impl/message/PCEPReplyMessageParser.java | 65 +++---- .../message/PCEPRequestMessageParser.java | 23 ++- .../protocol/pcep/testtool/Main.java | 6 +- .../tunnel/provider/NodeChangedListener.java | 79 +++++---- .../programming/impl/InstructionImpl.java | 156 ++++++++-------- .../impl/ProgrammingServiceImpl.java | 84 ++++----- 21 files changed, 637 insertions(+), 602 deletions(-) create mode 100644 pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionNegotiator.java diff --git a/bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/message/update/ExtendedCommunitiesAttributeParser.java b/bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/message/update/ExtendedCommunitiesAttributeParser.java index bf17438184..ace1cb71b8 100644 --- a/bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/message/update/ExtendedCommunitiesAttributeParser.java +++ b/bgp/parser-impl/src/main/java/org/opendaylight/protocol/bgp/parser/impl/message/update/ExtendedCommunitiesAttributeParser.java @@ -111,22 +111,11 @@ public final class ExtendedCommunitiesAttributeParser implements AttributeParser ExtendedCommunity c = null; switch (comm.getCommType()) { case AS_TYPE_TRANS: - ShortAsNumber as = new ShortAsNumber((long) buffer.readUnsignedShort()); - byte[] value = ByteArray.readBytes(buffer, AS_LOCAL_ADMIN_LENGTH); - if (comm.getCommSubType() == ROUTE_TARGET_SUBTYPE) { - c = new RouteTargetExtendedCommunityCaseBuilder().setRouteTargetExtendedCommunity( - new RouteTargetExtendedCommunityBuilder().setGlobalAdministrator(as).setLocalAdministrator(value).build()).build(); - } else if (comm.getCommSubType() == ROUTE_ORIGIN_SUBTYPE) { - c = new RouteOriginExtendedCommunityCaseBuilder().setRouteOriginExtendedCommunity( - new RouteOriginExtendedCommunityBuilder().setGlobalAdministrator(as).setLocalAdministrator(value).build()).build(); - } else { - c = new AsSpecificExtendedCommunityCaseBuilder().setAsSpecificExtendedCommunity( - new AsSpecificExtendedCommunityBuilder().setTransitive(false).setGlobalAdministrator(as).setLocalAdministrator(value).build()).build(); - } + c = parseAsTransCommunity(comm, buffer); break; case AS_TYPE_NON_TRANS: - as = new ShortAsNumber((long) buffer.readUnsignedShort()); - value = ByteArray.readBytes(buffer, AS_LOCAL_ADMIN_LENGTH); + ShortAsNumber as = new ShortAsNumber((long) buffer.readUnsignedShort()); + byte[] value = ByteArray.readBytes(buffer, AS_LOCAL_ADMIN_LENGTH); c = new AsSpecificExtendedCommunityCaseBuilder().setAsSpecificExtendedCommunity( new AsSpecificExtendedCommunityBuilder().setTransitive(true).setGlobalAdministrator(as).setLocalAdministrator(value).build()).build(); break; @@ -142,20 +131,7 @@ public final class ExtendedCommunitiesAttributeParser implements AttributeParser } break; case INET_TYPE_TRANS: - if (comm.getCommSubType() == ROUTE_TARGET_SUBTYPE) { - c = new RouteTargetExtendedCommunityCaseBuilder().setRouteTargetExtendedCommunity( - new RouteTargetExtendedCommunityBuilder().setGlobalAdministrator(new ShortAsNumber((long) buffer.readUnsignedShort())) - .setLocalAdministrator(ByteArray.readBytes(buffer, AS_LOCAL_ADMIN_LENGTH)).build()).build(); - } else if (comm.getCommSubType() == ROUTE_ORIGIN_SUBTYPE) { - c = new RouteOriginExtendedCommunityCaseBuilder().setRouteOriginExtendedCommunity( - new RouteOriginExtendedCommunityBuilder().setGlobalAdministrator(new ShortAsNumber((long) buffer.readUnsignedShort())) - .setLocalAdministrator(ByteArray.readBytes(buffer, AS_LOCAL_ADMIN_LENGTH)).build()).build(); - } else { - c = new Inet4SpecificExtendedCommunityCaseBuilder().setInet4SpecificExtendedCommunity( - new Inet4SpecificExtendedCommunityBuilder().setTransitive(false).setGlobalAdministrator( - Ipv4Util.addressForByteBuf(buffer)).setLocalAdministrator( - ByteArray.readBytes(buffer, INET_LOCAL_ADMIN_LENGTH)).build()).build(); - } + c = parseInetTypeCommunity(comm, buffer); break; case INET_TYPE_NON_TRANS: c = new Inet4SpecificExtendedCommunityCaseBuilder().setInet4SpecificExtendedCommunity( @@ -175,6 +151,38 @@ public final class ExtendedCommunitiesAttributeParser implements AttributeParser return comm.setExtendedCommunity(c).build(); } + private static ExtendedCommunity parseAsTransCommunity(final ExtendedCommunitiesBuilder comm, final ByteBuf buffer) { + final ShortAsNumber as = new ShortAsNumber((long) buffer.readUnsignedShort()); + final byte[] value = ByteArray.readBytes(buffer, AS_LOCAL_ADMIN_LENGTH); + if (comm.getCommSubType() == ROUTE_TARGET_SUBTYPE) { + return new RouteTargetExtendedCommunityCaseBuilder().setRouteTargetExtendedCommunity( + new RouteTargetExtendedCommunityBuilder().setGlobalAdministrator(as).setLocalAdministrator(value).build()).build(); + } + if (comm.getCommSubType() == ROUTE_ORIGIN_SUBTYPE) { + return new RouteOriginExtendedCommunityCaseBuilder().setRouteOriginExtendedCommunity( + new RouteOriginExtendedCommunityBuilder().setGlobalAdministrator(as).setLocalAdministrator(value).build()).build(); + } + return new AsSpecificExtendedCommunityCaseBuilder().setAsSpecificExtendedCommunity( + new AsSpecificExtendedCommunityBuilder().setTransitive(false).setGlobalAdministrator(as).setLocalAdministrator(value).build()).build(); + } + + private static ExtendedCommunity parseInetTypeCommunity(final ExtendedCommunitiesBuilder comm, final ByteBuf buffer) { + if (comm.getCommSubType() == ROUTE_TARGET_SUBTYPE) { + return new RouteTargetExtendedCommunityCaseBuilder().setRouteTargetExtendedCommunity( + new RouteTargetExtendedCommunityBuilder().setGlobalAdministrator(new ShortAsNumber((long) buffer.readUnsignedShort())) + .setLocalAdministrator(ByteArray.readBytes(buffer, AS_LOCAL_ADMIN_LENGTH)).build()).build(); + } + if (comm.getCommSubType() == ROUTE_ORIGIN_SUBTYPE) { + return new RouteOriginExtendedCommunityCaseBuilder().setRouteOriginExtendedCommunity( + new RouteOriginExtendedCommunityBuilder().setGlobalAdministrator(new ShortAsNumber((long) buffer.readUnsignedShort())) + .setLocalAdministrator(ByteArray.readBytes(buffer, AS_LOCAL_ADMIN_LENGTH)).build()).build(); + } + return new Inet4SpecificExtendedCommunityCaseBuilder().setInet4SpecificExtendedCommunity( + new Inet4SpecificExtendedCommunityBuilder().setTransitive(false).setGlobalAdministrator( + Ipv4Util.addressForByteBuf(buffer)).setLocalAdministrator( + ByteArray.readBytes(buffer, INET_LOCAL_ADMIN_LENGTH)).build()).build(); + } + @Override public void serializeAttribute(final DataObject attribute, final ByteBuf byteAggregator) { Preconditions.checkArgument(attribute instanceof PathAttributes, "Attribute parameter is not a PathAttribute object."); diff --git a/bgp/topology-provider/src/main/java/org/opendaylight/bgpcep/bgp/topology/provider/LinkstateTopologyBuilder.java b/bgp/topology-provider/src/main/java/org/opendaylight/bgpcep/bgp/topology/provider/LinkstateTopologyBuilder.java index 7cf7c0cee8..b7e79cc5e8 100644 --- a/bgp/topology-provider/src/main/java/org/opendaylight/bgpcep/bgp/topology/provider/LinkstateTopologyBuilder.java +++ b/bgp/topology-provider/src/main/java/org/opendaylight/bgpcep/bgp/topology/provider/LinkstateTopologyBuilder.java @@ -583,7 +583,6 @@ public final class LinkstateTopologyBuilder extends AbstractTopologyBuilder ids = new ArrayList<>(); if (na != null) { if (na.getIpv4RouterId() != null) { @@ -636,31 +650,13 @@ public final class LinkstateTopologyBuilder extends AbstractTopologyBuilder= MINIMAL_LENGTH, "Invalid message at index " + start + ", length atribute is lower than " + MINIMAL_LENGTH); diff --git a/bgp/util/src/main/java/org/opendaylight/protocol/bgp/util/HexDumpBGPFileParser.java b/bgp/util/src/main/java/org/opendaylight/protocol/bgp/util/HexDumpBGPFileParser.java index 412e30e646..e7556228f6 100644 --- a/bgp/util/src/main/java/org/opendaylight/protocol/bgp/util/HexDumpBGPFileParser.java +++ b/bgp/util/src/main/java/org/opendaylight/protocol/bgp/util/HexDumpBGPFileParser.java @@ -56,14 +56,16 @@ public final class HexDumpBGPFileParser { public static List parseMessages(final String c) { final String content = clearWhiteSpaceToUpper(c); + final int sixteen = 16; + final int four = 4; // search for 16 FFs final List messages = Lists.newLinkedList(); int idx = content.indexOf(FF_16, 0); while (idx > -1) { // next 2 bytes are length - final int lengthIdx = idx + 16 * 2; - final int messageIdx = lengthIdx + 4; + final int lengthIdx = idx + sixteen * 2; + final int messageIdx = lengthIdx + four; final String hexLength = content.substring(lengthIdx, messageIdx); byte[] byteLength = null; try { diff --git a/pcep/ietf-stateful02/src/main/java/org/opendaylight/protocol/pcep/ietf/stateful02/Stateful02LspObjectParser.java b/pcep/ietf-stateful02/src/main/java/org/opendaylight/protocol/pcep/ietf/stateful02/Stateful02LspObjectParser.java index 8a40b77fbf..0c8efaa3bc 100644 --- a/pcep/ietf-stateful02/src/main/java/org/opendaylight/protocol/pcep/ietf/stateful02/Stateful02LspObjectParser.java +++ b/pcep/ietf-stateful02/src/main/java/org/opendaylight/protocol/pcep/ietf/stateful02/Stateful02LspObjectParser.java @@ -65,7 +65,7 @@ public class Stateful02LspObjectParser extends AbstractObjectWithTlvsParser> TWO_B_OFFSET)))); final BitSet flags = ByteArray.bytesToBitSet(ByteArray.readBytes(bytes, FLAGS_SIZE)); builder.setDelegate(flags.get(DELEGATE_FLAG_OFFSET)); @@ -98,18 +98,18 @@ public class Stateful02LspObjectParser extends AbstractObjectWithTlvsParser> FOUR_BITS_SHIFT)))); final BitSet flags = ByteArray.bytesToBitSet(ByteArray.readBytes(bytes, 2)); builder.setDelegate(flags.get(DELEGATE_FLAG_OFFSET)); @@ -68,34 +71,30 @@ public class CInitiated00LspObjectParser extends Stateful07LspObjectParser { Preconditions.checkArgument(object instanceof Lsp, "Wrong instance of PCEPObject. Passed %s. Needed LspObject.", object.getClass()); final Lsp specObj = (Lsp) object; final ByteBuf body = Unpooled.buffer(); - - final byte[] retBytes = new byte[BODY_LENGTH]; - Preconditions.checkArgument(specObj.getPlspId() != null, "PLSP-ID not present"); - final int lspID = specObj.getPlspId().getValue().intValue(); - retBytes[0] = (byte) (lspID >> TWELVE_BITS_SHIFT); - retBytes[1] = (byte) (lspID >> FOUR_BITS_SHIFT); - retBytes[2] = (byte) (lspID << FOUR_BITS_SHIFT); - if (specObj.isDelegate() != null && specObj.isDelegate()) { - retBytes[FLAGS_INDEX] |= 1 << (Byte.SIZE - (DELEGATE_FLAG_OFFSET - Byte.SIZE) - 1); + writeMedium(specObj.getPlspId().getValue().intValue() << FOUR_BITS_SHIFT, body); + final BitSet flags = new BitSet(2 * Byte.SIZE); + if (specObj.isDelegate() != null) { + flags.set(DELEGATE_FLAG_OFFSET, specObj.isDelegate()); } - if (specObj.isRemove() != null && specObj.isRemove()) { - retBytes[FLAGS_INDEX] |= 1 << (Byte.SIZE - (REMOVE_FLAG_OFFSET - Byte.SIZE) - 1); + if (specObj.isRemove() != null) { + flags.set(REMOVE_FLAG_OFFSET, specObj.isRemove()); } - if (specObj.isSync() != null && specObj.isSync()) { - retBytes[FLAGS_INDEX] |= 1 << (Byte.SIZE - (SYNC_FLAG_OFFSET - Byte.SIZE) - 1); + if (specObj.isSync() != null) { + flags.set(SYNC_FLAG_OFFSET, specObj.isSync()); } - if (specObj.isAdministrative() != null && specObj.isAdministrative()) { - retBytes[FLAGS_INDEX] |= 1 << (Byte.SIZE - (ADMINISTRATIVE_FLAG_OFFSET - Byte.SIZE) - 1); + if (specObj.isAdministrative() != null) { + flags.set(ADMINISTRATIVE_FLAG_OFFSET, specObj.isAdministrative()); } - if (specObj.getAugmentation(Lsp1.class) != null && specObj.getAugmentation(Lsp1.class).isCreate()) { - retBytes[FLAGS_INDEX] |= 1 << (Byte.SIZE - (CREATE_FLAG_OFFSET - Byte.SIZE) - 1); + if (specObj.getAugmentation(Lsp1.class) != null && specObj.getAugmentation(Lsp1.class).isCreate() != null) { + flags.set(CREATE_FLAG_OFFSET, specObj.getAugmentation(Lsp1.class).isCreate()); } + byte op = 0; if (specObj.getOperational() != null) { - final int op = specObj.getOperational().getIntValue(); - retBytes[FLAGS_INDEX] |= (op & OP_VALUE_BITS_OFFSET) << FOUR_BITS_SHIFT; + op = UnsignedBytes.checkedCast(specObj.getOperational().getIntValue()); + op = (byte) (op << FOUR_BITS_SHIFT); } - body.writeBytes(retBytes); + body.writeByte(ByteArray.bitSetToBytes(flags, 2)[1] | op); serializeTlvs(specObj.getTlvs(), body); ObjectUtil.formatSubobject(TYPE, CLASS, object.isProcessingRule(), object.isIgnore(), body, buffer); } diff --git a/pcep/ietf-stateful07/src/main/java/org/opendaylight/protocol/pcep/ietf/stateful07/Stateful07ErrorMessageParser.java b/pcep/ietf-stateful07/src/main/java/org/opendaylight/protocol/pcep/ietf/stateful07/Stateful07ErrorMessageParser.java index 0ce108f0e5..339af2146d 100644 --- a/pcep/ietf-stateful07/src/main/java/org/opendaylight/protocol/pcep/ietf/stateful07/Stateful07ErrorMessageParser.java +++ b/pcep/ietf-stateful07/src/main/java/org/opendaylight/protocol/pcep/ietf/stateful07/Stateful07ErrorMessageParser.java @@ -92,21 +92,16 @@ public final class Stateful07ErrorMessageParser extends PCEPErrorMessageParser { if (objects.isEmpty()) { throw new PCEPDeserializerException("Error message is empty."); } - final List requestParameters = new ArrayList<>(); final List srps = new ArrayList<>(); final List errorObjects = new ArrayList<>(); final PcerrMessageBuilder b = new PcerrMessageBuilder(); - - Object obj; + Object obj = objects.get(0); State state = State.Init; - obj = objects.get(0); - if (obj instanceof ErrorObject) { final ErrorObject o = (ErrorObject) obj; errorObjects.add(new ErrorsBuilder().setErrorObject(o).build()); state = State.ErrorIn; - objects.remove(0); } else if (obj instanceof Rp) { final Rp o = (Rp) obj; if (o.isProcessingRule()) { @@ -115,21 +110,19 @@ public final class Stateful07ErrorMessageParser extends PCEPErrorMessageParser { } requestParameters.add(new RpsBuilder().setRp(o).build()); state = State.RpIn; - objects.remove(0); } else if (obj instanceof Srp) { final Srp s = (Srp) obj; srps.add(new SrpsBuilder().setSrp(s).build()); state = State.SrpIn; + } + if (!state.equals(State.Init)) { objects.remove(0); } - while (!objects.isEmpty()) { obj = objects.get(0); - if (obj instanceof UnknownObject) { return new PcerrBuilder().setPcerrMessage(b.setErrors(((UnknownObject) obj).getErrors()).build()).build(); } - switch (state) { case ErrorIn: state = State.Open; @@ -181,11 +174,9 @@ public final class Stateful07ErrorMessageParser extends PCEPErrorMessageParser { objects.remove(0); } } - if (errorObjects.isEmpty()) { throw new PCEPDeserializerException("At least one PCEPErrorObject is mandatory."); } - if (!objects.isEmpty()) { throw new PCEPDeserializerException("Unprocessed Objects: " + objects); } @@ -195,7 +186,6 @@ public final class Stateful07ErrorMessageParser extends PCEPErrorMessageParser { if (!srps.isEmpty()) { b.setErrorType(new StatefulCaseBuilder().setStateful(new StatefulBuilder().setSrps(srps).build()).build()); } - return new PcerrBuilder().setPcerrMessage(b.setErrors(errorObjects).build()).build(); } diff --git a/pcep/ietf-stateful07/src/main/java/org/opendaylight/protocol/pcep/ietf/stateful07/Stateful07LSPIdentifierIpv4TlvParser.java b/pcep/ietf-stateful07/src/main/java/org/opendaylight/protocol/pcep/ietf/stateful07/Stateful07LSPIdentifierIpv4TlvParser.java index d15d4a8c63..579888e39f 100644 --- a/pcep/ietf-stateful07/src/main/java/org/opendaylight/protocol/pcep/ietf/stateful07/Stateful07LSPIdentifierIpv4TlvParser.java +++ b/pcep/ietf-stateful07/src/main/java/org/opendaylight/protocol/pcep/ietf/stateful07/Stateful07LSPIdentifierIpv4TlvParser.java @@ -39,8 +39,6 @@ public final class Stateful07LSPIdentifierIpv4TlvParser implements TlvParser, Tl public static final int TYPE = 18; - private static final int EX_TUNNEL_ID4_F_LENGTH = 4; - private static final int V4_LENGTH = 16; @Override diff --git a/pcep/ietf-stateful07/src/main/java/org/opendaylight/protocol/pcep/ietf/stateful07/Stateful07LspObjectParser.java b/pcep/ietf-stateful07/src/main/java/org/opendaylight/protocol/pcep/ietf/stateful07/Stateful07LspObjectParser.java index 122ac09c89..c4a3ab5638 100644 --- a/pcep/ietf-stateful07/src/main/java/org/opendaylight/protocol/pcep/ietf/stateful07/Stateful07LspObjectParser.java +++ b/pcep/ietf-stateful07/src/main/java/org/opendaylight/protocol/pcep/ietf/stateful07/Stateful07LspObjectParser.java @@ -7,7 +7,10 @@ */ package org.opendaylight.protocol.pcep.ietf.stateful07; +import static org.opendaylight.protocol.util.ByteBufWriteUtil.writeMedium; + import com.google.common.base.Preconditions; +import com.google.common.primitives.UnsignedBytes; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.util.BitSet; @@ -74,7 +77,7 @@ public class Stateful07LspObjectParser extends AbstractObjectWithTlvsParser> FOUR_BITS_SHIFT)))); final BitSet flags = ByteArray.bytesToBitSet(ByteArray.readBytes(bytes, 2)); builder.setDelegate(flags.get(DELEGATE_FLAG_OFFSET)); @@ -112,31 +115,27 @@ public class Stateful07LspObjectParser extends AbstractObjectWithTlvsParser> TWELVE_BITS_SHIFT); - retBytes[1] = (byte) (lspID >> FOUR_BITS_SHIFT); - retBytes[2] = (byte) (lspID << FOUR_BITS_SHIFT); - if (specObj.isDelegate() != null && specObj.isDelegate()) { - retBytes[FLAGS_INDEX] |= 1 << (Byte.SIZE - (DELEGATE_FLAG_OFFSET - Byte.SIZE) - 1); + writeMedium(specObj.getPlspId().getValue().intValue() << FOUR_BITS_SHIFT, body); + final BitSet flags = new BitSet(2 * Byte.SIZE); + if (specObj.isDelegate() != null) { + flags.set(DELEGATE_FLAG_OFFSET, specObj.isDelegate()); } - if (specObj.isRemove() != null && specObj.isRemove()) { - retBytes[FLAGS_INDEX] |= 1 << (Byte.SIZE - (REMOVE_FLAG_OFFSET - Byte.SIZE) - 1); + if (specObj.isRemove() != null) { + flags.set(REMOVE_FLAG_OFFSET, specObj.isRemove()); } - if (specObj.isSync() != null && specObj.isSync()) { - retBytes[FLAGS_INDEX] |= 1 << (Byte.SIZE - (SYNC_FLAG_OFFSET - Byte.SIZE) - 1); + if (specObj.isSync() != null) { + flags.set(SYNC_FLAG_OFFSET, specObj.isSync()); } - if (specObj.isAdministrative() != null && specObj.isAdministrative()) { - retBytes[FLAGS_INDEX] |= 1 << (Byte.SIZE - (ADMINISTRATIVE_FLAG_OFFSET - Byte.SIZE) - 1); + if (specObj.isAdministrative() != null) { + flags.set(ADMINISTRATIVE_FLAG_OFFSET, specObj.isAdministrative()); } + byte op = 0; if (specObj.getOperational() != null) { - final int op = specObj.getOperational().getIntValue(); - retBytes[FLAGS_INDEX] |= (op & OP_VALUE_BITS_OFFSET) << FOUR_BITS_SHIFT; + op = UnsignedBytes.checkedCast(specObj.getOperational().getIntValue()); + op = (byte) (op << FOUR_BITS_SHIFT); } - body.writeBytes(retBytes); + body.writeByte(ByteArray.bitSetToBytes(flags, 2)[1] | op); serializeTlvs(specObj.getTlvs(), body); ObjectUtil.formatSubobject(TYPE, CLASS, object.isProcessingRule(), object.isIgnore(), body, buffer); } diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiator.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiator.java index b0389977c6..45fc048f22 100644 --- a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiator.java +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiator.java @@ -9,14 +9,11 @@ package org.opendaylight.protocol.pcep.impl; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; - import io.netty.channel.Channel; import io.netty.util.concurrent.Promise; - import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; - import org.opendaylight.protocol.framework.AbstractSessionNegotiator; import org.opendaylight.protocol.pcep.spi.PCEPErrors; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Keepalive; @@ -172,6 +169,90 @@ public abstract class AbstractPCEPSessionNegotiator extends AbstractSessionNegot LOG.info("PCEP session with {} started, sent proposal {}", this.channel, this.localPrefs); } + private boolean handleMessageKeepWait(final Message msg) { + if (msg instanceof Keepalive) { + this.localOK = true; + if (this.remoteOK) { + LOG.info("PCEP peer {} completed negotiation", this.channel); + negotiationSuccessful(createSession(this.channel, this.localPrefs, this.remotePrefs)); + this.state = State.Finished; + } else { + scheduleFailTimer(); + this.state = State.OpenWait; + LOG.debug("Channel {} moved to OpenWait state with localOK=1", this.channel); + } + return true; + } else if (msg instanceof Pcerr) { + final org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.pcerr.message.PcerrMessage err = ((Pcerr) msg).getPcerrMessage(); + if (err.getErrorType() == null) { + final ErrorObject obj = err.getErrors().get(0).getErrorObject(); + LOG.warn("Unexpected error received from PCC: type {} value {}", obj.getType(), obj.getValue()); + negotiationFailed(new IllegalStateException("Unexpected error received from PCC.")); + this.state = State.Idle; + return true; + } + this.localPrefs = getRevisedProposal(((SessionCase) err.getErrorType()).getSession().getOpen()); + if (this.localPrefs == null) { + sendErrorMessage(PCEPErrors.PCERR_NON_ACC_SESSION_CHAR); + negotiationFailed(new IllegalStateException("Peer suggested unacceptable retry proposal")); + this.state = State.Finished; + return true; + } + this.sendMessage(new OpenBuilder().setOpenMessage(new OpenMessageBuilder().setOpen(this.localPrefs).build()).build()); + if (!this.remoteOK) { + this.state = State.OpenWait; + } + scheduleFailTimer(); + return true; + } + return false; + } + + private boolean handleMessageOpenWait(final Message msg) { + if (msg instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Open) { + final org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.message.OpenMessage o = ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Open) msg).getOpenMessage(); + final Open open = o.getOpen(); + if (isProposalAcceptable(open)) { + this.sendMessage(KEEPALIVE); + this.remotePrefs = open; + this.remoteOK = true; + if (this.localOK) { + negotiationSuccessful(createSession(this.channel, this.localPrefs, this.remotePrefs)); + LOG.info("PCEP peer {} completed negotiation", this.channel); + this.state = State.Finished; + } else { + scheduleFailTimer(); + this.state = State.KeepWait; + LOG.debug("Channel {} moved to KeepWait state with remoteOK=1", this.channel); + } + return true; + } + + if (this.openRetry) { + sendErrorMessage(PCEPErrors.SECOND_OPEN_MSG); + negotiationFailed(new IllegalStateException("OPEN renegotiation failed")); + this.state = State.Finished; + return true; + } + + final Open newPrefs = getCounterProposal(open); + if (newPrefs == null) { + sendErrorMessage(PCEPErrors.NON_ACC_NON_NEG_SESSION_CHAR); + negotiationFailed(new IllegalStateException("Peer sent unacceptable session parameters")); + this.state = State.Finished; + return true; + } + + this.sendMessage(Util.createErrorMessage(PCEPErrors.NON_ACC_NEG_SESSION_CHAR, newPrefs)); + + this.openRetry = true; + this.state = this.localOK ? State.OpenWait : State.KeepWait; + scheduleFailTimer(); + return true; + } + return false; + } + @Override protected final void handleMessage(final Message msg) { this.failTimer.cancel(false); @@ -183,90 +264,16 @@ public abstract class AbstractPCEPSessionNegotiator extends AbstractSessionNegot case Idle: throw new IllegalStateException("Unexpected handleMessage in state " + this.state); case KeepWait: - if (msg instanceof Keepalive) { - this.localOK = true; - if (this.remoteOK) { - LOG.info("PCEP peer {} completed negotiation", this.channel); - negotiationSuccessful(createSession(this.channel, this.localPrefs, this.remotePrefs)); - this.state = State.Finished; - } else { - scheduleFailTimer(); - this.state = State.OpenWait; - LOG.debug("Channel {} moved to OpenWait state with localOK=1", this.channel); - } - - return; - } else if (msg instanceof Pcerr) { - final org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.pcerr.message.PcerrMessage err = ((Pcerr) msg).getPcerrMessage(); - if (err.getErrorType() == null) { - final ErrorObject obj = err.getErrors().get(0).getErrorObject(); - LOG.warn("Unexpected error received from PCC: type {} value {}", obj.getType(), obj.getValue()); - negotiationFailed(new IllegalStateException("Unexpected error received from PCC.")); - this.state = State.Idle; - return; - } - this.localPrefs = getRevisedProposal(((SessionCase) err.getErrorType()).getSession().getOpen()); - if (this.localPrefs == null) { - sendErrorMessage(PCEPErrors.PCERR_NON_ACC_SESSION_CHAR); - negotiationFailed(new IllegalStateException("Peer suggested unacceptable retry proposal")); - this.state = State.Finished; - return; - } - this.sendMessage(new OpenBuilder().setOpenMessage(new OpenMessageBuilder().setOpen(this.localPrefs).build()).build()); - if (!this.remoteOK) { - this.state = State.OpenWait; - } - scheduleFailTimer(); + if (handleMessageKeepWait(msg)) { return; } - break; case OpenWait: - if (msg instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Open) { - final org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.message.OpenMessage o = ((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Open) msg).getOpenMessage(); - final Open open = o.getOpen(); - if (isProposalAcceptable(open)) { - this.sendMessage(KEEPALIVE); - this.remotePrefs = open; - this.remoteOK = true; - if (this.localOK) { - negotiationSuccessful(createSession(this.channel, this.localPrefs, this.remotePrefs)); - LOG.info("PCEP peer {} completed negotiation", this.channel); - this.state = State.Finished; - } else { - scheduleFailTimer(); - this.state = State.KeepWait; - LOG.debug("Channel {} moved to KeepWait state with remoteOK=1", this.channel); - } - return; - } - - if (this.openRetry) { - sendErrorMessage(PCEPErrors.SECOND_OPEN_MSG); - negotiationFailed(new IllegalStateException("OPEN renegotiation failed")); - this.state = State.Finished; - return; - } - - final Open newPrefs = getCounterProposal(open); - if (newPrefs == null) { - sendErrorMessage(PCEPErrors.NON_ACC_NON_NEG_SESSION_CHAR); - negotiationFailed(new IllegalStateException("Peer sent unacceptable session parameters")); - this.state = State.Finished; - return; - } - - this.sendMessage(Util.createErrorMessage(PCEPErrors.NON_ACC_NEG_SESSION_CHAR, newPrefs)); - - this.openRetry = true; - this.state = this.localOK ? State.OpenWait : State.KeepWait; - scheduleFailTimer(); + if (handleMessageOpenWait(msg)) { return; } - break; } - LOG.warn("Channel {} in state {} received unexpected message {}", this.channel, this.state, msg); sendErrorMessage(PCEPErrors.NON_OR_INVALID_OPEN_MSG); negotiationFailed(new Exception("Illegal message encountered")); diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiatorFactory.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiatorFactory.java index fca042dfda..b288df3772 100644 --- a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiatorFactory.java +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/AbstractPCEPSessionNegotiatorFactory.java @@ -7,26 +7,8 @@ */ package org.opendaylight.protocol.pcep.impl; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; -import com.google.common.primitives.UnsignedBytes; - import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.util.concurrent.Promise; - -import java.net.InetSocketAddress; -import java.util.Comparator; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -import javax.annotation.concurrent.GuardedBy; - -import org.opendaylight.protocol.framework.AbstractSessionNegotiator; import org.opendaylight.protocol.framework.SessionListenerFactory; import org.opendaylight.protocol.framework.SessionNegotiator; import org.opendaylight.protocol.framework.SessionNegotiatorFactory; @@ -41,36 +23,8 @@ import org.slf4j.LoggerFactory; */ public abstract class AbstractPCEPSessionNegotiatorFactory implements SessionNegotiatorFactory { - private static final Comparator COMPARATOR = UnsignedBytes.lexicographicalComparator(); - private static final Logger LOG = LoggerFactory.getLogger(AbstractPCEPSessionNegotiatorFactory.class); - - /** - * The total amount of time we should remember a peer having been present, unless some other pressure forces us to - * forget about it due to {@link PEER_CACHE_SIZE}. - */ - private static final long PEER_CACHE_SECONDS = 24 * 3600; - - /** - * Maximum total number of peers we keep track of. Combined with {@link PEER_CACHE_SECONDS}, this defines how many - * peers we can see turn around. - */ - private static final long PEER_CACHE_SIZE = 1024; - - /** - * The maximum lifetime for which we should hold on to a session ID before assuming it is okay to reuse it. - */ - private static final long ID_CACHE_SECONDS = 3 * 3600; - - @GuardedBy("this") - private final BiMap sessions = HashBiMap.create(); - @GuardedBy("this") - private final Cache formerClients = CacheBuilder.newBuilder().expireAfterAccess(PEER_CACHE_SECONDS, - TimeUnit.SECONDS).maximumSize(PEER_CACHE_SIZE).build(); - - private interface SessionReference extends AutoCloseable { - Short getSessionId(); - } + private static final Logger LOG = LoggerFactory.getLogger(AbstractPCEPSessionNegotiatorFactory.class); /** * Create a new negotiator. This method needs to be implemented by subclasses to actually provide a negotiator. @@ -87,92 +41,8 @@ public abstract class AbstractPCEPSessionNegotiatorFactory implements public final SessionNegotiator getSessionNegotiator(final SessionListenerFactory factory, final Channel channel, final Promise promise) { - final Object lock = this; - LOG.debug("Instantiating bootstrap negotiator for channel {}", channel); - return new AbstractSessionNegotiator(promise, channel) { - @Override - protected void startNegotiation() throws ExecutionException { - LOG.debug("Bootstrap negotiation for channel {} started", this.channel); - - /* - * We have a chance to see if there's a client session already - * registered for this client. - */ - final byte[] clientAddress = ((InetSocketAddress) this.channel.remoteAddress()).getAddress().getAddress(); - - synchronized (lock) { - if (AbstractPCEPSessionNegotiatorFactory.this.sessions.containsKey(clientAddress)) { - final byte[] serverAddress = ((InetSocketAddress) this.channel.localAddress()).getAddress().getAddress(); - if (COMPARATOR.compare(serverAddress, clientAddress) > 0) { - final SessionReference n = AbstractPCEPSessionNegotiatorFactory.this.sessions.remove(clientAddress); - try { - n.close(); - } catch (final Exception e) { - LOG.error("Unexpected failure to close old session", e); - } - } else { - negotiationFailed(new IllegalStateException("A conflicting session for address " - + ((InetSocketAddress) this.channel.remoteAddress()).getAddress() + " found.")); - return; - } - } - - final Short sessionId = nextSession(clientAddress); - final AbstractPCEPSessionNegotiator n = createNegotiator(promise, factory.getSessionListener(), this.channel, sessionId); - - AbstractPCEPSessionNegotiatorFactory.this.sessions.put(clientAddress, new SessionReference() { - @Override - public void close() throws ExecutionException { - try { - formerClients.get(clientAddress, new Callable() { - @Override - public PeerRecord call() { - return new PeerRecord(ID_CACHE_SECONDS, getSessionId()); - } - }); - } finally { - channel.close(); - } - } - - @Override - public Short getSessionId() { - return sessionId; - } - }); - - this.channel.closeFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(final ChannelFuture future) { - synchronized (lock) { - AbstractPCEPSessionNegotiatorFactory.this.sessions.inverse().remove(this); - } - } - }); - - LOG.info("Replacing bootstrap negotiator for channel {}", this.channel); - this.channel.pipeline().replace(this, "negotiator", n); - n.startNegotiation(); - } - } - - @Override - protected void handleMessage(final Message msg) { - throw new IllegalStateException("Bootstrap negotiator should have been replaced"); - } - }; + return new PCEPSessionNegotiator(channel, promise, factory, this); } - @GuardedBy("this") - private Short nextSession(final byte[] clientAddress) throws ExecutionException { - final PeerRecord peer = formerClients.get(clientAddress, new Callable() { - @Override - public PeerRecord call() { - return new PeerRecord(ID_CACHE_SECONDS, null); - } - }); - - return peer.allocId(); - } } diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java index a8af7ba161..69feacda4f 100644 --- a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java @@ -296,7 +296,7 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements this.sendErrorMessage(error); if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) { this.unknownMessagesTimes.add(ct); - while (ct - this.unknownMessagesTimes.peek() > 60 * 1E9) { + while (ct - this.unknownMessagesTimes.peek() > TimeUnit.MINUTES.toNanos(1)) { this.unknownMessagesTimes.poll(); } if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) { diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionNegotiator.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionNegotiator.java new file mode 100644 index 0000000000..5fcb0a1171 --- /dev/null +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionNegotiator.java @@ -0,0 +1,167 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.protocol.pcep.impl; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.primitives.UnsignedBytes; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.util.concurrent.Promise; +import java.net.InetSocketAddress; +import java.util.Comparator; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import javax.annotation.concurrent.GuardedBy; +import org.opendaylight.protocol.framework.AbstractSessionNegotiator; +import org.opendaylight.protocol.framework.SessionListenerFactory; +import org.opendaylight.protocol.pcep.PCEPSessionListener; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PCEPSessionNegotiator extends AbstractSessionNegotiator { + + private static final Logger LOG = LoggerFactory.getLogger(PCEPSessionNegotiator.class); + + private static final Comparator COMPARATOR = UnsignedBytes.lexicographicalComparator(); + + /** + * The maximum lifetime for which we should hold on to a session ID before assuming it is okay to reuse it. + */ + private static final long ID_CACHE_SECONDS = 3 * 3600; + + /** + * The total amount of time we should remember a peer having been present, unless some other pressure forces us to + * forget about it due to {@link PEER_CACHE_SIZE}. + */ + private static final long PEER_CACHE_SECONDS = 24 * 3600; + + /** + * Maximum total number of peers we keep track of. Combined with {@link PEER_CACHE_SECONDS}, this defines how many + * peers we can see turn around. + */ + private static final long PEER_CACHE_SIZE = 1024; + + @GuardedBy("this") + private final Cache formerClients = CacheBuilder.newBuilder().expireAfterAccess(PEER_CACHE_SECONDS, + TimeUnit.SECONDS).maximumSize(PEER_CACHE_SIZE).build(); + + private final Channel channel; + + private final Promise promise; + + private final SessionListenerFactory factory; + + private final AbstractPCEPSessionNegotiatorFactory negFactory; + + @GuardedBy("this") + private final BiMap sessions = HashBiMap.create(); + + private interface SessionReference extends AutoCloseable { + Short getSessionId(); + } + + public PCEPSessionNegotiator(final Channel channel, final Promise promise, final SessionListenerFactory factory, + final AbstractPCEPSessionNegotiatorFactory negFactory) { + super(promise, channel); + this.channel = channel; + this.promise = promise; + this.factory = factory; + this.negFactory = negFactory; + } + + @Override + protected void startNegotiation() throws ExecutionException { + final Object lock = this; + + LOG.debug("Bootstrap negotiation for channel {} started", this.channel); + + /* + * We have a chance to see if there's a client session already + * registered for this client. + */ + final byte[] clientAddress = ((InetSocketAddress) this.channel.remoteAddress()).getAddress().getAddress(); + + synchronized (lock) { + if (this.sessions.containsKey(clientAddress)) { + final byte[] serverAddress = ((InetSocketAddress) this.channel.localAddress()).getAddress().getAddress(); + if (COMPARATOR.compare(serverAddress, clientAddress) > 0) { + final SessionReference n = this.sessions.remove(clientAddress); + try { + n.close(); + } catch (final Exception e) { + LOG.error("Unexpected failure to close old session", e); + } + } else { + negotiationFailed(new IllegalStateException("A conflicting session for address " + + ((InetSocketAddress) this.channel.remoteAddress()).getAddress() + " found.")); + return; + } + } + + final Short sessionId = nextSession(clientAddress); + final AbstractPCEPSessionNegotiator n = this.negFactory.createNegotiator(this.promise, this.factory.getSessionListener(), this.channel, sessionId); + + this.sessions.put(clientAddress, new SessionReference() { + @Override + public void close() throws ExecutionException { + try { + PCEPSessionNegotiator.this.formerClients.get(clientAddress, new Callable() { + @Override + public PeerRecord call() { + return new PeerRecord(ID_CACHE_SECONDS, getSessionId()); + } + }); + } finally { + PCEPSessionNegotiator.this.channel.close(); + } + } + + @Override + public Short getSessionId() { + return sessionId; + } + }); + + this.channel.closeFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(final ChannelFuture future) { + synchronized (lock) { + PCEPSessionNegotiator.this.sessions.inverse().remove(this); + } + } + }); + + LOG.info("Replacing bootstrap negotiator for channel {}", this.channel); + this.channel.pipeline().replace(this, "negotiator", n); + n.startNegotiation(); + } + } + + @GuardedBy("this") + protected Short nextSession(final byte[] clientAddress) throws ExecutionException { + final PeerRecord peer = this.formerClients.get(clientAddress, new Callable() { + @Override + public PeerRecord call() { + return new PeerRecord(ID_CACHE_SECONDS, null); + } + }); + + return peer.allocId(); + } + + @Override + protected void handleMessage(final Message msg) { + throw new IllegalStateException("Bootstrap negotiator should have been replaced"); + } +} diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/message/PCEPNotificationMessageParser.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/message/PCEPNotificationMessageParser.java index 401f10cb48..db14b67661 100644 --- a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/message/PCEPNotificationMessageParser.java +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/message/PCEPNotificationMessageParser.java @@ -9,9 +9,9 @@ package org.opendaylight.protocol.pcep.impl.message; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import java.util.ArrayList; import java.util.List; import org.opendaylight.protocol.pcep.spi.AbstractMessageParser; import org.opendaylight.protocol.pcep.spi.MessageUtil; @@ -24,7 +24,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.typ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.PcntfMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.notification.object.CNotification; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.pcntf.message.PcntfMessageBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.pcntf.message.pcntf.message.notifications.Notifications; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.pcntf.message.pcntf.message.Notifications; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.pcntf.message.pcntf.message.notifications.NotificationsBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.pcntf.message.pcntf.message.notifications.Rps; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.pcntf.message.pcntf.message.notifications.RpsBuilder; @@ -44,21 +44,18 @@ public class PCEPNotificationMessageParser extends AbstractMessageParser { @Override public void serializeMessage(final Message message, final ByteBuf out) { Preconditions.checkArgument(message instanceof PcntfMessage, "Wrong instance of Message. Passed instance of %s. Need PcntfMessage.", message.getClass()); - final org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.pcntf.message.PcntfMessage msg = ((PcntfMessage) message).getPcntfMessage(); - - ByteBuf buffer = Unpooled.buffer(); - for (final org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.pcntf.message.pcntf.message.Notifications n : msg.getNotifications()) { - if (n.getRps() != null && !n.getRps().isEmpty()) { + final ByteBuf buffer = Unpooled.buffer(); + for (final Notifications n : ((PcntfMessage) message).getPcntfMessage().getNotifications()) { + if (n.getRps() != null) { for (final Rps rps : n.getRps()) { serializeObject(rps.getRp(), buffer); } } if (n.getNotifications() == null || n.getNotifications().isEmpty()) { throw new IllegalArgumentException("Message must contain at least one notification object"); - } else { - for (final Notifications not : n.getNotifications()) { - serializeObject(not.getCNotification(), buffer); - } + } + for (final org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.pcntf.message.pcntf.message.notifications.Notifications not : n.getNotifications()) { + serializeObject(not.getCNotification(), buffer); } } MessageUtil.formatMessage(TYPE, buffer, out); @@ -73,12 +70,10 @@ public class PCEPNotificationMessageParser extends AbstractMessageParser { throw new PCEPDeserializerException("Notification message cannot be empty."); } - final List compositeNotifications = Lists.newArrayList(); + final List compositeNotifications = new ArrayList<>(); while (!objects.isEmpty()) { - org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.pcntf.message.pcntf.message.Notifications comObj; - comObj = getValidNotificationComposite(objects, errors); - + final Notifications comObj = getValidNotificationComposite(objects, errors); if (comObj == null) { break; } @@ -93,10 +88,9 @@ public class PCEPNotificationMessageParser extends AbstractMessageParser { return new PcntfBuilder().setPcntfMessage(new PcntfMessageBuilder().setNotifications(compositeNotifications).build()).build(); } - private static org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.pcntf.message.pcntf.message.Notifications getValidNotificationComposite( - final List objects, final List errors) { - final List requestParameters = Lists.newArrayList(); - final List notifications = Lists.newArrayList(); + private static Notifications getValidNotificationComposite(final List objects, final List errors) { + final List requestParameters = new ArrayList<>(); + final List notifications = new ArrayList<>(); Object obj; State state = State.Init; diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/message/PCEPReplyMessageParser.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/message/PCEPReplyMessageParser.java index a0d46ec99e..5f82607c3c 100644 --- a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/message/PCEPReplyMessageParser.java +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/message/PCEPReplyMessageParser.java @@ -9,9 +9,9 @@ package org.opendaylight.protocol.pcep.impl.message; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import java.util.ArrayList; import java.util.List; import org.opendaylight.protocol.pcep.spi.AbstractMessageParser; import org.opendaylight.protocol.pcep.spi.MessageUtil; @@ -77,38 +77,39 @@ public class PCEPReplyMessageParser extends AbstractMessageParser { protected void serializeReply(final Replies reply, final ByteBuf buffer) { serializeObject(reply.getRp(), buffer); serializeVendorInformationObjects(reply.getVendorInformationObject(), buffer); - if (reply.getResult() != null) { - if (reply.getResult() instanceof FailureCase) { - final FailureCase f = ((FailureCase) reply.getResult()); - if (f != null) { - serializeObject(f.getNoPath(), buffer); - serializeObject(f.getLspa(), buffer); - serializeObject(f.getBandwidth(), buffer); - if (f.getMetrics() != null && !f.getMetrics().isEmpty()) { - for (final Metrics m : f.getMetrics()) { - serializeObject(m.getMetric(), buffer); - } + if (reply.getResult() == null) { + return; + } + if (reply.getResult() instanceof FailureCase) { + final FailureCase f = ((FailureCase) reply.getResult()); + if (f != null) { + serializeObject(f.getNoPath(), buffer); + serializeObject(f.getLspa(), buffer); + serializeObject(f.getBandwidth(), buffer); + if (f.getMetrics() != null) { + for (final Metrics m : f.getMetrics()) { + serializeObject(m.getMetric(), buffer); } - serializeObject(f.getIro(), buffer); } - } else { - final SuccessCase s = (SuccessCase) reply.getResult(); - if (s != null && s.getSuccess() != null) { - for (final Paths p : s.getSuccess().getPaths()) { - serializeObject(p.getEro(), buffer); - serializeObject(p.getLspa(), buffer); - serializeObject(p.getOf(), buffer); - serializeObject(p.getBandwidth(), buffer); - if (p.getMetrics() != null && !p.getMetrics().isEmpty()) { - for (final Metrics m : p.getMetrics()) { - serializeObject(m.getMetric(), buffer); - } - } - serializeObject(p.getIro(), buffer); + serializeObject(f.getIro(), buffer); + } + return; + } + final SuccessCase s = (SuccessCase) reply.getResult(); + if (s != null && s.getSuccess() != null) { + for (final Paths p : s.getSuccess().getPaths()) { + serializeObject(p.getEro(), buffer); + serializeObject(p.getLspa(), buffer); + serializeObject(p.getOf(), buffer); + serializeObject(p.getBandwidth(), buffer); + if (p.getMetrics() != null) { + for (final Metrics m : p.getMetrics()) { + serializeObject(m.getMetric(), buffer); } - serializeVendorInformationObjects(s.getSuccess().getVendorInformationObject(), buffer); } + serializeObject(p.getIro(), buffer); } + serializeVendorInformationObjects(s.getSuccess().getVendorInformationObject(), buffer); } } @@ -120,7 +121,7 @@ public class PCEPReplyMessageParser extends AbstractMessageParser { if (objects.isEmpty()) { throw new PCEPDeserializerException("Pcrep message cannot be empty."); } - final List replies = Lists.newArrayList(); + final List replies = new ArrayList<>(); while (!objects.isEmpty()) { final Replies r = this.getValidReply(objects, errors); if (r != null) { @@ -156,7 +157,7 @@ public class PCEPReplyMessageParser extends AbstractMessageParser { final Ero ero = (Ero) objects.get(0); objects.remove(0); final SuccessBuilder builder = new SuccessBuilder(); - final List paths = Lists.newArrayList(); + final List paths = new ArrayList<>(); final PathsBuilder pBuilder = new PathsBuilder(); pBuilder.setEro(ero); while (!objects.isEmpty()) { @@ -179,7 +180,7 @@ public class PCEPReplyMessageParser extends AbstractMessageParser { } protected void parseAttributes(final FailureCaseBuilder builder, final List objects) { - final List pathMetrics = Lists.newArrayList(); + final List pathMetrics = new ArrayList<>(); Object obj; State state = State.Init; @@ -228,7 +229,7 @@ public class PCEPReplyMessageParser extends AbstractMessageParser { } protected void parsePath(final PathsBuilder builder, final List objects) { - final List pathMetrics = Lists.newArrayList(); + final List pathMetrics = new ArrayList<>(); Object obj; State state = State.Init; diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/message/PCEPRequestMessageParser.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/message/PCEPRequestMessageParser.java index 4c8926a07e..83b0a1a651 100644 --- a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/message/PCEPRequestMessageParser.java +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/message/PCEPRequestMessageParser.java @@ -9,9 +9,9 @@ package org.opendaylight.protocol.pcep.impl.message; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import java.util.ArrayList; import java.util.List; import org.opendaylight.protocol.pcep.spi.AbstractMessageParser; import org.opendaylight.protocol.pcep.spi.MessageUtil; @@ -134,9 +134,8 @@ public class PCEPRequestMessageParser extends AbstractMessageParser { if (objects == null) { throw new IllegalArgumentException("Passed list can't be null."); } - - final List requests = Lists.newArrayList(); - final List svecList = Lists.newArrayList(); + final List requests = new ArrayList<>(); + final List svecList = new ArrayList<>(); while (!objects.isEmpty()) { final RequestsBuilder rBuilder = new RequestsBuilder(); Rp rpObj = null; @@ -207,8 +206,8 @@ public class PCEPRequestMessageParser extends AbstractMessageParser { protected SegmentComputation getSegmentComputation(final P2pBuilder builder, final List objects, final List errors, final Rp rp) { - final List metrics = Lists.newArrayList(); - final List viObjects = Lists.newArrayList(); + final List metrics = new ArrayList<>(); + final List viObjects = new ArrayList<>(); State state = State.Init; while (!objects.isEmpty() && state != State.End) { @@ -330,16 +329,14 @@ public class PCEPRequestMessageParser extends AbstractMessageParser { if (objects == null || objects.isEmpty()) { throw new IllegalArgumentException("List cannot be null or empty."); } - - if (objects.get(0) instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.svec.object.Svec) { - builder.setSvec((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.svec.object.Svec) objects.get(0)); - objects.remove(0); - } else { + if (!(objects.get(0) instanceof org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.svec.object.Svec)) { return null; } + builder.setSvec((org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.svec.object.Svec) objects.get(0)); + objects.remove(0); - final List metrics = Lists.newArrayList(); - final List viObjects = Lists.newArrayList(); + final List metrics = new ArrayList<>(); + final List viObjects = new ArrayList<>(); Object obj = null; SvecState state = SvecState.Init; diff --git a/pcep/testtool/src/main/java/org/opendaylight/protocol/pcep/testtool/Main.java b/pcep/testtool/src/main/java/org/opendaylight/protocol/pcep/testtool/Main.java index 5b1f0625b7..663f7a9266 100644 --- a/pcep/testtool/src/main/java/org/opendaylight/protocol/pcep/testtool/Main.java +++ b/pcep/testtool/src/main/java/org/opendaylight/protocol/pcep/testtool/Main.java @@ -122,9 +122,9 @@ public final class Main { try (final StatefulActivator activator07 = new StatefulActivator()) { activator07.start(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance()); - final PCEPDispatcherImpl dispatcher = new PCEPDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry(), new DefaultPCEPSessionNegotiatorFactory(prefs, 5), new NioEventLoopGroup(), new NioEventLoopGroup()); - - dispatcher.createServer(address, new TestingSessionListenerFactory()).get(); + try (final PCEPDispatcherImpl dispatcher = new PCEPDispatcherImpl(ServiceLoaderPCEPExtensionProviderContext.getSingletonInstance().getMessageHandlerRegistry(), new DefaultPCEPSessionNegotiatorFactory(prefs, 5), new NioEventLoopGroup(), new NioEventLoopGroup())) { + dispatcher.createServer(address, new TestingSessionListenerFactory()).get(); + } } } } diff --git a/pcep/tunnel-provider/src/main/java/org/opendaylight/bgpcep/pcep/tunnel/provider/NodeChangedListener.java b/pcep/tunnel-provider/src/main/java/org/opendaylight/bgpcep/pcep/tunnel/provider/NodeChangedListener.java index 063abbf303..b8f2b0da37 100644 --- a/pcep/tunnel-provider/src/main/java/org/opendaylight/bgpcep/pcep/tunnel/provider/NodeChangedListener.java +++ b/pcep/tunnel-provider/src/main/java/org/opendaylight/bgpcep/pcep/tunnel/provider/NodeChangedListener.java @@ -128,46 +128,45 @@ public final class NodeChangedListener implements DataChangeListener { return snb.build(); } + private void handleSni(final InstanceIdentifier sni, final Node n, final Boolean inControl, final ReadWriteTransaction trans) { + if (sni != null) { + final NodeKey k = InstanceIdentifier.keyOf(sni); + boolean have = false; + /* + * We may have found a termination point which has been created as a destination, + * so it does not have a supporting node pointer. Since we now know what it is, + * fill it in. + */ + if (n.getSupportingNode() != null) { + for (final SupportingNode sn : n.getSupportingNode()) { + if (sn.getNodeRef().equals(k.getNodeId())) { + have = true; + break; + } + } + } + if (!have) { + final SupportingNode sn = createSupportingNode(k.getNodeId(), inControl); + trans.put(LogicalDatastoreType.OPERATIONAL, this.target.child(Node.class, n.getKey()).child( + SupportingNode.class, sn.getKey()), sn); + } + } + } + private InstanceIdentifier getIpTerminationPoint(final ReadWriteTransaction trans, final IpAddress addr, final InstanceIdentifier sni, final Boolean inControl) throws ReadFailedException { final Topology topo = trans.read(LogicalDatastoreType.OPERATIONAL, this.target).checkedGet().get(); - if (topo.getNode() != null && !topo.getNode().isEmpty()) { + if (topo.getNode() != null) { for (final Node n : topo.getNode()) { - if(n.getTerminationPoint() != null && !n.getTerminationPoint().isEmpty()) { + if(n.getTerminationPoint() != null) { for (final TerminationPoint tp : n.getTerminationPoint()) { final TerminationPoint1 tpa = tp.getAugmentation(TerminationPoint1.class); - if (tpa != null) { final TerminationPointType tpt = tpa.getIgpTerminationPointAttributes().getTerminationPointType(); - if (tpt instanceof Ip) { for (final IpAddress a : ((Ip) tpt).getIpAddress()) { if (addr.equals(a)) { - if (sni != null) { - final NodeKey k = InstanceIdentifier.keyOf(sni); - boolean have = false; - - /* - * We may have found a termination point which has been created as a destination, - * so it does not have a supporting node pointer. Since we now know what it is, - * fill it in. - */ - if (n.getSupportingNode() != null) { - for (final SupportingNode sn : n.getSupportingNode()) { - if (sn.getNodeRef().equals(k.getNodeId())) { - have = true; - break; - } - } - } - - if (!have) { - final SupportingNode sn = createSupportingNode(k.getNodeId(), inControl); - - trans.put(LogicalDatastoreType.OPERATIONAL, this.target.child(Node.class, n.getKey()).child( - SupportingNode.class, sn.getKey()), sn); - } - } + handleSni(sni, n, inControl, trans); return this.target.builder().child(Node.class, n.getKey()).child(TerminationPoint.class, tp.getKey()).toInstance(); } } @@ -179,9 +178,12 @@ public final class NodeChangedListener implements DataChangeListener { } } } - LOG.debug("Termination point for {} not found, creating a new one", addr); + return createTP(addr, sni, inControl, trans); + } + private InstanceIdentifier createTP(final IpAddress addr, final InstanceIdentifier sni, + final Boolean inControl, final ReadWriteTransaction trans) { final String url = "ip://" + addr.toString(); final TerminationPointKey tpk = new TerminationPointKey(new TpId(url)); final TerminationPointBuilder tpb = new TerminationPointBuilder(); @@ -197,7 +199,6 @@ public final class NodeChangedListener implements DataChangeListener { if (sni != null) { nb.setSupportingNode(Lists.newArrayList(createSupportingNode(InstanceIdentifier.keyOf(sni).getNodeId(), inControl))); } - final InstanceIdentifier nid = this.target.child(Node.class, nb.getKey()); trans.put(LogicalDatastoreType.OPERATIONAL, nid, nb.build()); return nid.child(TerminationPoint.class, tpb.getKey()); @@ -274,7 +275,7 @@ public final class NodeChangedListener implements DataChangeListener { return; } - final Link l = (Link) ol.get(); + final Link l = ol.get(); LOG.debug("Removing link {} (was {})", li, l); trans.delete(LogicalDatastoreType.OPERATIONAL, li); @@ -282,11 +283,11 @@ public final class NodeChangedListener implements DataChangeListener { final Optional ot = trans.read(LogicalDatastoreType.OPERATIONAL, this.target).checkedGet(); Preconditions.checkState(ot.isPresent()); - final Topology t = (Topology) ot.get(); - NodeId srcNode = l.getSource().getSourceNode(); - NodeId dstNode = l.getDestination().getDestNode(); - TpId srcTp = l.getSource().getSourceTp(); - TpId dstTp = l.getDestination().getDestTp(); + final Topology t = ot.get(); + final NodeId srcNode = l.getSource().getSourceNode(); + final NodeId dstNode = l.getDestination().getDestNode(); + final TpId srcTp = l.getSource().getSourceTp(); + final TpId dstTp = l.getDestination().getDestTp(); boolean orphSrcNode = true, orphDstNode = true, orphDstTp = true, orphSrcTp = true; for (final Link lw : t.getLink()) { @@ -409,14 +410,14 @@ public final class NodeChangedListener implements DataChangeListener { if (oldValue != null) { try { remove(trans, i, oldValue); - } catch (ReadFailedException e) { + } catch (final ReadFailedException e) { LOG.warn("Failed to remove LSP {}", i, e); } } if (newValue != null) { try { create(trans, i, newValue); - } catch (ReadFailedException e) { + } catch (final ReadFailedException e) { LOG.warn("Failed to add LSP {}", i, e); } } diff --git a/programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/InstructionImpl.java b/programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/InstructionImpl.java index e556a83d63..ba10b987bb 100644 --- a/programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/InstructionImpl.java +++ b/programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/InstructionImpl.java @@ -11,15 +11,11 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; - import io.netty.util.Timeout; - import java.util.ArrayList; import java.util.Iterator; import java.util.List; - import javax.annotation.concurrent.GuardedBy; - import org.opendaylight.bgpcep.programming.spi.ExecutionResult; import org.opendaylight.bgpcep.programming.spi.Instruction; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.CancelFailure; @@ -53,7 +49,7 @@ final class InstructionImpl implements Instruction { } InstructionId getId() { - return id; + return this.id; } synchronized InstructionStatus getStatus() { @@ -63,7 +59,7 @@ final class InstructionImpl implements Instruction { synchronized void setStatus(final InstructionStatus status, final Details details) { // Set the status this.status = status; - LOG.debug("Instruction {} transitioned to status {}", id, status); + LOG.debug("Instruction {} transitioned to status {}", this.id, status); // Send out a notification this.queue.instructionUpdated(status, details); @@ -84,54 +80,52 @@ final class InstructionImpl implements Instruction { @GuardedBy("this") private void cancelTimeout() { - if (timeout != null) { - timeout.cancel(); - timeout = null; + if (this.timeout != null) { + this.timeout.cancel(); + this.timeout = null; } } public synchronized void timeout() { - if (timeout != null) { - timeout = null; - - switch (status) { - case Cancelled: - case Failed: - case Successful: - LOG.debug("Instruction {} has status {}, timeout is a no-op", id, status); - break; - case Unknown: - LOG.warn("Instruction {} has status {} before timeout completed", id, status); - break; - case Executing: - LOG.info("Instruction {} timed out while executing, transitioning into Unknown", id); - setStatus(InstructionStatus.Unknown, null); - cancelDependants(); - break; - case Queued: - LOG.debug("Instruction {} timed out while Queued, cancelling it", id); - - final List ids = new ArrayList<>(); - for (final InstructionImpl d : dependencies) { - if (d.getStatus() != InstructionStatus.Successful) { - ids.add(d.getId()); - } + if (this.timeout == null) { + return; + } + this.timeout = null; + switch (this.status) { + case Cancelled: + case Failed: + case Successful: + LOG.debug("Instruction {} has status {}, timeout is a no-op", this.id, this.status); + break; + case Unknown: + LOG.warn("Instruction {} has status {} before timeout completed", this.id, this.status); + break; + case Executing: + LOG.info("Instruction {} timed out while executing, transitioning into Unknown", this.id); + setStatus(InstructionStatus.Unknown, null); + cancelDependants(); + break; + case Queued: + LOG.debug("Instruction {} timed out while Queued, cancelling it", this.id); + final List ids = new ArrayList<>(); + for (final InstructionImpl d : this.dependencies) { + if (d.getStatus() != InstructionStatus.Successful) { + ids.add(d.getId()); } - - cancel(new DetailsBuilder().setUnmetDependencies(ids).build()); - break; - case Scheduled: - LOG.debug("Instruction {} timed out while Scheduled, cancelling it", id); - cancel(heldUpDetails); - break; } + cancel(new DetailsBuilder().setUnmetDependencies(ids).build()); + break; + case Scheduled: + LOG.debug("Instruction {} timed out while Scheduled, cancelling it", this.id); + cancel(this.heldUpDetails); + break; } } @GuardedBy("this") private void cancelDependants() { - final Details details = new DetailsBuilder().setUnmetDependencies(ImmutableList.of(id)).build(); - for (final InstructionImpl d : dependants) { + final Details details = new DetailsBuilder().setUnmetDependencies(ImmutableList.of(this.id)).build(); + for (final InstructionImpl d : this.dependants) { d.tryCancel(details); } } @@ -139,18 +133,18 @@ final class InstructionImpl implements Instruction { @GuardedBy("this") private void cancel(final Details details) { cancelTimeout(); - schedulingFuture.cancel(false); + this.schedulingFuture.cancel(false); setStatus(InstructionStatus.Cancelled, details); } synchronized Class tryCancel(final Details details) { - switch (status) { + switch (this.status) { case Cancelled: case Executing: case Failed: case Successful: case Unknown: - LOG.debug("Instruction {} can no longer be cancelled due to status {}", id, status); + LOG.debug("Instruction {} can no longer be cancelled due to status {}", this.id, this.status); return UncancellableInstruction.class; case Queued: case Scheduled: @@ -158,12 +152,12 @@ final class InstructionImpl implements Instruction { return null; } - throw new IllegalStateException("Unhandled instruction state " + status); + throw new IllegalStateException("Unhandled instruction state " + this.status); } @Override public synchronized boolean checkedExecutionStart() { - if (status != InstructionStatus.Scheduled) { + if (this.status != InstructionStatus.Scheduled) { return false; } @@ -173,7 +167,7 @@ final class InstructionImpl implements Instruction { @Override public synchronized boolean executionHeldUp(final Details details) { - if (status != InstructionStatus.Scheduled) { + if (this.status != InstructionStatus.Scheduled) { return false; } @@ -183,61 +177,50 @@ final class InstructionImpl implements Instruction { @Override public synchronized void executionCompleted(final InstructionStatus status, final Details details) { - Preconditions.checkState(executionFuture != null); + Preconditions.checkState(this.executionFuture != null); cancelTimeout(); // We reuse the preconditions set down in this class final ExecutionResult
result = new ExecutionResult
(status, details); setStatus(status, details); - executionFuture.set(result); + this.executionFuture.set(result); } synchronized void addDependant(final InstructionImpl d) { - dependants.add(d); + this.dependants.add(d); } private synchronized void removeDependant(final InstructionImpl d) { - dependants.remove(d); + this.dependants.remove(d); } private synchronized void removeDependency(final InstructionImpl other) { - dependencies.remove(other); + this.dependencies.remove(other); } synchronized Iterator getDependants() { - return dependants.iterator(); + return this.dependants.iterator(); } synchronized void clean() { - for (final Iterator it = dependencies.iterator(); it.hasNext();) { + for (final Iterator it = this.dependencies.iterator(); it.hasNext();) { it.next().removeDependant(this); } - dependencies.clear(); + this.dependencies.clear(); - for (final Iterator it = dependants.iterator(); it.hasNext();) { + for (final Iterator it = this.dependants.iterator(); it.hasNext();) { it.next().removeDependency(this); } - dependants.clear(); + this.dependants.clear(); this.queue.instructionRemoved(); } - synchronized ListenableFuture> ready() { - Preconditions.checkState(status == InstructionStatus.Queued); - Preconditions.checkState(executionFuture == null); - - /* - * Check all vertices we depend on. We start off as ready for - * scheduling. If we encounter a cancelled/failed/unknown - * dependency, we cancel this instruction (and cascade). If we - * encounter an executing/queued/scheduled dependency, we hold - * of scheduling this one. - */ + private Boolean checkDependencies() { boolean ready = true; - final List unmet = new ArrayList<>(); - for (final InstructionImpl d : dependencies) { + for (final InstructionImpl d : this.dependencies) { switch (d.getStatus()) { case Cancelled: case Failed: @@ -254,21 +237,32 @@ final class InstructionImpl implements Instruction { break; } } - if (!unmet.isEmpty()) { - LOG.warn("Instruction {} was Queued, while some dependencies were resolved unsuccessfully, cancelling it", id); + LOG.warn("Instruction {} was Queued, while some dependencies were resolved unsuccessfully, cancelling it", this.id); cancel(new DetailsBuilder().setUnmetDependencies(unmet).build()); return null; } + return ready; + } - if (!ready) { + synchronized ListenableFuture> ready() { + Preconditions.checkState(this.status == InstructionStatus.Queued); + Preconditions.checkState(this.executionFuture == null); + /* + * Check all vertices we depend on. We start off as ready for + * scheduling. If we encounter a cancelled/failed/unknown + * dependency, we cancel this instruction (and cascade). If we + * encounter an executing/queued/scheduled dependency, we hold + * of scheduling this one. + */ + final Boolean ready = checkDependencies(); + if (ready == null || !ready) { return null; } - - LOG.debug("Instruction {} is ready for execution", id); + LOG.debug("Instruction {} is ready for execution", this.id); setStatus(InstructionStatus.Scheduled, null); - executionFuture = SettableFuture.create(); - schedulingFuture.set(this); - return executionFuture; + this.executionFuture = SettableFuture.create(); + this.schedulingFuture.set(this); + return this.executionFuture; } -} \ No newline at end of file +} diff --git a/programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/ProgrammingServiceImpl.java b/programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/ProgrammingServiceImpl.java index fa69d7e501..6103b7afaa 100644 --- a/programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/ProgrammingServiceImpl.java +++ b/programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/ProgrammingServiceImpl.java @@ -78,34 +78,34 @@ public final class ProgrammingServiceImpl implements AutoCloseable, InstructionS private final InstructionBuilder builder = new InstructionBuilder(); InstructionPusher(final InstructionId id, final Nanotime deadline) { - builder.setDeadline(deadline); - builder.setId(id); - builder.setKey(new InstructionKey(id)); - builder.setStatus(InstructionStatus.Queued); + this.builder.setDeadline(deadline); + this.builder.setId(id); + this.builder.setKey(new InstructionKey(id)); + this.builder.setStatus(InstructionStatus.Queued); } @Override public void instructionUpdated(final InstructionStatus status, final Details details) { - if (!status.equals(builder.getStatus())) { - builder.setStatus(status); + if (!status.equals(this.builder.getStatus())) { + this.builder.setStatus(status); - final WriteTransaction t = dataProvider.newWriteOnlyTransaction(); + final WriteTransaction t = ProgrammingServiceImpl.this.dataProvider.newWriteOnlyTransaction(); t.put(LogicalDatastoreType.OPERATIONAL, - qid.child( + ProgrammingServiceImpl.this.qid.child( org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.queue.Instruction.class, - new InstructionKey(builder.getId())), builder.build()); + new InstructionKey(this.builder.getId())), this.builder.build()); t.submit(); } - notifs.publish(new InstructionStatusChangedBuilder().setId(builder.getId()).setStatus(status).setDetails(details).build()); + ProgrammingServiceImpl.this.notifs.publish(new InstructionStatusChangedBuilder().setId(this.builder.getId()).setStatus(status).setDetails(details).build()); } @Override public void instructionRemoved() { - final WriteTransaction t = dataProvider.newWriteOnlyTransaction(); - t.delete(LogicalDatastoreType.OPERATIONAL, qid.child( + final WriteTransaction t = ProgrammingServiceImpl.this.dataProvider.newWriteOnlyTransaction(); + t.delete(LogicalDatastoreType.OPERATIONAL, ProgrammingServiceImpl.this.qid.child( org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev130930.instruction.queue.Instruction.class, - new InstructionKey(builder.getId()))); + new InstructionKey(this.builder.getId()))); t.submit(); } } @@ -116,16 +116,16 @@ public final class ProgrammingServiceImpl implements AutoCloseable, InstructionS this.notifs = Preconditions.checkNotNull(notifs); this.executor = Preconditions.checkNotNull(executor); this.timer = Preconditions.checkNotNull(timer); - qid = InstanceIdentifier.builder(InstructionsQueue.class).toInstance(); + this.qid = InstanceIdentifier.builder(InstructionsQueue.class).toInstance(); final ReadWriteTransaction t = dataProvider.newReadWriteTransaction(); try { - Preconditions.checkState(!t.read(LogicalDatastoreType.OPERATIONAL, qid).get().isPresent(), "Conflicting instruction queue found"); + Preconditions.checkState(!t.read(LogicalDatastoreType.OPERATIONAL, this.qid).get().isPresent(), "Conflicting instruction queue found"); } catch (InterruptedException | ExecutionException e) { throw new IllegalStateException("Failed to acquire instruction queue", e); } - t.put(LogicalDatastoreType.OPERATIONAL, qid, + t.put(LogicalDatastoreType.OPERATIONAL, this.qid, new InstructionsQueueBuilder().setInstruction( Collections. emptyList()).build()); t.submit(); @@ -204,35 +204,16 @@ public final class ProgrammingServiceImpl implements AutoCloseable, InstructionS return SuccessfulRpcResult.create(ob.build()); } - @Override - public synchronized ListenableFuture scheduleInstruction(final SubmitInstructionInput input) throws SchedulerException { - final InstructionId id = input.getId(); - if (this.insns.get(id) != null) { - LOG.info("Instruction ID {} already present", id); - throw new SchedulerException("Instruction ID currently in use", new FailureBuilder().setType(DuplicateInstructionId.class).build()); - } - - // First things first: check the deadline - final Nanotime now = NanotimeUtil.currentTime(); - final BigInteger left = input.getDeadline().getValue().subtract(now.getValue()); - - if (left.compareTo(BigInteger.ZERO) <= 0) { - LOG.debug("Instruction {} deadline has already passed by {}ns", id, left); - throw new SchedulerException("Instruction arrived after specified deadline", new FailureBuilder().setType(DeadOnArrival.class).build()); - } - - // Resolve dependencies + private List checkDependencies(final SubmitInstructionInput input) throws SchedulerException { final List dependencies = new ArrayList<>(); for (final InstructionId pid : input.getPreconditions()) { final InstructionImpl i = this.insns.get(pid); if (i == null) { - LOG.info("Instruction {} depends on {}, which is not a known instruction", id, pid); + LOG.info("Instruction {} depends on {}, which is not a known instruction", input.getId(), pid); throw new SchedulerException("Unknown dependency ID specified", new FailureBuilder().setType(UnknownPreconditionId.class).build()); } - dependencies.add(i); } - // Check if all dependencies are non-failed final List unmet = new ArrayList<>(); for (final InstructionImpl d : dependencies) { @@ -249,7 +230,6 @@ public final class ProgrammingServiceImpl implements AutoCloseable, InstructionS break; } } - /* * Some dependencies have failed, declare the request dead-on-arrival * and fail the operation. @@ -258,6 +238,28 @@ public final class ProgrammingServiceImpl implements AutoCloseable, InstructionS throw new SchedulerException("Instruction's dependencies are already unsuccessful", new FailureBuilder().setType( DeadOnArrival.class).setFailedPreconditions(unmet).build()); } + return dependencies; + } + + @Override + public synchronized ListenableFuture scheduleInstruction(final SubmitInstructionInput input) throws SchedulerException { + final InstructionId id = input.getId(); + if (this.insns.get(id) != null) { + LOG.info("Instruction ID {} already present", id); + throw new SchedulerException("Instruction ID currently in use", new FailureBuilder().setType(DuplicateInstructionId.class).build()); + } + + // First things first: check the deadline + final Nanotime now = NanotimeUtil.currentTime(); + final BigInteger left = input.getDeadline().getValue().subtract(now.getValue()); + + if (left.compareTo(BigInteger.ZERO) <= 0) { + LOG.debug("Instruction {} deadline has already passed by {}ns", id, left); + throw new SchedulerException("Instruction arrived after specified deadline", new FailureBuilder().setType(DeadOnArrival.class).build()); + } + + // Resolve dependencies + final List dependencies = checkDependencies(input); /* * All pre-flight checks done are at this point, the following @@ -338,12 +340,12 @@ public final class ProgrammingServiceImpl implements AutoCloseable, InstructionS @Override public synchronized void close() { try { - for (InstructionImpl i : insns.values()) { + for (final InstructionImpl i : this.insns.values()) { i.tryCancel(null); } } finally { - final WriteTransaction t = dataProvider.newWriteOnlyTransaction(); - t.delete(LogicalDatastoreType.OPERATIONAL, qid); + final WriteTransaction t = this.dataProvider.newWriteOnlyTransaction(); + t.delete(LogicalDatastoreType.OPERATIONAL, this.qid); t.submit(); } } -- 2.36.6