Refactor parallelism to re-usable method

This commit is contained in:
Ted John 2018-05-27 01:36:55 +01:00
parent 1766abde91
commit 3a708ea112
1 changed files with 48 additions and 37 deletions

View File

@ -500,6 +500,33 @@ private:
return requiredObjects;
}
template<typename T, typename TFunc>
static void ParallelFor(const std::vector<T>& items, TFunc func)
{
auto partitions = std::thread::hardware_concurrency();
auto partitionSize = (items.size() + (partitions - 1)) / partitions;
std::vector<std::thread> threads;
for (size_t n = 0; n < partitions; n++)
{
auto begin = n * partitionSize;
auto end = std::min(items.size(), begin + partitionSize);
threads.emplace_back(
[func, &items](size_t begin, size_t end)
{
for (size_t i = begin; i < end; i++)
{
func(i);
}
},
begin,
end);
}
for (auto& t : threads)
{
t.join();
}
}
std::vector<Object *> LoadObjects(std::vector<const ObjectRepositoryItem *> &requiredObjects, size_t * outNewObjectsLoaded)
{
std::vector<Object *> objects;
@ -510,51 +537,35 @@ private:
// Read objects
std::mutex commonMutex;
auto batch =
[this, &commonMutex, requiredObjects, &objects, &badObjects, &loadedObjects](size_t begin, size_t end) -> void
ParallelFor(
requiredObjects,
[this, &commonMutex, requiredObjects, &objects, &badObjects, &loadedObjects](size_t i)
{
for (size_t i = begin; i < end; i++)
auto ori = requiredObjects[i];
Object * loadedObject = nullptr;
if (ori != nullptr)
{
auto ori = requiredObjects[i];
Object * loadedObject = nullptr;
if (ori != nullptr)
loadedObject = ori->LoadedObject;
if (loadedObject == nullptr)
{
loadedObject = ori->LoadedObject;
loadedObject = _objectRepository->LoadObject(ori);
if (loadedObject == nullptr)
{
loadedObject = _objectRepository->LoadObject(ori);
if (loadedObject == nullptr)
{
std::lock_guard<std::mutex> guard(commonMutex);
badObjects.push_back(ori->ObjectEntry);
ReportObjectLoadProblem(&ori->ObjectEntry);
}
else
{
std::lock_guard<std::mutex> guard(commonMutex);
loadedObjects.push_back(loadedObject);
// Connect the ori to the registered object
_objectRepository->RegisterLoadedObject(ori, loadedObject);
}
std::lock_guard<std::mutex> guard(commonMutex);
badObjects.push_back(ori->ObjectEntry);
ReportObjectLoadProblem(&ori->ObjectEntry);
}
else
{
std::lock_guard<std::mutex> guard(commonMutex);
loadedObjects.push_back(loadedObject);
// Connect the ori to the registered object
_objectRepository->RegisterLoadedObject(ori, loadedObject);
}
}
objects[i] = loadedObject;
}
};
auto partitions = std::thread::hardware_concurrency();
auto partitionSize = (requiredObjects.size() + (partitions - 1)) / partitions;
std::vector<std::thread> threads;
for (size_t n = 0; n < partitions; n++)
{
auto begin = n * partitionSize;
auto end = std::min(requiredObjects.size(), begin + partitionSize);
threads.emplace_back(batch, begin, end);
}
for (auto& t : threads)
{
t.join();
}
objects[i] = loadedObject;
});
// Load objects
for (auto obj : loadedObjects)