From f2428d8bb6b65db7bb4b14fc7d83665e3d7298b0 Mon Sep 17 00:00:00 2001 From: Kaehvaman Date: Tue, 31 Dec 2024 16:25:55 +0400 Subject: [PATCH] =?UTF-8?q?=D1=8F=20=D0=BD=D0=B5=20=D0=BF=D0=BE=D0=BC?= =?UTF-8?q?=D0=BD=D1=8E=20=D1=87=D1=82=D0=BE=20=D1=82=D1=83=D1=82=20=D0=B1?= =?UTF-8?q?=D1=8B=D0=BB=D0=BE,=20=D0=BD=D0=BE=20=D0=BA=D0=BE=D0=BC=D0=BC?= =?UTF-8?q?=D0=B8=D1=82=20=D0=BD=D0=B0=D0=B4=D0=BE=20=D1=81=D0=B4=D0=B5?= =?UTF-8?q?=D0=BB=D0=B0=D1=82=D1=8C...?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Game of Life/Game of Life.vcxproj | 6 +- .../Game of Life/Game of Life.vcxproj.filters | 6 + Game of Life/src/main.c | 55 ++- Game of Life/src/parallel_for.c | 463 ++++++++++++++++++ Game of Life/src/parallel_for.h | 34 ++ .../lab16 with raylib.vcxproj | 2 - .../lab16 with raylib.vcxproj.filters | 8 +- lab16 with raylib/resources/watershader.frag | 2 +- lab16 with raylib/src/main.c | 30 +- .../parallel for loop from stackexchange.sln | 28 ++ .../parallel_for/parallel_for.vcxproj | 140 ++++++ .../parallel_for/parallel_for.vcxproj.filters | 33 ++ .../src/parallel_for.c | 463 ++++++++++++++++++ .../src/parallel_for.h | 34 ++ .../src/simpler test.c | 33 ++ .../src/test.c | 137 ++++++ 16 files changed, 1445 insertions(+), 29 deletions(-) create mode 100644 Game of Life/src/parallel_for.c create mode 100644 Game of Life/src/parallel_for.h create mode 100644 parallel for loop from stackexchange/parallel for loop from stackexchange.sln create mode 100644 parallel for loop from stackexchange/parallel_for/parallel_for.vcxproj create mode 100644 parallel for loop from stackexchange/parallel_for/parallel_for.vcxproj.filters create mode 100644 parallel for loop from stackexchange/src/parallel_for.c create mode 100644 parallel for loop from stackexchange/src/parallel_for.h create mode 100644 parallel for loop from stackexchange/src/simpler test.c create mode 100644 parallel for loop from stackexchange/src/test.c diff --git a/Game 
of Life/Game of Life/Game of Life.vcxproj b/Game of Life/Game of Life/Game of Life.vcxproj index 166cc1d..911436e 100644 --- a/Game of Life/Game of Life/Game of Life.vcxproj +++ b/Game of Life/Game of Life/Game of Life.vcxproj @@ -121,11 +121,11 @@ true NDEBUG;_CONSOLE;%(PreprocessorDefinitions) true - $(SolutionDir)\include + $(SolutionDir)\include; Default - Console + Windows true true true @@ -140,10 +140,12 @@ + + diff --git a/Game of Life/Game of Life/Game of Life.vcxproj.filters b/Game of Life/Game of Life/Game of Life.vcxproj.filters index 983ee57..3cfe70d 100644 --- a/Game of Life/Game of Life/Game of Life.vcxproj.filters +++ b/Game of Life/Game of Life/Game of Life.vcxproj.filters @@ -33,6 +33,9 @@ Файлы заголовков + + Файлы заголовков + @@ -44,5 +47,8 @@ Исходные файлы + + Исходные файлы + \ No newline at end of file diff --git a/Game of Life/src/main.c b/Game of Life/src/main.c index 2d204f4..be57189 100644 --- a/Game of Life/src/main.c +++ b/Game of Life/src/main.c @@ -8,12 +8,14 @@ #include #include +#include "parallel_for.h" + #define RAYGUI_IMPLEMENTATION #include "raygui.h" -#define MAP_X 200 -#define MAP_Y 100 -#define CELL_SIZE 12 +#define MAP_X 400 +#define MAP_Y 200 +#define CELL_SIZE 6 #define FCELL_SIZE (float)CELL_SIZE #define BOTTOM_BAR_HEIGHT 60 @@ -33,7 +35,7 @@ void* SafeMalloc(size_t size) { void* buffer = malloc(size); if (buffer == NULL) { - fprintf(stderr, "Fatal: failed to allocate %zu bytes.\n", size); + fprintf(stderr, "Error in SafeMalloc: failed to allocate %zu bytes.\n", size); abort(); } return buffer; @@ -43,7 +45,7 @@ void* SafeCalloc(size_t count, size_t size) { void* buffer = calloc(count, size); if (buffer == NULL) { - fprintf(stderr, "Fatal: failed to allocate %zu bytes.\n", count * size); + fprintf(stderr, "Error in SafeCalloc: failed to allocate %zu bytes.\n", count * size); abort(); } return buffer; @@ -90,6 +92,37 @@ void celluralAutomata() } } +//int compute_cell(int x) { +// int neighbours = 0; +// +// neighbours += 
checkCell(x - 1, y); +// neighbours += checkCell(x - 1, y + 1); +// neighbours += checkCell(x - 1, y - 1); +// neighbours += checkCell(x + 1, y); +// neighbours += checkCell(x + 1, y + 1); +// neighbours += checkCell(x + 1, y - 1); +// neighbours += checkCell(x, y + 1); +// neighbours += checkCell(x, y - 1); +// +// if (neighbours == 3) { +// tempMap[x][y] = true; +// } +// else if (neighbours == 2) { +// tempMap[x][y] = map[x][y]; +// } +// else { +// tempMap[x][y] = false; +// } +//} +// +//void* compute_cell_forp(void* arg) +//{ +// int* pa = (int*)arg; +// int* result = malloc(sizeof(*result)); +// *result = mult2(*pa); +// return result; +//} + void ClearMap() { for (int x = 0; x < MAP_X; x++) { memset(map[x], 0, MAP_Y * sizeof(bool)); @@ -242,19 +275,25 @@ int main() DrawFPS(0, MAP_Y * CELL_SIZE); DrawText(TextFormat("%.4fx", simSpeed), 0, MAP_Y * CELL_SIZE + 20, 20, ORANGE); - DrawText(TextFormat("%.1f TPS", monitorFPS * simSpeed), 0, MAP_Y * CELL_SIZE + 40, 20, BLUE); + DrawText(TextFormat("%.1f TPS", GetFPS() * simSpeed), 0, MAP_Y * CELL_SIZE + 40, 20, BLUE); EndDrawing(); } - /*for (int x = 0; x < MAP_X; x++) { + for (int x = 0; x < MAP_X; x++) { free(map[x]); + free(tempMap[x]); } - free(map);*/ + free(map); + free(tempMap); UnloadFont(InconsolataBold); CloseWindow(); return 0; +} + +int WinMain() { + return main(); } \ No newline at end of file diff --git a/Game of Life/src/parallel_for.c b/Game of Life/src/parallel_for.c new file mode 100644 index 0000000..8dea67f --- /dev/null +++ b/Game of Life/src/parallel_for.c @@ -0,0 +1,463 @@ +#include "parallel_for.h" + +#if defined(__APPLE__) || defined(__linux__) +#define POSIX +#elif defined(_WIN32) + +#else +#error "Platform not supported." 
+#endif + +#include +#include + +#ifdef POSIX +#include +#include +#else +#include +#endif + +/****************************************************************************** +* A task descriptor specifying the input element and the address at which the * +* output element should be stored. * +******************************************************************************/ +typedef struct task_descriptor { + void* input_element; + void** output_element_address; +} task_descriptor; + +/************************************************************ +* This structure implements a concurrent array-based queue. * +************************************************************/ +typedef struct concurrent_queue { + +#ifdef POSIX + pthread_mutex_t mutex; +#elif defined(_WIN32) + CRITICAL_SECTION criticalSection; +#endif + + task_descriptor** array; + size_t begin_index; + size_t end_index; + size_t size; + size_t len; +} concurrent_queue; + +/************************************************************ +* Initializes the input concurrent queue to an empty state. * +************************************************************/ +static int concurrent_queue_init(concurrent_queue* queue, size_t len) +{ + int ret; + queue->array = malloc(len * sizeof(*queue->array)); + + if (queue->array == NULL) + { + return ERROR_FORP_MALLOC_FAIL; + } + + queue->begin_index = 0; + queue->end_index = 0; + queue->size = 0; + queue->len = len; + +#ifdef POSIX + + ret = pthread_mutex_init(&queue->mutex, NULL); + + if (ret != 0) + { + return ERROR_FORP_NO_MUTEX_INIT; + } + +#else + + InitializeCriticalSection(&queue->criticalSection); + +#endif + + return ERROR_FORP_SUCCESS; +} + +/****************************************************** +* Appends a task descriptor to the tail of the queue. 
* +******************************************************/ +static void concurrent_queue_enqueue(concurrent_queue* queue, + task_descriptor* descriptor) +{ + queue->array[queue->end_index] = descriptor; + queue->end_index++; + queue->size++; +} + +/****************************************************************************** +* Removes the head element from the queue. Unlike all other functions related * +* to the queue, this is one is thread-safe. * +******************************************************************************/ +static task_descriptor* concurrent_queue_dequeue(concurrent_queue* queue) +{ + task_descriptor* descriptor; + +#ifdef POSIX + pthread_mutex_lock(&queue->mutex); +#else + EnterCriticalSection(&queue->criticalSection); +#endif + + if (queue->size > 0) + { + descriptor = queue->array[queue->begin_index]; + queue->begin_index++; + queue->size--; + } + else + { + descriptor = NULL; + } + +#ifdef POSIX + pthread_mutex_unlock(&queue->mutex); +#else + LeaveCriticalSection(&queue->criticalSection); +#endif + + return descriptor; +} + +/***************************************************************************** +* Releases all the resources occupied by the queue, or namely, the mutex and * +* the array. * +*****************************************************************************/ +static int concurrent_queue_destroy(concurrent_queue* queue) +{ + int ret; + size_t i; + + for (i = 0; i < queue->len; i++) + { + free(queue->array[i]); + } + + free(queue->array); + +#ifdef POSIX + ret = pthread_mutex_destroy(&queue->mutex); + return ret == 0 ? ERROR_FORP_SUCCESS : ERROR_FORP_NO_MUTEX_DESTROY; +#else + DeleteCriticalSection(&queue->criticalSection); + return ERROR_FORP_SUCCESS; +#endif + +} + +/******************************************************* +* Returns the number of processors on Mac OS or Linux. 
* +*******************************************************/ +static int get_number_of_processors_apple_linux(size_t* p_number_of_processors) +{ +#ifdef POSIX + * p_number_of_processors = (size_t)sysconf(_SC_NPROCESSORS_ONLN); +#endif + + return ERROR_FORP_SUCCESS; +} + +/*********************************************** +* Returns the number of processors on Windows. * +***********************************************/ +static int get_number_of_processors_windows(size_t* p_number_of_processors) +{ +#ifdef _WIN32 + SYSTEM_INFO si; + GetSystemInfo(&si); + *p_number_of_processors = (size_t)2 * si.dwNumberOfProcessors; +#endif + + return ERROR_FORP_SUCCESS; +} + +/************************************************************** +* A portable function for returning the number of processors. * +**************************************************************/ +static int get_number_of_processors(size_t* p_number_of_processors) +{ +#ifdef POSIX + return get_number_of_processors_apple_linux(p_number_of_processors); +#else + return get_number_of_processors_windows(p_number_of_processors); +#endif +} + +/***************************************************************************** +* Specifies the worker thread arguments. Holds the queue and the function to * +* be applied to each queue element. * +*****************************************************************************/ +typedef struct worker_thread_proc_args { + concurrent_queue* queue; + void* (*func)(void*); + int return_status; +} worker_thread_proc_args; + +/********************************* +* Implements the worker threads. 
* +*********************************/ +static void* worker_thread_proc(void* args) +{ + worker_thread_proc_args* worker_thread_proc_arguments = + (worker_thread_proc_args*)args; + + concurrent_queue* queue = worker_thread_proc_arguments->queue; + void* (*func)(void*) = worker_thread_proc_arguments->func; + task_descriptor* task_desc; + int ret = 0; + +#ifdef POSIX + ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); +#endif + + if (ret != 0) + { + worker_thread_proc_arguments->return_status = ret; + return NULL; + } + else + { + worker_thread_proc_arguments->return_status = 0; + } + + while ((task_desc = concurrent_queue_dequeue(queue)) != NULL) + { + *task_desc->output_element_address = func(task_desc->input_element); + } + + return NULL; +} + +/*************************************** +* Cancels all the first 'len' threads. * +***************************************/ +#ifdef POSIX +static void cancel_threads(pthread_t* pthreads, size_t len) +#else +static void cancel_threads(HANDLE* threads, size_t len) +#endif +{ + size_t i; + + for (i = 0; i < len; ++i) + { + +#ifdef POSIX + pthread_cancel(pthreads[i]); +#else + TerminateThread(threads[i], 0); +#endif + + } +} + +/*********************************************************** +* The actual implementation of the parallel for construct. * +***********************************************************/ +int forp(void** input, void** output, size_t len, void* (*func)(void*)) +{ + size_t number_of_cores; + size_t szi; + int ret; + int join_ret = ERROR_FORP_SUCCESS; + concurrent_queue queue; + task_descriptor* task_desc; + worker_thread_proc_args* wtpa; + +#ifdef POSIX + + pthread_t* threads; + +#else + + HANDLE* threads; + +#endif + + if (input == NULL || output == NULL || func == NULL) + { + return ERROR_FORP_NO_ARGS; + } + + if (len == 0) + { + /***************** + * Nothing to do. 
* + *****************/ + return ERROR_FORP_SUCCESS; + } + + ret = get_number_of_processors(&number_of_cores); + + if (ret != ERROR_FORP_SUCCESS) + { + return ret; + } + + if (number_of_cores == 0) + { + return ERROR_FORP_UNKNOWN_CORES; + } + + if ((ret = concurrent_queue_init(&queue, len)) != ERROR_FORP_SUCCESS) + { + return ret; + } + + /************************************** + * Create a concurrent queue of tasks. * + **************************************/ + for (szi = 0; szi < len; szi++) + { + task_desc = malloc(sizeof * task_desc); + + if (task_desc == NULL) + { + concurrent_queue_destroy(&queue); + return ERROR_FORP_MALLOC_FAIL; + } + + task_desc->input_element = input[szi]; + task_desc->output_element_address = &output[szi]; + concurrent_queue_enqueue(&queue, task_desc); + + if (ret != ERROR_FORP_SUCCESS) + { + concurrent_queue_destroy(&queue); + return ret; + } + } + + /***************************** + * Create the worker threads. * + *****************************/ + threads = malloc(number_of_cores * sizeof(*threads)); + + if (threads == NULL) + { + concurrent_queue_destroy(&queue); + return ERROR_FORP_MALLOC_FAIL; + } + + wtpa = malloc(number_of_cores * sizeof(*wtpa)); + + if (wtpa == NULL) + { + free(threads); + concurrent_queue_destroy(&queue); + return ERROR_FORP_MALLOC_FAIL; + } + + for (szi = 0; szi < number_of_cores; szi++) + { + wtpa[szi].queue = &queue; + wtpa[szi].func = func; + wtpa[szi].return_status = 0; + +#ifdef POSIX + ret = pthread_create(&threads[szi], + NULL, + worker_thread_proc, + &wtpa[szi]); +#else + threads[szi] = CreateThread(NULL, + 100000, + (LPTHREAD_START_ROUTINE)worker_thread_proc, + (LPVOID)&wtpa[szi], + 0, + NULL); +#endif + + if (ret != 0) + { + cancel_threads(threads, szi); + concurrent_queue_destroy(&queue); + return ERROR_FORP_NO_THREAD; + } + + if (wtpa[szi].return_status != 0) + { + cancel_threads(threads, szi + 1); + concurrent_queue_destroy(&queue); + return ERROR_FORP_NO_SETCANCELTYPE; + } + } + + 
/*********************************************** + * Wait for all the worker threads to complete. * + ***********************************************/ + for (szi = 0; szi < number_of_cores; szi++) + { +#ifdef _WIN32 + + if (WaitForSingleObject(threads[szi], INFINITE) != 0 && join_ret == 0) + { + join_ret = ERROR_FORP_NO_JOIN; + } +#else + join_ret = pthread_join(threads[szi], NULL); + + if (ret != 0 && join_ret == ERROR_FORP_SUCCESS) + { + join_ret = ERROR_FORP_NO_JOIN; + } +#endif + } + + return join_ret; +} + +const char* forp_error(int error_code) +{ + switch (error_code) + { + case ERROR_FORP_SUCCESS: + return "forp succeeded."; + + case ERROR_FORP_NO_ARGS: + return "Some arguments missing."; + + case ERROR_FORP_NO_JOIN: + return "Could not join a thread."; + + case ERROR_FORP_CPU_FEOF: + return "Reached EOF while reading the number of processors."; + + case ERROR_FORP_NO_THREAD: + return "Could create a thread."; + + case ERROR_FORP_CPU_FERROR: + return "An error occured while reading the number of processors."; + + case ERROR_FORP_POPEN_FAIL: + return "Could not execute a program in popen."; + + case ERROR_FORP_MALLOC_FAIL: + return "A call to malloc returned NULL."; + + case ERROR_FORP_SSCANF_FAIL: + return "sscanf failed."; + + case ERROR_FORP_NO_MUTEX_INIT: + return "Could not initialize a mutex."; + + case ERROR_FORP_NO_MUTEX_DESTROY: + return "Could not destroy a mutex."; + + case ERROR_FORP_UNKNOWN_CORES: + return "Could not determine the number of processors."; + + case ERROR_FORP_NO_SETCANCELTYPE: + return "setcanceltype failed."; + + default: + return "Unknown error code."; + } +} \ No newline at end of file diff --git a/Game of Life/src/parallel_for.h b/Game of Life/src/parallel_for.h new file mode 100644 index 0000000..060b294 --- /dev/null +++ b/Game of Life/src/parallel_for.h @@ -0,0 +1,34 @@ +#ifndef PARALLEL_FOR_H +#define PARALLEL_FOR_H + +#include + +#define ERROR_FORP_SUCCESS 0 +#define ERROR_FORP_NO_ARGS 1 +#define ERROR_FORP_UNKNOWN_CORES 
2 +#define ERROR_FORP_NO_MUTEX_INIT 3 +#define ERROR_FORP_NO_MUTEX_DESTROY 4 +#define ERROR_FORP_MALLOC_FAIL 5 +#define ERROR_FORP_SSCANF_FAIL 6 +#define ERROR_FORP_POPEN_FAIL 7 +#define ERROR_FORP_CPU_FEOF 8 +#define ERROR_FORP_CPU_FERROR 9 +#define ERROR_FORP_NO_THREAD 10 +#define ERROR_FORP_NO_SETCANCELTYPE 11 +#define ERROR_FORP_NO_JOIN 12 + +/******************************************************************************* +* Runs a multithreaded for loop over the input array producing the results and * +* storing them in the output array. * +*******************************************************************************/ +int forp(void** input, + void** output, + size_t len, + void* (*func)(void*)); + +/************************************************************************* +* Returns a human-readable description of an error code related to forp. * +*************************************************************************/ +const char* forp_error(int error_code); + +#endif /* PARALLEL_FOR_H */ \ No newline at end of file diff --git a/lab16 with raylib/lab16 with raylib/lab16 with raylib.vcxproj b/lab16 with raylib/lab16 with raylib/lab16 with raylib.vcxproj index bcaa159..c4f2734 100644 --- a/lab16 with raylib/lab16 with raylib/lab16 with raylib.vcxproj +++ b/lab16 with raylib/lab16 with raylib/lab16 with raylib.vcxproj @@ -156,8 +156,6 @@ - - diff --git a/lab16 with raylib/lab16 with raylib/lab16 with raylib.vcxproj.filters b/lab16 with raylib/lab16 with raylib/lab16 with raylib.vcxproj.filters index f946f49..6045f96 100644 --- a/lab16 with raylib/lab16 with raylib/lab16 with raylib.vcxproj.filters +++ b/lab16 with raylib/lab16 with raylib/lab16 with raylib.vcxproj.filters @@ -65,14 +65,8 @@ - - Файлы ресурсов - - - Файлы ресурсов - - Файлы ресурсов + Исходные файлы \ No newline at end of file diff --git a/lab16 with raylib/resources/watershader.frag b/lab16 with raylib/resources/watershader.frag index aee615d..3927351 100644 --- a/lab16 with 
raylib/resources/watershader.frag +++ b/lab16 with raylib/resources/watershader.frag @@ -33,7 +33,7 @@ vec4 blur13(sampler2D image, vec2 uv, vec2 resolution, vec2 direction) void main() { - vec4 bumpColor = texture(waterBumpMap, fragTexCoord + sin(seconds / 2.0) / 20.0); + vec4 bumpColor = texture(waterBumpMap, fragTexCoord/5 + sin(seconds / 2.0) / 20.0); bumpColor = (bumpColor + texture(waterBumpMap, fragTexCoord*1.5 + cos(seconds / 2.0) / 20.0)) * 0.5; vec2 samplePos = fragTexCoord; diff --git a/lab16 with raylib/src/main.c b/lab16 with raylib/src/main.c index ee117e9..c8a89b2 100644 --- a/lab16 with raylib/src/main.c +++ b/lab16 with raylib/src/main.c @@ -17,24 +17,30 @@ #define RAYLIB_NUKLEAR_IMPLEMENTATION #define NK_INCLUDE_VERTEX_BUFFER_OUTPUT //#define RAYLIB_NUKLEAR_DEFAULT_ARC_SEGMENTS 1 -//#pragma warning(disable: 4116) +#pragma warning(disable: 4116) #include "raylib-nuklear.h" Vector2 scaleDPI = { 1.0f, 1.0f }; #define M 10 #define N 15 -#define HEIGHT (int)(50 * scaleDPI.y) -#define WIDTH (int)(50 * scaleDPI.x) -#define VOFFSET (int)(52 * scaleDPI.y) +//#define WIDTH (int)(50 * scaleDPI.x) +//#define HEIGHT (int)(50 * scaleDPI.y) +//#define VOFFSET (int)(52 * scaleDPI.y) -#define FWIDTH (float)WIDTH -#define FHEIGHT (float)HEIGHT +//#define FWIDTH (float)WIDTH +//#define FHEIGHT (float)HEIGHT #define PUREBLUE (Color) { 0, 0, 255, 255 } #define BLACKGRAY (Color) {30, 30, 30, 255} #define VSGREEN (Color) {78, 201, 176, 255} #define WATERBLUE CLITERAL(Color){200, 240, 255, 255} +int HEIGHT = 50; +int WIDTH = 50; +int VOFFSET = 52; +float FWIDTH; +float FHEIGHT; + // Коды ячеек: // 0 - свободна // 1 - @@ -503,15 +509,21 @@ void callNKErrorBoxes(struct nk_context* ctx) { #define CPSIZE 213 int main() { - //SetConfigFlags(FLAG_WINDOW_RESIZABLE); + //SetConfigFlags(FLAG_WINDOW_HIGHDPI); - InitWindow( 1280, 720, "lab16 with raylib"); - scaleDPI = GetWindowScaleDPI(); + InitWindow(N * WIDTH, M * HEIGHT + VOFFSET, "lab16 with raylib"); int monitor = 
GetCurrentMonitor(); int monitorCenterX = GetMonitorWidth(monitor) / 2; int monitorCenterY = GetMonitorHeight(monitor) / 2; + scaleDPI = GetWindowScaleDPI(); + WIDTH = (int)(WIDTH * scaleDPI.x); + HEIGHT = (int)(HEIGHT * scaleDPI.y); + VOFFSET = (int)(VOFFSET * scaleDPI.y); + FWIDTH = (float)WIDTH; + FHEIGHT = (float)HEIGHT; + int screenWidth = N * WIDTH; int screenHeight = M * HEIGHT + VOFFSET; float screenWidthF = (float)screenWidth; diff --git a/parallel for loop from stackexchange/parallel for loop from stackexchange.sln b/parallel for loop from stackexchange/parallel for loop from stackexchange.sln new file mode 100644 index 0000000..e97d539 --- /dev/null +++ b/parallel for loop from stackexchange/parallel for loop from stackexchange.sln @@ -0,0 +1,28 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.12.35527.113 d17.12 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "parallel_for", "parallel_for\parallel_for.vcxproj", "{C3792CE7-8D1B-41EB-AB2C-883F1EEF1923}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|x64 = Debug|x64 + Debug|x86 = Debug|x86 + Release|x64 = Release|x64 + Release|x86 = Release|x86 + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {C3792CE7-8D1B-41EB-AB2C-883F1EEF1923}.Debug|x64.ActiveCfg = Debug|x64 + {C3792CE7-8D1B-41EB-AB2C-883F1EEF1923}.Debug|x64.Build.0 = Debug|x64 + {C3792CE7-8D1B-41EB-AB2C-883F1EEF1923}.Debug|x86.ActiveCfg = Debug|Win32 + {C3792CE7-8D1B-41EB-AB2C-883F1EEF1923}.Debug|x86.Build.0 = Debug|Win32 + {C3792CE7-8D1B-41EB-AB2C-883F1EEF1923}.Release|x64.ActiveCfg = Release|x64 + {C3792CE7-8D1B-41EB-AB2C-883F1EEF1923}.Release|x64.Build.0 = Release|x64 + {C3792CE7-8D1B-41EB-AB2C-883F1EEF1923}.Release|x86.ActiveCfg = Release|Win32 + {C3792CE7-8D1B-41EB-AB2C-883F1EEF1923}.Release|x86.Build.0 = Release|Win32 + EndGlobalSection + 
GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection +EndGlobal diff --git a/parallel for loop from stackexchange/parallel_for/parallel_for.vcxproj b/parallel for loop from stackexchange/parallel_for/parallel_for.vcxproj new file mode 100644 index 0000000..ab0973d --- /dev/null +++ b/parallel for loop from stackexchange/parallel_for/parallel_for.vcxproj @@ -0,0 +1,140 @@ + + + + + Debug + Win32 + + + Release + Win32 + + + Debug + x64 + + + Release + x64 + + + + 17.0 + Win32Proj + {c3792ce7-8d1b-41eb-ab2c-883f1eef1923} + parallelfor + 10.0 + + + + Application + true + v143 + Unicode + + + Application + false + v143 + true + Unicode + + + Application + true + v143 + Unicode + + + Application + false + v143 + true + Unicode + + + + + + + + + + + + + + + + + + + + + + Level3 + true + WIN32;_DEBUG;_CONSOLE;%(PreprocessorDefinitions) + true + + + Console + true + + + + + Level3 + true + true + true + WIN32;NDEBUG;_CONSOLE;%(PreprocessorDefinitions) + true + + + Console + true + true + true + + + + + Level3 + true + _DEBUG;_CONSOLE;%(PreprocessorDefinitions) + true + + + Console + true + + + + + Level3 + true + true + true + NDEBUG;_CONSOLE;%(PreprocessorDefinitions) + true + + + Console + true + true + true + + + + + + + + + + + + + + \ No newline at end of file diff --git a/parallel for loop from stackexchange/parallel_for/parallel_for.vcxproj.filters b/parallel for loop from stackexchange/parallel_for/parallel_for.vcxproj.filters new file mode 100644 index 0000000..ee6793f --- /dev/null +++ b/parallel for loop from stackexchange/parallel_for/parallel_for.vcxproj.filters @@ -0,0 +1,33 @@ + + + + + {4FC737F1-C7A5-4376-A066-2A32D752A2FF} + cpp;c;cc;cxx;c++;cppm;ixx;def;odl;idl;hpj;bat;asm;asmx + + + {93995380-89BD-4b04-88EB-625FBE52EBFB} + h;hh;hpp;hxx;h++;hm;inl;inc;ipp;xsd + + + {67DA6AB6-F800-4c08-8B7A-83BB121AAD01} + rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav;mfcribbon-ms + + + + + Исходные файлы 
+ + + Исходные файлы + + + Исходные файлы + + + + + Файлы заголовков + + + \ No newline at end of file diff --git a/parallel for loop from stackexchange/src/parallel_for.c b/parallel for loop from stackexchange/src/parallel_for.c new file mode 100644 index 0000000..8dea67f --- /dev/null +++ b/parallel for loop from stackexchange/src/parallel_for.c @@ -0,0 +1,463 @@ +#include "parallel_for.h" + +#if defined(__APPLE__) || defined(__linux__) +#define POSIX +#elif defined(_WIN32) + +#else +#error "Platform not supported." +#endif + +#include +#include + +#ifdef POSIX +#include +#include +#else +#include +#endif + +/****************************************************************************** +* A task descriptor specifying the input element and the address at which the * +* output element should be stored. * +******************************************************************************/ +typedef struct task_descriptor { + void* input_element; + void** output_element_address; +} task_descriptor; + +/************************************************************ +* This structure implements a concurrent array-based queue. * +************************************************************/ +typedef struct concurrent_queue { + +#ifdef POSIX + pthread_mutex_t mutex; +#elif defined(_WIN32) + CRITICAL_SECTION criticalSection; +#endif + + task_descriptor** array; + size_t begin_index; + size_t end_index; + size_t size; + size_t len; +} concurrent_queue; + +/************************************************************ +* Initializes the input concurrent queue to an empty state. 
* +************************************************************/ +static int concurrent_queue_init(concurrent_queue* queue, size_t len) +{ + int ret; + queue->array = malloc(len * sizeof(*queue->array)); + + if (queue->array == NULL) + { + return ERROR_FORP_MALLOC_FAIL; + } + + queue->begin_index = 0; + queue->end_index = 0; + queue->size = 0; + queue->len = len; + +#ifdef POSIX + + ret = pthread_mutex_init(&queue->mutex, NULL); + + if (ret != 0) + { + return ERROR_FORP_NO_MUTEX_INIT; + } + +#else + + InitializeCriticalSection(&queue->criticalSection); + +#endif + + return ERROR_FORP_SUCCESS; +} + +/****************************************************** +* Appends a task descriptor to the tail of the queue. * +******************************************************/ +static void concurrent_queue_enqueue(concurrent_queue* queue, + task_descriptor* descriptor) +{ + queue->array[queue->end_index] = descriptor; + queue->end_index++; + queue->size++; +} + +/****************************************************************************** +* Removes the head element from the queue. Unlike all other functions related * +* to the queue, this is one is thread-safe. * +******************************************************************************/ +static task_descriptor* concurrent_queue_dequeue(concurrent_queue* queue) +{ + task_descriptor* descriptor; + +#ifdef POSIX + pthread_mutex_lock(&queue->mutex); +#else + EnterCriticalSection(&queue->criticalSection); +#endif + + if (queue->size > 0) + { + descriptor = queue->array[queue->begin_index]; + queue->begin_index++; + queue->size--; + } + else + { + descriptor = NULL; + } + +#ifdef POSIX + pthread_mutex_unlock(&queue->mutex); +#else + LeaveCriticalSection(&queue->criticalSection); +#endif + + return descriptor; +} + +/***************************************************************************** +* Releases all the resources occupied by the queue, or namely, the mutex and * +* the array. 
* +*****************************************************************************/ +static int concurrent_queue_destroy(concurrent_queue* queue) +{ + int ret; + size_t i; + + for (i = 0; i < queue->len; i++) + { + free(queue->array[i]); + } + + free(queue->array); + +#ifdef POSIX + ret = pthread_mutex_destroy(&queue->mutex); + return ret == 0 ? ERROR_FORP_SUCCESS : ERROR_FORP_NO_MUTEX_DESTROY; +#else + DeleteCriticalSection(&queue->criticalSection); + return ERROR_FORP_SUCCESS; +#endif + +} + +/******************************************************* +* Returns the number of processors on Mac OS or Linux. * +*******************************************************/ +static int get_number_of_processors_apple_linux(size_t* p_number_of_processors) +{ +#ifdef POSIX + * p_number_of_processors = (size_t)sysconf(_SC_NPROCESSORS_ONLN); +#endif + + return ERROR_FORP_SUCCESS; +} + +/*********************************************** +* Returns the number of processors on Windows. * +***********************************************/ +static int get_number_of_processors_windows(size_t* p_number_of_processors) +{ +#ifdef _WIN32 + SYSTEM_INFO si; + GetSystemInfo(&si); + *p_number_of_processors = (size_t)2 * si.dwNumberOfProcessors; +#endif + + return ERROR_FORP_SUCCESS; +} + +/************************************************************** +* A portable function for returning the number of processors. * +**************************************************************/ +static int get_number_of_processors(size_t* p_number_of_processors) +{ +#ifdef POSIX + return get_number_of_processors_apple_linux(p_number_of_processors); +#else + return get_number_of_processors_windows(p_number_of_processors); +#endif +} + +/***************************************************************************** +* Specifies the worker thread arguments. Holds the queue and the function to * +* be applied to each queue element. 
* +*****************************************************************************/ +typedef struct worker_thread_proc_args { + concurrent_queue* queue; + void* (*func)(void*); + int return_status; +} worker_thread_proc_args; + +/********************************* +* Implements the worker threads. * +*********************************/ +static void* worker_thread_proc(void* args) +{ + worker_thread_proc_args* worker_thread_proc_arguments = + (worker_thread_proc_args*)args; + + concurrent_queue* queue = worker_thread_proc_arguments->queue; + void* (*func)(void*) = worker_thread_proc_arguments->func; + task_descriptor* task_desc; + int ret = 0; + +#ifdef POSIX + ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); +#endif + + if (ret != 0) + { + worker_thread_proc_arguments->return_status = ret; + return NULL; + } + else + { + worker_thread_proc_arguments->return_status = 0; + } + + while ((task_desc = concurrent_queue_dequeue(queue)) != NULL) + { + *task_desc->output_element_address = func(task_desc->input_element); + } + + return NULL; +} + +/*************************************** +* Cancels all the first 'len' threads. * +***************************************/ +#ifdef POSIX +static void cancel_threads(pthread_t* pthreads, size_t len) +#else +static void cancel_threads(HANDLE* threads, size_t len) +#endif +{ + size_t i; + + for (i = 0; i < len; ++i) + { + +#ifdef POSIX + pthread_cancel(pthreads[i]); +#else + TerminateThread(threads[i], 0); +#endif + + } +} + +/*********************************************************** +* The actual implementation of the parallel for construct. 
*
+***********************************************************/
+/*
+ * Applies 'func' to every input[i], i in [0, len), on a pool of worker
+ * threads (one per detected processor) and stores each return value in
+ * output[i].
+ *
+ * Returns ERROR_FORP_SUCCESS on success (including len == 0), otherwise:
+ *   ERROR_FORP_NO_ARGS          - input, output or func is NULL;
+ *   ERROR_FORP_UNKNOWN_CORES    - the processor count was reported as 0;
+ *   ERROR_FORP_MALLOC_FAIL      - an allocation failed;
+ *   ERROR_FORP_NO_THREAD        - a worker thread could not be created;
+ *   ERROR_FORP_NO_JOIN          - waiting for a worker thread failed;
+ *   ERROR_FORP_NO_SETCANCELTYPE - a worker could not set its cancel type;
+ *   or the code returned by get_number_of_processors or
+ *   concurrent_queue_init.
+ */
+int forp(void** input, void** output, size_t len, void* (*func)(void*))
+{
+    size_t number_of_cores;
+    size_t created = 0;
+    size_t szi;
+    int ret;
+    int result = ERROR_FORP_SUCCESS;
+    concurrent_queue queue;
+    task_descriptor* task_desc;
+    worker_thread_proc_args* wtpa = NULL;
+
+#ifdef POSIX
+    pthread_t* threads = NULL;
+#else
+    HANDLE* threads = NULL;
+#endif
+
+    if (input == NULL || output == NULL || func == NULL)
+    {
+        return ERROR_FORP_NO_ARGS;
+    }
+
+    if (len == 0)
+    {
+        /*****************
+        * Nothing to do. *
+        *****************/
+        return ERROR_FORP_SUCCESS;
+    }
+
+    ret = get_number_of_processors(&number_of_cores);
+
+    if (ret != ERROR_FORP_SUCCESS)
+    {
+        return ret;
+    }
+
+    if (number_of_cores == 0)
+    {
+        return ERROR_FORP_UNKNOWN_CORES;
+    }
+
+    if ((ret = concurrent_queue_init(&queue, len)) != ERROR_FORP_SUCCESS)
+    {
+        return ret;
+    }
+
+    /**************************************
+    * Create a concurrent queue of tasks. *
+    **************************************/
+    for (szi = 0; szi < len; szi++)
+    {
+        task_desc = malloc(sizeof *task_desc);
+
+        if (task_desc == NULL)
+        {
+            result = ERROR_FORP_MALLOC_FAIL;
+            goto cleanup;
+        }
+
+        task_desc->input_element = input[szi];
+        task_desc->output_element_address = &output[szi];
+
+        /* NOTE(review): the result of concurrent_queue_enqueue (if it has  */
+        /* one) is not inspected; the old check tested a stale 'ret' and    */
+        /* could never fire. Confirm against the queue implementation.      */
+        /* NOTE(review): nothing visible here frees the task descriptors    */
+        /* after the workers dequeue them - confirm who owns them.          */
+        concurrent_queue_enqueue(&queue, task_desc);
+    }
+
+    /*****************************
+    * Create the worker threads. *
+    *****************************/
+    threads = malloc(number_of_cores * sizeof(*threads));
+    wtpa = malloc(number_of_cores * sizeof(*wtpa));
+
+    if (threads == NULL || wtpa == NULL)
+    {
+        result = ERROR_FORP_MALLOC_FAIL;
+        goto cleanup;
+    }
+
+    for (created = 0; created < number_of_cores; created++)
+    {
+        wtpa[created].queue = &queue;
+        wtpa[created].func = func;
+        wtpa[created].return_status = 0;
+
+#ifdef POSIX
+        ret = pthread_create(&threads[created],
+                             NULL,
+                             worker_thread_proc,
+                             &wtpa[created]);
+#else
+        threads[created] = CreateThread(NULL,
+                                        100000,
+                                        (LPTHREAD_START_ROUTINE)worker_thread_proc,
+                                        (LPVOID)&wtpa[created],
+                                        0,
+                                        NULL);
+
+        /* CreateThread reports failure by returning NULL. */
+        ret = (threads[created] == NULL);
+#endif
+
+        if (ret != 0)
+        {
+            /* Stop the workers started so far; they are joined below. */
+            cancel_threads(threads, created);
+            result = ERROR_FORP_NO_THREAD;
+            break;
+        }
+    }
+
+    /***********************************************
+    * Wait for all the worker threads to complete. *
+    ***********************************************/
+    for (szi = 0; szi < created; szi++)
+    {
+#ifdef POSIX
+        ret = pthread_join(threads[szi], NULL);
+#else
+        ret = (WaitForSingleObject(threads[szi], INFINITE) != WAIT_OBJECT_0);
+        CloseHandle(threads[szi]);
+#endif
+
+        if (ret != 0 && result == ERROR_FORP_SUCCESS)
+        {
+            result = ERROR_FORP_NO_JOIN;
+        }
+    }
+
+    /* The workers have finished, so their status fields can be read */
+    /* without racing against them.                                  */
+    if (result == ERROR_FORP_SUCCESS)
+    {
+        for (szi = 0; szi < created; szi++)
+        {
+            if (wtpa[szi].return_status != 0)
+            {
+                result = ERROR_FORP_NO_SETCANCELTYPE;
+                break;
+            }
+        }
+    }
+
+cleanup:
+    free(wtpa);
+    free(threads);
+    concurrent_queue_destroy(&queue);
+    return result;
+}
+
+/*************************************************************************
+* Returns a human-readable description of an error code related to forp. *
+*************************************************************************/
+const char* forp_error(int error_code)
+{
+    switch (error_code)
+    {
+    case ERROR_FORP_SUCCESS:
+        return "forp succeeded.";
+
+    case ERROR_FORP_NO_ARGS:
+        return "Some arguments missing.";
+
+    case ERROR_FORP_NO_JOIN:
+        return "Could not join a thread.";
+
+    case ERROR_FORP_CPU_FEOF:
+        return "Reached EOF while reading the number of processors.";
+
+    case ERROR_FORP_NO_THREAD:
+        return "Could not create a thread.";
+
+    case ERROR_FORP_CPU_FERROR:
+        return "An error occured while reading the number of processors.";
+
+    case ERROR_FORP_POPEN_FAIL:
+        return "Could not execute a program in popen.";
+
+    case ERROR_FORP_MALLOC_FAIL:
+        return "A call to malloc returned NULL.";
+
+    case ERROR_FORP_SSCANF_FAIL:
+        return "sscanf failed.";
+
+    case ERROR_FORP_NO_MUTEX_INIT:
+        return "Could not initialize a mutex.";
+
+    case ERROR_FORP_NO_MUTEX_DESTROY:
+        return "Could not destroy a mutex.";
+
+    case ERROR_FORP_UNKNOWN_CORES:
+        return "Could not determine the number of processors.";
+
+    case ERROR_FORP_NO_SETCANCELTYPE:
+        return "setcanceltype failed.";
+
+    default:
+        return "Unknown error code.";
+    }
+}
\ No newline at end of file
diff --git a/parallel for loop from stackexchange/src/parallel_for.h b/parallel for loop from stackexchange/src/parallel_for.h
new file mode 100644
index 0000000..060b294
--- /dev/null
+++ b/parallel for loop from stackexchange/src/parallel_for.h
@@ -0,0 +1,34 @@
+#ifndef PARALLEL_FOR_H
+#define PARALLEL_FOR_H
+
+#include <stdlib.h>
+
+#define ERROR_FORP_SUCCESS 0
+#define ERROR_FORP_NO_ARGS 1
+#define ERROR_FORP_UNKNOWN_CORES 2
+#define
ERROR_FORP_NO_MUTEX_INIT 3
+#define ERROR_FORP_NO_MUTEX_DESTROY 4
+#define ERROR_FORP_MALLOC_FAIL 5
+#define ERROR_FORP_SSCANF_FAIL 6
+#define ERROR_FORP_POPEN_FAIL 7
+#define ERROR_FORP_CPU_FEOF 8
+#define ERROR_FORP_CPU_FERROR 9
+#define ERROR_FORP_NO_THREAD 10
+#define ERROR_FORP_NO_SETCANCELTYPE 11
+#define ERROR_FORP_NO_JOIN 12
+
+/*******************************************************************************
+* Runs a multithreaded for loop over the input array producing the results and *
+* storing them in the output array.                                            *
+*******************************************************************************/
+int forp(void** input,
+         void** output,
+         size_t len,
+         void* (*func)(void*));
+
+/*************************************************************************
+* Returns a human-readable description of an error code related to forp. *
+*************************************************************************/
+const char* forp_error(int error_code);
+
+#endif /* PARALLEL_FOR_H */
\ No newline at end of file
diff --git a/parallel for loop from stackexchange/src/simpler test.c b/parallel for loop from stackexchange/src/simpler test.c
new file mode 100644
index 0000000..877dc70
--- /dev/null
+++ b/parallel for loop from stackexchange/src/simpler test.c
@@ -0,0 +1,55 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include "parallel_for.h"
+
+#define NUM_COUNT 4
+
+/* Returns twice its argument. */
+int mult2(int x) {
+    return x * 2;
+}
+
+/*
+ * forp callback: doubles the int that 'arg' points to and returns the result
+ * in a freshly malloc'ed int that the caller must free. Returns NULL when the
+ * allocation fails.
+ */
+static void* mult2_parallel(void* arg)
+{
+    const int* pa = (const int*) arg;
+    int* result = malloc(sizeof(*result));
+
+    if (result != NULL) {
+        *result = mult2(*pa);
+    }
+
+    return result;
+}
+
+/* Doubles four numbers through forp and prints one result per line. */
+int main(void) {
+    int nums[NUM_COUNT] = { 1, 2, 3, 4 };
+    void* input[NUM_COUNT];
+    void* output[NUM_COUNT] = { NULL };
+    int error_code;
+
+    for (int i = 0; i < NUM_COUNT; i++) {
+        input[i] = &nums[i];
+    }
+
+    error_code = forp(input, output, NUM_COUNT, mult2_parallel);
+
+    if (error_code != ERROR_FORP_SUCCESS) {
+        fprintf(stderr, "forp failed: %s\n", forp_error(error_code));
+    }
+
+    /* output[] starts out NULL, so slots that were never filled are skipped. */
+    for (int i = 0; i < NUM_COUNT; i++) {
+        if (output[i] != NULL) {
+            printf("%d\n", *(int*)output[i]);
+            free(output[i]);
+        }
+    }
+
+    return error_code == ERROR_FORP_SUCCESS ? EXIT_SUCCESS : EXIT_FAILURE;
+}
\ No newline at end of file
diff --git a/parallel for loop from stackexchange/src/test.c b/parallel for loop from stackexchange/src/test.c
new file mode
100644
index 0000000..54fc597
--- /dev/null
+++ b/parallel for loop from stackexchange/src/test.c
@@ -0,0 +1,186 @@
+#include "parallel_for.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+
+#if defined(__APPLE__) || defined(__linux__)
+#define POSIX
+#endif
+
+#if defined(POSIX)
+#include <sys/time.h>
+#elif defined(_WIN32)
+#include <windows.h>
+#else
+#error "Platform not supported."
+#endif
+
+/*********************************
+* Implements a dummy heavy task. *
+*********************************/
+/* Naive doubly recursive Fibonacci; the cost is the point of the benchmark. */
+static unsigned long long fibonacci(unsigned long long num)
+{
+    switch (num)
+    {
+    case 0:
+        return 0;
+
+    case 1:
+        return 1;
+
+    default:
+        return fibonacci(num - 1) + fibonacci(num - 2);
+    }
+}
+
+/*******************************
+* The worker thread procedure. *
+*******************************/
+/* Returns fibonacci(*arg) in a malloc'ed value the caller frees, or NULL if
+ * the allocation fails. */
+static void* fibonacci_func(void* arg)
+{
+    const unsigned long long* pa = (const unsigned long long*) arg;
+    unsigned long long* result = malloc(sizeof(*result));
+
+    if (result != NULL)
+    {
+        *result = fibonacci(*pa);
+    }
+
+    return result;
+}
+
+/**************************************
+* Populates randomly the input array. *
+**************************************/
+/* Each element is a malloc'ed value in [20, 40]; exits if allocation fails. */
+static void populate_input_randomly(void** input_array, size_t len)
+{
+    unsigned long long* input_datum;
+    size_t i;
+
+    for (i = 0; i < len; i++)
+    {
+        input_datum = malloc(sizeof(*input_datum));
+
+        if (input_datum == NULL)
+        {
+            fprintf(stderr, "Could not allocate an input element.\n");
+            exit(EXIT_FAILURE);
+        }
+
+        *input_datum = 20 + rand() % 21;
+        input_array[i] = input_datum;
+    }
+}
+
+/***************************
+* Prints the output array. *
+***************************/
+/* Elements that are NULL (no result was produced) are printed as "null". */
+static void print_output(void** output, size_t len)
+{
+    const char* separator = "";
+    size_t i;
+
+    printf("[");
+
+    for (i = 0; i < len; i++)
+    {
+        printf("%s", separator);
+        separator = ", ";
+
+        if (output[i] == NULL)
+        {
+            printf("null");
+        }
+        else
+        {
+            printf("%llu", *((unsigned long long*) output[i]));
+        }
+    }
+
+    puts("]");
+}
+
+/**********************************************************
+* Frees every element of the array and resets it to NULL. *
+**********************************************************/
+static void free_elements(void** array, size_t len)
+{
+    size_t i;
+
+    for (i = 0; i < len; i++)
+    {
+        free(array[i]);
+        array[i] = NULL;
+    }
+}
+
+/**************************************************************
+* Returns a current millisecond count. Used for benchmarking. *
+**************************************************************/
+static unsigned long long get_milliseconds(void)
+{
+#ifdef POSIX
+    struct timeval tv;
+    gettimeofday(&tv, NULL);
+    /* Widen first so the arithmetic is done in unsigned long long. */
+    return 1000ULL * (unsigned long long) tv.tv_sec
+         + (unsigned long long) tv.tv_usec / 1000ULL;
+#else
+    return (unsigned long long) GetTickCount64();
+#endif
+}
+
+#define N 100
+
+/* Benchmarks forp against a sequential loop over the same N random inputs. */
+int main(int argc, const char* argv[]) {
+    void* input[N];
+    void* output[N] = { NULL };
+    unsigned long long start;
+    unsigned long long end;
+    size_t i;
+    int error_code;
+
+    (void) argc;
+    (void) argv;
+
+    srand((unsigned int)time(NULL));
+    populate_input_randomly(input, N);
+
+    start = get_milliseconds();
+    error_code = forp(input, output, N, fibonacci_func);
+    end = get_milliseconds();
+
+    print_output(output, N);
+    printf("Parallel for took %llu milliseconds. Error message: %s\n\n",
+           end - start,
+           forp_error(error_code));
+
+    /* Release the parallel results before the slots are reused below. */
+    free_elements(output, N);
+
+    start = get_milliseconds();
+
+    for (i = 0; i < N; i++)
+    {
+        output[i] = fibonacci_func(input[i]);
+    }
+
+    end = get_milliseconds();
+    print_output(output, N);
+    printf("Sequential for took %llu milliseconds.\n", end - start);
+
+    free_elements(output, N);
+    free_elements(input, N);
+
+#ifdef _WIN32
+    getchar();
+#endif
+
+    return error_code == ERROR_FORP_SUCCESS ? EXIT_SUCCESS : EXIT_FAILURE;
+}
\ No newline at end of file