功能迭代时间线

第一版:基础同步功能(直接保存,问题多多)
第二版:增加去重逻辑(解决重复数据问题)
第三版:增加平台推送功能(但是同步推送,很慢)
第四版:引入异步处理(性能大幅提升)
第五版:优化推送去重逻辑(避免重复推送)
第六版:增加缓存优化(减少90%重复调用)
第七版:增加定时任务支持(实现自动化)


业务流程图

graph TD A[开始] --> B{触发方式} B -->|定时任务| C[execute方法<br/>Quartz调度] B -->|手动调用| D[syncPolicy方法<br/>API调用] B -->|批量推送| E[pushByCodes方法<br/>指定编码推送] C --> F[获取任务参数<br/>JobDataMap] F --> G[调用syncPolicy] D --> G G --> H[pullData<br/>拉取政策数据] H --> I[构建请求参数<br/>PolicyReqDTO] I --> J[特殊区划处理<br/>330299000→330200000] J --> K[HTTP POST请求<br/>BSRequestUtil] K --> L[响应数据处理<br/>区划代码还原] L --> M{数据是否为空?} M -->|是| N[抛出异常<br/>数据为空] M -->|否| O[filterNoSave<br/>过滤已存在数据] O --> P[查询数据库<br/>PAY_NBYB_AGENCYPOLICYRELATION] P --> Q[Stream过滤<br/>双重条件匹配] Q --> R[设置政策类型<br/>COMPANY] R --> S{过滤后有数据?} S -->|否| T[结束流程<br/>无新数据] S -->|是| U[filterPushData<br/>推送数据去重] U --> V[内存去重<br/>HashSet去重hold15_code] V --> W[数据库查重<br/>IN查询已存在政策] W --> X[计算差集<br/>需要推送的政策] X --> Y[save方法<br/>批量保存数据] Y --> Z[generalDAO.batchSaveDataBySql<br/>数据库批量插入] Z --> AA{推送数据是否为空?} AA -->|是| BB[记录日志<br/>政策已存在] AA -->|否| CC[异步推送循环] CC --> DD[getMofAndEleInfo<br/>获取区划要素信息] DD --> EE{缓存中是否存在?} EE -->|是| FF[返回缓存数据<br/>Triple对象] EE -->|否| GG[HTTP GET请求<br/>查询区划信息] GG --> HH[HTTP GET请求<br/>查询枚举类型] HH --> II[构建Triple对象<br/>存入缓存] II --> FF FF --> JJ[CompletableFuture.runAsync<br/>异步执行推送] JJ --> KK[pushData方法<br/>推送到平台] KK --> LL[构建推送数据<br/>PlatReqDTO] LL --> MM[设置时间范围<br/>当前登录年度] MM --> NN[JSON序列化<br/>FastJSON] NN --> OO[HTTP POST推送<br/>cruxUtil.requestPostUrl] OO --> PP[记录推送日志<br/>成功/失败] E --> QQ[字符串分割<br/>逗号分隔编码] QQ --> RR[数据库查询<br/>根据编码查询政策] RR --> SS[循环异步推送<br/>每个政策独立推送] SS --> DD BB --> TT[流程结束] PP --> TT N --> TT T --> TT style A fill:#e1f5fe style G fill:#f3e5f5 style H fill:#fff3e0 style O fill:#fff3e0 style U fill:#fff3e0 style Y fill:#e8f5e8 style DD fill:#fff8e1 style KK fill:#fce4ec style TT fill:#ffebee classDef processBox fill:#e3f2fd,stroke:#1976d2,stroke-width:2px classDef dataBox fill:#f1f8e9,stroke:#689f38,stroke-width:2px classDef asyncBox fill:#fce4ec,stroke:#c2185b,stroke-width:2px classDef cacheBox fill:#fff8e1,stroke:#f57c00,stroke-width:2px class G,O,U,Y processBox class H,P,Z dataBox class JJ,KK asyncBox class DD,EE,FF cacheBox

数据流程时序图

