Compare commits

..

No commits in common. "9e173f546829b84dc9fddff17f7e3fba63ce54e5" and "de2e425fae1c2f5259e190a56082cf22d8ec61fb" have entirely different histories.

10 changed files with 74 additions and 123 deletions

View file

@ -1,60 +0,0 @@
services:
nginx:
container_name: balance-zig-pay
image: nginx:1.28-alpine
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
depends_on:
- zig1
- zig2
ports:
- "9999:9999"
networks:
- payment-processor
deploy:
resources:
limits:
cpus: "0.50"
memory: "100MB"
zig1: &zig
container_name: zig-pay-1
build: .
environment:
- SERVER_ENV=PROD
- DEFAULT_HOST=payment-processor-default
- DEFAULT_PORT=8080
- DEFAULT_POOL=400
- FALLBACK_HOST=payment-processor-fallback
- FALLBACK_PORT=8080
- FALLBACK_POOL=200
- PORT=8080
- PAYMENT_SUMMARY_EXCHANGE_HOST=zig2
- PAYMENT_SUMMARY_EXCHANGE_PORT=6969
networks:
- payment-processor
deploy:
resources:
limits:
cpus: "0.50"
memory: "125MB"
zig2:
<<: *zig
container_name: zig-pay-2
environment:
- SERVER_ENV=PROD
- DEFAULT_HOST=payment-processor-default
- DEFAULT_PORT=8080
- DEFAULT_POOL=400
- FALLBACK_HOST=payment-processor-fallback
- FALLBACK_PORT=8080
- FALLBACK_POOL=200
- PORT=8080
- PAYMENT_SUMMARY_EXCHANGE_HOST=zig1
- PAYMENT_SUMMARY_EXCHANGE_PORT=6969
networks:
backend:
driver: bridge
payment-processor:
external: true

View file

@ -19,8 +19,8 @@ services:
deploy: deploy:
resources: resources:
limits: limits:
cpus: "1.5" cpus: "0.20"
memory: "130MB" memory: "50MB"
zig1: &zig zig1: &zig
container_name: zig-pay-1 container_name: zig-pay-1
build: . build: .
@ -28,10 +28,10 @@ services:
- SERVER_ENV=PROD - SERVER_ENV=PROD
- DEFAULT_HOST=payment-processor-default - DEFAULT_HOST=payment-processor-default
- DEFAULT_PORT=8080 - DEFAULT_PORT=8080
- DEFAULT_POOL=700 - DEFAULT_POOL=100
- FALLBACK_HOST=payment-processor-fallback - FALLBACK_HOST=payment-processor-fallback
- FALLBACK_PORT=8080 - FALLBACK_PORT=8080
- FALLBACK_POOL=400 - FALLBACK_POOL=0
- PORT=8080 - PORT=8080
- PAYMENT_SUMMARY_EXCHANGE_HOST=zig2 - PAYMENT_SUMMARY_EXCHANGE_HOST=zig2
- PAYMENT_SUMMARY_EXCHANGE_PORT=6969 - PAYMENT_SUMMARY_EXCHANGE_PORT=6969
@ -40,8 +40,8 @@ services:
deploy: deploy:
resources: resources:
limits: limits:
cpus: "0.45" cpus: "0.20"
memory: "110MB" memory: "50MB"
zig2: zig2:
<<: *zig <<: *zig
container_name: zig-pay-2 container_name: zig-pay-2
@ -49,10 +49,10 @@ services:
- SERVER_ENV=PROD - SERVER_ENV=PROD
- DEFAULT_HOST=payment-processor-default - DEFAULT_HOST=payment-processor-default
- DEFAULT_PORT=8080 - DEFAULT_PORT=8080
- DEFAULT_POOL=700 - DEFAULT_POOL=100
- FALLBACK_HOST=payment-processor-fallback - FALLBACK_HOST=payment-processor-fallback
- FALLBACK_PORT=8080 - FALLBACK_PORT=8080
- FALLBACK_POOL=400 - FALLBACK_POOL=0
- PORT=8080 - PORT=8080
- PAYMENT_SUMMARY_EXCHANGE_HOST=zig1 - PAYMENT_SUMMARY_EXCHANGE_HOST=zig1
- PAYMENT_SUMMARY_EXCHANGE_PORT=6969 - PAYMENT_SUMMARY_EXCHANGE_PORT=6969

View file

