본문 바로가기
개인공부/IOCP 서버 제작 실습

IOCP 서버 제작 실습 3

by 하고싶은건많은놈 2023. 3. 22.

서버 로직(Send / Recv) 처리를 별도의 쓰레드로 분리

  • 클라이언트의 I/O 관련 함수 및 구조체를 ClientInfo.h로 분리
  • 패킷 관련 정보를 PacketData 구조체에 저장하여 Packet.h로 분리

 

Main 함수의 흐름을 따라가면서 구조를 설명

#include "EchoServer.h"
#include <string>
#include <iostream>

const UINT16 SERVER_PORT = 11021;
const UINT16 MAX_CLIENT = 100;

int main()
{
	EchoServer server;

	//소켓 초기화
	server.InitSocket();

	//소켓 서버 등록
	server.BindandListen(SERVER_PORT);

	server.Run(MAX_CLIENT);

	std::cout << "아무 키나 누를 때까지 대기\n";
	while (true)
	{
		std::string inputCmd;
		std::getline(std::cin, inputCmd);

		if (inputCmd == "quit")
		{
			break;
		}
	}

	server.End();
	return 0;
}

 

  • IOCPServer.h
    • 실습 1,2에서와 동일하게 InitSocket() / BindandListen() 함수로 서버 소켓을 등록
    • Run() 함수에서 Send 작업을 할 쓰레드(ProcessPacket())를 분리
    • StartServer() 함수에서 클라이언트 / Completion Port / Accpter 쓰레드 / Worker 쓰레드  생성
      • Accepter 쓰레드에서는 클라이언트를 서버와 연결하고 Completion Port에 등록
        이후 클라이언트의 OnConnect() 함수를 호출하여 Recv 요청
        마지막으로 애플리케이션의 OnConncet() 함수를 호출하여 클라이언트 연결을 알림
      • Worker 쓰레드에서는 GetQueuedCompletionStatus() 함수로 I/O 작업 완료를 감지한 후 해당 I/O 작업의 종류에 따라 처리
        • I/O가 Recv인 경우 애플리케이션의 OnReceive() 함수를 호출하여 패킷 데이터를 세팅하고 다시 클라이언트에서 Recv를 요청
          이 때 패킷 데이터에 여러 쓰레드들이 동시에 접근할 수 있으므로 lock_guard를 걸어줌
        • I/O가 Send인 경우 할당했던 송신 버퍼 메모리를 해제한 후 전송 완료를 알림
#pragma once
#pragma comment(lib, "ws2_32")

#include "ClientInfo.h"
#include "Define.h"
#include <thread>
#include <vector>
#include <iostream>

class IOCPServer
{

private:
	// 클라이언트 정보 저장 구조체
	std::vector<stClientInfo>	mClientInfos;
	//리슨 소켓
	SOCKET						mListenSocket = INVALID_SOCKET;
	//접속중인 클라이언트 수
	int							mClientCnt = 0;
	//IO Worker 쓰레드
	std::vector<std::thread>	mIOWorkerThreads;
	//Accpet 쓰레드
	std::thread					mAccepterThread;
	//CompletionPort 객체 핸들
	HANDLE						mIOCPHandle = INVALID_HANDLE_VALUE;
	//작업 쓰레드 동작 플래그
	bool						mIsWorkerRun = true;
	//접속 쓰레드 동작 플래그
	bool						mIsAccepterRun = true;

	void CreateClient(const UINT32 maxClientCount)
	{
		for (UINT32 i = 0; i < maxClientCount; ++i)
		{
			mClientInfos.emplace_back();
			mClientInfos[i].Init(i);
		}
	}

	//worker 쓰레드 생성
	bool CreateWorkerThread()
	{
		for (int i = 0; i < MAX_WORKERTHREAD; i++)
		{
			mIOWorkerThreads.emplace_back([this]() {WorkerThread(); });
		}

		std::cout << "WorkerThread start...\n";
		return true;
	}