sequenceDiagram participant Client as 客户端/定时器 participant Controller as SyncPolicyController participant Service as SyncPolicyServiceBO participant External as 外部政策系统 participant Cache as Guava Cache participant DB as 数据库 participant Platform as 业务平台 participant ThreadPool as 线程池 Note over Client: 触发同步操作 Client->>Controller: POST /sync/policy Controller->>Service: syncPolicy(params) Note over Service: 第一步:拉取数据 Service->>External: pullData() HTTP POST External-->>Service: 返回政策列表 PolicyResDTO[] Note over Service: 第二步:过滤已存在数据 Service->>DB: filterNoSave() 查询已存在政策 DB-->>Service: 返回数据库中的政策 Service->>Service: Stream过滤去重 Note over Service: 第三步:过滤推送数据 Service->>Service: filterPushData() 内存去重 Service->>DB: 查询数据库已存在政策 DB-->>Service: 返回已存在政策编码 Service->>Service: 计算需要推送的差集 Note over Service: 第四步:批量保存 Service->>DB: save() batchSaveDataBySql DB-->>Service: 保存成功 Note over Service: 第五步:异步推送循环 loop 每个需要推送的政策 Service->>Cache: getMofAndEleInfo() 查询缓存 alt 缓存命中 Cache-->>Service: 返回缓存的Triple数据 else 缓存未命中 Service->>Platform: HTTP GET 查询区划信息 Platform-->>Service: 返回区划ID Service->>Platform: HTTP GET 查询枚举类型 Platform-->>Service: 返回类型信息 Service->>Cache: 存入缓存(3小时过期) Cache-->>Service: 返回Triple数据 end Service->>ThreadPool: CompletableFuture.runAsync() Note over ThreadPool: 异步执行pushData() ThreadPool->>Platform: HTTP POST 推送政策 Platform-->>ThreadPool: 推送结果 end Service-->>Controller: 同步完成 Controller-->>Client: 返回 "success" Note over ThreadPool: 并发推送任务继续执行 rect rgb(255, 245, 245) Note over Service, DB: 同步操作区域 - 保证数据一致性 end rect rgb(245, 255, 245) Note over ThreadPool, Platform: 异步操作区域 - 提高响应性能 end rect rgb(245, 245, 255) Note over Cache: 缓存优化区域 - 减少重复调用 end

代码详解

@Slf4j
@Service
public class SyncPolicyServiceBO implements Job, SyncPolicyService {

    @Autowired
    private GeneralDAO generalDAO;

    @Autowired
    private CruxUrlConfig urlConfig;

    @Autowired
    private CruxUtil cruxUtil;

    private static Cache<String, Triple<String, String, String>> mofCache = CacheBuilder.newBuilder().expireAfterWrite(3, TimeUnit.HOURS).build();

    private static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 10,TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadFactoryBuilder().setNameFormat("syncPolicy-pool-%d").build());

类继承了quartz的定时任务接口,然后注入了三个类,三个都是内部封装的类,作用分别是,操控数据库,封装url配置和http请求相关类。
接下来声明了一个Cache类型,key是string类型,value是一个三元组Triple包含三个 String 值:第一个 String: mofDivId - 财政区划的唯一标识ID,第二个 String: typeId - 要素类型的唯一标识ID,第三个 String: lastModifiedVersion - 最后修改版本号,CacheBuilder.newBuilder(),创建一个新的缓存构建器,使用建造者模式来配置缓存的各种参数,然后expireAfterWrite(3, TimeUnit.HOURS)是缓存过期策略配置,expireAfterWrite: 基于写入时间的过期策略,表示缓存项在写入后经过指定时间就会过期,3: 过期时间的数值,TimeUnit.HOURS: 时间单位,build()根据前面配置的参数构建并返回 Cache 实例,

然后采用的是Executor框架创建ThreadPoolExecutor 构造函数的corePoolSize = 5,线程池中始终保持活跃的线程数量,即使这些线程处于空闲状态,也不会被回收,当有任务提交时,如果当前线程数少于5个,会创建新线程来处理任务,实际意义: 这个政策同步服务保证至少有5个线程随时待命,maximumPoolSize = 10,线程池中允许存在的最大线程数量,当任务很多时,线程池最多可以扩展到10个线程,超过这个数量的任务会被放入队列等待,实际意义: 政策推送高峰期最多同时运行10个推送任务,keepAliveTime = 10,当线程数超过核心线程数时,多余的线程在空闲状态下的最大存活时间,new LinkedBlockingQueue<>(100),LinkedBlockingQueue: 基于链表的阻塞队列,容量100: 队列最多可以存放100个待处理的任务,阻塞特性: 当队列满时,新提交的任务会被阻塞,new ThreadFactoryBuilder().setNameFormat("syncPolicy-pool-%d").build();setNameFormat("syncPolicy-pool-%d"): 设置线程命名格式;%d: 会被替换为线程编号

