서버 로직(Send / Recv) 처리를 별도의 쓰레드로 분리
- 클라이언트의 I/O 관련 함수 및 구조체를 ClientInfo.h로 분리
- 패킷 관련 정보를 PacketData 구조체에 저장하여 Packet.h로 분리
Main 함수의 흐름을 따라가면서 구조를 설명
#include "EchoServer.h"
#include <string>
#include <iostream>
const UINT16 SERVER_PORT = 11021;
const UINT16 MAX_CLIENT = 100;
int main()
{
EchoServer server;
//소켓 초기화
server.InitSocket();
//소켓 서버 등록
server.BindandListen(SERVER_PORT);
server.Run(MAX_CLIENT);
std::cout << "아무 키나 누를 때까지 대기\n";
while (true)
{
std::string inputCmd;
std::getline(std::cin, inputCmd);
if (inputCmd == "quit")
{
break;
}
}
server.End();
return 0;
}
- IOCPServer.h
- 실습 1,2에서와 동일하게 InitSocket() / BindandListen() 함수로 서버 소켓을 등록
- Run() 함수에서 Send 작업을 할 쓰레드(ProcessPacket())를 분리
- StartServer() 함수에서 클라이언트 / Completion Port / Accpter 쓰레드 / Worker 쓰레드 생성
- Accepter 쓰레드에서는 클라이언트를 서버와 연결하고 Completion Port에 등록
이후 클라이언트의 OnConnect() 함수를 호출하여 Recv 요청
마지막으로 애플리케이션의 OnConncet() 함수를 호출하여 클라이언트 연결을 알림 - Worker 쓰레드에서는 GetQueuedCompletionStatus() 함수로 I/O 작업 완료를 감지한 후 해당 I/O 작업의 종류에 따라 처리
- I/O가 Recv인 경우 애플리케이션의 OnReceive() 함수를 호출하여 패킷 데이터를 세팅하고 다시 클라이언트에서 Recv를 요청
이 때 패킷 데이터에 여러 쓰레드들이 동시에 접근할 수 있으므로 lock_guard를 걸어줌 - I/O가 Send인 경우 할당했던 송신 버퍼 메모리를 해제한 후 전송 완료를 알림
- I/O가 Recv인 경우 애플리케이션의 OnReceive() 함수를 호출하여 패킷 데이터를 세팅하고 다시 클라이언트에서 Recv를 요청
- Accepter 쓰레드에서는 클라이언트를 서버와 연결하고 Completion Port에 등록
#pragma once
#pragma comment(lib, "ws2_32")
#include "ClientInfo.h"
#include "Define.h"
#include <thread>
#include <vector>
#include <iostream>
class IOCPServer
{
private:
// 클라이언트 정보 저장 구조체
std::vector<stClientInfo> mClientInfos;
//리슨 소켓
SOCKET mListenSocket = INVALID_SOCKET;
//접속중인 클라이언트 수
int mClientCnt = 0;
//IO Worker 쓰레드
std::vector<std::thread> mIOWorkerThreads;
//Accpet 쓰레드
std::thread mAccepterThread;
//CompletionPort 객체 핸들
HANDLE mIOCPHandle = INVALID_HANDLE_VALUE;
//작업 쓰레드 동작 플래그
bool mIsWorkerRun = true;
//접속 쓰레드 동작 플래그
bool mIsAccepterRun = true;
void CreateClient(const UINT32 maxClientCount)
{
for (UINT32 i = 0; i < maxClientCount; ++i)
{
mClientInfos.emplace_back();
mClientInfos[i].Init(i);
}
}
//worker 쓰레드 생성
bool CreateWorkerThread()
{
for (int i = 0; i < MAX_WORKERTHREAD; i++)
{
mIOWorkerThreads.emplace_back([this]() {WorkerThread(); });
}
std::cout << "WorkerThread start...\n";
return true;
}
//accept요청 처리 쓰레드 생성
bool CreateAccepterThread()
{
mAccepterThread = std::thread([this]() {AccepterThread(); });
std::cout << "AccepterThread start...\n";
return true;
}
//사용하지 않는 클라이언트 정보 구조체 반환
stClientInfo *GetEmptyClientInfo()
{
for (auto& client : mClientInfos)
{
if (client.IsConnectd() == false)
{
return &client;
}
}
}
stClientInfo *GetClientInfo(const UINT32 sessionIndex)
{
return &mClientInfos[sessionIndex];
}
//worker Thread
void WorkerThread()
{
stClientInfo* pClientInfo = NULL;
BOOL bSuccess = TRUE;
DWORD dwIoSize = 0;
LPOVERLAPPED lpOverlapped = NULL;
while (mIsWorkerRun)
{
bSuccess = GetQueuedCompletionStatus(mIOCPHandle, &dwIoSize, (PULONG_PTR)&pClientInfo, &lpOverlapped, INFINITE);
//쓰레드 종료 메시지 처리
if (bSuccess == TRUE && dwIoSize == 0 && lpOverlapped == NULL)
{
mIsWorkerRun = false;
continue;
}
if (lpOverlapped == NULL)
{
continue;
}
// 클라이언트 접속 해제
if (bSuccess == FALSE || (dwIoSize == 0 && bSuccess == TRUE))
{
CloseSocket(pClientInfo);
continue;
}
auto pOverlappedEx = (stOverlappedEx*)lpOverlapped;
//Overlapped I/O Recv
if (pOverlappedEx->m_eOperation == IOOperation::RECV)
{
OnReceive(pClientInfo->GetIndex(), dwIoSize, pClientInfo->GetRecvBuffer());
pClientInfo->BindRecv();
}
//Overlapped I/O Send
else if (pOverlappedEx->m_eOperation == IOOperation::SEND)
{
delete[] pOverlappedEx->m_wsaBuf.buf;
delete pOverlappedEx;
pClientInfo->SendCompleted(dwIoSize);
}
else
{
std::cout << "socket " << (int)pClientInfo->GetIndex() << " exception";
}
}
}
//사용자 접속 쓰레드
void AccepterThread()
{
SOCKADDR_IN stClientAddr;
int nAddrLen = sizeof(SOCKADDR_IN);
while (mIsAccepterRun)
{
stClientInfo* pClientInfo = GetEmptyClientInfo();
if (pClientInfo == NULL)
{
std::cout << "Client Full Error\n";
return;
}
auto newSocket = accept(mListenSocket, (SOCKADDR*)&stClientAddr, &nAddrLen);
if (newSocket == INVALID_SOCKET)
{
continue;
}
if (pClientInfo->OnConnect(mIOCPHandle, newSocket) == false)
{
pClientInfo->Close(true);
return;
}
OnConnect(pClientInfo->GetIndex());
++mClientCnt;
}
}
//소켓 연결 종료
void CloseSocket(stClientInfo* pClientInfo, bool bIsForce = false)
{
auto clientIndex = pClientInfo->GetIndex();
pClientInfo->Close(bIsForce);
OnClose(clientIndex);
}
public:
IOCPServer(void) {}
~IOCPServer(void)
{
WSACleanup();
}
virtual void OnConnect(const UINT32 clientINdex_) {}
virtual void OnClose(const UINT32 clientINdex_) {}
virtual void OnReceive(const UINT32 clientINdex_, const UINT32 size_, char *pData_) {}
//소켓 초기화
bool InitSocket()
{
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
{
std::cout << "WSAStartup() Error : " << WSAGetLastError() << std::endl;
return false;
}
SOCKET mListenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, NULL, WSA_FLAG_OVERLAPPED);
if (INVALID_SOCKET == mListenSocket)
{
std::cout << "WSASocket() Error : " << WSAGetLastError() << std::endl;
return false;
}
std::cout << "socket init success\n";
return true;
}
bool BindandListen(int nBindPort)
{
SOCKADDR_IN stServerAddr;
stServerAddr.sin_family = AF_INET;
stServerAddr.sin_port = htons(nBindPort);
stServerAddr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(mListenSocket, (SOCKADDR*)&stServerAddr, sizeof(SOCKADDR_IN)) == SOCKET_ERROR)
{
std::cout << "bind() Error : " << WSAGetLastError() << std::endl;
return false;
}
if (listen(mListenSocket, 5) == SOCKET_ERROR)
{
std::cout << "listen() Error : " << WSAGetLastError() << std::endl;
return false;
}
std::cout << "server registration success\n";
return true;
}
//접속 요청 수락, 메시지 처리
bool StartServer(const UINT32 maxClientCount)
{
CreateClient(maxClientCount);
mIOCPHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, MAX_WORKERTHREAD);
if (mIOCPHandle == NULL)
{
std::cout << "CreateIoCompletionPort() Error : " << GetLastError() << std::endl;
return false;
}
if (CreateWorkerThread() == false)
{
return false;
}
if (CreateAccepterThread() == false)
{
return false;
}
std::cout << "Server start\n";
return true;
}
//생성되어있던 쓰레드 파괴
void DestroyThread()
{
mIsWorkerRun = false;
CloseHandle(mIOCPHandle);
for (auto& th : mIOWorkerThreads)
{
if (th.joinable())
{
th.join();
}
}
mIsAccepterRun = false;
closesocket(mListenSocket);
if (mAccepterThread.joinable())
{
mAccepterThread.join();
}
}
bool SendMsg(const UINT32 sessionIndex_, const UINT32 dataSize_, char* pData)
{
auto pClient = GetClientInfo(sessionIndex_);
return pClient->SendMsg(dataSize_, pData);
}
};
- EchoServer.h
- ProcessPacket() 함수는 별도의 쓰레드로 분리됨
- DequePacketData() 함수로 수신받은 패킷 데이터를 꺼내옴
이 때 여러 쓰레드들이 동시에 패킷 데이터에 접근할 수 있으므로 lock_guard를 걸어줌 - 패킷 데이터가 존재할경우 클라이언트에 Send
- 패킷 데이터가 없다면 강제로 sleep시켜 대기상태로 전환
- DequePacketData() 함수로 수신받은 패킷 데이터를 꺼내옴
- End() 함수가 호출될경우 Send 작업을 위해 호출된 쓰레드들을 join()한 후 DestroyThread() 함수로 Worker 쓰레드와 Accepter 쓰레드들도 join()
- ProcessPacket() 함수는 별도의 쓰레드로 분리됨
#pragma once
#include "IOCPServer.h"
#include "Packet.h"
#include <vector>
#include <deque>
#include <thread>
#include <mutex>
class EchoServer : public IOCPServer
{
private:
bool mIsRunProcessThread = false;
std::thread mProcessThread;
std::mutex mLock;
std::deque<PacketData> mPacketDataQueue;
void ProcessPacket()
{
while (mIsRunProcessThread)
{
auto packetData = DequePacketData();
if (packetData.DataSize != 0)
{
SendMsg(packetData.SessionIndex, packetData.DataSize, packetData.pPacketData);
}
else
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
}
PacketData DequePacketData()
{
PacketData packetData;
std::lock_guard<std::mutex> guard(mLock);
if (mPacketDataQueue.empty())
{
return PacketData();
}
packetData.Set(mPacketDataQueue.front());
mPacketDataQueue.front().Release();
mPacketDataQueue.pop_front();
return packetData;
}
public:
EchoServer() = default;
virtual ~EchoServer() = default;
virtual void OnConnect(const UINT32 clientIndex_) override
{
std::cout << "[OnConnect] 클라이언트 : Index " << clientIndex_ << std::endl;
}
virtual void OnClose(const UINT32 clientIndex_) override
{
std::cout << "[OnClose] 클라이언트 : Index " << clientIndex_ << std::endl;
}
virtual void OnReceive(const UINT32 clientIndex_, const UINT32 size_, char* pData_) override
{
std::cout << "[OnReceive] 클라이언트 : Index " << clientIndex_ << ", dataSize " << size_ << std::endl;
PacketData packet;
packet.Set(clientIndex_, size_, pData_);
std::lock_guard<std::mutex> guard(mLock);
mPacketDataQueue.push_back(packet);
}
void Run(const UINT32 maxClient)
{
mIsRunProcessThread = true;
mProcessThread = std::thread([this]() { ProcessPacket(); });
StartServer(maxClient);
}
void End()
{
mIsRunProcessThread = false;
if (mProcessThread.joinable())
{
mProcessThread.join();
}
DestroyThread();
}
};
- ClientInfo.h
- 각 클라이언트에서 처리해야될 기능들이 존재
- BindIOCOMpletionPort() 함수에서 클라이언트를 Completion Port에 등록
- BindRecv() 함수에서 Recv 요청
- SendMsg() 함수에서 Send
#pragma once
#include "Define.h"
#include <iostream>
class stClientInfo
{
private:
INT32 mIndex = 0;
SOCKET mSock;
stOverlappedEx mRecvOverlappedEx;
char mRecvBuf[MAX_SOCKBUF];
public:
stClientInfo()
{
ZeroMemory(&mRecvOverlappedEx, sizeof(stOverlappedEx));
mSock = INVALID_SOCKET;
}
void Init(const UINT32 index)
{
mIndex = index;
}
UINT32 GetIndex() { return mIndex; }
bool IsConnectd() { return mSock != INVALID_SOCKET; }
SOCKET GetSock() { return mSock; }
char* GetRecvBuffer() { return mRecvBuf; }
bool OnConnect(HANDLE iocpHandle_, SOCKET socket_)
{
mSock = socket_;
Clear();
//I/O Completion Port 객체와 소켓 연결
if (BindIOCompletionPort(iocpHandle_) == false)
{
return false;
}
//Recv Overlapped I/O 작업 요청
return BindRecv();
}
void Close(bool bIsForce = false)
{
// SO_DONTLINGER로 설정
struct linger stLinger = { 0, 0 };
// SO_LINGER, timeout = 0으로 설정하여 강제 종료, 데이터 손실이 있을 수 있음
if (bIsForce == true)
{
stLinger.l_onoff = 1;
}
shutdown(mSock, SD_BOTH);
setsockopt(mSock, SOL_SOCKET, SO_LINGER, (char*)&stLinger, sizeof(stLinger));
closesocket(mSock);
mSock = INVALID_SOCKET;
}
void Clear()
{
}
bool BindIOCompletionPort(HANDLE iocpHandle_)
{
auto hIOCP = CreateIoCompletionPort((HANDLE)GetSock(), iocpHandle_, (ULONG_PTR)(this), 0);
if (hIOCP == INVALID_HANDLE_VALUE)
{
std::cout << "CreateIoCompletionPort() Error : " << GetLastError() << std::endl;
return false;
}
return true;
}
bool BindRecv()
{
DWORD dwFlag = 0;
DWORD dwRecvNumBytes = 0;
mRecvOverlappedEx.m_wsaBuf.len = MAX_SOCKBUF;
mRecvOverlappedEx.m_wsaBuf.buf = mRecvBuf;
mRecvOverlappedEx.m_eOperation = IOOperation::RECV;
if (WSARecv(mSock, &(mRecvOverlappedEx.m_wsaBuf), 1,
&dwRecvNumBytes, &dwFlag, (LPWSAOVERLAPPED) & (mRecvOverlappedEx), NULL) == SOCKET_ERROR
&& (WSAGetLastError() != ERROR_IO_PENDING))
{
std::cout << "WSARecv() Error : " << WSAGetLastError() << std::endl;
return false;
}
return true;
}
bool SendMsg(const UINT32 dataSize_, char *pMsg_)
{
auto sendOverlappedEx = new stOverlappedEx;
ZeroMemory(sendOverlappedEx, sizeof(stOverlappedEx));
sendOverlappedEx->m_wsaBuf.len = dataSize_;
sendOverlappedEx->m_wsaBuf.buf = new char[dataSize_];
CopyMemory(sendOverlappedEx->m_wsaBuf.buf, pMsg_, dataSize_);
sendOverlappedEx->m_eOperation = IOOperation::SEND;
DWORD dwRecvNumBytes = 0;
if (WSASend(mSock, &(sendOverlappedEx->m_wsaBuf), 1,
&dwRecvNumBytes, 0, (LPWSAOVERLAPPED) & (sendOverlappedEx), NULL) == SOCKET_ERROR
&& (WSAGetLastError() != ERROR_IO_PENDING))
{
std::cout << "WSASend() Error : " << WSAGetLastError() << std::endl;
return false;
}
return true;
}
void SendCompleted(const UINT32 dataSize_)
{
std::cout << "Send Bytes : " << dataSize_ << std::endl;
}
};
- Packet.h
- 수신받은 패킷에 대한 정보를 바탕으로 송신 버퍼를 할당
- 패킷 데이터 자체는 애플리케이션에서 deque로 저장
#pragma once
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
//클라이언트가 보낸 패킷을 저장하는 구조체
struct PacketData
{
UINT32 SessionIndex = 0;
UINT32 DataSize = 0;
char *pPacketData = nullptr;
void Set(PacketData& value)
{
SessionIndex = value.SessionIndex;
DataSize = value.DataSize;
pPacketData = new char[value.DataSize];
CopyMemory(pPacketData, value.pPacketData, value.DataSize);
}
void Set(UINT32 sessionIndex_, UINT32 dataSize_, char* pData)
{
SessionIndex = sessionIndex_;
DataSize = dataSize_;
pPacketData = new char[dataSize_];
CopyMemory(pPacketData, pData, dataSize_);
}
void Release()
{
delete pPacketData;
}
};
'개인공부 > IOCP 서버 제작 실습' 카테고리의 다른 글
IOCP 서버 제작 실습 6 (0) | 2023.03.23 |
---|---|
IOCP 서버 제작 실습 5 (0) | 2023.03.23 |
IOCP 서버 제작 실습 4 (0) | 2023.03.22 |
IOCP 서버 제작 실습 2 (0) | 2023.03.21 |
IOCP 서버 제작 실습 1 (0) | 2023.03.21 |
댓글