Thread mit Queue synchronisieren

Tikonteroga

Erfahrenes Mitglied
Hallo,

ich programmiere gerade mit der Win32 API. Ich benötige eine Queue in die ein Thread ein Element einfügen und ein anderer Thread ein Element herausnehmen kann. Wenn die Queue leer ist, soll der Thread, der Elemente herausnimmt schlafen.

Grüße
 
Hallo Tokonteroga

Und woran steckst du gerade fest? Was hast du dir schon für Überlegungen gemacht, welches Teilproblem ist dir nicht klar?

Viele Grüsse
Cromon
 
Also ich eine Semaphore könnte ich verwenden um die Anzahl an Elemente abzubilden. Also der Thread, der ein Element zur Queue hinzufügt könnte nach dem Hinzufügen die Funktion ReleaseSemaphore aufrufen. Der Thread der Elemente herausnimmt, wartet über die Funktion WaitForSingleObject bis Elemente in der Queue sind. Die Frage wäre jetzt darf ich bei einer Queue zur gleichen Zeit Elemente hinzufügen und herausnehmen?
 
Hallo

Was nutzt du denn für eine Queue? std::vector, array, ...? std::vector soll thread-safe sein.
Grundsätzlich fährt man aber mit einem Mutex recht gut.

Gruss
cwriter

/EDIT: Hach, richtig lesen. Ein Semaphor geht natürlich auch.
Schlussendlich hast du dann einen Mutex, der die Queue "bewacht" und ein Semaphor für den Rest.
Falsch, sorry. Der Semaphor schaut ja bereits auf die Zugriffe, mehr brauchst du also nicht.
 
Zuletzt bearbeitet:
Hallo,

wenn du sagst du programmierst gerade mit der Win32 API, würde es sich ja anbieten auch
gleich auf diese zurück zu greifen.

Mit GetMessage und PostThreadMessage kann man das von dir beschriebene Verhalten auch umsetzen. Weitere Informationen gibt es dazu in der MSDN:
http://msdn.microsoft.com/en-us/library/windows/desktop/ms644946(v=vs.85).aspx
http://msdn.microsoft.com/en-us/library/windows/desktop/ms644936(v=vs.85).aspx

Im großen und ganzen würde es so aussehen, dass ein Thread sich die nächste Nachricht seines
eigenen Message Queues holt(mit GetMessage), oder wartet, falls keine Nachricht
vorhanden ist. Die anderen Threads könnten nun mit PostThreadMessage Nachrichten an
den Message Queue des verarbeitenden Threads anhängen.

Ich habe noch ein kleines Beispiel gemacht(ohne Fehlerbehandlung):
C:
#include <Windows.h>

#include <stdio.h>

#define NUM_PRODUCER_THREADS 3

#define MSG_PRODUCER_STARTED (WM_USER + 1)
#define MSG_PRODUCER_TICK (WM_USER + 2)

struct ProducerParams
{
	DWORD dwConsumerThread;
	HANDLE hShutdownEvent;
};

static DWORD WINAPI ProducerThread(LPVOID lpvData);

int main(int argc, char** argv)
{
	HANDLE hShutdownEvent;
	struct ProducerParams threadParams;
	SIZE_T i;
	HANDLE* hProducerThreads;
	BOOL bRes;
	MSG msg;

	// Make sure the message queue exists before we start the producers
	PeekMessageW(&msg, (HWND)-1, 0, 0, PM_NOREMOVE);
	// Shutdown event, set in this thread when WM_QUIT was recived
	hShutdownEvent = CreateEventW(NULL, TRUE, FALSE, L"app_shutdown_event");

	threadParams.dwConsumerThread = GetCurrentThreadId();
	threadParams.hShutdownEvent = hShutdownEvent;
	hProducerThreads = (HANDLE*)malloc(sizeof(HANDLE) * NUM_PRODUCER_THREADS);
	for (i = 0;i < NUM_PRODUCER_THREADS;++ i)
	{
		hProducerThreads[i] = CreateThread(NULL, 0, &ProducerThread, &threadParams, 0, NULL);
	}

	ZeroMemory(&msg, sizeof(msg));
	// -1 as third parameter ensures we only get thread messages with hwnd=NULL
	// GetMessage returns 0 when it recives the WM_QUIT message
	// GetMessage returns -1 when an error occured
	while (bRes = GetMessage(&msg, (HWND)-1, 0, 0))
	{
		switch (msg.message)
		{
		case MSG_PRODUCER_STARTED:
			// wParam is set by the producer thread to it's own thread id
			printf("Producer started: %u\n", msg.wParam);
			break;
		case MSG_PRODUCER_TICK:
			// wParam is set by the producer thread to it's own thread id
			printf("Producer tick: %u\n", msg.wParam);
			break;
		default:
			printf("Unknown message.\n");
			break;
		}
	}

	SetEvent(hShutdownEvent);
	WaitForMultipleObjects(NUM_PRODUCER_THREADS, hProducerThreads, TRUE, INFINITE);
	free(hProducerThreads);

	// -1 indicates an error with GetMessage
	if (bRes == -1)
	{
		// message loop exit with error
		printf("Message loop terminated with error %u!\n", GetLastError());
		return 1;
	}

	return 0;
}

