const std = @import("std");
const log = std.log;
const Thread = std.Thread;
const is_test = @import("builtin").is_test;
const testing = std.testing;
const expect = testing.expect;
const FakeStram = @import("helpers").FakeStream;

const things = @import("things");
const DateTime = things.DateTime;

const data = @import("data");
const ExchangeConnection = @import("exchange_connection.zig").ExchangeConnection;
const PaymentsRepository = data.payments.PaymentsRepository;
const Payment = data.payments.Payment;
const PaymentsIntegrationSummary = data.payments.PaymentsIntegrationSummary;

/// Size in bytes of one `?DateTime` as it is laid out in memory.
const OPTIONAL_DATE_SIZE = @sizeOf(?DateTime);
/// A request is two `?DateTime` values (`from` then `to`) copied byte for byte.
const REQUEST_SIZE = @sizeOf(?DateTime) * 2;
/// A response is one `PaymentsIntegrationSummary` copied byte for byte.
const RESPONSE_SIZE = @sizeOf(PaymentsIntegrationSummary);

/// Summary returned by `receive` when the exchange cannot be completed.
const EMPTY_SUMMARY = PaymentsIntegrationSummary{
    .default_total_payments_processed = 0,
    .default_total_value = 0.0,
    .fallback_total_payments_processed = 0,
    .fallback_total_value = 0.0,
};

/// Builds the request payload: the raw bytes of `from` followed by the raw
/// bytes of `to`. This is the inverse of what `getPaymentSummary` decodes.
fn encodeRequest(from: ?DateTime, to: ?DateTime) [REQUEST_SIZE]u8 {
    var payload: [REQUEST_SIZE]u8 = undefined;
    @memcpy(payload[0..OPTIONAL_DATE_SIZE], std.mem.asBytes(&from));
    @memcpy(payload[OPTIONAL_DATE_SIZE..], std.mem.asBytes(&to));
    return payload;
}

/// Rebuilds a `?DateTime` from its raw bytes. The bytes are copied into a
/// properly typed local, so the source buffer needs no particular alignment.
fn decodeOptionalDate(bytes: *const [OPTIONAL_DATE_SIZE]u8) ?DateTime {
    var value: ?DateTime = undefined;
    @memcpy(std.mem.asBytes(&value), bytes);
    return value;
}

/// Exchanges payment summaries over an `ExchangeConnection`: `give` answers
/// incoming requests from the local repository, `receive` asks the other end
/// for its summary.
pub const PaymentSummaryExchange = struct {
    payment_repository: *PaymentsRepository,
    exchange_connection: *ExchangeConnection,

    /// Creates an exchange over the given repository and connection. Both
    /// pointers are stored as-is and must outlive the returned value.
    pub fn init(payment_repository: *PaymentsRepository, exchange_connection: *ExchangeConnection) PaymentSummaryExchange {
        return PaymentSummaryExchange{
            .payment_repository = payment_repository,
            .exchange_connection = exchange_connection,
        };
    }

    /// Starts the connection and spawns a detached thread running `give`.
    /// Panics if the thread cannot be created.
    pub fn start(self: *PaymentSummaryExchange) void {
        self.exchange_connection.start();

        const thread = Thread.spawn(.{ .stack_size = 1024 * 16 }, PaymentSummaryExchange.give, .{self}) catch |err| {
            log.err("Was not possible create thread to give {any}", .{err});
            // `unreachable` would be undefined behaviour in ReleaseFast;
            // a panic fails loudly in every build mode.
            @panic("unable to spawn the payment summary exchange thread");
        };
        // The thread is never joined, so detach it to release its resources
        // when it ends.
        thread.detach();
    }

    /// Serves summary requests for as long as the server side of the
    /// connection is `.active`. Each request is `REQUEST_SIZE` bytes and is
    /// answered with the raw bytes of a `PaymentsIntegrationSummary`.
    /// On a failed or short read, or a failed write, the input is closed and
    /// the status set to `.waiting`; the status is then polled every 5 seconds.
    /// In test builds the function returns after one request or one failure.
    pub fn give(self: *PaymentSummaryExchange) void {
        var buf: [REQUEST_SIZE]u8 = undefined;
        const conn = self.exchange_connection;

        while (true) {
            while (conn.server_status == .active) {
                const size = conn.in.readAtLeast(&buf, buf.len) catch 0;
                // Anything shorter than a full request cannot be decoded.
                if (size < buf.len) {
                    conn.server_status = .waiting;
                    conn.in.close();
                    break;
                }

                var summary = self.getPaymentSummary(&buf);
                const summary_as_bytes = std.mem.asBytes(&summary);

                _ = conn.in.writeAll(summary_as_bytes) catch {
                    conn.server_status = .waiting;
                    conn.in.close();
                    break;
                };

                if (is_test) {
                    return;
                }
            }

            // Never block a test run on the polling sleep below.
            if (is_test) {
                return;
            }

            Thread.sleep(5 * std.time.ns_per_s);
        }
    }

    /// Decodes a request payload (`from` bytes followed by `to` bytes) and
    /// returns the repository's integration summary for that range.
    /// `payload` must hold at least `REQUEST_SIZE` bytes.
    fn getPaymentSummary(self: *PaymentSummaryExchange, payload: []const u8) PaymentsIntegrationSummary {
        const from = decodeOptionalDate(payload[0..OPTIONAL_DATE_SIZE]);
        const to = decodeOptionalDate(payload[OPTIONAL_DATE_SIZE..][0..OPTIONAL_DATE_SIZE]);

        return self.payment_repository.integrationSummary(from, to);
    }

    /// Requests the summary for `from`..`to` from the other end of the
    /// connection. Returns an all-zero summary when the output side is not
    /// `.active`. If the write fails or the response is incomplete, the
    /// output is closed, marked `.dead`, and an all-zero summary is returned.
    pub fn receive(self: *PaymentSummaryExchange, from: ?DateTime, to: ?DateTime) PaymentsIntegrationSummary {
        const conn = self.exchange_connection;

        if (conn.out_status != .active) {
            return EMPTY_SUMMARY;
        }

        request: {
            var payload = encodeRequest(from, to);
            conn.out.writeAll(&payload) catch {
                break :request;
            };

            // Read straight into a typed value so the bytes end up correctly
            // aligned without any pointer cast.
            var summary: PaymentsIntegrationSummary = undefined;
            const response = std.mem.asBytes(&summary);
            const size_read: usize = conn.out.readAtLeast(response, response.len) catch 0;
            if (size_read < response.len) {
                break :request;
            }

            return summary;
        }

        conn.out.close();
        conn.out_status = .dead;

        return EMPTY_SUMMARY;
    }
};

