Критические секции. Состояния потоков Специальные объекты ядра

Последнее обновление: 31.10.2015

Нередко в потоках используются некоторые разделяемые ресурсы, общие для всей программы. Это могут быть общие переменные, файлы, другие ресурсы. Например:

Class Program { static int x=0; static void Main(string args) { for (int i = 0; i < 5; i++) { Thread myThread = new Thread(Count); myThread.Name = "Поток " + i.ToString(); myThread.Start(); } Console.ReadLine(); } public static void Count() { x = 1; for (int i = 1; i < 9; i++) { Console.WriteLine("{0}: {1}", Thread.CurrentThread.Name, x); x++; Thread.Sleep(100); } } }

Здесь у нас запускаются пять потоков, которые работают с общей переменной x. И мы предполагаем, что метод выведет все значения x от 1 до 8. И так для каждого потока. Однако в реальности в процессе работы будет происходить переключение между потоками, и значение переменной x становится непредсказуемым.

Решение проблемы состоит в том, чтобы синхронизировать потоки и ограничить доступ к разделяемым ресурсам на время их использования каким-нибудь потоком. Для этого используется ключевое слово lock . Оператор lock определяет блок кода, внутри которого весь код блокируется и становится недоступным для других потоков до завершения работы текущего потока. И мы можем переделать предыдущий пример следующим образом:

Class Program { static int x=0; static object locker = new object(); static void Main(string args) { for (int i = 0; i < 5; i++) { Thread myThread = new Thread(Count); myThread.Name = "Поток " + i.ToString(); myThread.Start(); } Console.ReadLine(); } public static void Count() { lock (locker) { x = 1; for (int i = 1; i < 9; i++) { Console.WriteLine("{0}: {1}", Thread.CurrentThread.Name, x); x++; Thread.Sleep(100); } } } }

Для блокировки с ключевым словом lock используется объект-заглушка, в данном случае это переменная locker . Когда выполнение доходит до оператора lock, объект locker блокируется, и на время его блокировки монопольный доступ к блоку кода имеет только один поток. После окончания работы блока кода, объект locker освобождается и становится доступным для других потоков.

План занятия:

· Семафоры

· Мьютекс

· Правила упрощенного параллелизма

· Рекурсивный мьютекс

· Условные переменные

Механизмами синхронизации являются средства операционной системы, которые помогают решать основную задачу синхронизации - обеспечивать координацию потоков, которые работают с совместно используемыми данными. Такие средства являются минимальными блоками для построения многопоточных программ, их называют синхронизационными механизмами.

Синхронизационные механизмы подразделяют на следующие основные категории:

  • универсальные для низкого уровня - можно использовать различными способами (семафоры);
  • простые для низкого уровня - каждый из которых приспособлен к решению только одной задачи (мьютексы и условные переменные);
  • универсальные для высокого уровня, выраженные через простые - к этой группе относится концепция монитора, которая может быть выражена через мьютексы и условные переменные;
  • простые для высокого уровня - приспособленные к решению конкретной синхронизационной задачи (блокировка чтения-записи и барьеры).

Семафоры

Концепцию семафоров предложил в 1965 году Э. Дейкстра - известный голландский специалист в области компьютерных наук. Семафоры являются старейшими синхронизационными примитивами из числа применяемых на практике.

Семафор - это совместно используемый неотъемлемый целочисленный счетчик, для которого задано начальное значение и определены следующие атомарные операции.

  • Уменьшение семафора (down): если значение семафора больше нуля, его уменьшают на единицу, если же значение равно нулю, этот поток переходит в состояние ожидания до тех пор, пока оно не станет больше нуля (говорят, что поток «ожидает семафор» или «заблокирован на семафоре»). Эту операцию называют также ожиданиям - wait;
  • Увеличение семафора (up): значение семафора увеличивается на единицу; когда при этом есть потоки, которые ожидают на семафоре, один из них выходит из ожидания и выполняет свою операцию уменьшения. Если на семафоре ожидают несколько потоков, то вследствие выполнения операции увеличения, его значение остается нулевым, но один из потоков продолжает выполнение (в большинстве реализаций выбор этого потока будет случайным). Эту операцию также называют сигнализацией - post.

Фактически значение семафора определяет количество потоков, которое может пройти через этот семафор без блокировки. Когда для семафора задано нулевое начальное значение, то он будет блокировать все потоки до тех пор, пока какой-то поток его не "откроет", выполнив операцию up. Операции up и down могут быть выполнены любыми потоками, имеющих доступ к семафора.


Мьютекс называют синхронизацией примитив, что не допускает выполнения некоторого фрагмента кода более чем одним потоком. Фактически мьютекс является реализацией блокировки на уровне ОС.

Мьютекс, как и следует из его названия, реализует взаимное исключение. Его основная задача - блокировать все потоки, которые пытаются получить доступ к коду, когда этот код уже выполняет некоторый поток.

Мьютекс может находиться в двух состояниях: свободном и занятом. Начальным состоянием является «свободный». Над мьютекс возможны две атомарные операции.

  • Занять мьютекс: если мьютекс был свободен, он становится занятым, и поток продолжает свое выполнение (входя в критическую секцию); если мьютекс был занят, поток переходит в состояние ожидания (говорят, что поток «ожидает мьютекс», или «заблокирован на мьютекс»), выполнение продолжает другой поток. Поток, который занял мьютекс, называют владельцем мьютекс;
  • Освободить мьютекс: мьютекс становится свободным; если на нем ожидают несколько потоков, из них выбирают один, он начинает выполняться, занимает мьютекс и входит в критическую секцию. В большинстве реализаций выбор потока будет случайным. Освободить мьютекс может только его владелец.

Правила упрощенного параллелизма

