Spaces:
Running
Running
// ---- Service configuration ----
constexpr int PORT = 7860;                               // TCP port the HTTP server listens on
const std::string ORG = "velost";                        // bucket owner that holds the orgs*.txt lists
const std::string BUCKET = "orgs";                       // bucket name that holds the orgs*.txt lists
const std::string STATUS_BUCKET = "status";
const std::string BASE_URL = "https://huggingface.co";   // prefix for every bucket URL
constexpr int REFRESH_INTERVAL = 12 * 60 * 60;           // seconds between org-list refreshes (12 h)
constexpr int MAX_CONCURRENT_PER_GROUP = 10;             // in-flight requests allowed per fetch group
constexpr int MAX_RPS = 50;                              // request budget per one-second window
const std::string MY_NAME = "3";                         // this instance's entry in RPS_FILE
// NOTE: despite the extension, this file is written as plain newline-separated text.
const std::string RPS_FILE = "/status/rps_limits.parquet";
| // Forward declarations | |
| class Semaphore; | |
| class ThreadPool; | |
| class RPSLimiter; | |
| size_t WriteCallback(void* contents, size_t size, size_t nmemb, std::string* output); | |
| CURL* getThreadLocalCurl(); | |
| std::pair<int, std::string> fetchOrgsFile(const std::string& url); | |
| std::pair<int, std::string> fetchFullFile(const std::string& url); | |
| std::pair<int, std::string> fetchSizeFile(const std::string& url); | |
| std::vector<std::string> parseOrgs(const std::string& data); | |
| void bundledFetchOrgs(); | |
| bool hasAllNumbers(const std::vector<int>& numbers); | |
| Json::Value bundledFetchCheck(); | |
| std::string jsonToString(const Json::Value& json); | |
| std::string jsonToJsonl(const Json::Value& json); | |
| void runServer(); | |
| Json::Value getRPSStatus(); | |
// ============================================
// RPSLimiter Class
// ============================================
// Fixed-window rate limiter: admits at most `maxRps` requests per one-second window.
//
// While this process is over budget, its `name` is listed in a shared text file
// (`filePath`, one name per line, guarded with flock() so several processes can
// share it). The entry is removed again once a request fits in the budget.
// NOTE: despite the "Parquet" naming of the helpers and of RPS_FILE, the file is
// plain newline-separated text, not Parquet.
class RPSLimiter {
private:
    std::mutex mtx;                                     // guards all state below
    std::chrono::steady_clock::time_point windowStart;  // start of the current window
    std::atomic<int> requestCount{0};                   // requests counted in the current window
    int maxRps;
    std::string name;      // entry this process adds to / removes from the file
    std::string filePath;  // shared limits file
    // Whether our name is believed to be listed in the file. It starts true so the
    // first admitted request clears a stale entry left over from an earlier run.
    // Tracking it means the file is touched only when the limited/unlimited state
    // changes, not opened, locked and read on every admitted request.
    bool listedInFile = true;

public:
    RPSLimiter(int maxRequestsPerSecond, const std::string& myName, const std::string& path)
        : windowStart(std::chrono::steady_clock::now()),
          maxRps(maxRequestsPerSecond),
          name(myName),
          filePath(path) {}

    // Counts one request in the current one-second window and returns whether it is
    // within budget. Rejected attempts are counted too. On a state change the shared
    // file is updated (name added when over budget, removed when back under).
    bool checkAndAdd() {
        const auto now = std::chrono::steady_clock::now();
        std::lock_guard<std::mutex> lock(mtx);
        if (now - windowStart >= std::chrono::seconds(1)) {
            windowStart = now;
            requestCount = 0;
        }
        const int currentCount = ++requestCount;
        if (currentCount > maxRps) {
            if (!listedInFile) {
                // On failure the flag stays false, so the next rejection retries.
                listedInFile = addNameToParquet();
            }
            return false;
        }
        if (listedInFile) {
            // On failure the flag stays true, so the next admission retries.
            listedInFile = !removeNameFromParquet();
        }
        return true;
    }

