Serwer TCP jest opracowywany przy użyciu SocketAsyncEventArgs i jego asynchronicznych metod jako usługi systemu Windows. Mam te 2 linie kodu na początku Main:20 Odbieranie na sekundę za pomocą SocketAsyncEventArgs
ThreadPool.SetMaxThreads(15000, 30000);
ThreadPool.SetMinThreads(10000, 20000);
I obie zwracają true (zwrócone wartości są rejestrowane). Teraz od 2000 do 3000 klientów zaczyna wysyłać wiadomości do tego serwera i zaczyna akceptować połączenia (liczę liczbę połączeń i jest to zgodne z oczekiwaniami - istnieje pula połączeń). Liczba wątków procesu serwera wzrośnie do ~ 2050 do ~ 3050. Jak na razie dobrze!
Teraz istnieje metoda Received, która będzie wywoływana po tym, jak ReceiveAsync zwróci true lub przez Completed event of SocketAsyncEventArgs.
A tutaj zaczynają się problemy: Nie ma znaczenia, ile klientów jest połączonych i ile wysyłanych wiadomości, Odebrane będą wywoływane co najwyżej 20 razy na sekundę! Wraz ze wzrostem liczby klientów ta liczba (20) spada do ~ 10.
Środowisko: Serwer TCP i klienci są symulowani na tym samym komputerze. Przetestowałem kod na 2 komputerach, jeden ma dwurdzeniowy procesor i 4 GB pamięci RAM, a drugi ma 8-rdzeniowy procesor i 12 GB pamięci RAM. Nie ma utraty danych (jeszcze) i czasami otrzymuję więcej niż 1 komunikat w każdej operacji odbierania. W porządku. Ale w jaki sposób można zwiększyć liczbę operacji odbierania?
Dodatkowe uwagi dotyczące wdrożenia: Kod jest duży i zawiera wiele różnych logiki. Ogólny opis będzie następujący: Mam jedną SocketAsyncEventArgs do akceptowania nowych połączeń. Działa świetnie. Teraz dla każdego nowego zaakceptowanego połączenia tworzę nowy SocketAsyncEventArgs do odbierania danych. Umieściłem ten (SocketAsyncEventArgs stworzony do odbioru) w puli. Nie zostanie ponownie użyty, ale jest to UserToken używany do śledzenia połączeń; na przykład te połączenia, które są rozłączone lub połączenia, które nie wysłały żadnych danych przez 7 minut, zostaną zamknięte i usunięte (AcceptSocket z SocketAsyncEventArgs zostanie wyłączony (oba), zamknięty i usunięty, podobnie samo SocketAsyncEventArgs). Tutaj jest klasa Sudo, który wykonuje te zadania, ale wszystkie inne logiki i rejestrowanie i sprawdzanie błędów i nic innego nie jest usuwany, aby to proste i jasne (może wtedy łatwiej jest dostrzec kod problematyczny):
class Sudo
{
Socket _listener;
int _port = 8797;
public Sudo()
{
var ipEndPoint = new IPEndPoint(IPAddress.Any, _port);
_listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
_listener.Bind(ipEndPoint);
_listener.Listen(100);
Accept(null);
}
void Accept(SocketAsyncEventArgs acceptEventArg)
{
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += AcceptCompleted;
}
else acceptEventArg.AcceptSocket = null;
bool willRaiseEvent = _listener.AcceptAsync(acceptEventArg); ;
if (!willRaiseEvent) Accepted(acceptEventArg);
}
void AcceptCompleted(object sender, SocketAsyncEventArgs e)
{
Accepted(e);
}
void Accepted(SocketAsyncEventArgs e)
{
var acceptSocket = e.AcceptSocket;
var readEventArgs = CreateArg(acceptSocket);
var willRaiseEvent = acceptSocket.ReceiveAsync(readEventArgs);
Accept(e);
if (!willRaiseEvent) Received(readEventArgs);
}
SocketAsyncEventArgs CreateArg(Socket acceptSocket)
{
var arg = new SocketAsyncEventArgs();
arg.Completed += IOCompleted;
var buffer = new byte[64 * 1024];
arg.SetBuffer(buffer, 0, buffer.Length);
arg.AcceptSocket = acceptSocket;
arg.SocketFlags = SocketFlags.None;
return arg;
}
void IOCompleted(object sender, SocketAsyncEventArgs e)
{
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
Received(e);
break;
default: break;
}
}
void Received(SocketAsyncEventArgs e)
{
if (e.SocketError != SocketError.Success || e.BytesTransferred == 0 || e.Buffer == null || e.Buffer.Length == 0)
{
// Kill(e);
return;
}
var bytesList = new List<byte>();
for (var i = 0; i < e.BytesTransferred; i++) bytesList.Add(e.Buffer[i]);
var bytes = bytesList.ToArray();
Process(bytes);
ReceiveRest(e);
Perf.IncOp();
}
void ReceiveRest(SocketAsyncEventArgs e)
{
e.SocketFlags = SocketFlags.None;
for (int i = 0; i < e.Buffer.Length; i++) e.Buffer[i] = 0;
e.SetBuffer(0, e.Buffer.Length);
var willRaiseEvent = e.AcceptSocket.ReceiveAsync(e);
if (!willRaiseEvent) Received(e);
}
void Process(byte[] bytes) { }
}
MB RAM? Czy to jest I-cache? Mam nadzieję, że masz kilka GB pamięci. Czy korzystasz z Monitora zasobów lub Eksploratora procesów, aby znaleźć wąskie gardła? – HABO
Hmm ... Jaka jest asynchroniczna architektura na twoim serwerze? Czy mógłbyś zamieścić jakiś pseudo-kod, może opisać, jak odbywa się słuchanie i jak połączenia są akceptowane, itd.? –
@HABO Dzięki! Poprawiłem to. –