/*
 * Demo code of a Lock Mechanism for BlitzDB to gain maximum
 * throughput from Tokyo Cabinet. This implementation is inspired
 * by pthread's rwlock implementation.
 *
 * Quick Start:
 * $ g++ -Wall -pedantic blitzlock.cc -lpthread && ./a.out
 *
 * Copyright 2009 Toru Maesaka
 */

#include <assert.h>   /* assert */
#include <pthread.h>  /* pthread_mutex_*, pthread_cond_*, pthread_create */
#include <stdio.h>    /* fprintf */
#include <stdlib.h>   /* random, srandom */
#include <unistd.h>   /* usleep */

/*
 * Two-counter lock built on one mutex and one condition variable.
 *
 *  - update_begin() waits only while scanners are active, so several
 *    updaters may hold the lock at the same time.
 *  - scan_begin() waits only while updaters are active, so several
 *    scanners may hold the lock at the same time.
 *  - scan_update_begin() waits until both counters are zero and then
 *    raises both, which keeps every other caller out until
 *    scan_update_end().
 *
 * Every *_begin() call must be paired with the matching *_end() call.
 */
class BlitzLock {
private:
  int scanner_count;         /* threads between scan_begin/scan_end     */
  int updater_count;         /* threads between update_begin/update_end */
  pthread_mutex_t mutex;     /* guards both counters                    */
  pthread_cond_t condition;  /* signalled when a counter drops to zero  */

  /* Copying a pthread mutex or condition variable is undefined, so the
     copy operations are declared private and intentionally left
     undefined. */
  BlitzLock(const BlitzLock &);
  BlitzLock &operator=(const BlitzLock &);

public:
  BlitzLock();
  ~BlitzLock();

  /* NOTE(review): init() and destroy() are declared but have no definition
     in this file; the constructor and destructor do the setup/teardown. */
  bool init();
  bool destroy();

  void update_begin();
  void update_end();
  void scan_begin();
  void scan_end();
  void scan_update_begin();
  void scan_update_end();
};

/* Starts with no scanners and no updaters and initialises the pthread
   primitives with default attributes. */
BlitzLock::BlitzLock() : scanner_count(0), updater_count(0) {
  pthread_cond_init(&condition, NULL);
  pthread_mutex_init(&mutex, NULL);
}

/* Releases the pthread primitives. */
BlitzLock::~BlitzLock() {
  pthread_cond_destroy(&condition);
  pthread_mutex_destroy(&mutex);
}

/* Blocks until no scanner is active, then registers the caller as an
   updater. Other updaters do not block the caller. */
void BlitzLock::update_begin() {
  pthread_mutex_lock(&mutex);
  /* Re-test after every wakeup: pthread_cond_wait may wake spuriously and
     another thread may have taken the lock first. */
  while (scanner_count > 0)
    pthread_cond_wait(&condition, &mutex);
  updater_count++;
  pthread_mutex_unlock(&mutex);
}

/* Unregisters an updater; wakes all waiters once the last updater leaves. */
void BlitzLock::update_end() {
  pthread_mutex_lock(&mutex);
  updater_count--;
  assert(updater_count >= 0);
  if (updater_count == 0)
    pthread_cond_broadcast(&condition);
  pthread_mutex_unlock(&mutex);
}

/* Blocks until no updater is active, then registers the caller as a
   scanner. Other scanners do not block the caller. */
void BlitzLock::scan_begin() {
  pthread_mutex_lock(&mutex);
  while (updater_count != 0)
    pthread_cond_wait(&condition, &mutex);
  scanner_count++;
  pthread_mutex_unlock(&mutex);
}

/* Unregisters a scanner; wakes all waiters once the last scanner leaves. */
void BlitzLock::scan_end() {
  pthread_mutex_lock(&mutex);
  scanner_count--;
  assert(scanner_count >= 0);
  if (scanner_count == 0)
    pthread_cond_broadcast(&condition);
  pthread_mutex_unlock(&mutex);
}

