# Token Exchange Platform P2 Implementation Plan (Java Spring Boot Edition)

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans.

**Goal:** Implement the platform-hosted proxy mode: key pooling, smart routing, usage-based settlement, dilution detection (spotting keys whose balance drops faster than platform usage explains), and a platform fallback.

**Architecture:** On top of P1, add a proxy layer. All hosted requests enter through a unified API endpoint, the routing engine dispatches each one to a specific key, and usage is metered and settled in real time.

**Tech Stack:** Java 21, Spring Boot 3.2, Maven, MyBatis-Plus, MySQL 8, Redis, Sa-Token, WebClient (asynchronous HTTP client)

**Prerequisites:** All P0 and P1 modules are complete.

**Module dependency order:**

```
Module 1: Proxy layer basics (unified endpoint + request forwarding)
    ↓
Module 2: Key pool management (deposit / validation / status monitoring)
    ↓
Module 3: Smart routing engine (model matching + load balancing + quality first)
    ↓
Module 4: Real-time usage metering (exact per-token billing)
    ↓
Module 5: Usage-based settlement (escrow + real-time clearing)
    ↓
Module 6: Dilution detection and risk control (balance monitoring + failure-rate detection + automatic delisting)
    ↓
Module 7: Platform fallback key pool
    ↓
Module 8: Advanced usage analytics and reports
```

---

## Module 1: Proxy Layer Basics

### Task 1.1: Unified Proxy Endpoint

**Files:**
- Create: `src/main/java/com/danke/tokenexchange/proxy/ProxyController.java`
- Create: `src/main/java/com/danke/tokenexchange/proxy/ProxyService.java`

- [ ] **Step 1: Write ProxyController**

```java
// src/main/java/com/danke/tokenexchange/proxy/ProxyController.java
package com.danke.tokenexchange.proxy;

import cn.dev33.satoken.annotation.SaCheckLogin;
import cn.dev33.satoken.stp.StpUtil;
import com.danke.tokenexchange.common.Result;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.RequiredArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/api/v1/proxy")
@RequiredArgsConstructor
@SaCheckLogin
public class ProxyController {

    private final ProxyService proxyService;

    // Streaming endpoint: upstream SSE chunks are relayed to the caller as they arrive.
    @PostMapping(value = "/{model}/chat/completions", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> chatCompletions(
            @PathVariable String model,
            @RequestBody JsonNode requestBody) {
        // Resolve the user on the request thread, before entering the reactive pipeline.
        Long userId = StpUtil.getLoginIdAsLong();
        return proxyService.proxyRequest(userId, model, requestBody);
    }

    // Non-streaming endpoint: returns the upstream body wrapped in Result.
    @PostMapping(value = "/{model}/embeddings")
    public Result<String> embeddings(
            @PathVariable String model,
            @RequestBody JsonNode requestBody) {
        Long userId = StpUtil.getLoginIdAsLong();
        String response = proxyService.proxyRequestSync(userId, model, requestBody, "/embeddings");
        return Result.success(response);
    }
}
```

- [ ] **Step 2: Write the ProxyService skeleton**

```java
// src/main/java/com/danke/tokenexchange/proxy/ProxyService.java
package com.danke.tokenexchange.proxy;

import com.danke.tokenexchange.service.CreditService;
import com.danke.tokenexchange.service.KeyPoolService;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Slf4j
@Service
@RequiredArgsConstructor
public class ProxyService {

    // Initialized inline, so Lombok leaves it out of the generated constructor.
    private final WebClient webClient = WebClient.builder().build();
    private final KeyPoolService keyPoolService;
    private final UsageMeterService usageMeterService;
    private final CreditService creditService;

    public Flux<String> proxyRequest(Long userId, String model, JsonNode requestBody) {
        // 1. Check user has enough credits (rough estimate) - TODO, not implemented in this skeleton
        // 2. Select a key from pool
        // 3. Forward request
        // 4. Stream response and meter usage
        // 5. Deduct credits
        // selectKey() is a blocking MyBatis call, so it runs on boundedElastic instead of the event loop.
        return Mono.fromCallable(() -> keyPoolService.selectKey(model))
                .subscribeOn(Schedulers.boundedElastic())
                .flatMapMany(key -> {
                    // NOTE: HostedKey (Task 2.1) stores only provider and apiKeyEncrypted.
                    // getBaseUrl() and getApiKey() still have to be provided,
                    // e.g. a provider-to-URL lookup plus decryption of the stored key.
                    String targetUrl = key.getBaseUrl() + "/chat/completions";
                    return webClient.post()
                            .uri(targetUrl)
                            .header("Authorization", "Bearer " + key.getApiKey())
                            .header("Content-Type", "application/json")
                            .bodyValue(requestBody)
                            .retrieve()
                            .bodyToFlux(String.class)
                            .doOnNext(chunk -> {
                                // Stream metering for SSE responses
                                usageMeterService.recordStreamChunk(key.getId(), userId, chunk);
                            })
                            .doOnComplete(() -> {
                                // Final settlement
                                long tokens = usageMeterService.getSessionUsage(key.getId(), userId);
                                // Reset the counter so the next request is not billed for these tokens again.
                                usageMeterService.clearSession(key.getId(), userId);
                                settleUsage(userId, key.getOwnerId(), key.getId(), tokens, model);
                            })
                            .doOnError(e -> {
                                // Feed the failure counter that key selection and monitoring rely on.
                                log.warn("Upstream call failed for key {}", key.getId(), e);
                                keyPoolService.incrementFailure(key.getId());
                            });
                });
    }

    public String proxyRequestSync(Long userId, String model, JsonNode requestBody, String endpoint) {
        // TODO: same flow as proxyRequest, but blocking, for non-streaming endpoints.
        // Fail loudly until then, so a placeholder is never returned as a successful result.
        throw new UnsupportedOperationException("proxyRequestSync is not implemented yet");
    }

    private void settleUsage(Long buyerId, Long sellerId, Long keyId, long tokens, String model) {
        // Calculate credits based on reference rate
        // Deduct from buyer, add to seller (the platform keeps a 5% fee)
        // For P2: simplified - use a fixed rate per token per model
        long credits = calculateCredits(tokens, model);
        creditService.deductCredits(buyerId, credits, "proxy_usage",
                String.valueOf(keyId), "Proxy call " + model);
        creditService.addCredits(sellerId, credits * 95 / 100, "proxy_earnings",
                String.valueOf(keyId), "Proxy earnings " + model);
    }

    private long calculateCredits(long tokens, String model) {
        // Simplified: 1 token = 1 credit; the per-model factor is not applied yet
        // In real implementation, use PricingService reference rate
        return tokens;
    }
}
```

- [ ] **Step 3: Commit**

```bash
git add .
git commit -m "feat(P2): add proxy layer with unified endpoint and WebClient"
```

---

## Module 2: Key Pool Management

### Task 2.1: Hosted Key Entity and Management

**Files:**
- Create: `src/main/java/com/danke/tokenexchange/entity/HostedKey.java`
- Create: `src/main/resources/db/migration/V11__create_hosted_keys.sql`
- Create: `src/main/java/com/danke/tokenexchange/mapper/HostedKeyMapper.java`
- Create: `src/main/java/com/danke/tokenexchange/service/KeyPoolService.java`

- [ ] **Step 1: Write Flyway V11**

```sql
-- V11__create_hosted_keys.sql
CREATE TABLE IF NOT EXISTS hosted_keys (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    owner_id BIGINT NOT NULL COMMENT 'key provider',
    model VARCHAR(100) NOT NULL,
    provider VARCHAR(50) NOT NULL,
    api_key_encrypted VARCHAR(2048) NOT NULL,
    status VARCHAR(20) DEFAULT 'active' COMMENT 'active, paused, depleted, flagged',
    initial_balance BIGINT DEFAULT 0 COMMENT 'initial token balance',
    current_balance BIGINT DEFAULT 0,
    platform_usage BIGINT DEFAULT 0 COMMENT 'tokens consumed through the platform',
    last_verified_at TIMESTAMP NULL,
    last_used_at TIMESTAMP NULL,
    failure_count INT DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_hk_owner (owner_id),
    INDEX idx_hk_model (model),
    INDEX idx_hk_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```

- [ ] **Step 2: Write the HostedKey entity**

```java
// src/main/java/com/danke/tokenexchange/entity/HostedKey.java
package com.danke.tokenexchange.entity;

import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;

import java.time.LocalDateTime;

@Data
@TableName("hosted_keys")
public class HostedKey {
    @TableId(type = IdType.AUTO)
    private Long id;
    private Long ownerId;
    private String model;
    private String provider;
    private String apiKeyEncrypted;
    private String status;
    private Long initialBalance;
    private Long currentBalance;
    private Long platformUsage;
    private LocalDateTime lastVerifiedAt;
    private LocalDateTime lastUsedAt;
    private Integer failureCount;
    private LocalDateTime createdAt;
}
```

- [ ] **Step 3: Write KeyPoolService**

```java
// src/main/java/com/danke/tokenexchange/service/KeyPoolService.java
package com.danke.tokenexchange.service;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.danke.tokenexchange.entity.HostedKey;
import com.danke.tokenexchange.mapper.HostedKeyMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.List;

@Slf4j
@Service
@RequiredArgsConstructor
public class KeyPoolService {

    private final HostedKeyMapper hostedKeyMapper;

    public HostedKey selectKey(String model) {
        // Strategy:
        // 1. Find active keys for this model
        // 2. Exclude keys with a high failure count
        // 3. Require a minimum remaining balance
        // 4. Load balance among candidates by taking the least recently used one
        QueryWrapper<HostedKey> wrapper = new QueryWrapper<>();
        wrapper.eq("model", model)
                .eq("status", "active")
                .gt("current_balance", 1000)
                .lt("failure_count", 5)
                .orderByAsc("last_used_at"); // Least recently used; MySQL sorts NULL (never used) first

        List<HostedKey> candidates = hostedKeyMapper.selectList(wrapper);
        if (candidates.isEmpty()) {
            throw new RuntimeException("No available key for model: " + model);
        }

        // Pick the least recently used key (simple LRU rotation)
        HostedKey selected = candidates.get(0);
        selected.setLastUsedAt(LocalDateTime.now());
        hostedKeyMapper.updateById(selected);
        return selected;
    }

    public void addKey(Long ownerId, String model, String provider,
                       String encryptedKey, Long initialBalance) {
        HostedKey key = new HostedKey();
        key.setOwnerId(ownerId);
        key.setModel(model);
        key.setProvider(provider);
        key.setApiKeyEncrypted(encryptedKey);
        key.setStatus("active");
        key.setInitialBalance(initialBalance);
        key.setCurrentBalance(initialBalance);
        key.setPlatformUsage(0L);
        key.setFailureCount(0);
        hostedKeyMapper.insert(key);
    }

    public void updateBalance(Long keyId, Long newBalance) {
        // updateById writes only non-null fields (MyBatis-Plus default field strategy)
        HostedKey key = new HostedKey();
        key.setId(keyId);
        key.setCurrentBalance(newBalance);
        hostedKeyMapper.updateById(key);
    }

    public void incrementFailure(Long keyId) {
        // NOTE: read-modify-write, so concurrent failures can be lost.
        // selectKey() already skips keys at 5 failures and the monitor pauses them at 5,
        // so the flag threshold of 10 is only reached if failures keep arriving in between.
        HostedKey key = hostedKeyMapper.selectById(keyId);
        if (key != null) {
            key.setFailureCount(key.getFailureCount() + 1);
            if (key.getFailureCount() >= 10) {
                key.setStatus("flagged");
            }
            hostedKeyMapper.updateById(key);
        }
    }

    public void pauseKey(Long keyId) {
        HostedKey key = new HostedKey();
        key.setId(keyId);
        key.setStatus("paused");
        hostedKeyMapper.updateById(key);
    }

    public void resumeKey(Long keyId) {
        HostedKey key = new HostedKey();
        key.setId(keyId);
        key.setStatus("active");
        key.setFailureCount(0);
        hostedKeyMapper.updateById(key);
    }
}
```

- [ ] **Step 4: Commit**

```bash
git add .
git commit -m "feat(P2): add hosted key pool with LRU selection strategy"
```

---

## Module 3: Smart Routing Engine

### Task 3.1: Routing Strategy Implementation

**Files:**
- Create: `src/main/java/com/danke/tokenexchange/proxy/RoutingStrategy.java`
- Create: `src/main/java/com/danke/tokenexchange/proxy/WeightedRoundRobinRouter.java`

- [ ] **Step 1: Write the routing strategy interface**

```java
// src/main/java/com/danke/tokenexchange/proxy/RoutingStrategy.java
package com.danke.tokenexchange.proxy;

import com.danke.tokenexchange.entity.HostedKey;

import java.util.List;

public interface RoutingStrategy {
    // Returns the key to use for this model, or null when there are no candidates.
    HostedKey select(String model, List<HostedKey> candidates);
}
```

- [ ] **Step 2: Write the weighted round-robin router**

```java
// src/main/java/com/danke/tokenexchange/proxy/WeightedRoundRobinRouter.java
package com.danke.tokenexchange.proxy;

import com.danke.tokenexchange.entity.HostedKey;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@Component
public class WeightedRoundRobinRouter implements RoutingStrategy {

    // One rotation counter per model
    private final ConcurrentHashMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    @Override
    public HostedKey select(String model, List<HostedKey> candidates) {
        if (candidates.isEmpty()) return null;
        if (candidates.size() == 1) return candidates.get(0);

        // NOTE: weighting by remaining balance (more balance = more weight) is not applied yet;
        // selection is plain per-model round-robin.
        AtomicInteger counter = counters.computeIfAbsent(model, k -> new AtomicInteger(0));
        // floorMod keeps the index non-negative after the counter overflows Integer.MAX_VALUE.
        int index = Math.floorMod(counter.getAndIncrement(), candidates.size());
        return candidates.get(index);
    }
}
```

- [ ] **Step 3: Commit**

```bash
git add .
git commit -m "feat(P2): add weighted round-robin routing strategy"
```

---

## Module 4: Real-Time Usage Metering

### Task 4.1: Usage Metering Service

**Files:**
- Create: `src/main/java/com/danke/tokenexchange/proxy/UsageMeterService.java`

- [ ] **Step 1: Write UsageMeterService**

```java
// src/main/java/com/danke/tokenexchange/proxy/UsageMeterService.java
package com.danke.tokenexchange.proxy;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Slf4j
@Service
public class UsageMeterService {

    private final StringRedisTemplate redisTemplate;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public UsageMeterService(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public void recordStreamChunk(Long keyId, Long userId, String chunk) {
        // NOTE: the counter is keyed by (key, user), so concurrent requests from the
        // same user on the same key share one counter.
        String sessionKey = String.format("usage:%d:%d", keyId, userId);
        try {
            // For SSE format, each chunk may contain usage info at the end
            if (chunk.contains("\"usage\"")) {
                JsonNode node = objectMapper.readTree(chunk);
                JsonNode usage = node.get("usage");
                if (usage != null) {
                    long promptTokens = usage.has("prompt_tokens") ? usage.get("prompt_tokens").asLong() : 0;
                    long completionTokens = usage.has("completion_tokens") ?
                            usage.get("completion_tokens").asLong() : 0;
                    long total = promptTokens + completionTokens;
                    redisTemplate.opsForValue().increment(sessionKey + ":tokens", total);
                    redisTemplate.expire(sessionKey + ":tokens", 1, TimeUnit.HOURS);
                }
            }
        } catch (Exception e) {
            // Ignore parsing errors for non-JSON chunks
            log.debug("Skipping chunk that could not be parsed for usage", e);
        }
    }

    public long getSessionUsage(Long keyId, Long userId) {
        String sessionKey = String.format("usage:%d:%d", keyId, userId);
        String value = redisTemplate.opsForValue().get(sessionKey + ":tokens");
        return value != null ? Long.parseLong(value) : 0;
    }

    public void clearSession(Long keyId, Long userId) {
        String sessionKey = String.format("usage:%d:%d", keyId, userId);
        redisTemplate.delete(sessionKey + ":tokens");
    }
}
```

- [ ] **Step 2: Commit**

```bash
git add .
git commit -m "feat(P2): add real-time usage metering with Redis session tracking"
```

---

## Module 5: Usage-Based Settlement

### Task 5.1: Escrow and Real-Time Clearing

**Files:**
- Create: `src/main/java/com/danke/tokenexchange/entity/EscrowAccount.java`
- Create: `src/main/resources/db/migration/V12__create_escrow.sql`
- Create: `src/main/java/com/danke/tokenexchange/mapper/EscrowMapper.java`
- Create: `src/main/java/com/danke/tokenexchange/service/EscrowService.java`

- [ ] **Step 1: Write Flyway V12**

```sql
-- V12__create_escrow.sql
CREATE TABLE IF NOT EXISTS escrow_accounts (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    buyer_id BIGINT NOT NULL,
    seller_id BIGINT NOT NULL,
    key_id BIGINT NOT NULL,
    allocated_credits BIGINT DEFAULT 0 COMMENT 'pre-allocated credits',
    consumed_credits BIGINT DEFAULT 0 COMMENT 'consumed credits',
    status VARCHAR(20) DEFAULT 'active' COMMENT 'active, exhausted, released',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```

- [ ] **Step 2: Write EscrowService**

```java
// src/main/java/com/danke/tokenexchange/service/EscrowService.java
package com.danke.tokenexchange.service;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.danke.tokenexchange.common.exception.BusinessException;
import com.danke.tokenexchange.entity.EscrowAccount;
import com.danke.tokenexchange.mapper.EscrowMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;

@Service
@RequiredArgsConstructor
public class EscrowService {

    private final EscrowMapper escrowMapper;
    private final CreditService creditService;

    @Transactional
    public EscrowAccount createEscrow(Long buyerId, Long sellerId, Long keyId, Long allocatedCredits) {
        // Check first, then move the credits out of the buyer's balance into escrow.
        Long balance = creditService.getBalance(buyerId);
        if (balance < allocatedCredits) {
            throw new BusinessException(1004, "Insufficient credit balance");
        }

        creditService.deductCredits(buyerId, allocatedCredits, "escrow", "", "Purchase call quota");

        EscrowAccount escrow = new EscrowAccount();
        escrow.setBuyerId(buyerId);
        escrow.setSellerId(sellerId);
        escrow.setKeyId(keyId);
        escrow.setAllocatedCredits(allocatedCredits);
        escrow.setConsumedCredits(0L);
        escrow.setStatus("active");
        escrow.setExpiresAt(LocalDateTime.now().plusDays(30));
        escrowMapper.insert(escrow);
        return escrow;
    }

    @Transactional
    public void settleUsage(Long escrowId, Long consumedCredits) {
        EscrowAccount escrow = escrowMapper.selectById(escrowId);
        if (escrow == null || !"active".equals(escrow.getStatus())) {
            return;
        }

        // Cap consumption at the allocated amount.
        long newConsumed = Math.min(escrow.getConsumedCredits() + consumedCredits,
                escrow.getAllocatedCredits());
        // Bill only the part that fits inside the escrow, so the seller is never paid
        // more than the buyer actually put in.
        long billable = newConsumed - escrow.getConsumedCredits();
        escrow.setConsumedCredits(newConsumed);

        // Settle to seller (minus 5% platform fee)
        long toSettle = billable * 95 / 100;
        creditService.addCredits(escrow.getSellerId(), toSettle, "proxy_earnings",
                String.valueOf(escrow.getKeyId()), "Usage-based settlement");

        if (newConsumed >= escrow.getAllocatedCredits()) {
            // Fully consumed: nothing is left to refund. Unused quota is refunded by releaseEscrow().
            escrow.setStatus("exhausted");
        }
        escrowMapper.updateById(escrow);
    }

    @Transactional
    public void releaseEscrow(Long escrowId) {
        EscrowAccount escrow = escrowMapper.selectById(escrowId);
        if (escrow == null || !"active".equals(escrow.getStatus())) {
            return;
        }

        // Return whatever was allocated but not consumed.
        long remaining = escrow.getAllocatedCredits() - escrow.getConsumedCredits();
        if (remaining > 0) {
            creditService.addCredits(escrow.getBuyerId(), remaining, "escrow_release", "",
                    "Refund of released quota");
        }
        escrow.setStatus("released");
        escrowMapper.updateById(escrow);
    }
}
```

- [ ] **Step 3: Commit**

```bash
git add .
git commit -m "feat(P2): add escrow system with usage-based settlement"
```

---

## Module 6: Dilution Detection and Risk Control

### Task 6.1: Dilution Detection Service

**Files:**
- Create: `src/main/java/com/danke/tokenexchange/service/DilutionDetectionService.java`

- [ ] **Step 1: Write the dilution detection service**

```java
// src/main/java/com/danke/tokenexchange/service/DilutionDetectionService.java
package com.danke.tokenexchange.service;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.danke.tokenexchange.entity.HostedKey;
import com.danke.tokenexchange.mapper.HostedKeyMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.List;

@Slf4j
@Service
@RequiredArgsConstructor
public class DilutionDetectionService {

    private final HostedKeyMapper hostedKeyMapper;
    private final VerificationService verificationService; // Injected but not used yet in this step
    private final KeyPoolService keyPoolService;

    // Monitor every 5 minutes
    @Scheduled(fixedRate = 300000)
    public void monitorKeys() {
        List<HostedKey> activeKeys = hostedKeyMapper.selectList(
                new QueryWrapper<HostedKey>().eq("status", "active"));

        for (HostedKey key : activeKeys) {
            checkKeyHealth(key);
        }
    }

    private void checkKeyHealth(HostedKey key) {
        // Check 1: Platform usage vs balance change.
        // This relies on platform_usage and current_balance being kept up to date;
        // no step in this plan writes platform_usage yet.
        long platformUsed = key.getPlatformUsage();
        long balanceDrop = key.getInitialBalance() - key.getCurrentBalance();

        if (balanceDrop > platformUsed * 1.15) {
            // More than 15% external usage detected
            log.warn("Key {} dilution detected: platform used {}, balance dropped {}",
                    key.getId(), platformUsed, balanceDrop);
            flagKey(key, String.format(
                    "Dilution detected: platform used %d, balance dropped %d, difference %.1f%%",
                    platformUsed, balanceDrop,
                    (balanceDrop - platformUsed) * 100.0 / platformUsed));
            return; // Keep the flagged status; the checks below would overwrite it
        }

        // Check 2: Failure rate
        if (key.getFailureCount() >= 5) {
            log.warn("Key {} high failure rate: {}", key.getId(), key.getFailureCount());
            keyPoolService.pauseKey(key.getId());
        }

        // Check 3: Balance exhaustion
        if (key.getCurrentBalance() <= 0) {
            log.info("Key {} depleted", key.getId());
            HostedKey update = new HostedKey();
            update.setId(key.getId());
            update.setStatus("depleted");
            hostedKeyMapper.updateById(update);
        }
    }

    private void flagKey(HostedKey key, String reason) {
        HostedKey update = new HostedKey();
        update.setId(key.getId());
        update.setStatus("flagged");
        hostedKeyMapper.updateById(update);

        // TODO: Send notification to key owner
        log.warn("Key {} flagged: {}", key.getId(), reason);
    }
}
```

- [ ] **Step 2: Enable scheduled tasks**

```java
// In TokenExchangeApplication.java, add:
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
@MapperScan("com.danke.tokenexchange.mapper")
public class TokenExchangeApplication {
    // ...
}
```

- [ ] **Step 3: Commit**

```bash
git add .
git commit -m "feat(P2): add dilution detection with scheduled monitoring"
```

---

## Module 7: Platform Fallback Key Pool

### Task 7.1: Fallback Key Management

**Files:**
- Create: `src/main/java/com/danke/tokenexchange/entity/FallbackKey.java`
- Create: `src/main/resources/db/migration/V13__create_fallback_keys.sql`
- Create: `src/main/java/com/danke/tokenexchange/mapper/FallbackKeyMapper.java`
- Create: `src/main/java/com/danke/tokenexchange/service/FallbackKeyService.java`

- [ ] **Step 1: Write Flyway V13**

```sql
-- V13__create_fallback_keys.sql
CREATE TABLE IF NOT EXISTS fallback_keys (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    model VARCHAR(100) NOT NULL,
    provider VARCHAR(50) NOT NULL,
    api_key_encrypted VARCHAR(2048) NOT NULL,
    status VARCHAR(20) DEFAULT 'active',
    priority INT DEFAULT 0 COMMENT 'usage priority; lower number = higher priority',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```

- [ ] **Step 2: Write FallbackKeyService**

```java
// src/main/java/com/danke/tokenexchange/service/FallbackKeyService.java
package com.danke.tokenexchange.service;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.danke.tokenexchange.entity.FallbackKey;
import com.danke.tokenexchange.entity.HostedKey;
import com.danke.tokenexchange.mapper.FallbackKeyMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
@RequiredArgsConstructor
public class FallbackKeyService {

    private final FallbackKeyMapper fallbackKeyMapper;

    // Returns the highest-priority active platform key for the model, or null if none exists.
    public HostedKey getFallbackKey(String model) {
        List<FallbackKey> fallbacks = fallbackKeyMapper.selectList(
                new QueryWrapper<FallbackKey>()
                        .eq("model", model)
                        .eq("status", "active")
                        .orderByAsc("priority"));

        if (fallbacks.isEmpty()) {
            return null;
        }

        FallbackKey fb = fallbacks.get(0);
        // Convert to HostedKey format for uniform handling
        HostedKey key = new HostedKey();
        key.setId(-fb.getId()); // Negative ID to distinguish from user keys; never persist this object
        key.setOwnerId(0L); // Platform owner
        key.setModel(fb.getModel());
        key.setProvider(fb.getProvider());
        key.setApiKeyEncrypted(fb.getApiKeyEncrypted());
        key.setStatus("active");
        // Unlimited. Do not sum this with other balances: Long.MAX_VALUE overflows.
        key.setCurrentBalance(Long.MAX_VALUE);
        return key;
    }
}
```

- [ ] **Step 3: Commit**

```bash
git add .
git commit -m "feat(P2): add platform fallback key pool for availability guarantee"
```

---

## Module 8: Advanced Usage Analytics and Reports

### Task 8.1: Usage Statistics Service

**Files:**
- Create: `src/main/java/com/danke/tokenexchange/service/AnalyticsService.java`
- Create: `src/main/java/com/danke/tokenexchange/controller/AnalyticsController.java`

- [ ] **Step 1: Write AnalyticsService**

```java
// src/main/java/com/danke/tokenexchange/service/AnalyticsService.java
package com.danke.tokenexchange.service;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.danke.tokenexchange.entity.CreditTransaction;
import com.danke.tokenexchange.entity.HostedKey;
import com.danke.tokenexchange.entity.Order;
import com.danke.tokenexchange.mapper.CreditTransactionMapper;
import com.danke.tokenexchange.mapper.HostedKeyMapper;
import com.danke.tokenexchange.mapper.OrderMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Service
@RequiredArgsConstructor
public class AnalyticsService {

    private final OrderMapper orderMapper;
    private final CreditTransactionMapper creditTransactionMapper;
    private final HostedKeyMapper hostedKeyMapper;

    public Map<String, Object> getPlatformStats() {
        Map<String, Object> stats = new HashMap<>();

        // Total orders
        long totalOrders = orderMapper.selectCount(null);
        long completedOrders = orderMapper.selectCount(
                new QueryWrapper<Order>().eq("status", "completed"));
        long disputedOrders = orderMapper.selectCount(
                new QueryWrapper<Order>().eq("status", "disputed"));

        stats.put("total_orders", totalOrders);
        stats.put("completed_orders", completedOrders);
        stats.put("disputed_orders", disputedOrders);
        // Share of orders that ended in a dispute; 0 when there are no orders
        stats.put("dispute_rate", totalOrders > 0 ?
                (double) disputedOrders / totalOrders : 0);

        // Volume (last 7 days); summed in memory, which is fine while the table is small
        LocalDateTime weekAgo = LocalDateTime.now().minusDays(7);
        List<CreditTransaction> weekTx = creditTransactionMapper.selectList(
                new QueryWrapper<CreditTransaction>()
                        .ge("created_at", weekAgo)
                        .eq("type", "trade"));
        long weekVolume = weekTx.stream().mapToLong(CreditTransaction::getAmount).sum();
        stats.put("week_volume", weekVolume);
        stats.put("week_transactions", weekTx.size());

        // Active keys
        long activeKeys = hostedKeyMapper.selectCount(
                new QueryWrapper<HostedKey>().eq("status", "active"));
        stats.put("active_hosted_keys", activeKeys);

        return stats;
    }

    public Map<String, Object> getUserStats(Long userId) {
        Map<String, Object> stats = new HashMap<>();

        // Orders as seller
        long sellOrders = orderMapper.selectCount(
                new QueryWrapper<Order>().eq("seller_id", userId));
        // Orders as buyer
        long buyOrders = orderMapper.selectCount(
                new QueryWrapper<Order>().eq("buyer_id", userId));

        stats.put("sell_orders", sellOrders);
        stats.put("buy_orders", buyOrders);

        // Earnings/spending: positive amounts are income, negative amounts are spending
        List<CreditTransaction> txs = creditTransactionMapper.selectList(
                new QueryWrapper<CreditTransaction>().eq("user_id", userId));
        long totalIn = txs.stream().filter(t -> t.getAmount() > 0)
                .mapToLong(CreditTransaction::getAmount).sum();
        long totalOut = txs.stream().filter(t -> t.getAmount() < 0)
                .mapToLong(t -> -t.getAmount()).sum();
        stats.put("total_earnings", totalIn);
        stats.put("total_spending", totalOut);

        return stats;
    }
}
```

- [ ] **Step 2: Write AnalyticsController**

```java
// src/main/java/com/danke/tokenexchange/controller/AnalyticsController.java
package com.danke.tokenexchange.controller;

import cn.dev33.satoken.annotation.SaCheckLogin;
import cn.dev33.satoken.stp.StpUtil;
import com.danke.tokenexchange.common.Result;
import com.danke.tokenexchange.service.AnalyticsService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

@RestController
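@RequestMapping("/api/v1/analytics")
@RequiredArgsConstructor
@SaCheckLogin
public class AnalyticsController {

    private final AnalyticsService analyticsService;

    // Platform-wide statistics
    @GetMapping("/platform")
    public Result<Map<String, Object>> getPlatformStats() {
        Map<String, Object> stats = analyticsService.getPlatformStats();
        return Result.success(stats);
    }

    // Statistics for the currently logged-in user
    @GetMapping("/user")
    public Result<Map<String, Object>> getUserStats() {
        Long userId = StpUtil.getLoginIdAsLong();
        Map<String, Object> stats = analyticsService.getUserStats(userId);
        return Result.success(stats);
    }
}
```

- [ ] **Step 3: Commit**

```bash
git add .
git commit -m "feat(P2): add analytics dashboard with platform and user stats"
```

---

## P2 Summary

| Module | New capability |
|--------|----------------|
| Proxy layer | Unified endpoint, asynchronous forwarding via WebClient, SSE streaming responses |
| Key pooling | Deposit / validation / LRU selection / status monitoring |
| Smart routing | Weighted round-robin, load balancing |
| Usage metering | Session-level token counting in Redis, per-chunk metering |
| Escrow settlement | Pre-allocated credits, real-time clearing, balance refund |
| Dilution detection | Scheduled check every 5 minutes, discrepancy detection, failure-rate detection, automatic delisting |
| Fallback keys | Platform-owned key pool, priority scheduling |
| Usage analytics | Platform stats, user stats, trade volume analysis |

---

**P2 plan complete.**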
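
For the weighted round-robin capability of Module 3, one possible way to make the weights take effect is the "smooth" weighted round-robin scheme popularized by nginx. The sketch below is an illustration under stated assumptions, not part of the plan itself: the class name `SmoothWeightedRoundRobin` is hypothetical, keys are plain string identifiers, and each weight would be derived from a key's remaining balance.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: smooth weighted round-robin over named keys.
// Over any window of sum(weights) picks, each key is chosen exactly `weight` times,
// and picks of a heavy key are spread out instead of arriving in one burst.
final class SmoothWeightedRoundRobin {

    private final Map<String, Long> weights = new LinkedHashMap<>();
    private final Map<String, Long> current = new LinkedHashMap<>();

    SmoothWeightedRoundRobin(Map<String, Long> weights) {
        if (weights.isEmpty()) {
            throw new IllegalArgumentException("at least one key is required");
        }
        for (Map.Entry<String, Long> e : weights.entrySet()) {
            if (e.getValue() <= 0) {
                throw new IllegalArgumentException("weight must be positive: " + e.getKey());
            }
            this.weights.put(e.getKey(), e.getValue());
            this.current.put(e.getKey(), 0L);
        }
    }

    // Each call: add every key's weight to its running score, pick the highest score,
    // then subtract the total weight from the winner.
    synchronized String next() {
        long total = 0;
        String best = null;
        long bestScore = Long.MIN_VALUE;
        for (Map.Entry<String, Long> e : weights.entrySet()) {
            long score = current.get(e.getKey()) + e.getValue();
            current.put(e.getKey(), score);
            total += e.getValue();
            if (score > bestScore) {
                bestScore = score;
                best = e.getKey();
            }
        }
        current.put(best, current.get(best) - total);
        return best;
    }
}
```

With weights 5, 1 and 1, seven consecutive calls to `next()` return the first key five times and each of the others once. Because balances change as keys are used, a router built on this idea would have to rebuild or re-weight the instance periodically; how often is a design choice this sketch leaves open.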