Правила упрощенного параллелизма предназначены для упрощения программирования на базе мьютексов. Они основываются на том очевидном факте, что мьютекс защищает не код критической секции, а совместно используемые данные внутри этой секции. Особенности работы упрощенного параллелизма:

  • Каждая переменная, которую совместно использует более одного поток, должна быть защищена отдельным мьютексом;
  • Перед каждой операцией изменения такой переменной, соответствующий мьютекс должен быть занят, а после изменения освобожден;
  • Если надо работать одновременно с несколькими совместно используемыми переменными, необходимо занять все их мьютексы до начала работы и освободить их только после полного окончания работы.

Рекурсивный мьютекс

Рекурсивный мьютекс - особый вид мьютекса. Он позволяет повторное занятие тем же потоком, а также отслеживает, какой поток пытается его занять.

Когда это не тот поток, который уже занимает, мьютекс ведет себя как обычный, и поток переходит в состояние ожидания. Когда же это тот самый поток, внутренний счетчик блокировок этого мьютекс увеличивают на единицу, и поток продолжает свое выполнение. В случае увольнения мьютекс внутренний счетчик уменьшается на единицу, для других потоков рекурсивный мьютекс будет разблокирована только тогда, когда счетчик дойдет до нуля (то есть когда все его блокировки одним потоком будут сняты).

Рекурсивные мьютексы менее эффективны в реализации, не могут быть использованы вместе с условными переменными (этот вопрос рассмотрим в следующем разделе), поэтому обращаться к ним нужно только тогда, когда без этого нельзя обойтись. Например, библиотека функций может использовать такие мьютексы для того, чтобы избежать взаимных блокировок при повторном вызове таких функций одним потоком.

Условные переменные

Условной переменной называют синхронизационные примитивы, позволяющие организовать ожидания выполнения условия внутри критической секции, заданной мьютексом. Условная переменная всегда связана с конкретным мьютексом и данными, защищенными этим мьютексом. Для условной переменной определены следующие операции:

  1. Ожидания (wait). Дополнительным входным параметром эта операция принимает мьютекс, который должен находиться в закрытом состоянии. Вызов ожидания происходит в ситуации, когда не выполняется некоторое условие и нужны потоки для продолжения работы. Вследствие выполнения ожидания поток прекращается (говорят, что он «ожидает условной переменной»), а мьютекс открывается (эти два действия происходят атомарно). Так другие потоки получают возможность войти в критическую секцию и изменить там данные, которые она защищает, возможно, выполнив условие, необходимое потоку. На этом операция ожидания не заканчивается - ее завершит другой поток, вызвав операцию сигнализации после того, как условие будет выполнено.
  2. Сигнализация (signal). Эту операцию поток должен выполнить после того, как войдет в критическую секцию и завершит работу с данными (выполнив условие, которое ожидал поток, вызвавший операцию wait). Эта операция проверяет, нет ли потоков, ожидающих условной переменной, и если такие потоки есть, переводит один из них в состояние готовности. В результате восстановления поток завершает выполнение операции ожидания и блокирует мьютекс (обновления и блокировки тоже происходят атомарно). Если нет ни одного потока, который ожидает условной переменной, операция сигнализирования не делает ничего, и информацию о ее выполнении в системе не сохраняют.
  3. Широковещательная сигнализация (broadcast) отличается от обычной тем, что перевод в состояние готовности и восстановление выполняют для всех потоков, ожидающих этой условной переменной, а не только для одного из них.

Таким образом, выполнение операции ожидания состоит из следующих этапов: открытие мьютекс, ожидания (пока другой поток не выполнит операцию signal или broadcast), закрытие мьютекс.

В операционной системе Windows поддерживаются четыре типа объекта синхронизации.

  • Первый тип представляет собой классический семафор и может быть использован для управления доступом к определенным ресурсам ограниченного количества потоков. Разделяемый ресурс в этом случае может быть использован одним, и только одним потоком, либо некоторым числом потоков из множества претендующих на этот ресурс. Семафоры реализуются как простые счетчики, значения которых увеличиваются, когда поток освобождает семафор, и уменьшаются, когда поток занимает семафор.
  • Второй тип объектов синхронизации называется исключающим семафором (mutex). Исключающие семафоры применяются для разделения ресурсов таким образом, что в любой момент времени их может использовать один и только один поток. Очевидно, что исключающий семафор представляет собой специальный тип обычного семафора.
  • Третий тип объектов синхронизации - это событие (event). События могут служить для блокировки доступа к ресурсу до тех пор, пока другой поток не сигнализирует об его освобождении.
  • Четвертый тип объектов синхронизации представляет собой критическую секцию (critical section). При вхождении потока в критическую секцию никакой другой поток не может начать ее выполнение до того, как работающий с ней поток не выйдет из нее.

Следует отметить, что синхронизация потоков с помощью критических секций может осуществляться только в рамках одного процесса, так как невозможно передать адрес критической секции из одного процесса в другой, так как критическая секция располагается в области глобальных переменных запущенного процесса.

Семафор создается с помощью функции CreateSemaphore. Количество задач, одновременно имеющих доступ к некоторому ресурсу, определяется одним из параметров функции. Если значение этой функции равно 1, то семафор работает как исключающий. При успешном создании семафора возвращается его дескриптор, в противном случае - null. Функция WaitForSingleObject обеспечивает режим ожидания семафора. Один из параметров указывает время его ожидания в миллисекундах. Если значение этого параметра равно INFINITE, то время ожидания неопределено. При успешном завершении функции значение счетчика, связанного с семафором, уменьшается не единицу. Функция ReleaseSemaphore() освобождает семафор, позволяя использовать его другому потоку.

Исключающий семафор mutex создается с помощью функции CreateMutex(), которая возвращает идентификатор созданного объекта или null в случае ошибки. В случае необходимости объект освобождается с помощью универсальной функции CloseHandle(). Зная имя объекта mutex, его можно открыть с помощью функции OpenMutex(). С помощью этой функции несколько потоков могут открыть один и тот же объект, а затем одновременно выполнить его ожидание. После того, как имя объекта стало известно потоку, он может им завладеть, используя функции WaitForSingleObject или WaitForMultipleObjects. Освобождение объекта mutex осуществляется с помощью функции ReleaseMutex().

