package org.pcmm.rcd.impl;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Objects;
+import com.google.common.primitives.Bytes;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.Vector;
+import org.pcmm.base.impl.PCMMBaseObject;
+import org.pcmm.gates.IAMID;
+import org.pcmm.gates.IClassifier;
+import org.pcmm.gates.IGateID;
+import org.pcmm.gates.IGateSpec;
import org.pcmm.gates.IGateSpec.Direction;
+import org.pcmm.gates.IGateState;
import org.pcmm.gates.IPCMMError;
import org.pcmm.gates.IPCMMError.ErrorCode;
+import org.pcmm.gates.ISubscriberID;
+import org.pcmm.gates.ITransactionID;
+import org.pcmm.gates.impl.AMID;
+import org.pcmm.gates.impl.DOCSISServiceClassNameTrafficProfile;
import org.pcmm.gates.impl.GateID;
+import org.pcmm.gates.impl.GateSpec;
+import org.pcmm.gates.impl.GateState;
+import org.pcmm.gates.impl.GateTimeInfo;
+import org.pcmm.gates.impl.GateUsageInfo;
import org.pcmm.gates.impl.PCMMError;
import org.pcmm.gates.impl.PCMMGateReq;
+import org.pcmm.gates.impl.TransactionID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.umu.cops.prpep.COPSPepException;
import org.umu.cops.prpep.COPSPepMsgSender;
import org.umu.cops.prpep.COPSPepReqStateMan;
-import org.umu.cops.stack.*;
+import org.umu.cops.stack.COPSClientSI;
+import org.umu.cops.stack.COPSContext;
+import org.umu.cops.stack.COPSData;
+import org.umu.cops.stack.COPSDecision;
import org.umu.cops.stack.COPSDecision.DecisionFlag;
+import org.umu.cops.stack.COPSDecisionMsg;
+import org.umu.cops.stack.COPSException;
+import org.umu.cops.stack.COPSHandle;
+import org.umu.cops.stack.COPSMsgParser;
import org.umu.cops.stack.COPSObjHeader.CNum;
import org.umu.cops.stack.COPSObjHeader.CType;
+import org.umu.cops.stack.COPSReportMsg;
+import org.umu.cops.stack.COPSReportType;
import org.umu.cops.stack.COPSReportType.ReportType;
-import java.io.IOException;
-import java.net.Socket;
-import java.util.*;
-
/**
* PEP State manager implementation for use in a CMTS.
*/
public class CmtsPepReqStateMan extends COPSPepReqStateMan {
- private final static Logger logger = LoggerFactory.getLogger(CmtsPepReqStateMan.class);
+ private static final Logger logger = LoggerFactory.getLogger(CmtsPepReqStateMan.class);
- /**
- * The configured gates
- */
- private final Map<Direction, Set<String>> gateConfig;
+ private final CMTSConfig config;
- /**
- * The connected CMTSs and whether or not they are up
- */
- private final Map<String, Boolean> cmStatus;
+ private final Map<IGateID, GateMetaData> gateStateMap;
- /**
- * Contains the gates that have been set where the key is the gate name and the value is a Set of subIds
- * that are using this gate
- */
- private final Map<String, Set<String>> gatesSetMap;
+ private static class GateMetaData {
+
+ private final PCMMGateReq gateReq;
+ private long commitTime;
+ private long kiloBytesTransmitted;
+
+ private Random random;
+
+ public GateMetaData(final PCMMGateReq gateReq) {
+ this.gateReq = checkNotNull(gateReq);
+ updateCommitTime();
+ kiloBytesTransmitted = 0;
+ this.random = new Random(gateReq.getGateID().getGateID());
+ }
+
+ public long updateCommitTime() {
+ commitTime = System.currentTimeMillis() / 1000L;
+ return commitTime;
+ }
+
+ public PCMMGateReq getGateReq() {
+ return gateReq;
+ }
+
+ public long getCommitTime() {
+ return commitTime;
+ }
+
+ public int getCommitDuration() {
+ return (int)((System.currentTimeMillis() / 1000L) - commitTime);
+ }
+
+ public long updateKiloBytesTransmitted() {
+ kiloBytesTransmitted += random.nextInt(2000);
+ return kiloBytesTransmitted;
+ }
+
+ public long getKiloBytesTransmitted() {
+ return kiloBytesTransmitted;
+ }
+ }
+
+ private static class GateKey {
+ private IAMID amID;
+ private ISubscriberID subscriberID;
+ private IGateID gateID;
+
+ public GateKey(final IAMID amID, final ISubscriberID subscriberID, final IGateID gateID) {
+ this.amID = checkNotNull(amID);
+ this.subscriberID = checkNotNull(subscriberID);
+ this.gateID = checkNotNull(gateID);
+ }
+
+ public IAMID getAmID() {
+ return amID;
+ }
+
+ public ISubscriberID getSubscriberID() {
+ return subscriberID;
+ }
+
+ public IGateID getGateID() {
+ return gateID;
+ }
+
+ public boolean matches(final AMID otherAMID, final ISubscriberID otherSubscriberID) {
+ checkNotNull(otherAMID);
+ checkNotNull(otherSubscriberID);
+
+ return Objects.equal(amID, otherAMID) && Objects.equal(subscriberID, otherSubscriberID);
+
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final GateKey otherGateKey = (GateKey) o;
+ return Objects.equal(amID, otherGateKey.amID) &&
+ Objects.equal(subscriberID, otherGateKey.subscriberID) &&
+ Objects.equal(gateID, otherGateKey.gateID);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(amID, subscriberID, gateID);
+ }
+ }
/**
* Create a State Request Manager
*
- * @param clientType - the client type for this connection
- * @param clientHandle - the client-handle for this connection
- * @param process - the data processor
- * @param socket - the socket connection
- * @param gateConfig - the configured service class names (gates)
+ * @param clientType
+ * - the client type for this connection
+ * @param clientHandle
+ * - the client-handle for this connection
+ * @param process
+ * - the data processor
+ * @param socket
+ * - the socket connection
*/
public CmtsPepReqStateMan(final short clientType, final COPSHandle clientHandle, final CmtsDataProcessor process,
- final Socket socket, final Map<Direction, Set<String>> gateConfig,
- final Map<String, Boolean> cmStatus) {
+ final Socket socket, final CMTSConfig config) {
super(clientType, clientHandle, process, socket, new COPSPepMsgSender(clientType, clientHandle, socket));
- this.gateConfig = Collections.unmodifiableMap(gateConfig);
- this.cmStatus = Collections.unmodifiableMap(cmStatus);
-
- this.gatesSetMap = new HashMap<>();
- for (final Set<String> gateIdSet: gateConfig.values()) {
- for (final String gateId : gateIdSet) {
- gatesSetMap.put(gateId, new HashSet<String>());
- }
- }
+ this.config = checkNotNull(config);
+ gateStateMap = new HashMap<>();
}
@Override
final Map<String, String> removeDecs = new HashMap<>();
final Map<String, String> installDecs = new HashMap<>();
- for (final Set<COPSDecision> copsDecisions: decisions.values()) {
+ for (final Set<COPSDecision> copsDecisions : decisions.values()) {
final COPSDecision cmddecision = copsDecisions.iterator().next();
+ logger.debug("decision command: " + cmddecision.getCommand());
switch (cmddecision.getCommand()) {
case INSTALL:
for (final COPSDecision decision : copsDecisions) {
if (decision.getFlag().equals(DecisionFlag.REQERROR)) {
- logger.info("processing decision");
+ logger.info("processing decision: " + dMsg.getDecSI());
// This is assuming a gate set right or wrong
if (dMsg.getDecisions().size() == 1 && dMsg.getDecSI() != null) {
final PCMMGateReq gateReq = PCMMGateReq.parse(dMsg.getDecSI().getData().getData());
- if (gateReq.getGateSpec() != null) {
+ if (gateReq != null) {
processGateReq(gateReq, _socket);
}
+ else {
+ logger.error("gateReq failed to parse");
+ }
}
}
}
// TODO - Check and/or Set state here
// Gate ADD gateReq.getTrafficProfile() != null
// Gate REMOVE gateReq.getTrafficProfile() == null
- final String subId = gateReq.getSubscriberID().getSourceIPAddress().getHostAddress();
- // Get direction here
+ switch (gateReq.getTransactionID().getGateCommandType()) {
+ case GATE_SET:
+ processGateSet(gateReq, socket);
+ break;
+ case GATE_INFO:
+ processGateInfo(gateReq, socket);
+ break;
+ default:
+ logger.error("Emulator does not support gate command: {}",
+ gateReq.getTransactionID().getGateCommandType());
+ }
+
+ }
+
+
+ private IPCMMError checkForMissingObjects(final PCMMGateReq gateReq) {
+ // In cases where multiple valid alternatives exist for the S-Type of a missing object,
+ // this portion of the Error-Subcode MUST be set to zero.
+
+ if (gateReq.getTransactionID() == null) {
+ final short subCode =
+ COPSMsgParser.bytesToShort(PCMMBaseObject.SNum.TRANSACTION_ID.getValue(), TransactionID.STYPE);
+ return new PCMMError(ErrorCode.MISSING_REQ_OBJ, subCode);
+ }
+
+ final ITransactionID.GateCommandType gateCommand = gateReq.getTransactionID().getGateCommandType();
+
+ if (gateCommand == ITransactionID.GateCommandType.GATE_SET) {
+ // Gate set does not allow gateID
+ if (gateReq.getGateID() != null) {
+ final short subCode = COPSMsgParser.bytesToShort(PCMMBaseObject.SNum.GATE_ID.getValue(), GateID.STYPE);
+ return new PCMMError(ErrorCode.MISSING_REQ_OBJ, subCode);
+ }
+
+ if (gateReq.getTrafficProfile() == null) {
+ final short subCode = COPSMsgParser.bytesToShort(PCMMBaseObject.SNum.TRAFFIC_PROFILE.getValue(), (byte) 0);
+ return new PCMMError(ErrorCode.MISSING_REQ_OBJ, subCode);
+ }
+
+ if (gateReq.getClassifiers() == null || gateReq.getClassifiers().isEmpty()) {
+ final short subCode = COPSMsgParser.bytesToShort(PCMMBaseObject.SNum.CLASSIFIERS.getValue(), (byte) 0);
+ return new PCMMError(ErrorCode.MISSING_REQ_OBJ, subCode);
+ }
+
+ if (gateReq.getGateSpec() == null) {
+ final short subCode = COPSMsgParser.bytesToShort(PCMMBaseObject.SNum.GATE_SPEC.getValue(), GateSpec.STYPE);
+ return new PCMMError(ErrorCode.MISSING_REQ_OBJ, subCode);
+ }
+
+ final IGateSpec gateSpec = gateReq.getGateSpec();
+ if (gateSpec.getDirection() == null) {
+ return new PCMMError(ErrorCode.INVALID_FIELD);
+ }
+
+ }
+ else {
+
+ if (gateReq.getGateID() == null) {
+ final short subCode = COPSMsgParser.bytesToShort(PCMMBaseObject.SNum.GATE_ID.getValue(), GateID.STYPE);
+ return new PCMMError(ErrorCode.MISSING_REQ_OBJ, subCode);
+ }
+ }
+
+ if (gateReq.getAMID() == null) {
+ final short subCode = COPSMsgParser.bytesToShort(PCMMBaseObject.SNum.AMID.getValue(), AMID.STYPE);
+ return new PCMMError(ErrorCode.MISSING_REQ_OBJ, subCode);
+ }
+
+ if (gateReq.getSubscriberID() == null || gateReq.getSubscriberID().getSourceIPAddress() == null
+ || gateReq.getSubscriberID().getSourceIPAddress().getHostAddress() == null) {
+ final short subCode = COPSMsgParser.bytesToShort(PCMMBaseObject.SNum.SUBSCRIBER_ID.getValue(), (byte) 0);
+ return new PCMMError(ErrorCode.MISSING_REQ_OBJ, subCode);
+ }
+
+
+ return null;
+ }
+
+ private IPCMMError checkForInvalidObjects(final PCMMGateReq gateReq) {
+ final ITransactionID.GateCommandType gateCommand = gateReq.getTransactionID().getGateCommandType();
+
+ // GateID
+ if (gateCommand == ITransactionID.GateCommandType.GATE_INFO) {
+ if (!gateStateMap.containsKey(gateReq.getGateID())) {
+ return new PCMMError(ErrorCode.UNK_GATE_ID);
+ }
+ }
+ else {
+ // Traffic profile
+
+ if (gateReq.getTrafficProfile() instanceof DOCSISServiceClassNameTrafficProfile) {
+ final DOCSISServiceClassNameTrafficProfile scnTrafficProfile =
+ (DOCSISServiceClassNameTrafficProfile) gateReq.getTrafficProfile();
+
+ Set<String> directionSCNs;
+ if (gateReq.getGateSpec().getDirection().equals(Direction.DOWNSTREAM)) {
+ directionSCNs = config.getDownstreamServiceClassNames();
+ } else {
+ directionSCNs = config.getUpstreamServiceClassNames();
+ }
+ if (!directionSCNs.contains(scnTrafficProfile.getScnName())) {
+ return new PCMMError(ErrorCode.UNDEF_SCN_NAME);
+ }
+ } else {
+ // TODO remote this after other profiles are supported
+ logger.error("Currently only DOCSIS Service Class Name Traffic Profiles are supported: attempted {}",
+ gateReq.getTrafficProfile().getClass().getName());
+ return new PCMMError(ErrorCode.OTHER_UNSPECIFIED);
+ }
+
+ // number of classifiers
+ if (config.getNumberOfSupportedClassifiers() < gateReq.getClassifiers().size()) {
+ return new PCMMError(ErrorCode.NUM_CLASSIFIERS, config.getNumberOfSupportedClassifiers());
+ }
+ }
+
+
+
+ // SubscriberID
+ String subId = gateReq.getSubscriberID().getSourceIPAddress().getHostAddress();
+ if(!config.getModemStatus().containsKey(subId) || !config.getModemStatus().get(subId)) {
+ return new PCMMError(ErrorCode.INVALID_SUB_ID);
+ }
+
+ // Iff the gate exists
+ if (gateReq.getGateID() != null
+ && gateStateMap.containsKey(gateReq.getGateID())) {
+ GateMetaData existingGate = gateStateMap.get(gateReq.getGateID());
+
+ // Unauthorized AMID - only the AM that created a gate may change it
+ if (!existingGate.getGateReq().getAMID().equals(gateReq.getAMID())) {
+ return new PCMMError(ErrorCode.UNAUTH_AMID);
+ }
+ }
+
+
+
+
+ return null;
+ }
+
+ private IPCMMError getGateError(final PCMMGateReq gateReq) {
+
+ IPCMMError error = null;
+ error = checkForMissingObjects(gateReq);
+ if (error != null) {
+ return error;
+ }
+
+ error = checkForInvalidObjects(gateReq);
+ if (error != null) {
+ return error;
+ }
+
+
+ return null;
+ }
+
+ private void processGateSet(final PCMMGateReq gateReq, final Socket socket) throws COPSException {
+
+ final String subId = gateReq.getSubscriberID().getSourceIPAddress().getHostAddress();
final Direction gateDir = gateReq.getGateSpec().getDirection();
- final Set<String> gateNames = gateConfig.get(gateDir);
- // TODO - Determine if this is the best means to derive the gate name???
- final String gateName = new String(gateReq.getTrafficProfile().getAsBinaryArray());
-
- final IPCMMError error;
- if (subId == null || gateDir == null || gateNames == null) {
- // Missing required object
- // TODO - Determine if this is the correct code. 3 was being used previously and I don't see any corresponding code.
- error = new PCMMError(ErrorCode.UNK_GATE_ID);
- } else if (!cmStatus.keySet().contains(subId)
- || (cmStatus.keySet().contains(subId) && !cmStatus.get(subId))) {
- // Invalid Object
- // TODO - Determine if this code is correct
- error = new PCMMError(ErrorCode.INVALID_SUB_ID);
- } else if (!gateNames.contains(gateName.trim())) {
- // TODO - Determine if this code is correct
- error = new PCMMError(ErrorCode.UNDEF_SCN_NAME);
+
+ final String serviceClassName;
+ if ((gateReq.getTrafficProfile() instanceof DOCSISServiceClassNameTrafficProfile)) {
+ serviceClassName = ((DOCSISServiceClassNameTrafficProfile) gateReq.getTrafficProfile()).getScnName();
} else {
- error = null;
- gatesSetMap.get(gateName.trim()).add(subId);
+ serviceClassName = null;
}
- gateReq.setError(error);
- logger.info("Processing gate request [" + gateName + "] with direction [" + gateDir + ']');
+ final IPCMMError error = getGateError(gateReq);
+ gateReq.setError(error);
- // Get gate name
+ logger.info("Processing gate set request [" + serviceClassName + "] with direction [" + gateDir + ']');
// Set response
+
+ final ITransactionID.GateCommandType gateCommand = (error == null)
+ ? ITransactionID.GateCommandType.GATE_SET_ACK
+ : ITransactionID.GateCommandType.GATE_SET_ERR;
+
+ final TransactionID transactionID =
+ new TransactionID(gateReq.getTransactionID().getTransactionIdentifier(), gateCommand);
+
final List<Byte> data = new ArrayList<>();
- for (final byte val : gateReq.getTransactionID().getAsBinaryArray())
- data.add(val);
- for (final byte val : gateReq.getAMID().getAsBinaryArray())
- data.add(val);
- for (final byte val : gateReq.getSubscriberID().getAsBinaryArray())
- data.add(val);
- if (error != null) for (final byte val : gateReq.getError().getAsBinaryArray())
- data.add(val);
+ addBytesToList(transactionID.getAsBinaryArray(), data);
+ addBytesToList(gateReq.getAMID().getAsBinaryArray(), data);
+ addBytesToList(gateReq.getSubscriberID().getAsBinaryArray(), data);
- // Assign a gate ID
- final GateID gateID = new GateID(UUID.randomUUID().hashCode());
- for (final byte val : gateID.getAsBinaryArray())
- data.add(val);
+ if (error == null) {
+ // Assign a gate ID
+ final GateID gateID = new GateID(UUID.randomUUID().hashCode());
+ for (final byte val : gateID.getAsBinaryArray()) {
+ data.add(val);
+ }
+ gateReq.setGateID(gateID);
+
+ int timeStamp = (int)(System.currentTimeMillis() / 1000L);
+ gateReq.setGateTimeInfo(new GateTimeInfo(timeStamp));
- final byte[] csiArr = new byte[data.size()];
- for (int i = 0; i < data.size(); i++) {
- csiArr[i] = data.get(i);
+ gateStateMap.put(gateID, new GateMetaData(gateReq));
+ }
+ else {
+ addBytesToList(error.getAsBinaryArray(), data);
}
+
+ final byte[] csiArr = Bytes.toArray(data);
final COPSClientSI si = new COPSClientSI(CNum.CSI, CType.DEF, new COPSData(csiArr, 0, csiArr.length));
final ReportType reportType;
- if (gateReq.getError() == null) reportType = ReportType.SUCCESS; else reportType = ReportType.FAILURE;
+ if (gateReq.getError() == null) {
+ reportType = ReportType.SUCCESS;
+ } else {
+ reportType = ReportType.FAILURE;
+ }
- logger.info("Returning " + reportType + " for gate request [" + gateName + "] direction [" + gateDir
+ logger.info("Returning " + reportType + " for gate request [" + serviceClassName + "] direction [" + gateDir
+ "] for host - " + subId);
- final COPSReportMsg reportMsg = new COPSReportMsg(_clientType, getClientHandle(),
- new COPSReportType(reportType), si, null);
+ sendReport(reportType, si, socket);
+
+ }
+
+ private void processGateInfo(final PCMMGateReq gateReq, final Socket socket) throws COPSException {
+ logger.info("GateInfo");
+
+ IPCMMError error = getGateError(gateReq);
+
+ final TransactionID transactionID;
+ final ReportType reportType;
+ if (error != null) {
+ transactionID = new TransactionID(gateReq.getTransactionID().getTransactionIdentifier(),
+ ITransactionID.GateCommandType.GATE_INFO_ERR);
+ reportType = ReportType.FAILURE;
+ }
+ else {
+ transactionID = new TransactionID(gateReq.getTransactionID().getTransactionIdentifier(),
+ ITransactionID.GateCommandType.GATE_INFO_ACK);
+ reportType = ReportType.SUCCESS;
+ }
+
+ final List<Byte> data = new ArrayList<>();
+ addBytesToList(transactionID.getAsBinaryArray(), data);
+ addBytesToList(gateReq.getAMID().getAsBinaryArray(), data);
+ addBytesToList(gateReq.getSubscriberID().getAsBinaryArray(), data);
+ addBytesToList(gateReq.getGateID().getAsBinaryArray(), data);
+
+ if (error != null) {
+ addBytesToList(error.getAsBinaryArray(), data);
+ }
+ else {
+ GateMetaData exisitingGate = gateStateMap.get(gateReq.getGateID());
+
+ addBytesToList(exisitingGate.getGateReq().getGateSpec().getAsBinaryArray(), data);
+
+ for (IClassifier classifier : exisitingGate.getGateReq().getClassifiers()) {
+ addBytesToList(classifier.getAsBinaryArray(), data);
+ }
+
+ addBytesToList(exisitingGate.getGateReq().getTrafficProfile().getAsBinaryArray(), data);
+
+ GateTimeInfo timeInfo = new GateTimeInfo(exisitingGate.getCommitDuration());
+ addBytesToList(timeInfo.getAsBinaryArray(), data);
+
+ GateUsageInfo gateUsageInfo = new GateUsageInfo(exisitingGate.updateKiloBytesTransmitted());
+ addBytesToList(gateUsageInfo.getAsBinaryArray(), data);
+
+ GateState gateState = new GateState(IGateState.GateStateType.COMMITTED,
+ IGateState.GateStateReasonType.OTHER);
+ addBytesToList(gateState.getAsBinaryArray(), data);
+
+ logger.info("Returning " + reportType + " for gate info request on gate " + exisitingGate.getGateReq().getGateID() );
+ }
+
+ final byte[] csiArr = Bytes.toArray(data);
+ COPSClientSI copsClientSI = new COPSClientSI(CNum.CSI, CType.DEF, new COPSData(csiArr, 0, csiArr.length));
+
+ sendReport(reportType, copsClientSI, socket);
+ }
+
+ private void sendReport(ReportType reportType, COPSClientSI copsClientSI, final Socket socket)
+ throws COPSPepException {
+ logger.info("Returning {} for gate request", reportType);
+
+ final COPSReportMsg reportMsg =
+ new COPSReportMsg(_clientType, getClientHandle(), new COPSReportType(reportType), copsClientSI, null);
try {
reportMsg.writeData(socket);
} catch (IOException e) {
throw new COPSPepException("Error writing gate set SUCCESS Report", e);
}
+
+ }
+
+ private static void addBytesToList(byte[] array, List<Byte> list) {
+ checkNotNull(array);
+ checkNotNull(list);
+
+ if (array.length == 0) return;
+
+ // if list supports resizing do so
+ if (list instanceof ArrayList) {
+ ArrayList<Byte> arrayList = (ArrayList<Byte>) list;
+ arrayList.ensureCapacity(list.size() + array.length);
+ }
+ else if (list instanceof Vector){
+ Vector<Byte> vector = (Vector<Byte>) list;
+ vector.ensureCapacity(vector.size() + array.length);
+ }
+
+ // Add all
+ for (byte b : array) {
+ list.add(b);
+ }
}
}