/*
 * Copyright (c) 2004 University of Murcia.  All rights reserved.
 * --------------------------------------------------------------
 * For more information, please see <http://www.umu.euro6ix.org/>.
 */
package org.umu.cops.prpep;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.umu.cops.stack.*;
import org.umu.cops.stack.COPSDecision.Command;
import org.umu.cops.stack.COPSDecision.DecisionFlag;
import org.umu.cops.stack.COPSHeader.OPCode;

import java.io.IOException;
import java.net.Socket;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
24 * COPSPepConnection represents a PEP-PDP Connection Manager.
25 * Responsible for processing messages received from PDP.
27 public class COPSPepConnection implements Runnable {
29 private final static Logger logger = LoggerFactory.getLogger(COPSPepConnection.class);
31 /** Socket connected to PDP */
32 protected Socket _sock;
34 /** Time to wait responses (milliseconds), default is 10 seconds */
35 protected int _responseTime;
37 /** COPS Client-type */
38 protected short _clientType;
41 Accounting timer value (secs)
43 protected short _acctTimer;
46 Keep-alive timer value (secs)
48 protected short _kaTimer;
51 * Time of the latest keep-alive received
53 protected Date _lastRecKa;
56 Maps a COPS Client Handle to a Request State Manager
58 protected final Map<COPSHandle, COPSPepReqStateMan> _managerMap;
61 COPS error returned by PDP
63 protected COPSError _error;
66 * Creates a new PEP connection
67 * @param clientType PEP's client-type
68 * @param sock Socket connected to PDP
70 public COPSPepConnection(final short clientType, final Socket sock) {
71 _clientType = clientType;
77 _responseTime = 10000;
78 _managerMap = new ConcurrentHashMap<>();
82 * Gets the response time
83 * @return Response time value (msecs)
85 public int getResponseTime() {
90 * Gets the socket connected to the PDP
91 * @return Socket connected to PDP
93 public Socket getSocket() {
98 * Gets keep-alive timer
99 * @return Keep-alive timer value (secs)
101 public short getKaTimer () {
106 * Gets accounting timer
107 * @return Accounting timer value (secs)
109 public short getAcctTimer () {
114 * Checks whether the socket to the PDP is closed or not
115 * @return <tt>true</tt> if the socket is closed, <tt>false</tt> otherwise
117 public boolean isClosed() {
118 return _sock.isClosed();
124 * @throws java.io.IOException
126 protected void close()
133 * @param respTime Response time value (msecs)
135 public void setResponseTime(int respTime) {
136 _responseTime = respTime;
140 * Sets keep-alive timer
141 * @param kaTimer Keep-alive timer value (secs)
143 public void setKaTimer (short kaTimer) {
148 * Sets accounting timer
149 * @param acctTimer Accounting timer value (secs)
151 public void setAcctTimer (short acctTimer) {
152 _acctTimer = acctTimer;
156 * Message-processing loop
159 Date _lastSendKa = new Date();
160 Date _lastSendAcc = new Date();
161 _lastRecKa = new Date();
163 while (!_sock.isClosed()) {
164 if (_sock.getInputStream().available() != 0) {
165 processMessage(_sock);
166 _lastRecKa = new Date();
172 int _startTime = (int) (_lastRecKa.getTime());
173 int cTime = (int) (new Date().getTime());
175 if ((cTime - _startTime) > _kaTimer*1000) {
177 // Notify all Request State Managers
178 notifyNoKAAllReqStateMan();
182 _startTime = (int) (_lastSendKa.getTime());
183 cTime = (int) (new Date().getTime());
185 if ((cTime - _startTime) > ((_kaTimer*3/4) * 1000)) {
186 final COPSKAMsg msg = new COPSKAMsg(null);
187 COPSTransceiver.sendMsg(msg, _sock);
188 _lastSendKa = new Date();
193 if (_acctTimer > 0) {
194 int _startTime = (int) (_lastSendAcc.getTime());
195 int cTime = (int) (new Date().getTime());
197 if ((cTime - _startTime) > ((_acctTimer*3/4)*1000)) {
198 // Notify all Request State Managers
199 notifyAcctAllReqStateMan();
200 _lastSendAcc = new Date();
206 } catch (Exception e) {
207 logger.error("Exception thrown while sleeping", e);
210 } catch (Exception e) {
211 logger.error("Error while processing socket messages", e);
214 // connection closed by server
215 // COPSDebug.out(getClass().getName(),"Connection closed by server");
218 } catch (IOException e) {
219 logger.error("Error closing socket", e);
222 // Notify all Request State Managers
224 notifyCloseAllReqStateMan();
225 } catch (COPSPepException e) {
226 logger.error("Error closing state managers");
231 * Gets a COPS message from the socket and processes it
232 * @param conn Socket connected to the PDP
233 * @return COPS message type
234 * @throws COPSPepException
235 * @throws COPSException
236 * @throws IOException
238 protected byte processMessage(final Socket conn) throws COPSException, IOException {
239 final COPSMsg msg = COPSTransceiver.receiveMsg(conn);
241 switch (msg.getHeader().getOpCode()) {
243 handleClientCloseMsg(conn, (COPSClientCloseMsg)msg);
244 return (byte)OPCode.CC.ordinal();
246 handleDecisionMsg((COPSDecisionMsg)msg);
247 return (byte)OPCode.DEC.ordinal();
249 handleSyncStateReqMsg((COPSSyncStateMsg)msg);
250 return (byte)OPCode.SSQ.ordinal();
252 handleKeepAliveMsg((COPSKAMsg)msg);
253 return (byte)OPCode.KA.ordinal();
255 throw new COPSPepException("Message not expected (" + msg.getHeader().getOpCode() + ").");
260 * Handle Client Close Message, close the passed connection
261 * @param conn a Socket
262 * @param cMsg a COPSClientCloseMsg
264 private void handleClientCloseMsg(final Socket conn, final COPSClientCloseMsg cMsg) {
265 _error = cMsg.getError();
266 logger.info("Got close request, closing connection "
267 + conn.getInetAddress() + ":" + conn.getPort() + ":[Error " + _error.getDescription() + "]");
270 if (cMsg.getIntegrity() != null) {
271 logger.warn("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
274 } catch (Exception unae) {
275 logger.error("Unexpected exception closing connection", unae);
281 * @return a COPSError
283 protected COPSError getError() {
288 * Handle Keep Alive Message
289 * @param cMsg a COPSKAMsg
291 private void handleKeepAliveMsg(final COPSKAMsg cMsg) {
292 logger.info("Get KAlive Msg");
295 if (cMsg.getIntegrity() != null) {
296 logger.warn("Unsupported objects (Integrity)");
299 // should we do anything else?? ....
301 } catch (Exception unae) {
302 logger.error("Unexpected exception while writing COPS data", unae);
307 * Method handleDecisionMsg
308 * @param dMsg a COPSDecisionMsg
310 private void handleDecisionMsg(final COPSDecisionMsg dMsg) throws COPSException {
311 final COPSHandle handle = dMsg.getClientHandle();
312 final Map<COPSContext, Set<COPSDecision>> decisions = dMsg.getDecisions();
314 for (final Set<COPSDecision> copsDecisions: decisions.values()) {
315 for (final COPSDecision decision : copsDecisions) {
316 // Get the associated manager
317 final COPSPepReqStateMan manager = _managerMap.get(handle);
318 if (manager == null) {
319 logger.warn("Unable to find state manager with key - " + handle);
323 // Check message type
324 // TODO FIXME - Use of manager object could result in a NPE
325 if (decision.getFlag().equals(DecisionFlag.REQSTATE)) {
326 if (decision.getCommand().equals(Command.REMOVE))
327 // Delete Request State
328 manager.processDeleteRequestState(dMsg);
330 // Open new Request State
331 handleOpenNewRequestStateMsg(handle);
334 manager.processDecision(dMsg, _sock);
341 * Method handleOpenNewRequestStateMsg
342 * @param handle a COPSHandle
344 private void handleOpenNewRequestStateMsg(final COPSHandle handle) throws COPSPepException {
345 final COPSPepReqStateMan manager = _managerMap.get(handle);
347 logger.warn("Unable to find state manager with key - " + handle.getId().str());
349 manager.processOpenNewRequestState();
353 * Method handleSyncStateReqMsg
354 * @param cMsg a COPSSyncStateMsg
356 private void handleSyncStateReqMsg(final COPSSyncStateMsg cMsg) throws COPSPepException {
357 if (cMsg.getIntegrity() != null) {
358 logger.warn("Unsupported objects (Integrity)");
361 final COPSPepReqStateMan manager = _managerMap.get(cMsg.getClientHandle());
363 logger.warn("Unable to find state manager with key - " + cMsg.getClientHandle().getId().str());
365 manager.processSyncStateRequest(cMsg);
369 * Method createRequestState
370 * @param clientHandle a String
371 * @param process a COPSPepDataProcess
372 * @return a COPSPepmanager
373 * @throws COPSException
374 * @throws COPSPepException
376 protected COPSPepReqStateMan addRequestState(final COPSHandle clientHandle, final COPSPepDataProcess process)
377 throws COPSException {
378 final COPSPepReqStateMan manager = new COPSPepReqStateMan(_clientType, clientHandle, process);
379 if (_managerMap.get(clientHandle) != null)
380 throw new COPSPepException("Duplicate Handle, rejecting " + clientHandle);
382 _managerMap.put(clientHandle, manager);
383 logger.info("Added state manager with key - " + clientHandle);
384 manager.initRequestState(_sock);
389 * Method deleteRequestState
390 * @param manager a COPSPepReqStateMan
391 * @throws COPSException
392 * @throws COPSPepException
394 protected void deleteRequestState(COPSPepReqStateMan manager) throws COPSException {
395 manager.finalizeRequestState();
398 private void notifyCloseAllReqStateMan() throws COPSPepException {
399 for (final COPSPepReqStateMan man: _managerMap.values()) {
400 man.processClosedConnection(_error);
404 private void notifyNoKAAllReqStateMan() throws COPSPepException {
405 for (final COPSPepReqStateMan man: _managerMap.values()) {
406 man.processNoKAConnection();
410 private void notifyAcctAllReqStateMan() throws COPSPepException {
411 for (final COPSPepReqStateMan man: _managerMap.values()) {
412 man.processAcctReport();