/*
* Copyright (c) 2004 University of Murcia. All rights reserved.
* --------------------------------------------------------------
* For more information, please see .
*/
package org.umu.cops.prpdp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.umu.cops.COPSConnection;
import org.umu.cops.stack.*;
import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
import java.net.Socket;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Class for managing an provisioning connection at the PDP side.
*/
@ThreadSafe
public class COPSPdpConnection extends COPSConnection {
public final static Logger logger = LoggerFactory.getLogger(COPSPdpConnection.class);
/**
* PEP identifier
* TODO FIXME - Why is this member never being used?
*/
private final COPSPepId _pepId;
/**
* Time of the latest keep-alive sent
* TODO FIXME - Why is this member never being used?
*/
private volatile Date _lastKa;
/**
* Maps a Client Handle to a Handler
*/
protected final Map _managerMap;
/**
* PDP policy data processor class
*/
protected final COPSPdpDataProcess _process;
/**
* Creates a new PDP connection
*
* @param pepId PEP-ID of the connected PEP
* @param sock Socket connected to PEP
* @param process Object for processing policy data
*/
public COPSPdpConnection(final COPSPepId pepId, Socket sock, final COPSPdpDataProcess process) {
this(pepId, sock, process, (short)0, (short)0);
}
/**
* Constructor for this or extended classes
* @param pepId - PEP-ID of the connected PEP
* @param sock - Socket connected to PEP
* @param process - Object for processing policy data
* @param kaTimer - the Keep-alive timer value
* @param acctTimer - the accounting timer value
*/
protected COPSPdpConnection(final COPSPepId pepId, Socket sock, final COPSPdpDataProcess process,
final short kaTimer, final short acctTimer) {
super(sock, kaTimer, acctTimer);
_pepId = pepId;
_process = process;
_lastKa = new Date();
_managerMap = new ConcurrentHashMap<>();
}
public void addStateMan(final COPSHandle handle, final COPSPdpReqStateMan man) {
_managerMap.put(handle, man);
}
/**
* Main loop
*/
public void run () {
Date lastSendKa = new Date();
Date lastRecKa = new Date();
while (!_sock.isClosed()) {
try {
if (_sock.getInputStream().available() != 0) {
processMessage(_sock);
lastRecKa = new Date();
}
// Keep Alive
if (_kaTimer > 0) {
// Timeout at PDP
int _startTime = (int) (lastRecKa.getTime());
int cTime = (int) (new Date().getTime());
if ((cTime - _startTime) > _kaTimer*1000) {
_sock.close();
// Notify all Request State Managers
notifyNoKAAllReqStateMan();
}
// Send to PEP
_startTime = (int) (lastSendKa.getTime());
cTime = (int) (new Date().getTime());
if ((cTime - _startTime) > ((_kaTimer*3/4)*1000)) {
// TODO - what should the real clientType be here???
final COPSKAMsg msg = new COPSKAMsg(null);
COPSTransceiver.sendMsg(msg, _sock);
lastSendKa = new Date();
}
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
logger.warn("Thread interrupted, shutting down", e);
break;
}
} catch (Exception e) {
logger.error("Error while processing socket messages, continue processing", e);
}
}
// connection closed by server
try {
logger.info("Closing socket");
_sock.close();
} catch (IOException e) {
logger.error("Error closing socket", e);
}
// Notify all Request State Managers
try {
logger.info("Notifying state managers that PDP connection is closing");
notifyCloseAllReqStateMan();
} catch (COPSException e) {
logger.error("Error closing state managers");
}
}
/**
* Gets a COPS message from the socket and processes it
* @param conn Socket connected to the PEP
*/
private void processMessage(final Socket conn) throws COPSException, IOException {
final COPSMsg msg = COPSTransceiver.receiveMsg(conn);
switch (msg.getHeader().getOpCode()) {
case CC:
handleClientCloseMsg(conn, (COPSClientCloseMsg)msg);
break;
case KA:
handleKeepAliveMsg(conn, (COPSKAMsg)msg);
break;
case REQ:
handleRequestMsg(conn, (COPSReqMsg)msg);
break;
case RPT:
handleReportMsg(conn, (COPSReportMsg)msg);
break;
case DRQ:
handleDeleteRequestMsg(conn, (COPSDeleteMsg)msg);
break;
case SSC:
handleSyncComplete(conn, (COPSSyncStateMsg)msg);
break;
default:
throw new COPSPdpException("Message not expected (" + msg.getHeader().getOpCode() + ").");
}
}
/**
* Handle Keep Alive Message
* @param conn a Socket
* @param kaMsg a COPSKAMsg
*/
private void handleKeepAliveMsg(final Socket conn, final COPSKAMsg kaMsg) {
try {
// Support
if (kaMsg.getIntegrity() != null) {
logger.warn("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
}
kaMsg.writeData(conn);
_lastKa = new Date();
} catch (Exception unae) {
logger.error("Unexpected exception while writing COPS data", unae);
}
}
/**
* Handle Delete Request Message
* @param conn a Socket
* @param cMsg a COPSDeleteMsg
*/
private void handleDeleteRequestMsg(final Socket conn, final COPSDeleteMsg cMsg) throws COPSException {
// Support
if (cMsg.getIntegrity() != null) {
logger.warn("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
}
final COPSPdpReqStateMan man = _managerMap.remove(cMsg.getClientHandle());
if (man == null) {
logger.warn("No state manager found with ID - " + cMsg.getClientHandle().getId().str());
} else {
man.processDeleteRequestState(cMsg);
}
}
/**
* Handle Request Message
* @param conn a Socket
* @param reqMsg a COPSReqMsg
*/
protected void handleRequestMsg(final Socket conn, final COPSReqMsg reqMsg) throws COPSException {
final COPSHeader header = reqMsg.getHeader();
// Support
if (reqMsg.getIntegrity() != null) {
logger.warn("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
}
final COPSPdpReqStateMan man;
if (_managerMap.get(reqMsg.getClientHandle()) == null) {
man = createStateManager(reqMsg);
_managerMap.put(reqMsg.getClientHandle(), man);
logger.info("createHandler called, clientType=" + header.getClientType() + " msgType=" + ", connId=" +
conn.toString());
} else {
man = _managerMap.get(reqMsg.getClientHandle());
}
man.processRequest(reqMsg);
}
/**
* Returns an instance of a COPSPdpReqStateMan
* @param reqMsg - the request on which to create the state manager
* @return - the state manager
*/
protected COPSPdpReqStateMan createStateManager(final COPSReqMsg reqMsg) {
return new COPSPdpReqStateMan(reqMsg.getHeader().getClientType(), reqMsg.getClientHandle(), _process, _sock);
}
/**
* Handle Report Message
* @param conn a Socket
* @param repMsg a COPSReportMsg
*/
private void handleReportMsg(final Socket conn, final COPSReportMsg repMsg) throws COPSException {
// Support
if (repMsg.getIntegrity() != null) {
logger.warn("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
}
final COPSPdpReqStateMan man = _managerMap.get(repMsg.getClientHandle());
if (man == null) {
logger.warn("No state manager found with ID - " + repMsg.getClientHandle().getId().str());
} else {
man.processReport(repMsg);
}
}
/**
* Method handleSyncComplete
* @param conn a Socket
* @param cMsg a COPSSyncStateMsg
*/
private void handleSyncComplete(final Socket conn, final COPSSyncStateMsg cMsg) throws COPSException {
// Support
if (cMsg.getIntegrity() != null) {
logger.warn("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
}
final COPSPdpReqStateMan man = _managerMap.get(cMsg.getClientHandle());
if (man == null) {
logger.warn("No state manager found with ID - " + cMsg.getClientHandle().getId().str());
} else {
man.processSyncComplete(cMsg);
}
}
/**
* Requests a COPS sync from the PEP
* @throws COPSException
* @throws COPSPdpException
*/
public void syncAllRequestState() throws COPSException {
for (final COPSPdpReqStateMan man : _managerMap.values()) {
man.syncRequestState();
}
}
private void notifyCloseAllReqStateMan() throws COPSException {
for (final COPSPdpReqStateMan man : _managerMap.values()) {
man.processClosedConnection(_error);
}
}
private void notifyNoKAAllReqStateMan() throws COPSException {
for (final COPSPdpReqStateMan man : _managerMap.values()) {
man.processNoKAConnection();
}
}
}