    // Blocks the caller, polling every 100 ms, until checkAndAdd() admits a request.
    void waitIfNeeded() {
        while (!checkAndAdd()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }

private:
    // Opens the shared file (creating it if needed) and takes an exclusive flock.
    // Returns nullptr on failure. fclose() on the returned stream also closes the
    // underlying descriptor, which releases the lock; the descriptor must not be
    // closed separately.
    FILE* openLocked() {
        const int fd = open(filePath.c_str(), O_CREAT | O_RDWR, 0644);
        if (fd < 0) return nullptr;
        flock(fd, LOCK_EX);
        FILE* fp = fdopen(fd, "r+");
        if (fp == nullptr) {
            flock(fd, LOCK_UN);
            close(fd);
        }
        return fp;
    }

    // Reads every non-blank line of `fp` from the start, trimmed of surrounding whitespace.
    std::vector<std::string> readNamesLocked(FILE* fp) {
        std::vector<std::string> names;
        rewind(fp);
        char line[256];
        while (fgets(line, sizeof(line), fp)) {
            std::string s(line);
            s.erase(0, s.find_first_not_of(" \t\r\n"));
            s.erase(s.find_last_not_of(" \t\r\n") + 1);
            if (!s.empty()) names.push_back(s);
        }
        return names;
    }

    // Replaces the file contents with `names`, newline-separated, with no trailing newline.
    void writeNamesLocked(FILE* fp, const std::vector<std::string>& names) {
        rewind(fp);
        ftruncate(fileno(fp), 0);
        for (size_t i = 0; i < names.size(); i++) {
            if (i > 0) fputs("\n", fp);
            fputs(names[i].c_str(), fp);
        }
        fflush(fp);
    }

    // Ensures `name` is listed in the file. Returns false if the file could not be opened.
    bool addNameToParquet() {
        FILE* fp = openLocked();
        if (fp == nullptr) return false;
        std::vector<std::string> names = readNamesLocked(fp);
        if (std::find(names.begin(), names.end(), name) == names.end()) {
            names.push_back(name);
            writeNamesLocked(fp, names);
            std::cout << "RPS limit reached! Added '" << name << "' to " << filePath << std::endl;
        }
        fclose(fp);  // closes the descriptor too, dropping the flock
        return true;
    }

    // Ensures `name` is not listed in the file. Returns false if the file could not be opened.
    bool removeNameFromParquet() {
        FILE* fp = openLocked();
        if (fp == nullptr) return false;
        std::vector<std::string> names = readNamesLocked(fp);
        auto it = std::find(names.begin(), names.end(), name);
        if (it != names.end()) {
            names.erase(it);
            writeNamesLocked(fp, names);
            std::cout << "RPS below limit. Removed '" << name << "' from " << filePath << std::endl;
        }
        fclose(fp);  // closes the descriptor too, dropping the flock
        return true;
    }
};
// ============================================
// ThreadPool Class
// ============================================
// Fixed-size pool of worker threads draining a FIFO task queue.
// wait() blocks until every enqueued task has finished. The destructor lets the
// workers drain the remaining queue and then joins them.
class ThreadPool {
private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queueMutex;               // guards tasks, stop and pending
    std::condition_variable condition;   // wakes workers: new task or shutdown
    std::condition_variable idle;        // wakes wait(): pending reached zero
    bool stop = false;
    // Tasks enqueued but not yet finished (queued + running). Counting this under
    // the same lock as the queue closes the window where a task had been popped but
    // not yet marked active, during which wait() could return early.
    size_t pending = 0;