DWORD WINAPI ProducerThread(LPVOID lpvData)
{
	struct ProducerParams* threadParams;
	SIZE_T i;

	threadParams = (struct ProducerParams*)lpvData;

	PostThreadMessageW(threadParams->dwConsumerThread, MSG_PRODUCER_STARTED, GetCurrentThreadId(), NULL);

	for (i = 0;i < 10;++ i)
	{
		if (WaitForSingleObject(threadParams->hShutdownEvent, 0) != WAIT_TIMEOUT)
		{
			break;
		}

		PostThreadMessage(threadParams->dwConsumerThread, MSG_PRODUCER_TICK, GetCurrentThreadId(), NULL);
	}

	PostThreadMessageW(threadParams->dwConsumerThread, WM_QUIT, 0, 0);
	WaitForSingleObject(threadParams->hShutdownEvent, INFINITE);
	return 0;
}
 
Zuletzt bearbeitet:
Hallo,

danke für das Code-Beispiel.

Also ich habe mir das jetzt nochmal vom Konzept her überlegt.

Also ich muss eine API erstellen, die aus einer Header-Datei (*.h) und einer Source-Datei (*.cpp) besteht. Beim Kompilieren wird eine statische Bibliothek erstellt.

Innerhalb der Source-Datei verwende ich die WinSock2- und die Win32-API um über Sockets pro HANDLE jeweils TCP- oder UDP-Nachrichten zu senden und zu empfangen. Dabei gibt es zwei unterschiedliche Kommunikationsarten.

  1. Der Master sendet ein Request, das vom Slave durch ein Response beantwortet werden muss. Nach der Verarbeitung von einem Response, kann der nächste Request erfolgen.
  2. Der Slave sendet zyklisch Nachrichten an den Master, die nicht beantwortet werden müssen.

Ich implementiere den Master.

  1. Es gibt mehrere unterschiedliche Funktionen, die einen bestimmten Request mit der Funktion send() absenden. Im Anschluss wird die Funktion WaitForSingleObject aufgerufen.
  2. Zusätzlich gibt es einen internen Thread, der in einer Schleife mit der select() Funktion den SOCKET abhört. Sobald die blockierende Funktion select() beendet wurde, wird die Nachricht mit recv() empfangen. Wenn es sich um ein Response vom Slave handelt wird die Nachricht in einem Speicherbereich abgelegt und mit SetEvent)) ein Event ausgelegt. Die Funktion, die den Request gesendet hat, wird dann nicht mehr durch WaitForSingleObject() blockiert (siehe 1.). Wenn es sich aber um eine zyklische Nachricht vom Slave handelt, soll die Nachricht in einer Queue abgelegt werden und mit ReleaseSemaphore() eine Semaphore inkrementiert werden.

Jetzt fehlt mir noch folgendes.

Ich muss über meine API einen Aufrufer darüber informieren, dass ein Element in die Queue eingefügt wurde. Hierfür habe ich m. M. nach zwei Möglichkeiten.

  1. Ich gebe dem Aufrufer den HANDLE auf die Semaphore, damit er mit WaitForSingleObject() in einem Thread warten kann und in über eine Funktion ein Element aus der Queue lesen kann.
  2. Ich stelle dem Aufrufer eine Funktion zur Verfügung, die die Semaphore intern verwendet und blockiert, wenn kein Element in der Queue ist.

Ich finde die zweite Lösung besser, weil hier die Implementieren besser verborgen wird und der Aufrufer meinen HANDLE mit CloseHandle() nicht schließen kann.

Jetzt benötige ich nur noch eine Queue, die über die Semaphore synchronisiert wird und in die ich zur gleichen Zeit Elemente einfügen und herausnehmen kann.

Ich glaube die MessageQueue ist in meinem Fall nicht geeignet, da ich den Thread des Aufrufers nicht kenne...

Grüße
 
Zurück