	//accept요청 처리 쓰레드 생성
	bool CreateAccepterThread()
	{
		mAccepterThread = std::thread([this]() {AccepterThread(); });

		std::cout << "AccepterThread start...\n";
		return true;
	}

	//사용하지 않는 클라이언트 정보 구조체 반환
	stClientInfo *GetEmptyClientInfo()
	{
		for (auto& client : mClientInfos)
		{
			if (client.IsConnectd() == false)
			{
				return &client;
			}
		}
	}

	stClientInfo *GetClientInfo(const UINT32 sessionIndex)
	{
		return &mClientInfos[sessionIndex];
	}

	//worker Thread
	void WorkerThread()
	{
		stClientInfo*	pClientInfo = NULL;
		BOOL			bSuccess = TRUE;
		DWORD			dwIoSize = 0;
		LPOVERLAPPED	lpOverlapped = NULL;

		while (mIsWorkerRun)
		{
			bSuccess = GetQueuedCompletionStatus(mIOCPHandle, &dwIoSize, (PULONG_PTR)&pClientInfo, &lpOverlapped, INFINITE);

			//쓰레드 종료 메시지 처리
			if (bSuccess == TRUE && dwIoSize == 0 && lpOverlapped == NULL)
			{
				mIsWorkerRun = false;
				continue;
			}

			if (lpOverlapped == NULL)
			{
				continue;
			}

			// 클라이언트 접속 해제
			if (bSuccess == FALSE || (dwIoSize == 0 && bSuccess == TRUE))
			{
				CloseSocket(pClientInfo);
				continue;
			}

			auto pOverlappedEx = (stOverlappedEx*)lpOverlapped;

			//Overlapped I/O Recv 
			if (pOverlappedEx->m_eOperation == IOOperation::RECV)
			{
				OnReceive(pClientInfo->GetIndex(), dwIoSize, pClientInfo->GetRecvBuffer());

				pClientInfo->BindRecv();
			}
			//Overlapped I/O Send
			else if (pOverlappedEx->m_eOperation == IOOperation::SEND)
			{
				delete[] pOverlappedEx->m_wsaBuf.buf;
				delete pOverlappedEx;
				pClientInfo->SendCompleted(dwIoSize);
			}
			else
			{
				std::cout << "socket " << (int)pClientInfo->GetIndex() << " exception";
			}
		} 
	}

	//사용자 접속 쓰레드
	void AccepterThread()
	{
		SOCKADDR_IN		stClientAddr;
		int				nAddrLen = sizeof(SOCKADDR_IN);

		while (mIsAccepterRun)
		{
			stClientInfo* pClientInfo = GetEmptyClientInfo();
			if (pClientInfo == NULL)
			{
				std::cout << "Client Full Error\n";
				return;
			}

			auto newSocket = accept(mListenSocket, (SOCKADDR*)&stClientAddr, &nAddrLen);
			if (newSocket == INVALID_SOCKET)
			{
				continue;
			}

			if (pClientInfo->OnConnect(mIOCPHandle, newSocket) == false)
			{
				pClientInfo->Close(true);
				return;
			}

			OnConnect(pClientInfo->GetIndex());

			++mClientCnt;
		}
	}

	//소켓 연결 종료
	void CloseSocket(stClientInfo* pClientInfo, bool bIsForce = false)
	{
		auto clientIndex = pClientInfo->GetIndex();

		pClientInfo->Close(bIsForce);

		OnClose(clientIndex);
	}

public:
	IOCPServer(void) {}

	~IOCPServer(void)
	{
		WSACleanup();
	}

	virtual void OnConnect(const UINT32 clientINdex_) {}
	virtual void OnClose(const UINT32 clientINdex_) {}
	virtual void OnReceive(const UINT32 clientINdex_, const UINT32 size_, char *pData_) {}

