Хорошо, мы избавились от проблемы последнего обновления, но взгляните на следующий код:
unsigned __stdcall thread_proc(void*){ CProxy& prx = CObject::GetObject(CObject::READ_UNCOMMITTED); prx.value = 20; // Эмулируемработу Sleep(1000);prx.value = 40; prx.Commit(); // Закрытие сессииreturn 0;}int main(int argc, char* argv[]){ // Открытиесессии _beginthreadex(0,0,thread_proc,0,0,0); // Эмулируемработу Sleep(100); CProxy& fake = CObject::GetObject(CObject::READ_UNCOMMITTED);// В этой строчке происходит чтение «грязных данных»// fake.get_Value() возвращает 20 int* pAr = new int[fake.get_Value()]; // Эмулируемработу Sleep(1000); // fake.value = 40 for(int i = 0;i < fake.value;i++) pAr[i] = 0; if (pAr) delete[] pAr;fake.Commit(); return 0;} |
Если откомпилировать и запустить этот код, он гарантированно приведет к ошибке во время исполнения, так как будет осуществлен выход за границу массива в цикле. Почему? Потому что при создании массива используется значение незафиксированных данных, а в цикле – зафиксированных. Эта проблема известна как проблема «грязного чтения». Она возникает, когда одна транзакция пытается прочитать данные, с которыми работает другая параллельная транзакция. В таком случае временные, неподтвержденные данные могут не удовлетворять ограничениям целостности или правилам. И, хотя к моменту фиксации транзакции они могут быть приведены в «порядок», другая транзакция уже может воспользоваться этими неверными данными, что приведет к нарушению ее работы.
Для решения этой проблемы вводится новый уровень изоляции, на котором запрещается «грязное» чтение. Вот такие изменения нужно внести в реализацию классов CProxy и CObject для того, чтобы программа удовлетворяла второму уровню изоляции:
class CObject{ friend class CProxy;public: enum {READ_UNCOMMITTED,READ_COMMITTED}; static CProxy& GetObject(int level = -1); ~CObject() { DeleteCriticalSection(&exclusive); if (hShared) CloseHandle(hShared); if (hMutex) CloseHandle(hMutex); }protected: CProxy& BeginTran(int level) { return *(new CProxy(this,level)); } void RequestExclusive(int level) { if (level >= READ_UNCOMMITTED) TestExclusive(); } void RequestShared(int level) { if (level > READ_UNCOMMITTED) TestShared(level); } void RemoveShared(int level) { if (level == READ_COMMITTED){ RemoveSharedLock(); } } void RemoveLocks() { RemoveAllLocks(); }private: CObject() { value = 0; InitializeCriticalSection(&exclusive); hShared = CreateEvent(NULL,FALSE,TRUE,NULL);} void TestShared(int level) { //Проверка на монопольную блокировку EnterCriticalSection(&exclusive); //Устанавливаем разделяемую блокировку //только если не была установлена монопольная блокировкаif (exclusive.RecursionCount == 1) ResetEvent(hShared); //Снимаеммонопольнуюблокировку LeaveCriticalSection(&exclusive); }void TestExclusive() { //Проверка на разделяемую блокировку WaitForSingleObject(hShared,INFINITE); // Проверка на монопольную блокировкуEnterCriticalSection(&exclusive); // Вошлибольшеодногораза if (exclusive.RecursionCount > 1) LeaveCriticalSection(&exclusive); } void RemoveSharedLock() { SetEvent(hShared); } void RemoveAllLocks(){ RemoveSharedLock(); // Если была установлена монопольная блокировка - снимаемif (exclusive.OwningThread == (HANDLE)GetCurrentThreadId()) LeaveCriticalSection(&exclusive); } int value; CRITICAL_SECTION exclusive; HANDLE hShared; static HANDLE hMutex;}; |
Теперь, если изменить константу READ_UNCOMMITTED в предыдущем примере на READ_COMMITTED в качестве параметра GetObject, все станет на свои места. При инициализации массива главный поток перейдет в состояние ожидания до тех пор, пока второй поток не выполнит строчку prx.Commit(); Размер массива в главном потоке будет равен 40 элементам.
Хорошо, прекрасно! Где там следующий уровень? :) Чтобы понять, зачем нужен следующий уровень изоляции транзакций «повторяющееся чтение», рассмотрим такой пример:
unsigned __stdcall thread_proc(void*){ { // Началотранзакции CProxy& prx = CObject::GetObject(CObject::READ_COMMITTED); prx.value = 20; prx.Commit(); } // Эмулируемработу Sleep(500); { // Началотранзакции CProxy& prx = CObject::GetObject(CObject::READ_COMMITTED); prx.value = 40; prx.Commit(); } return 0;}int main(int argc, char* argv[]){ // Началосессии _beginthreadex(0,0,thread_proc,0,0,0); // Эмулируемработу Sleep(100); CProxy& fake = CObject::GetObject(CObject::READ_COMMITTED); // Созданиемассива int* pAr = new int[fake.get_Value()]; // Эмулируемработу Sleep(1000); // Инициализациямассива for(int i = 0;i < fake.value;i++) pAr[i] = 0; if (pAr) delete[] pAr;fake.Commit(); return 0;} |
Если запустить этот пример, он, как и предыдущий, приведет к ошибке доступа к памяти. Дело в том, что изначально создается массив размером в 20 элементов, а в цикле инициализации используется значение 40, и на 21 элементе мы получим ошибку доступа.
Проблема повторного чтения состоит в том, что между операциями чтения в одной транзакции другие транзакции могут беспрепятственно вносить любые изменения, так что повторное чтение тех же данные приведет к другому результату.
Для поддержки третьего уровня изоляции в код изменений вносить не надо! :) Необходимо лишь не снимать разделяемые блокировки до конца транзакции. Так как метод, приведенный ниже, снимает блокировку только на уровне READ_COMMITTED:
void RemoveShared(int level) { if (level == READ_COMMITTED){RemoveSharedLock(); } } |
нам нужно лишь добавить новую константу в перечисление типов блокировок.
enum {READ_UNCOMMITTED,READ_COMMITTED,REPEATABLE_READ}; |
Теперь, если в приведенном выше примере изменить константу READ_COMMITTED на REPEATABLE_READ в качестве параметра GetObject, код заработает правильно и без ошибок.
ПРИМЕЧАНИЕСовершенно не обязательно менять уровень изоляции транзакций в потоке thread_proc, работа примера не изменится, даже если изменить уровень изоляции на READ_UNCOMMITTED. |
Здесь мы ставим блокировку обновления, если транзакция читает данные с уровнем изоляции REPEATABLE_READ.
В заключение, перед тем как привести полностью код с поддержкой первых трех уровней изоляции, давайте поговорим вот о чем. Созданный код реализует блокирующую модель, которая характерна для СУБД MS SQL Server 2000. Существует также версионная модель реализации блокировок, которую поддерживает такая известная СУБД, как Oracle. Чем отличаются эти модели? Рассмотрим такой код:
unsigned __stdcall thread_proc(void*){ // Print CObject::value variable CProxy& fake = CObject::GetObject(); printf("in second session: %d\n",fake.value); fake.Commit(); return 0;}int main(int argc, char* argv[]){ // Началотранзакции CProxy& prx = CObject::GetObject(); prx.value = 10; // Началоновойсессии _beginthreadex(0,0,thread_proc,0,0,0); // Эмулируемработу Sleep(100); printf("in primary session: %d\n",prx.value);prx.Commit(); return 0;} |
Здесь во второй сессии (выполняемой в отдельном потоке) мы просто читаем данные и выводим их на консоль. Так как значение переменной value мы изменили перед стартом второй сессии, совершенно очевидно, что на экран будет выведено
in second session: 10in primary session: 10 |
Однако при использовании версионной модели мы должны получить
in second session: 0in primary session: 10 |
Причина в том, что для каждой транзакции хранится своя копия данных (snap-shot), которая синхронизируется с основными данными только в момент фиксирования транзакции.
ПРИМЕЧАНИЕOracle хранит эти копии данных в специальном хранилище, который называется rollback segment. |
Версионная модель характеризуется тем, что в ней отсутствует нулевой уровень изоляции транзакций (READ UNCOMMITTED), и вместо него вводится новый уровень, который в приведенном далее коде я назвал SNAP_SHOT. Он отличается от стандартного тем, что позволяет читать действительные зафиксированные данные, даже при наличии незавершенных транзакций обновления.
Вот конечный вариант классов CProxy и CObject, который реализует обе модели и, вдобавок к этому, поддерживает два «хинта»: UPDLOCK и XLOCK. Они предназначены для изменения уровня изоляции непосредственно при работе со значением переменной, а их смысл я поясню в следующих разделах.
#define MSSQL// #define ORACLEclass CObject;class CProxy{ friend class CObject;public: __declspec(property(get=get_Value,put=put_Value)) int value; int get_Value(int level = -1) const; void put_Value(int i); void Commit(); void Rollback();private: int _level; int _value; bool fUpd; CProxy(CObject* par,int level) { fUpd = false; parent = par; _level = level; } CObject* parent;};class CObject{ friend class CProxy;public: enum {#ifdef MSSQL READ_UNCOMMITTED,#elif defined ORACLE SNAP_SHOT,#endif READ_COMMITTED,REPEATABLE_READ,UPDLOCK,XLOCK}; static CProxy& GetObject(int level = -1); ~CObject() { DeleteCriticalSection(&exclusive); DeleteCriticalSection(&update); if (hShared) CloseHandle(hShared); if (hMutex) CloseHandle(hMutex); }protected: CProxy& BeginTran(int level) { return *(new CProxy(this,level)); } void RequestExclusive(int level) { ATLASSERT(level <= REPEATABLE_READ);#ifdef MSSQL if (level >= READ_UNCOMMITTED)#elif defined ORACLE if (level >= SNAP_SHOT)#endif TestExclusive(); } void RequestShared(int level) {#ifdef MSSQL if (level > READ_UNCOMMITTED)#elif defined ORACLE if (level > SNAP_SHOT)#endif TestShared(level); } void RemoveShared(int level) { if (level == READ_COMMITTED){ RemoveSharedLock(); } } void RemoveLocks() { RemoveAllLocks(); }private: CObject() { value = 0; InitializeCriticalSection(&update); InitializeCriticalSection(&exclusive); hShared = CreateEvent(NULL,FALSE,TRUE,NULL);} void TestShared(int level) { // Проверка на монопольную блокировку EnterCriticalSection(&exclusive); // Устанавливаем блокировку обновления if (level == UPDLOCK){EnterCriticalSection(&update); // Вошлибольшеодногораза if (update.RecursionCount > 1) LeaveCriticalSection(&update); } else if (level != XLOCK){ // Устанавливаем разделяемую блокировку // только если не была установлена блокировка обновления или // монопольная блокировка if (update.OwningThread != (HANDLE)GetCurrentThreadId() && exclusive.RecursionCount == 1) ResetEvent(hShared); // Снимаеммонопольнуюблокировку LeaveCriticalSection(&exclusive); }// Если указан XLOCK монопольная блокировка остается } void TestExclusive() { // Проверка на разделяемую блокировку WaitForSingleObject(hShared,INFINITE); // Проверка на блокировку обновления EnterCriticalSection(&update); // Проверка на монопольную блокировкуEnterCriticalSection(&exclusive); // Снимаемблокировкуобновления LeaveCriticalSection(&update); // Вошлибольшеодногораза if (exclusive.RecursionCount > 1) LeaveCriticalSection(&exclusive); } void RemoveSharedLock() { SetEvent(hShared);} void RemoveAllLocks() { RemoveSharedLock(); // Если была установлена блокировка обновления - снимаемif (update.OwningThread == (HANDLE)GetCurrentThreadId())LeaveCriticalSection(&update); // Если была установлена монопольная блокировка - снимаемif (exclusive.OwningThread == (HANDLE)GetCurrentThreadId()) LeaveCriticalSection(&exclusive); } int value; CRITICAL_SECTION update; CRITICAL_SECTION exclusive; HANDLE hShared; static HANDLE hMutex;};__declspec(selectany) HANDLE CObject::hMutex = NULL;CProxy& CObject::GetObject(int level){ HANDLE hLocMutex = CreateMutex(NULL,TRUE,_T("Guard-Lock-Mutex")); bool flg = GetLastError() == ERROR_ALREADY_EXISTS; if (flg) WaitForSingleObject(hLocMutex,INFINITE); else CObject::hMutex = hLocMutex; static CObject obj; ReleaseMutex(hLocMutex); if (flg) CloseHandle(hLocMutex); return obj.BeginTran(level);}void CProxy::Commit(){#ifdef ORACLE parent->value = _value;#endif parent->RemoveLocks(); delete this;}void CProxy::Rollback(){#ifdef MSSQL if (fUpd) parent->value = _value;#endif parent->RemoveLocks(); delete this;}void CProxy::put_Value(int i){ parent->RequestExclusive(_level);#ifdef MSSQL if (!fUpd) _value = parent->value; parent->value = i;#elif defined ORACLE _value = i;#endif fUpd = true;}int CProxy::get_Value(int level) const{ if (level == -1) level = _level; parent->RequestShared(level);#ifdef MSSQL int v = parent->value; parent->RemoveShared(level); return v;#elif defined ORACLE return _value;#endif} |
Из этих примеров должно быть понятно, что блокировки – дело серьезное. :) Но, прежде чем перейти к рассмотрению их реализации в MS SQL Server 2000, я приведу обещанные в начале уровни определения изоляции транзакций. Каждый уровень включает в себя предыдущий с предъявлением более жестких требований к изоляции.