// assembler3 / server.cpp
// Last commit: 58ac1ab "Update server.cpp" (verified) by blaze-aura69 — 26.1 kB
#include <algorithm>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <filesystem>
#include <fstream>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>
#include <vector>

#include <fcntl.h>
#include <netinet/in.h>
#include <signal.h>
#include <sys/file.h>
#include <sys/socket.h>
#include <unistd.h>

#include <curl/curl.h>
#include <json/json.h>
// ---- Configuration -------------------------------------------------------
// TCP port the HTTP server binds to (all interfaces).
const int PORT = 7860;
// Bucket owner and bucket name that hold the orgs<N>.txt lists.
const std::string ORG = "velost";
const std::string BUCKET = "orgs";
// NOTE(review): not referenced in the code shown here; the per-org checks
// hard-code "/status/" in their URLs instead.
const std::string STATUS_BUCKET = "status";
const std::string BASE_URL = "https://huggingface.co";
// Seconds between background org-list refreshes (12 hours).
const int REFRESH_INTERVAL = 12 * 60 * 60;
// Maximum simultaneous in-flight requests for each fetch group (orgs/full/size).
const int MAX_CONCURRENT_PER_GROUP = 10;
// Maximum outbound requests per one-second window before throttling kicks in.
const int MAX_RPS = 50;
// Identifier this instance writes into RPS_FILE while it is throttled.
const std::string MY_NAME = "3";
// Shared throttle list. Despite the extension, RPSLimiter reads and writes it
// as plain newline-separated text, not Parquet.
const std::string RPS_FILE = "/status/rps_limits.parquet";
// Forward declarations
// Concurrency / throttling helpers (defined below).
class Semaphore;
class ThreadPool;
class RPSLimiter;
// libcurl plumbing.
size_t WriteCallback(void* contents, size_t size, size_t nmemb, std::string* output);
CURL* getThreadLocalCurl();
// One fetcher per request group; each returns {HTTP status (0 on transport error), body}.
std::pair<int, std::string> fetchOrgsFile(const std::string& url);
std::pair<int, std::string> fetchFullFile(const std::string& url);
std::pair<int, std::string> fetchSizeFile(const std::string& url);
// Parsing and the two bundled jobs.
std::vector<std::string> parseOrgs(const std::string& data);
void bundledFetchOrgs();
bool hasAllNumbers(const std::vector<int>& numbers);
Json::Value bundledFetchCheck();
// JSON serialization helpers.
std::string jsonToString(const Json::Value& json);
std::string jsonToJsonl(const Json::Value& json);
// HTTP front end and the /rps status view.
void runServer();
Json::Value getRPSStatus();
// ============================================
// RPSLimiter Class
// ============================================
/**
 * Process-local requests-per-second limiter that also advertises its
 * "throttled" state to other processes through a shared name-list file.
 *
 * Requests are counted in fixed one-second windows. When a request exceeds
 * maxRps it is rejected and this instance's name is added to filePath. Once a
 * request is admitted again, the name is removed. The file is a plain
 * newline-separated list, and every read-modify-write of it is serialized
 * across processes with flock().
 */
class RPSLimiter {
private:
    std::mutex mtx;
    std::chrono::steady_clock::time_point windowStart;
    std::atomic<int> requestCount{0};
    int maxRps;
    std::string name;
    std::string filePath;
    // Whether our name may currently be in the file. It starts true so the
    // first admitted request clears any stale entry left by a previous run.
    // After that, admitted requests skip file I/O entirely instead of
    // opening and locking the file on every call.
    bool maybeListed = true;

public:
    RPSLimiter(int maxRequestsPerSecond, const std::string& myName, const std::string& path)
        : maxRps(maxRequestsPerSecond), name(myName), filePath(path) {
        windowStart = std::chrono::steady_clock::now();
    }

