2 * Copyright (c) 2004 University of Murcia. All rights reserved.
3 * --------------------------------------------------------------
4 * For more information, please see <http://www.umu.euro6ix.org/>.
7 package org.umu.cops.prpep;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11 import org.umu.cops.stack.*;
13 import java.io.IOException;
14 import java.net.Socket;
16 import java.util.concurrent.ConcurrentHashMap;
19 * COPSPepConnection represents a PEP-PDP Connection Manager.
20 * Responsible for processing messages received from PDP.
22 public class COPSPepConnection implements Runnable {
24 public final static Logger logger = LoggerFactory.getLogger(COPSPepConnection.class);
26 /** Socket connected to PDP */
27 protected Socket _sock;
29 /** Time to wait for responses (milliseconds); the default is 10 seconds */
30 protected int _responseTime;
32 /** COPS Client-type */
33 protected short _clientType;
36 Accounting timer value (secs)
38 protected short _acctTimer;
41 Keep-alive timer value (secs)
43 protected short _kaTimer;
46 * Time of the latest keep-alive received
48 protected Date _lastRecKa;
51 Opcode of the latest message sent
53 protected byte _lastmessage;
56 Maps a COPS Client Handle to a Request State Manager
58 protected final Map<String, COPSPepReqStateMan> _managerMap;
59 // map < String(COPSHandle), COPSPepReqStateMan>;
62 COPS error returned by PDP
64 protected COPSError _error;
67 * Creates a new PEP connection
68 * @param clientType PEP's client-type
69 * @param sock Socket connected to PDP
71 public COPSPepConnection(short clientType, Socket sock) {
73 _clientType = clientType;
79 _responseTime = 10000;
80 _lastmessage = COPSHeader.COPS_OP_CAT;
82 _managerMap = new ConcurrentHashMap<>();
86 * Gets the response time
87 * @return Response time value (msecs)
89 public int getResponseTime() {
94 * Gets the socket connected to the PDP
95 * @return Socket connected to PDP
97 public Socket getSocket() {
102 * Gets keep-alive timer
103 * @return Keep-alive timer value (secs)
105 public short getKaTimer () {
110 * Gets accounting timer
111 * @return Accounting timer value (secs)
113 public short getAcctTimer () {
118 * Gets all request state managers
119 * @return A <tt>Hashatable</tt> holding all request state managers
121 protected Hashtable getReqStateMans() {
122 return new Hashtable(_managerMap);
126 * Checks whether the socket to the PDP is closed or not
127 * @return <tt>true</tt> if the socket is closed, <tt>false</tt> otherwise
129 public boolean isClosed() {
130 return _sock.isClosed();
136 * @throws java.io.IOException
138 protected void close()
144 * Gets the opcode of the latest message sent
145 * @return Message opcode
147 public byte getLastmessage() {
153 * @param respTime Response time value (msecs)
155 public void setResponseTime(int respTime) {
156 _responseTime = respTime;
160 * Sets keep-alive timer
161 * @param kaTimer Keep-alive timer value (secs)
163 public void setKaTimer (short kaTimer) {
168 * Sets accounting timer
169 * @param acctTimer Accounting timer value (secs)
171 public void setAcctTimer (short acctTimer) {
172 _acctTimer = acctTimer;
176 * Message-processing loop
179 Date _lastSendKa = new Date();
180 Date _lastSendAcc = new Date();
181 _lastRecKa = new Date();
183 while (!_sock.isClosed()) {
184 if (_sock.getInputStream().available() != 0) {
185 _lastmessage = processMessage(_sock);
186 _lastRecKa = new Date();
192 int _startTime = (int) (_lastRecKa.getTime());
193 int cTime = (int) (new Date().getTime());
195 if ((cTime - _startTime) > _kaTimer*1000) {
197 // Notify all Request State Managers
198 notifyNoKAAllReqStateMan();
202 _startTime = (int) (_lastSendKa.getTime());
203 cTime = (int) (new Date().getTime());
205 if ((cTime - _startTime) > ((_kaTimer*3/4) * 1000)) {
206 COPSHeader hdr = new COPSHeader(COPSHeader.COPS_OP_KA);
207 COPSKAMsg msg = new COPSKAMsg();
211 COPSTransceiver.sendMsg(msg, _sock);
212 _lastSendKa = new Date();
217 if (_acctTimer > 0) {
218 int _startTime = (int) (_lastSendAcc.getTime());
219 int cTime = (int) (new Date().getTime());
221 if ((cTime - _startTime) > ((_acctTimer*3/4)*1000)) {
222 // Notify all Request State Managers
223 notifyAcctAllReqStateMan();
224 _lastSendAcc = new Date();
230 } catch (Exception e) {
231 logger.error("Exception thrown while sleeping", e);
234 } catch (Exception e) {
235 logger.error("Error while processing socket messages", e);
238 // connection closed by server
239 // COPSDebug.out(getClass().getName(),"Connection closed by server");
242 } catch (IOException e) {
243 logger.error("Error closing socket", e);
246 // Notify all Request State Managers
248 notifyCloseAllReqStateMan();
249 } catch (COPSPepException e) {
250 logger.error("Error closing state managers");
255 * Gets a COPS message from the socket and processes it
256 * @param conn Socket connected to the PDP
257 * @return COPS message type
258 * @throws COPSPepException
259 * @throws COPSException
260 * @throws IOException
262 protected byte processMessage(Socket conn)
263 throws COPSPepException, COPSException, IOException {
264 COPSMsg msg = COPSTransceiver.receiveMsg(conn);
266 if (msg.getHeader().isAClientClose()) {
267 handleClientCloseMsg(conn, msg);
268 return COPSHeader.COPS_OP_CC;
269 } else if (msg.getHeader().isADecision()) {
270 handleDecisionMsg(conn, msg);
271 return COPSHeader.COPS_OP_DEC;
272 } else if (msg.getHeader().isASyncStateReq()) {
273 handleSyncStateReqMsg(conn, msg);
274 return COPSHeader.COPS_OP_SSQ;
275 } else if (msg.getHeader().isAKeepAlive()) {
276 handleKeepAliveMsg(conn, msg);
277 return COPSHeader.COPS_OP_KA;
279 throw new COPSPepException("Message not expected (" + msg.getHeader().getOpCode() + ").");
284 * Handle Client Close Message, close the passed connection
286 * @param conn a Socket
287 * @param msg a COPSMsg
290 * <Client-Close> ::= <Common Header>
294 * [<Integrity>] is not supported
297 private void handleClientCloseMsg(Socket conn, COPSMsg msg) {
298 COPSClientCloseMsg cMsg = (COPSClientCloseMsg) msg;
299 _error = cMsg.getError();
301 // COPSDebug.out(getClass().getName(),"Got close request, closing connection " +
302 // conn.getInetAddress() + ":" + conn.getPort() + ":[Error " + _error.getDescription() + "]");
306 if (cMsg.getIntegrity() != null) {
307 logger.warn("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
311 } catch (Exception unae) {
312 logger.error("Unexpected exception closing connection", unae);
319 * @return a COPSError
322 protected COPSError getError() {
327 * Handle Keep Alive Message
329 * <Keep-Alive> ::= <Common Header>
332 * [<Integrity>] is not supported
334 * @param conn a Socket
335 * @param msg a COPSMsg
338 private void handleKeepAliveMsg(Socket conn, COPSMsg msg) {
339 COPSKAMsg cMsg = (COPSKAMsg) msg;
341 // COPSDebug.out(getClass().getName(),"Get KAlive Msg");
345 if (cMsg.getIntegrity() != null) {
346 logger.warn("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
349 // should we do anything else?? ....
351 } catch (Exception unae) {
352 logger.error("Unexpected exception while writing COPS data", unae);
357 * Method handleDecisionMsg
359 * <Decision Message> ::= <Common Header: Flag SOLICITED>
361 * *(<Decision>) | <Error>
363 * <Decision> ::= <Context>
365 * [<Named Decision Data: Provisioning>]
366 * <Decision: Flags> ::= <Command-Code> NULLFlag
367 * <Command-Code> ::= NULLDecision | Install | Remove
368 * <Named Decision Data> ::= <<Install Decision> | <Remove Decision>>
369 * <Install Decision> ::= *(<PRID> <EPD>)
370 * <Remove Decision> ::= *(<PRID> | <PPRID>)
372 * Very important, this is actually being treated like this:
373 * <Install Decision> ::= <PRID> | <EPD>
374 * <Remove Decision> ::= <PRID> | <PPRID>
376 * @param conn a Socket
377 * @param msg a COPSMsg
380 private void handleDecisionMsg(Socket conn, COPSMsg msg)
381 throws COPSPepException {
382 COPSDecisionMsg dMsg = (COPSDecisionMsg) msg;
383 COPSHandle handle = dMsg.getClientHandle();
384 Hashtable decisions = dMsg.getDecisions();
386 for (Enumeration e = decisions.keys() ; e.hasMoreElements() ;) {
388 COPSContext context = (COPSContext) e.nextElement();
389 Vector v = (Vector) decisions.get(context);
391 Enumeration ee = v.elements();
392 if (ee.hasMoreElements()) {
393 COPSDecision decision = (COPSDecision) ee.nextElement();
395 // Get the associated manager
396 COPSPepReqStateMan manager = _managerMap.get(handle.getId().str());
398 logger.warn("Unable to find state manager with key - " + handle.getId().str());
400 // Check message type
401 if (decision.getFlags() == COPSDecision.F_REQSTATE) {
402 if (decision.isRemoveDecision())
403 // Delete Request State
404 manager.processDeleteRequestState(dMsg);
406 // Open new Request State
407 handleOpenNewRequestStateMsg(conn, handle);
410 manager.processDecision(dMsg);
417 * Method handleOpenNewRequestStateMsg
419 * @param conn a Socket
420 * @param handle a COPSHandle
423 private void handleOpenNewRequestStateMsg(Socket conn, COPSHandle handle)
424 throws COPSPepException {
426 COPSPepReqStateMan manager = _managerMap.get(handle.getId().str());
428 logger.warn("Unable to find state manager with key - " + handle.getId().str());
430 manager.processOpenNewRequestState();
434 * Method handleSyncStateReqMsg
436 * <Synchronize State> ::= <Common Header>
440 * @param conn a Socket
441 * @param msg a COPSMsg
444 private void handleSyncStateReqMsg(Socket conn, COPSMsg msg)
445 throws COPSPepException {
446 COPSSyncStateMsg cMsg = (COPSSyncStateMsg) msg;
447 // COPSHandle handle = cMsg.getClientHandle();
448 // COPSHeader header = cMsg.getHeader();
451 if (cMsg.getIntegrity() != null) {
452 logger.warn("Unsupported objects (Integrity) to connection " + conn.getInetAddress());
455 COPSPepReqStateMan manager = (COPSPepReqStateMan) _managerMap.get(cMsg.getClientHandle().getId().str());
456 if (manager == null) {
457 logger.warn("Unable to find state manager with key - " + cMsg.getClientHandle().getId().str());
459 manager.processSyncStateRequest(cMsg);
464 * Method createRequestState
466 * @param clientHandle a String
467 * @param process a COPSPepDataProcess
469 * @return a COPSPepmanager
471 * @throws COPSException
472 * @throws COPSPepException
475 protected COPSPepReqStateMan addRequestState(String clientHandle, COPSPepDataProcess process)
476 throws COPSException, COPSPepException {
477 COPSPepReqStateMan manager = new COPSPepReqStateMan(_clientType,clientHandle);
478 if (_managerMap.get(clientHandle) != null)
479 throw new COPSPepException("Duplicate Handle, rejecting " + clientHandle);
481 manager.setDataProcess(process);
482 _managerMap.put(clientHandle,manager);
483 manager.initRequestState(_sock);
488 * Method deleteRequestState
490 * @param manager a COPSPepReqStateMan
492 * @throws COPSException
493 * @throws COPSPepException
496 protected void deleteRequestState(COPSPepReqStateMan manager)
497 throws COPSException, COPSPepException {
498 manager.finalizeRequestState();
501 private void notifyCloseAllReqStateMan() throws COPSPepException {
502 for (final COPSPepReqStateMan man: _managerMap.values()) {
503 man.processClosedConnection(_error);
507 private void notifyNoKAAllReqStateMan() throws COPSPepException {
508 for (final COPSPepReqStateMan man: _managerMap.values()) {
509 man.processNoKAConnection();
513 private void notifyAcctAllReqStateMan() throws COPSPepException {
514 for (final COPSPepReqStateMan man: _managerMap.values()) {
515 man.processAcctReport();