execute方法

@Override
    @SneakyThrows
    public void execute(JobExecutionContext ctx) throws JobExecutionException {
        JobDataMap taskParams = ctx.getJobDetail().getJobDataMap();
        String reqUrl = taskParams.getString("reqUrl");
        Map<String, Object> params = new HashMap<>();
        params.put("reqUrl", reqUrl);
        syncPolicy(params);
    }

这是一个定时触发同步政策的方法,参数: JobExecutionContext ctx - Quartz 提供的执行上下文
ctx: JobExecutionContext,包含了任务执行的所有上下文信息

getJobDetail(): 获取 JobDetail 对象,包含任务的详细配置;getJobDataMap(): 获取任务参数映射表;JobDataMap: 类似于 Map,存储任务配置的键值对
然后从参数中提取url,然后构建参数对象params,把requrl put进去,最后再调用核心业务方法,syncPolicy
PS:如果定时任务时效的话,可以配置Quartz的重试策略


SyncPolicy方法()

@Override
    public void syncPolicy(a<String, Object> params) {
        List<PolicyResDTO> findData = pullData(params);
        List<PolicyResDTO> noSave = filterNoSave(findData);
        if(CollectionUtils.isNotEmpty(noSave)){
            //保存之前先过滤 避免同一个事物能查到当前保存的数据
            List<PolicyResDTO> pushPolicy = filterPushData(noSave);
            save(noSave);
            if(CollectionUtils.isNotEmpty(pushPolicy)){
                for(PolicyResDTO dto : pushPolicy){
                    Triple<String, String, String> mofAndEleInfo = getMofAndEleInfo(dto.getMof_div_code(), PayConstant.HOLD15_ELE);
                    CompletableFuture.runAsync(()->pushData(dto, mofAndEleInfo), poolExecutor);
                }
            }else{
                log.info("【{}区划推送平台数据】此次获取的该区划中的政策在平台都已存在", noSave.get(0).getMof_div_code());
            }
        }

    }


这是政策同步的核心流程,分为4个主要阶段:
// 1. 数据拉取 → 2. 数据过滤 → 3. 数据保存 → 4. 平台推送
方法接收hashmap类型的参数,然后进行数据拉取,目的: 从博思拉取政策数据;参数: params 包含请求URL和查询条件;返回: PolicyResDTO 列表,包含政策的完整信息,然后对拉取获得的数据进行第一次过滤,目的: 过滤掉数据库中已经存在的政策,避免重复保存;过滤逻辑: 根据 agency_code + hold15_code 的组合判断是否已存在。数据库查询: 查询 PAY_NBYB_AGENCYPOLICYRELATION 表,返回结果: 只包含数据库中不存在的新政策,然后进入判断逻辑,如果noSave列表是空的,直接结束,如果不为空,则进入下一段业务逻辑,系统拉取最新的政策数据,将本地没有的政策数据保存到本地数据库,再将这些数据推送到其他平台,这里有一个场景问题

 1. 事务A正在保存数据到数据库
// 2. 同时事务B在查询数据库,准备推送到平台
// 3. 如果B能看到A还未提交的数据,可能导致:
//    - 推送了不完整的数据
//    - 数据状态不一致

// 解决方案:
// 先确定哪些需要推送,再保存数据
// 这样推送的都是基于"保存前"状态的数据

对需要保存的数据进行二次过滤,确定哪些需要推送到平台,去重逻辑: 同一个区划的同一个政策只推送一条,避免重复推送: 检查数据库中是否已有该政策的推送记录,然后过滤完了,我们再保存,然后再次进入判断逻辑,如果有数据需要推送 → 执行推送逻辑,如果没有数据需要推送 → 输出日志说明,然后进入逻辑开始遍历逐个处理每个需要推送的政策,每个政策都会创建一个独立的异步任务。

