const std = @import("std");
const Thread = std.Thread;
const Mutex = std.Thread.Mutex;
const testing = std.testing;
const expect = testing.expect;
const log = std.log;
const services = @import("services");
const payments = @import("data").payments;
const DateTime = @import("things").DateTime;
const is_test = @import("builtin").is_test;

/// Number of slots in the integration queue (kept small when running tests).
const MAX_QUEUE_SIZE = if (is_test) 100 else 500_000;
/// Number of submission attempts a payment gets before it is given up on.
const MAX_RETRY = if (is_test) 2 else 10_000_000;

/// One queue slot: a payment together with the state of its hand-off to a service.
const PaymentIntegration = struct {
    /// Payment being integrated; its `integration_status` / `processed_by` are updated in place.
    payment: *payments.Payment,
    /// Ticket returned by the service pool for the latest submission, if any.
    ticket: ?*services.ServiceTicket = null,
    /// True once the slot needs no more work (processed, or given up on).
    is_integrated: bool = false,
    /// Backing storage for the request built by `getMessage`.
    buffer: [512]u8 = undefined,
    /// Remaining submission attempts.
    retries: i32 = MAX_RETRY,

    /// Builds the HTTP request that submits this payment.
    ///
    /// The returned slice points into `self.buffer`, so it is only valid until
    /// the next `getMessage` call on the same slot. `Content-Length` is derived
    /// from the formatted body, so it stays correct when the amount needs more
    /// than the 10 characters it is padded to.
    ///
    /// Returns an empty slice (and logs a warning) when the request cannot be
    /// formatted into the available buffers.
    pub fn getMessage(self: *PaymentIntegration) []u8 {
        const body_template = "{{ \"correlationId\": \"{s}\",\"amount\": {d: >10.2}, \"requestedAt\": \"{s}\" }}";
        const request_template = "POST /payments HTTP/1.1\r\nHost: 0.0.0.0\r\nAccept: application/json\r\nContent-Type: application/json\r\nContent-Length: {d}\r\n\r\n{s}";

        const iso_date = self.payment.requested_at.toIso();

        // The body is formatted first so its exact length is known for the header.
        var body_buffer: [512]u8 = undefined;
        const body = std.fmt.bufPrint(&body_buffer, body_template, .{ self.payment.id, self.payment.amount, iso_date }) catch |err| {
            log.warn("could not format payment body: {s}", .{@errorName(err)});
            return self.buffer[0..0];
        };

        return std.fmt.bufPrint(&self.buffer, request_template, .{ body.len, body }) catch |err| {
            log.warn("could not format payment request: {s}", .{@errorName(err)});
            return self.buffer[0..0];
        };
    }
};

/// Single shared integrator instance; `PaymentIntegrator.init` prepares it and
/// `PaymentIntegrator.newPaymentEvent` enqueues into it.
var payment_integrator: PaymentIntegrator = undefined;

/// Fixed-size queue of payments plus a worker thread that submits them to the
/// service pool and records the outcome on each payment.
///
/// `head` counts enqueued payments (it may exceed `MAX_QUEUE_SIZE` until
/// `verifyTailSize` folds it back), `tail` is the first slot that still needs work.
pub const PaymentIntegrator = struct {
    service_pool: *services.ServicePool,
    queue: [MAX_QUEUE_SIZE]PaymentIntegration = undefined,
    head: usize = 0,
    tail: usize = 0,
    mutex: Mutex = undefined,
    thread: Thread = undefined,

    /// Prepares the shared integrator and returns a pointer to it.
    ///
    /// The global starts out `undefined`, so the queue cursors are reset here
    /// as well; otherwise the first enqueue/process would read undefined values.
    pub fn init(service_pool: *services.ServicePool) *PaymentIntegrator {
        payment_integrator.service_pool = service_pool;
        payment_integrator.mutex = Mutex{};
        payment_integrator.head = 0;
        payment_integrator.tail = 0;
        return &payment_integrator;
    }

    /// Spawns the worker thread running `startProcess`. Fails if the thread cannot be spawned.
    pub fn start(self: *PaymentIntegrator) !void {
        self.thread = try Thread.spawn(.{ .stack_size = 1024 * 64 }, PaymentIntegrator.startProcess, .{self});
    }

    /// Worker loop: sleeps 1_000_000 ns, then rewinds the queue if needed and processes it. Never returns.
    fn startProcess(self: *PaymentIntegrator) void {
        while (true) {
            Thread.sleep(1_000_000);
            self.verifyTailSize();
            self.processPayments();
        }
    }

    /// Walks the slots from `tail` up to `head` (capped at the queue size) once.
    ///
    /// Finished slots at the front advance `tail`. Every other slot is either
    /// submitted, re-submitted after a `.try_again` ticket, left alone while
    /// its ticket is `.new`/`.pending`, or marked processed for any other
    /// ticket status.
    fn processPayments(self: *PaymentIntegrator) void {
        // `head` is advanced by `newPaymentEvent` under the mutex, so take the
        // snapshot under the same lock.
        self.mutex.lock();
        const queued_end = @min(self.head, MAX_QUEUE_SIZE);
        self.mutex.unlock();

        var not_integrated_found = false;
        for (self.tail..queued_end) |i| {
            const pi = &self.queue[i];

            if (pi.is_integrated) {
                // Only the contiguous run of finished slots at the front moves the tail.
                if (!not_integrated_found) self.tail += 1;
                continue;
            }
            not_integrated_found = true;

            const ticket = pi.ticket orelse {
                self.submitOrGiveUp(pi);
                continue;
            };

            switch (ticket.status) {
                .try_again => {
                    // Acknowledge the failed ticket before asking for a new one.
                    ticket.status = .ack;
                    self.submitOrGiveUp(pi);
                },
                // Still in flight: check again on the next pass.
                .new, .pending => {},
                else => {
                    pi.payment.integration_status = .processed;
                    pi.payment.processed_by = ticket.service_id;
                    pi.is_integrated = true;
                    ticket.status = .ack;
                },
            }
        }
    }

    /// Submits the payment to the service pool, or gives up once its retries are exhausted.
    fn submitOrGiveUp(self: *PaymentIntegrator, pi: *PaymentIntegration) void {
        if (pi.retries <= 0) {
            pi.payment.integration_status = .not_integrated;
            // The slot is flagged as finished although the payment was not
            // integrated, so that the tail can move past it.
            pi.is_integrated = true;
            return;
        }
        // A failed submission leaves no ticket; the next pass tries again.
        pi.ticket = self.service_pool.dive(pi.getMessage()) catch null;
        pi.retries -= 1;
    }

    /// Rewinds the queue once the tail has reached the end of the array:
    /// `tail` goes back to 0 and `head` is folded into the array range.
    fn verifyTailSize(self: *PaymentIntegrator) void {
        if (self.tail != MAX_QUEUE_SIZE) return;

        self.mutex.lock();
        defer self.mutex.unlock();
        self.tail = 0;
        self.head %= MAX_QUEUE_SIZE;
    }

    /// Enqueues `payment` on the shared integrator with a fresh retry budget.
    ///
    /// NOTE(review): the slot at `head % MAX_QUEUE_SIZE` is reused without
    /// checking whether it has finished, so an unfinished entry can be
    /// overwritten once `head` wraps.
    pub fn newPaymentEvent(payment: *payments.Payment) void {
        payment_integrator.mutex.lock();
        defer payment_integrator.mutex.unlock();

        const slot = &payment_integrator.queue[payment_integrator.head % MAX_QUEUE_SIZE];
        slot.payment = payment;
        slot.is_integrated = false;
        slot.ticket = null;
        slot.retries = MAX_RETRY;
        payment_integrator.head += 1;
    }
};

