/*
* Copyright (c) 2004 University of Murcia. All rights reserved.
* --------------------------------------------------------------
* For more information, please see .
*/
package org.umu.cops.prpep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.umu.cops.COPSConnection;
import org.umu.cops.stack.*;
import org.umu.cops.stack.COPSDecision.Command;
import org.umu.cops.stack.COPSDecision.DecisionFlag;
import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
import java.net.Socket;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* COPSPepConnection represents a PEP-PDP Connection Manager.
* Responsible for processing messages received from PDP.
*/
@ThreadSafe
public class COPSPepConnection extends COPSConnection {
private final static Logger logger = LoggerFactory.getLogger(COPSPepConnection.class);
/** Time to wait responses (milliseconds), default is 10 seconds */
protected transient int _responseTime;
/** COPS Client-type */
protected final short _clientType;
/**
Maps a COPS Client Handle to a Request State Manager
*/
protected final Map _managerMap;
/**
* Creates a new PEP connection
* @param clientType PEP's client-type
* @param sock Socket connected to PDP
*/
public COPSPepConnection(final short clientType, final Socket sock) {
super(sock, (short)0, (short)0);
_clientType = clientType;
_responseTime = 10000;
_managerMap = new ConcurrentHashMap<>();
}
/**
* Message-processing loop
*/
public void run () {
Date lastSendKa = new Date();
Date lastSendAcc = new Date();
Date lastRecKa = new Date();
try {
while (!_sock.isClosed()) {
if (_sock.getInputStream().available() != 0) {
processMessage(_sock);
lastRecKa = new Date();
}
// Keep Alive
if (_kaTimer > 0) {
// Timeout at PDP
int _startTime = (int) (lastRecKa.getTime());
int cTime = (int) (new Date().getTime());
if ((cTime - _startTime) > _kaTimer*1000) {
_sock.close();
// Notify all Request State Managers
notifyNoKAAllReqStateMan();
}
// Send to PEP
_startTime = (int) (lastSendKa.getTime());
cTime = (int) (new Date().getTime());
if ((cTime - _startTime) > ((_kaTimer*3/4) * 1000)) {
final COPSKAMsg msg = new COPSKAMsg(null);
COPSTransceiver.sendMsg(msg, _sock);
lastSendKa = new Date();
}
}
// Accounting
if (_acctTimer > 0) {
int _startTime = (int) (lastSendAcc.getTime());
int cTime = (int) (new Date().getTime());
if ((cTime - _startTime) > ((_acctTimer*3/4)*1000)) {
// Notify all Request State Managers
notifyAcctAllReqStateMan();
lastSendAcc = new Date();
}
}
try {
Thread.sleep(500);
} catch (Exception e) {
logger.error("Exception thrown while sleeping", e);
}
}
} catch (Exception e) {
logger.error("Error while processing socket messages", e);
}
// connection closed by server
// COPSDebug.out(getClass().getName(),"Connection closed by server");
try {
_sock.close();
} catch (IOException e) {
logger.error("Error closing socket", e);
}
// Notify all Request State Managers
try {
notifyCloseAllReqStateMan();
} catch (COPSException e) {
logger.error("Error closing state managers");
}
}
/**
* Gets a COPS message from the socket and processes it
* @param conn Socket connected to the PDP
* @throws COPSException
* @throws IOException
*/
protected void processMessage(final Socket conn) throws COPSException, IOException {
final COPSMsg msg = COPSTransceiver.receiveMsg(conn);
switch (msg.getHeader().getOpCode()) {
case CC:
handleClientCloseMsg(conn, (COPSClientCloseMsg)msg);
break;
case DEC:
handleDecisionMsg((COPSDecisionMsg)msg);
break;
case SSQ:
handleSyncStateReqMsg((COPSSyncStateMsg)msg);
break;
case KA:
handleKeepAliveMsg((COPSKAMsg)msg);
break;
default:
throw new COPSPepException("Message not expected (" + msg.getHeader().getOpCode() + ").");
}
}
/**
* Handle Keep Alive Message
* @param cMsg a COPSKAMsg
*/
private void handleKeepAliveMsg(final COPSKAMsg cMsg) {
logger.info("Get KAlive Msg");
try {
// Support
if (cMsg.getIntegrity() != null) {
logger.warn("Unsupported objects (Integrity)");
}
// should we do anything else?? ....
} catch (Exception unae) {
logger.error("Unexpected exception while writing COPS data", unae);
}
}
/**
* Method handleDecisionMsg
* @param dMsg a COPSDecisionMsg
*/
protected void handleDecisionMsg(final COPSDecisionMsg dMsg) throws COPSException {
final COPSHandle handle = dMsg.getClientHandle();
final Map> decisions = dMsg.getDecisions();
for (final Set copsDecisions: decisions.values()) {
for (final COPSDecision decision : copsDecisions) {
// Get the associated manager
final COPSPepReqStateMan manager = _managerMap.get(handle);
if (manager == null) {
logger.warn("Unable to find state manager with key - " + handle);
return;
}
// Check message type
// TODO FIXME - Use of manager object could result in a NPE
if (decision.getFlag().equals(DecisionFlag.REQSTATE)) {
if (decision.getCommand().equals(Command.REMOVE))
// Delete Request State
manager.processDeleteRequestState(dMsg);
else
// Open new Request State
handleOpenNewRequestStateMsg(handle);
} else
// Decision
manager.processDecision(dMsg);
}
}
}
/**
* Method handleOpenNewRequestStateMsg
* @param handle a COPSHandle
*/
private void handleOpenNewRequestStateMsg(final COPSHandle handle) throws COPSPepException {
final COPSPepReqStateMan manager = _managerMap.get(handle);
if (manager == null)
logger.warn("Unable to find state manager with key - " + handle.getId().str());
else
manager.processOpenNewRequestState();
}
/**
* Method handleSyncStateReqMsg
* @param cMsg a COPSSyncStateMsg
*/
private void handleSyncStateReqMsg(final COPSSyncStateMsg cMsg) throws COPSException {
if (cMsg.getIntegrity() != null) {
logger.warn("Unsupported objects (Integrity)");
}
final COPSPepReqStateMan manager = _managerMap.get(cMsg.getClientHandle());
if (manager == null)
logger.warn("Unable to find state manager with key - " + cMsg.getClientHandle().getId().str());
else
manager.processSyncStateRequest(cMsg);
}
/**
* Method createRequestState
* @param clientHandle a String
* @param process a COPSPepDataProcess
* @return a COPSPepmanager
* @throws COPSException
* @throws COPSPepException
*/
public COPSPepReqStateMan addRequestState(final COPSHandle clientHandle, final COPSPepDataProcess process)
throws COPSException {
final COPSPepReqStateMan manager = new COPSPepReqStateMan(_clientType, clientHandle, process, _sock);
if (_managerMap.get(clientHandle) != null)
throw new COPSPepException("Duplicate Handle, rejecting " + clientHandle);
_managerMap.put(clientHandle, manager);
logger.info("Added state manager with key - " + clientHandle);
manager.initRequestState();
return manager;
}
/**
* Method deleteRequestState
* @param manager a COPSPepReqStateMan
* @throws COPSException
*/
public void deleteRequestState(COPSPepReqStateMan manager) throws COPSException {
manager.finalizeRequestState();
}
private void notifyCloseAllReqStateMan() throws COPSException {
for (final COPSPepReqStateMan man: _managerMap.values()) {
man.processClosedConnection(_error);
}
}
private void notifyNoKAAllReqStateMan() throws COPSException {
for (final COPSPepReqStateMan man: _managerMap.values()) {
man.processNoKAConnection();
}
}
private void notifyAcctAllReqStateMan() throws COPSException {
for (final COPSPepReqStateMan man: _managerMap.values()) {
man.processAcctReport();
}
}
}