Skip to content

平台商品比价汇总数据接入 — 技术评审文档

项目内容
所属模块cloud-erp-store-host
评审日期2026年6月
文档状态补评审

一、需求背景

药九九平台通过 Kafka 推送各平台(药九九、药师帮、1药城、好药师)的商品比价汇总数据。我方需要:

  1. 消费 Kafka 消息,接收比价数据并写入 ES
  2. 提供 HTTP 接口,供前端按 ES 文档 ID 批量查询比价汇总数据

数据来源为药九九侧的爬虫比价系统,每条 Kafka 消息最多包含 500 条比价记录,按「基本码 + 省份编码 + 用户类型」维度汇总各平台的价格信息。


二、整体架构

药九九比价系统

    │ Kafka (topic: comparison-server_yjj_base_item_push)

┌─────────────────────────────────┐
│  PlatformProdPriceConsumer      │
│  ├─ 反序列化为 Wrapper           │
│  ├─ 过滤无效数据                  │
│  ├─ convertKafka2Es 转换         │
│  ├─ initIndex 索引初始化          │
│  └─ insertBatch 批量写入 ES      │
└──────────────┬──────────────────┘


┌─────────────────────────────────┐
│  Elasticsearch                  │
│  索引: platform_prod_price       │
│  _id: {baseNo}_{provinceCode}   │
│       {userType}                 │
└──────────────┬──────────────────┘


┌─────────────────────────────────┐
│  PlatformProdPriceController    │
│  POST /api/prodPrice/queryByIds │
│  POST /api/prodPrice/createIndex│
└─────────────────────────────────┘

三、数据模型设计

3.1 对象分层

对象类名职责
Kafka 报文PlatformProdPriceKafkaDTO接收药九九推送的完整报文,字段与上游一一对应
Kafka 包装体PlatformProdPriceWrapperKafka 消息外层包装,包含 List<PlatformProdPriceKafkaDTO> data
ES 文档PlatformProdPriceEsDTOES 存储文档,与 KafkaDTO 字段一致,额外增加 esId@JsonIgnore,不存入 _source
接口返回PlatformProdPriceVo按业务需要裁剪的视图对象,仅包含前端所需字段

3.2 对象分层设计说明

KafkaDTO → EsDTO 显式字段映射convertKafka2Es 方法采用逐字段显式赋值,而非 BeanUtils.copyProperties。原因:

  • 上游药九九报文结构可能变动,显式赋值确保只取我方需要的字段
  • 上游字段名变更或删除时,编译期即可发现,避免隐式丢数据

EsDTO 与 Vo 字段差异:EsDTO 包含全量字段(90+ 字段)用于 ES 完整存储;Vo 按业务需求裁剪为 25 个字段,控制百万级数据量场景下的 HTTP 报文大小。Vo 不包含好药师(HYS)字段,因当前业务接口无需返回。

3.3 字段分类总览

商品基础信息(10 字段)

字段类型ES Mapping说明
idLonglong药九九数据 ID
itemNameStringkeyword商品名称
specsStringkeyword商品规格
manufacturerStringkeyword生产厂家
approvalNoStringkeyword批准文号
itemPictureStringkeyword图片 URL
categoryNamesStringkeyword平台挂网分类
baseNoStringkeyword基本码
industryCodeStringkeyword行业码
accountStringkeyword账号

维度与标识(5 字段)

字段类型ES Mapping说明
userTypeIntegerinteger用户类型:1-药店 2-诊疗
provinceCodeLonglong账号所属省份编码
provinceNameStringkeyword账号所属省份
recordIdLonglong记录 ID
esIdStringES 文档 _id@JsonIgnore 不参与 source 存储

各平台价格数据

每个平台(药九九 yjj / 药师帮 ysb / 1药城 yyc / 好药师 hys)均包含以下维度的价格字段,全部使用 scaled_float(100) 存储:

维度原价字段折扣价字段
最低价xxxItemPriceMinxxxMemberPriceMin
最高价xxxItemPriceMaxxxxMemberPriceMax
中位价xxxItemPriceMiddlexxxMemberPriceMiddle
平均价xxxItemPriceAvgxxxMemberPriceAvg

优价率数据

各平台间对比的优价率字段,同样使用 scaled_float(100) 存储:

对比维度原价优价率折扣优价率
药九九 vs 药师帮yjjVsYsbPrice{Min/Max/Middle/Avg}YjlyjjVsYsbMemberPrice{Min/Max/Middle/Avg}Yjl
药九九 vs 1药城yjjVsYycItemPrice{Min/Max/Middle/Avg}YjlyjjVsYycMemberPrice{Min/Max/Middle/Avg}Yjl
药九九 vs 好药师yjjVsHysItemPrice{Min/Max/Middle/Avg}YjlyjjVsHysMemberPrice{Min/Max/Middle/Avg}Yjl

自营/三方店铺数据(8 字段)

