00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 #include <sys/socket.h> 
00022 #include <arpa/inet.h>  
00023 #include <string.h>     
00024 #include <unistd.h>     
00025 #include <errno.h>
00026 #include <fcntl.h>
00027 
00028 #include <vector>
00029 
00030 #include "TcpServer.h"
00031 #include "Error.h"
00032 #include "Logger.h"
00033 
00034 #include "IoStream.h"
00035 
00036 using namespace std;
00037 
00038 namespace mdw
00039 {
namespace
{
// Backlog length handed to listen() for the server socket.
int const MaxBackLog = 20;

// Number of slots allocated in the connection table, which is indexed
// directly by socket descriptor.
int const MaxConnection = 1000;

// Largest number of bytes requested from a client socket in one read().
int const MaxReadSize = 65536;
}
00046 
00047 TcpServer::TcpServer (short port, ObjectPool<vector<char> > &pool)
00048 {
00049     _pool = &pool;
00050     init (port);
00051 }
00052 
00053 TcpServer::TcpServer (short port)
00054 {
00055     _pool = 0;
00056     init (port);
00057 }
00058 
00059 void TcpServer::init (short port)
00060 {
00061     _connectedSocket.resize (MaxConnection);
00062     struct sockaddr_in localAddr; 
00063 
00064     
00065     if ( (_servSock = socket (PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
00066     {
00067         ERROR_ERRNO ("socket() failed", errno);
00068     }
00069     fcntl (_servSock, F_SETFL, O_NONBLOCK);
00070 
00071     int yes = 1;
00072     if (setsockopt (_servSock, SOL_SOCKET, SO_REUSEADDR, (char *) &yes, sizeof (yes)) < 0)
00073     {
00074         ::close (_servSock);
00075         ERROR_ERRNO ("setsockopt() failed", errno);
00076     }
00077 
00078     
00079     memset (&localAddr, 0, sizeof (localAddr)); 
00080     localAddr.sin_family = AF_INET;                
00081     localAddr.sin_addr.s_addr = htonl (INADDR_ANY); 
00082     localAddr.sin_port = htons (port);             
00083 
00084     
00085     if (bind (_servSock, (struct sockaddr *) &localAddr, sizeof (localAddr)) < 0)
00086     {
00087         ::close (_servSock);
00088         ERROR_ERRNO ("bind() failed", errno);
00089     }
00090 
00091     
00092     if (listen (_servSock, MaxBackLog) < 0)
00093     {
00094         close (_servSock);
00095         ERROR_ERRNO ("listen() failed", errno);
00096     }
00097 
00098     _maxSelectFd = _servSock + 1;
00099 }
00100 
00101 Stream &TcpServer::outStream (int streamId)
00102 {
00103     return _connectedSocket[streamId].stream;
00104 }
00105 
00106 TcpServer::~TcpServer()
00107 {
00108     close (_servSock);
00109     for (int i = 0; i < _maxSelectFd; ++i)
00110     {
00111         if (_connectedSocket[i].connected)
00112         {
00113             close (i);
00114         }
00115     }
00116 }
00117 
00118 void TcpServer::accept (StreamServerVisitor &visitor, int timeOutSec, int timeOutMicro)
00119 {
00120     struct timeval tv;
00121     tv.tv_sec = timeOutSec;
00122     tv.tv_usec = timeOutMicro;
00123     fd_set rfds;
00124     fd_set efds;
00125     FD_ZERO (&rfds);
00126     FD_ZERO (&efds);
00127     FD_SET (_servSock, &rfds);
00128 
00129     for (int i = 0; i < _maxSelectFd; ++i)
00130     {
00131         if (_connectedSocket[i].connected)
00132         {
00133             FD_SET (i, &rfds);
00134             FD_SET (i, &efds);
00135         }
00136     }
00137 
00138     int retval = select (_maxSelectFd, &rfds, NULL, &efds, &tv);
00139  
00140     if (retval == -1)
00141     {
00142         if (errno != EINTR)
00143         {
00144             ERROR_ERRNO ("select()", errno);
00145         }
00146         else
00147         {
00148             
00149             return;
00150         }
00151     }
00152     else if (retval == 0)
00153     {
00154         
00155         return;
00156     }
00157 
00158     if (FD_ISSET (_servSock, &rfds))
00159     {
00160         
00161         struct sockaddr_in clientAddr;
00162         socklen_t size = sizeof (clientAddr);
00163         int newConnection = ::accept (_servSock, (struct sockaddr *) & clientAddr, &size);
00164         fcntl (newConnection, F_SETFL, O_NONBLOCK);
00165         if (newConnection >= 0)
00166         {
00167             
00168             string ip (inet_ntoa (clientAddr.sin_addr));
00169             _connectedSocket[newConnection].connected = true;
00170             _connectedSocket[newConnection].stream.reset (*this, newConnection);
00171             _connectedSocket[newConnection].info.setIp (ip);
00172             _connectedSocket[newConnection].info.setStreamId (newConnection);
00173             _maxSelectFd = max (_maxSelectFd, newConnection + 1);
00174             visitor.visitConnect (_connectedSocket[newConnection].info);
00175         }
00176     }
00177 
00178     for (int i = 0; i < _maxSelectFd; ++i)
00179     {
00180         if (_connectedSocket[i].connected && (FD_ISSET (i, &rfds) || FD_ISSET (i, &efds)))
00181         {
00182             
00183             static vector<char> buffer;
00184 
00185             vector<char> * bufferToUse;
00186             if (_pool == 0)
00187             {
00188                 bufferToUse = &buffer;
00189             }
00190             else
00191             {
00192                 bufferToUse = & _pool->obtain();
00193             }
00194             bufferToUse->reserve (MaxReadSize);
00195             int res = read (i, & (*bufferToUse->begin()), MaxReadSize);
00196             if (res < 0)
00197             {
00198                 if (errno == EAGAIN)
00199                 {
00200                     
00201                 }
00202                 else
00203                 {
00204                     
00205                     LOG_ERRNO ("Read error", errno);
00206                     
00207                     _connectedSocket[i].connected = false;
00208                     close (i);
00209                     visitor.visitDisconnect (_connectedSocket[i].info);
00210                 }
00211             }
00212             else if (res == 0)
00213             {
00214                 
00215                 
00216                 _connectedSocket[i].connected = false;
00217                 close (i);
00218                 visitor.visitDisconnect (_connectedSocket[i].info);
00219             }
00220             else
00221             {
00222                 
00223                 visitor.visitRead (&*bufferToUse->begin(), res, _connectedSocket[i].info);
00224             }
00225         }
00226     }
00227 }
00228 
00229 void TcpServer::write (const char* buffer, int size, int socketId)
00230 {
00231     if (_connectedSocket[socketId].connected)
00232     {
00233         int res = ::write (socketId, buffer, size);
00234         if (res < 0)
00235         {
00236             LOG_ERRNO ("Cannot write to socket " << socketId, errno);
00237         }
00238         else if (res != size)
00239         {
00240             LOG_WARN ("Could not write the whole thing (only " << res << " instead of " << size << ")");
00241         }
00242     }
00243 }
00244 
00245 void TcpServer::close (int sockedId)
00246 {
00247     _connectedSocket[sockedId].connected = false;
00248     ::close (sockedId);
00249 }
00250 
00251 void TcpServer::TcpStream::reset (TcpServer &t, int streamId)
00252 {
00253     _t = &t;
00254     _id = streamId;
00255 }
00256 void TcpServer::TcpStream::output (const char *buffer, int size)
00257 {
00258     _t->write (buffer, size, _id);
00259 }
00260 }