Skip to content

Commit

Permalink
OpenMP locks instead of busy-waiting with NUM_PARALLEL
Browse files Browse the repository at this point in the history
  • Loading branch information
shivammonaka committed Feb 12, 2024
1 parent b1ae777 commit 7b302ac
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 88 deletions.
7 changes: 0 additions & 7 deletions Makefile.rule
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,6 @@ VERSION = 0.3.26.dev
# footprint penalty, even if users reduce the actual number of threads at runtime.
# NUM_THREADS = 24

# If you have enabled USE_OPENMP and your application would call
# OpenBLAS's calculation API from multiple threads, please comment this in.
# This flag defines how many instances of OpenBLAS's calculation API can actually
# run in parallel. If more than NUM_PARALLEL threads call OpenBLAS's calculation API,
# they need to wait for the preceding API calls to finish or risk data corruption.
# NUM_PARALLEL = 2

# When multithreading, OpenBLAS needs to use a memory buffer for communicating
# and collating results for individual subranges of the original matrix. Since
# the original GotoBLAS of the early 2000s, the default size of this buffer has
Expand Down
7 changes: 0 additions & 7 deletions Makefile.system
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,6 @@ HAVE_FMA3=
include $(TOPDIR)/Makefile_kernel.conf
endif


ifndef NUM_PARALLEL
NUM_PARALLEL = 1
endif

ifndef NUM_THREADS
NUM_THREADS = $(NUM_CORES)
endif
Expand Down Expand Up @@ -1476,8 +1471,6 @@ endif

CCOMMON_OPT += -DMAX_CPU_NUMBER=$(NUM_THREADS)

CCOMMON_OPT += -DMAX_PARALLEL_NUMBER=$(NUM_PARALLEL)

ifdef USE_SIMPLE_THREADED_LEVEL3
CCOMMON_OPT += -DUSE_SIMPLE_THREADED_LEVEL3
endif
Expand Down
5 changes: 0 additions & 5 deletions cmake/system.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@ if (NOT CMAKE_CROSSCOMPILING)

endif()

if (NOT DEFINED NUM_PARALLEL)
set(NUM_PARALLEL 1)
endif()

if (NOT DEFINED NUM_THREADS)
if (DEFINED NUM_CORES AND NOT NUM_CORES EQUAL 0)
Expand Down Expand Up @@ -475,8 +472,6 @@ endif ()

set(CCOMMON_OPT "${CCOMMON_OPT} -DMAX_CPU_NUMBER=${NUM_THREADS}")

set(CCOMMON_OPT "${CCOMMON_OPT} -DMAX_PARALLEL_NUMBER=${NUM_PARALLEL}")

if (BUFFERSIZE)
set(CCOMMON_OPT "${CCOMMON_OPT} -DBUFFERSIZE=${BUFFERSIZE}")
endif ()
Expand Down
2 changes: 1 addition & 1 deletion common.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ extern "C" {

#define ALLOCA_ALIGN 63UL

#define NUM_BUFFERS MAX(50,(MAX_CPU_NUMBER * 2 * MAX_PARALLEL_NUMBER))
#define NUM_BUFFERS MAX(50,(MAX_CPU_NUMBER * 2))

#ifdef NEEDBUNDERSCORE
#define BLASFUNC(FUNC) FUNC##_
Expand Down
46 changes: 30 additions & 16 deletions driver/level3/level3_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -540,18 +540,32 @@ static int round_up(int remainder, int width, int multiple)
return width;
}