test "expect reset the tail if it is the size of the queue" {
    var service_pool = services.ServicePool{};
    var integrator = PaymentIntegrator.init(&service_pool);
    integrator.tail = MAX_QUEUE_SIZE;
    integrator.head = MAX_QUEUE_SIZE + 5;

    integrator.verifyTailSize();

    try expect(integrator.tail == 0);
    try expect(integrator.head == 5);
}

test "expect increment tail if payment is integrated" {
    var service_pool = services.ServicePool{};
    var integrator = PaymentIntegrator.init(&service_pool);
    integrator.tail = 0;
    integrator.head = 10;
    var payment = payments.Payment{ .id = .{'a'} ** 36, .amount = 10, .requested_at = DateTime.now() };

    for (0..integrator.head - 5) |i| {
        integrator.queue[i].is_integrated = true;
        integrator.queue[i].payment = &payment;
    }
    for (5..integrator.head) |i| {
        integrator.queue[i].is_integrated = i == 7;
        integrator.queue[i].retries = MAX_RETRY;
        integrator.queue[i].payment = &payment;
        // Unfinished slots are inspected by processPayments, so the ticket must be defined.
        integrator.queue[i].ticket = null;
    }

    integrator.processPayments();

    try expect(integrator.tail == 5);
}

test "expect change payment integration status to processed when success" {
    var service_pool = services.ServicePool{};
    var integrator = PaymentIntegrator.init(&service_pool);
    integrator.tail = 0;
    integrator.head = 1;
    var payment = payments.Payment{ .id = .{'a'} ** 36, .amount = 10, .requested_at = DateTime.now() };
    var ticket = services.ServiceTicket{ .content = undefined, .service_id = 1, .status = .success };
    integrator.queue[0].payment = &payment;
    integrator.queue[0].ticket = &ticket;
    integrator.queue[0].is_integrated = false;

    integrator.processPayments();

    try expect(integrator.queue[0].is_integrated);
    try expect(ticket.status == .ack);
    try expect(payment.integration_status == .processed);
    try expect(payment.processed_by == 1);
}

test "expect queue payment when event raise" {
    var service_pool = services.ServicePool{};
    var integrator = PaymentIntegrator.init(&service_pool);
    integrator.tail = 0;
    integrator.head = 0;
    var payment = payments.Payment{ .id = .{'a'} ** 36, .amount = 10, .requested_at = DateTime.now() };

    PaymentIntegrator.newPaymentEvent(&payment);

    try expect(integrator.head == 1);
    try expect(integrator.queue[0].payment == &payment);
    try expect(integrator.queue[0].ticket == null);
    try expect(integrator.queue[0].retries == MAX_RETRY);
    try expect(integrator.queue[0].is_integrated == false);
}

test "expect change payment integration status to not_integrated in a number of attempts" {
    var service_pool = services.ServicePool{};
    var integrator = PaymentIntegrator.init(&service_pool);
    integrator.tail = 0;
    integrator.head = 0;
    var payment = payments.Payment{ .id = .{'a'} ** 36, .amount = 10, .requested_at = DateTime.now() };
    PaymentIntegrator.newPaymentEvent(&payment);

    integrator.processPayments();
    integrator.processPayments();
    integrator.processPayments();

    try expect(payment.integration_status == .not_integrated);
    try expect(integrator.queue[0].is_integrated);
}

test "expect get message" {
    var payment = payments.Payment{ .id = .{'a'} ** 36, .amount = 100000, .requested_at = DateTime.now() };
    var payment_integration = PaymentIntegration{ .payment = &payment };

    const message = payment_integration.getMessage();

    try expect(message.len == 244);
}