    // Worker body: runs tasks until shutdown is requested and the queue is empty.
    void workerLoop() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queueMutex);
                condition.wait(lock, [this] { return stop || !tasks.empty(); });
                if (stop && tasks.empty()) return;
                task = std::move(tasks.front());
                tasks.pop();
            }
            task();
            {
                std::lock_guard<std::mutex> lock(queueMutex);
                if (--pending == 0) idle.notify_all();
            }
        }
    }

public:
    // Starts `threads` workers. At least one is always started: with zero workers,
    // enqueued tasks would never run and wait() would block forever (e.g. when
    // std::thread::hardware_concurrency() reports 0).
    ThreadPool(size_t threads) {
        if (threads == 0) threads = 1;
        workers.reserve(threads);
        for (size_t i = 0; i < threads; i++) {
            workers.emplace_back([this] { workerLoop(); });
        }
    }

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    // Queues `task` for execution on some worker thread.
    void enqueue(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(queueMutex);
            tasks.push(std::move(task));
            ++pending;
        }
        condition.notify_one();
    }

    // Blocks until every task enqueued so far has finished running.
    void wait() {
        std::unique_lock<std::mutex> lock(queueMutex);
        idle.wait(lock, [this] { return pending == 0; });
    }

    ~ThreadPool() {
        {
            std::lock_guard<std::mutex> lock(queueMutex);
            stop = true;
        }
        condition.notify_all();
        for (auto& worker : workers) {
            if (worker.joinable()) worker.join();
        }
    }
};
// ============================================
// Semaphore Class
// ============================================
// Counting semaphore: acquire() blocks until a permit is available and takes it;
// release() returns one permit and wakes a single waiter.
class Semaphore {
    std::mutex guardMutex;
    std::condition_variable permitAvailable;
    int permits;

public:
    Semaphore(int initialPermits) : permits(initialPermits) {}

    void acquire() {
        std::unique_lock<std::mutex> guard(guardMutex);
        while (permits <= 0) {
            permitAvailable.wait(guard);
        }
        --permits;
    }