	//소켓 초기화
	bool InitSocket()
	{
		WSADATA wsaData;

		if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
		{
			std::cout << "WSAStartup() Error : " << WSAGetLastError() << std::endl;
			return false;
		}

		SOCKET mListenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, NULL, WSA_FLAG_OVERLAPPED);

		if (INVALID_SOCKET == mListenSocket)
		{
			std::cout << "WSASocket() Error : " << WSAGetLastError() << std::endl;
			return false;
		}

		std::cout << "socket init success\n";
		return true;
	}

	bool BindandListen(int nBindPort)
	{
		SOCKADDR_IN			stServerAddr;
		stServerAddr.sin_family = AF_INET;
		stServerAddr.sin_port = htons(nBindPort);
		stServerAddr.sin_addr.s_addr = htonl(INADDR_ANY);

		if (bind(mListenSocket, (SOCKADDR*)&stServerAddr, sizeof(SOCKADDR_IN)) == SOCKET_ERROR)
		{
			std::cout << "bind() Error : " << WSAGetLastError() << std::endl;
			return false;
		}

		if (listen(mListenSocket, 5) == SOCKET_ERROR)
		{
			std::cout << "listen() Error : " << WSAGetLastError() << std::endl;
			return false;
		}

		std::cout << "server registration success\n";
		return true;
	}

	//접속 요청 수락, 메시지 처리
	bool StartServer(const UINT32 maxClientCount)
	{
		CreateClient(maxClientCount);

		mIOCPHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, MAX_WORKERTHREAD);
		if (mIOCPHandle == NULL)
		{
			std::cout << "CreateIoCompletionPort() Error : " << GetLastError() << std::endl;
			return false;
		}

		if (CreateWorkerThread() == false)
		{
			return false;
		}

		if (CreateAccepterThread() == false)
		{
			return false;
		}

		std::cout << "Server start\n";
		return true;
	}

	//생성되어있던 쓰레드 파괴
	void DestroyThread()
	{
		mIsWorkerRun = false;
		CloseHandle(mIOCPHandle);

		for (auto& th : mIOWorkerThreads)
		{
			if (th.joinable())
			{
				th.join();
			}
		}

		mIsAccepterRun = false;
		closesocket(mListenSocket);

		if (mAccepterThread.joinable())
		{
			mAccepterThread.join();
		}
	}

	bool SendMsg(const UINT32 sessionIndex_, const UINT32 dataSize_, char* pData)
	{
		auto pClient = GetClientInfo(sessionIndex_);
		return pClient->SendMsg(dataSize_, pData);
	}
};

 

 

  • EchoServer.h
    • ProcessPacket() 함수는 별도의 쓰레드로 분리됨
      •  DequePacketData() 함수로 수신받은 패킷 데이터를 꺼내옴
        이 때 여러 쓰레드들이 동시에 패킷 데이터에 접근할 수 있으므로 lock_guard를 걸어줌
      • 패킷 데이터가 존재할경우 클라이언트에 Send
      • 패킷 데이터가 없다면 강제로 sleep시켜 대기상태로 전환
    • End() 함수가 호출될경우 Send 작업을 위해 호출된 쓰레드들을 join()한 후 DestroyThread() 함수로 Worker 쓰레드와 Accepter 쓰레드들도 join()
#pragma once

#include "IOCPServer.h"
#include "Packet.h"
#include <vector>
#include <deque>
#include <thread>
#include <mutex>

class EchoServer : public IOCPServer
{
private:
	bool					mIsRunProcessThread = false;
	std::thread				mProcessThread;
	std::mutex				mLock;
	std::deque<PacketData>	mPacketDataQueue;

	void ProcessPacket()
	{
		while (mIsRunProcessThread)
		{
			auto packetData = DequePacketData();
			if (packetData.DataSize != 0)
			{
				SendMsg(packetData.SessionIndex, packetData.DataSize, packetData.pPacketData);
			}
			else
			{
				std::this_thread::sleep_for(std::chrono::milliseconds(1));
			}
		}
	}

