本文共 5743 字,大约阅读时间需要 19 分钟。
#pragma once
#include <WinSock2.h> #define IP_SIZE 32 //ip地址长度 #define BUFFER_SIZE 1024 #include <stdio.h> #include <process.h> enum SOCKET_STATE { ACCEPT = 1, SEND, RECV }; /*传送给处理函数的参数*/ typedef struct tagPleData { SOCKET sSocket; CHAR szClientIP[IP_SIZE]; UINT uiClientPort; /* 其他信息 */ }PLEDATA, * LPPLEDATA; typedef struct tagIOData { OVERLAPPED oOverlapped; WSABUF wsBuffer; CHAR szBuffer[BUFFER_SIZE]; DWORD dSend; DWORD dRecv; SOCKET_STATE sState; }IOData, *LPIOData; typedef void (*ReadProc)(LPPLEDATA lpData, CHAR * RecvData); class Iocp { public: Iocp(const CHAR * host, UINT port); ~Iocp(void); VOID SetThreadNums(); UINT GetThreadNums(); VOID SetPort(UINT port); UINT GetPort(); VOID Close(); static VOID ServerWorkThread( VOID * _this ); VOID SetReadProc(VOID * lprFun); public: BOOL ListenEx(UINT backlog); public: /*读取回调函数*/ ReadProc lpFun; HANDLE h_ComPlePort; static VOID AcceptEx(VOID * _this); UINT iThreadNums; BOOL bIsListen; SOCKADDR_IN m_SockAddr; // socket address structure SOCKET m_ListenSocketID; // host CHAR m_Host[IP_SIZE]; // port UINT m_Port; };#include "Iocp.h" Iocp::Iocp(const CHAR * host, UINT port) { /*协商套接字版本*/ WSADATA wsaData; DWORD dwRet = WSAStartup( 0x0202, &wsaData ); if (0 != dwRet ) { WSACleanup(); throw 1; } m_ListenSocketID = INVALID_SOCKET ; memset( &m_SockAddr, 0, sizeof(SOCKADDR_IN) ) ; memset( m_Host, 0, IP_SIZE ) ; m_Port = 0 ; SYSTEM_INFO mySysInfo; GetSystemInfo( &mySysInfo ); iThreadNums = mySysInfo.dwNumberOfProcessors * 2 + 1; BOOL ret = FALSE ; bIsListen = TRUE; strncpy_s(m_Host, host, IP_SIZE - 1); m_SockAddr.sin_family = AF_INET; m_SockAddr.sin_addr.s_addr =inet_addr(host); m_SockAddr.sin_port = htons(port); /*创建监听套接字*/ m_ListenSocketID = WSASocket( AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED ); if( m_ListenSocketID== INVALID_SOCKET ) { throw 1; } /*设置套接字选项*/ CHAR opt = 1; ret = setsockopt( m_ListenSocketID , SOL_SOCKET , SO_REUSEADDR , (const CHAR * )&opt , sizeof(opt) ); if( ret != 0 ) { throw 1 ; } /*绑定套接字*/ if (SOCKET_ERROR == bind(m_ListenSocketID, (const struct sockaddr *)&m_SockAddr, sizeof(struct sockaddr))) { throw 1 ; } /*创建完成端口*/ h_ComPlePort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 ); if ( h_ComPlePort == NULL ) { throw 1 ; } for ( DWORD i = 0; i < ( mySysInfo.dwNumberOfProcessors * 2 + 1 ); ++i ) { _beginthread(Iocp::ServerWorkThread, 0, (VOID *)this); } } Iocp::~Iocp(void) { WSACleanup(); } /************************************************* Function:AcceptEx Description:接受套接字的线程函数 Input: Output: Others: *************************************************/ VOID Iocp::AcceptEx(VOID * _this) { SOCKET acSocket; DWORD dwRecvBytes; Iocp * pTemp = (Iocp *)_this; SOCKADDR_IN sAddr; INT uiClientSize = sizeof(sAddr); //struct socketaddrin while (TRUE) { int x = 6; acSocket = WSAAccept( pTemp->m_ListenSocketID, (SOCKADDR *)&sAddr, &uiClientSize, NULL, 0 ); if ( acSocket == SOCKET_ERROR ) { return; } LPPLEDATA lpSocketData = (LPPLEDATA)malloc(sizeof(PLEDATA)); if ( NULL == lpSocketData ) { return; } lpSocketData->sSocket = acSocket; sprintf(lpSocketData->szClientIP, inet_ntoa(sAddr.sin_addr)); lpSocketData->uiClientPort = sAddr.sin_port; if ( CreateIoCompletionPort( (HANDLE)acSocket, pTemp->h_ComPlePort, (ULONG_PTR)lpSocketData, 0 ) == NULL ) { return; } /*这里停止监听会有问题*/ if (pTemp->bIsListen = FALSE) { break; } LPIOData lpIoData = (LPIOData )malloc(sizeof(IOData)); if ( lpIoData == NULL ) { return; } #pragma region 投递线程事件 ZeroMemory( &( lpIoData->oOverlapped ), sizeof( lpIoData->oOverlapped) ); lpIoData->dSend = 0; lpIoData->dRecv = 0; lpIoData->wsBuffer.len = BUFFER_SIZE; lpIoData->wsBuffer.buf = lpIoData->szBuffer; lpIoData->sState = SEND; DWORD flags = 0; if ( WSARecv(acSocket, &(lpIoData->wsBuffer), 1, &dwRecvBytes, &flags, &(lpIoData->oOverlapped), NULL ) == SOCKET_ERROR ) { if ( WSAGetLastError() != ERROR_IO_PENDING ) { return; } else { //return; printf("ERROR_IO_PENDING:ok\n"); } } #pragma endregion 投递线程事件 } } /************************************************* Function:ListenEx Description:监听函数 Input: Output: Others: *************************************************/ BOOL Iocp::ListenEx(UINT backlog) { if (SOCKET_ERROR == listen(m_ListenSocketID, backlog)) { return FALSE; } /*创建监听线程*/ if (-1 == _beginthread(Iocp::AcceptEx, 0, (VOID *)this)) { return FALSE; } return TRUE; } /************************************************* Function:ServerWorkThread Description:端口上的工作线程 Input: Output: Others: *************************************************/ VOID Iocp:: ServerWorkThread( VOID * _this ) { Iocp * lpTemp = (Iocp *)_this; HANDLE hPlePort = (HANDLE)lpTemp->h_ComPlePort; DWORD dwBytes; LPPLEDATA lpPleData = NULL; LPIOData lpIoData = NULL; DWORD sendBytes = 0; DWORD recvBytes = 0; DWORD dwFlag = 0; while (TRUE) { int x = 89; if ( GetQueuedCompletionStatus( hPlePort, &dwBytes, (PULONG_PTR)&lpPleData, (LPOVERLAPPED *)&lpIoData, INFINITE ) == 0 ) { return ; } if ( dwBytes == 0 || NULL == lpIoData) { printf("there is a socket away\n"); free( lpPleData ); free( lpIoData ); continue; } else { #pragma region 接受到数据 lpIoData->dRecv = dwBytes; lpIoData->szBuffer[lpIoData->dRecv] = 0; //printf("ServerWorkThread:R[%s]\n", lpIoData->szBuffer); lpTemp->lpFun(lpPleData, lpIoData->szBuffer); #pragma endregion 接受到数据 #pragma region 再次投递 lpIoData->dRecv = 0; ZeroMemory( &(lpIoData->oOverlapped), sizeof( OVERLAPPED ) ); lpIoData->wsBuffer.len = BUFFER_SIZE; lpIoData->wsBuffer.buf = lpIoData->szBuffer; if ( WSARecv( lpPleData->sSocket, &(lpIoData->wsBuffer), 1, &recvBytes, &dwFlag, &(lpIoData->oOverlapped), NULL ) == SOCKET_ERROR ) { if ( WSAGetLastError() != ERROR_IO_PENDING ) { return ; } } #pragma endregion 再次投递 } } } VOID Iocp::SetReadProc(VOID * lprFun) { lpFun = (ReadProc)lprFun; }
转载地址:http://iqojz.baihongyu.com/