From 12188e7a5dde4cdd4304b3e738b7905e55f3ad9a Mon Sep 17 00:00:00 2001 From: rubidium Date: Mon, 14 Apr 2008 19:54:33 +0000 Subject: [PATCH] (svn r12706) -Merge: the thread rewrite from NoAI. The rewrite makes the threading we have better extendable. --- configure | 2 +- projects/generate | 2 +- projects/generate.vbs | 2 +- projects/openttd_vs80.vcproj | 18 +- projects/openttd_vs90.vcproj | 18 +- source.list | 18 +- src/fiber.hpp | 53 ++++++ src/fiber_thread.cpp | 153 +++++++++++++++++ src/fiber_win32.cpp | 206 +++++++++++++++++++++++ src/genworld.cpp | 10 +- src/genworld.h | 17 +- src/saveload.cpp | 10 +- src/thread.cpp | 313 ----------------------------------- src/thread.h | 93 ++++++++++- src/thread_none.cpp | 42 +++++ src/thread_os2.cpp | 80 +++++++++ src/thread_pthread.cpp | 199 ++++++++++++++++++++++ src/thread_win32.cpp | 188 +++++++++++++++++++++ 18 files changed, 1076 insertions(+), 348 deletions(-) create mode 100644 src/fiber.hpp create mode 100644 src/fiber_thread.cpp create mode 100644 src/fiber_win32.cpp delete mode 100644 src/thread.cpp create mode 100644 src/thread_none.cpp create mode 100644 src/thread_os2.cpp create mode 100644 src/thread_pthread.cpp create mode 100644 src/thread_win32.cpp diff --git a/configure b/configure index 1c6aaa7191..b3730fb2bd 100755 --- a/configure +++ b/configure @@ -96,7 +96,7 @@ SRCS="`< $ROOT_DIR/source.list tr '\r' '\n' | $awk ' if ($0 == "MSVC" && "'$os'" != "MSVC") { next; } if ($0 == "DIRECTMUSIC" && "'$with_direct_music'" == "0") { next; } if ($0 == "LIBTIMIDITY" && "'$libtimidity'" == "" ) { next; } - if ($0 == "NO_THREADS" && "'$with_threads'" == "0") { next; } + if ($0 == "HAVE_THREAD" && "'$with_threads'" == "0") { next; } skip += 1; diff --git a/projects/generate b/projects/generate index cdc29bf405..0e19295833 100755 --- a/projects/generate +++ b/projects/generate @@ -83,7 +83,7 @@ load_main_data() { if ($0 == "MSVC" && "'$os'" != "MSVC") { next; } if ($0 == "DIRECTMUSIC" && "'$enable_directmusic'" != "1") { next; } if ($0 == "LIBTIMIDITY" && "'$libtimidity'" == "" ) { next; } - if ($0 == "NO_THREADS" && "'$with_threads'" == "0") { next; } + if ($0 == "HAVE_THREAD" && "'$with_threads'" == "0") { next; } skip += 1; diff --git a/projects/generate.vbs b/projects/generate.vbs index bdb6ef2032..1435e84a19 100755 --- a/projects/generate.vbs +++ b/projects/generate.vbs @@ -72,7 +72,7 @@ Function load_main_data(filename) line = "WIN32" Or _ line = "MSVC" Or _ line = "DIRECTMUSIC" Or _ - line = "NO_THREADS" _ + line = "HAVE_THREAD" _ ) Then skip = skip + 1 deep = deep + 1 Case "#" diff --git a/projects/openttd_vs80.vcproj b/projects/openttd_vs80.vcproj index 3507106ea6..56eedfc249 100644 --- a/projects/openttd_vs80.vcproj +++ b/projects/openttd_vs80.vcproj @@ -736,7 +736,11 @@ > + + + + @@ -943,6 +951,10 @@ RelativePath=".\..\src\core\enum_type.hpp" > + + @@ -2231,6 +2243,10 @@ RelativePath=".\..\src\misc\blob.hpp" > + + diff --git a/projects/openttd_vs90.vcproj b/projects/openttd_vs90.vcproj index e4be42d9c3..435e7f87e3 100644 --- a/projects/openttd_vs90.vcproj +++ b/projects/openttd_vs90.vcproj @@ -733,7 +733,11 @@ > + + + + @@ -940,6 +948,10 @@ RelativePath=".\..\src\core\enum_type.hpp" > + + @@ -2228,6 +2240,10 @@ RelativePath=".\..\src\misc\blob.hpp" > + + diff --git a/source.list b/source.list index fd1276978f..137a860b1e 100644 --- a/source.list +++ b/source.list @@ -77,7 +77,21 @@ string.cpp strings.cpp texteff.cpp tgp.cpp -thread.cpp +#if HAVE_THREAD + #if WIN32 + thread_win32.cpp + fiber_win32.cpp + #else + #if OS2 + thread_os2.cpp + #else + thread_pthread.cpp + #end + fiber_thread.cpp + #end +#else + thread_none.cpp +#end tile_map.cpp #if WIN32 #else @@ -145,6 +159,7 @@ core/endian_func.hpp engine_func.h engine_type.h core/enum_type.hpp +fiber.hpp fileio.h fios.h fontcache.h @@ -493,6 +508,7 @@ misc/autocopyptr.hpp misc/autoptr.hpp misc/binaryheap.hpp misc/blob.hpp +misc/countedobj.cpp misc/countedptr.hpp misc/crc32.hpp misc/dbg_helpers.cpp diff --git a/src/fiber.hpp b/src/fiber.hpp new file mode 100644 index 0000000000..3ea776d5c3 --- /dev/null +++ b/src/fiber.hpp @@ -0,0 +1,53 @@ +/* $Id$ */ + +/** @file fiber.hpp */ + +#ifndef FIBER_HPP +#define FIBER_HPP + +typedef void (CDECL *FiberFunc)(void *); + +class Fiber { +public: + /** + * Switch to this fiber. + */ + virtual void SwitchToFiber() = 0; + + /** + * Exit a fiber. + */ + virtual void Exit() = 0; + + /** + * Check if a fiber is running. + */ + virtual bool IsRunning() = 0; + + /** + * Get the 'param' data of the Fiber. + */ + virtual void *GetFiberData() = 0; + + /** + * Virtual Destructor to mute warnings. + */ + virtual ~Fiber() {}; + + /** + * Create a new fiber, calling proc(param) when running. + */ + static Fiber *New(FiberFunc proc, void *param); + + /** + * Attach a current thread to a new fiber. + */ + static Fiber *AttachCurrent(void *param); + + /** + * Get the 'param' data of the current active Fiber. + */ + static void *GetCurrentFiberData(); +}; + +#endif /* FIBER_HPP */ diff --git a/src/fiber_thread.cpp b/src/fiber_thread.cpp new file mode 100644 index 0000000000..6d311290c8 --- /dev/null +++ b/src/fiber_thread.cpp @@ -0,0 +1,153 @@ +/* $Id$ */ + +/** @file fiber_thread.cpp ThreadObject implementation of Fiber. */ + +#include "stdafx.h" +#include "fiber.hpp" +#include "thread.h" +#include + +class Fiber_Thread : public Fiber { +private: + ThreadObject *m_thread; + FiberFunc m_proc; + void *m_param; + bool m_attached; + ThreadSemaphore *m_sem; + bool m_kill; + + static Fiber_Thread *s_current; + static Fiber_Thread *s_main; + +public: + /** + * Create a ThreadObject fiber and start it, calling proc(param). + */ + Fiber_Thread(FiberFunc proc, void *param) : + m_thread(NULL), + m_proc(proc), + m_param(param), + m_attached(false), + m_kill(false) + { + this->m_sem = ThreadSemaphore::New(); + /* Create a thread and start stFiberProc */ + this->m_thread = ThreadObject::New(&stFiberProc, this); + } + + /** + * Create a ThreadObject fiber and attach current thread to it. + */ + Fiber_Thread(void *param) : + m_thread(NULL), + m_proc(NULL), + m_param(param), + m_attached(true), + m_kill(false) + { + this->m_sem = ThreadSemaphore::New(); + /* Attach the current thread to this Fiber */ + this->m_thread = ThreadObject::AttachCurrent(); + /* We are the current thread */ + if (s_current == NULL) s_current = this; + if (s_main == NULL) s_main = this; + } + + ~Fiber_Thread() + { + /* Remove the thread if needed */ + if (this->m_thread != NULL) { + assert(this->m_attached || !this->m_thread->IsRunning()); + delete this->m_thread; + } + /* Remove the semaphore */ + delete this->m_sem; + } + + /* virtual */ void SwitchToFiber() + { + /* You can't switch to yourself */ + assert(s_current != this); + Fiber_Thread *cur = s_current; + + /* Continue the execution of 'this' Fiber */ + this->m_sem->Set(); + /* Hold the execution of the current Fiber */ + cur->m_sem->Wait(); + if (this->m_kill) { + /* If the thread we switched too was killed, join it so it can finish quiting */ + this->m_thread->Join(); + } + /* If we continue, we are the current thread */ + s_current = cur; + } + + /* virtual */ void Exit() + { + /* Kill off our thread */ + this->m_kill = true; + this->m_thread->Exit(); + } + + /* virtual */ bool IsRunning() + { + if (this->m_thread == NULL) return false; + return this->m_thread->IsRunning(); + } + + /* virtual */ void *GetFiberData() + { + return this->m_param; + } + + static Fiber_Thread *GetCurrentFiber() + { + return s_current; + } + +private: + /** + * First function which is called within the fiber. + */ + static void * CDECL stFiberProc(void *fiber) + { + Fiber_Thread *cur = (Fiber_Thread *)fiber; + /* Now suspend the thread until we get SwitchToFiber() for the first time */ + cur->m_sem->Wait(); + /* If we continue, we are the current thread */ + s_current = cur; + + try { + cur->m_proc(cur->m_param); + } catch (...) { + /* Unlock the main thread */ + s_main->m_sem->Set(); + throw; + } + + return NULL; + } +}; + +/* Initialize the static member of Fiber_Thread */ +/* static */ Fiber_Thread *Fiber_Thread::s_current = NULL; +/* static */ Fiber_Thread *Fiber_Thread::s_main = NULL; + +#ifndef WIN32 + +/* static */ Fiber *Fiber::New(FiberFunc proc, void *param) +{ + return new Fiber_Thread(proc, param); +} + +/* static */ Fiber *Fiber::AttachCurrent(void *param) +{ + return new Fiber_Thread(param); +} + +/* static */ void *Fiber::GetCurrentFiberData() +{ + return Fiber_Thread::GetCurrentFiber()->GetFiberData(); +} + +#endif /* WIN32 */ diff --git a/src/fiber_win32.cpp b/src/fiber_win32.cpp new file mode 100644 index 0000000000..61718188ce --- /dev/null +++ b/src/fiber_win32.cpp @@ -0,0 +1,206 @@ +/* $Id$ */ + +/** @file fiber_win32.cpp Win32 implementation of Fiber. */ + +#include "stdafx.h" +#include "fiber.hpp" +#include +#include +#include + +class Fiber_Win32 : public Fiber { +private: + LPVOID m_fiber; + FiberFunc m_proc; + void *m_param; + bool m_attached; + + static Fiber_Win32 *s_main; + +public: + /** + * Create a win32 fiber and start it, calling proc(param). + */ + Fiber_Win32(FiberFunc proc, void *param) : + m_fiber(NULL), + m_proc(proc), + m_param(param), + m_attached(false) + { + CreateFiber(); + } + + /** + * Create a win32 fiber and attach current thread to it. + */ + Fiber_Win32(void *param) : + m_fiber(NULL), + m_proc(NULL), + m_param(param), + m_attached(true) + { + ConvertThreadToFiber(); + if (s_main == NULL) s_main = this; + } + + /* virtual */ ~Fiber_Win32() + { + if (this->m_fiber != NULL) { + if (this->m_attached) { + this->ConvertFiberToThread(); + } else { + this->DeleteFiber(); + } + } + } + + /* virtual */ void SwitchToFiber() + { + typedef VOID (WINAPI *FnSwitchToFiber)(LPVOID fiber); + + static FnSwitchToFiber fnSwitchToFiber = (FnSwitchToFiber)stGetProcAddr("SwitchToFiber"); + assert(fnSwitchToFiber != NULL); + + fnSwitchToFiber(this->m_fiber); + } + + /* virtual */ void Exit() + { + /* Simply switch back to the main fiber, we kill the fiber sooner or later */ + s_main->SwitchToFiber(); + } + + /* virtual */ bool IsRunning() + { + return this->m_fiber != NULL; + } + + /* virtual */ void *GetFiberData() + { + return this->m_param; + } + + /** + * Win95 doesn't have Fiber support. So check if we have Fiber support, + * and else fall back on Fiber_Thread. + */ + static bool IsSupported() + { + static bool first_run = true; + static bool is_supported = false; + + if (first_run) { + first_run = false; + static const char *names[] = { + "ConvertThreadToFiber", + "CreateFiber", + "DeleteFiber", + "ConvertFiberToThread", + "SwitchToFiber"}; + for (size_t i = 0; i < lengthof(names); i++) { + if (stGetProcAddr(names[i]) == NULL) return false; + } + is_supported = true; + } + return is_supported; + } + +private: + /** + * Get a function from kernel32.dll. + * @param name Function to get. + * @return Proc to the function, or NULL when not found. + */ + static FARPROC stGetProcAddr(const char *name) + { + static HMODULE hKernel = LoadLibraryA("kernel32.dll"); + return GetProcAddress(hKernel, name); + } + + /** + * First function which is called within the fiber. + */ + static VOID CALLBACK stFiberProc(LPVOID fiber) + { + Fiber_Win32 *cur = (Fiber_Win32 *)fiber; + cur->m_proc(cur->m_param); + } + + /** + * Delete a fiber. + */ + void DeleteFiber() + { + typedef VOID (WINAPI *FnDeleteFiber)(LPVOID lpFiber); + + static FnDeleteFiber fnDeleteFiber = (FnDeleteFiber)stGetProcAddr("DeleteFiber"); + assert(fnDeleteFiber != NULL); + + fnDeleteFiber(this->m_fiber); + this->m_fiber = NULL; + } + + /** + * Convert a current thread to a fiber. + */ + void ConvertThreadToFiber() + { + typedef LPVOID (WINAPI *FnConvertThreadToFiber)(LPVOID lpParameter); + + static FnConvertThreadToFiber fnConvertThreadToFiber = (FnConvertThreadToFiber)stGetProcAddr("ConvertThreadToFiber"); + assert(fnConvertThreadToFiber != NULL); + + this->m_fiber = fnConvertThreadToFiber(this); + } + + /** + * Create a new fiber. + */ + void CreateFiber() + { + typedef LPVOID (WINAPI *FnCreateFiber)(SIZE_T dwStackSize, LPFIBER_START_ROUTINE lpStartAddress, LPVOID lpParameter); + + static FnCreateFiber fnCreateFiber = (FnCreateFiber)stGetProcAddr("CreateFiber"); + assert(fnCreateFiber != NULL); + + this->m_fiber = fnCreateFiber(0, &stFiberProc, this); + } + + /** + * Convert a fiber back to a thread. + */ + void ConvertFiberToThread() + { + typedef BOOL (WINAPI *FnConvertFiberToThread)(); + + static FnConvertFiberToThread fnConvertFiberToThread = (FnConvertFiberToThread)stGetProcAddr("ConvertFiberToThread"); + assert(fnConvertFiberToThread != NULL); + + fnConvertFiberToThread(); + this->m_fiber = NULL; + } +}; + +/* Initialize the static member of Fiber_Win32 */ +/* static */ Fiber_Win32 *Fiber_Win32::s_main = NULL; + +/* Include Fiber_Thread, as Win95 needs it */ +#include "fiber_thread.cpp" + +/* static */ Fiber *Fiber::New(FiberFunc proc, void *param) +{ + if (Fiber_Win32::IsSupported()) return new Fiber_Win32(proc, param); + return new Fiber_Thread(proc, param); +} + +/* static */ Fiber *Fiber::AttachCurrent(void *param) +{ + if (Fiber_Win32::IsSupported()) return new Fiber_Win32(param); + return new Fiber_Thread(param); +} + +/* static */ void *Fiber::GetCurrentFiberData() +{ + if (Fiber_Win32::IsSupported()) return ((Fiber *)::GetFiberData())->GetFiberData(); + return Fiber_Thread::GetCurrentFiber()->GetFiberData(); +} diff --git a/src/genworld.cpp b/src/genworld.cpp index 885e5171e5..ca6b89d717 100644 --- a/src/genworld.cpp +++ b/src/genworld.cpp @@ -84,7 +84,7 @@ bool IsGenerateWorldThreaded() /** * The internal, real, generate function. */ -static void *_GenerateWorld(void *arg) +static void * CDECL _GenerateWorld(void *arg) { _generating_world = true; if (_network_dedicated) DEBUG(net, 0, "Generating map, please wait..."); @@ -194,7 +194,7 @@ void WaitTillGeneratedWorld() { if (_gw.thread == NULL) return; _gw.quit_thread = true; - OTTDJoinThread((OTTDThread*)_gw.thread); + _gw.thread->Join(); _gw.thread = NULL; _gw.threaded = false; } @@ -228,6 +228,8 @@ void HandleGeneratingWorldAbortion() if (_cursor.sprite == SPR_CURSOR_ZZZ) SetMouseCursor(SPR_CURSOR_MOUSE, PAL_NONE); /* Show all vital windows again, because we have hidden them */ if (_gw.threaded && _game_mode != GM_MENU) ShowVitalWindows(); + + ThreadObject *thread = _gw.thread; _gw.active = false; _gw.thread = NULL; _gw.proc = NULL; @@ -237,7 +239,7 @@ void HandleGeneratingWorldAbortion() DeleteWindowById(WC_GENERATE_PROGRESS_WINDOW, 0); MarkWholeScreenDirty(); - OTTDExitThread(); + thread->Exit(); } /** @@ -282,7 +284,7 @@ void GenerateWorld(int mode, uint size_x, uint size_y) SetupColorsAndInitialWindow(); if (_network_dedicated || - (_gw.thread = OTTDCreateThread(&_GenerateWorld, NULL)) == NULL) { + (_gw.thread = ThreadObject::New(&_GenerateWorld, NULL)) == NULL) { DEBUG(misc, 1, "Cannot create genworld thread, reverting to single-threaded mode"); _gw.threaded = false; _GenerateWorld(NULL); diff --git a/src/genworld.h b/src/genworld.h index ebac04e167..1fc357d1c4 100644 --- a/src/genworld.h +++ b/src/genworld.h @@ -5,16 +5,6 @@ #ifndef GENWORLD_H #define GENWORLD_H -/* If OTTDThread isn't defined, define it to a void, but make sure to undefine - * it after this include. This makes including genworld.h easier, as you - * don't need to include thread.h before it, while it stays possible to - * include it after it, and still work. - */ -#ifndef OTTDThread -#define TEMPORARY_OTTDTHREAD_DEFINITION -#define OTTDThread void -#endif - #include "player_type.h" /* @@ -43,14 +33,9 @@ struct gw_info { uint size_y; ///< Y-size of the map gw_done_proc *proc; ///< Proc that is called when done (can be NULL) gw_abort_proc *abortp; ///< Proc that is called when aborting (can be NULL) - OTTDThread *thread; ///< The thread we are in (can be NULL) + class ThreadObject *thread; ///< The thread we are in (can be NULL) }; -#ifdef TEMPORARY_OTTDTHREAD_DEFINITION -#undef OTTDThread -#undef TEMPORARY_OTTDTHREAD_DEFINITION -#endif - enum gwp_class { GWP_MAP_INIT, ///< Initialize/allocate the map, start economy GWP_LANDSCAPE, ///< Create the landscape diff --git a/src/saveload.cpp b/src/saveload.cpp index cfcbab3c5c..00713718bc 100644 --- a/src/saveload.cpp +++ b/src/saveload.cpp @@ -1499,7 +1499,7 @@ void SaveFileError() SaveFileDone(); } -static OTTDThread* save_thread; +static ThreadObject *save_thread; /** We have written the whole game into memory, _Savegame_pool, now find * and appropiate compressor and start writing to file. @@ -1561,7 +1561,7 @@ static SaveOrLoadResult SaveFileToDisk(bool threaded) } } -static void* SaveFileToDiskThread(void *arg) +static void * CDECL SaveFileToDiskThread(void *arg) { SaveFileToDisk(true); return NULL; @@ -1569,7 +1569,9 @@ static void* SaveFileToDiskThread(void *arg) void WaitTillSaved() { - OTTDJoinThread(save_thread); + if (save_thread == NULL) return; + + save_thread->Join(); save_thread = NULL; } @@ -1641,7 +1643,7 @@ SaveOrLoadResult SaveOrLoad(const char *filename, int mode, Subdirectory sb) SaveFileStart(); if (_network_server || - (save_thread = OTTDCreateThread(&SaveFileToDiskThread, NULL)) == NULL) { + (save_thread = ThreadObject::New(&SaveFileToDiskThread, NULL)) == NULL) { if (!_network_server) DEBUG(sl, 1, "Cannot create savegame thread, reverting to single-threaded mode..."); SaveOrLoadResult result = SaveFileToDisk(false); diff --git a/src/thread.cpp b/src/thread.cpp deleted file mode 100644 index 4e4f065fbd..0000000000 --- a/src/thread.cpp +++ /dev/null @@ -1,313 +0,0 @@ -/* $Id$ */ - -/** @file thread.cpp */ - -#include "stdafx.h" -#include "thread.h" -#include "core/alloc_func.hpp" -#include - -#if defined(__AMIGA__) || defined(PSP) || defined(NO_THREADS) -OTTDThread *OTTDCreateThread(OTTDThreadFunc function, void *arg) { return NULL; } -void *OTTDJoinThread(OTTDThread *t) { return NULL; } -void OTTDExitThread() { NOT_REACHED(); }; - -#elif defined(__OS2__) - -#define INCL_DOS -#include -#include - -struct OTTDThread { - TID thread; - OTTDThreadFunc func; - void* arg; - void* ret; -}; - -static void Proxy(void* arg) -{ - OTTDThread* t = (OTTDThread*)arg; - t->ret = t->func(t->arg); -} - -OTTDThread* OTTDCreateThread(OTTDThreadFunc function, void* arg) -{ - OTTDThread* t = MallocT(1); - - if (t == NULL) return NULL; - - t->func = function; - t->arg = arg; - t->thread = _beginthread(Proxy, NULL, 32768, t); - if (t->thread != (TID)-1) { - return t; - } else { - free(t); - return NULL; - } -} - -void* OTTDJoinThread(OTTDThread* t) -{ - void* ret; - - if (t == NULL) return NULL; - - DosWaitThread(&t->thread, DCWW_WAIT); - ret = t->ret; - free(t); - return ret; -} - -void OTTDExitThread() -{ - _endthread(); -} - -#elif defined(UNIX) && !defined(MORPHOS) - -#include - -struct OTTDThread { - pthread_t thread; -}; - -OTTDThread* OTTDCreateThread(OTTDThreadFunc function, void* arg) -{ - OTTDThread* t = MallocT(1); - - if (t == NULL) return NULL; - - if (pthread_create(&t->thread, NULL, function, arg) == 0) { - return t; - } else { - free(t); - return NULL; - } -} - -void* OTTDJoinThread(OTTDThread* t) -{ - void* ret; - - if (t == NULL) return NULL; - - pthread_join(t->thread, &ret); - free(t); - return ret; -} - -void OTTDExitThread() -{ - pthread_exit(NULL); -} - -#elif defined(WIN32) - -#include - -struct OTTDThread { - HANDLE thread; - OTTDThreadFunc func; - void* arg; - void* ret; -}; - -static DWORD WINAPI Proxy(LPVOID arg) -{ - OTTDThread* t = (OTTDThread*)arg; - t->ret = t->func(t->arg); - return 0; -} - -OTTDThread* OTTDCreateThread(OTTDThreadFunc function, void* arg) -{ - OTTDThread* t = MallocT(1); - DWORD dwThreadId; - - if (t == NULL) return NULL; - - t->func = function; - t->arg = arg; - t->thread = CreateThread(NULL, 0, Proxy, t, 0, &dwThreadId); - - if (t->thread != NULL) { - return t; - } else { - free(t); - return NULL; - } -} - -void* OTTDJoinThread(OTTDThread* t) -{ - void* ret; - - if (t == NULL) return NULL; - - WaitForSingleObject(t->thread, INFINITE); - CloseHandle(t->thread); - ret = t->ret; - free(t); - return ret; -} - -void OTTDExitThread() -{ - ExitThread(0); -} - - -#elif defined(MORPHOS) - -#include -#include -#include - -#include -#include - -#include - -/* NOTE: this code heavily depends on latest libnix updates. So make - * sure you link with new stuff which supports semaphore locking of - * the IO resources, else it will just go foobar. */ - -struct OTTDThreadStartupMessage { - struct Message msg; ///< standard exec.library message (MUST be the first thing in the message struct!) - OTTDThreadFunc func; ///< function the thread will execute - void *arg; ///< functions arguments for the thread function - void *ret; ///< return value of the thread function - jmp_buf jumpstore; ///< storage for the setjump state -}; - -struct OTTDThread { - struct MsgPort *replyport; - struct OTTDThreadStartupMessage msg; -}; - - -/** - * Default OpenTTD STDIO/ERR debug output is not very useful for this, so we - * utilize serial/ramdebug instead. - */ -#ifndef NO_DEBUG_MESSAGES -void KPutStr(CONST_STRPTR format) -{ - RawDoFmt(format, NULL, (void (*)())RAWFMTFUNC_SERIAL, NULL); -} -#else -#define KPutStr(x) -#endif - -static void Proxy(void) -{ - struct Task *child = FindTask(NULL); - struct OTTDThreadStartupMessage *msg; - - /* Make sure, we don't block the parent. */ - SetTaskPri(child, -5); - - KPutStr("[Child] Progressing...\n"); - - if (NewGetTaskAttrs(NULL, &msg, sizeof(struct OTTDThreadStartupMessage *), TASKINFOTYPE_STARTUPMSG, TAG_DONE) && msg != NULL) { - /* Make use of setjmp() here, so this point can be reached again from inside - * OTTDExitThread() which can be called from anythere inside msg->func. - * It's a bit ugly and in worst case it leaks some memory. */ - if (setjmp(msg->jumpstore) == 0) { - msg->ret = msg->func(msg->arg); - } else { - KPutStr("[Child] Returned to main()\n"); - } - } - - /* Quit the child, exec.library will reply the startup msg internally. */ - KPutStr("[Child] Done.\n"); -} - -OTTDThread* OTTDCreateThread(OTTDThreadFunc function, void *arg) -{ - OTTDThread *t; - struct Task *parent; - - KPutStr("[OpenTTD] Create thread...\n"); - - t = (struct OTTDThread *)AllocVecTaskPooled(sizeof(struct OTTDThread)); - if (t == NULL) return NULL; - - parent = FindTask(NULL); - - /* Make sure main thread runs with sane priority */ - SetTaskPri(parent, 0); - - /* Things we'll pass down to the child by utilizing NP_StartupMsg */ - t->msg.func = function; - t->msg.arg = arg; - t->msg.ret = NULL; - - t->replyport = CreateMsgPort(); - - if (t->replyport != NULL) { - struct Process *child; - - t->msg.msg.mn_Node.ln_Type = NT_MESSAGE; - t->msg.msg.mn_ReplyPort = t->replyport; - t->msg.msg.mn_Length = sizeof(struct OTTDThreadStartupMessage); - - child = CreateNewProcTags( - NP_CodeType, CODETYPE_PPC, - NP_Entry, Proxy, - NP_StartupMsg, (ULONG)&t->msg, - NP_Priority, 5UL, - NP_Name, (ULONG)"OpenTTD Thread", - NP_PPCStackSize, 131072UL, - TAG_DONE); - - if (child != NULL) { - KPutStr("[OpenTTD] Child process launched.\n"); - return t; - } - DeleteMsgPort(t->replyport); - } - FreeVecTaskPooled(t); - - return NULL; -} - -void* OTTDJoinThread(OTTDThread *t) -{ - struct OTTDThreadStartupMessage *reply; - void *ret; - - KPutStr("[OpenTTD] Join threads...\n"); - - if (t == NULL) return NULL; - - KPutStr("[OpenTTD] Wait for child to quit...\n"); - WaitPort(t->replyport); - - reply = (struct OTTDThreadStartupMessage *)GetMsg(t->replyport); - ret = reply->ret; - - DeleteMsgPort(t->replyport); - FreeVecTaskPooled(t); - - return ret; -} - -void OTTDExitThread() -{ - struct OTTDThreadStartupMessage *msg; - - KPutStr("[Child] Aborting...\n"); - - if (NewGetTaskAttrs(NULL, &msg, sizeof(struct OTTDThreadStartupMessage *), TASKINFOTYPE_STARTUPMSG, TAG_DONE) && msg != NULL) { - KPutStr("[Child] Jumping back...\n"); - longjmp(msg->jumpstore, 0xBEAFCAFE); - } - - NOT_REACHED(); -} - -#endif diff --git a/src/thread.h b/src/thread.h index 5ec169b0c3..2b12b45826 100644 --- a/src/thread.h +++ b/src/thread.h @@ -5,12 +5,95 @@ #ifndef THREAD_H #define THREAD_H -struct OTTDThread; +typedef void * (CDECL *OTTDThreadFunc)(void *); -typedef void * (*OTTDThreadFunc)(void*); +/** + * A Thread Object which works on all our supported OSes. + */ +class ThreadObject { +public: + /** + * Virtual destructor to allow 'delete' operator to work properly. + */ + virtual ~ThreadObject() {}; -OTTDThread *OTTDCreateThread(OTTDThreadFunc, void*); -void *OTTDJoinThread(OTTDThread*); -void OTTDExitThread(); + /** + * Check if the thread is currently running. + * @return True if the thread is running. + */ + virtual bool IsRunning() = 0; + + /** + * Waits for the thread to exit. + * @return True if the thread has exited. + */ + virtual bool WaitForStop() = 0; + + /** + * Exit this thread. + */ + virtual bool Exit() = 0; + + /** + * Join this thread. + */ + virtual void *Join() = 0; + + /** + * Check if this thread is the current active thread. + * @return True if it is the current active thread. + */ + virtual bool IsCurrent() = 0; + + /** + * Get the unique ID of this thread. + * @return A value unique to each thread. + */ + virtual uint GetId() = 0; + + /** + * Create a thread; proc will be called as first function inside the thread, + * with optinal params. + * @param proc The procedure to call inside the thread. + * @param param The params to give with 'proc'. + * @return True if the thread was started correctly. + */ + static ThreadObject *New(OTTDThreadFunc proc, void *param); + + /** + * Convert the current thread to a new ThreadObject. + * @return A new ThreadObject with the current thread attached to it. + */ + static ThreadObject* AttachCurrent(); + + /** + * Find the Id of the current running thread. + * @return The thread ID of the current active thread. + */ + static uint CurrentId(); +}; + +/** + * Cross-platform Thread Semaphore. Wait() waits for a Set() of someone else. + */ +class ThreadSemaphore { +public: + static ThreadSemaphore *New(); + + /** + * Virtual Destructor to avoid compiler warnings. + */ + virtual ~ThreadSemaphore() {}; + + /** + * Signal all threads that are in Wait() to continue. + */ + virtual void Set() = 0; + + /** + * Wait until we are signaled by a call to Set(). + */ + virtual void Wait() = 0; +}; #endif /* THREAD_H */ diff --git a/src/thread_none.cpp b/src/thread_none.cpp new file mode 100644 index 0000000000..6209e89a7b --- /dev/null +++ b/src/thread_none.cpp @@ -0,0 +1,42 @@ +/* $Id$ */ + +/** @file thread_none.cpp No-Threads-Available implementation of Threads */ + +#include "stdafx.h" +#include "thread.h" +#include "fiber.hpp" + +/* static */ ThreadObject *ThreadObject::New(OTTDThreadFunc proc, void *param) +{ + return NULL; +} + +/* static */ ThreadObject *ThreadObject::AttachCurrent() +{ + return NULL; +} + +/* static */ uint ThreadObject::CurrentId() +{ + return -1; +} + +/* static */ ThreadSemaphore *ThreadSemaphore::New() +{ + return NULL; +} + +/* static */ Fiber *Fiber::New(FiberFunc proc, void *param) +{ + return NULL; +} + +/* static */ Fiber *Fiber::AttachCurrent(void *param) +{ + return NULL; +} + +/* static */ void *Fiber::GetCurrentFiberData() +{ + return NULL; +} diff --git a/src/thread_os2.cpp b/src/thread_os2.cpp new file mode 100644 index 0000000000..29b33557fa --- /dev/null +++ b/src/thread_os2.cpp @@ -0,0 +1,80 @@ +/* $Id$ */ + +/** @file thread_os2.cpp OS2 implementation of Threads. */ + +#include "stdafx.h" +#include "thread.h" + +#if 0 +#include "debug.h" +#include "core/alloc_func.hpp" +#include + +#define INCL_DOS +#include +#include + +struct OTTDThread { + TID thread; + OTTDThreadFunc func; + void *arg; + void *ret; +}; + +static void Proxy(void *arg) +{ + OTTDThread *t = (OTTDThread *)arg; + t->ret = t->func(t->arg); +} + +OTTDThread *OTTDCreateThread(OTTDThreadFunc function, void *arg) +{ + OTTDThread *t = MallocT(1); + + t->func = function; + t->arg = arg; + t->thread = _beginthread(Proxy, NULL, 32768, t); + if (t->thread != (TID)-1) { + return t; + } else { + free(t); + return NULL; + } +} + +void *OTTDJoinThread(OTTDThread *t) +{ + if (t == NULL) return NULL; + + DosWaitThread(&t->thread, DCWW_WAIT); + void *ret = t->ret; + free(t); + return ret; +} + +void OTTDExitThread() +{ + _endthread(); +} + +#endif + +/* static */ ThreadObject *ThreadObject::New(OTTDThreadFunc proc, void *param) +{ + return NULL; +} + +/* static */ ThreadObject *ThreadObject::AttachCurrent() +{ + return NULL; +} + +/* static */ uint ThreadObject::CurrentId() +{ + return -1; +} + +/* static */ ThreadSemaphore *ThreadSemaphore::New() +{ + return NULL; +} diff --git a/src/thread_pthread.cpp b/src/thread_pthread.cpp new file mode 100644 index 0000000000..7aa0fda18e --- /dev/null +++ b/src/thread_pthread.cpp @@ -0,0 +1,199 @@ +/* $Id$ */ + +/** @file thread_pthread.cpp POSIX pthread implementation of Threads. */ + +#include "stdafx.h" +#include "thread.h" +#include "debug.h" +#include "core/alloc_func.hpp" +#include +#include +#include +#include + +/** + * POSIX pthread version for ThreadObject. + */ +class ThreadObject_pthread : public ThreadObject { +private: + pthread_t m_thr; ///< System thread identifier. + OTTDThreadFunc m_proc; ///< External thread procedure. + void *m_param; ///< Parameter for the external thread procedure. + bool m_attached; ///< True if the ThreadObject was attached to an existing thread. + sem_t m_sem_start; ///< Here the new thread waits before it starts. + sem_t m_sem_stop; ///< Here the other thread can wait for this thread to end. + +public: + /** + * Create a pthread and start it, calling proc(param). + */ + ThreadObject_pthread(OTTDThreadFunc proc, void *param) : + m_thr(0), + m_proc(proc), + m_param(param), + m_attached(false) + { + sem_init(&m_sem_start, 0, 0); + sem_init(&m_sem_stop, 0, 0); + + pthread_create(&m_thr, NULL, &stThreadProc, this); + sem_post(&m_sem_start); + } + + /** + * Create a pthread and attach current thread to it. + */ + ThreadObject_pthread() : + m_thr(0), + m_proc(NULL), + m_param(0), + m_attached(true) + { + sem_init(&m_sem_start, 0, 0); + sem_init(&m_sem_stop, 0, 0); + + m_thr = pthread_self(); + } + + /* virtual */ ~ThreadObject_pthread() + { + sem_destroy(&m_sem_stop); + sem_destroy(&m_sem_start); + }; + + /* virtual */ bool IsRunning() + { + return m_thr != 0; + } + + /* virtual */ bool WaitForStop() + { + /* You can't wait on yourself */ + assert(!IsCurrent()); + /* If the thread is not running, waiting is over */ + if (!IsRunning()) return true; + + int ret = sem_wait(&m_sem_stop); + if (ret == 0) { + /* We have passed semaphore so increment it again */ + sem_post(&m_sem_stop); + return true; + } + return false; + } + + /* virtual */ bool Exit() + { + /* You can only exit yourself */ + assert(IsCurrent()); + /* If the thread is not running, we are already closed */ + if (!IsRunning()) return false; + + /* For now we terminate by throwing an error, gives much cleaner cleanup */ + throw 0; + } + + /* virtual */ void *Join() + { + /* You cannot join yourself */ + assert(!IsCurrent()); + + void *ret; + pthread_join(m_thr, &ret); + m_thr = 0; + + return ret; + } + + /* virtual */ bool IsCurrent() + { + return pthread_self() == m_thr; + } + + /* virtual */ uint GetId() + { + return (uint)m_thr; + } + +private: + /** + * On thread creation, this function is called, which calls the real startup + * function. This to get back into the correct instance again. + */ + static void *stThreadProc(void *thr) + { + return ((ThreadObject_pthread *)thr)->ThreadProc(); + } + + /** + * A new thread is created, and this function is called. Call the custom + * function of the creator of the thread. + */ + void *ThreadProc() + { + /* The new thread stops here so the calling thread can complete pthread_create() call */ + sem_wait(&m_sem_start); + + /* Call the proc of the creator to continue this thread */ + try { + m_proc(m_param); + } catch (...) { + } + + /* Notify threads waiting for our completion */ + sem_post(&m_sem_stop); + + return NULL; + } +}; + +/* static */ ThreadObject *ThreadObject::New(OTTDThreadFunc proc, void *param) +{ + return new ThreadObject_pthread(proc, param); +} + +/* static */ ThreadObject *ThreadObject::AttachCurrent() +{ + return new ThreadObject_pthread(); +} + +/* static */ uint ThreadObject::CurrentId() +{ + return (uint)pthread_self(); +} + + +/** + * POSIX pthread version of ThreadSemaphore. + */ +class ThreadSemaphore_pthread : public ThreadSemaphore { +private: + sem_t m_sem; + +public: + ThreadSemaphore_pthread() + { + sem_init(&m_sem, 0, 0); + } + + /* virtual */ ~ThreadSemaphore_pthread() + { + sem_destroy(&m_sem); + } + + /* virtual */ void Set() + { + int val = 0; + if (sem_getvalue(&m_sem, &val) == 0 && val == 0) sem_post(&m_sem); + } + + /* virtual */ void Wait() + { + sem_wait(&m_sem); + } +}; + +/* static */ ThreadSemaphore *ThreadSemaphore::New() +{ + return new ThreadSemaphore_pthread(); +} diff --git a/src/thread_win32.cpp b/src/thread_win32.cpp new file mode 100644 index 0000000000..778bee9bfe --- /dev/null +++ b/src/thread_win32.cpp @@ -0,0 +1,188 @@ +/* $Id$ */ + +/** @file thread_win32.cpp Win32 thread implementation of Threads. */ + +#include "stdafx.h" +#include "thread.h" +#include "debug.h" +#include "core/alloc_func.hpp" +#include +#include +#include + +/** + * Win32 thread version for ThreadObject. + */ +class ThreadObject_Win32 : public ThreadObject { +private: + uint m_id_thr; + HANDLE m_h_thr; + OTTDThreadFunc m_proc; + void *m_param; + bool m_attached; + void *ret; + +public: + /** + * Create a win32 thread and start it, calling proc(param). + */ + ThreadObject_Win32(OTTDThreadFunc proc, void *param) : + m_id_thr(0), + m_h_thr(NULL), + m_proc(proc), + m_param(param), + m_attached(false) + { + m_h_thr = (HANDLE)_beginthreadex(NULL, 0, &stThreadProc, this, CREATE_SUSPENDED, &m_id_thr); + if (m_h_thr == NULL) return; + ResumeThread(m_h_thr); + } + + /** + * Create a win32 thread and attach current thread to it. + */ + ThreadObject_Win32() : + m_id_thr(0), + m_h_thr(NULL), + m_proc(NULL), + m_param(NULL), + m_attached(false) + { + BOOL ret = DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), GetCurrentProcess(), &m_h_thr, 0, FALSE, DUPLICATE_SAME_ACCESS); + if (!ret) return; + m_id_thr = GetCurrentThreadId(); + } + + /* virtual */ ~ThreadObject_Win32() + { + if (m_h_thr != NULL) { + CloseHandle(m_h_thr); + m_h_thr = NULL; + } + } + + /* virtual */ bool IsRunning() + { + if (m_h_thr == NULL) return false; + DWORD exit_code = 0; + if (!GetExitCodeThread(m_h_thr, &exit_code)) return false; + return (exit_code == STILL_ACTIVE); + } + + /* virtual */ bool WaitForStop() + { + /* You can't wait on yourself */ + assert(!IsCurrent()); + /* If the thread is not running, waiting is over */ + if (!IsRunning()) return true; + + DWORD res = WaitForSingleObject(m_h_thr, INFINITE); + return res == WAIT_OBJECT_0; + } + + /* virtual */ bool Exit() + { + /* You can only exit yourself */ + assert(IsCurrent()); + /* If the thread is not running, we are already closed */ + if (!IsRunning()) return false; + + /* For now we terminate by throwing an error, gives much cleaner cleanup */ + throw 0; + } + + /* virtual */ void *Join() + { + /* You cannot join yourself */ + assert(!IsCurrent()); + + WaitForSingleObject(m_h_thr, INFINITE); + + return this->ret; + } + + /* virtual */ bool IsCurrent() + { + DWORD id_cur = GetCurrentThreadId(); + return id_cur == m_id_thr; + } + + /* virtual */ uint GetId() + { + return m_id_thr; + } + +private: + /** + * On thread creation, this function is called, which calls the real startup + * function. This to get back into the correct instance again. + */ + static uint CALLBACK stThreadProc(void *thr) + { + return ((ThreadObject_Win32 *)thr)->ThreadProc(); + } + + /** + * A new thread is created, and this function is called. Call the custom + * function of the creator of the thread. + */ + uint ThreadProc() + { + try { + this->ret = m_proc(m_param); + } catch (...) { + } + + return 0; + } +}; + +/* static */ ThreadObject *ThreadObject::New(OTTDThreadFunc proc, void *param) +{ + return new ThreadObject_Win32(proc, param); +} + +/* static */ ThreadObject* ThreadObject::AttachCurrent() +{ + return new ThreadObject_Win32(); +} + +/* static */ uint ThreadObject::CurrentId() +{ + return GetCurrentThreadId(); +} + + +/** + * Win32 thread version of ThreadSemaphore. + */ +class ThreadSemaphore_Win32 : public ThreadSemaphore { +private: + HANDLE m_handle; + +public: + ThreadSemaphore_Win32() + { + m_handle = ::CreateEvent(NULL, FALSE, FALSE, NULL); + } + + /* virtual */ ~ThreadSemaphore_Win32() + { + ::CloseHandle(m_handle); + } + + /* virtual */ void Set() + { + ::SetEvent(m_handle); + } + + /* virtual */ void Wait() + { + ::WaitForSingleObject(m_handle, INFINITE); + } +}; + +/* static */ ThreadSemaphore *ThreadSemaphore::New() +{ + return new ThreadSemaphore_Win32(); +}