字段类型ES Mapping说明
yjjZyItemPriceMinBigDecimalscaled_float(100)自营店铺最低价挂网价
yjjZyMemberPriceMinBigDecimalscaled_float(100)自营店铺最低价折后约
yjjZyItemIdMinLonglong自营店铺最低价商品编码
yjjZyStoreNameMinStringkeyword自营店铺最低价店铺名称
yjjZyItemPriceMaxBigDecimalscaled_float(100)自营店铺最高价挂网价
yjjZyMemberPriceMaxBigDecimalscaled_float(100)自营店铺最高价折后约
yjjZyItemIdMaxLonglong自营店铺最高价商品编码
yjjZyStoreNameMaxStringkeyword自营店铺最高价店铺名称
yjjSfItemPriceMin/...同上模式同上三方店铺数据(8 字段,结构一致)

时间戳与标签(8 字段)

字段类型ES Mapping说明
updateTimeDatedate(epoch_millis)药九九更新时间
climbingPriceTagStringkeyword药九九爬价标签
ysbUpdateTimeDatedate(epoch_millis)药师帮更新时间
ysbClimbingPriceTagStringkeyword药师帮爬价标签
yycUpdateTimeDatedate(epoch_millis)1药城更新时间
yycClimbingPriceTagStringkeyword1药城爬价标签
hysUpdateTimeDatedate(epoch_millis)好药师更新时间
hysClimbingPriceTagStringkeyword好药师爬价标签

3.4 ES 文档 ID 设计

_id = {baseNo}_{provinceCode}_{userType}
  • 拼接字段:基本码 + 省份编码 + 用户类型
  • 唯一性:同一商品在同一省份对同一用户类型只保留最新一份数据
  • 覆盖更新:使用 ES index 操作,相同 _id 的新文档自动覆盖旧文档
  • 前置校验:Consumer 中通过 filter 确保 baseNo 非空、provinceCodeuserType 非 null,避免 NPE

四、ES 索引设计

4.1 索引名称

platform_prod_price

4.2 Mapping 构建方案

使用自研 TypeMappingBuilder 工具类,通过 方法引用SFunction)提取字段名,实现编译期校验:

java
TypeMappingBuilder.create()
    .addKeyword(PlatformProdPriceEsDTO::getItemName)
    .addScaledFloat100(PlatformProdPriceEsDTO::getYjjItemPriceMin)
    .addDate(PlatformProdPriceEsDTO::getUpdateTime)
    // ...
    .build();

技术实现:利用 MyBatis-Plus 的 LambdaUtils.extract() 提取 Lambda 方法名,再通过 PropertyNamer.methodToProperty() 转换为字段名。相比硬编码字符串,字段重命名时编译器会直接报错。

4.3 ES 类型映射

Java 类型ES 类型说明
String(需精确匹配)keyword用于过滤、排序的字符串字段
IntegerintegeruserType
Longlongid、provinceCode、recordId、店铺商品编码
BigDecimalscaled_float(100)所有价格、优价率字段,scalingFactor=100 保留两位小数精度
Datedate(epoch_millis)所有时间字段,使用时间戳毫秒格式

4.4 Mapping 字段统计

类型数量
keyword18
integer1
long7
scaled_float(100)60
date4
合计90

4.5 索引初始化策略

采用 懒加载 + 双重检查 模式:

initIndex()
  ├─ INDEX_INIT_FLAG == true → 直接返回(99.99% 路径)
  ├─ existsIndex() == true → 设置标记,返回
  └─ createIndex() → 设置标记,返回
      └─ ElasticsearchException → existsIndex() 兜底检查(处理并发创建场景)
  • INDEX_INIT_FLAG:内存标记,避免每次消息都检查 ES
  • existsIndex():ES 层面兜底,即使标记为脏数据也不会重复创建
  • createIndex():带完整 mapping 创建,并发场景下由 ElasticsearchException + 二次 existsIndex() 兜底
  • 额外提供 POST /api/prodPrice/createIndex 接口,支持上线后由开发人员提前手动创建索引

五、Kafka 消费端设计

5.1 消费配置

配置项
Topiccomparison-server_yjj_base_item_push
Group IDcloud-erp-store-comparison-server_yjj_base_item_push
Container FactoryERPFOUR_KAFKA_STRING_KEY_JSON_VALUE_CONTAINER_FACTORY
Key 反序列化java.lang.String
Value 反序列化PlatformProdPriceWrapper(Jackson JSON)
ACK 模式手动 ACK

5.2 消费流程

收到 Kafka 消息

    ├─ wrapper == null || data 为空 → 直接 ACK

    ├─ 日志记录消息条数

    ├─ Stream 过滤
    │   ├─ baseNo 非空
    │   ├─ provinceCode 非 null
    │   └─ userType 非 null

    ├─ convertKafka2Es 转换(拼接 esId + 逐字段赋值)

    ├─ 过滤后为空 → 直接 ACK(不写入)

    ├─ initIndex() 初始化索引
    │   └─ 失败 → 抛异常,不 ACK,Kafka 重试

    ├─ insertBatch() 批量写入 ES
    │   └─ response.errors() → 抛异常,不 ACK,Kafka 重试

    └─ ack.acknowledge() 手动确认