static int gemm_driver(blas_arg_t *args, BLASLONG *range_m, BLASLONG
*range_n, IFLOAT *sa, IFLOAT *sb,
BLASLONG nthreads_m, BLASLONG nthreads_n) {

#ifndef USE_OPENMP
#ifndef OS_WINDOWS
static pthread_mutex_t level3_lock = PTHREAD_MUTEX_INITIALIZER;
#else
#ifdef USE_OPENMP
static omp_lock_t level3_lock;
static volatile BLASLONG omp_lock_initialized = 0;
static BLASULONG critical_section_lock = 0;

while(omp_lock_initialized == 0)
{
blas_lock(&critical_section_lock);
{
if(omp_lock_initialized == 0)
{
omp_init_lock(&level3_lock);
omp_lock_initialized = 1;
}
blas_unlock(&critical_section_lock);
}
}
#elif OS_WINDOWS
CRITICAL_SECTION level3_lock;
InitializeCriticalSection((PCRITICAL_SECTION)&level3_lock);
#endif
#else
static pthread_mutex_t level3_lock = PTHREAD_MUTEX_INITIALIZER;
#endif

blas_arg_t newarg;
Expand Down Expand Up @@ -599,12 +613,12 @@ InitializeCriticalSection((PCRITICAL_SECTION)&level3_lock);
#endif
#endif

#ifndef USE_OPENMP
#ifndef OS_WINDOWS
pthread_mutex_lock(&level3_lock);
#else
#ifdef USE_OPENMP
omp_set_lock(&level3_lock);
#elif OS_WINDOWS
EnterCriticalSection((PCRITICAL_SECTION)&level3_lock);
#endif
#else
pthread_mutex_lock(&level3_lock);
#endif

#ifdef USE_ALLOC_HEAP
Expand Down Expand Up @@ -732,12 +746,12 @@ EnterCriticalSection((PCRITICAL_SECTION)&level3_lock);
free(job);
#endif

#ifndef USE_OPENMP
#ifndef OS_WINDOWS
pthread_mutex_unlock(&level3_lock);
#else
#ifdef USE_OPENMP
omp_unset_lock(&level3_lock);
#elif OS_WINDOWS
LeaveCriticalSection((PCRITICAL_SECTION)&level3_lock);
#endif
#else
pthread_mutex_unlock(&level3_lock);
#endif

return 0;
Expand Down
74 changes: 22 additions & 52 deletions driver/others/blas_server_omp.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,31 +73,27 @@ int blas_omp_threads_local = 1;

extern int openblas_omp_adaptive_env(void);

static void * blas_thread_buffer[MAX_PARALLEL_NUMBER][MAX_CPU_NUMBER];
#ifdef HAVE_C11
static atomic_bool blas_buffer_inuse[MAX_PARALLEL_NUMBER];
#else
static _Bool blas_buffer_inuse[MAX_PARALLEL_NUMBER];
#endif
static void * blas_thread_buffer[MAX_CPU_NUMBER];

/* Grow or shrink the per-thread buffer pool to match blas_cpu_number.
 *
 * After the removal of NUM_PARALLEL/MAX_PARALLEL_NUMBER, the pool is a
 * flat array indexed by OpenMP thread number: one buffer per potential
 * worker thread, serialized at a higher level by the level3 lock.
 *
 * - Threads [0, blas_cpu_number): ensure a buffer is allocated.
 * - Threads [blas_cpu_number, MAX_CPU_NUMBER): release any buffer left
 *   over from a previous, larger thread count.
 *
 * NOTE(review): this span was reconstructed from a diff whose +/- markers
 * were lost; it matches the added (post-commit) lines of the hunk.
 */
static void adjust_thread_buffers(void) {

  int i=0;

  /* Guard against a corrupted/unset thread count before indexing the pool. */
  assert(0 <= blas_cpu_number && blas_cpu_number <= MAX_CPU_NUMBER);

  /* Allocate a buffer for each active thread that lacks one. */
  for(i=0; i < blas_cpu_number; i++){
    if(blas_thread_buffer[i] == NULL){
      blas_thread_buffer[i] = blas_memory_alloc(2);
    }
  }
  /* Free buffers beyond the current thread count (i continues from above). */
  for(; i < MAX_CPU_NUMBER; i++){
    if(blas_thread_buffer[i] != NULL){
      blas_memory_free(blas_thread_buffer[i]);
      blas_thread_buffer[i] = NULL;
    }
  }
}

void goto_set_num_threads(int num_threads) {
Expand Down Expand Up @@ -147,17 +143,15 @@ extern int openblas_omp_num_threads_env(void);
}

/* Tear down the OpenMP BLAS server: mark it unavailable and release every
 * per-thread buffer in the flat (1-D) pool.
 *
 * Always returns 0; freeing a NULL slot is skipped, and each freed slot is
 * reset to NULL so a later adjust_thread_buffers() can safely re-allocate.
 *
 * NOTE(review): reconstructed from a diff with lost +/- markers; matches
 * the added (post-commit) lines of the hunk.
 */
int BLASFUNC(blas_thread_shutdown)(void){
  int i=0;
  blas_server_avail = 0;

  for(i=0; i<MAX_CPU_NUMBER; i++){
    if(blas_thread_buffer[i]!=NULL){
      blas_memory_free(blas_thread_buffer[i]);
      blas_thread_buffer[i]=NULL;
    }
  }

  return 0;
}
Expand Down Expand Up @@ -285,7 +279,7 @@ static void legacy_exec(void *func, int mode, blas_arg_t *args, void *sb){
}
}