    /// Counts one request in the current one-second window.
    /// @return true if the request is admitted, false if it exceeds maxRps
    ///         (in which case our name is recorded in filePath).
    bool checkAndAdd() {
        auto now = std::chrono::steady_clock::now();
        std::lock_guard<std::mutex> lock(mtx);
        auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - windowStart).count();
        if (elapsed >= 1) {
            windowStart = now;
            requestCount = 0;
        }
        int currentCount = ++requestCount;
        if (currentCount > maxRps) {
            addNameToParquet();
            maybeListed = true;
            return false;
        }
        // Only touch the shared file when our name might still be in it.
        if (maybeListed && removeNameFromParquet()) {
            maybeListed = false;
        }
        return true;
    }

    /// Blocks, polling every 100 ms, until checkAndAdd() admits a request.
    void waitIfNeeded() {
        while (!checkAndAdd()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }

private:
    /// Reads all non-blank, whitespace-trimmed lines from the start of fp.
    std::vector<std::string> readNamesLocked(FILE* fp) {
        std::vector<std::string> names;
        rewind(fp);
        char line[256];
        while (fgets(line, sizeof(line), fp)) {
            std::string s(line);
            s.erase(0, s.find_first_not_of(" \t\r\n"));
            s.erase(s.find_last_not_of(" \t\r\n") + 1);
            if (!s.empty()) names.push_back(s);
        }
        return names;
    }

    /// Replaces the file contents with names joined by '\n' (no trailing newline).
    void writeNamesLocked(FILE* fp, const std::vector<std::string>& names) {
        rewind(fp);
        ftruncate(fileno(fp), 0);
        for (size_t i = 0; i < names.size(); i++) {
            fputs(names[i].c_str(), fp);
            if (i < names.size() - 1) {
                fputs("\n", fp);
            }
        }
        fflush(fp);
    }

    /// Makes `name` present (listed == true) or absent (listed == false) in
    /// filePath, creating the file if needed. The whole read-modify-write runs
    /// under an exclusive flock().
    /// @return false if the file could not be opened.
    bool syncNameInFile(bool listed) {
        int fd = open(filePath.c_str(), O_CREAT | O_RDWR, 0644);
        if (fd < 0) return false;
        flock(fd, LOCK_EX);
        FILE* fp = fdopen(fd, "r+");
        if (!fp) {
            flock(fd, LOCK_UN);
            close(fd);
            return false;
        }
        std::vector<std::string> names = readNamesLocked(fp);
        auto it = std::find(names.begin(), names.end(), name);
        if (listed && it == names.end()) {
            names.push_back(name);
            writeNamesLocked(fp, names);
            std::cout << "RPS limit reached! Added '" << name << "' to " << filePath << std::endl;
        } else if (!listed && it != names.end()) {
            names.erase(it);
            writeNamesLocked(fp, names);
            std::cout << "RPS below limit. Removed '" << name << "' from " << filePath << std::endl;
        }
        // fclose() owns fd after fdopen(): it flushes, closes fd and thereby
        // drops the flock. Calling flock/close on fd afterwards would hit
        // EBADF or, worse, a descriptor another thread has just reused.
        fclose(fp);
        return true;
    }

    bool addNameToParquet() { return syncNameInFile(true); }
    bool removeNameFromParquet() { return syncNameInFile(false); }
};
// ============================================
// ThreadPool Class
// ============================================
/**
 * Fixed-size pool of worker threads consuming a FIFO task queue.
 *
 * wait() blocks until the queue is empty and no task is executing.
 * Task accounting happens under queueMutex, so wait() cannot return while a
 * task has been dequeued but not yet run. The destructor lets workers drain
 * remaining tasks, then joins them.
 */
class ThreadPool {
private:
    std::vector<std::thread> workers;
    std::queue<std::function<void()>> tasks;
    std::mutex queueMutex;
    std::condition_variable condition;      // work arrived, or shutdown requested
    std::condition_variable idleCondition;  // queue drained and nothing running
    bool stop;
    size_t activeTasks = 0;                 // guarded by queueMutex