@ -3,14 +3,11 @@ worker_rlimit_nofile 100000;
events { events {
use epoll; use epoll;
worker_connections 2000; worker_connections 1024;
} }
http { http {
access_log off; access_log off;
error_log /dev/null crit; error_log /dev/null crit;
keepalive_requests 10000;
keepalive_timeout 600;
tcp_nopush on; tcp_nopush on;
@ -20,14 +17,12 @@ http {
upstream backend_servers { upstream backend_servers {
server zig1:8080; server zig1:8080;
server zig2:8080; server zig2:8080;
keepalive 600; keepalive 500;
} }
server { server {
listen 9999; listen 9999;
location / { location / {
proxy_no_cache 1;
proxy_cache_bypass 1;
proxy_buffering off; proxy_buffering off;
proxy_http_version 1.1; proxy_http_version 1.1;
proxy_pass http://backend_servers; proxy_pass http://backend_servers;

View file

@ -33,13 +33,13 @@ pub const Config = struct {
const default_port_env = envMap.get("DEFAULT_PORT"); const default_port_env = envMap.get("DEFAULT_PORT");
const default_port = try std.fmt.parseInt(u16, default_port_env orelse "8001", 10); const default_port = try std.fmt.parseInt(u16, default_port_env orelse "8001", 10);
const default_pool_env = envMap.get("DEFAULT_POOL"); const default_pool_env = envMap.get("DEFAULT_POOL");
const default_pool = try std.fmt.parseInt(u16, default_pool_env orelse "200", 10); const default_pool = try std.fmt.parseInt(u16, default_pool_env orelse "20", 10);
const fallback_host = envMap.get("FALLBACK_HOST") orelse "localhost"; const fallback_host = envMap.get("FALLBACK_HOST") orelse "localhost";
const fallback_port_env = envMap.get("FALLBACK_PORT"); const fallback_port_env = envMap.get("FALLBACK_PORT");
const fallback_port = try std.fmt.parseInt(u16, fallback_port_env orelse "8002", 10); const fallback_port = try std.fmt.parseInt(u16, fallback_port_env orelse "8002", 10);
const fallback_pool_env = envMap.get("FALLBACK_POOL"); const fallback_pool_env = envMap.get("FALLBACK_POOL");
const fallback_pool = try std.fmt.parseInt(u16, fallback_pool_env orelse "100", 10); const fallback_pool = try std.fmt.parseInt(u16, fallback_pool_env orelse "10", 10);
const payment_summary_exchange_host = envMap.get("PAYMENT_SUMMARY_EXCHANGE_HOST"); const payment_summary_exchange_host = envMap.get("PAYMENT_SUMMARY_EXCHANGE_HOST");
const payment_summary_exchange_port_env = envMap.get("PAYMENT_SUMMARY_EXCHANGE_PORT"); const payment_summary_exchange_port_env = envMap.get("PAYMENT_SUMMARY_EXCHANGE_PORT");

View file

@ -12,11 +12,10 @@ const Address = net.Address;
const Stream = net.Stream; const Stream = net.Stream;
const BUFFER_SIZE = 1024; const BUFFER_SIZE = 1024;
const MAX_CONNECTION_UPSTREAM = 2050;
//var metrics_mutex = std.Thread.Mutex{}; var metrics_mutex = std.Thread.Mutex{};
//var metrics_num_requests: u64 = 0; var metrics_num_requests: u64 = 0;
//var metrics_sum_req_time: u64 = 0; var metrics_sum_req_time: u64 = 0;
const UpstreamConnectionState = enum { inactive, available, occupied }; const UpstreamConnectionState = enum { inactive, available, occupied };
@ -46,7 +45,7 @@ const UpstreamConnection = struct {
}; };
pub const UpstreamServer = struct { pub const UpstreamServer = struct {
pool: [MAX_CONNECTION_UPSTREAM]UpstreamConnection, pool: [300]UpstreamConnection,
address: net.Address, address: net.Address,
pub fn init(host: []const u8, port: u16) UpstreamServer { pub fn init(host: []const u8, port: u16) UpstreamServer {
@ -68,7 +67,7 @@ pub const UpstreamServer = struct {
std.debug.assert(final_addr != null); std.debug.assert(final_addr != null);
var connections: [MAX_CONNECTION_UPSTREAM]UpstreamConnection = undefined; var connections: [300]UpstreamConnection = undefined;
for (&connections) |*conn| { for (&connections) |*conn| {
conn.* = UpstreamConnection.init(final_addr.?); conn.* = UpstreamConnection.init(final_addr.?);
@ -129,7 +128,7 @@ pub const LoadBalancer = struct {
upstream.?.occupy(); upstream.?.occupy();
var thread = std.Thread.spawn(.{ .stack_size = 1024 * 6 }, handleConnection, .{ self, conn, upstream.? }) catch |err| { var thread = std.Thread.spawn(.{ .stack_size = 1024 * 16 }, handleConnection, .{ self, conn, upstream.? }) catch |err| {
log.err("Creating thread error: {}\n", .{err}); log.err("Creating thread error: {}\n", .{err});
conn.stream.close(); conn.stream.close();
continue; continue;
@ -152,7 +151,7 @@ pub const LoadBalancer = struct {
buffer_request[0] = 0; buffer_request[0] = 0;
//var timer = std.time.Timer.start() catch return; var timer = std.time.Timer.start() catch return;
while (true) { while (true) {
var req_len: usize = 1; var req_len: usize = 1;
@ -175,8 +174,7 @@ pub const LoadBalancer = struct {
break; break;
} }
} }
timer.reset();
//timer.reset();
upstream.stream.writeAll(buffer_request[0..req_len]) catch |err| { upstream.stream.writeAll(buffer_request[0..req_len]) catch |err| {
log.err("Error when writing to upstream {}\n", .{err}); log.err("Error when writing to upstream {}\n", .{err});
@ -197,17 +195,17 @@ pub const LoadBalancer = struct {
return; return;
}; };
//const req_time_ns = timer.lap(); const req_time_ns = timer.lap();
//metrics_mutex.lock(); metrics_mutex.lock();
//metrics_num_requests += 1; metrics_num_requests += 1;
//metrics_sum_req_time += req_time_ns; metrics_sum_req_time += req_time_ns;
//if (metrics_num_requests % 5000 == 0) { if (metrics_num_requests % 5000 == 0) {
// std.debug.print("average requests time ns: {d}\n", .{metrics_sum_req_time / metrics_num_requests}); std.debug.print("average requests time ns: {d}\n", .{metrics_sum_req_time / metrics_num_requests});
//} }
//metrics_mutex.unlock(); metrics_mutex.unlock();
} }
} }
}; };

View file

@ -26,7 +26,7 @@ pub fn main() !void {
} }
fn startServer() !void { fn startServer() !void {
try ctx.intiRepository(1_000_000); try ctx.intiRepository(100_000);
std.debug.print("Starting server...\n", .{}); std.debug.print("Starting server...\n", .{});

View file

@ -12,8 +12,8 @@ const DateTime = @import("things").DateTime;
const is_test = @import("builtin").is_test; const is_test = @import("builtin").is_test;
const MAX_QUEUE_SIZE = if (is_test) 100 else 1_000_000; const MAX_QUEUE_SIZE = if (is_test) 100 else 100_000;
const MAX_RETRY = if (is_test) 2 else 1_000_000; const MAX_RETRY = if (is_test) 2 else 10_000_000;
const PaymentIntegration = struct { const PaymentIntegration = struct {
payment: *payments.Payment, payment: *payments.Payment,
@ -53,7 +53,7 @@ pub const PaymentIntegrator = struct {
fn startProcess(self: *PaymentIntegrator) void { fn startProcess(self: *PaymentIntegrator) void {
while (true) { while (true) {
Thread.sleep(40_000_000); Thread.sleep(50_000_000);
self.verifyTailSize(); self.verifyTailSize();
self.processPayments(); self.processPayments();
@ -79,7 +79,7 @@ pub const PaymentIntegrator = struct {
pi.is_integrated = true; // liar pi.is_integrated = true; // liar
} else { } else {
pi.payment.requested_at = DateTime.now(); pi.payment.requested_at = DateTime.now();
pi.ticket = self.service_pool.dive(pi.getMessage(), MAX_RETRY - pi.retries) catch null; pi.ticket = self.service_pool.dive(pi.getMessage()) catch null;
pi.retries -= 1; pi.retries -= 1;
} }
} else if (pi.ticket.?.status == .try_again) { } else if (pi.ticket.?.status == .try_again) {
@ -88,8 +88,7 @@ pub const PaymentIntegrator = struct {
pi.payment.integration_status = .not_integrated; pi.payment.integration_status = .not_integrated;
pi.is_integrated = true; // liar pi.is_integrated = true; // liar
} else { } else {
pi.payment.requested_at = DateTime.now(); pi.ticket = self.service_pool.dive(pi.getMessage()) catch null;
pi.ticket = self.service_pool.dive(pi.getMessage(), MAX_RETRY - pi.retries) catch null;
pi.retries -= 1; pi.retries -= 1;
} }
} else if (pi.ticket.?.status == .new or pi.ticket.?.status == .pending) { } else if (pi.ticket.?.status == .new or pi.ticket.?.status == .pending) {

View file

@ -64,7 +64,7 @@ pub const HttpServer = struct {
continue; continue;
}; };
var thread = std.Thread.spawn(.{ .stack_size = 1024 * 6 }, handleConnection, .{ self, conn }) catch |err| { var thread = std.Thread.spawn(.{ .stack_size = 1024 * 64 }, handleConnection, .{ self, conn }) catch |err| {
log.err("Creating thread error: {}\n", .{err}); log.err("Creating thread error: {}\n", .{err});
conn.stream.close(); conn.stream.close();
continue; continue;
@ -126,7 +126,7 @@ pub const HttpServer = struct {
.method = http_method, .method = http_method,
.path = path, .path = path,
.query = query, .query = query,
.body = if (http_method == .POST) buffer_request[findBeginJson(&buffer_request)..req_len] else buffer_request[0..0], .body = if (req_len > 70) buffer_request[req_len - 70 .. req_len] else buffer_request[0..0],
}; };
response = Response{ response = Response{
@ -161,16 +161,6 @@ pub const HttpServer = struct {
return b; return b;
} }
fn findBeginJson(buf: []const u8) usize {
for (0..buf.len) |i| {
if (buf[i] == '{' and (i > 0 and buf[i - 1] == '\n')) {
return i;
}
}
return if (buf.len == 0) 0 else buf.len - 1;
}
pub fn handleReqRes(self: HttpServer, req: *Request, res: *Response) void { pub fn handleReqRes(self: HttpServer, req: *Request, res: *Response) void {
switch (req.method) { switch (req.method) {
.GET => resolveEndpoint(req, res, self.get_endpoints), .GET => resolveEndpoint(req, res, self.get_endpoints),

View file

@ -241,7 +241,7 @@ pub const HttpService = struct {
} }
self.checkHealth(); self.checkHealth();
log.info("Service {s} health {any} {d} ms\n", .{ self.name, self.health, self.response_time }); //log.info("Service {s} health {any} {d} ms\n", .{ self.name, self.health, self.response_time });
Thread.sleep(1_000_000 * 1000); Thread.sleep(1_000_000 * 1000);
} }
} }
@ -305,7 +305,7 @@ pub const HttpService = struct {
fn updateCapacity(self: *HttpService, thread_id: usize) void { fn updateCapacity(self: *HttpService, thread_id: usize) void {
while (false) { while (false) {
Thread.sleep(10_000_000 * 1000); Thread.sleep(500_000_000);
var count: usize = 0; var count: usize = 0;
for (self.connections) |conn| { for (self.connections) |conn| {

View file

@ -27,11 +27,11 @@ pub const ServicePool = struct {
self.services_len += 1; self.services_len += 1;
} }
pub fn dive(self: *ServicePool, message: []const u8, numberOfRetries: i32) ServicePoolError!*ServiceTicket { pub fn dive(self: *ServicePool, message: []const u8) ServicePoolError!*ServiceTicket {
//var skip: [10]bool = .{false} ** 10; var skip: [10]bool = .{false} ** 10;
for (0..self.services_len) |_| { for (0..self.services_len) |_| {
var best_service = self.findBestService(numberOfRetries); var best_service = self.findBestService(&skip);
if (best_service == null) { if (best_service == null) {
return error.NoServiceAvailable; return error.NoServiceAvailable;
@ -49,12 +49,41 @@ pub const ServicePool = struct {
return error.NoServiceAvailable; return error.NoServiceAvailable;
} }
fn findBestService(self: *ServicePool, numberOfRetries: i32) ?*HttpService { fn findBestService(self: *ServicePool, skip: []bool) ?*HttpService {
if (numberOfRetries > 500) { var first_service_available_i: ?usize = null;
return self.services[1]; var best_perfomance_i: ?usize = null;
for (0..self.services_len) |i| {
if (skip[i]) {
continue;
}
const service = self.services[i];
if (service.health == .unavailable or service.capacity >= 90) {
continue;
}
if (first_service_available_i == null) {
first_service_available_i = i;
}
if (best_perfomance_i == null or self.services[best_perfomance_i.?].response_time > service.response_time) {
best_perfomance_i = i;
}
} }
return self.services[0]; if (best_perfomance_i != null) {
skip[best_perfomance_i.?] = true;
return self.services[best_perfomance_i.?];
}
if (first_service_available_i != null) {
skip[first_service_available_i.?] = true;
return self.services[first_service_available_i.?];
}
return null;
} }
}; };