Событие создается с помощью функции CreateEvent. Она возвращает дескриптор созданного события или null в случае неуспешного завершения. После создании события поток просто ожидает его наступления используя функцию WaitForSingleObject, задавая в качестве первого параметра для нее дескриптор этого события. Тем самым выполнение потока приостанавливается до наступления соответствующего события. После вызова функции SetEvent процесс, ожидающий данного события с помощью функции WaitForSingleObject, продолжит свое выполнение. Событие может быть сброшено с помощью функции ResetEvent.

При одновременном доступе нескольких процессов (или нескольких потоков одного процесса) к какому-либо ресурсу возникает проблема синхронизации. Поскольку поток в Win32 может быть остановлен в любой, заранее ему неизвестный момент времени, возможна ситуация, когда один из потоков не успел завершить модификацию ресурса (например, отображенной на файл области памяти), но был остановлен, а другой поток попытался обратиться к тому же ресурсу. В этот момент ресурс находится в несогласованном состоянии, и последствия обращения к нему могут быть самыми неожиданными - от порчи данных до нарушения защиты памяти.

Главной идеей, заложенной в основе синхронизации потоков в Win32, является использование объектов синхронизации и функций ожидания. Объекты могут находиться в одном из двух состояний - Signaled или Not Signaled. Функции ожидания блокируют выполнение потока до тех пор, пока заданный объект находится в состоянии Not Signaled. Таким образом, поток, которому необходим эксклюзивный доступ к ресурсу, должен выставить какой-либо объект синхронизации в несигнальное состояние, а по окончании - сбросить его в сигнальное. Остальные потоки должны перед доступом к этому ресурсу вызвать функцию ожидания, которая позволит им дождаться освобождения ресурса.

Рассмотрим, какие объекты и функции синхронизации предоставляет нам Win32 API.

Функции синхронизации

Функции синхронизации делятся на две основные категории: функции, ожидающие единственный объект, и функции, ожидающие один из нескольких объектов.

Функции, ожидающие единственный объект

Простейшей функцией ожидания является функция WaitForSingleObject:

Function WaitForSingleObject(hHandle: THandle; // идентификатор объекта dwMilliseconds: DWORD // период ожидания): DWORD; stdcall;

Функция ожидает перехода объекта hHandle в сигнальное состояние в течение dwMilliseconds миллисекунд. Если в качестве параметра dwMilliseconds передать значение INFINITE, функция будет ждать в течение неограниченного времени. Если dwMilliseconds равен 0, то функция проверяет состояние объекта и немедленно возвращает управление.

Функция возвращает одно из следующих значений:

Следующий фрагмент кода запрещает доступ к Action1 до перехода объекта ObjectHandle в сигнальное состояние (например, таким образом можно дожидаться завершения процесса, передав в качестве ObjectHandle его идентификатор, полученный функцией CreateProcess):

Var Reason: DWORD; ErrorCode: DWORD; Action1.Enabled:= FALSE; try repeat Application.ProcessMessages; Reason:= WailForSingleObject(ObjectHandle, 10); if Reason = WAIT_FAILED then begin ErrorCode:= GetLastError; raise Exception.CreateFmt(‘Wait for object failed with error: %d’, ); end; until Reason <> WAIT_TIMEOUT; finally Actionl.Enabled:= TRUE; end;

В случае когда одновременно с ожиданием объекта требуется перевести в сигнальное состояние другой объект, может использоваться функция SignalObjectAndWait:

Function SignalObjectAndWait(hObjectToSignal: THandle; // объект, который будет переведен в // сигнальное состояние hObjectToWaitOn: THandle; // объект, который ожидает функция dwMilliseconds: DWORD; // период ожидания bAlertable: BOOL // задает, должна ли функция возвращать // управление в случае запроса на // завершение операции ввода-вывода): DWORD; stdcall;

Возвращаемые значения аналогичны функции WaitForSingleObject.

Объект hObjectToSignal может быть семафором, событием (event) либо мьютексом. Параметр bAlertable определяет, будет ли прерываться ожидание объекта в случае, если операционная система запросит у потока окончание операции асинхронного ввода-вывода либо асинхронный вызов процедуры. Более подробно это будет рассматриваться ниже.

Функции, ожидающие несколько объектов

Иногда требуется задержать выполнение потока до срабатывания одного или сразу всех из группы объектов. Для решения подобной задачи используются следующие функции:

Type TWOHandleArray = array of THandle; PWOHandleArray = ^TWOHandleArray; function WaitForMultipleObjects(nCount: DWORD; // Задает количество объектов lpHandles: PWOHandleArray; // Адрес массива объектов bWaitAll: BOOL; // Задает, требуется ли ожидание всех // объектов или любого dwMilliseconds: DWORD // Период ожидания): DWORD; stdcall;

Функция возвращает одно из следующих значений:

Число в диапазоне от

WAIT_OBJECT_0 до WAIT_OBJECT_0 + nCount – 1

Если bWaitAll равно TRUE, то это число означает, что все объекты перешли в сигнальное состояние. Если FALSE - то, вычтя из возвращенного значения WAIT_OBJECT_0, мы получим индекс объекта в массиве lpHandles

Число в диапазоне от

WAIT_ABANDONED_0 до WAIT_ABANDONED_0 + nCount – 1

Если bWaitAll равно TRUE, это означает, что все объекты перешли в сигнальное состояние, но хотя бы один из владевших ими потоков завершился, не сделав объект сигнальным Если FALSE - то, вычтя из возвращенного значения WAIT_ABANDONED_0, мы получим в массиве lpHandles индекс объекта, при этом поток, владевший этим объектом, завершился, не сделав его сигнальным
WAIT_TIMEOUT Истек период ожидания
WAIT_FAILED Произошла ошибка

Например, в следующем фрагменте кода программа пытается модифицировать два различных ресурса, разделяемых между потоками:

Var Handles: array of THandle; Reason: DWORD; RestIndex: Integer; ... Handles := OpenMutex(SYNCHRONIZE, FALSE, ‘FirstResource’); Handles := OpenMutex(SYNCHRONIZE, FALSE, ‘SecondResource’); // Ждем первый из объектов Reason:= WaitForMultipleObjects(2, @Handles, FALSE, INFINITE); case Reason of WAIT_FAILED: RaiseLastWin32Error; WAIT_OBJECT_0, WAIT_ABANDONED_0: begin ModifyFirstResource; RestIndex:= 1; end; WAIT_OBJECT_0 + 1, WAIT_ABANDONED_0 + 1: begin ModifySecondResource; RestIndex:= 0; end; // WAIT_TIMEOUT возникнуть не может end; // Теперь ожидаем освобождения следующего объекта if WailForSingleObject(Handles, INFINITE) = WAIT_FAILED then RaiseLastWin32Error; // Дождались, модифицируем оставшийся ресурс if RestIndex = 0 then ModifyFirstResource else ModifySecondResource;

Описанную выше технику можно применять, если вы точно знаете, что задержка ожидания объекта будет незначительной. В противном случае ваша программа окажется «замороженной» и не сможет даже перерисовать свое окно. Если период задержки может оказаться значительным, то необходимо дать программе возможность реагировать на сообщения Windows. Выходом может стать использование функций с ограниченным периодом ожидания (и повторный вызов - в случае возврата WAIT_TIMEOUT) либо функции MsgWaitForMultipleObjects:

Function MsgWaitForMultipleObjects(nCount: DWORD; // количество объектов синхронизации var pHandles; // адрес массива объектов fWaitAll: BOOL; // Задает, требуется ли ожидание всех // объектов или любого dwMilliseconds, // Период ожидания dwWakeMask: DWORD // Тип события, прерывающего ожидание): DWORD; stdcall;

Главное отличие этой функции от предыдущей - параметр dwWakeMask, который является комбинацией битовых флагов QS_XXX и задает типы сообщений, прерывающих ожидание функции независимо от состояния ожидаемых объектов. Например, маска QS_KEY позволяет прервать ожидание при появлении в очереди сообщений WM_KEYUP, WM_KEYDOWN, WM_SYSKEYUP или WM_SYSKEYDOWN, а маска QS_PAINT - сообщения WM_PAINT. Полный список значений, допустимых для dwWakeMask, имеется в документации по Windows SDK. При появлении в очереди потока, вызвавшего функцию, сообщений, соответствующих заданной маске, функция возвращает значение WAIT_OBJECT_0 + nCount. Получив это значение, ваша программа может обработать его и снова вызвать функцию ожидания. Рассмотрим пример с запуском внешнего приложения (необходимо, чтобы на время его работы вызывающая программа не реагировала на ввод пользователя, однако ее окно должно продолжать перерисовываться):

Procedure TForm1.Button1Click(Sender: TObject); var PI: TProcessInformation; SI: TStartupInfo; Reason: DWORD; Msg: TMsg; begin // Инициализируем структуру TStartupInfo FillChar(SI, SizeOf(SI), 0); SI.cb:= SizeOf(SI); // Запускаем внешнюю программу Win32Check(CreateProcess(NIL, "COMMAND.COM", NIL, NIL, FALSE, 0, NIL, NIL, SI, PI)); //************************************************** // Попробуйте заменить нижеприведенный код на строку // WaitForSingleObject(PI.hProcess, INFINITE); // и посмотреть, как будет реагировать программа на // перемещение других окон над ее окном //************************************************** repeat // Ожидаем завершения дочернего процесса или сообщения // перерисовки WM_PAINT Reason:= MsgWaitForMultipleObjects(1, PI.hProcess, FALSE, INFINITE, QS_PAINT); if Reason = WAIT_OBJECT_0 + 1 then begin // В очереди сообщений появился WM_PAINT – Windows // требует обновить окно программы. // Удаляем сообщение из очереди PeekMessage(Msg, 0, WM_PAINT, WM_PAINT, PM_REMOVE); // И перерисовываем наше окно Update; end; // Повторяем цикл, пока не завершится дочерний процесс until Reason = WAIT_OBJECT_0; // Удаляем из очереди накопившиеся там сообщения while PeekMessage(Msg, 0, 0, 0, PM_REMOVE) do; CloseHandle(PI.hProcess); CloseHandle(PI.hThread) end;

Если в потоке, вызывающем функции ожидания, явно (функцией CreateWindow) или неявно (используя TForm, DDE, COM) создаются окна Windows - поток должен обрабатывать сообщения. Поскольку широковещательные сообщения посылаются всем окнам в системе, то поток, не обрабатывающий сообщения, может вызвать взаимоблокировку (система ждет, когда поток обработает сообщение, поток - когда система или другие потоки освободят объект) и привести к зависанию Windows. Если в вашей программе имеются подобные фрагменты, необходимо использовать MsgWaitForMultipleObjects или MsgWaitForMultipleObjectsEx и позволять прервать ожидание для обработки сообщений. Алгоритм аналогичен вышеприведенному примеру.

Прерывание ожидания по запросу на завершение операции ввода-вывода или APC

Windows поддерживает асинхронные вызовы процедур. При создании каждого потока (thread) с ним ассоциируется очередь асинхронных вызовов процедур (APC queue). Операционная система (или приложение пользователя - при помощи функции QueueUserAPC) может помещать в нее запросы на выполнение функций в контексте данного потока. Эти функции не могут быть выполнены немедленно, поскольку поток может быть занят. Поэтому операционная система вызывает их, когда поток вызывает одну из следующих функций ожидания:

Function SleepEx(dwMilliseconds: DWORD; // Период ожидания bAlertable: BOOL // задает, должна ли функция возвращать // управление в случае запроса на // асинхронный вызов процедуры): DWORD; stdcall; function WaitForSingleObjectEx(hHandle: THandle; // Идентификатор объекта dwMilliseconds: DWORD; // Период ожидания bAlertable: BOOL // задает, должна ли функция возвращать // управление в случае запроса на // асинхронный вызов процедуры): DWORD; stdcall; function WaitForMultipleObjectsEx(nCount: DWORD; // количество объектов lpHandles: PWOHandleArray;// адрес массива идентификаторов объектов bWaitAll: BOOL; // Задает, требуется ли ожидание всех // объектов или любого dwMilliseconds: DWORD; // Период ожидания bAlertable: BOOL // задает, должна ли функция возвращать // управление в случае запроса на // асинхронный вызов процедуры): DWORD; stdcall; function SignalObjectAndWait(hObjectToSignal: THandle; // объект, который будет переведен в // сигнальное состояние hObjectToWaitOn: THandle; // объект, которого ожидает функция dwMilliseconds: DWORD; // период ожидания bAlertable: BOOL // задает, должна ли функция возвращать // управление в случае запроса на // асинхронный вызов процедуры): DWORD; stdcall; function MsgWaitForMultipleObjectsEx(nCount: DWORD; // количество объектов синхронизации var pHandles; // адрес массива объектов fWaitAll: BOOL; // Задает, требуется ли ожидание всех // объектов или любого dwMilliseconds, // Период ожидания dwWakeMask: DWORD // Тип события, прерывающего ожидание dwFlags: DWORD // Дополнительные флаги): DWORD; stdcall;

Если параметр bAlertable равен TRUE (либо если dwFlags в функции MsgWaitForMultipleObjectsEx содержит MWMO_ALERTABLE), то при появлении в очереди APC запроса на асинхронный вызов процедуры операционная система выполняет вызовы всех имеющихся в очереди процедур, после чего функция возвращает значение WAIT_IO_COMPLETION.

Такой механизм позволяет реализовать, например, асинхронный ввод-вывод. Поток может инициировать фоновое выполнение одной или нескольких операций ввода-вывода функциями ReadFileEx или WriteFileEx, передав им адреса функций-обработчиков завершения операции. По завершении вызовы этих функций будут поставлены в очередь асинхронного вызова процедур. В свою очередь, инициировавший операции поток, когда он будет готов обработать результаты, может, используя одну из вышеприведенных функций ожидания, позволить операционной системе вызвать функции-обработчики. Поскольку очередь APC реализована на уровне ядра ОС, она более эффективна, чем очередь сообщений, и позволяет реализовать гораздо более эффективный ввод-вывод.

Event (событие)

Event позволяет известить один или несколько ожидающих потоков о наступлении события. Event бывает:

Для создания объекта используется функция CreateEvent:

Function CreateEvent(lpEventAttributes: PSecurityAttributes; // Адрес структуры // TSecurityAttributes bManualReset, // Задает, будет Event переключаемым // вручную (TRUE) или автоматически (FALSE) bInitialState: BOOL; // Задает начальное состояние. Если TRUE - // объект в сигнальном состоянии lpName: PChar // Имя или NIL, если имя не требуется): THandle; stdcall; // Возвращает идентификатор созданного // объекта Структура TSecurityAttributes описана, как: TSecurityAttributes = record nLength: DWORD; // Размер структуры, должен // инициализироваться как // SizeOf(TSecurityAttributes) lpSecurityDescriptor: Pointer; // Адрес дескриптора защиты. В // Windows 95 и 98 игнорируется // Обычно можно указывать NIL bInheritHandle: BOOL; // Задает, могут ли дочерние // процессы наследовать объект end;

Если не требуется задание особых прав доступа под Windows NT или возможности наследования объекта дочерними процессами, в качестве параметра lpEventAttributes можно передавать NIL. В этом случае объект не может наследоваться дочерними процессами и ему задается дескриптор защиты «по умолчанию».

Параметр lpName позволяет разделять объекты между процессами. Если lpName совпадает с именем уже существующего объекта типа Event, созданного текущим или любым другим процессом, то функция не создает нового объекта, а возвращает идентификатор уже существующего. При этом игнорируются параметры bManualReset, bInitialState и lpSecurityDescriptor. Проверить, был ли объект создан или используется уже существующий, можно следующим образом:

HEvent:= CreateEvent(NIL, TRUE, FALSE, ‘EventName’); if hEvent = 0 then RaiseLastWin32Error; if GetLastError = ERROR_ALREADY_EXISTS then begin // Используем ранее созданный объект end;

Если объект используется для синхронизации внутри одного процесса, его можно объявить как глобальную переменную и создавать без имени.

Имя объекта не должно совпадать с именем любого из существующих объектов типов Semaphore, Mutex, Job, Waitable Timer или FileMapping. В случае совпадения имен функция возвращает ошибку.

Если известно, что Event уже создан, для получения доступа к нему можно вместо CreateEvent воспользоваться функцией OpenEvent:

Function OpenEvent(dwDesiredAccess: DWORD; // Задает права доступа к объекту bInheritHandle: BOOL; // Задает, может ли объект наследоваться // дочерними процессами lpName: PChar // Имя объекта): THandle; stdcall;

Функция возвращает идентификатор объекта либо 0 - в случае ошибки. Параметр dwDesiredAccess может принимать одно из следующих значений:

После получения идентификатора можно приступать к его использованию. Для этого имеются следующие функции:

Function SetEvent(hEvent: THandle): BOOL; stdcall;

Устанавливает объект в сигнальное состояние

Function ResetEvent(hEvent: THandle): BOOL; stdcall;

Сбрасывает объект, устанавливая его в несигнальное состояние

Function PulseEvent(hEvent: THandle): BOOL; stdcall

Устанавливает объект в сигнальное состояние, дает отработать всем функциям ожидания, ожидающим этот объект, а затем снова сбрасывает его.

В Windows API события используются для выполнения операций асинхронного ввода-вывода. Следующий пример показывает, как приложение инициирует запись одновременно в два файла, а затем ожидает завершения записи перед продолжением работы; такой подход может обеспечить более высокую производительность при высокой интенсивности ввода-вывода, чем последовательная запись:

Var Events: array of THandle; // Массив объектов синхронизации Overlapped: array of TOverlapped; ... // Создаем объекты синхронизации Events := CreateEvent(NIL, TRUE, FALSE, NIL); Events := CreateEvent(NIL, TRUE, FALSE, NIL); // Инициализируем структуры TOverlapped FillChar(Overlapped, SizeOf(Overlapped), 0); Overlapped.hEvent:= Events; Overlapped.hEvent:= Events; // Начинаем асинхронную запись в файлы WriteFile(hFirstFile, FirstBuffer, SizeOf(FirstBuffer), FirstFileWritten, @Overlapped); WriteFile(hSecondFile, SecondBuffer, SizeOf(SecondBuffer), SecondFileWritten, @Overlapped); // Ожидаем завершения записи в оба файла WaitForMultipleObjects(2, @Events, TRUE, INFINITE); // Уничтожаем объекты синхронизации CloseHandle(Events); CloseHandle(Events)

По завершении работы с объектом он должен быть уничтожен функцией CloseHandle.

Delphi предоставляет класс TEvent, инкапсулирующий функциональность объекта Event. Класс расположен в модуле SyncObjs.pas и объявлен следующим образом:

Type TWaitResult = (wrSignaled, wrTimeout, wrAbandoned, wrError); TEvent = class(THandleObject) public constructor Create(EventAttributes: PSecurityAttributes; ManualReset, InitialState: Boolean; const Name: string); function WaitFor(Timeout: DWORD): TWaitResult; procedure SetEvent; procedure ResetEvent; end;

Назначение методов очевидно следует из их названий. Использование этого класса позволяет не вдаваться в тонкости реализации вызываемых функций Windows API. Для простейших случаев объявлен еще один класс с упрощенным конструктором:

Type TSimpleEvent = class(TEvent) public constructor Create; end; … constructor TSimpleEvent.Create; begin FHandle:= CreateEvent(nil, True, False, nil); end;

Mutex (Mutually Exclusive)

Мьютекс - это объект синхронизации, который находится в сигнальном состоянии только тогда, когда не принадлежит ни одному из процессов. Как только хотя бы один процесс запрашивает владение мьютексом, он переходит в несигнальное состояние и остается таким до тех пор, пока не будет освобожден владельцем. Такое поведение позволяет использовать мьютексы для синхронизации совместного доступа нескольких процессов к разделяемому ресурсу. Для создания мьютекса используется функция:

Function CreateMutex(lpMutexAttributes: PSecurityAttributes; // Адрес структуры // TSecurityAttributes bInitialOwner: BOOL; // Задает, будет ли процесс владеть // мьютексом сразу после создания lpName: PChar // Имя мьютекса): THandle; stdcall;

Функция возвращает идентификатор созданного объекта либо 0. Если мьютекс с заданным именем уже был создан, возвращается его идентификатор. В этом случае функция GetLastError вернет код ошибки ERROR_ALREDY_EXISTS. Имя не должно совпадать с именем уже существующего объекта типов Semaphore, Event, Job, Waitable Timer или FileMapping.

Если неизвестно, существует ли уже мьютекс с таким именем, программа не должна запрашивать владение объектом при создании (то есть должна передать в качестве bInitialOwner значение FALSE).

Если мьютекс уже существует, приложение может получить его идентификатор функцией OpenMutex:

Function OpenMutex(dwDesiredAccess: DWORD; // Задает права доступа к объекту bInheritHandle: BOOL; // Задает, может ли объект наследоваться // дочерними процессами lpName: PChar // Имя объекта): THandle; stdcall;

Функция возвращает идентификатор открытого мьютекса либо 0 - в случае ошибки. Мьютекс переходит в сигнальное состояние после срабатывания функции ожидания, в которую был передан его идентификатор. Для возврата в несигнальное состояние служит функция ReleaseMutex:

Function ReleaseMutex(hMutex: THandle): BOOL; stdcall;

Если несколько процессов обмениваются данными, например через файл, отображенный на память, то каждый из них должен содержать следующий код для обеспечения корректного доступа к общему ресурсу:

Var Mutex: THandle; // При инициализации программы Mutex:= CreateMutex(NIL, FALSE, ‘UniqueMutexName’); if Mutex = 0 then RaiseLastWin32Error; ... // Доступ к ресурсу WaitForSingleObject(Mutex, INFINITE); try // Доступ к ресурсу, захват мьютекса гарантирует, // что остальные процессы, пытающиеся получить доступ, // будут остановлены на функции WaitForSingleObject ... finally // Работа с ресурсом окончена, освобождаем его // для остальных процессов ReleaseMutex(Mutex); end; ... // При завершении программы CloseHandle(Mutex);

Подобный код удобно инкапсулировать в класс, который создает защищенный ресурс. Мьютекс имеет свойства и методы для оперирования ресурсом, защищая их при помощи функций синхронизации.

Разумеется, если работа с ресурсом может потребовать значительного времени, то необходимо либо использовать функцию MsgWaitForSingleObject, либо вызывать WaitForSingleObject в цикле с нулевым периодом ожидания, проверяя код возврата. В противном случае ваше приложение окажется замороженным. Всегда защищайте захват-освобождение объекта синхронизации при помощи блока try ... finally, иначе ошибка во время работы с ресурсом приведет к блокированию работы всех процессов, ожидающих его освобождения.

Semaphore (семафор)

Семафор представляет собой счетчик, содержащий целое число в диапазоне от 0 до максимальной величины, заданной при его создании. Счетчик уменьшается каждый раз, когда поток успешно завершает функцию ожидания, использующую семафор, и увеличивается путем вызова функции ReleaseSemaphore. При достижении семафором значения 0 он переходит в несигнальное состояние, при любых других значениях счетчика его состояние - сигнальное. Такое поведение позволяет использовать семафор в качестве ограничителя доступа к ресурсу, поддерживающему заранее заданное количество подключений.

Для создания семафора служит функция CreateSemaphore:

Function CreateSemaphore(lpSemaphoreAttributes: PSecurityAttributes; // Адрес структуры // TSecurityAttributes lInitialCount, // Начальное значение счетчика lMaximumCount: Longint; // Максимальное значение счетчика lpName: PChar // Имя объекта): THandle; stdcall;

Функция возвращает идентификатор созданного семафора либо 0, если создать объект не удалось.

Параметр lMaximumCount задает максимальное значение счетчика семафора, lInitialCount задает начальное значение счетчика и должен быть в диапазоне от 0 до lMaximumCount. lpName задает имя семафора. Если в системе уже есть семафор с таким именем, то новый не создается, а возвращается идентификатор существующего семафора. В случае если семафор используется внутри одного процесса, можно создать его без имени, передав в качестве lpName значение NIL. Имя семафора не должно совпадать с именем уже существующего объекта типов event, mutex, waitable timer, job или file-mapping.

Идентификатор ранее созданного семафора может быть также получен функцией OpenSemaphore:

Function OpenSemaphore(dwDesiredAccess: DWORD; // Задает права доступа к объекту bInheritHandle: BOOL; // Задает, может ли объект наследоваться // дочерними процессами lpName: PChar // Имя объекта): THandle; stdcall;

Параметр dwDesiredAccess может принимать одно из следующих значений:

Для увеличения счетчика семафора используется функция ReleaseSemaphore:

Function ReleaseSemaphore(hSemaphore: THandle; // Идентификатор семафора lReleaseCount: Longint; // Счетчик будет увеличен на эту величину lpPreviousCount: Pointer // Адрес 32-битной переменной, // принимающей предыдущее значение // счетчика): BOOL; stdcall;

Если значение счетчика после выполнения функции превысит заданный для него функцией CreateSemaphore максимум, то ReleaseSemaphore возвращает FALSE и значение семафора не изменяется. В качестве параметра lpPreviousCount можно передать NIL, если это значение нам не нужно.

Рассмотрим пример приложения, запускающего на выполнение несколько заданий в отдельных потоках (например, программа для фоновой загрузки файлов из Internet). Если количество одновременно выполняющихся заданий будет слишком велико, то это приведет к неоправданной загрузке канала. Поэтому реализуем потоки, в которых будет выполняться задание, таким образом, чтобы когда их количество превышает заранее заданную величину, то поток бы останавливался и ожидал завершения работы ранее запущенных заданий:

Unit LimitedThread; interface uses Classes; type TLimitedThread = class(TThread) procedure Execute; override; end; implementation uses Windows; const MAX_THREAD_COUNT = 10; var Semaphore: THandle; procedure TLimitedThread.Execute; begin // Уменьшаем счетчик семафора. Если к этому моменту уже запущено // MAX_THREAD_COUNT потоков - счетчик равен 0 и семафор в // несигнальном состоянии. Поток будет заморожен до завершения // одного из запущенных ранее. WaitForSingleObject(Semaphore, INFINITE); // Здесь располагается код, отвечающий за функциональность потока, // например загрузка файла... // Поток завершил работу, увеличиваем счетчик семафора и позволяем // начать обработку другим потокам. ReleaseSemaphore(Semaphore, 1, NIL); end; initialization // Создаем семафор при старте программы Semaphore:= CreateSemaphore(NIL, MAX_THREAD_COUNT, MAX_THREAD_COUNT, NIL); finalization // Уничтожаем семафор по завершении программы CloseHandle(Semaphore); end;


Аплет Rectangles
Исходные тексты
Описание текстов

Синхронизация потоков

Многопоточный режим работы открывает новые возможности для программистов, однако за эти возможности приходится расплачиваться усложнением процесса проектирования приложения и отладки. Основная трудность, с которой сталкиваются программисты, никогда не создававшие ранее многопоточные приложения, это синхронизация одновременно работающих потоков.

Для чего и когда она нужна?

Однопоточная программа, такая, например, как программа MS-DOS, при запуске получает в монопольное распоряжение все ресурсы компьютера. Так как в однопоточной системе существует только один процесс, он использует эти ресурсы в той последовательности, которая соответствует логике работы программы. Процессы и потоки, работающие одновременно в многопоточной системе, могут пытаться обращаться одновременно к одним и тем же ресурсам, что может привести к неправильной работе приложений.

Поясним это на простом примере.

Пусть мы создаем программу, выполняющую операции с банковским счетом. Операция снятия некоторой суммы денег со счета может происходить в следующей последовательности:

  • на первом шаге проверяется общая сумма денег, которая хранится на счету;
  • если общая сумма равна или превышает размер снимаемой суммы денег, общая сумма уменьшается на необходимую величину;
  • значение остатка записывается на текущий счет.

Если операция уменьшения текущего счета выполняется в однопоточной системе, то никаких проблем не возникнет. Однако представим себе, что два процесса пытаются одновременно выполнить только что описанную операцию с одним и тем же счетом. Пусть при этом на счету находится 5 млн. долларов, а оба процесса пытаются снять с него по 3 млн. долларов.

Допустим, события разворачиваются следующим образом:

  • первый процесс проверяет состояние текущего счета и убеждается, что на нем хранится 5 млн. долларов;
  • второй процесс проверяет состояние текущего счета и также убеждается, что на нем хранится 5 млн. долларов;
  • первый процесс уменьшает счет на 3 млн. долларов и записывает остаток (2 млн. долларов) на текущий счет;
  • второй процесс выполняет ту же самую операцию, так как после проверки считает, что на счету по-прежнему хранится 5 млн. долларов.

В результате получилось, что со счета, на котором находилось 5 млн. долларов, было снято 6 млн. долларов, и при этом там осталось еще 2 млн. долларов! Итого - банку нанесен ущерб в 3 млн. долларов.

Как же составить программу уменьшения счета, чтобы она не позволяла вытворять подобное?

Очень просто - на время выполнения операций над счетом одним процессом необходимо запретить доступ к этому счету со стороны других процессов. В этом случае сценарий работы программы должен быть следующим:

  • процесс блокирует счет для выполнения операций другими процессами, получая его в монопольное владение;
  • процесс проводит процедуру уменьшения счета и записывает на текущий счет новое значение остатка;
  • процесс разблокирует счет, разрешая другим процессам выполнение операций.

Когда первый процесс блокирует счет, он становится недоступен другим процессам. Если второй процесс также попытается заблокировать этот же счет, он будет переведен в состояние ожидания. Когда первый процесс уменьшит счет и на нем останется 2 млн. долларов, второй процесс будет разблокирован. Он проверит остаток, убедится, что сумма недостаточна и не будет проводить операцию.

Таким образом, в многопоточной среде необходима синхронизация потоков при обращении к критическим ресурсам. Если над такими ресурсами будут выполняться операции в неправильной последовательности, это приведет к возникновению трудно обнаруживаемых ошибок.

В языке программирования Java предусмотрено несколько средств для синхронизации потоков, которые мы сейчас рассмотрим.

Синхронизация методов

Возможность синхронизации как бы встроена в каждый объект, создаваемый приложением Java. Для этого объекты снабжаются защелками, которые могут быть использованы для блокировки потоков, обращающихся к этим объектам.

Чтобы воспользоваться защелками, вы можете объявить соответствующий метод как synchronized, сделав его синхронизированным:

public synchronized void decrement() { . . . }

При вызове синхронизированного метода соответствующий ему объект (в котором он определен) блокируется для использования другими синхронизированными методами. В результате предотвращается одновременная запись двумя методами значений в область памяти, принадлежащую данному объекту.

Использование синхронизированных методов - достаточно простой способ синхронизации потоков, обращающихся к общим критическим ресурсам, наподобие описанного выше банковского счета.

Заметим, что не обязательно синхронизовать весь метод - можно выполнить синхронизацию только критичного фрагмента кода.

. . . synchronized(Account) { if(Account.check(3000000)) Account.decrement(3000000); } . . .

Здесь синхронизация выполняется для объекта Account.

Блокировка потока

Синхронизированный поток, определенный как метод типа synchronized, может переходить в заблокированное состояние автоматически при попытке обращения к ресурсу, занятому другим синхронизированным методом, либо при выполнении некоторых операций ввода или вывода. Однако в ряде случаев полезно иметь более тонкие средства синхронизации, допускающие явное использование по запросу приложения.

Блокировка на заданный период времени

С помощью метода sleep можно заблокировать поток на заданный период времени:

try { Thread.sleep(500); } catch (InterruptedException ee) { . . . }

В данном примере работа потока Thread приостанавливается на 500 миллисекунд. Заметим, что во время ожидания приостановленный поток не отнимает ресурсы процессора.

Так как метод sleep может создавать исключение InterruptedException, необходимо предусмотреть его обработку. Для этого мы использовали операторы try и catch.

Временная приостановка и возобновление работы

Методы suspend и resume позволяют, соответственно, временно приостанавливать и возобновлять работу потока.

В следующем фрагменте кода поток m_Rectangles приостанавливает свою работу, когда курсор мыши оказывается над окном аплета:

public boolean mouseEnter(Event evt, int x, int y) { if (m_Rectangles != null) { m_Rectangles.suspend(); } return true; }

Работа потока возобновляется, когда курсор мыши покидает окно аплета:

public boolean mouseExit(Event evt, int x, int y) { if (m_Rectangles != null) { m_Rectangles.resume(); } return true; }

Ожидание извещения

Если вам нужно организовать взаимодействие потоков таким образом, чтобы один поток управлял работой другого или других потоков, вы можете воспользоваться методами wait, notify и notifyAll, определенными в классе Object.

Метод wait может использоваться либо с параметром, либо без параметра. Этот метод переводит поток в состояние ожидания, в котором он будет находиться до тех пор, пока для потока не будет вызван извещающий метод notify, notifyAll, или пока не истечет период времени, указанный в параметре метода wait.

Как пользоваться методами wait, notify и notifyAll?

Метод, который будет переводиться в состояние ожидания, должен быть синхронизированным, то есть его следует описать как synchronized:

public synchronized void run() { while (true) { . . . try { this.wait(); } catch (InterruptedException e) { } } }

В этом примере внутри метода run определен цикл, вызывающий метод wait без параметров. Каждый раз при очередном проходе цикла метод run переводится в состояние ожидания до тех пор, пока другой поток не выполнит извещение с помощью метода notify.

Ниже мы привели пример потока, вызывающией метод notify:

public void run() { while (true) { try { Thread.sleep(30); } catch (InterruptedException e) { } synchronized(STask) { STask.notify(); } } }

Этот поток реализован в рамках отдельного класса, конструктору которого передается ссылка на поток, вызывающую метод wait. Эта ссылка хранится в поле STask.

Обратите внимание, что хотя сам метод run не синхронизированный, вызов метода notify выполняется в синхронизированном режиме. В качестве объекта синхронизации выступает поток, для которого вызывается метод notify.

Ожидание завершения потока

С помощью метода join вы можете выполнять ожидание завершения работы потока, для которой этот метод вызван.

Существует три определения метода join:

public final void join(); public final void join(long millis); public final void join(long millis, int nanos);

Первый из них выполняет ожидание без ограничения во времени, для второго ожидание будет прервано принудительно через millis миллисекунд, а для третьего - через millis миллисекунд и nanos наносекунд. Учтите, что реально вы не сможете указывать время с точностью до наносекунд, так как дискретность системного таймера компьютера намного больше.