00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #include <sys/socket.h>
00022 #include <arpa/inet.h>
00023 #include <string.h>
00024 #include <unistd.h>
00025 #include <errno.h>
00026 #include <fcntl.h>
00027
00028 #include <vector>
00029
00030 #include "TcpServer.h"
00031 #include "Error.h"
00032 #include "Logger.h"
00033
00034 #include "IoStream.h"
00035
00036 using namespace std;
00037
00038 namespace mdw
00039 {
00040 namespace
00041 {
00042 const int MaxBackLog = 20;
00043 const int MaxConnection = 1000;
00044 const int MaxReadSize = 65536;
00045 }
00046
00047 TcpServer::TcpServer (short port, ObjectPool<vector<char> > &pool)
00048 {
00049 _pool = &pool;
00050 init (port);
00051 }
00052
00053 TcpServer::TcpServer (short port)
00054 {
00055 _pool = 0;
00056 init (port);
00057 }
00058
00059 void TcpServer::init (short port)
00060 {
00061 _connectedSocket.resize (MaxConnection);
00062 struct sockaddr_in localAddr;
00063
00064
00065 if ( (_servSock = socket (PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
00066 {
00067 ERROR_ERRNO ("socket() failed", errno);
00068 }
00069 fcntl (_servSock, F_SETFL, O_NONBLOCK);
00070
00071 int yes = 1;
00072 if (setsockopt (_servSock, SOL_SOCKET, SO_REUSEADDR, (char *) &yes, sizeof (yes)) < 0)
00073 {
00074 ::close (_servSock);
00075 ERROR_ERRNO ("setsockopt() failed", errno);
00076 }
00077
00078
00079 memset (&localAddr, 0, sizeof (localAddr));
00080 localAddr.sin_family = AF_INET;
00081 localAddr.sin_addr.s_addr = htonl (INADDR_ANY);
00082 localAddr.sin_port = htons (port);
00083
00084
00085 if (bind (_servSock, (struct sockaddr *) &localAddr, sizeof (localAddr)) < 0)
00086 {
00087 ::close (_servSock);
00088 ERROR_ERRNO ("bind() failed", errno);
00089 }
00090
00091
00092 if (listen (_servSock, MaxBackLog) < 0)
00093 {
00094 close (_servSock);
00095 ERROR_ERRNO ("listen() failed", errno);
00096 }
00097
00098 _maxSelectFd = _servSock + 1;
00099 }
00100
00101 Stream &TcpServer::outStream (int streamId)
00102 {
00103 return _connectedSocket[streamId].stream;
00104 }
00105
00106 TcpServer::~TcpServer()
00107 {
00108 close (_servSock);
00109 for (int i = 0; i < _maxSelectFd; ++i)
00110 {
00111 if (_connectedSocket[i].connected)
00112 {
00113 close (i);
00114 }
00115 }
00116 }
00117
00118 void TcpServer::accept (StreamServerVisitor &visitor, int timeOutSec, int timeOutMicro)
00119 {
00120 struct timeval tv;
00121 tv.tv_sec = timeOutSec;
00122 tv.tv_usec = timeOutMicro;
00123 fd_set rfds;
00124 fd_set efds;
00125 FD_ZERO (&rfds);
00126 FD_ZERO (&efds);
00127 FD_SET (_servSock, &rfds);
00128
00129 for (int i = 0; i < _maxSelectFd; ++i)
00130 {
00131 if (_connectedSocket[i].connected)
00132 {
00133 FD_SET (i, &rfds);
00134 FD_SET (i, &efds);
00135 }
00136 }
00137
00138 int retval = select (_maxSelectFd, &rfds, NULL, &efds, &tv);
00139
00140 if (retval == -1)
00141 {
00142 if (errno != EINTR)
00143 {
00144 ERROR_ERRNO ("select()", errno);
00145 }
00146 else
00147 {
00148
00149 return;
00150 }
00151 }
00152 else if (retval == 0)
00153 {
00154
00155 return;
00156 }
00157
00158 if (FD_ISSET (_servSock, &rfds))
00159 {
00160
00161 struct sockaddr_in clientAddr;
00162 socklen_t size = sizeof (clientAddr);
00163 int newConnection = ::accept (_servSock, (struct sockaddr *) & clientAddr, &size);
00164 fcntl (newConnection, F_SETFL, O_NONBLOCK);
00165 if (newConnection >= 0)
00166 {
00167
00168 string ip (inet_ntoa (clientAddr.sin_addr));
00169 _connectedSocket[newConnection].connected = true;
00170 _connectedSocket[newConnection].stream.reset (*this, newConnection);
00171 _connectedSocket[newConnection].info.setIp (ip);
00172 _connectedSocket[newConnection].info.setStreamId (newConnection);
00173 _maxSelectFd = max (_maxSelectFd, newConnection + 1);
00174 visitor.visitConnect (_connectedSocket[newConnection].info);
00175 }
00176 }
00177
00178 for (int i = 0; i < _maxSelectFd; ++i)
00179 {
00180 if (_connectedSocket[i].connected && (FD_ISSET (i, &rfds) || FD_ISSET (i, &efds)))
00181 {
00182
00183 static vector<char> buffer;
00184
00185 vector<char> * bufferToUse;
00186 if (_pool == 0)
00187 {
00188 bufferToUse = &buffer;
00189 }
00190 else
00191 {
00192 bufferToUse = & _pool->obtain();
00193 }
00194 bufferToUse->reserve (MaxReadSize);
00195 int res = read (i, & (*bufferToUse->begin()), MaxReadSize);
00196 if (res < 0)
00197 {
00198 if (errno == EAGAIN)
00199 {
00200
00201 }
00202 else
00203 {
00204
00205 LOG_ERRNO ("Read error", errno);
00206
00207 _connectedSocket[i].connected = false;
00208 close (i);
00209 visitor.visitDisconnect (_connectedSocket[i].info);
00210 }
00211 }
00212 else if (res == 0)
00213 {
00214
00215
00216 _connectedSocket[i].connected = false;
00217 close (i);
00218 visitor.visitDisconnect (_connectedSocket[i].info);
00219 }
00220 else
00221 {
00222
00223 visitor.visitRead (&*bufferToUse->begin(), res, _connectedSocket[i].info);
00224 }
00225 }
00226 }
00227 }
00228
00229 void TcpServer::write (const char* buffer, int size, int socketId)
00230 {
00231 if (_connectedSocket[socketId].connected)
00232 {
00233 int res = ::write (socketId, buffer, size);
00234 if (res < 0)
00235 {
00236 LOG_ERRNO ("Cannot write to socket " << socketId, errno);
00237 }
00238 else if (res != size)
00239 {
00240 LOG_WARN ("Could not write the whole thing (only " << res << " instead of " << size << ")");
00241 }
00242 }
00243 }
00244
00245 void TcpServer::close (int sockedId)
00246 {
00247 _connectedSocket[sockedId].connected = false;
00248 ::close (sockedId);
00249 }
00250
00251 void TcpServer::TcpStream::reset (TcpServer &t, int streamId)
00252 {
00253 _t = &t;
00254 _id = streamId;
00255 }
00256 void TcpServer::TcpStream::output (const char *buffer, int size)
00257 {
00258 _t->write (buffer, size, _id);
00259 }
00260 }