add load_balancer
This commit is contained in:
parent
989e968d24
commit
7e9f49e46f
9 changed files with 352 additions and 44 deletions
|
@ -1,9 +1,13 @@
|
||||||
services:
|
services:
|
||||||
nginx:
|
lb:
|
||||||
container_name: balance-zig-pay
|
build: .
|
||||||
image: nginx:1.28-alpine
|
environment:
|
||||||
volumes:
|
- PORT=9999
|
||||||
- ./nginx.conf:/etc/nginx/nginx.conf:ro
|
- SERVER_ENV=PROD
|
||||||
|
- DEFAULT_HOST=zig1
|
||||||
|
- DEFAULT_PORT=8080
|
||||||
|
- FALLBACK_HOST=zig2
|
||||||
|
- FALLBACK_PORT=8080
|
||||||
depends_on:
|
depends_on:
|
||||||
- zig1
|
- zig1
|
||||||
- zig2
|
- zig2
|
||||||
|
@ -11,11 +15,12 @@ services:
|
||||||
- "9999:9999"
|
- "9999:9999"
|
||||||
networks:
|
networks:
|
||||||
- payment-processor
|
- payment-processor
|
||||||
|
command: ["--load-balancer"]
|
||||||
deploy:
|
deploy:
|
||||||
resources:
|
resources:
|
||||||
limits:
|
limits:
|
||||||
cpus: "0.70"
|
cpus: "0.20"
|
||||||
memory: "10MB"
|
memory: "50MB"
|
||||||
zig1: &zig
|
zig1: &zig
|
||||||
container_name: zig-pay-1
|
container_name: zig-pay-1
|
||||||
build: .
|
build: .
|
||||||
|
@ -23,7 +28,7 @@ services:
|
||||||
- SERVER_ENV=PROD
|
- SERVER_ENV=PROD
|
||||||
- DEFAULT_HOST=payment-processor-default
|
- DEFAULT_HOST=payment-processor-default
|
||||||
- DEFAULT_PORT=8080
|
- DEFAULT_PORT=8080
|
||||||
- DEFAULT_POOL=20
|
- DEFAULT_POOL=100
|
||||||
- FALLBACK_HOST=payment-processor-fallback
|
- FALLBACK_HOST=payment-processor-fallback
|
||||||
- FALLBACK_PORT=8080
|
- FALLBACK_PORT=8080
|
||||||
- FALLBACK_POOL=0
|
- FALLBACK_POOL=0
|
||||||
|
@ -36,7 +41,7 @@ services:
|
||||||
resources:
|
resources:
|
||||||
limits:
|
limits:
|
||||||
cpus: "0.15"
|
cpus: "0.15"
|
||||||
memory: "10MB"
|
memory: "50MB"
|
||||||
zig2:
|
zig2:
|
||||||
<<: *zig
|
<<: *zig
|
||||||
container_name: zig-pay-2
|
container_name: zig-pay-2
|
||||||
|
@ -44,7 +49,7 @@ services:
|
||||||
- SERVER_ENV=PROD
|
- SERVER_ENV=PROD
|
||||||
- DEFAULT_HOST=payment-processor-default
|
- DEFAULT_HOST=payment-processor-default
|
||||||
- DEFAULT_PORT=8080
|
- DEFAULT_PORT=8080
|
||||||
- DEFAULT_POOL=20
|
- DEFAULT_POOL=100
|
||||||
- FALLBACK_HOST=payment-processor-fallback
|
- FALLBACK_HOST=payment-processor-fallback
|
||||||
- FALLBACK_PORT=8080
|
- FALLBACK_PORT=8080
|
||||||
- FALLBACK_POOL=0
|
- FALLBACK_POOL=0
|
||||||
|
|
|
@ -17,8 +17,11 @@ pub const Config = struct {
|
||||||
fallback_pool: u16,
|
fallback_pool: u16,
|
||||||
payment_summary_exchange_host: ?[]const u8,
|
payment_summary_exchange_host: ?[]const u8,
|
||||||
payment_summary_exchange_port: u16,
|
payment_summary_exchange_port: u16,
|
||||||
|
args: [][:0]u8,
|
||||||
|
|
||||||
pub fn init(allocator: Allocator) !Config {
|
pub fn init(allocator: Allocator) !Config {
|
||||||
|
const args = try std.process.argsAlloc(allocator);
|
||||||
|
|
||||||
var envMap = try std.process.getEnvMap(allocator);
|
var envMap = try std.process.getEnvMap(allocator);
|
||||||
|
|
||||||
const port_env = envMap.get("PORT");
|
const port_env = envMap.get("PORT");
|
||||||
|
@ -59,6 +62,17 @@ pub const Config = struct {
|
||||||
.fallback_pool = fallback_pool,
|
.fallback_pool = fallback_pool,
|
||||||
.payment_summary_exchange_host = payment_summary_exchange_host,
|
.payment_summary_exchange_host = payment_summary_exchange_host,
|
||||||
.payment_summary_exchange_port = payment_summary_exchange_port,
|
.payment_summary_exchange_port = payment_summary_exchange_port,
|
||||||
|
.args = args,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn isLoadBalancer(self: *Config) bool {
|
||||||
|
for (self.args) |arg| {
|
||||||
|
if (std.mem.eql(u8, arg, "--load-balancer")) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
|
@ -26,7 +26,7 @@ pub var summary_exchange: exchange.PaymentSummaryExchange = undefined;
|
||||||
pub var server_settings: ServerSettings = .{};
|
pub var server_settings: ServerSettings = .{};
|
||||||
|
|
||||||
pub fn init() !void {
|
pub fn init() !void {
|
||||||
const total_elements_expected = 50_000;
|
const total_elements_expected = 200_000;
|
||||||
const alloc_payments_repository = payments.calculateNecessaryMemory(total_elements_expected);
|
const alloc_payments_repository = payments.calculateNecessaryMemory(total_elements_expected);
|
||||||
|
|
||||||
heap_alocated = try std.heap.page_allocator.alloc(u8, 1024 * 100 + alloc_payments_repository);
|
heap_alocated = try std.heap.page_allocator.alloc(u8, 1024 * 100 + alloc_payments_repository);
|
||||||
|
|
|
@ -23,6 +23,16 @@ pub fn registerEndpoints(get_endpoints: *EndpointsManager, post_endpoints: *Endp
|
||||||
try post_endpoints.add("/payments", postPayments);
|
try post_endpoints.add("/payments", postPayments);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const default_date_time = DateTime{
|
||||||
|
.year = 0,
|
||||||
|
.month = 0,
|
||||||
|
.day = 0,
|
||||||
|
.hour = 0,
|
||||||
|
.minutes = 0,
|
||||||
|
.seconds = 0,
|
||||||
|
.milliseconds = 0,
|
||||||
|
};
|
||||||
|
|
||||||
fn postPayments(req: *Request, res: *Response) void {
|
fn postPayments(req: *Request, res: *Response) void {
|
||||||
res.withStatus(.unprocessable_entity).withContentType(.json).end();
|
res.withStatus(.unprocessable_entity).withContentType(.json).end();
|
||||||
|
|
||||||
|
@ -37,7 +47,7 @@ fn postPayments(req: *Request, res: *Response) void {
|
||||||
return res.withContent("{ \"correlationId\" : \"invalid\" }").end();
|
return res.withContent("{ \"correlationId\" : \"invalid\" }").end();
|
||||||
}, .amount = prop_amount.asFloat(f64) catch {
|
}, .amount = prop_amount.asFloat(f64) catch {
|
||||||
return res.withContent("{ \"amount\" : \"invalid\" }").end();
|
return res.withContent("{ \"amount\" : \"invalid\" }").end();
|
||||||
}, .requested_at = DateTime.now() };
|
}, .requested_at = default_date_time };
|
||||||
|
|
||||||
ctx.payments_repository.insert(payment) catch {
|
ctx.payments_repository.insert(payment) catch {
|
||||||
return res.withStatus(.internal_server_error).end();
|
return res.withStatus(.internal_server_error).end();
|
||||||
|
|
230
src/load_balancer.zig
Normal file
230
src/load_balancer.zig
Normal file
|
@ -0,0 +1,230 @@
|
||||||
|
const std = @import("std");
|
||||||
|
|
||||||
|
const log = std.log;
|
||||||
|
const Server = std.Server;
|
||||||
|
const http = std.http;
|
||||||
|
const net = std.net;
|
||||||
|
|
||||||
|
const testing = std.testing;
|
||||||
|
const expect = testing.expect;
|
||||||
|
|
||||||
|
const Address = net.Address;
|
||||||
|
const Stream = net.Stream;
|
||||||
|
|
||||||
|
const BUFFER_SIZE = 1024;
|
||||||
|
|
||||||
|
const UpstreamConnectionState = enum { inactive, available, occupied };
|
||||||
|
|
||||||
|
const UpstreamConnection = struct {
|
||||||
|
state: UpstreamConnectionState,
|
||||||
|
stream: Stream,
|
||||||
|
|
||||||
|
pub fn init(address: net.Address) UpstreamConnection {
|
||||||
|
const stream = net.tcpConnectToAddress(address) catch |err| {
|
||||||
|
log.err("Error when connecting to upstream {}", .{err});
|
||||||
|
unreachable;
|
||||||
|
};
|
||||||
|
|
||||||
|
return UpstreamConnection{
|
||||||
|
.stream = stream,
|
||||||
|
.state = .available,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
inline fn occupy(self: *UpstreamConnection) void {
|
||||||
|
self.state = .occupied;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline fn free(self: *UpstreamConnection) void {
|
||||||
|
self.state = .available;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
pub const UpstreamServer = struct {
|
||||||
|
pool: [512]UpstreamConnection,
|
||||||
|
address: net.Address,
|
||||||
|
|
||||||
|
pub fn init(host: []const u8, port: u16) UpstreamServer {
|
||||||
|
var buf: [4096]u8 = undefined;
|
||||||
|
var pba = std.heap.FixedBufferAllocator.init(&buf);
|
||||||
|
|
||||||
|
const addrList = net.getAddressList(pba.allocator(), host, port) catch unreachable;
|
||||||
|
defer addrList.deinit();
|
||||||
|
|
||||||
|
var final_addr: ?Address = null;
|
||||||
|
for (addrList.addrs) |addr| {
|
||||||
|
const ping = net.tcpConnectToAddress(addr) catch {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
ping.close();
|
||||||
|
final_addr = addr;
|
||||||
|
}
|
||||||
|
|
||||||
|
std.debug.assert(final_addr != null);
|
||||||
|
|
||||||
|
var connections: [512]UpstreamConnection = undefined;
|
||||||
|
|
||||||
|
for (&connections) |*conn| {
|
||||||
|
conn.* = UpstreamConnection.init(final_addr.?);
|
||||||
|
}
|
||||||
|
|
||||||
|
return UpstreamServer{
|
||||||
|
.address = final_addr.?,
|
||||||
|
.pool = connections,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn getAvailableConnection(self: *UpstreamServer) ?*UpstreamConnection {
|
||||||
|
for (&self.pool) |*conn| {
|
||||||
|
if (conn.state == .available) {
|
||||||
|
return conn;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
pub const LoadBalancer = struct {
|
||||||
|
address: net.Address,
|
||||||
|
address_options: net.Address.ListenOptions,
|
||||||
|
servers: []UpstreamServer,
|
||||||
|
|
||||||
|
pub fn init(ip_map: []const u8, port: u16, servers: []UpstreamServer) !LoadBalancer {
|
||||||
|
const address = try net.Address.parseIp4(ip_map, port);
|
||||||
|
|
||||||
|
return LoadBalancer{
|
||||||
|
.address = address,
|
||||||
|
.address_options = net.Address.ListenOptions{},
|
||||||
|
.servers = servers,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start(self: *LoadBalancer) !void {
|
||||||
|
var server = try self.address.listen(self.address_options);
|
||||||
|
|
||||||
|
std.debug.print("Listening load balancer http://{}\n", .{self.address});
|
||||||
|
|
||||||
|
defer server.deinit();
|
||||||
|
var lb: usize = 0;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
const conn = server.accept() catch |err| {
|
||||||
|
log.err("Error socket {}\n", .{err});
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
var upstream: ?*UpstreamConnection = null;
|
||||||
|
|
||||||
|
while (upstream == null) {
|
||||||
|
upstream = self.servers[lb % self.servers.len].getAvailableConnection();
|
||||||
|
lb += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
upstream.?.occupy();
|
||||||
|
|
||||||
|
var thread = std.Thread.spawn(.{ .stack_size = 1024 * 16 }, handleConnection, .{ self, conn, upstream.? }) catch |err| {
|
||||||
|
log.err("Creating thread error: {}\n", .{err});
|
||||||
|
conn.stream.close();
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
thread.detach();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handleConnection(
|
||||||
|
_: *LoadBalancer,
|
||||||
|
conn: net.Server.Connection,
|
||||||
|
upstream: *UpstreamConnection,
|
||||||
|
) void {
|
||||||
|
defer upstream.free();
|
||||||
|
defer conn.stream.close();
|
||||||
|
|
||||||
|
var buffer_request: [BUFFER_SIZE]u8 = undefined;
|
||||||
|
var buffer_response: [BUFFER_SIZE]u8 = undefined;
|
||||||
|
|
||||||
|
buffer_request[0] = 0;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
var req_len: usize = 1;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
const aux_len = conn.stream.read(buffer_request[req_len..]) catch |err| {
|
||||||
|
log.err("Error when read from connection {}\n", .{err});
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
if (aux_len == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
req_len += aux_len;
|
||||||
|
|
||||||
|
if (buffer_request[1] == 'G' or buffer_request[8] == 'u') break;
|
||||||
|
|
||||||
|
if (req_len >= 25 + 22 + 30 + 20 + 32 + 2 + 70) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
upstream.stream.writeAll(buffer_request[0..req_len]) catch |err| {
|
||||||
|
log.err("Error when writing to upstream {}\n", .{err});
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
var res_len: usize = 0;
|
||||||
|
|
||||||
|
while (res_len == 0 or buffer_response[res_len - 1] != 0) {
|
||||||
|
res_len += upstream.stream.read(buffer_response[res_len..]) catch |err| {
|
||||||
|
log.err("Error when reading from upstream {}\n", .{err});
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = conn.stream.write(buffer_response[0 .. res_len - 1]) catch |err| {
|
||||||
|
log.err("Error when write from connection {}\n", .{err});
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const skip_tests = true;
|
||||||
|
|
||||||
|
test "expect connect to upstream server" {
|
||||||
|
if (skip_tests) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const upstream_server = UpstreamServer.init("localhost", 5001);
|
||||||
|
|
||||||
|
for (upstream_server.pool) |conn| {
|
||||||
|
try expect(conn.state == .available);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
test "expect get the first available connection upstream server" {
|
||||||
|
if (skip_tests) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
var upstream_server = UpstreamServer.init("localhost", 5001);
|
||||||
|
|
||||||
|
upstream_server.pool[0].state = .occupied;
|
||||||
|
upstream_server.pool[1].state = .occupied;
|
||||||
|
upstream_server.pool[3].state = .occupied;
|
||||||
|
|
||||||
|
try expect(upstream_server.getAvailableConnection().? == &upstream_server.pool[2]);
|
||||||
|
}
|
||||||
|
|
||||||
|
test "expect initiate load balancer" {
|
||||||
|
if (skip_tests) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const upstream_server = UpstreamServer.init("localhost", 5001);
|
||||||
|
var servers = [_]UpstreamServer{upstream_server};
|
||||||
|
|
||||||
|
var lb = try LoadBalancer.init("127.0.0.1", 9999, &servers);
|
||||||
|
|
||||||
|
try lb.start();
|
||||||
|
}
|
49
src/main.zig
49
src/main.zig
|
@ -10,19 +10,33 @@ const HttpService = services.HttpService;
|
||||||
const server = @import("server.zig");
|
const server = @import("server.zig");
|
||||||
const PaymentIntegrator = @import("payment_integrator.zig").PaymentIntegrator;
|
const PaymentIntegrator = @import("payment_integrator.zig").PaymentIntegrator;
|
||||||
|
|
||||||
|
const lb = @import("load_balancer.zig");
|
||||||
|
|
||||||
pub fn main() !void {
|
pub fn main() !void {
|
||||||
|
std.debug.print("Init context\n", .{});
|
||||||
|
|
||||||
|
try ctx.init();
|
||||||
|
defer ctx.deinit();
|
||||||
|
std.debug.print("Context: OK\n", .{});
|
||||||
|
|
||||||
|
if (ctx.config.isLoadBalancer()) {
|
||||||
|
try startLoadBalancer();
|
||||||
|
} else {
|
||||||
|
try startServer();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn startServer() !void {
|
||||||
|
std.debug.print("Starting server...\n", .{});
|
||||||
|
|
||||||
var get_endpoints = endpoints.EndpointsManager{};
|
var get_endpoints = endpoints.EndpointsManager{};
|
||||||
var post_endpoints = endpoints.EndpointsManager{};
|
var post_endpoints = endpoints.EndpointsManager{};
|
||||||
var put_endpoints = endpoints.EndpointsManager{};
|
var put_endpoints = endpoints.EndpointsManager{};
|
||||||
|
|
||||||
std.debug.print("Init context\n", .{});
|
|
||||||
try ctx.init();
|
|
||||||
defer ctx.deinit();
|
|
||||||
std.debug.print("Context: OK\n", .{});
|
|
||||||
sec.updateToken(ctx.config.initial_token);
|
sec.updateToken(ctx.config.initial_token);
|
||||||
|
|
||||||
try endpoints.payments.registerEndpoints(&get_endpoints, &post_endpoints);
|
try endpoints.payments.registerEndpoints(&get_endpoints, &post_endpoints);
|
||||||
std.debug.print("Endpionts: OK\n", .{});
|
std.debug.print("Endpoints: OK\n", .{});
|
||||||
|
|
||||||
std.debug.print("default_pool: {d}\n", .{ctx.config.default_pool});
|
std.debug.print("default_pool: {d}\n", .{ctx.config.default_pool});
|
||||||
std.debug.print("fallback_pool: {d}\n", .{ctx.config.fallback_pool});
|
std.debug.print("fallback_pool: {d}\n", .{ctx.config.fallback_pool});
|
||||||
|
@ -53,11 +67,30 @@ pub fn main() !void {
|
||||||
std.log.info("Server Enviroment: {}\n", .{ctx.config.env});
|
std.log.info("Server Enviroment: {}\n", .{ctx.config.env});
|
||||||
|
|
||||||
var myserver = try server.HttpServer.init(ip_map, ctx.config.port, &get_endpoints, &post_endpoints, &put_endpoints);
|
var myserver = try server.HttpServer.init(ip_map, ctx.config.port, &get_endpoints, &post_endpoints, &put_endpoints);
|
||||||
std.debug.print("Init server: OK\n", .{});
|
|
||||||
|
|
||||||
std.Thread.sleep(1_000_000 * 1000 * 4);
|
std.Thread.sleep(1_000_000 * 1000 * 2);
|
||||||
try myserver.start();
|
try myserver.start();
|
||||||
std.debug.print("Server Started: OK\n", .{});
|
}
|
||||||
|
|
||||||
|
fn startLoadBalancer() !void {
|
||||||
|
std.Thread.sleep(1_000_000 * 1000 * 3);
|
||||||
|
std.debug.print("Starting load balancer...\n", .{});
|
||||||
|
|
||||||
|
var ip_map: []const u8 = "127.0.0.1";
|
||||||
|
|
||||||
|
if (ctx.config.env == .PROD)
|
||||||
|
ip_map = "0.0.0.0";
|
||||||
|
|
||||||
|
std.log.info("LB Enviroment: {}\n", .{ctx.config.env});
|
||||||
|
|
||||||
|
const server1 = lb.UpstreamServer.init(ctx.config.default_host, ctx.config.default_port);
|
||||||
|
const server2 = lb.UpstreamServer.init(ctx.config.fallback_host, ctx.config.fallback_port);
|
||||||
|
|
||||||
|
var servers = [_]lb.UpstreamServer{ server1, server2 };
|
||||||
|
|
||||||
|
var mylb = try lb.LoadBalancer.init(ip_map, ctx.config.port, &servers);
|
||||||
|
|
||||||
|
try mylb.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
test {
|
test {
|
||||||
|
|
|
@ -78,6 +78,7 @@ pub const PaymentIntegrator = struct {
|
||||||
pi.payment.integration_status = .not_integrated;
|
pi.payment.integration_status = .not_integrated;
|
||||||
pi.is_integrated = true; // liar
|
pi.is_integrated = true; // liar
|
||||||
} else {
|
} else {
|
||||||
|
pi.payment.requested_at = DateTime.now();
|
||||||
pi.ticket = self.service_pool.dive(pi.getMessage()) catch null;
|
pi.ticket = self.service_pool.dive(pi.getMessage()) catch null;
|
||||||
pi.retries -= 1;
|
pi.retries -= 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,7 @@ pub const HttpServer = struct {
|
||||||
log.info("Listeting server http://{}\n", .{self.address});
|
log.info("Listeting server http://{}\n", .{self.address});
|
||||||
|
|
||||||
defer server.deinit();
|
defer server.deinit();
|
||||||
|
var count: usize = 0;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
const conn = server.accept() catch |err| {
|
const conn = server.accept() catch |err| {
|
||||||
|
@ -69,6 +70,7 @@ pub const HttpServer = struct {
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
count += 1;
|
||||||
thread.detach();
|
thread.detach();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -86,8 +88,10 @@ pub const HttpServer = struct {
|
||||||
while (true) {
|
while (true) {
|
||||||
var req_len: usize = 0;
|
var req_len: usize = 0;
|
||||||
|
|
||||||
|
var lb_weight: usize = 0;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
const aux_len = conn.stream.read(&buffer_request) catch {
|
const aux_len = conn.stream.read(buffer_request[req_len..]) catch {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -95,22 +99,24 @@ pub const HttpServer = struct {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lb_weight = if (buffer_request[0] == 0) 1 else 0;
|
||||||
|
|
||||||
req_len += aux_len;
|
req_len += aux_len;
|
||||||
|
|
||||||
if (buffer_request[0] == 'G' or buffer_request[7] == 'u') break;
|
if (buffer_request[0 + lb_weight] == 'G' or buffer_request[7 + lb_weight] == 'u') break;
|
||||||
|
|
||||||
if (req_len >= 25 + 22 + 30 + 20 + 32 + 2 + 70) {
|
if (req_len >= 25 + 22 + 30 + 20 + 32 + 2 + 70) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const b_target = nextWhiteSpace(&buffer_request, 0, req_len) + 1;
|
const b_target = nextWhiteSpace(&buffer_request, 0 + lb_weight, req_len) + 1;
|
||||||
const e_target = nextWhiteSpace(&buffer_request, b_target, req_len);
|
const e_target = nextWhiteSpace(&buffer_request, b_target, req_len);
|
||||||
const full_path = buffer_request[b_target..e_target];
|
const full_path = buffer_request[b_target..e_target];
|
||||||
|
|
||||||
const path, const query = separatePathAndQuery(full_path);
|
const path, const query = separatePathAndQuery(full_path);
|
||||||
|
|
||||||
const http_method: http.Method = switch (buffer_request[0]) {
|
const http_method: http.Method = switch (buffer_request[0 + lb_weight]) {
|
||||||
'P' => .POST,
|
'P' => .POST,
|
||||||
'G' => .GET,
|
'G' => .GET,
|
||||||
else => unreachable
|
else => unreachable
|
||||||
|
@ -138,7 +144,14 @@ pub const HttpServer = struct {
|
||||||
.{ response.content.len, response.content },
|
.{ response.content.len, response.content },
|
||||||
) catch return;
|
) catch return;
|
||||||
|
|
||||||
_ = conn.stream.write(response_message) catch return;
|
var res_len = response_message.len;
|
||||||
|
|
||||||
|
if (lb_weight == 1) {
|
||||||
|
send_buffer[res_len] = 0;
|
||||||
|
res_len += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.stream.writeAll(send_buffer[0..res_len]) catch return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,8 @@ const ServiceError = s.ServiceError;
|
||||||
const ServiceTicket = s.ServiceTicket;
|
const ServiceTicket = s.ServiceTicket;
|
||||||
const ServiceHealth = s.ServiceHealth;
|
const ServiceHealth = s.ServiceHealth;
|
||||||
|
|
||||||
|
const CONN_PER_THREAD = 100;
|
||||||
|
|
||||||
pub const HttpService = struct {
|
pub const HttpService = struct {
|
||||||
id: u64,
|
id: u64,
|
||||||
name: []const u8,
|
name: []const u8,
|
||||||
|
@ -59,12 +61,12 @@ pub const HttpService = struct {
|
||||||
if (final_addr == null)
|
if (final_addr == null)
|
||||||
return ServiceError.Unreachable;
|
return ServiceError.Unreachable;
|
||||||
|
|
||||||
if (num_connections != 0 and num_connections % 10 != 0) {
|
if (num_connections != 0 and num_connections % CONN_PER_THREAD != 0) {
|
||||||
return ServiceError.NumConnectionsShouldBeMultipleOfTen;
|
return ServiceError.NumConnectionsShouldBeMultipleOfTen;
|
||||||
}
|
}
|
||||||
|
|
||||||
const num_tickets = num_connections * 2;
|
const num_tickets = num_connections * 2;
|
||||||
const num_threads = (num_connections / 10) + 2;
|
const num_threads = (num_connections / CONN_PER_THREAD) + 2;
|
||||||
|
|
||||||
var connections = try allocator.alloc(ServiceConnection, num_connections);
|
var connections = try allocator.alloc(ServiceConnection, num_connections);
|
||||||
const tickets = try allocator.alloc(ServiceTicket, num_tickets);
|
const tickets = try allocator.alloc(ServiceTicket, num_tickets);
|
||||||
|
@ -104,15 +106,15 @@ pub const HttpService = struct {
|
||||||
pub fn start(self: *HttpService) !void {
|
pub fn start(self: *HttpService) !void {
|
||||||
if (self.connections.len == 0) return;
|
if (self.connections.len == 0) return;
|
||||||
|
|
||||||
var conn_i: usize = 10;
|
var conn_i: usize = CONN_PER_THREAD;
|
||||||
var msg_i: usize = 20;
|
var msg_i: usize = CONN_PER_THREAD * 2;
|
||||||
|
|
||||||
for (0..self.threads.len - 2) |i| {
|
for (0..self.threads.len - 2) |i| {
|
||||||
const connections = self.connections[conn_i - 10 .. conn_i];
|
const connections = self.connections[conn_i - CONN_PER_THREAD .. conn_i];
|
||||||
const tickets = self.tickets[msg_i - 20 .. msg_i];
|
const tickets = self.tickets[msg_i - (CONN_PER_THREAD * 2) .. msg_i];
|
||||||
|
|
||||||
conn_i += 10;
|
conn_i += CONN_PER_THREAD;
|
||||||
msg_i += 20;
|
msg_i += CONN_PER_THREAD * 2;
|
||||||
|
|
||||||
self.threads[i] = try Thread.spawn(.{ .stack_size = 1024 * 64 }, HttpService.startMessenger, .{
|
self.threads[i] = try Thread.spawn(.{ .stack_size = 1024 * 64 }, HttpService.startMessenger, .{
|
||||||
self,
|
self,
|
||||||
|
@ -192,7 +194,7 @@ pub const HttpService = struct {
|
||||||
|
|
||||||
pub fn startMessenger(self: *HttpService, connections: []ServiceConnection, tickets: []ServiceTicket, thread_id: usize) void {
|
pub fn startMessenger(self: *HttpService, connections: []ServiceConnection, tickets: []ServiceTicket, thread_id: usize) void {
|
||||||
while (true) {
|
while (true) {
|
||||||
Thread.sleep(10_000_000);
|
Thread.sleep(20_000_000);
|
||||||
if (self.thread_stop[thread_id]) {
|
if (self.thread_stop[thread_id]) {
|
||||||
var conn_open = false;
|
var conn_open = false;
|
||||||
|
|
||||||
|
@ -354,7 +356,7 @@ test "expect init service if host exists and service is reachable" {
|
||||||
}
|
}
|
||||||
|
|
||||||
test "expect init service error if number of connections is not multiple of 10" {
|
test "expect init service error if number of connections is not multiple of 10" {
|
||||||
var service = HttpService.init(1, "default", "one.one.one.one", 53, 97, testing.allocator) catch |err| {
|
var service = HttpService.init(1, "default", "one.one.one.one", 53, CONN_PER_THREAD + 1, testing.allocator) catch |err| {
|
||||||
try expect(err == ServiceError.NumConnectionsShouldBeMultipleOfTen);
|
try expect(err == ServiceError.NumConnectionsShouldBeMultipleOfTen);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
@ -365,7 +367,7 @@ test "expect init service error if number of connections is not multiple of 10"
|
||||||
}
|
}
|
||||||
|
|
||||||
test "expect use only disponible or closed connections to send the ticket" {
|
test "expect use only disponible or closed connections to send the ticket" {
|
||||||
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 10, testing.allocator);
|
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 100, testing.allocator);
|
||||||
defer service.deinit();
|
defer service.deinit();
|
||||||
|
|
||||||
try service.connections[0].tryConnect(&service.address);
|
try service.connections[0].tryConnect(&service.address);
|
||||||
|
@ -383,7 +385,7 @@ test "expect use only disponible or closed connections to send the ticket" {
|
||||||
}
|
}
|
||||||
|
|
||||||
test "expect ticket status try_again if all connection ocuppied" {
|
test "expect ticket status try_again if all connection ocuppied" {
|
||||||
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 10, testing.allocator);
|
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 100, testing.allocator);
|
||||||
defer service.deinit();
|
defer service.deinit();
|
||||||
|
|
||||||
for (service.connections) |*conn| {
|
for (service.connections) |*conn| {
|
||||||
|
@ -400,7 +402,7 @@ test "expect ticket status try_again if all connection ocuppied" {
|
||||||
}
|
}
|
||||||
|
|
||||||
test "expect ticket status success if response is success" {
|
test "expect ticket status success if response is success" {
|
||||||
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 10, testing.allocator);
|
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 100, testing.allocator);
|
||||||
defer service.deinit();
|
defer service.deinit();
|
||||||
|
|
||||||
var conn = &service.connections[0];
|
var conn = &service.connections[0];
|
||||||
|
@ -418,7 +420,7 @@ test "expect ticket status success if response is success" {
|
||||||
}
|
}
|
||||||
|
|
||||||
test "expect ticket status try_again if connection closed" {
|
test "expect ticket status try_again if connection closed" {
|
||||||
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 10, testing.allocator);
|
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 100, testing.allocator);
|
||||||
defer service.deinit();
|
defer service.deinit();
|
||||||
|
|
||||||
var conn = &service.connections[0];
|
var conn = &service.connections[0];
|
||||||
|
@ -435,7 +437,7 @@ test "expect ticket status try_again if connection closed" {
|
||||||
}
|
}
|
||||||
|
|
||||||
test "expect ticket status try_again if response is http status error" {
|
test "expect ticket status try_again if response is http status error" {
|
||||||
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 10, testing.allocator);
|
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 100, testing.allocator);
|
||||||
defer service.deinit();
|
defer service.deinit();
|
||||||
|
|
||||||
var conn = &service.connections[0];
|
var conn = &service.connections[0];
|
||||||
|
@ -478,7 +480,7 @@ test "expect complete all ticket before thread stop" {
|
||||||
}
|
}
|
||||||
|
|
||||||
test "expect checkHealth update the service health: available" {
|
test "expect checkHealth update the service health: available" {
|
||||||
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 10, testing.allocator);
|
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 100, testing.allocator);
|
||||||
defer service.deinit();
|
defer service.deinit();
|
||||||
|
|
||||||
var conn = &service.health_connection;
|
var conn = &service.health_connection;
|
||||||
|
@ -494,7 +496,7 @@ test "expect checkHealth update the service health: available" {
|
||||||
}
|
}
|
||||||
|
|
||||||
test "expect checkHealth update the service health: unavailable" {
|
test "expect checkHealth update the service health: unavailable" {
|
||||||
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 10, testing.allocator);
|
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 100, testing.allocator);
|
||||||
defer service.deinit();
|
defer service.deinit();
|
||||||
|
|
||||||
var conn = &service.health_connection;
|
var conn = &service.health_connection;
|
||||||
|
@ -510,7 +512,7 @@ test "expect checkHealth update the service health: unavailable" {
|
||||||
}
|
}
|
||||||
|
|
||||||
test "expect unavailable if any connection ocorrur" {
|
test "expect unavailable if any connection ocorrur" {
|
||||||
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 10, testing.allocator);
|
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 100, testing.allocator);
|
||||||
defer service.deinit();
|
defer service.deinit();
|
||||||
|
|
||||||
var conn = &service.health_connection;
|
var conn = &service.health_connection;
|
||||||
|
@ -551,7 +553,7 @@ test "expect service start" {
|
||||||
}
|
}
|
||||||
|
|
||||||
test "expect queue the message in the right spot" {
|
test "expect queue the message in the right spot" {
|
||||||
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 40, testing.allocator);
|
var service = try HttpService.init(1, "default", "one.one.one.one", 53, 400, testing.allocator);
|
||||||
defer service.deinit();
|
defer service.deinit();
|
||||||
|
|
||||||
for (0..23) |i| {
|
for (0..23) |i| {
|
||||||
|
|
Loading…
Reference in a new issue