#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include const int PORT = 7860; const std::string ORG = "velost"; const std::string BUCKET = "orgs"; const std::string STATUS_BUCKET = "status"; const std::string BASE_URL = "https://huggingface.co"; const int REFRESH_INTERVAL = 12 * 60 * 60; const int MAX_CONCURRENT_PER_GROUP = 10; const int MAX_RPS = 50; const std::string MY_NAME = "3"; const std::string RPS_FILE = "/status/rps_limits.parquet"; // Forward declarations class Semaphore; class ThreadPool; class RPSLimiter; size_t WriteCallback(void* contents, size_t size, size_t nmemb, std::string* output); CURL* getThreadLocalCurl(); std::pair fetchOrgsFile(const std::string& url); std::pair fetchFullFile(const std::string& url); std::pair fetchSizeFile(const std::string& url); std::vector parseOrgs(const std::string& data); void bundledFetchOrgs(); bool hasAllNumbers(const std::vector& numbers); Json::Value bundledFetchCheck(); std::string jsonToString(const Json::Value& json); std::string jsonToJsonl(const Json::Value& json); void runServer(); Json::Value getRPSStatus(); // ============================================ // RPSLimiter Class // ============================================ class RPSLimiter { private: std::mutex mtx; std::chrono::steady_clock::time_point windowStart; std::atomic requestCount{0}; int maxRps; std::string name; std::string filePath; public: RPSLimiter(int maxRequestsPerSecond, const std::string& myName, const std::string& path) : maxRps(maxRequestsPerSecond), name(myName), filePath(path) { windowStart = std::chrono::steady_clock::now(); } bool checkAndAdd() { auto now = std::chrono::steady_clock::now(); std::lock_guard lock(mtx); auto elapsed = std::chrono::duration_cast(now - windowStart).count(); if (elapsed >= 1) { windowStart = now; requestCount = 0; } int currentCount = ++requestCount; if (currentCount > maxRps) { addNameToParquet(); return false; } if (currentCount <= maxRps) { removeNameFromParquet(); } return true; } void waitIfNeeded() { while (!checkAndAdd()) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } private: void ensureFileExists() { int fd = open(filePath.c_str(), O_CREAT | O_RDWR, 0644); if (fd >= 0) { close(fd); } } std::vector readNamesLocked(FILE* fp) { std::vector names; rewind(fp); char line[256]; while (fgets(line, sizeof(line), fp)) { std::string s(line); s.erase(0, s.find_first_not_of(" \t\r\n")); s.erase(s.find_last_not_of(" \t\r\n") + 1); if (!s.empty()) names.push_back(s); } return names; } void writeNamesLocked(FILE* fp, const std::vector& names) { rewind(fp); ftruncate(fileno(fp), 0); for (size_t i = 0; i < names.size(); i++) { fputs(names[i].c_str(), fp); if (i < names.size() - 1) { fputs("\n", fp); } } fflush(fp); } void addNameToParquet() { ensureFileExists(); int fd = open(filePath.c_str(), O_RDWR); if (fd < 0) return; flock(fd, LOCK_EX); FILE* fp = fdopen(fd, "r+"); if (fp) { std::vector names = readNamesLocked(fp); if (std::find(names.begin(), names.end(), name) == names.end()) { names.push_back(name); writeNamesLocked(fp, names); std::cout << "RPS limit reached! Added '" << name << "' to " << filePath << std::endl; } fclose(fp); } flock(fd, LOCK_UN); close(fd); } void removeNameFromParquet() { ensureFileExists(); int fd = open(filePath.c_str(), O_RDWR); if (fd < 0) return; flock(fd, LOCK_EX); FILE* fp = fdopen(fd, "r+"); if (fp) { std::vector names = readNamesLocked(fp); auto it = std::find(names.begin(), names.end(), name); if (it != names.end()) { names.erase(it); writeNamesLocked(fp, names); std::cout << "RPS below limit. Removed '" << name << "' from " << filePath << std::endl; } fclose(fp); } flock(fd, LOCK_UN); close(fd); } }; // ============================================ // ThreadPool Class // ============================================ class ThreadPool { private: std::vector workers; std::queue> tasks; std::mutex queueMutex; std::condition_variable condition; bool stop; std::atomic activeTasks{0}; public: ThreadPool(size_t threads) : stop(false) { for (size_t i = 0; i < threads; i++) { workers.emplace_back([this] { while (true) { std::function task; { std::unique_lock lock(queueMutex); condition.wait(lock, [this] { return stop || !tasks.empty(); }); if (stop && tasks.empty()) return; task = std::move(tasks.front()); tasks.pop(); } activeTasks++; task(); activeTasks--; } }); } } void enqueue(std::function task) { { std::unique_lock lock(queueMutex); tasks.push(std::move(task)); } condition.notify_one(); } void wait() { while (activeTasks > 0 || !tasks.empty()) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } ~ThreadPool() { { std::unique_lock lock(queueMutex); stop = true; } condition.notify_all(); for (auto& worker : workers) { if (worker.joinable()) worker.join(); } } }; // ============================================ // Semaphore Class // ============================================ class Semaphore { private: std::mutex mtx; std::condition_variable cv; int count; public: Semaphore(int count) : count(count) {} void acquire() { std::unique_lock lock(mtx); cv.wait(lock, [this] { return count > 0; }); count--; } void release() { std::unique_lock lock(mtx); count++; cv.notify_one(); } }; // ============================================ // Global Variables // ============================================ thread_local CURL* tls_curl = nullptr; Semaphore g1_sem(MAX_CONCURRENT_PER_GROUP); Semaphore g2_sem(MAX_CONCURRENT_PER_GROUP); Semaphore g3_sem(MAX_CONCURRENT_PER_GROUP); std::mutex cacheMutex; std::vector allOrgNames; std::string lastRefresh; std::string nextRefresh; std::atomic loading{false}; std::atomic refreshInProgress{false}; RPSLimiter g_rpsLimiter(MAX_RPS, MY_NAME, RPS_FILE); // ============================================ // Utility Functions // ============================================ size_t WriteCallback(void* contents, size_t size, size_t nmemb, std::string* output) { size_t totalSize = size * nmemb; output->append((char*)contents, totalSize); return totalSize; } CURL* getThreadLocalCurl() { if (!tls_curl) { tls_curl = curl_easy_init(); curl_easy_setopt(tls_curl, CURLOPT_WRITEFUNCTION, WriteCallback); curl_easy_setopt(tls_curl, CURLOPT_TIMEOUT, 10L); curl_easy_setopt(tls_curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(tls_curl, CURLOPT_TCP_KEEPALIVE, 1L); curl_easy_setopt(tls_curl, CURLOPT_USERAGENT, "Mozilla/5.0"); curl_easy_setopt(tls_curl, CURLOPT_NOSIGNAL, 1L); } return tls_curl; } // ============================================ // Fetch Functions // ============================================ std::pair fetchOrgsFile(const std::string& url) { g_rpsLimiter.waitIfNeeded(); g1_sem.acquire(); CURL* curl = getThreadLocalCurl(); std::string response; long httpCode = 0; curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); CURLcode res = curl_easy_perform(curl); if (res == CURLE_OK) { curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode); } else { httpCode = 0; } g1_sem.release(); return {httpCode, response}; } std::pair fetchFullFile(const std::string& url) { g_rpsLimiter.waitIfNeeded(); g2_sem.acquire(); CURL* curl = getThreadLocalCurl(); std::string response; long httpCode = 0; curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); CURLcode res = curl_easy_perform(curl); if (res == CURLE_OK) { curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode); } else { httpCode = 0; } g2_sem.release(); return {httpCode, response}; } std::pair fetchSizeFile(const std::string& url) { g_rpsLimiter.waitIfNeeded(); g3_sem.acquire(); CURL* curl = getThreadLocalCurl(); std::string response; long httpCode = 0; curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); CURLcode res = curl_easy_perform(curl); if (res == CURLE_OK) { curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode); } else { httpCode = 0; } g3_sem.release(); return {httpCode, response}; } // ============================================ // Parsing Functions // ============================================ std::vector parseOrgs(const std::string& data) { std::vector orgs; std::stringstream ss(data); std::string org; while (std::getline(ss, org, '|')) { org.erase(0, org.find_first_not_of(" \t\r\n")); org.erase(org.find_last_not_of(" \t\r\n") + 1); if (!org.empty()) orgs.push_back(org); } return orgs; } bool hasAllNumbers(const std::vector& numbers) { for (int num : {1, 2, 3}) { if (std::find(numbers.begin(), numbers.end(), num) == numbers.end()) return false; } return true; } // ============================================ // Core Functions // ============================================ void bundledFetchOrgs() { if (refreshInProgress.exchange(true)) { return; } loading = true; std::cout << "[GROUP 1] Parallel fetch of ALL orgs*.txt files..." << std::endl; auto startTime = std::chrono::steady_clock::now(); std::vector newOrgNames; std::vector existingFiles; for (int i = 1; i <= 20; i++) { std::string fileName = "orgs" + std::to_string(i) + ".txt"; std::string url = BASE_URL + "/buckets/" + ORG + "/" + BUCKET + "/resolve/" + fileName + "?download=true"; auto [httpCode, response] = fetchOrgsFile(url); if (httpCode == 404) { if (i == 1) { break; } break; } if (httpCode == 200) { existingFiles.push_back(fileName); } } if (!existingFiles.empty()) { unsigned int numCores = std::min(std::thread::hardware_concurrency(), 8u); ThreadPool pool(numCores); std::mutex orgMutex; std::atomic completed{0}; std::cout << " Downloading " << existingFiles.size() << " files on " << numCores << " cores..." << std::endl; for (const auto& fileName : existingFiles) { pool.enqueue([&, fileName]() { std::string url = BASE_URL + "/buckets/" + ORG + "/" + BUCKET + "/resolve/" + fileName + "?download=true"; auto [httpCode, response] = fetchOrgsFile(url); if (httpCode == 200) { std::vector orgs = parseOrgs(response); { std::lock_guard lock(orgMutex); newOrgNames.insert(newOrgNames.end(), orgs.begin(), orgs.end()); } } int done = ++completed; if (done % 10 == 0 || done == (int)existingFiles.size()) { std::cout << " Progress: " << done << "/" << existingFiles.size() << std::endl; } }); } pool.wait(); } std::set unique(newOrgNames.begin(), newOrgNames.end()); newOrgNames.assign(unique.begin(), unique.end()); std::sort(newOrgNames.begin(), newOrgNames.end()); { std::lock_guard lock(cacheMutex); allOrgNames.swap(newOrgNames); } auto now = std::chrono::system_clock::now(); auto nowTime = std::chrono::system_clock::to_time_t(now); auto nextTime = nowTime + REFRESH_INTERVAL; char timeStr[100]; std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%dT%H:%M:%SZ", std::gmtime(&nowTime)); lastRefresh = timeStr; std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%dT%H:%M:%SZ", std::gmtime(&nextTime)); nextRefresh = timeStr; loading = false; refreshInProgress = false; auto duration = std::chrono::duration_cast( std::chrono::steady_clock::now() - startTime ).count(); std::cout << "[GROUP 1] Done: " << allOrgNames.size() << " orgs in " << duration << "s" << std::endl; } Json::Value bundledFetchCheck() { Json::Value response; std::vector orgNamesCopy; { std::lock_guard lock(cacheMutex); if (allOrgNames.empty()) { response["error"] = "No org names loaded"; return response; } orgNamesCopy = allOrgNames; } std::cout << "[GROUP 2+3] Parallel check of " << orgNamesCopy.size() << " orgs..." << std::endl; auto startTime = std::chrono::steady_clock::now(); unsigned int numCores = std::min(std::thread::hardware_concurrency(), 8u); ThreadPool pool(numCores); struct OrgResult { std::string org; std::vector fullNumbers; bool hasAll; std::string sizeData; bool sizeEmpty; bool sizeFetched; size_t index; }; std::vector> results; results.reserve(orgNamesCopy.size()); std::mutex resultMutex; std::atomic completed{0}; std::atomic missingCount{0}; std::atomic emptySizeCount{0}; std::vector emptySizeOrgs; std::mutex emptySizeMutex; for (size_t i = 0; i < orgNamesCopy.size(); i++) { pool.enqueue([&, i]() { const std::string& orgName = orgNamesCopy[i]; auto result = std::make_unique(); result->org = orgName; result->sizeFetched = false; result->index = i; std::string fullUrl = BASE_URL + "/buckets/" + orgName + "/status/resolve/full.txt?download=true"; auto [fullCode, fullResp] = fetchFullFile(fullUrl); if (fullCode == 200) { std::stringstream ss(fullResp); std::string num; while (std::getline(ss, num, '|')) { try { result->fullNumbers.push_back(std::stoi(num)); } catch (...) {} } } result->hasAll = hasAllNumbers(result->fullNumbers); if (!result->hasAll) { missingCount++; std::string sizeUrl = BASE_URL + "/buckets/" + orgName + "/status/resolve/size.txt?download=true"; auto [sizeCode, sizeResp] = fetchSizeFile(sizeUrl); if (sizeCode == 200) { result->sizeData = sizeResp; result->sizeData.erase(0, result->sizeData.find_first_not_of(" \t\r\n")); result->sizeData.erase(result->sizeData.find_last_not_of(" \t\r\n") + 1); result->sizeEmpty = result->sizeData.empty(); result->sizeFetched = true; if (result->sizeEmpty) { emptySizeCount++; std::lock_guard lock(emptySizeMutex); emptySizeOrgs.push_back(orgName); } } } { std::lock_guard lock(resultMutex); results.push_back(std::move(result)); } int done = ++completed; if (done % 100 == 0 || done == (int)orgNamesCopy.size()) { std::cout << " Progress: " << done << "/" << orgNamesCopy.size() << std::endl; } }); } pool.wait(); std::sort(results.begin(), results.end(), [](const std::unique_ptr& a, const std::unique_ptr& b) { return a->index < b->index; }); std::sort(emptySizeOrgs.begin(), emptySizeOrgs.end()); Json::Value jsonResults(Json::arrayValue); Json::Value emptySizeJson(Json::arrayValue); for (const auto& orgName : emptySizeOrgs) { emptySizeJson.append(orgName); } for (auto& r : results) { Json::Value org; org["org"] = r->org; Json::Value present(Json::arrayValue); for (int num : r->fullNumbers) present.append(num); org["fullNumbers"] = present; Json::Value missing(Json::arrayValue); for (int num : {1, 2, 3}) { if (std::find(r->fullNumbers.begin(), r->fullNumbers.end(), num) == r->fullNumbers.end()) { missing.append(num); } } org["fullMissing"] = missing; org["hasAllNumbers"] = r->hasAll; if (!r->hasAll && r->sizeFetched) { org["sizeData"] = r->sizeData; org["sizeEmpty"] = r->sizeEmpty; } jsonResults.append(org); } auto duration = std::chrono::duration_cast( std::chrono::steady_clock::now() - startTime ).count(); Json::Value summary; summary["totalOrgs"] = (int)orgNamesCopy.size(); summary["missingNumbersOrgs"] = missingCount.load(); summary["emptySizeOrgs"] = emptySizeCount.load(); summary["duration"] = std::to_string(duration) + "s"; response["summary"] = summary; response["results"] = jsonResults; response["emptySizeOrgs"] = emptySizeJson; std::cout << "[GROUP 2+3] Done in " << duration << "s" << std::endl; return response; } std::string jsonToString(const Json::Value& json) { Json::StreamWriterBuilder builder; builder["indentation"] = ""; return Json::writeString(builder, json); } std::string jsonToJsonl(const Json::Value& json) { std::string result; if (json.isArray()) { Json::StreamWriterBuilder builder; builder["indentation"] = ""; for (const auto& item : json) { result += Json::writeString(builder, item) + "\n"; } } else { Json::StreamWriterBuilder builder; builder["indentation"] = ""; result = Json::writeString(builder, json) + "\n"; } return result; } Json::Value getRPSStatus() { Json::Value status; status["maxRps"] = MAX_RPS; status["myName"] = MY_NAME; std::ifstream file(RPS_FILE); if (file.is_open()) { std::vector names; std::string line; while (std::getline(file, line)) { line.erase(0, line.find_first_not_of(" \t\r\n")); line.erase(line.find_last_not_of(" \t\r\n") + 1); if (!line.empty()) { names.push_back(line); } } file.close(); status["fileExists"] = true; status["namesInFile"] = (int)names.size(); Json::Value namesArray(Json::arrayValue); for (const auto& name : names) { namesArray.append(name); } status["names"] = namesArray; status["isLimited"] = std::find(names.begin(), names.end(), MY_NAME) != names.end(); } else { status["fileExists"] = false; status["namesInFile"] = 0; status["isLimited"] = false; } return status; } // ============================================ // Server // ============================================ void runServer() { int serverSocket = socket(AF_INET, SOCK_STREAM, 0); int opt = 1; setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); struct sockaddr_in serverAddr; serverAddr.sin_family = AF_INET; serverAddr.sin_addr.s_addr = INADDR_ANY; serverAddr.sin_port = htons(PORT); bind(serverSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr)); listen(serverSocket, 100); unsigned int numCores = std::thread::hardware_concurrency(); std::cout << "Server on port " << PORT << std::endl; std::cout << "Using " << numCores << " CPU cores" << std::endl; std::cout << "3 Groups: [1]orgs [2]full [3]size" << std::endl; std::cout << "Parallel processing - never sequential" << std::endl; std::cout << "RPS Limiter: max " << MAX_RPS << " req/s, name: '" << MY_NAME << "'" << std::endl; std::cout << "Parquet file: " << RPS_FILE << std::endl; curl_global_init(CURL_GLOBAL_ALL); std::filesystem::create_directories("/status"); std::thread(bundledFetchOrgs).detach(); std::thread([]() { while (true) { std::this_thread::sleep_for(std::chrono::seconds(REFRESH_INTERVAL)); bundledFetchOrgs(); } }).detach(); while (true) { struct sockaddr_in clientAddr; socklen_t clientLen = sizeof(clientAddr); int clientSocket = accept(serverSocket, (struct sockaddr*)&clientAddr, &clientLen); if (clientSocket < 0) continue; std::thread([clientSocket, numCores]() { char buffer[4096]; int bytesRead = recv(clientSocket, buffer, sizeof(buffer) - 1, 0); if (bytesRead > 0) { buffer[bytesRead] = '\0'; std::istringstream iss(buffer); std::string method, path, version; iss >> method >> path >> version; std::string query; size_t pos = path.find('?'); if (pos != std::string::npos) { query = path.substr(pos + 1); path = path.substr(0, pos); } Json::Value respJson; if (path == "/" || path == "/health") { respJson["status"] = "healthy"; respJson["orgs"] = (int)allOrgNames.size(); respJson["cores"] = (int)numCores; } else if (path == "/status") { respJson["orgs"] = (int)allOrgNames.size(); respJson["lastRefresh"] = lastRefresh; respJson["nextRefresh"] = nextRefresh; respJson["loading"] = loading.load(); } else if (path == "/check") { respJson = bundledFetchCheck(); } else if (path == "/rps") { respJson = getRPSStatus(); } else if (path == "/refresh" && method == "POST") { std::thread(bundledFetchOrgs).detach(); respJson["status"] = "started"; } else { respJson["error"] = "Not found"; } std::string body; std::string contentType; if (path == "/check" && respJson.isMember("results")) { body = jsonToJsonl(respJson["results"]); contentType = "text/plain"; } else { body = jsonToJsonl(respJson); contentType = "text/plain"; } std::ostringstream resp; resp << "HTTP/1.1 200 OK\r\n"; resp << "Access-Control-Allow-Origin: *\r\n"; resp << "Access-Control-Allow-Methods: GET, POST, OPTIONS\r\n"; resp << "Access-Control-Allow-Headers: Content-Type\r\n"; resp << "Content-Type: " << contentType << "\r\n"; resp << "Content-Length: " << body.size() << "\r\n"; resp << "Connection: keep-alive\r\n\r\n"; resp << body; std::string r = resp.str(); send(clientSocket, r.c_str(), r.size(), 0); } close(clientSocket); }).detach(); } close(serverSocket); } int main() { signal(SIGPIPE, SIG_IGN); runServer(); curl_global_cleanup(); return 0; }