diff --git a/docker-compose.yml b/docker-compose.yml index 5e0b5e8..4aad93c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,9 +1,13 @@ services: - nginx: - container_name: balance-zig-pay - image: nginx:1.28-alpine - volumes: - - ./nginx.conf:/etc/nginx/nginx.conf:ro + lb: + build: . + environment: + - PORT=9999 + - SERVER_ENV=PROD + - DEFAULT_HOST=zig1 + - DEFAULT_PORT=8080 + - FALLBACK_HOST=zig2 + - FALLBACK_PORT=8080 depends_on: - zig1 - zig2 @@ -11,11 +15,12 @@ services: - "9999:9999" networks: - payment-processor + command: ["--load-balancer"] deploy: resources: limits: - cpus: "0.70" - memory: "10MB" + cpus: "0.20" + memory: "50MB" zig1: &zig container_name: zig-pay-1 build: . @@ -23,7 +28,7 @@ services: - SERVER_ENV=PROD - DEFAULT_HOST=payment-processor-default - DEFAULT_PORT=8080 - - DEFAULT_POOL=20 + - DEFAULT_POOL=100 - FALLBACK_HOST=payment-processor-fallback - FALLBACK_PORT=8080 - FALLBACK_POOL=0 @@ -36,7 +41,7 @@ services: resources: limits: cpus: "0.15" - memory: "10MB" + memory: "50MB" zig2: <<: *zig container_name: zig-pay-2 @@ -44,7 +49,7 @@ services: - SERVER_ENV=PROD - DEFAULT_HOST=payment-processor-default - DEFAULT_PORT=8080 - - DEFAULT_POOL=20 + - DEFAULT_POOL=100 - FALLBACK_HOST=payment-processor-fallback - FALLBACK_PORT=8080 - FALLBACK_POOL=0 diff --git a/src/context/config.zig b/src/context/config.zig index d53f53c..5463fb6 100644 --- a/src/context/config.zig +++ b/src/context/config.zig @@ -17,8 +17,11 @@ pub const Config = struct { fallback_pool: u16, payment_summary_exchange_host: ?[]const u8, payment_summary_exchange_port: u16, + args: [][:0]u8, pub fn init(allocator: Allocator) !Config { + const args = try std.process.argsAlloc(allocator); + var envMap = try std.process.getEnvMap(allocator); const port_env = envMap.get("PORT"); @@ -59,6 +62,17 @@ pub const Config = struct { .fallback_pool = fallback_pool, .payment_summary_exchange_host = payment_summary_exchange_host, .payment_summary_exchange_port = payment_summary_exchange_port, + .args = args, }; } + + pub fn isLoadBalancer(self: *Config) bool { + for (self.args) |arg| { + if (std.mem.eql(u8, arg, "--load-balancer")) { + return true; + } + } + + return false; + } }; diff --git a/src/context/main.zig b/src/context/main.zig index 7c744aa..49e7889 100644 --- a/src/context/main.zig +++ b/src/context/main.zig @@ -26,7 +26,7 @@ pub var summary_exchange: exchange.PaymentSummaryExchange = undefined; pub var server_settings: ServerSettings = .{}; pub fn init() !void { - const total_elements_expected = 50_000; + const total_elements_expected = 200_000; const alloc_payments_repository = payments.calculateNecessaryMemory(total_elements_expected); heap_alocated = try std.heap.page_allocator.alloc(u8, 1024 * 100 + alloc_payments_repository); diff --git a/src/endpoints/payments.zig b/src/endpoints/payments.zig index f6c466a..3f624ee 100644 --- a/src/endpoints/payments.zig +++ b/src/endpoints/payments.zig @@ -23,6 +23,16 @@ pub fn registerEndpoints(get_endpoints: *EndpointsManager, post_endpoints: *Endp try post_endpoints.add("/payments", postPayments); } +const default_date_time = DateTime{ + .year = 0, + .month = 0, + .day = 0, + .hour = 0, + .minutes = 0, + .seconds = 0, + .milliseconds = 0, +}; + fn postPayments(req: *Request, res: *Response) void { res.withStatus(.unprocessable_entity).withContentType(.json).end(); @@ -37,7 +47,7 @@ fn postPayments(req: *Request, res: *Response) void { return res.withContent("{ \"correlationId\" : \"invalid\" }").end(); }, .amount = prop_amount.asFloat(f64) catch { return res.withContent("{ \"amount\" : \"invalid\" }").end(); - }, .requested_at = DateTime.now() }; + }, .requested_at = default_date_time }; ctx.payments_repository.insert(payment) catch { return res.withStatus(.internal_server_error).end(); diff --git a/src/load_balancer.zig b/src/load_balancer.zig new file mode 100644 index 0000000..840ad38 --- /dev/null +++ b/src/load_balancer.zig @@ -0,0 +1,230 @@ +const std = @import("std"); + +const log = std.log; +const Server = std.Server; +const http = std.http; +const net = std.net; + +const testing = std.testing; +const expect = testing.expect; + +const Address = net.Address; +const Stream = net.Stream; + +const BUFFER_SIZE = 1024; + +const UpstreamConnectionState = enum { inactive, available, occupied }; + +const UpstreamConnection = struct { + state: UpstreamConnectionState, + stream: Stream, + + pub fn init(address: net.Address) UpstreamConnection { + const stream = net.tcpConnectToAddress(address) catch |err| { + log.err("Error when connecting to upstream {}", .{err}); + unreachable; + }; + + return UpstreamConnection{ + .stream = stream, + .state = .available, + }; + } + + inline fn occupy(self: *UpstreamConnection) void { + self.state = .occupied; + } + + inline fn free(self: *UpstreamConnection) void { + self.state = .available; + } +}; + +pub const UpstreamServer = struct { + pool: [512]UpstreamConnection, + address: net.Address, + + pub fn init(host: []const u8, port: u16) UpstreamServer { + var buf: [4096]u8 = undefined; + var pba = std.heap.FixedBufferAllocator.init(&buf); + + const addrList = net.getAddressList(pba.allocator(), host, port) catch unreachable; + defer addrList.deinit(); + + var final_addr: ?Address = null; + for (addrList.addrs) |addr| { + const ping = net.tcpConnectToAddress(addr) catch { + continue; + }; + + ping.close(); + final_addr = addr; + } + + std.debug.assert(final_addr != null); + + var connections: [512]UpstreamConnection = undefined; + + for (&connections) |*conn| { + conn.* = UpstreamConnection.init(final_addr.?); + } + + return UpstreamServer{ + .address = final_addr.?, + .pool = connections, + }; + } + + pub fn getAvailableConnection(self: *UpstreamServer) ?*UpstreamConnection { + for (&self.pool) |*conn| { + if (conn.state == .available) { + return conn; + } + } + + return null; + } +}; + +pub const LoadBalancer = struct { + address: net.Address, + address_options: net.Address.ListenOptions, + servers: []UpstreamServer, + + pub fn init(ip_map: []const u8, port: u16, servers: []UpstreamServer) !LoadBalancer { + const address = try net.Address.parseIp4(ip_map, port); + + return LoadBalancer{ + .address = address, + .address_options = net.Address.ListenOptions{}, + .servers = servers, + }; + } + + pub fn start(self: *LoadBalancer) !void { + var server = try self.address.listen(self.address_options); + + std.debug.print("Listening load balancer http://{}\n", .{self.address}); + + defer server.deinit(); + var lb: usize = 0; + + while (true) { + const conn = server.accept() catch |err| { + log.err("Error socket {}\n", .{err}); + continue; + }; + + var upstream: ?*UpstreamConnection = null; + + while (upstream == null) { + upstream = self.servers[lb % self.servers.len].getAvailableConnection(); + lb += 1; + } + + upstream.?.occupy(); + + var thread = std.Thread.spawn(.{ .stack_size = 1024 * 16 }, handleConnection, .{ self, conn, upstream.? }) catch |err| { + log.err("Creating thread error: {}\n", .{err}); + conn.stream.close(); + continue; + }; + + thread.detach(); + } + } + + fn handleConnection( + _: *LoadBalancer, + conn: net.Server.Connection, + upstream: *UpstreamConnection, + ) void { + defer upstream.free(); + defer conn.stream.close(); + + var buffer_request: [BUFFER_SIZE]u8 = undefined; + var buffer_response: [BUFFER_SIZE]u8 = undefined; + + buffer_request[0] = 0; + + while (true) { + var req_len: usize = 1; + + while (true) { + const aux_len = conn.stream.read(buffer_request[req_len..]) catch |err| { + log.err("Error when read from connection {}\n", .{err}); + return; + }; + + if (aux_len == 0) { + return; + } + + req_len += aux_len; + + if (buffer_request[1] == 'G' or buffer_request[8] == 'u') break; + + if (req_len >= 25 + 22 + 30 + 20 + 32 + 2 + 70) { + break; + } + } + + upstream.stream.writeAll(buffer_request[0..req_len]) catch |err| { + log.err("Error when writing to upstream {}\n", .{err}); + return; + }; + + var res_len: usize = 0; + + while (res_len == 0 or buffer_response[res_len - 1] != 0) { + res_len += upstream.stream.read(buffer_response[res_len..]) catch |err| { + log.err("Error when reading from upstream {}\n", .{err}); + return; + }; + } + + _ = conn.stream.write(buffer_response[0 .. res_len - 1]) catch |err| { + log.err("Error when write from connection {}\n", .{err}); + return; + }; + } + } +}; + +const skip_tests = true; + +test "expect connect to upstream server" { + if (skip_tests) { + return; + } + const upstream_server = UpstreamServer.init("localhost", 5001); + + for (upstream_server.pool) |conn| { + try expect(conn.state == .available); + } +} + +test "expect get the first available connection upstream server" { + if (skip_tests) { + return; + } + var upstream_server = UpstreamServer.init("localhost", 5001); + + upstream_server.pool[0].state = .occupied; + upstream_server.pool[1].state = .occupied; + upstream_server.pool[3].state = .occupied; + + try expect(upstream_server.getAvailableConnection().? == &upstream_server.pool[2]); +} + +test "expect initiate load balancer" { + if (skip_tests) { + return; + } + const upstream_server = UpstreamServer.init("localhost", 5001); + var servers = [_]UpstreamServer{upstream_server}; + + var lb = try LoadBalancer.init("127.0.0.1", 9999, &servers); + + try lb.start(); +} diff --git a/src/main.zig b/src/main.zig index 71b3128..559ed63 100644 --- a/src/main.zig +++ b/src/main.zig @@ -10,19 +10,33 @@ const HttpService = services.HttpService; const server = @import("server.zig"); const PaymentIntegrator = @import("payment_integrator.zig").PaymentIntegrator; +const lb = @import("load_balancer.zig"); + pub fn main() !void { + std.debug.print("Init context\n", .{}); + + try ctx.init(); + defer ctx.deinit(); + std.debug.print("Context: OK\n", .{}); + + if (ctx.config.isLoadBalancer()) { + try startLoadBalancer(); + } else { + try startServer(); + } +} + +fn startServer() !void { + std.debug.print("Starting server...\n", .{}); + var get_endpoints = endpoints.EndpointsManager{}; var post_endpoints = endpoints.EndpointsManager{}; var put_endpoints = endpoints.EndpointsManager{}; - std.debug.print("Init context\n", .{}); - try ctx.init(); - defer ctx.deinit(); - std.debug.print("Context: OK\n", .{}); sec.updateToken(ctx.config.initial_token); try endpoints.payments.registerEndpoints(&get_endpoints, &post_endpoints); - std.debug.print("Endpionts: OK\n", .{}); + std.debug.print("Endpoints: OK\n", .{}); std.debug.print("default_pool: {d}\n", .{ctx.config.default_pool}); std.debug.print("fallback_pool: {d}\n", .{ctx.config.fallback_pool}); @@ -53,11 +67,30 @@ pub fn main() !void { std.log.info("Server Enviroment: {}\n", .{ctx.config.env}); var myserver = try server.HttpServer.init(ip_map, ctx.config.port, &get_endpoints, &post_endpoints, &put_endpoints); - std.debug.print("Init server: OK\n", .{}); - std.Thread.sleep(1_000_000 * 1000 * 4); + std.Thread.sleep(1_000_000 * 1000 * 2); try myserver.start(); - std.debug.print("Server Started: OK\n", .{}); +} + +fn startLoadBalancer() !void { + std.Thread.sleep(1_000_000 * 1000 * 3); + std.debug.print("Starting load balancer...\n", .{}); + + var ip_map: []const u8 = "127.0.0.1"; + + if (ctx.config.env == .PROD) + ip_map = "0.0.0.0"; + + std.log.info("LB Enviroment: {}\n", .{ctx.config.env}); + + const server1 = lb.UpstreamServer.init(ctx.config.default_host, ctx.config.default_port); + const server2 = lb.UpstreamServer.init(ctx.config.fallback_host, ctx.config.fallback_port); + + var servers = [_]lb.UpstreamServer{ server1, server2 }; + + var mylb = try lb.LoadBalancer.init(ip_map, ctx.config.port, &servers); + + try mylb.start(); } test { diff --git a/src/payment_integrator.zig b/src/payment_integrator.zig index abf018e..03d3d89 100644 --- a/src/payment_integrator.zig +++ b/src/payment_integrator.zig @@ -78,6 +78,7 @@ pub const PaymentIntegrator = struct { pi.payment.integration_status = .not_integrated; pi.is_integrated = true; // liar } else { + pi.payment.requested_at = DateTime.now(); pi.ticket = self.service_pool.dive(pi.getMessage()) catch null; pi.retries -= 1; } diff --git a/src/server.zig b/src/server.zig index 14cd814..eff9ec6 100644 --- a/src/server.zig +++ b/src/server.zig @@ -56,6 +56,7 @@ pub const HttpServer = struct { log.info("Listeting server http://{}\n", .{self.address}); defer server.deinit(); + var count: usize = 0; while (true) { const conn = server.accept() catch |err| { @@ -69,6 +70,7 @@ pub const HttpServer = struct { continue; }; + count += 1; thread.detach(); } } @@ -86,8 +88,10 @@ pub const HttpServer = struct { while (true) { var req_len: usize = 0; + var lb_weight: usize = 0; + while (true) { - const aux_len = conn.stream.read(&buffer_request) catch { + const aux_len = conn.stream.read(buffer_request[req_len..]) catch { return; }; @@ -95,22 +99,24 @@ pub const HttpServer = struct { return; } + lb_weight = if (buffer_request[0] == 0) 1 else 0; + req_len += aux_len; - if (buffer_request[0] == 'G' or buffer_request[7] == 'u') break; + if (buffer_request[0 + lb_weight] == 'G' or buffer_request[7 + lb_weight] == 'u') break; if (req_len >= 25 + 22 + 30 + 20 + 32 + 2 + 70) { break; } } - const b_target = nextWhiteSpace(&buffer_request, 0, req_len) + 1; + const b_target = nextWhiteSpace(&buffer_request, 0 + lb_weight, req_len) + 1; const e_target = nextWhiteSpace(&buffer_request, b_target, req_len); const full_path = buffer_request[b_target..e_target]; const path, const query = separatePathAndQuery(full_path); - const http_method: http.Method = switch (buffer_request[0]) { + const http_method: http.Method = switch (buffer_request[0 + lb_weight]) { 'P' => .POST, 'G' => .GET, else => unreachable @@ -138,7 +144,14 @@ pub const HttpServer = struct { .{ response.content.len, response.content }, ) catch return; - _ = conn.stream.write(response_message) catch return; + var res_len = response_message.len; + + if (lb_weight == 1) { + send_buffer[res_len] = 0; + res_len += 1; + } + + conn.stream.writeAll(send_buffer[0..res_len]) catch return; } } diff --git a/src/services/http_service.zig b/src/services/http_service.zig index d17394a..f73e040 100644 --- a/src/services/http_service.zig +++ b/src/services/http_service.zig @@ -19,6 +19,8 @@ const ServiceError = s.ServiceError; const ServiceTicket = s.ServiceTicket; const ServiceHealth = s.ServiceHealth; +const CONN_PER_THREAD = 100; + pub const HttpService = struct { id: u64, name: []const u8, @@ -59,12 +61,12 @@ pub const HttpService = struct { if (final_addr == null) return ServiceError.Unreachable; - if (num_connections != 0 and num_connections % 10 != 0) { + if (num_connections != 0 and num_connections % CONN_PER_THREAD != 0) { return ServiceError.NumConnectionsShouldBeMultipleOfTen; } const num_tickets = num_connections * 2; - const num_threads = (num_connections / 10) + 2; + const num_threads = (num_connections / CONN_PER_THREAD) + 2; var connections = try allocator.alloc(ServiceConnection, num_connections); const tickets = try allocator.alloc(ServiceTicket, num_tickets); @@ -104,15 +106,15 @@ pub const HttpService = struct { pub fn start(self: *HttpService) !void { if (self.connections.len == 0) return; - var conn_i: usize = 10; - var msg_i: usize = 20; + var conn_i: usize = CONN_PER_THREAD; + var msg_i: usize = CONN_PER_THREAD * 2; for (0..self.threads.len - 2) |i| { - const connections = self.connections[conn_i - 10 .. conn_i]; - const tickets = self.tickets[msg_i - 20 .. msg_i]; + const connections = self.connections[conn_i - CONN_PER_THREAD .. conn_i]; + const tickets = self.tickets[msg_i - (CONN_PER_THREAD * 2) .. msg_i]; - conn_i += 10; - msg_i += 20; + conn_i += CONN_PER_THREAD; + msg_i += CONN_PER_THREAD * 2; self.threads[i] = try Thread.spawn(.{ .stack_size = 1024 * 64 }, HttpService.startMessenger, .{ self, @@ -192,7 +194,7 @@ pub const HttpService = struct { pub fn startMessenger(self: *HttpService, connections: []ServiceConnection, tickets: []ServiceTicket, thread_id: usize) void { while (true) { - Thread.sleep(10_000_000); + Thread.sleep(20_000_000); if (self.thread_stop[thread_id]) { var conn_open = false; @@ -354,7 +356,7 @@ test "expect init service if host exists and service is reachable" { } test "expect init service error if number of connections is not multiple of 10" { - var service = HttpService.init(1, "default", "one.one.one.one", 53, 97, testing.allocator) catch |err| { + var service = HttpService.init(1, "default", "one.one.one.one", 53, CONN_PER_THREAD + 1, testing.allocator) catch |err| { try expect(err == ServiceError.NumConnectionsShouldBeMultipleOfTen); return; }; @@ -365,7 +367,7 @@ test "expect init service error if number of connections is not multiple of 10" } test "expect use only disponible or closed connections to send the ticket" { - var service = try HttpService.init(1, "default", "one.one.one.one", 53, 10, testing.allocator); + var service = try HttpService.init(1, "default", "one.one.one.one", 53, 100, testing.allocator); defer service.deinit(); try service.connections[0].tryConnect(&service.address); @@ -383,7 +385,7 @@ test "expect use only disponible or closed connections to send the ticket" { } test "expect ticket status try_again if all connection ocuppied" { - var service = try HttpService.init(1, "default", "one.one.one.one", 53, 10, testing.allocator); + var service = try HttpService.init(1, "default", "one.one.one.one", 53, 100, testing.allocator); defer service.deinit(); for (service.connections) |*conn| { @@ -400,7 +402,7 @@ test "expect ticket status try_again if all connection ocuppied" { } test "expect ticket status success if response is success" { - var service = try HttpService.init(1, "default", "one.one.one.one", 53, 10, testing.allocator); + var service = try HttpService.init(1, "default", "one.one.one.one", 53, 100, testing.allocator); defer service.deinit(); var conn = &service.connections[0]; @@ -418,7 +420,7 @@ test "expect ticket status success if response is success" { } test "expect ticket status try_again if connection closed" { - var service = try HttpService.init(1, "default", "one.one.one.one", 53, 10, testing.allocator); + var service = try HttpService.init(1, "default", "one.one.one.one", 53, 100, testing.allocator); defer service.deinit(); var conn = &service.connections[0]; @@ -435,7 +437,7 @@ test "expect ticket status try_again if connection closed" { } test "expect ticket status try_again if response is http status error" { - var service = try HttpService.init(1, "default", "one.one.one.one", 53, 10, testing.allocator); + var service = try HttpService.init(1, "default", "one.one.one.one", 53, 100, testing.allocator); defer service.deinit(); var conn = &service.connections[0]; @@ -478,7 +480,7 @@ test "expect complete all ticket before thread stop" { } test "expect checkHealth update the service health: available" { - var service = try HttpService.init(1, "default", "one.one.one.one", 53, 10, testing.allocator); + var service = try HttpService.init(1, "default", "one.one.one.one", 53, 100, testing.allocator); defer service.deinit(); var conn = &service.health_connection; @@ -494,7 +496,7 @@ test "expect checkHealth update the service health: available" { } test "expect checkHealth update the service health: unavailable" { - var service = try HttpService.init(1, "default", "one.one.one.one", 53, 10, testing.allocator); + var service = try HttpService.init(1, "default", "one.one.one.one", 53, 100, testing.allocator); defer service.deinit(); var conn = &service.health_connection; @@ -510,7 +512,7 @@ test "expect checkHealth update the service health: unavailable" { } test "expect unavailable if any connection ocorrur" { - var service = try HttpService.init(1, "default", "one.one.one.one", 53, 10, testing.allocator); + var service = try HttpService.init(1, "default", "one.one.one.one", 53, 100, testing.allocator); defer service.deinit(); var conn = &service.health_connection; @@ -551,7 +553,7 @@ test "expect service start" { } test "expect queue the message in the right spot" { - var service = try HttpService.init(1, "default", "one.one.one.one", 53, 40, testing.allocator); + var service = try HttpService.init(1, "default", "one.one.one.one", 53, 400, testing.allocator); defer service.deinit(); for (0..23) |i| {