/// Test fixture: a repository holding two payments processed by processor 1
/// (amounts 23 and 10) and one processed by processor 2 (amount 15).
fn initAndPopulateRepository() !PaymentsRepository {
    var repository = try PaymentsRepository.init(testing.allocator, 100);

    const payment_1 = Payment{
        .id = .{'a'} ** 36,
        .amount = 23,
        .requested_at = try DateTime.ParseFromIso("2025-08-07T11:00:00.000Z"),
        .integration_status = .processed,
        .processed_by = 1,
    };
    const payment_2 = Payment{
        .id = .{'a'} ** 36,
        .amount = 10,
        .requested_at = try DateTime.ParseFromIso("2025-08-07T11:00:00.101Z"),
        .integration_status = .processed,
        .processed_by = 1,
    };
    const payment_3 = Payment{
        .id = .{'a'} ** 36,
        .amount = 15,
        .requested_at = try DateTime.ParseFromIso("2025-08-06T00:00:00.000Z"),
        .integration_status = .processed,
        .processed_by = 2,
    };

    try repository.insert(payment_1);
    try repository.insert(payment_2);
    try repository.insert(payment_3);

    return repository;
}

test "expect give full summary when no date" {
    var repository = try initAndPopulateRepository();
    defer repository.deinit();
    var conn = try ExchangeConnection.init("uchoamp.dev", 80);
    var exchange = PaymentSummaryExchange.init(&repository, &conn);

    const buf = encodeRequest(null, null);

    const summary = exchange.getPaymentSummary(&buf);

    try expect(summary.default_total_payments_processed == 2);
    try expect(summary.default_total_value == 33);
    try expect(summary.fallback_total_payments_processed == 1);
    try expect(summary.fallback_total_value == 15);
}

test "expect summary with only payments after from date" {
    var repository = try initAndPopulateRepository();
    defer repository.deinit();
    var conn = try ExchangeConnection.init("uchoamp.dev", 80);
    var exchange = PaymentSummaryExchange.init(&repository, &conn);

    const from: ?DateTime = try DateTime.ParseFromIso("2025-08-07T00:00:00.000Z");
    const buf = encodeRequest(from, null);

    const summary = exchange.getPaymentSummary(&buf);

    try expect(summary.default_total_payments_processed == 2);
    try expect(summary.default_total_value == 33);
    try expect(summary.fallback_total_payments_processed == 0);
    try expect(summary.fallback_total_value == 0);
}

test "expect receive return the correct summary" {
    var fake_stream_buf: [1024]u8 = undefined;
    var fake_stream = FakeStram.init(&fake_stream_buf);

    var repository = try initAndPopulateRepository();
    defer repository.deinit();
    var conn = try ExchangeConnection.init("uchoamp.dev", 80);
    conn.in = &fake_stream;
    conn.out = &fake_stream;

    var exchange = PaymentSummaryExchange.init(&repository, &conn);

    const from: ?DateTime = try DateTime.ParseFromIso("2025-08-07T11:00:00.000Z");
    const to: ?DateTime = try DateTime.ParseFromIso("2025-08-07T11:00:00.100Z");
    var buf = encodeRequest(from, to);

    _ = try fake_stream.write(&buf);

    conn.server_status = .active;
    exchange.give();

    fake_stream.not_write = true;
    conn.out_status = .active;
    const summary = exchange.receive(from, to);

    try expect(summary.default_total_payments_processed == 1);
    try expect(summary.default_total_value == 23);
    try expect(summary.fallback_total_payments_processed == 0);
    try expect(summary.fallback_total_value == 0);
}

test "expect receive return summary with zeros if connection error" {
    var fake_stream_buf: [1024]u8 = undefined;
    var fake_stream = FakeStram.init(&fake_stream_buf);

    var repository = try initAndPopulateRepository();
    defer repository.deinit();
    var conn = try ExchangeConnection.init("uchoamp.dev", 80);
    conn.in = &fake_stream;
    conn.out = &fake_stream;

    var exchange = PaymentSummaryExchange.init(&repository, &conn);

    const from: ?DateTime = try DateTime.ParseFromIso("2025-08-07T11:00:00.000Z");
    const to: ?DateTime = try DateTime.ParseFromIso("2025-08-07T11:00:00.100Z");
    var buf = encodeRequest(from, to);

    _ = try fake_stream.write(&buf);

    conn.server_status = .active;
    exchange.give();

    fake_stream._error = true;
    conn.out_status = .active;
    const summary = exchange.receive(from, null);

    try expect(summary.default_total_payments_processed == 0);
    try expect(summary.default_total_value == 0);
    try expect(summary.fallback_total_payments_processed == 0);
    try expect(summary.fallback_total_value == 0);
}