static void exec_threads(blas_queue_t *queue, int buf_index){
static void exec_threads(blas_queue_t *queue){

void *buffer, *sa, *sb;
int pos=0, release_flag=0;
Expand All @@ -306,7 +300,7 @@ static void exec_threads(blas_queue_t *queue, int buf_index){
if ((sa == NULL) && (sb == NULL) && ((queue -> mode & BLAS_PTHREAD) == 0)) {

pos = omp_get_thread_num();
buffer = blas_thread_buffer[buf_index][pos];
buffer = blas_thread_buffer[pos];

//fallback
if(buffer==NULL) {
Expand Down Expand Up @@ -392,7 +386,7 @@ int exec_blas(BLASLONG num, blas_queue_t *queue){
// Handle lazy re-init of the thread-pool after a POSIX fork
if (unlikely(blas_server_avail == 0)) blas_thread_init();

BLASLONG i, buf_index;
BLASLONG i;

if ((num <= 0) || (queue == NULL)) return 0;

Expand All @@ -407,23 +401,6 @@ int exec_blas(BLASLONG num, blas_queue_t *queue){
}
#endif

while(true) {
for(i=0; i < MAX_PARALLEL_NUMBER; i++) {
#ifdef HAVE_C11
_Bool inuse = false;
if(atomic_compare_exchange_weak(&blas_buffer_inuse[i], &inuse, true)) {
#else
if(blas_buffer_inuse[i] == false) {
blas_buffer_inuse[i] = true;
#endif
buf_index = i;
break;
}
}
if(i != MAX_PARALLEL_NUMBER)
break;
}

if (openblas_omp_adaptive_env() != 0) {
#pragma omp parallel for num_threads(num) schedule(OMP_SCHED)
for (i = 0; i < num; i ++) {
Expand All @@ -432,7 +409,7 @@ if (openblas_omp_adaptive_env() != 0) {
queue[i].position = i;
#endif

exec_threads(&queue[i], buf_index);
exec_threads(&queue[i]);
}
} else {
#pragma omp parallel for schedule(OMP_SCHED)
Expand All @@ -442,16 +419,9 @@ if (openblas_omp_adaptive_env() != 0) {
queue[i].position = i;
#endif

exec_threads(&queue[i], buf_index);
exec_threads(&queue[i]);
}
}

#ifdef HAVE_C11
atomic_store(&blas_buffer_inuse[buf_index], false);
#else
blas_buffer_inuse[buf_index] = false;
#endif

return 0;
}

Expand Down

0 comments on commit 7b302ac

Please sign in to comment.