graph TD A["syncPolicy方法开始"] --> B["pullData(params)<br/>从外部系统拉取政策数据"] B --> C["filterNoSave(findData)<br/>过滤数据库中已存在的政策"] C --> D{"noSave.isNotEmpty()?<br/>是否有新政策需要处理"} D -->|否| E["方法结束<br/>没有新数据需要处理"] D -->|是| F["filterPushData(noSave)<br/>过滤需要推送的政策<br/>(去重+避免重复推送)"] F --> G["save(noSave)<br/>批量保存新政策到数据库"] G --> H{"pushPolicy.isNotEmpty()?<br/>是否有政策需要推送"} H -->|否| I["记录日志:<br/>该区划政策在平台都已存在"] H -->|是| J["遍历需要推送的政策<br/>for(PolicyResDTO dto : pushPolicy)"] J --> K["getMofAndEleInfo()<br/>获取区划和要素信息(带缓存)"] K --> L["创建异步任务<br/>CompletableFuture.runAsync()"] L --> M["异步执行pushData()<br/>推送政策到平台"] J --> N{"还有政策需要处理?"} N -->|是| K N -->|否| O["主流程结束<br/>异步推送继续执行"] M --> P["HTTP请求推送"] P --> Q{"推送成功?"} Q -->|成功| R["记录成功日志"] Q -->|失败| S["记录失败日志<br/>抛出异常"] I --> E R --> T["异步任务完成"] S --> T style A fill:#e3f2fd style E fill:#c8e6c9 style G fill:#fff3e0 style L fill:#f3e5f5 style P fill:#ffebee style R fill:#c8e6c9 style S fill:#ffcdd2

pullData

这个 pullData 方法接受一个 Map 类型的参数,首先通过 debug 级别记录传入的参数信息用于调试。接着从参数 Map 中提取两个关键配置:reqUrl(博思系统的API接口地址)和 updateDay(时间偏移天数)。方法会先校验 updateDay 参数是否为空,如果为空则抛出运行时异常提示配置缺失。然后将 updateDay 字符串转换为整数,并调用 getDateTimeAfterDays 方法计算出查询的起始时间点,这个时间点通常是当前时间减去指定天数,用于实现增量数据同步。紧接着创建一个 PolicyReqDTO 请求对象,从用户会话中获取财政区划编码,并对特定的区划编码"330299000"做特殊处理,将其转换为"330200000"以适配博思系统的编码规范。随后将区划编码和计算出的时间设置到请求对象中,使用 FastJSON 将请求对象序列化为 JSON 字符串,其中空值会被转换为空字符串而非 null。然后通过 BSRequestUtil.post 方法向博思系统发送 HTTP POST 请求,并将响应自动反序列化为 PolicyResDTO 对象列表。如果响应结果为空,则记录错误日志并抛出运行时异常。最后对返回的数据进行反向转换处理,将博思系统返回的区划编码"330200000"转换回内部系统的"330299000",确保数据在内部系统中使用正确的编码格式,最终返回处理后的政策数据列表。

