Appearance
平台商品比价汇总数据接入 — 技术评审文档
| 项目 | 内容 |
|---|---|
| 所属模块 | cloud-erp-store-host |
| 评审日期 | 2026年6月 |
| 文档状态 | 补评审 |
一、需求背景
药九九平台通过 Kafka 推送各平台(药九九、药师帮、1药城、好药师)的商品比价汇总数据。我方需要:
- 消费 Kafka 消息,接收比价数据并写入 ES
- 提供 HTTP 接口,供前端按 ES 文档 ID 批量查询比价汇总数据
数据来源为药九九侧的爬虫比价系统,每条 Kafka 消息最多包含 500 条比价记录,按「基本码 + 省份编码 + 用户类型」维度汇总各平台的价格信息。
二、整体架构
药九九比价系统
│
│ Kafka (topic: comparison-server_yjj_base_item_push)
▼
┌─────────────────────────────────┐
│ PlatformProdPriceConsumer │
│ ├─ 反序列化为 Wrapper │
│ ├─ 过滤无效数据 │
│ ├─ convertKafka2Es 转换 │
│ ├─ initIndex 索引初始化 │
│ └─ insertBatch 批量写入 ES │
└──────────────┬──────────────────┘
│
▼
┌─────────────────────────────────┐
│ Elasticsearch │
│ 索引: platform_prod_price │
│ _id: {baseNo}_{provinceCode} │
│ {userType} │
└──────────────┬──────────────────┘
│
▼
┌─────────────────────────────────┐
│ PlatformProdPriceController │
│ POST /api/prodPrice/queryByIds │
│ POST /api/prodPrice/createIndex│
└─────────────────────────────────┘三、数据模型设计
3.1 对象分层
| 对象 | 类名 | 职责 |
|---|---|---|
| Kafka 报文 | PlatformProdPriceKafkaDTO | 接收药九九推送的完整报文,字段与上游一一对应 |
| Kafka 包装体 | PlatformProdPriceWrapper | Kafka 消息外层包装,包含 List<PlatformProdPriceKafkaDTO> data |
| ES 文档 | PlatformProdPriceEsDTO | ES 存储文档,与 KafkaDTO 字段一致,额外增加 esId(@JsonIgnore,不存入 _source) |
| 接口返回 | PlatformProdPriceVo | 按业务需要裁剪的视图对象,仅包含前端所需字段 |
3.2 对象分层设计说明
KafkaDTO → EsDTO 显式字段映射:convertKafka2Es 方法采用逐字段显式赋值,而非 BeanUtils.copyProperties。原因:
- 上游药九九报文结构可能变动,显式赋值确保只取我方需要的字段
- 上游字段名变更或删除时,编译期即可发现,避免隐式丢数据
EsDTO 与 Vo 字段差异:EsDTO 包含全量字段(90+ 字段)用于 ES 完整存储;Vo 按业务需求裁剪为 25 个字段,控制百万级数据量场景下的 HTTP 报文大小。Vo 不包含好药师(HYS)字段,因当前业务接口无需返回。
3.3 字段分类总览
商品基础信息(10 字段)
| 字段 | 类型 | ES Mapping | 说明 |
|---|---|---|---|
| id | Long | long | 药九九数据 ID |
| itemName | String | keyword | 商品名称 |
| specs | String | keyword | 商品规格 |
| manufacturer | String | keyword | 生产厂家 |
| approvalNo | String | keyword | 批准文号 |
| itemPicture | String | keyword | 图片 URL |
| categoryNames | String | keyword | 平台挂网分类 |
| baseNo | String | keyword | 基本码 |
| industryCode | String | keyword | 行业码 |
| account | String | keyword | 账号 |
维度与标识(5 字段)
| 字段 | 类型 | ES Mapping | 说明 |
|---|---|---|---|
| userType | Integer | integer | 用户类型:1-药店 2-诊疗 |
| provinceCode | Long | long | 账号所属省份编码 |
| provinceName | String | keyword | 账号所属省份 |
| recordId | Long | long | 记录 ID |
| esId | String | — | ES 文档 _id,@JsonIgnore 不参与 source 存储 |
各平台价格数据
每个平台(药九九 yjj / 药师帮 ysb / 1药城 yyc / 好药师 hys)均包含以下维度的价格字段,全部使用 scaled_float(100) 存储:
| 维度 | 原价字段 | 折扣价字段 |
|---|---|---|
| 最低价 | xxxItemPriceMin | xxxMemberPriceMin |
| 最高价 | xxxItemPriceMax | xxxMemberPriceMax |
| 中位价 | xxxItemPriceMiddle | xxxMemberPriceMiddle |
| 平均价 | xxxItemPriceAvg | xxxMemberPriceAvg |
优价率数据
各平台间对比的优价率字段,同样使用 scaled_float(100) 存储:
| 对比维度 | 原价优价率 | 折扣优价率 |
|---|---|---|
| 药九九 vs 药师帮 | yjjVsYsbPrice{Min/Max/Middle/Avg}Yjl | yjjVsYsbMemberPrice{Min/Max/Middle/Avg}Yjl |
| 药九九 vs 1药城 | yjjVsYycItemPrice{Min/Max/Middle/Avg}Yjl | yjjVsYycMemberPrice{Min/Max/Middle/Avg}Yjl |
| 药九九 vs 好药师 | yjjVsHysItemPrice{Min/Max/Middle/Avg}Yjl | yjjVsHysMemberPrice{Min/Max/Middle/Avg}Yjl |
自营/三方店铺数据(8 字段)
| 字段 | 类型 | ES Mapping | 说明 |
|---|---|---|---|
| yjjZyItemPriceMin | BigDecimal | scaled_float(100) | 自营店铺最低价挂网价 |
| yjjZyMemberPriceMin | BigDecimal | scaled_float(100) | 自营店铺最低价折后约 |
| yjjZyItemIdMin | Long | long | 自营店铺最低价商品编码 |
| yjjZyStoreNameMin | String | keyword | 自营店铺最低价店铺名称 |
| yjjZyItemPriceMax | BigDecimal | scaled_float(100) | 自营店铺最高价挂网价 |
| yjjZyMemberPriceMax | BigDecimal | scaled_float(100) | 自营店铺最高价折后约 |
| yjjZyItemIdMax | Long | long | 自营店铺最高价商品编码 |
| yjjZyStoreNameMax | String | keyword | 自营店铺最高价店铺名称 |
| yjjSfItemPriceMin/... | 同上模式 | 同上 | 三方店铺数据(8 字段,结构一致) |
时间戳与标签(8 字段)
| 字段 | 类型 | ES Mapping | 说明 |
|---|---|---|---|
| updateTime | Date | date(epoch_millis) | 药九九更新时间 |
| climbingPriceTag | String | keyword | 药九九爬价标签 |
| ysbUpdateTime | Date | date(epoch_millis) | 药师帮更新时间 |
| ysbClimbingPriceTag | String | keyword | 药师帮爬价标签 |
| yycUpdateTime | Date | date(epoch_millis) | 1药城更新时间 |
| yycClimbingPriceTag | String | keyword | 1药城爬价标签 |
| hysUpdateTime | Date | date(epoch_millis) | 好药师更新时间 |
| hysClimbingPriceTag | String | keyword | 好药师爬价标签 |
3.4 ES 文档 ID 设计
_id = {baseNo}_{provinceCode}_{userType}- 拼接字段:基本码 + 省份编码 + 用户类型
- 唯一性:同一商品在同一省份对同一用户类型只保留最新一份数据
- 覆盖更新:使用 ES index 操作,相同
_id的新文档自动覆盖旧文档 - 前置校验:Consumer 中通过 filter 确保
baseNo非空、provinceCode和userType非 null,避免 NPE
四、ES 索引设计
4.1 索引名称
platform_prod_price4.2 Mapping 构建方案
使用自研 TypeMappingBuilder 工具类,通过 方法引用(SFunction)提取字段名,实现编译期校验:
java
TypeMappingBuilder.create()
.addKeyword(PlatformProdPriceEsDTO::getItemName)
.addScaledFloat100(PlatformProdPriceEsDTO::getYjjItemPriceMin)
.addDate(PlatformProdPriceEsDTO::getUpdateTime)
// ...
.build();技术实现:利用 MyBatis-Plus 的 LambdaUtils.extract() 提取 Lambda 方法名,再通过 PropertyNamer.methodToProperty() 转换为字段名。相比硬编码字符串,字段重命名时编译器会直接报错。
4.3 ES 类型映射
| Java 类型 | ES 类型 | 说明 |
|---|---|---|
| String(需精确匹配) | keyword | 用于过滤、排序的字符串字段 |
| Integer | integer | userType |
| Long | long | id、provinceCode、recordId、店铺商品编码 |
| BigDecimal | scaled_float(100) | 所有价格、优价率字段,scalingFactor=100 保留两位小数精度 |
| Date | date(epoch_millis) | 所有时间字段,使用时间戳毫秒格式 |
4.4 Mapping 字段统计
| 类型 | 数量 |
|---|---|
| keyword | 18 |
| integer | 1 |
| long | 7 |
| scaled_float(100) | 60 |
| date | 4 |
| 合计 | 90 |
4.5 索引初始化策略
采用 懒加载 + 双重检查 模式:
initIndex()
├─ INDEX_INIT_FLAG == true → 直接返回(99.99% 路径)
├─ existsIndex() == true → 设置标记,返回
└─ createIndex() → 设置标记,返回
└─ ElasticsearchException → existsIndex() 兜底检查(处理并发创建场景)INDEX_INIT_FLAG:内存标记,避免每次消息都检查 ESexistsIndex():ES 层面兜底,即使标记为脏数据也不会重复创建createIndex():带完整 mapping 创建,并发场景下由ElasticsearchException+ 二次existsIndex()兜底- 额外提供
POST /api/prodPrice/createIndex接口,支持上线后由开发人员提前手动创建索引
五、Kafka 消费端设计
5.1 消费配置
| 配置项 | 值 |
|---|---|
| Topic | comparison-server_yjj_base_item_push |
| Group ID | cloud-erp-store-comparison-server_yjj_base_item_push |
| Container Factory | ERPFOUR_KAFKA_STRING_KEY_JSON_VALUE_CONTAINER_FACTORY |
| Key 反序列化 | java.lang.String |
| Value 反序列化 | PlatformProdPriceWrapper(Jackson JSON) |
| ACK 模式 | 手动 ACK |
5.2 消费流程
收到 Kafka 消息
│
├─ wrapper == null || data 为空 → 直接 ACK
│
├─ 日志记录消息条数
│
├─ Stream 过滤
│ ├─ baseNo 非空
│ ├─ provinceCode 非 null
│ └─ userType 非 null
│
├─ convertKafka2Es 转换(拼接 esId + 逐字段赋值)
│
├─ 过滤后为空 → 直接 ACK(不写入)
│
├─ initIndex() 初始化索引
│ └─ 失败 → 抛异常,不 ACK,Kafka 重试
│
├─ insertBatch() 批量写入 ES
│ └─ response.errors() → 抛异常,不 ACK,Kafka 重试
│
└─ ack.acknowledge() 手动确认5.3 异常处理策略
| 异常场景 | 处理方式 | 是否 ACK |
|---|---|---|
| wrapper 为 null 或 data 为空 | 跳过处理 | ACK |
| 过滤后无有效数据 | 跳过写入 | ACK(在 if 外) |
| ES 索引初始化失败 | 抛 RuntimeException | 不 ACK,触发重试 |
| ES 批量写入存在失败文档 | 抛 RuntimeException | 不 ACK,触发重试 |
| Kafka 消息反序列化失败 | Spring Kafka 框架层处理 | 不 ACK,触发重试 |
| ES 连接失败 | bulk() 抛 IOException | 不 ACK,触发重试 |
六、HTTP 接口设计
6.1 批量查询接口
POST /api/prodPrice/queryByIds请求体:
json
["baseNo1_provinceCode1_userType1", "baseNo2_provinceCode2_userType2"]响应体:
json
{
"code": 200,
"data": [
{
"esId": "baseNo1_provinceCode1_userType1",
"yjjItemPriceMin": 12.50,
"ysbItemPriceMin": 13.00,
"updateTime": "2026-06-11 10:00:00",
...
}
]
}实现要点:
- 使用 ES
ids查询,原生支持批量 ID 查询 size设置为ids.size(),确保返回全部结果- 查询结果从
hit.source()反序列化后,手动回填esId = hit.id()(因为esId在 EsDTO 上标记了@JsonIgnore) - Vo 的 Date 字段使用
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")格式化输出
6.2 索引创建接口
POST /api/prodPrice/createIndex用途:上线后由开发人员手动调用,提前创建 ES 索引及 mapping。
特性:内部 initIndex() 有幂等保护,重复调用无副作用。
七、涉及文件清单
| 文件路径 | 类型 | 说明 |
|---|---|---|
entity/dto/PlatformProdPriceKafkaDTO.java | 新增 | Kafka 报文 DTO,90 字段 |
entity/dto/PlatformProdPriceEsDTO.java | 新增 | ES 文档 DTO,含 @JsonIgnore esId |
entity/vo/PlatformProdPriceVo.java | 新增 | API 返回 VO,25 字段 |
entity/wrapper/PlatformProdPriceWrapper.java | 新增 | Kafka 消息包装体 |
entity/builder/TypeMappingBuilder.java | 新增 | ES Mapping 构建工具(通用) |
service/PlatformProdPriceService.java | 新增 | 服务接口 |
service/impl/PlatformProdPriceServiceImpl.java | 新增 | 服务实现 |
controller/PlatformProdPriceController.java | 新增 | REST 接口 |
consumer/PlatformProdPriceConsumer.java | 新增 | Kafka 消费端 |
八、关键技术决策
8.1 为什么使用 ES 而非数据库存储
- 比价数据量大(百万级),且为全量覆盖更新模式
- 查询模式为按 ID 批量精确查询,ES 的
_id查询性能优于数据库 - 数据由 Kafka 推送,无需复杂事务和关联查询
8.2 为什么使用 scaled_float 存储 BigDecimal
scaled_float(100)内部以 long 存储(值 × 100),保留两位小数精度- 相比 float/double 无精度丢失问题
- 存储空间小于 double,对百万级数据有实际意义
8.3 为什么 Vo 不包含好药师字段
- 当前
queryByIds接口业务上不需要好药师数据 - ES 中数据量百万级,Vo 裁剪字段可减小 HTTP 报文体积和序列化开销
- 后续如需返回,在 Vo 中补齐字段即可,EsDTO 和 ES mapping 已包含
8.4 为什么 convertKafka2Es 使用显式赋值
- 上游药九九报文结构可能变动
- 显式赋值确保只取我方需要的字段,上游新增字段不会自动带入
- 上游字段变更时编译期即可发现,避免运行时隐式丢数据
8.5 为什么 Date 使用 epoch_millis 格式
- Kafka 消息中时间字段默认为时间戳序列化(Java Date 默认行为)
- ES mapping 使用
epoch_millis与上游序列化方式一致,无需额外转换
九、上线步骤
| 步骤 | 操作 | 说明 |
|---|---|---|
| 1 | 部署 cloud-erp-store-host 服务 | 包含 Consumer、Service、Controller |
| 2 | 调用 POST /api/prodPrice/createIndex | 创建 ES 索引及 mapping |
| 3 | 确认 Kafka topic 已配置消费权限 | topic: comparison-server_yjj_base_item_push |
| 4 | 验证 Consumer 正常消费 | 观察日志中「药九九推送kafka」日志及 ES 写入情况 |
| 5 | 验证 queryByIds 接口 | 使用已知 ES ID 查询验证数据正确性 |
十、后续规划
| 事项 | 说明 |
|---|---|
| 索引结构变更 | 已预留 deleteIndex() 方法,后续如需变更 mapping 可删除索引重建 |
| 新平台接入 | 上游新增平台时,在 KafkaDTO/EsDTO 中新增字段,buildMapping() 中新增映射即可 |
| Vo 字段扩展 | 如需在接口中返回好药师等更多数据,在 Vo 中补齐对应字段即可 |