5.3 异常处理策略

异常场景处理方式是否 ACK
wrapper 为 null 或 data 为空跳过处理ACK
过滤后无有效数据跳过写入ACK(在 if 外)
ES 索引初始化失败抛 RuntimeException不 ACK,触发重试
ES 批量写入存在失败文档抛 RuntimeException不 ACK,触发重试
Kafka 消息反序列化失败Spring Kafka 框架层处理不 ACK,触发重试
ES 连接失败bulk() 抛 IOException不 ACK,触发重试

六、HTTP 接口设计

6.1 批量查询接口

POST /api/prodPrice/queryByIds

请求体

json
["baseNo1_provinceCode1_userType1", "baseNo2_provinceCode2_userType2"]

响应体

json
{
  "code": 200,
  "data": [
    {
      "esId": "baseNo1_provinceCode1_userType1",
      "yjjItemPriceMin": 12.50,
      "ysbItemPriceMin": 13.00,
      "updateTime": "2026-06-11 10:00:00",
      ...
    }
  ]
}

实现要点

  • 使用 ES ids 查询,原生支持批量 ID 查询
  • size 设置为 ids.size(),确保返回全部结果
  • 查询结果从 hit.source() 反序列化后,手动回填 esId = hit.id()(因为 esId 在 EsDTO 上标记了 @JsonIgnore
  • Vo 的 Date 字段使用 @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") 格式化输出

6.2 索引创建接口

POST /api/prodPrice/createIndex

用途:上线后由开发人员手动调用,提前创建 ES 索引及 mapping。

特性:内部 initIndex() 有幂等保护,重复调用无副作用。


七、涉及文件清单

文件路径类型说明
entity/dto/PlatformProdPriceKafkaDTO.java新增Kafka 报文 DTO,90 字段
entity/dto/PlatformProdPriceEsDTO.java新增ES 文档 DTO,含 @JsonIgnore esId
entity/vo/PlatformProdPriceVo.java新增API 返回 VO,25 字段
entity/wrapper/PlatformProdPriceWrapper.java新增Kafka 消息包装体
entity/builder/TypeMappingBuilder.java新增ES Mapping 构建工具(通用)
service/PlatformProdPriceService.java新增服务接口
service/impl/PlatformProdPriceServiceImpl.java新增服务实现
controller/PlatformProdPriceController.java新增REST 接口
consumer/PlatformProdPriceConsumer.java新增Kafka 消费端

八、关键技术决策

8.1 为什么使用 ES 而非数据库存储

  • 比价数据量大(百万级),且为全量覆盖更新模式
  • 查询模式为按 ID 批量精确查询,ES 的 _id 查询性能优于数据库
  • 数据由 Kafka 推送,无需复杂事务和关联查询

8.2 为什么使用 scaled_float 存储 BigDecimal

  • scaled_float(100) 内部以 long 存储(值 × 100),保留两位小数精度
  • 相比 float/double 无精度丢失问题
  • 存储空间小于 double,对百万级数据有实际意义

8.3 为什么 Vo 不包含好药师字段

  • 当前 queryByIds 接口业务上不需要好药师数据
  • ES 中数据量百万级,Vo 裁剪字段可减小 HTTP 报文体积和序列化开销
  • 后续如需返回,在 Vo 中补齐字段即可,EsDTO 和 ES mapping 已包含

8.4 为什么 convertKafka2Es 使用显式赋值

  • 上游药九九报文结构可能变动
  • 显式赋值确保只取我方需要的字段,上游新增字段不会自动带入
  • 上游字段变更时编译期即可发现,避免运行时隐式丢数据

8.5 为什么 Date 使用 epoch_millis 格式

  • Kafka 消息中时间字段默认为时间戳序列化(Java Date 默认行为)
  • ES mapping 使用 epoch_millis 与上游序列化方式一致,无需额外转换

九、上线步骤

步骤操作说明
1部署 cloud-erp-store-host 服务包含 Consumer、Service、Controller
2调用 POST /api/prodPrice/createIndex创建 ES 索引及 mapping
3确认 Kafka topic 已配置消费权限topic: comparison-server_yjj_base_item_push
4验证 Consumer 正常消费观察日志中「药九九推送kafka」日志及 ES 写入情况
5验证 queryByIds 接口使用已知 ES ID 查询验证数据正确性

十、后续规划

事项说明
索引结构变更已预留 deleteIndex() 方法,后续如需变更 mapping 可删除索引重建
新平台接入上游新增平台时,在 KafkaDTO/EsDTO 中新增字段,buildMapping() 中新增映射即可
Vo 字段扩展如需在接口中返回好药师等更多数据,在 Vo 中补齐对应字段即可

页脚:版权前显示的信息