    /// Worker body: pop and run tasks until stop is set and the queue is empty.
    void workerLoop() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queueMutex);
                condition.wait(lock, [this] { return stop || !tasks.empty(); });
                if (stop && tasks.empty()) return;
                task = std::move(tasks.front());
                tasks.pop();
                // Count the task as active in the same critical section that
                // dequeues it, so wait() never sees "empty and idle" in between.
                ++activeTasks;
            }
            task();
            {
                std::lock_guard<std::mutex> lock(queueMutex);
                --activeTasks;
                if (activeTasks == 0 && tasks.empty()) {
                    idleCondition.notify_all();
                }
            }
        }
    }

public:
    /// @param threads number of workers; 0 (e.g. from hardware_concurrency())
    ///        is treated as 1 so queued work can always make progress.
    ThreadPool(size_t threads) : stop(false) {
        if (threads == 0) threads = 1;
        workers.reserve(threads);
        for (size_t i = 0; i < threads; i++) {
            workers.emplace_back([this] { workerLoop(); });
        }
    }

    /// Queues a task for execution by some worker.
    void enqueue(std::function<void()> task) {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            tasks.push(std::move(task));
        }
        condition.notify_one();
    }

    /// Blocks until every queued task has finished running.
    void wait() {
        std::unique_lock<std::mutex> lock(queueMutex);
        idleCondition.wait(lock, [this] { return activeTasks == 0 && tasks.empty(); });
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }
        condition.notify_all();
        for (auto& worker : workers) {
            if (worker.joinable()) worker.join();
        }
    }
};
// ============================================
// Semaphore Class
// ============================================
/// Counting semaphore built from a mutex and a condition variable.
/// acquire() takes a permit, blocking while none are available;
/// release() returns a permit and wakes one blocked acquirer.
class Semaphore {
private:
    std::mutex guard;
    std::condition_variable permitFreed;
    int permits;

public:
    Semaphore(int initialPermits) : permits(initialPermits) {}

    void acquire() {
        std::unique_lock<std::mutex> lock(guard);
        while (permits <= 0) {
            permitFreed.wait(lock);
        }
        --permits;
    }