	PacketData DequePacketData()
	{
		PacketData packetData;

		std::lock_guard<std::mutex> guard(mLock);
		if (mPacketDataQueue.empty())
		{
			return PacketData();
		}

		packetData.Set(mPacketDataQueue.front());

		mPacketDataQueue.front().Release();
		mPacketDataQueue.pop_front();

		return packetData;
	}

public:
	EchoServer() = default;
	virtual ~EchoServer() = default;

	virtual void OnConnect(const UINT32 clientIndex_) override
	{
		std::cout << "[OnConnect] 클라이언트 : Index " << clientIndex_ << std::endl;
	}

	virtual void OnClose(const UINT32 clientIndex_) override
	{
		std::cout << "[OnClose] 클라이언트 : Index " << clientIndex_ << std::endl;
	}

	virtual void OnReceive(const UINT32 clientIndex_, const UINT32 size_, char* pData_) override
	{
		std::cout << "[OnReceive] 클라이언트 : Index " << clientIndex_ << ", dataSize " << size_ << std::endl;

		PacketData packet;
		packet.Set(clientIndex_, size_, pData_);

		std::lock_guard<std::mutex> guard(mLock);
		mPacketDataQueue.push_back(packet);
	}

	void Run(const UINT32 maxClient)
	{
		mIsRunProcessThread = true;
		mProcessThread = std::thread([this]() { ProcessPacket(); });
		
		StartServer(maxClient);
	}

	void End()
	{
		mIsRunProcessThread = false;

		if (mProcessThread.joinable())
		{
			mProcessThread.join();
		}

		DestroyThread();
	}
};

 

 

  • ClientInfo.h
    • 각 클라이언트에서 처리해야될 기능들이 존재
    • BindIOCOMpletionPort() 함수에서 클라이언트를 Completion Port에 등록
    • BindRecv() 함수에서 Recv 요청
    • SendMsg() 함수에서 Send
#pragma once

#include "Define.h"
#include <iostream>

class stClientInfo
{
private:
	INT32			mIndex = 0;
	SOCKET			mSock;
	stOverlappedEx	mRecvOverlappedEx;

	char			mRecvBuf[MAX_SOCKBUF];

public:
	stClientInfo()
	{
		ZeroMemory(&mRecvOverlappedEx, sizeof(stOverlappedEx));
		mSock = INVALID_SOCKET;
	}

	void Init(const UINT32 index)
	{
		mIndex = index;
	}

	UINT32 GetIndex() { return mIndex; }

	bool IsConnectd() {	return mSock != INVALID_SOCKET;	}

	SOCKET GetSock() { return mSock; }

	char* GetRecvBuffer() { return mRecvBuf; }

	bool OnConnect(HANDLE iocpHandle_, SOCKET socket_)
	{
		mSock = socket_;

		Clear();

		//I/O Completion Port 객체와 소켓 연결
		if (BindIOCompletionPort(iocpHandle_) == false)
		{
			return false;
		}

		//Recv Overlapped I/O 작업 요청
		return BindRecv();
	}

	void Close(bool bIsForce = false)
	{
		// SO_DONTLINGER로 설정
		struct linger stLinger = { 0, 0 };

		// SO_LINGER, timeout = 0으로 설정하여 강제 종료, 데이터 손실이 있을 수 있음
		if (bIsForce == true)
		{
			stLinger.l_onoff = 1;
		}

		shutdown(mSock, SD_BOTH);

		setsockopt(mSock, SOL_SOCKET, SO_LINGER, (char*)&stLinger, sizeof(stLinger));

		closesocket(mSock);
		mSock = INVALID_SOCKET;
	}

	void Clear()
	{
	}

