runewalsh писал(а):Говорю, забудь про QueueUserAPC, это не то ^_~
Эта штука нужна для <начал долгую операцию типа сетевого I/O> <делаешь что-то ещё> <когда стало нечего делать, вызываешь SleepEx и, если та операция закончилась, автоматически вызывается указанный в ней коллбэк завершения>. QueueUserAPC просто добавляет произвольные коллбэки, которые вызовет SleepEx.
Дак а что надо то? Вот многие говорят об отсутствии задания. Насколько я понял кодеровские форумы, то старт студентского топика, начинающийся с задания, и оканчивающийся !!!памагите чем можити!!! - обречен на закидывание камнями. Давайте дам всё задание, вряд ли это прибавит понимания
ЛАБОРАТОРНАЯ РАБОТА № 2
Цель выполнения лабораторной работы № 2 — освоить реализацию многопоточной обработки данных, а также пула потоков и механизма асинхронного ввода/вывода.
2.1 Задание
В работе необходимо реализовать многопоточную обработку массива структур данных (из N элементов) четырьмя способами:
1. При помощи массива из M потоков (M ≤ N), используя для синхронизации объект ядра — семафор.
2. При помощи массива из M потоков (M ≤ N), используя для синхронизации сеть Петри, моделирующую семафор.
3. При помощи пула из M потоков (M ≤ N), используя системный пул потоков или асинхронные потоки ввода/вывода.
4. При помощи пула из M потоков (M ≤ N), моделируя его при помощи сети Петри.
При обработке массива данных массивом потоков каждый поток либо заранее выбирает диапазон элементов массива данных, которые он будет обрабатывать, либо просто берет первый необработанный элемент. Завершив обработку одного элемента, поток переходит к обработке следующего.
При обработке массива данных пулом потоков, завершив обработку одного элемента массива данных, освободившийся в пуле поток переходит к обработке следующего необработанного элемента.
Чтобы не требовалось создавать слишком большие массивы (для которых эффект от параллельной обработки будет наиболее очевидным), можно имитировать ситуацию, когда обработка одного элемента массива требует больше процессорного времени, чем в действительности. Для этого после обработки очередного
элемента массива поток может делать паузу на указанное количество миллисекунд.
Мой вариант:
Структура содержит анкетные данные студентов (ФИО, группа, номер зачетной книжки, дата рождения). Требуется определить самых старших студентов в каждой группе. Список таких студентов вывести в выходной файл.
Вот я подобрался к 3 заданию, и такой фиг знает, что делать... Если в самом УМП (методичка) написано
В API Windows пула потоков, как такового, нет. Для создания пула рабочих потоков можно вручную создавать массив потоков вызовом функции CreateThread, а затем ожидать окончания их выполнения вызовом функции WaitForMultipleObjects(Ex) или MsgWaitForMultipleObjects(Ex). Для создания очереди асинхронных операций используется функция QueueUserAPC. В качестве сигнальных объектов могут выступать как сами процессы и потоки, так и семафоры, мьютексы, таймеры и события (табл. 3.5, п. 3.2.3). Также для организации пула потоков можно использовать порты завершения ввода/вывода (п. 3.2.3.5).
Простите за повтор.
ОК, делать массив мы умеем. Но вот прямо сейчас, прошу еще помочь. Не работает у меня ф-ция WaitForMultipleObjects.
- Код: Выделить всё
USES Windows, DOS; //Windows для потоков, DOS для времени
TYPE
Stud= record //Исследуемая структура
name :string;
gr,ID :word;
date :0..31;
month :0..12;
year :1960..2000;
end;
FuncFindDB=record //Здесь данные для создания потока
first,last :integer;
ID :LongWord;
end;
link =^FuncFindDB;
VAR
f :text;
Struct,List :array of Stud;
Threads :array of Handle;
ThreadIDs :array of FuncFindDB;
Man,Man2 :Stud;
Sem :Handle;
FUNCTION Find (p:pointer):ptrint; //Ф-ция, используемая потоками
var a :integer;
Man,Man2 :Stud;
UD :FuncFindDB;
begin
UD:=link(p)^;
Man:=Struct[UD.first];
a:=UD.first;
repeat //цикл от first до last
Man2:=Struct[a];
if Man2.gr>Man.gr then //если перешли на другую группу
begin //берем для сравнения первого ее члена
Man:=Struct[a]; continue;
end;
Compare (Man,Man2);
if List[Man.gr].gr<>0 then //возможно какой то поток уже сделал запись
begin //по исследуемой группе
Man2:=List[Man.gr]; Compare (Man,Man2); //значит сравниваем то что есть, с тем
end; //что нашел другой поток
WaitForSingleObject(Sem, INFINITE);
List[Man.gr]:=Man; //делаем запись, ограждая "критическую секцию"
ReleaseSemaphore(Sem, 1, nil);
inc (a);
sleep (PT); //задержка, предложенная в задании
until (a>UD.last);
Find:=0;
end;
BEGIN
repeat write ('Введите количество студентов ');
readln (N); until N>0;
SetLength(Struct,N+1);
repeat write ('Введите количество групп ');
readln (Ngr); until (Ngr>0);
SetLength(List,Ngr+1);
repeat write ('Введите количество потоков ');
readln (M); until (M>=0);
if M=0 then
begin
writeln ('Кол-во потоков будет равняться кол-ву ядер (',GetCore,')');
M:=GetCore;
end;
SetLength (Threads,M);
SetLength (ThreadIDs,M);
{Далее рандомно заполняем структуру Stud и сортируем ее пузырьком по группе}
Sem := CreateSemaphore(nil, 1, 1, nil);
UD.first:=1; UD.last:=N; //подготавливаем и
Find (@UD); //запускаем однопоточное вычисление
MaxD:=1;
start:=time;
for x:=0 to M-1 do //Запускаем массив потоков
begin
ThreadIDs[x].first:=MaxD;
if x=M-1 then ThreadIDs[x].last:=N else ThreadIDs[x].last:=(N div M)*(x+1);
MaxD:=ThreadIDs[x].last+1;
Threads[x]:=BeginThread(nil, 0, @Find, @ThreadIDs[x], 0, ThreadIDs[x].ID);
end;
for y:=0 to M-1 do WaitForSingleObject(Threads[y], INFINITE);//Ждем потоки
//WaitForMultipleObjects(M, @Threads, TRUE, INFINITE ); //не работает, компилируется, но не ждет потоки
for y:=0 to M-1 do CloseHandle(Threads[y]); //Закрываем потоки
CloseHandle(Sem);
END.
Сократил по максимуму, чтобы Вам головы не забивать, но вроде все понятно. Если видите вызов какой то процедуры/ф-ции непонятной, то не обращайте внимания, она есть, а что выдает - не суть.
На мой вопрос, почему не работает, преподаватель отвечает, что массив потоков должен быть динамическим... Блин, это как... Вроде он у меня и не статистический (объявляю его я без размера), но в коде он получается становится статистическим. Преподаватель даёт пример
- Код: Выделить всё
uses Windows;
type
PThreadInfo = ^TThreadInfo;
TThreadInfo = record
Id: LongWord;
N: integer;
{ ... }
end;
PThreadArray = ^TThreadArray;
TThreadArray = array [0..MAXIMUM_WAIT_OBJECTS-1] of TThreadInfo;
var
Count: integer;
Sem: Handle;
function Work(p: Pointer): PtrInt;
var
i: Integer;
t: PThreadInfo;
begin
t := PThreadInfo(p);
for i:=1 to 10 do begin
WriteLn('Thread #', t^.N, ': ', i, '/', 10);
WaitForSingleObject(Sem, INFINITE);
Count := Count + 1;
ReleaseSemaphore(Sem, 1, nil);
Sleep(500);
end;
Work := 0;
end;
var
ThreadH: PWOHandleArray;
Threads: PThreadArray;
i: Integer;
m: Integer;
begin
m := 3;
WriteLn('Start threads...');
Sem := CreateSemaphore(nil, 1, 1, nil);
GetMem(ThreadH, m*SizeOf(Handle));
GetMem(Threads, m*SizeOf(TThreadInfo));
for i:=0 to m-1 do begin
Threads^[i].N := i+1;
ThreadH^[i] := BeginThread(nil, 0, @Work, @Threads^[i], 0, Threads^[i].ID);
end;
WaitForMultipleObjects(m, ThreadH, TRUE, INFINITE);
for i:=0 to m-1 do CloseHandle(ThreadH^[i]);
CloseHandle(Sem);
FreeMem(ThreadH, m*SizeOf(Handle));
FreeMem(Threads, m*SizeOf(TThreadInfo));
WriteLn('Done! count = ', Count);
end.
Типа
Единственное, что мне не нравится - это то, что в этом примере пришлось отключить проверку на выход за границы массива ($R-).
Это он про предыдущий свой перл. Мне тоже нефига не понравились - какие то костыли.
Но данная проверка, если не считать работы с динамическим массивом, может быть полезна для обнаружения ошибок в коде. Конечно, если заранее знать, какого максимального размера может быть массив, эту проблему можно решить. Но ведь мы это знаем? Если посмотреть, как объявлен тип PWOHandleArray, то увидим следующее:
- Код: Выделить всё
type
TWOHandleArray = array[0..MAXIMUM_WAIT_OBJECTS - 1] of THandle;
PWOHandleArray = ^TWOHandleArray;
Т.е. потоков не может быть больше, чем указано в константе MAXIMUM_WAIT_OBJECTS.
Т.о. он объявляет статистический массив размером в максимум возможного. Затем, действует, на мой взгляд довольно финдебоберно... Выделяет память под массив, потом очищает эту память...
Это единственный возможный вариант? Мне он тоже не нравится, и для выполнения задания 2, я ограничился
- Код: Выделить всё
for y:=0 to M-1 do WaitForSingleObject(Threads[y], INFINITE);//Ждем потоки
Покажите, пожалуйста рабочий пример, работающей WaitForMultipleObjectПо предыдущему примеру (топику). Ну наверно и пушшай так будет, а?
Только надо что то типа такого
- Код: Выделить всё
uses windows,crt;
TYPE
DBThreads= record
ID :Cardinal;
{Запись используем для передачи каких то параметров
в нашу ф-цию}
end;
function QueueUserAPC(pfnAPC: Pointer; hThread: HANDLE; dwData: ULONG_PTR): DWORD; stdcall; external 'Kernel32.dll';
function ThreadFunction(lpParameter: Pointer): Cardinal; stdcall;
begin
while True do
SleepEx(INFINITE, True);
Exit(0);
end;
procedure PrintInteger(dwParam: ULONG_PTR); stdcall;
begin
Writeln('Integer: ', dwParam);
Sleep(500); // Изображаем бурную деятельность
end;
var
i,n :word;
Threads :array of HANDLE;
ThreadIDs :array of DBThreads;
begin
write('Skoka potokov budet v pule? ');
readln(n);
SetLength(Threads,n);
SetLength(ThreadIDs,n);
for i:=0 to n-1 do
Threads[i] := CreateThread(nil, 0, @ThreadFunction, nil, 0, ThreadIDs[i].ID);
repeat until keypressed;
for i:=0 to n-1 do
QueueUserAPC(@PrintInteger, Threads[i], i*111);
Sleep(2000);
for i:=0 to n-1 do WaitForSingleObject(Threads[i], INFINITE);
// WaitForMultipleObjects(n, @Threads, TRUE, INFINITE );
for i:=0 to n-1 do TerminateThread (Threads[i],0);
readln
end.
Типа такого не работает. Ошибки при исполнении. Явная рассинхронизация и т.п. И мне трудно понять почему, потому что пока вообще трудно понять весь объём знаний по применению WinAPI. Тем более, что источником этих знаний является форум учебного заведения, где мы с преподавателем беседуем, и теперь - этот форум.
Помогите определиться Что мне надо, чтобы хоть как то выполнить это задание №3? За готовый пример готов полдня печатать СПАСИБО. Могу и вообще в тугриках
Добавлено спустя 40 минут 46 секунд:Про системный пул потоков я понимаю так - создал его, а потом вроде
Эй, кто там в пуле - фас на эту ф-цию с такими то параметрами...
И поток сам отработал, сам уснул
Вот и все что у меня что у меня в голове.
С API я пока ума не приложу что можно сделать
Добавлено спустя 1 час 59 минут 18 секунд:Ещё посидел, почесал затылок...
Оградил строчку вывода записи семафором. Вывод пошел вроде корректно, но в конце виснет колом.
- Код: Выделить всё
uses windows,crt;
TYPE
DBThreads= record
ID :Cardinal;
{Запись используем для передачи каких то параметров
в нашу ф-цию}
end;
var
i,n :word;
Threads :array of HANDLE;
ThreadIDs :array of DBThreads;
Sem :HANDLE;
function QueueUserAPC(pfnAPC: Pointer; hThread: HANDLE; dwData: ULONG_PTR): DWORD; stdcall; external 'Kernel32.dll';
function ThreadFunction(lpParameter: Pointer): Cardinal; stdcall;
begin
while True do
SleepEx(INFINITE, True);
Exit(0);
end;
procedure PrintInteger(dwParam: ULONG_PTR); stdcall;
begin
WaitForSingleObject(Sem, INFINITE);
Writeln('Integer: ', dwParam);
ReleaseSemaphore(Sem, 1, nil);
Sleep(500); // Изображаем бурную деятельность
end;
begin
write('Skoka potokov budet v pule? ');
readln(n);
SetLength(Threads,n);
SetLength(ThreadIDs,n);
for i:=0 to n-1 do
Threads[i] := CreateThread(nil, 0, @ThreadFunction, nil, 0, ThreadIDs[i].ID);
// repeat until keypressed;
Sem := CreateSemaphore(nil, 1, 1, nil);
for i:=0 to n-1 do
QueueUserAPC(@PrintInteger, Threads[i], i*111);
Sleep(2000);
for i:=0 to n-1 do WaitForSingleObject(Threads[i], INFINITE);
// WaitForMultipleObjects(n, @Threads, TRUE, INFINITE );
for i:=0 to n-1 do TerminateThread (Threads[i],0);
// for i:=0 to n-1 do CloseHandle(Threads[i]);
readln
end.
Добавлено спустя 10 минут 30 секунд:Сдается мне, что в последнем этом варианте
Sleep (2000) недостаточно, точнее сказать, она мне кажется затычкой, и работать должна WaitForMultipleObjects. Для этого, наверно сделать PrintInteger функцией с возвратом 0:ptrint
Сижу тут строчу... Все москвичи похоже?