    void release() {
        {
            std::lock_guard<std::mutex> lock(guard);
            ++permits;
        }
        permitFreed.notify_one();
    }
};
// ============================================
// Global Variables
// ============================================
// Per-thread libcurl easy handle, created lazily by getThreadLocalCurl().
thread_local CURL* tls_curl = nullptr;
// Concurrency caps for the three request groups: fetchOrgsFile (g1),
// fetchFullFile (g2) and fetchSizeFile (g3) each hold a permit around their
// transfer, so at most MAX_CONCURRENT_PER_GROUP requests per group are in flight.
Semaphore g1_sem(MAX_CONCURRENT_PER_GROUP);
Semaphore g2_sem(MAX_CONCURRENT_PER_GROUP);
Semaphore g3_sem(MAX_CONCURRENT_PER_GROUP);
// Protects allOrgNames where bundledFetchOrgs swaps it and bundledFetchCheck copies it.
std::mutex cacheMutex;
// Sorted, de-duplicated org names from the most recent refresh.
std::vector<std::string> allOrgNames;
// "%Y-%m-%dT%H:%M:%SZ" UTC timestamps: time of the last refresh, and that time
// plus REFRESH_INTERVAL.
// NOTE(review): written by the refresh thread and read by request threads;
// access should be serialized (e.g. under cacheMutex).
std::string lastRefresh;
std::string nextRefresh;
// True while bundledFetchOrgs() is running; reported by /status.
std::atomic<bool> loading{false};
// Set by the running refresh so overlapping bundledFetchOrgs() calls return early.
std::atomic<bool> refreshInProgress{false};
// Shared limiter consulted before every outbound fetch.
RPSLimiter g_rpsLimiter(MAX_RPS, MY_NAME, RPS_FILE);
// ============================================
// Utility Functions
// ============================================
/// libcurl CURLOPT_WRITEFUNCTION callback: appends the received chunk to
/// *output. Returning the full byte count tells libcurl the chunk was consumed.
size_t WriteCallback(void* contents, size_t size, size_t nmemb, std::string* output) {
    const size_t chunkBytes = nmemb * size;
    const char* bytes = static_cast<const char*>(contents);
    output->append(bytes, chunkBytes);
    return chunkBytes;
}
/// Returns this thread's reusable libcurl easy handle. The handle is created
/// and configured on first use: WriteCallback, 10 s timeout, redirects,
/// TCP keep-alive, "Mozilla/5.0" user agent, and NOSIGNAL so it is safe in
/// threads. Returns nullptr if curl_easy_init() fails.
///
/// The handle is released with curl_easy_cleanup() when the calling thread
/// exits. ThreadPool workers are short-lived, so without this every pool run
/// leaked one handle per worker.
CURL* getThreadLocalCurl() {
    // Thread-exit hook: a thread_local object's destructor runs when its
    // thread ends, which is where this thread's handle gets freed.
    struct ThreadCurlReaper {
        ~ThreadCurlReaper() {
            if (tls_curl) {
                curl_easy_cleanup(tls_curl);
                tls_curl = nullptr;
            }
        }
    };
    if (!tls_curl) {
        tls_curl = curl_easy_init();
        if (!tls_curl) {
            return nullptr;
        }
        thread_local ThreadCurlReaper reaper;
        (void)reaper;
        curl_easy_setopt(tls_curl, CURLOPT_WRITEFUNCTION, WriteCallback);
        curl_easy_setopt(tls_curl, CURLOPT_TIMEOUT, 10L);
        curl_easy_setopt(tls_curl, CURLOPT_FOLLOWLOCATION, 1L);
        curl_easy_setopt(tls_curl, CURLOPT_TCP_KEEPALIVE, 1L);
        curl_easy_setopt(tls_curl, CURLOPT_USERAGENT, "Mozilla/5.0");
        curl_easy_setopt(tls_curl, CURLOPT_NOSIGNAL, 1L);
    }
    return tls_curl;
}
// ============================================
// Fetch Functions
// ============================================
std::pair<int, std::string> fetchOrgsFile(const std::string& url) {
g_rpsLimiter.waitIfNeeded();
g1_sem.acquire();
CURL* curl = getThreadLocalCurl();
std::string response;
long httpCode = 0;
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response);
CURLcode res = curl_easy_perform(curl);
if (res == CURLE_OK) {
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
} else {
httpCode = 0;
}
g1_sem.release();
return {httpCode, response};
}
std::pair<int, std::string> fetchFullFile(const std::string& url) {
g_rpsLimiter.waitIfNeeded();
g2_sem.acquire();
CURL* curl = getThreadLocalCurl();
std::string response;
long httpCode = 0;
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response);
CURLcode res = curl_easy_perform(curl);
if (res == CURLE_OK) {
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
} else {
httpCode = 0;
}
g2_sem.release();
return {httpCode, response};
}
std::pair<int, std::string> fetchSizeFile(const std::string& url) {
g_rpsLimiter.waitIfNeeded();
g3_sem.acquire();
CURL* curl = getThreadLocalCurl();
std::string response;
long httpCode = 0;
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response);
CURLcode res = curl_easy_perform(curl);
if (res == CURLE_OK) {
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
} else {
httpCode = 0;
}
g3_sem.release();
return {httpCode, response};
}
// ============================================
// Parsing Functions
// ============================================
/// Splits data on '|' and returns each field with surrounding spaces, tabs,
/// CR and LF stripped. Fields that are empty after trimming are dropped.
std::vector<std::string> parseOrgs(const std::string& data) {
    const char* const whitespace = " \t\r\n";
    std::vector<std::string> result;
    std::size_t fieldStart = 0;
    while (fieldStart <= data.size()) {
        std::size_t fieldEnd = data.find('|', fieldStart);
        if (fieldEnd == std::string::npos) {
            fieldEnd = data.size();
        }
        const std::size_t first = data.find_first_not_of(whitespace, fieldStart);
        if (first != std::string::npos && first < fieldEnd) {
            const std::size_t last = data.find_last_not_of(whitespace, fieldEnd - 1);
            result.push_back(data.substr(first, last - first + 1));
        }
        fieldStart = fieldEnd + 1;
    }
    return result;
}
/// Returns true when numbers contains each of 1, 2 and 3, in any order;
/// duplicates and extra values are allowed.
bool hasAllNumbers(const std::vector<int>& numbers) {
    const int required[] = {1, 2, 3};
    return std::all_of(std::begin(required), std::end(required), [&numbers](int wanted) {
        return std::find(numbers.begin(), numbers.end(), wanted) != numbers.end();
    });
}
// ============================================
// Core Functions
// ============================================
/// Rebuilds the global org-name cache from the orgs<N>.txt files in the
/// ORG/BUCKET bucket.
///
/// Files are requested in order orgs1.txt, orgs2.txt, ... (at most 20).
/// The first 404 ends the scan, and other non-200 responses are skipped.
/// Each 200 body is parsed with parseOrgs(). The union is stored de-duplicated
/// and sorted in allOrgNames, together with the lastRefresh/nextRefresh
/// timestamps, all under cacheMutex. If no file could be downloaded because of
/// errors (not a clean 404), the previous cache is kept.
/// Overlapping calls are collapsed: if a refresh is already running this
/// returns immediately.
void bundledFetchOrgs() {
    if (refreshInProgress.exchange(true)) {
        return;
    }
    // Clears both flags on every exit path, including exceptions. Otherwise
    // a failed refresh would block all future refreshes.
    struct RefreshFlagReset {
        ~RefreshFlagReset() {
            loading = false;
            refreshInProgress = false;
        }
    } flagReset;
    loading = true;
    std::cout << "[GROUP 1] Fetching orgs*.txt files..." << std::endl;
    const auto startTime = std::chrono::steady_clock::now();

    // Each request already returns the whole file, so parse it right away
    // instead of downloading every file a second time.
    std::set<std::string> uniqueOrgs;  // keeps names unique and sorted
    int filesLoaded = 0;
    bool sawError = false;
    for (int i = 1; i <= 20; i++) {
        const std::string fileName = "orgs" + std::to_string(i) + ".txt";
        const std::string url = BASE_URL + "/buckets/" + ORG + "/" + BUCKET + "/resolve/" + fileName + "?download=true";
        auto [httpCode, response] = fetchOrgsFile(url);
        if (httpCode == 404) {
            break;  // treated as the end of the numbered sequence
        }
        if (httpCode == 200) {
            for (std::string& org : parseOrgs(response)) {
                uniqueOrgs.insert(std::move(org));
            }
            ++filesLoaded;
        } else {
            sawError = true;
        }
    }
    std::cout << " Downloaded " << filesLoaded << " files" << std::endl;

    if (filesLoaded == 0 && sawError) {
        // A transient outage should not wipe a previously good cache.
        std::cout << "[GROUP 1] No orgs files could be downloaded; keeping previous cache" << std::endl;
        return;
    }

    std::vector<std::string> newOrgNames(uniqueOrgs.begin(), uniqueOrgs.end());
    const size_t orgCount = newOrgNames.size();

    // gmtime_r instead of std::gmtime, which returns a shared static buffer.
    auto formatUtc = [](std::time_t t) {
        std::tm utc{};
        gmtime_r(&t, &utc);
        char timeStr[100];
        std::strftime(timeStr, sizeof(timeStr), "%Y-%m-%dT%H:%M:%SZ", &utc);
        return std::string(timeStr);
    };
    const std::time_t nowTime = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    std::string lastStr = formatUtc(nowTime);
    std::string nextStr = formatUtc(nowTime + REFRESH_INTERVAL);
    {
        // Publish names and timestamps together so readers that take
        // cacheMutex never observe a half-updated state.
        std::lock_guard<std::mutex> lock(cacheMutex);
        allOrgNames.swap(newOrgNames);
        lastRefresh = std::move(lastStr);
        nextRefresh = std::move(nextStr);
    }

    auto duration = std::chrono::duration_cast<std::chrono::seconds>(
        std::chrono::steady_clock::now() - startTime
    ).count();
    std::cout << "[GROUP 1] Done: " << orgCount << " orgs in " << duration << "s" << std::endl;
}
/// Checks every cached org's status files in parallel.
///
/// For each org in a snapshot of allOrgNames, the function downloads
/// <BASE_URL>/buckets/<org>/status/resolve/full.txt and parses it as a
/// '|'-separated list of integers; unparsable tokens are skipped. Orgs whose
/// list lacks any of 1, 2, 3 also get size.txt fetched and
/// whitespace-trimmed. An empty size file is reported in "emptySizeOrgs".
///
/// @return a JSON object with "summary", "results" (one entry per org, in
///         cache order) and "emptySizeOrgs" (sorted). If no org names are
///         cached yet, returns {"error": "No org names loaded"}.
Json::Value bundledFetchCheck() {
    Json::Value response;
    std::vector<std::string> orgNamesCopy;
    {
        // Snapshot the cache so the slow network phase runs without the lock.
        std::lock_guard<std::mutex> lock(cacheMutex);
        if (allOrgNames.empty()) {
            response["error"] = "No org names loaded";
            return response;
        }
        orgNamesCopy = allOrgNames;
    }
    std::cout << "[GROUP 2+3] Parallel check of " << orgNamesCopy.size() << " orgs..." << std::endl;
    auto startTime = std::chrono::steady_clock::now();

    struct OrgResult {
        std::string org;
        std::vector<int> fullNumbers;
        bool hasAll = false;
        std::string sizeData;
        bool sizeEmpty = false;
        bool sizeFetched = false;
    };
    // One pre-sized slot per org. Each task writes only its own index, so no
    // lock or post-sort is needed and output order matches cache order.
    std::vector<OrgResult> results(orgNamesCopy.size());
    std::atomic<int> completed{0};
    std::atomic<int> missingCount{0};
    std::atomic<int> emptySizeCount{0};
    std::vector<std::string> emptySizeOrgs;
    std::mutex emptySizeMutex;
    const auto trimWhitespace = [](std::string& s) {
        s.erase(0, s.find_first_not_of(" \t\r\n"));
        s.erase(s.find_last_not_of(" \t\r\n") + 1);
    };

    // hardware_concurrency() may report 0, so always use at least one worker.
    // The pool is declared after everything its tasks reference. Its
    // destructor, which joins the workers, therefore runs before that state
    // is destroyed.
    unsigned int numCores = std::max(1u, std::min(std::thread::hardware_concurrency(), 8u));
    ThreadPool pool(numCores);
    for (size_t i = 0; i < orgNamesCopy.size(); i++) {
        pool.enqueue([&, i]() {
            const std::string& orgName = orgNamesCopy[i];
            OrgResult& result = results[i];
            result.org = orgName;
            std::string fullUrl = BASE_URL + "/buckets/" + orgName + "/status/resolve/full.txt?download=true";
            auto [fullCode, fullResp] = fetchFullFile(fullUrl);
            if (fullCode == 200) {
                std::stringstream ss(fullResp);
                std::string num;
                while (std::getline(ss, num, '|')) {
                    try {
                        result.fullNumbers.push_back(std::stoi(num));
                    } catch (const std::invalid_argument&) {
                        // Non-numeric token (e.g. blank field): skip it.
                    } catch (const std::out_of_range&) {
                        // Number does not fit in an int: skip it.
                    }
                }
            }
            result.hasAll = hasAllNumbers(result.fullNumbers);
            if (!result.hasAll) {
                missingCount++;
                std::string sizeUrl = BASE_URL + "/buckets/" + orgName + "/status/resolve/size.txt?download=true";
                auto [sizeCode, sizeResp] = fetchSizeFile(sizeUrl);
                if (sizeCode == 200) {
                    result.sizeData = sizeResp;
                    trimWhitespace(result.sizeData);
                    result.sizeEmpty = result.sizeData.empty();
                    result.sizeFetched = true;
                    if (result.sizeEmpty) {
                        emptySizeCount++;
                        std::lock_guard<std::mutex> lock(emptySizeMutex);
                        emptySizeOrgs.push_back(orgName);
                    }
                }
            }
            int done = ++completed;
            if (done % 100 == 0 || done == (int)orgNamesCopy.size()) {
                std::cout << " Progress: " << done << "/" << orgNamesCopy.size() << std::endl;
            }
        });
    }
    pool.wait();

    std::sort(emptySizeOrgs.begin(), emptySizeOrgs.end());
    Json::Value jsonResults(Json::arrayValue);
    Json::Value emptySizeJson(Json::arrayValue);
    for (const auto& orgName : emptySizeOrgs) {
        emptySizeJson.append(orgName);
    }
    for (const OrgResult& r : results) {
        Json::Value org;
        org["org"] = r.org;
        Json::Value present(Json::arrayValue);
        for (int num : r.fullNumbers) present.append(num);
        org["fullNumbers"] = present;
        Json::Value missing(Json::arrayValue);
        for (int num : {1, 2, 3}) {
            if (std::find(r.fullNumbers.begin(), r.fullNumbers.end(), num) == r.fullNumbers.end()) {
                missing.append(num);
            }
        }
        org["fullMissing"] = missing;
        org["hasAllNumbers"] = r.hasAll;
        if (!r.hasAll && r.sizeFetched) {
            org["sizeData"] = r.sizeData;
            org["sizeEmpty"] = r.sizeEmpty;
        }
        jsonResults.append(org);
    }

    auto duration = std::chrono::duration_cast<std::chrono::seconds>(
        std::chrono::steady_clock::now() - startTime
    ).count();
    Json::Value summary;
    summary["totalOrgs"] = (int)orgNamesCopy.size();
    summary["missingNumbersOrgs"] = missingCount.load();
    summary["emptySizeOrgs"] = emptySizeCount.load();
    summary["duration"] = std::to_string(duration) + "s";
    response["summary"] = summary;
    response["results"] = jsonResults;
    response["emptySizeOrgs"] = emptySizeJson;
    std::cout << "[GROUP 2+3] Done in " << duration << "s" << std::endl;
    return response;
}
/// Serializes json as compact, single-line text (no indentation).
std::string jsonToString(const Json::Value& json) {
    Json::StreamWriterBuilder compactWriter;
    compactWriter["indentation"] = "";
    const std::string serialized = Json::writeString(compactWriter, json);
    return serialized;
}
/// Serializes json as JSON Lines. An array becomes one compact line per
/// element; any other value becomes a single compact line. Every line ends
/// with '\n', and an empty array yields an empty string.
std::string jsonToJsonl(const Json::Value& json) {
    Json::StreamWriterBuilder compact;
    compact["indentation"] = "";
    if (!json.isArray()) {
        return Json::writeString(compact, json) + "\n";
    }
    std::string lines;
    for (Json::Value::const_iterator it = json.begin(); it != json.end(); ++it) {
        lines += Json::writeString(compact, *it);
        lines += '\n';
    }
    return lines;
}
/// Reports the limiter configuration plus the current contents of RPS_FILE:
/// the non-blank, trimmed names it lists and whether MY_NAME is among them.
/// The file is read without taking the limiter's flock.
Json::Value getRPSStatus() {
    Json::Value status;
    status["maxRps"] = MAX_RPS;
    status["myName"] = MY_NAME;

    std::ifstream file(RPS_FILE);
    if (!file.is_open()) {
        status["fileExists"] = false;
        status["namesInFile"] = 0;
        status["isLimited"] = false;
        return status;
    }

    Json::Value namesArray(Json::arrayValue);
    int nameCount = 0;
    bool limited = false;
    for (std::string raw; std::getline(file, raw);) {
        const auto begin = raw.find_first_not_of(" \t\r\n");
        if (begin == std::string::npos) {
            continue;
        }
        const auto end = raw.find_last_not_of(" \t\r\n");
        const std::string entry = raw.substr(begin, end - begin + 1);
        namesArray.append(entry);
        ++nameCount;
        if (entry == MY_NAME) {
            limited = true;
        }
    }
    status["fileExists"] = true;
    status["namesInFile"] = nameCount;
    status["names"] = namesArray;
    status["isLimited"] = limited;
    return status;
}
// ============================================
// Server
// ============================================
void runServer() {
int serverSocket = socket(AF_INET, SOCK_STREAM, 0);
int opt = 1;
setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
struct sockaddr_in serverAddr;
serverAddr.sin_family = AF_INET;
serverAddr.sin_addr.s_addr = INADDR_ANY;
serverAddr.sin_port = htons(PORT);
bind(serverSocket, (struct sockaddr*)&serverAddr, sizeof(serverAddr));
listen(serverSocket, 100);
unsigned int numCores = std::thread::hardware_concurrency();
std::cout << "Server on port " << PORT << std::endl;
std::cout << "Using " << numCores << " CPU cores" << std::endl;
std::cout << "3 Groups: [1]orgs [2]full [3]size" << std::endl;
std::cout << "Parallel processing - never sequential" << std::endl;
std::cout << "RPS Limiter: max " << MAX_RPS << " req/s, name: '" << MY_NAME << "'" << std::endl;
std::cout << "Parquet file: " << RPS_FILE << std::endl;
curl_global_init(CURL_GLOBAL_ALL);
std::filesystem::create_directories("/status");
std::thread(bundledFetchOrgs).detach();
std::thread([]() {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(REFRESH_INTERVAL));
bundledFetchOrgs();
}
}).detach();
while (true) {
struct sockaddr_in clientAddr;
socklen_t clientLen = sizeof(clientAddr);
int clientSocket = accept(serverSocket, (struct sockaddr*)&clientAddr, &clientLen);
if (clientSocket < 0) continue;
std::thread([clientSocket, numCores]() {
char buffer[4096];
int bytesRead = recv(clientSocket, buffer, sizeof(buffer) - 1, 0);
if (bytesRead > 0) {
buffer[bytesRead] = '\0';
std::istringstream iss(buffer);
std::string method, path, version;
iss >> method >> path >> version;
std::string query;
size_t pos = path.find('?');
if (pos != std::string::npos) {
query = path.substr(pos + 1);
path = path.substr(0, pos);
}
Json::Value respJson;
if (path == "/" || path == "/health") {
respJson["status"] = "healthy";
respJson["orgs"] = (int)allOrgNames.size();
respJson["cores"] = (int)numCores;
} else if (path == "/status") {
respJson["orgs"] = (int)allOrgNames.size();
respJson["lastRefresh"] = lastRefresh;
respJson["nextRefresh"] = nextRefresh;
respJson["loading"] = loading.load();
} else if (path == "/check") {
respJson = bundledFetchCheck();
} else if (path == "/rps") {
respJson = getRPSStatus();
} else if (path == "/refresh" && method == "POST") {
std::thread(bundledFetchOrgs).detach();
respJson["status"] = "started";
} else {
respJson["error"] = "Not found";
}
std::string body;
std::string contentType;
if (path == "/check" && respJson.isMember("results")) {
body = jsonToJsonl(respJson["results"]);
contentType = "text/plain";
} else {
body = jsonToJsonl(respJson);
contentType = "text/plain";
}
std::ostringstream resp;
resp << "HTTP/1.1 200 OK\r\n";
resp << "Access-Control-Allow-Origin: *\r\n";
resp << "Access-Control-Allow-Methods: GET, POST, OPTIONS\r\n";
resp << "Access-Control-Allow-Headers: Content-Type\r\n";
resp << "Content-Type: " << contentType << "\r\n";
resp << "Content-Length: " << body.size() << "\r\n";
resp << "Connection: keep-alive\r\n\r\n";
resp << body;
std::string r = resp.str();
send(clientSocket, r.c_str(), r.size(), 0);
}
close(clientSocket);
}).detach();
}
close(serverSocket);
}
/// Entry point: runs the HTTP server and releases libcurl's global state if
/// the server loop ever returns.
int main() {
    // A client that disconnects mid-response should make send() fail with an
    // error rather than kill the whole process with SIGPIPE.
    signal(SIGPIPE, SIG_IGN);

    runServer();  // normally never returns: it loops on accept()

    // Pairs with the curl_global_init() call made inside runServer().
    curl_global_cleanup();
    return 0;
}