flowchart TD A[开始: pullData方法被调用] --> B[输入参数:<br/>Map params] B --> C[记录DEBUG日志:<br/>打印传入参数内容] C --> D[从params中提取参数:<br/>reqUrl 和 updateDay] D --> E{参数校验:<br/>updateDay是否为空?} E -->|是| F[抛出RuntimeException:<br/>未配置拉取政策时更新时间] E -->|否| G[转换updateDay为整数] G --> H[调用getDateTimeAfterDays方法:<br/>计算目标时间点] H --> I[getDateTimeAfterDays内部逻辑:<br/>当前时间加减天数<br/>格式化为标准时间字符串] I --> J[返回格式化时间字符串:<br/>dateTimeAfterDays] J --> K[创建请求对象:<br/>PolicyReqDTO] K --> L[获取用户区划编码:<br/>从Session中获取mofDivCode] L --> M{区划编码转换判断:<br/>是否为330299000?} M -->|是| N[转换区划编码:<br/>330299000 → 330200000] M -->|否| O[保持原区划编码不变] N --> P[设置请求参数:<br/>区划编码和更新时间] O --> P P --> Q[JSON序列化:<br/>将请求对象转为JSON字符串<br/>空值转为空字符串] Q --> R[生成JSON请求体] R --> S[发送HTTP POST请求:<br/>调用博思系统接口] S --> T[博思系统处理请求<br/>返回政策数据] T --> U[接收HTTP响应:<br/>PolicyResDTO列表] U --> V{响应数据校验:<br/>返回结果是否为空?} V -->|是| W[记录ERROR日志<br/>抛出RuntimeException:<br/>区划查询政策结果为空] V -->|否| X[开始响应数据处理循环:<br/>遍历每个PolicyResDTO对象] X --> Y{数据转换判断:<br/>区划编码是否为330200000?} Y -->|是| Z[反向转换区划编码:<br/>330200000 → 330299000] Y -->|否| AA[保持原数据不变] Z --> BB[继续处理下一个对象] AA --> BB BB --> CC{是否还有更多对象?} CC -->|是| Y CC -->|否| DD[返回处理后的数据列表] DD --> EE[方法结束] %% 异常流程 F --> FF[异常结束] W --> WW[异常结束] %% 样式定义 classDef startEnd fill:#e1f5fe,stroke:#01579b,stroke-width:2px classDef process fill:#f3e5f5,stroke:#4a148c,stroke-width:1px classDef decision fill:#fff3e0,stroke:#e65100,stroke-width:2px classDef data fill:#e8f5e8,stroke:#1b5e20,stroke-width:1px classDef error fill:#ffebee,stroke:#b71c1c,stroke-width:2px classDef external fill:#fce4ec,stroke:#880e4f,stroke-width:2px class A,EE startEnd class C,D,G,H,I,J,K,L,P,Q,R,X,Z,AA,BB,DD process class E,M,V,Y,CC decision class B,N,O,S,U data class F,W,FF,WW error class T external

getDateTimeAfterDays

这个 getDateTimeAfterDays 静态工具方法接受一个整数类型的天数参数,用于计算相对于当前时间的日期时间字符串。方法首先通过 LocalDateTime.now() 获取当前系统的本地日期时间,然后使用 plusDays() 方法在当前时间基础上增加指定的天数(如果传入负数则实际是减去相应天数,这样可以计算过去的时间点)。接着创建一个日期时间格式化器,使用 "yyyy-MM-dd HH:mm:ss" 的标准格式模式,这种格式便于数据库查询和API调用。最后通过 format() 方法将计算得到的 LocalDateTime 对象按照指定格式转换为字符串并返回。整个方法设计简洁且线程安全,能够自动处理月末、年末、闰年等各种边界情况,为政策数据的增量同步提供精确的时间基准点。

filternosave

这是整个业务的多次过滤的第一次过滤主要作用是过滤掉数据库中已经存在的政策数据,只返回新增保存的数据,避免重复插入。然后进入到方法内,接受一个list对象,然后对数据用stream进行一个处理,只要agencycode和hold15-code,再得到一个列表,然后用stringbuilder构建mysql的查询语句,然后进入过滤逻辑,

flowchart TD subgraph "核心过滤逻辑" A["遍历拉取数据 item"] A --> B["在数据库数据中查找匹配项"] B --> C["dbData.stream().noneMatch()"] C --> D["比较条件:<br/>list2Item.agency_code.equals(item.agency_code)<br/>AND<br/>list2Item.hold15_code.equals(item.hold15_code)"] D --> E{"找到匹配项?"} E -->|找到匹配| F["noneMatch() 返回 false"] E -->|未找到匹配| G["noneMatch() 返回 true"] F --> H["filter() 排除此项<br/>(数据已存在)"] G --> I["filter() 保留此项<br/>(新数据)"] I --> J["map() 设置 hold15_type"] J --> K["添加到结果集合"] H --> L{"还有数据?"} K --> L L -->|是| A L -->|否| M["返回最终结果"] end style E fill:#fff3e0 style F fill:#ffcdd2 style G fill:#c8e6c9 style H fill:#ffebee style I fill:#e8f5e8 style M fill:#e1f5fe