	bool BindIOCompletionPort(HANDLE iocpHandle_)
	{
		auto hIOCP = CreateIoCompletionPort((HANDLE)GetSock(), iocpHandle_, (ULONG_PTR)(this), 0);

		if (hIOCP == INVALID_HANDLE_VALUE)
		{
			std::cout << "CreateIoCompletionPort() Error : " << GetLastError() << std::endl;
			return false;
		}
		return true;
	}
	
	bool BindRecv()
	{
		DWORD dwFlag = 0;
		DWORD dwRecvNumBytes = 0;

		mRecvOverlappedEx.m_wsaBuf.len = MAX_SOCKBUF;
		mRecvOverlappedEx.m_wsaBuf.buf = mRecvBuf;
		mRecvOverlappedEx.m_eOperation = IOOperation::RECV;

		if (WSARecv(mSock, &(mRecvOverlappedEx.m_wsaBuf), 1,
			&dwRecvNumBytes, &dwFlag, (LPWSAOVERLAPPED) & (mRecvOverlappedEx), NULL) == SOCKET_ERROR
			&& (WSAGetLastError() != ERROR_IO_PENDING))
		{
			std::cout << "WSARecv() Error : " << WSAGetLastError() << std::endl;
			return false;
		}
		return true;
	}

	bool SendMsg(const UINT32 dataSize_, char *pMsg_)
	{
		auto sendOverlappedEx = new stOverlappedEx;
		ZeroMemory(sendOverlappedEx, sizeof(stOverlappedEx));
		sendOverlappedEx->m_wsaBuf.len = dataSize_;
		sendOverlappedEx->m_wsaBuf.buf = new char[dataSize_];
		CopyMemory(sendOverlappedEx->m_wsaBuf.buf, pMsg_, dataSize_);
		sendOverlappedEx->m_eOperation = IOOperation::SEND;

		DWORD dwRecvNumBytes = 0;
		if (WSASend(mSock, &(sendOverlappedEx->m_wsaBuf), 1,
			&dwRecvNumBytes, 0, (LPWSAOVERLAPPED) & (sendOverlappedEx), NULL) == SOCKET_ERROR
			&& (WSAGetLastError() != ERROR_IO_PENDING))
		{
			std::cout << "WSASend() Error : " << WSAGetLastError() << std::endl;
			return false;
		}
		return true;
	}

	void SendCompleted(const UINT32 dataSize_)
	{
		std::cout << "Send Bytes : " << dataSize_ << std::endl;
	}
};

 

 

  • Packet.h
    • 수신받은 패킷에 대한 정보를 바탕으로 송신 버퍼를 할당
    • 패킷 데이터 자체는 애플리케이션에서 deque로 저장
#pragma once

#define WIN32_LEAN_AND_MEAN
#include <windows.h>

//클라이언트가 보낸 패킷을 저장하는 구조체
struct PacketData
{
	UINT32	SessionIndex = 0;
	UINT32	DataSize = 0;
	char	*pPacketData = nullptr;

	void Set(PacketData& value)
	{
		SessionIndex = value.SessionIndex;
		DataSize = value.DataSize;

		pPacketData = new char[value.DataSize];
		CopyMemory(pPacketData, value.pPacketData, value.DataSize);
	}

	void Set(UINT32 sessionIndex_, UINT32 dataSize_, char* pData)
	{
		SessionIndex = sessionIndex_;
		DataSize = dataSize_;

		pPacketData = new char[dataSize_];
		CopyMemory(pPacketData, pData, dataSize_);
	}

	void Release()
	{
		delete pPacketData;
	}
};

 

'개인공부 > IOCP 서버 제작 실습' 카테고리의 다른 글

IOCP 서버 제작 실습 6  (0) 2023.03.23
IOCP 서버 제작 실습 5  (0) 2023.03.23
IOCP 서버 제작 실습 4  (0) 2023.03.22
IOCP 서버 제작 실습 2  (0) 2023.03.21
IOCP 서버 제작 실습 1  (0) 2023.03.21

댓글