/* Blocks until there are neither scanners nor updaters, then registers the
   caller as both, so every other *_begin() call waits until
   scan_update_end(). */
void BlitzLock::scan_update_begin() {
  pthread_mutex_lock(&mutex);
  while (scanner_count != 0 || updater_count != 0)
    pthread_cond_wait(&condition, &mutex);
  scanner_count++;
  updater_count++;
  pthread_mutex_unlock(&mutex);
}

/* Drops both registrations taken by scan_update_begin(). */
void BlitzLock::scan_update_end() {
  pthread_mutex_lock(&mutex);
  scanner_count--;
  updater_count--;
  assert(scanner_count >= 0 && updater_count >= 0);
  /* All other threads are guaranteed to be waiting so broadcast
     regardless. */
  pthread_cond_broadcast(&condition);
  pthread_mutex_unlock(&mutex);
}

/*---- TEST CODE FROM HERE ----*/

static const unsigned int DEFAULT_RAND_SEED = 149;
static const int MAX_WORKERS = 10;
static const int ITERATIONS = 200;

/* Returns a pseudo-random duration in [0, max_time) microseconds. */
static int rand_usecs(int max_time) {
  assert(max_time > 0);
  return static_cast<int>(random() % max_time);
}

// Assumes reading takes longer than updating the table state
static void scan(BlitzLock *lock) {
  assert(lock);
  const int process_time = rand_usecs(30000);
  lock->scan_begin();
  fprintf(stderr, "scanner: %dus\n", process_time);
  usleep(process_time);
  lock->scan_end();
}

/* Simulates an update: holds the lock as an updater for a random time. */
static void update(BlitzLock *lock) {
  assert(lock);
  const int process_time = rand_usecs(20000);
  lock->update_begin();
  fprintf(stderr, "\t\tupdater: %dus\n", process_time);
  usleep(process_time);
  lock->update_end();
}

/* Simulates a scan that also updates: holds the lock exclusively for a
   random time. */
static void scan_update(BlitzLock *lock) {
  assert(lock);
  const int process_time = rand_usecs(20000);
  lock->scan_update_begin();
  fprintf(stderr, "\t\t\t\tscanner+updater: %dus\n", process_time);
  usleep(process_time);
  lock->scan_update_end();
}

/* Thread entry point. `arg` is the shared BlitzLock; each iteration picks
   one of the three operations at random. */
static void *worker(void *arg) {
  assert(arg);
  BlitzLock *lock = static_cast<BlitzLock *>(arg);

  for (int i = 0; i < ITERATIONS; i++) {
    const long pivot = random() % 100;
    if (pivot <= 33)
      scan(lock);
    else if (pivot <= 66)
      update(lock);
    else
      scan_update(lock);
  }
  return NULL;
}

/* Starts MAX_WORKERS threads hammering one shared lock and waits for them.
   Returns 0 on success, 1 if thread setup failed. */
int main(void) {
  BlitzLock lock;  /* outlives every worker: all are joined before return */
  pthread_t workers[MAX_WORKERS];
  pthread_attr_t joinable;

  if (pthread_attr_init(&joinable) != 0) {
    fprintf(stderr, "pthread_attr_init failed\n");
    return 1;
  }
  pthread_attr_setdetachstate(&joinable, PTHREAD_CREATE_JOINABLE);

  srandom(DEFAULT_RAND_SEED);

  /* Count the threads that really started so that only valid handles are
     joined below. */
  int started = 0;
  while (started < MAX_WORKERS) {
    const int rc = pthread_create(&workers[started], &joinable, worker, &lock);
    if (rc != 0) {
      fprintf(stderr, "pthread_create failed (error %d)\n", rc);
      break;
    }
    started++;
  }

  /* The attribute object is no longer needed once the threads exist. */
  pthread_attr_destroy(&joinable);

  for (int j = 0; j < started; j++)
    pthread_join(workers[j], NULL);

  return started == MAX_WORKERS ? 0 : 1;
}