    void release() {
        std::lock_guard<std::mutex> guard(guardMutex);
        ++permits;
        permitAvailable.notify_one();
    }
};
| // ============================================ | |
| // Global Variables | |
| // ============================================ | |
| thread_local CURL* tls_curl = nullptr; | |
| Semaphore g1_sem(MAX_CONCURRENT_PER_GROUP); | |
| Semaphore g2_sem(MAX_CONCURRENT_PER_GROUP); | |
| Semaphore g3_sem(MAX_CONCURRENT_PER_GROUP); | |
| std::mutex cacheMutex; | |
| std::vector<std::string> allOrgNames; | |
| std::string lastRefresh; | |
| std::string nextRefresh; | |
| std::atomic<bool> loading{false}; | |
| std::atomic<bool> refreshInProgress{false}; | |
| RPSLimiter g_rpsLimiter(MAX_RPS, MY_NAME, RPS_FILE); | |
// ============================================
// Utility Functions
// ============================================
// libcurl write callback: appends the received chunk to *output and reports the
// whole chunk as consumed (returning fewer bytes would make libcurl abort).
size_t WriteCallback(void* contents, size_t size, size_t nmemb, std::string* output) {
    const size_t chunkBytes = nmemb * size;
    output->append(static_cast<const char*>(contents), chunkBytes);
    return chunkBytes;
}
| CURL* getThreadLocalCurl() { | |
| if (!tls_curl) { | |
| tls_curl = curl_easy_init(); | |
| curl_easy_setopt(tls_curl, CURLOPT_WRITEFUNCTION, WriteCallback); | |
| curl_easy_setopt(tls_curl, CURLOPT_TIMEOUT, 10L); | |
| curl_easy_setopt(tls_curl, CURLOPT_FOLLOWLOCATION, 1L); | |
| curl_easy_setopt(tls_curl, CURLOPT_TCP_KEEPALIVE, 1L); | |
| curl_easy_setopt(tls_curl, CURLOPT_USERAGENT, "Mozilla/5.0"); | |
| curl_easy_setopt(tls_curl, CURLOPT_NOSIGNAL, 1L); | |
| } | |
| return tls_curl; | |
| } | |
| // ============================================ | |
| // Fetch Functions | |
| // ============================================ | |
| std::pair<int, std::string> fetchOrgsFile(const std::string& url) { | |
| g_rpsLimiter.waitIfNeeded(); | |
| g1_sem.acquire(); | |
| CURL* curl = getThreadLocalCurl(); | |
| std::string response; | |
| long httpCode = 0; | |
| curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); | |
| curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); | |
| CURLcode res = curl_easy_perform(curl); | |
| if (res == CURLE_OK) { | |
| curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode); | |
| } else { | |
| httpCode = 0; | |
| } | |
| g1_sem.release(); | |
| return {httpCode, response}; | |
| } | |
| std::pair<int, std::string> fetchFullFile(const std::string& url) { | |
| g_rpsLimiter.waitIfNeeded(); | |
| g2_sem.acquire(); | |
| CURL* curl = getThreadLocalCurl(); | |
| std::string response; | |
| long httpCode = 0; | |
| curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); | |
| curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); | |
| CURLcode res = curl_easy_perform(curl); | |
| if (res == CURLE_OK) { | |
| curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode); | |
| } else { | |
| httpCode = 0; | |
| } | |
| g2_sem.release(); | |
| return {httpCode, response}; | |
| } | |
| std::pair<int, std::string> fetchSizeFile(const std::string& url) { | |
| g_rpsLimiter.waitIfNeeded(); | |
| g3_sem.acquire(); | |
| CURL* curl = getThreadLocalCurl(); | |
| std::string response; | |
| long httpCode = 0; | |
| curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); | |
| curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); | |
| CURLcode res = curl_easy_perform(curl); | |
| if (res == CURLE_OK) { | |
| curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode); | |
| } else { | |
| httpCode = 0; | |
| } | |
| g3_sem.release(); | |
| return {httpCode, response}; | |
| } | |
// ============================================
// Parsing Functions
// ============================================
// Splits `data` on '|' and returns each piece trimmed of spaces, tabs, CR and LF.
// Pieces that are empty after trimming are dropped; order is preserved.
std::vector<std::string> parseOrgs(const std::string& data) {
    const char* const blanks = " \t\r\n";
    std::vector<std::string> orgs;
    std::string::size_type segmentStart = 0;
    for (;;) {
        const auto bar = data.find('|', segmentStart);
        const auto segmentEnd = (bar == std::string::npos) ? data.size() : bar;
        // The first non-blank at or after segmentStart; it may be the '|' itself,
        // in which case the segment holds only blanks.
        const auto first = data.find_first_not_of(blanks, segmentStart);
        if (first != std::string::npos && first < segmentEnd) {
            const auto last = data.find_last_not_of(blanks, segmentEnd - 1);
            orgs.push_back(data.substr(first, last - first + 1));
        }
        if (bar == std::string::npos) break;
        segmentStart = bar + 1;
    }
    return orgs;
}
// True when `numbers` contains each of 1, 2 and 3 (in any order, duplicates allowed).
bool hasAllNumbers(const std::vector<int>& numbers) {
    const auto contains = [&numbers](int wanted) {
        return std::find(numbers.begin(), numbers.end(), wanted) != numbers.end();
    };
    return contains(1) && contains(2) && contains(3);
}
| // ============================================ | |
| // Core Functions | |
| // ============================================ | |
| void bundledFetchOrgs() { | |
| if (refreshInProgress.exchange(true)) { | |
| return; | |
| } | |
| loading = true; | |
| std::cout << "[GROUP 1] Parallel fetch of ALL orgs*.txt files..." << std::endl; | |
| auto startTime = std::chrono::steady_clock::now(); | |
| std::vector<std::string> newOrgNames; | |
| std::vector<std::string> existingFiles; | |
| for (int i = 1; i <= 20; i++) { | |
| std::string fileName = "orgs" + std::to_string(i) + ".txt"; | |
| std::string url = BASE_URL + "/buckets/" + ORG + "/" + BUCKET + "/resolve/" + fileName + "?download=true"; | |
| auto [httpCode, response] = fetchOrgsFile(url); | |
| if (httpCode == 404) { | |
| if (i == 1) { | |
| break; | |
| } | |
| break; | |
| } | |
| if (httpCode == 200) { | |
| existingFiles.push_back(fileName); | |
| } | |
| } | |
| if (!existingFiles.empty()) { | |
| unsigned int numCores = std::min(std::thread::hardware_concurrency(), 8u); | |
| ThreadPool pool(numCores); | |
| std::mutex orgMutex; | |
| std::atomic<int> completed{0}; | |
| std::cout << " Downloading " << existingFiles.size() << " files on " << numCores << " cores..." << std::endl; | |
| for (const auto& fileName : existingFiles) { | |
| pool.enqueue([&, fileName]() { | |
| std::string url = BASE_URL + "/buckets/" + ORG + "/" + BUCKET + "/resolve/" + fileName + "?download=true"; | |
| auto [httpCode, response] = fetchOrgsFile(url); | |
| if (httpCode == 200) { | |
| std::vector<std::string> orgs = parseOrgs(response); | |
| { | |
| std::lock_guard<std::mutex> lock(orgMutex); | |
| newOrgNames.insert(newOrgNames.end(), orgs.begin(), orgs.end()); | |
| } | |
| } | |
| int done = ++completed; | |
| if (done % 10 == 0 || done == (int)existingFiles.size()) { | |
| std::cout << " Progress: " << done << "/" << existingFiles.size() << std::endl; | |
| } | |
| }); | |
| } | |
| pool.wait(); | |
| } | |
| std::set<std::string> unique(newOrgNames.begin(), newOrgNames.end()); | |
| newOrgNames.assign(unique.begin(), unique.end()); | |
| std::sort(newOrgNames.begin(), newOrgNames.end()); | |
| { | |
| std::lock_guard<std::mutex> lock(cacheMutex); | |
| allOrgNames.swap(newOrgNames); | |
| } | |
| auto now = std::chrono::system_clock::now(); | |
| auto nowTime = std::chrono::system_clock::to_time_t(now); | |
| auto nextTime = nowTime + REFRESH_INTERVAL; | |
| char timeStr[100]; | |
| std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%dT%H:%M:%SZ", std::gmtime(&nowTime)); | |
| lastRefresh = timeStr; | |
| std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%dT%H:%M:%SZ", std::gmtime(&nextTime)); | |
| nextRefresh = timeStr; | |
| loading = false; | |
| refreshInProgress = false; | |
| auto duration = std::chrono::duration_cast<std::chrono::seconds>( | |
| std::chrono::steady_clock::now() - startTime | |
| ).count(); | |
| std::cout << "[GROUP 1] Done: " << allOrgNames.size() << " orgs in " << duration << "s" << std::endl; | |
| } | |
| Json::Value bundledFetchCheck() { | |
| Json::Value response; | |
| std::vector<std::string> orgNamesCopy; | |
| { | |
| std::lock_guard<std::mutex> lock(cacheMutex); | |
| if (allOrgNames.empty()) { | |
| response["error"] = "No org names loaded"; | |
| return response; | |
| } | |
| orgNamesCopy = allOrgNames; | |
| } | |
| std::cout << "[GROUP 2+3] Parallel check of " << orgNamesCopy.size() << " orgs..." << std::endl; | |
| auto startTime = std::chrono::steady_clock::now(); | |
| unsigned int numCores = std::min(std::thread::hardware_concurrency(), 8u); | |
| ThreadPool pool(numCores); | |
| struct OrgResult { | |
| std::string org; | |
| std::vector<int> fullNumbers; | |
| bool hasAll; | |
| std::string sizeData; | |
| bool sizeEmpty; | |
| bool sizeFetched; | |
| size_t index; | |
| }; | |
| std::vector<std::unique_ptr<OrgResult>> results; | |
| results.reserve(orgNamesCopy.size()); | |
| std::mutex resultMutex; | |
| std::atomic<int> completed{0}; | |
| std::atomic<int> missingCount{0}; | |
| std::atomic<int> emptySizeCount{0}; | |
| std::vector<std::string> emptySizeOrgs; | |
| std::mutex emptySizeMutex; | |
| for (size_t i = 0; i < orgNamesCopy.size(); i++) { | |
| pool.enqueue([&, i]() { | |
| const std::string& orgName = orgNamesCopy[i]; | |
| auto result = std::make_unique<OrgResult>(); | |
| result->org = orgName; | |
| result->sizeFetched = false; | |
| result->index = i; | |
| std::string fullUrl = BASE_URL + "/buckets/" + orgName + "/status/resolve/full.txt?download=true"; | |
| auto [fullCode, fullResp] = fetchFullFile(fullUrl); | |
| if (fullCode == 200) { | |
| std::stringstream ss(fullResp); | |
| std::string num; | |
| while (std::getline(ss, num, '|')) { | |
| try { | |
| result->fullNumbers.push_back(std::stoi(num)); | |
| } catch (...) {} | |
| } | |
| } | |
| result->hasAll = hasAllNumbers(result->fullNumbers); | |
| if (!result->hasAll) { | |
| missingCount++; | |
| std::string sizeUrl = BASE_URL + "/buckets/" + orgName + "/status/resolve/size.txt?download=true"; | |
| auto [sizeCode, sizeResp] = fetchSizeFile(sizeUrl); | |
| if (sizeCode == 200) { | |
| result->sizeData = sizeResp; | |
| result->sizeData.erase(0, result->sizeData.find_first_not_of(" \t\r\n")); | |
| result->sizeData.erase(result->sizeData.find_last_not_of(" \t\r\n") + 1); | |
| result->sizeEmpty = result->sizeData.empty(); | |
| result->sizeFetched = true; | |
| if (result->sizeEmpty) { | |
| emptySizeCount++; | |
| std::lock_guard<std::mutex> lock(emptySizeMutex); | |
| emptySizeOrgs.push_back(orgName); | |
| } | |
| } | |
| } | |
| { | |
| std::lock_guard<std::mutex> lock(resultMutex); | |
| results.push_back(std::move(result)); | |
| } | |
| int done = ++completed; | |
| if (done % 100 == 0 || done == (int)orgNamesCopy.size()) { | |
| std::cout << " Progress: " << done << "/" << orgNamesCopy.size() << std::endl; | |
| } | |
| }); | |
| } | |
| pool.wait(); | |
| std::sort(results.begin(), results.end(), | |
| [](const std::unique_ptr<OrgResult>& a, const std::unique_ptr<OrgResult>& b) { | |
| return a->index < b->index; | |
| }); | |
| std::sort(emptySizeOrgs.begin(), emptySizeOrgs.end()); | |
| Json::Value jsonResults(Json::arrayValue); | |
| Json::Value emptySizeJson(Json::arrayValue); | |
| for (const auto& orgName : emptySizeOrgs) { | |
| emptySizeJson.append(orgName); | |
| } | |
| for (auto& r : results) { | |
| Json::Value org; | |
| org["org"] = r->org; | |
| Json::Value present(Json::arrayValue); | |
| for (int num : r->fullNumbers) present.append(num); | |
| org["fullNumbers"] = present; | |
| Json::Value missing(Json::arrayValue); | |
| for (int num : {1, 2, 3}) { | |
| if (std::find(r->fullNumbers.begin(), r->fullNumbers.end(), num) == r->fullNumbers.end()) { | |
| missing.append(num); | |
| } | |
| } | |
| org["fullMissing"] = missing; | |
| org["hasAllNumbers"] = r->hasAll; | |
| if (!r->hasAll && r->sizeFetched) { | |
| org["sizeData"] = r->sizeData; | |
| org["sizeEmpty"] = r->sizeEmpty; | |
| } | |
| jsonResults.append(org); | |
| } | |
| auto duration = std::chrono::duration_cast<std::chrono::seconds>( | |
| std::chrono::steady_clock::now() - startTime | |
| ).count(); | |
| Json::Value summary; | |
| summary["totalOrgs"] = (int)orgNamesCopy.size(); | |
| summary["missingNumbersOrgs"] = missingCount.load(); | |
| summary["emptySizeOrgs"] = emptySizeCount.load(); | |
| summary["duration"] = std::to_string(duration) + "s"; | |
| response["summary"] = summary; | |
| response["results"] = jsonResults; | |
| response["emptySizeOrgs"] = emptySizeJson; | |
| std::cout << "[GROUP 2+3] Done in " << duration << "s" << std::endl; | |
| return response; | |
| } | |
| std::string jsonToString(const Json::Value& json) { | |
| Json::StreamWriterBuilder builder; | |
| builder["indentation"] = ""; | |
| return Json::writeString(builder, json); | |
| } | |
| std::string jsonToJsonl(const Json::Value& json) { | |
| std::string result; | |
| if (json.isArray()) { | |
| Json::StreamWriterBuilder builder; | |
| builder["indentation"] = ""; | |
| for (const auto& item : json) { | |
| result += Json::writeString(builder, item) + "\n"; | |
| } | |
| } else { | |
| Json::StreamWriterBuilder builder; | |
| builder["indentation"] = ""; | |
| result = Json::writeString(builder, json) + "\n"; | |
| } | |
| return result; | |
| } | |
| Json::Value getRPSStatus() { | |
| Json::Value status; | |
| status["maxRps"] = MAX_RPS; | |
| status["myName"] = MY_NAME; | |
| std::ifstream file(RPS_FILE); | |
| if (file.is_open()) { | |
| std::vector<std::string> names; | |
| std::string line; | |
| while (std::getline(file, line)) { | |
| line.erase(0, line.find_first_not_of(" \t\r\n")); | |
| line.erase(line.find_last_not_of(" \t\r\n") + 1); | |
| if (!line.empty()) { | |
| names.push_back(line); | |
| } | |
| } | |
| file.close(); | |
| status["fileExists"] = true; | |
| status["namesInFile"] = (int)names.size(); | |
| Json::Value namesArray(Json::arrayValue); | |
| for (const auto& name : names) { | |
| namesArray.append(name); | |
| } | |
| status["names"] = namesArray; | |
| status["isLimited"] = std::find(names.begin(), names.end(), MY_NAME) != names.end(); | |
| } else { | |
| status["fileExists"] = false; | |
| status["namesInFile"] = 0; | |
| status["isLimited"] = false; | |
| } | |
| return status; | |
| } | |
| // ============================================ | |
| // Server | |
| // ============================================ | |
| void runServer() { | |
| int serverSocket = socket(AF_INET, SOCK_STREAM, 0); | |
| int opt = 1; | |
| setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); | |
| struct sockaddr_in serverAddr; | |
| serverAddr.sin_family = AF_INET; | |
| serverAddr.sin_addr.s_addr = INADDR_ANY; | |
| serverAddr.sin_port = htons(PORT); | |
| bind(serverSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr)); | |
| listen(serverSocket, 100); | |
| unsigned int numCores = std::thread::hardware_concurrency(); | |
| std::cout << "Server on port " << PORT << std::endl; | |
| std::cout << "Using " << numCores << " CPU cores" << std::endl; | |
| std::cout << "3 Groups: [1]orgs [2]full [3]size" << std::endl; | |
| std::cout << "Parallel processing - never sequential" << std::endl; | |
| std::cout << "RPS Limiter: max " << MAX_RPS << " req/s, name: '" << MY_NAME << "'" << std::endl; | |
| std::cout << "Parquet file: " << RPS_FILE << std::endl; | |
| curl_global_init(CURL_GLOBAL_ALL); | |
| std::filesystem::create_directories("/status"); | |
| std::thread(bundledFetchOrgs).detach(); | |
| std::thread([]() { | |
| while (true) { | |
| std::this_thread::sleep_for(std::chrono::seconds(REFRESH_INTERVAL)); | |
| bundledFetchOrgs(); | |
| } | |
| }).detach(); | |
| while (true) { | |
| struct sockaddr_in clientAddr; | |
| socklen_t clientLen = sizeof(clientAddr); | |
| int clientSocket = accept(serverSocket, (struct sockaddr*)&clientAddr, &clientLen); | |
| if (clientSocket < 0) continue; | |
| std::thread([clientSocket, numCores]() { | |
| char buffer[4096]; | |
| int bytesRead = recv(clientSocket, buffer, sizeof(buffer) - 1, 0); | |
| if (bytesRead > 0) { | |
| buffer[bytesRead] = '\0'; | |
| std::istringstream iss(buffer); | |
| std::string method, path, version; | |
| iss >> method >> path >> version; | |
| std::string query; | |
| size_t pos = path.find('?'); | |
| if (pos != std::string::npos) { | |
| query = path.substr(pos + 1); | |
| path = path.substr(0, pos); | |
| } | |
| Json::Value respJson; | |
| if (path == "/" || path == "/health") { | |
| respJson["status"] = "healthy"; | |
| respJson["orgs"] = (int)allOrgNames.size(); | |
| respJson["cores"] = (int)numCores; | |
| } else if (path == "/status") { | |
| respJson["orgs"] = (int)allOrgNames.size(); | |
| respJson["lastRefresh"] = lastRefresh; | |
| respJson["nextRefresh"] = nextRefresh; | |
| respJson["loading"] = loading.load(); | |
| } else if (path == "/check") { | |
| respJson = bundledFetchCheck(); | |
| } else if (path == "/rps") { | |
| respJson = getRPSStatus(); | |
| } else if (path == "/refresh" && method == "POST") { | |
| std::thread(bundledFetchOrgs).detach(); | |
| respJson["status"] = "started"; | |
| } else { | |
| respJson["error"] = "Not found"; | |
| } | |
| std::string body; | |
| std::string contentType; | |
| if (path == "/check" && respJson.isMember("results")) { | |
| body = jsonToJsonl(respJson["results"]); | |
| contentType = "text/plain"; | |
| } else { | |
| body = jsonToJsonl(respJson); | |
| contentType = "text/plain"; | |
| } | |
| std::ostringstream resp; | |
| resp << "HTTP/1.1 200 OK\r\n"; | |
| resp << "Access-Control-Allow-Origin: *\r\n"; | |
| resp << "Access-Control-Allow-Methods: GET, POST, OPTIONS\r\n"; | |
| resp << "Access-Control-Allow-Headers: Content-Type\r\n"; | |
| resp << "Content-Type: " << contentType << "\r\n"; | |
| resp << "Content-Length: " << body.size() << "\r\n"; | |
| resp << "Connection: keep-alive\r\n\r\n"; | |
| resp << body; | |
| std::string r = resp.str(); | |
| send(clientSocket, r.c_str(), r.size(), 0); | |
| } | |
| close(clientSocket); | |
| }).detach(); | |
| } | |
| close(serverSocket); | |
| } | |
| int main() { | |
| signal(SIGPIPE, SIG_IGN); | |
| runServer(); | |
| curl_global_cleanup(); | |
| return 0; | |
| } |