Commit 6018f990 by zhoupeng

提交导入组织架构openapi接口

parent 147943bf
...@@ -82,44 +82,6 @@ ...@@ -82,44 +82,6 @@
<artifactId>pagehelper-spring-boot-starter</artifactId> <artifactId>pagehelper-spring-boot-starter</artifactId>
<version>1.2.3</version> <version>1.2.3</version>
</dependency> </dependency>
<dependency>
<groupId>com.rome.public</groupId>
<artifactId>arch-elastic-job-starter</artifactId>
<version>2.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>2.3.3</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-shims</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
......
package com.rome.order.api.controller;
import com.rome.arch.core.clientobject.Response;
import com.rome.order.domain.service.SyncTsaProductService;
import com.rome.order.domain.service.SyncTsaProductV2Service;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("/v1/produt")
@Api(value = "同步商品", tags = "同步商品")
public class SyncProductController {
@Resource
private SyncTsaProductService syncTsaProductService;
@Resource
private SyncTsaProductV2Service syncTsaProductV2Service;
// @GetMapping("/syncAllProduct")
// @ApiOperation(value = "syncAllProduct同步商品")
// private void syncAllProduct() {
// syncTsaProductService.syncAllProduct();
// }
@GetMapping("/syncSingleProduct")
@ApiOperation(value = "同步指定sku编码商品到商品库")
public Response<Boolean> syncSingleProduct(@RequestParam(value = "skuKey") String skuKey){
return Response.builderSuccess(syncTsaProductService.syncSingleProduct(skuKey));
}
@GetMapping("/syncAllProduct")
@ApiOperation(value = "syncAllProduct同步商品")
private void syncAllProductV2() {
syncTsaProductV2Service.syncAllProduct();
}
}
package com.rome.order.domain.service;
public interface SyncTsaProductService {
/**
* 全量同步商品到商品库
*/
void syncAllProduct();
/**
* 同步指定sku编码商品
*/
Boolean syncSingleProduct(String skuKey);
}
package com.rome.order.domain.service;
public interface SyncTsaProductV2Service {
/**
* 全量同步商品到商品库
*/
void syncAllProduct();
}
package com.rome.order.domain.service.impl;
import com.google.common.collect.Lists;
import com.rome.order.api.dto.product.req.TsaAddProductListReqDTO;
import com.rome.order.api.dto.product.req.TsaBatchUpdateProductInfoReqDTO;
import com.rome.order.api.dto.product.req.TsaUpdateProductReqDTO;
import com.rome.order.api.dto.product.res.TsaProductInfoResDTO;
import com.rome.order.api.dto.product.res.TsaProductListResDTO;
import com.rome.order.api.dto.product.res.TsaProductResDTO;
import com.rome.order.domain.constant.BaseConstant;
import com.rome.order.domain.convertor.SyncProductConvertor;
import com.rome.order.domain.service.SyncTsaProductService;
import com.rome.order.domain.service.TsaProductService;
import com.rome.order.domain.util.HiveJdbc;
import com.rome.order.infrastructure.dataobject.DimSkuDO;
import com.rome.order.infrastructure.dataobject.SyncProductDO;
import com.rome.order.infrastructure.mapper.SyncProductMapper;
import com.rome.order.infrastructure.remote.constant.TsaRemoteConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@Service
public class SyncTsaProductServiceImpl implements SyncTsaProductService {
@Resource
private SyncProductMapper syncProductMapper;
@Resource
private TsaProductService tsaProductService;
@Resource
private SyncProductConvertor syncProductConvertor;
@Override
public void syncAllProduct() {
// 获取到hive中商品信息
List<DimSkuDO> hiveList = getHiveProductInfoList();
if (CollectionUtils.isEmpty(hiveList)) {
return;
}
// 获取到中间表信息
List<SyncProductDO> centerList = getSyncProductInfoList();
Map<String, SyncProductDO> centerMap = centerList.stream().collect(Collectors.toMap(SyncProductDO::getSkuKey, Function.identity(), (k1, k2) -> k1));
// 添加商品的集合
List<TsaAddProductListReqDTO> addAllList = new ArrayList<>();
// 修改商品的集合
List<TsaAddProductListReqDTO> updateAllList = new ArrayList<>();
// 对比新增还是更新
for (DimSkuDO skuDO : hiveList) {
// 实体转换
TsaAddProductListReqDTO tsaDto = syncProductConvertor.doToTsaAddProductListReqDTO(skuDO);
// 设置下架时间
tsaDto.setExpiration_time(BaseConstant.DEFAULT_EXPIRATION_TIME);
// 设置描述,默认商品名称
tsaDto.setDescription(skuDO.getName());
// 判断hive中商品是否已经在中间表中
SyncProductDO syncProductDO = centerMap.get(skuDO.getSku_key());
if (Objects.isNull(syncProductDO) || StringUtils.isBlank(syncProductDO.getTsaProdId())) {
addAllList.add(tsaDto);
} else if (checkExecuteUpdate(skuDO, syncProductDO)) {
updateAllList.add(tsaDto);
}
}
if (CollectionUtils.isNotEmpty(addAllList)) {
List<List<TsaAddProductListReqDTO>> addList = Lists.partition(addAllList, 50);
for (List<TsaAddProductListReqDTO> sonList : addList) {
try {
// 批量同步商品信息
assembleSaveInfo(sonList);
} catch (Exception e) {
log.error("批量同步商品异常,异常信息={}", e);
}
}
}
// if (CollectionUtils.isNotEmpty(updateAllList)) {
// List<List<TsaAddProductListReqDTO>> updateList = Lists.partition(updateAllList, 20);
// for (List<TsaAddProductListReqDTO> sonList : updateList) {
// assembleUpdateInfo(sonList);
// }
// }
}
/**
* 更新媒体测商品信息
*/
public void assembleUpdateInfo(List<TsaAddProductListReqDTO> list) {
// 调用腾讯更新商品接口
for (TsaAddProductListReqDTO reqDTO : list) {
SyncProductDO syncProductDO = syncProductConvertor.doToSyncProductDO(reqDTO);
// 组装更新商品请求体
TsaUpdateProductReqDTO tsaUpdateDto = new TsaUpdateProductReqDTO();
tsaUpdateDto.setProduct_outer_id(reqDTO.getProduct_outer_id());
tsaUpdateDto.setExpiration_time(Integer.valueOf(reqDTO.getExpiration_time()));
// 调用更新接口
TsaProductResDTO resDTO = tsaProductService.updateProduct(tsaUpdateDto);
if (Objects.nonNull(resDTO) && resDTO.getCode() == 0) {
// 设置同步状态
syncProductDO.setSyncStatus(BaseConstant.ONE);
} else {
// 设置同步状态
syncProductDO.setSyncStatus(BaseConstant.ZERO);
}
// 设置同步信息
syncProductDO.setSyncMsg(resDTO.getCode() + ":" + resDTO.getMessage());
// 更新中间表信息
syncProductMapper.update(syncProductDO);
}
}
/**
* 批量更新媒体测商品信息
*/
public void assembleBatchUpdateInfo(List<TsaAddProductListReqDTO> list) {
// 要更新的商品信息
List<SyncProductDO> updateList = new ArrayList<>();
// 腾讯接口要更新的商品信息
List<TsaBatchUpdateProductInfoReqDTO> tsaUpdateList = new ArrayList<>();
// 组装腾讯要更新商品信息
for (TsaAddProductListReqDTO reqDTO : list) {
TsaBatchUpdateProductInfoReqDTO updateProductInfoReqDTO = new TsaBatchUpdateProductInfoReqDTO();
updateProductInfoReqDTO.setProduct_outer_id(reqDTO.getProduct_outer_id());
tsaUpdateList.add(updateProductInfoReqDTO);
}
// 调用腾讯批量更新商品接口
TsaProductResDTO tsaProductResDTO = tsaProductService.batchUpdateProduct(tsaUpdateList);
if (Objects.nonNull(tsaProductResDTO) && tsaProductResDTO.getCode() == TsaRemoteConstant.SUCCESS_CODE
&& Objects.nonNull(tsaProductResDTO.getData()) && CollectionUtils.isNotEmpty(tsaProductResDTO.getData().getList())) {
TsaProductListResDTO data = tsaProductResDTO.getData();
List<TsaProductInfoResDTO> resList = data.getList();
// 用户接收调用腾讯接口成功的商品信息
Map<String, TsaProductInfoResDTO> map = resList.stream().collect(Collectors.toMap(TsaProductInfoResDTO::getProduct_outer_id, Function.identity(), (k1, k2) -> k1));
for (TsaAddProductListReqDTO reqDTO : list) {
SyncProductDO syncProductDO = syncProductConvertor.doToSyncProductDO(reqDTO);
TsaProductInfoResDTO tsaProductInfo = map.get(syncProductDO.getSkuKey());
if (Objects.nonNull(tsaProductInfo)) {
// 设置媒体侧商品id
syncProductDO.setTsaProdId(tsaProductInfo.getProduct_id());
// 设置同步状态
syncProductDO.setSyncStatus(BaseConstant.ONE);
// 设置同步信息
syncProductDO.setSyncMsg(tsaProductResDTO.getCode() + ":" + tsaProductResDTO.getMessage());
} else {
// 设置同步状态
syncProductDO.setSyncStatus(BaseConstant.ZERO);
// 设置同步信息
syncProductDO.setSyncMsg("当前批次同步失败");
}
updateList.add(syncProductDO);
}
} else {
// 当前批次全部同步失败
for (TsaAddProductListReqDTO reqDTO : list) {
SyncProductDO syncProductDO = syncProductConvertor.doToSyncProductDO(reqDTO);
syncProductDO.setSyncMsg(tsaProductResDTO.getCode() + ":" + tsaProductResDTO.getMessage());
syncProductDO.setSyncStatus(BaseConstant.ZERO);
updateList.add(syncProductDO);
}
}
syncProductMapper.batchUpdate(updateList);
}
/**
* 插入媒体测商品信息
*/
public void assembleSaveInfo(List<TsaAddProductListReqDTO> list) {
// 要新增的商品信息
List<SyncProductDO> allList = new ArrayList<>();
// 调用腾讯添加商品接口
TsaProductResDTO tsaProductResDTO = tsaProductService.batchAddProduct(list);
if (Objects.nonNull(tsaProductResDTO) && tsaProductResDTO.getCode() == TsaRemoteConstant.SUCCESS_CODE
&& Objects.nonNull(tsaProductResDTO.getData()) && CollectionUtils.isNotEmpty(tsaProductResDTO.getData().getList())) {
TsaProductListResDTO data = tsaProductResDTO.getData();
List<TsaProductInfoResDTO> resList = data.getList();
// 用户接收调用腾讯接口成功的商品信息
Map<String, TsaProductInfoResDTO> map = resList.stream().collect(Collectors.toMap(TsaProductInfoResDTO::getProduct_outer_id, Function.identity(), (k1, k2) -> k1));
for (TsaAddProductListReqDTO reqDTO : list) {
SyncProductDO syncProductDO = syncProductConvertor.doToSyncProductDO(reqDTO);
TsaProductInfoResDTO tsaProductInfo = map.get(syncProductDO.getSkuKey());
if (Objects.nonNull(tsaProductInfo)) {
// 设置媒体侧商品id
syncProductDO.setTsaProdId(tsaProductInfo.getProduct_id());
// 设置同步状态
syncProductDO.setSyncStatus(BaseConstant.ONE);
// 设置同步信息
syncProductDO.setSyncMsg(tsaProductResDTO.getCode() + ":" + tsaProductResDTO.getMessage());
} else {
// 设置同步状态
syncProductDO.setSyncStatus(BaseConstant.ZERO);
// 设置同步信息
syncProductDO.setSyncMsg("当前批次同步失败");
}
allList.add(syncProductDO);
}
} else {
// 当前批次全部同步失败
for (TsaAddProductListReqDTO reqDTO : list) {
SyncProductDO syncProductDO = syncProductConvertor.doToSyncProductDO(reqDTO);
syncProductDO.setSyncMsg(tsaProductResDTO.getCode() + ":" + tsaProductResDTO.getMessage());
syncProductDO.setSyncStatus(BaseConstant.ZERO);
allList.add(syncProductDO);
}
}
// 获取到商品编码
List<String> skuKeyList = allList.stream().map(SyncProductDO::getSkuKey).filter(s -> StringUtils.isNotEmpty(s)).collect(Collectors.toList());
// 查询中间表是否已有记录
List<SyncProductDO> centerSkuList = syncProductMapper.querySyncProductListBySkuKey(skuKeyList);
if (CollectionUtils.isEmpty(centerSkuList)) {
// 插入中间表记录商品同步信息
syncProductMapper.batchSave(allList);
} else {
Map<String, SyncProductDO> map = centerSkuList.stream().collect(Collectors.toMap(SyncProductDO::getSkuKey, Function.identity(), (k1, k2) -> k1));
// 批量新增的集合
List<SyncProductDO> addList = allList.stream().filter(s -> Objects.isNull(map.get(s.getSkuKey()))).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(addList)) {
// 插入中间表记录商品同步信息
syncProductMapper.batchSave(addList);
}
// 批量更新的集合
List<SyncProductDO> updateList = allList.stream().filter(s -> Objects.nonNull(map.get(s.getSkuKey()))).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(updateList)) {
syncProductMapper.batchUpdate(updateList);
}
}
}
/**
* 检查当前商品是否需要更新
*
* @param
*/
public boolean checkExecuteUpdate(DimSkuDO dimSkuDO, SyncProductDO syncDO) {
if (Objects.isNull(dimSkuDO) || Objects.isNull(syncDO)) {
return false;
}
// 商品名称不一致
if (StringUtils.isNotEmpty(dimSkuDO.getName()) &&
!dimSkuDO.getName().equals(syncDO.getName())) {
return true;
}
// 品牌名称不一致
if (StringUtils.isNotEmpty(dimSkuDO.getBrand_name()) &&
!dimSkuDO.getBrand_name().equals(syncDO.getBrandName())) {
return true;
}
// 商品主图url地址
if (StringUtils.isNotEmpty(dimSkuDO.getSku_image_url()) &&
!dimSkuDO.getSku_image_url().equals(syncDO.getSkuImageUrl())) {
return true;
}
// 一级分类 id
if (StringUtils.isNotEmpty(dimSkuDO.getMdm_category_one_code()) &&
!dimSkuDO.getMdm_category_one_code().equals(syncDO.getMdmCategoryOneCode())) {
return true;
}
// 一级分类名称
if (StringUtils.isNotEmpty(dimSkuDO.getMdm_category_one_name()) &&
!dimSkuDO.getMdm_category_one_name().equals(syncDO.getMdmCategoryOneName())) {
return true;
}
// 二级分类 id
if (StringUtils.isNotEmpty(dimSkuDO.getMdm_category_two_code()) &&
!dimSkuDO.getMdm_category_two_code().equals(syncDO.getMdmCategoryTwoCode())) {
return true;
}
// 二级分类名称
if (StringUtils.isNotEmpty(dimSkuDO.getMdm_category_two_name()) &&
!dimSkuDO.getMdm_category_two_name().equals(syncDO.getMdmCategoryTwoName())) {
return true;
}
// 三级分类 id
if (StringUtils.isNotEmpty(dimSkuDO.getMdm_category_three_code()) &&
!dimSkuDO.getMdm_category_three_code().equals(syncDO.getMdmCategoryThreeCode())) {
return true;
}
// 三级分类名称
if (StringUtils.isNotEmpty(dimSkuDO.getMdm_category_three_name()) &&
!dimSkuDO.getMdm_category_three_name().equals(syncDO.getMdmCategoryThreeName())) {
return true;
}
// 四级分类 id
if (StringUtils.isNotEmpty(dimSkuDO.getMdm_category_four_code()) &&
!dimSkuDO.getMdm_category_four_code().equals(syncDO.getMdmCategoryFourCode())) {
return true;
}
// 四级分类名称
if (StringUtils.isNotEmpty(dimSkuDO.getMdm_category_four_name()) &&
!dimSkuDO.getMdm_category_four_name().equals(syncDO.getMdmCategoryFourName())) {
return true;
}
// 是否同步成功
if (Objects.isNull(syncDO.getSyncStatus()) || syncDO.getSyncStatus() == 0) {
return true;
}
return false;
}
/**
* 获取hive中商品信息
*/
public List<DimSkuDO> getHiveProductInfoList() {
//实体中参数必须和sql中参数、顺序一致
String sql = "select sku_key,name,brand_name,sku_image_url,mdm_category_one_code,mdm_category_one_name," +
"mdm_category_two_code,mdm_category_two_name,mdm_category_three_code,mdm_category_three_name," +
"mdm_category_four_code,mdm_category_four_name from dim_sku";
List<DimSkuDO> hiveList = (List<DimSkuDO>) HiveJdbc.hiveSql(sql, DimSkuDO.class);
return hiveList;
}
/**
* 获取到同步商品中间表所有数据
*/
public List<SyncProductDO> getSyncProductInfoList() {
List<SyncProductDO> list = new ArrayList<>();
try {
// 分页获取全量数据
int pageNum = 0;
int pageSize = 100;
log.info("------getSyncProductInfoList开始-------");
while (true) {
List<SyncProductDO> productList = syncProductMapper.getAllProductList(pageNum * pageSize, pageSize);
if (productList == null) {
log.info("------getSyncProductInfoList全量同步结束-------");
break;
} else {
if (productList.size() < pageSize) {
list.addAll(productList);
log.info("------getSyncProductInfoList全量同步结束-------");
break;
}
list.addAll(productList);
}
pageNum++;
log.info("------getSyncProductInfoList当前的pageNum----:{}", pageNum);
}
return list;
} catch (Exception e) {
log.error("------getSyncProductInfoList出现异常-----");
}
return list;
}
/**
* 同步指定sku编码商品
*/
@Override
public Boolean syncSingleProduct(String skuKey) {
if (StringUtils.isEmpty(skuKey)) {
return false;
}
//实体中参数必须和sql中参数、顺序一致
String sql = "select sku_key,name,brand_name,sku_image_url,mdm_category_one_code,mdm_category_one_name," +
"mdm_category_two_code,mdm_category_two_name,mdm_category_three_code,mdm_category_three_name," +
"mdm_category_four_code,mdm_category_four_name from dim_sku where sku_key = " + skuKey + "";
List<DimSkuDO> hiveList = (List<DimSkuDO>) HiveJdbc.hiveSql(sql, DimSkuDO.class);
if (CollectionUtils.isEmpty(hiveList)) {
return false;
}
DimSkuDO skuDO = hiveList.get(0);
TsaAddProductListReqDTO tsaProductDto = syncProductConvertor.doToTsaAddProductListReqDTO(skuDO);
// 设置下架时间
tsaProductDto.setExpiration_time(BaseConstant.DEFAULT_EXPIRATION_TIME);
// 设置描述,默认商品名称
tsaProductDto.setDescription(skuDO.getName());
SyncProductDO syncProductDO = syncProductConvertor.doToSyncProductDO(tsaProductDto);
// 去中间表中查询是否存在
SyncProductDO centerDO = syncProductMapper.findProductInfoBySkuKey(skuDO.getSku_key());
if (Objects.isNull(centerDO) || StringUtils.isBlank(centerDO.getTsaProdId())) {
//调用腾讯添加商品接口
TsaProductResDTO tsaResDto = tsaProductService.addProduct(tsaProductDto);
if (Objects.nonNull(tsaResDto) && tsaResDto.getCode() == TsaRemoteConstant.SUCCESS_CODE
&& Objects.nonNull(tsaResDto.getData()) && CollectionUtils.isNotEmpty(tsaResDto.getData().getList())) {
TsaProductListResDTO data = tsaResDto.getData();
List<TsaProductInfoResDTO> resList = data.getList();
// 用户接收调用腾讯接口成功的商品信息
Map<String, TsaProductInfoResDTO> map = resList.stream().collect(Collectors.toMap(TsaProductInfoResDTO::getProduct_outer_id, Function.identity(), (k1, k2) -> k1));
TsaProductInfoResDTO tsaProductResInfo = map.get(syncProductDO.getSkuKey());
if (Objects.nonNull(tsaProductResInfo)) {
// 设置媒体侧商品id
syncProductDO.setTsaProdId(tsaProductResInfo.getProduct_id());
// 设置同步状态
syncProductDO.setSyncStatus(BaseConstant.ONE);
// 设置同步信息
syncProductDO.setSyncMsg(tsaResDto.getCode() + ":" + tsaResDto.getMessage());
} else {
// 设置同步状态
syncProductDO.setSyncStatus(BaseConstant.ZERO);
// 设置同步信息
syncProductDO.setSyncMsg(tsaResDto.getCode() + ":" + tsaResDto.getMessage());
}
} else {
syncProductDO.setSyncMsg(tsaResDto.getCode() + ":" + tsaResDto.getMessage());
syncProductDO.setSyncStatus(BaseConstant.ZERO);
}
// 查询该商品是否已经在中间表中
SyncProductDO productDO = syncProductMapper.findProductInfoBySkuKey(syncProductDO.getSkuKey());
if (Objects.isNull(productDO)) {
// 不存在,插入
syncProductMapper.save(syncProductDO);
}else {
// 反之,更新
syncProductMapper.update(syncProductDO);
}
} else if (checkExecuteUpdate(skuDO, centerDO)) {
// 创建更新商品请求对象
TsaUpdateProductReqDTO tsaUpdateReqDto = new TsaUpdateProductReqDTO();
tsaUpdateReqDto.setProduct_outer_id(tsaProductDto.getProduct_outer_id());
tsaUpdateReqDto.setExpiration_time(Integer.valueOf(tsaProductDto.getExpiration_time()));
// 调用腾讯更新商品接口
TsaProductResDTO tsaUpdateProductResDTO = tsaProductService.updateProduct(tsaUpdateReqDto);
if (Objects.nonNull(tsaUpdateProductResDTO) && tsaUpdateProductResDTO.getCode() == 0) {
// 设置同步状态
syncProductDO.setSyncStatus(BaseConstant.ONE);
// 设置同步信息
syncProductDO.setSyncMsg(tsaUpdateProductResDTO.getCode() + ":" + tsaUpdateProductResDTO.getMessage());
} else {
// 设置同步状态
syncProductDO.setSyncStatus(BaseConstant.ZERO);
// 设置同步信息
syncProductDO.setSyncMsg(tsaUpdateProductResDTO.getCode() + ":" + tsaUpdateProductResDTO.getMessage());
}
// 更新中间表
syncProductMapper.update(syncProductDO);
}
return true;
}
}
package com.rome.order.domain.service.impl;
import com.rome.order.api.dto.product.req.TsaAddProductListReqDTO;
import com.rome.order.api.dto.product.req.TsaBatchUpdateProductInfoReqDTO;
import com.rome.order.api.dto.product.res.TsaProductInfoResDTO;
import com.rome.order.api.dto.product.res.TsaProductListResDTO;
import com.rome.order.api.dto.product.res.TsaProductResDTO;
import com.rome.order.domain.constant.BaseConstant;
import com.rome.order.domain.convertor.SyncProductConvertor;
import com.rome.order.domain.service.SyncTsaProductV2Service;
import com.rome.order.domain.service.TsaProductService;
import com.rome.order.domain.util.HiveJdbc;
import com.rome.order.infrastructure.dataobject.DimSkuDO;
import com.rome.order.infrastructure.dataobject.SyncProductDO;
import com.rome.order.infrastructure.mapper.SyncProductMapper;
import com.rome.order.infrastructure.remote.constant.TsaRemoteConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
@Service
public class SyncTsaProductV2ServiceImpl implements SyncTsaProductV2Service {
@Resource
private TsaProductService tsaProductService;
@Resource
private SyncProductMapper syncProductMapper;
@Resource
private SyncProductConvertor syncProductConvertor;
/**
* 获取hive中商品信息
*/
public List<DimSkuDO> getHiveProductInfoPage(Integer pageNum, Integer pageSize) {
//实体中参数必须和sql中参数、顺序一致
String sql = "select sku_key,name,brand_name,sku_image_url,mdm_category_one_code,mdm_category_one_name," +
"mdm_category_two_code,mdm_category_two_name,mdm_category_three_code,mdm_category_three_name," +
"mdm_category_four_code,mdm_category_four_name from dim_sku limit " + pageNum + "," + pageSize;
List<DimSkuDO> hiveList = (List<DimSkuDO>) HiveJdbc.hiveSql(sql, DimSkuDO.class);
return hiveList;
}
@Override
public void syncAllProduct() {
try {
// 分页获取全量数据
int pageNum = 0;
int pageSize = 50;
log.info("------syncAllProduct同步商品开始-------");
while (true) {
List<DimSkuDO> hiveList = this.getHiveProductInfoPage(pageNum * pageSize, pageSize);
if (hiveList == null) {
log.info("------syncAllProduct全量同步结束-------");
break;
} else {
if (hiveList.size() < pageSize) {
this.dealthProductList(hiveList);
log.info("------syncAllProduct全量同步结束-------");
break;
}
this.dealthProductList(hiveList);
}
pageNum++;
log.info("------syncAllProduct当前的pageNum----:{}", pageNum);
}
} catch (Exception e) {
log.error("------syncAllProduct出现异常-----异常信息={}", e);
}
}
public void dealthProductList(List<DimSkuDO> hiveList) {
if (CollectionUtils.isEmpty(hiveList)) {
return;
}
// 获取到本次同步的skuKey
List<String> skuKeyList = hiveList.stream().map(DimSkuDO::getSku_key).filter(s -> StringUtils.isNotEmpty(s)).distinct().collect(Collectors.toList());
// 查询中间表
List<SyncProductDO> centerList = syncProductMapper.querySyncProductListBySkuKey(skuKeyList);
if (CollectionUtils.isEmpty(centerList)) {
// 本次全部新增
this.dealthAddProduct(hiveList);
} else {
List<DimSkuDO> batchAdd = new ArrayList<>();
List<DimSkuDO> batchUpdate = new ArrayList<>();
Map<String, SyncProductDO> syncProductMap = centerList.stream().collect(Collectors.toMap(SyncProductDO::getSkuKey, Function.identity(), (k1, k2) -> k1));
for (DimSkuDO dimSkuDO : hiveList) {
SyncProductDO syncProductDO = syncProductMap.get(dimSkuDO.getSku_key());
if (Objects.isNull(syncProductDO) || StringUtils.isBlank(syncProductDO.getTsaProdId())) {
batchAdd.add(dimSkuDO);
} else if (checkExecuteUpdate(dimSkuDO, syncProductDO)) {
batchUpdate.add(dimSkuDO);
}
}
if (CollectionUtils.isNotEmpty(batchAdd)) {
try {
//需要根据skuKey进行去重
batchAdd = batchAdd.stream().collect(Collectors.collectingAndThen(Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(o -> o.getSku_key()))), ArrayList::new));
this.dealthAddProduct(batchAdd);
} catch (Exception e) {
log.error("批量添加商品发送异常,异常信息={}", e);
}
}
// if (CollectionUtils.isNotEmpty(batchUpdate)) {
// try {
// this.dealthUpdateProduct(batchUpdate);
// } catch (Exception e) {
// log.error("批量更新商品信息发生异常,异常信息={}", e);
// }
// }
}
}
/**
* 批量同步商品
*/
public void dealthAddProduct(List<DimSkuDO> list) {
// 所有集合list
List<SyncProductDO> allList = new ArrayList<>();
// 组装请求腾讯接口数据
List<TsaAddProductListReqDTO> syncTsaList = new ArrayList<>();
for (DimSkuDO dimSkuDO : list) {
// 过滤掉名称为空的商品
if (StringUtils.isBlank(dimSkuDO.getName())) {
continue;
}
TsaAddProductListReqDTO tsaAddProductListReqDTO = syncProductConvertor.doToTsaAddProductListReqDTO(dimSkuDO);
tsaAddProductListReqDTO.setExpiration_time(BaseConstant.DEFAULT_EXPIRATION_TIME);
tsaAddProductListReqDTO.setDescription(dimSkuDO.getName());
syncTsaList.add(tsaAddProductListReqDTO);
}
// 调用腾讯添加商品接口
TsaProductResDTO tsaResDto = tsaProductService.batchAddProduct(syncTsaList);
if (Objects.nonNull(tsaResDto) && tsaResDto.getCode() == TsaRemoteConstant.SUCCESS_CODE
&& Objects.nonNull(tsaResDto.getData()) && CollectionUtils.isNotEmpty(tsaResDto.getData().getList())) {
TsaProductListResDTO data = tsaResDto.getData();
List<TsaProductInfoResDTO> resList = data.getList();
// 用户接收调用腾讯接口成功的商品信息
Map<String, TsaProductInfoResDTO> map = resList.stream().collect(Collectors.toMap(TsaProductInfoResDTO::getProduct_outer_id, Function.identity(), (k1, k2) -> k1));
for (TsaAddProductListReqDTO reqDTO : syncTsaList) {
SyncProductDO syncProductDO = syncProductConvertor.doToSyncProductDO(reqDTO);
TsaProductInfoResDTO tsaProductInfo = map.get(syncProductDO.getSkuKey());
if (Objects.nonNull(tsaProductInfo)) {
// 设置媒体侧商品id
syncProductDO.setTsaProdId(tsaProductInfo.getProduct_id());
// 设置同步状态
syncProductDO.setSyncStatus(BaseConstant.ONE);
// 设置同步信息
syncProductDO.setSyncMsg(tsaResDto.getCode() + ":" + tsaResDto.getMessage());
} else {
// 设置同步状态
syncProductDO.setSyncStatus(BaseConstant.ZERO);
// 设置同步信息
syncProductDO.setSyncMsg("当前批次同步失败");
}
allList.add(syncProductDO);
}
} else {
// 当前批次全部同步失败
for (TsaAddProductListReqDTO reqDTO : syncTsaList) {
SyncProductDO syncProductDO = syncProductConvertor.doToSyncProductDO(reqDTO);
syncProductDO.setSyncMsg(tsaResDto.getCode() + ":" + tsaResDto.getMessage());
syncProductDO.setSyncStatus(BaseConstant.ZERO);
allList.add(syncProductDO);
}
}
// 获取到商品编码
List<String> skuKeyList = allList.stream().map(SyncProductDO::getSkuKey).filter(s -> StringUtils.isNotEmpty(s)).collect(Collectors.toList());
// 查询中间表是否已有记录
List<SyncProductDO> centerSkuList = syncProductMapper.querySyncProductListBySkuKey(skuKeyList);
if (CollectionUtils.isEmpty(centerSkuList)) {
// 插入中间表记录商品同步信息
syncProductMapper.batchSave(allList);
} else {
Map<String, SyncProductDO> map = centerSkuList.stream().collect(Collectors.toMap(SyncProductDO::getSkuKey, Function.identity(), (k1, k2) -> k1));
// 批量新增的集合
List<SyncProductDO> addList = allList.stream().filter(s -> Objects.isNull(map.get(s.getSkuKey()))).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(addList)) {
// 插入中间表记录商品同步信息
syncProductMapper.batchSave(addList);
}
// 批量更新的集合
List<SyncProductDO> updateList = allList.stream().filter(s -> Objects.nonNull(map.get(s.getSkuKey()))).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(updateList)) {
syncProductMapper.batchUpdate(updateList);
}
}
}
/**
* 批量更新商品
*/
public void dealthUpdateProduct(List<DimSkuDO> list) {
// 创建插入中间list
List<SyncProductDO> updateList = new ArrayList<>();
// 组装请求腾讯接口数据
List<TsaBatchUpdateProductInfoReqDTO> syncTsaList = new ArrayList<>();
for (DimSkuDO dimSkuDO : list) {
// 创建调用腾讯更新商品接口
TsaBatchUpdateProductInfoReqDTO updateTsaReq = new TsaBatchUpdateProductInfoReqDTO();
updateTsaReq.setExpiration_time(BaseConstant.DEFAULT_EXPIRATION_TIME);
updateTsaReq.setProduct_outer_id(dimSkuDO.getSku_key());
syncTsaList.add(updateTsaReq);
}
// 调用更新接口
TsaProductResDTO tsaProductResDTO = tsaProductService.batchUpdateProduct(syncTsaList);
if (Objects.nonNull(tsaProductResDTO) && tsaProductResDTO.getCode() == TsaRemoteConstant.SUCCESS_CODE
&& Objects.nonNull(tsaProductResDTO.getData()) && CollectionUtils.isNotEmpty(tsaProductResDTO.getData().getList())) {
TsaProductListResDTO data = tsaProductResDTO.getData();
List<TsaProductInfoResDTO> resList = data.getList();
// 用户接收调用腾讯接口成功的商品信息
Map<String, TsaProductInfoResDTO> map = resList.stream().collect(Collectors.toMap(TsaProductInfoResDTO::getProduct_outer_id, Function.identity(), (k1, k2) -> k1));
for (TsaBatchUpdateProductInfoReqDTO reqDTO : syncTsaList) {
SyncProductDO syncProductDO = new SyncProductDO();
syncProductDO.setSkuKey(reqDTO.getProduct_outer_id());
TsaProductInfoResDTO tsaProductInfo = map.get(syncProductDO.getSkuKey());
if (Objects.nonNull(tsaProductInfo)) {
// 设置媒体侧商品id
syncProductDO.setTsaProdId(tsaProductInfo.getProduct_id());
// 设置同步状态
syncProductDO.setSyncStatus(BaseConstant.ONE);
// 设置同步信息
syncProductDO.setSyncMsg(tsaProductResDTO.getCode() + ":" + tsaProductResDTO.getMessage());
} else {
// 设置同步状态
syncProductDO.setSyncStatus(BaseConstant.ZERO);
// 设置同步信息
syncProductDO.setSyncMsg("当前批次同步失败");
}
updateList.add(syncProductDO);
}
} else {
// 当前批次全部同步失败
for (TsaBatchUpdateProductInfoReqDTO reqDTO : syncTsaList) {
SyncProductDO syncProductDO = new SyncProductDO();
syncProductDO.setSkuKey(reqDTO.getProduct_outer_id());
syncProductDO.setSyncMsg(tsaProductResDTO.getCode() + ":" + tsaProductResDTO.getMessage());
syncProductDO.setSyncStatus(BaseConstant.ZERO);
updateList.add(syncProductDO);
}
}
syncProductMapper.batchUpdate(updateList);
}
/**
* 检查当前商品是否需要更新
*
* @param
*/
public boolean checkExecuteUpdate(DimSkuDO dimSkuDO, SyncProductDO syncDO) {
if (Objects.isNull(dimSkuDO) || Objects.isNull(syncDO)) {
return false;
}
// 商品名称不一致
if (StringUtils.isNotEmpty(dimSkuDO.getName()) &&
!dimSkuDO.getName().equals(syncDO.getName())) {
return true;
}
// 品牌名称不一致
if (StringUtils.isNotEmpty(dimSkuDO.getBrand_name()) &&
!dimSkuDO.getBrand_name().equals(syncDO.getBrandName())) {
return true;
}
// 商品主图url地址
if (StringUtils.isNotEmpty(dimSkuDO.getSku_image_url()) &&
!dimSkuDO.getSku_image_url().equals(syncDO.getSkuImageUrl())) {
return true;
}
// 一级分类 id
if (StringUtils.isNotEmpty(dimSkuDO.getMdm_category_one_code()) &&
!dimSkuDO.getMdm_category_one_code().equals(syncDO.getMdmCategoryOneCode())) {
return true;
}
// 一级分类名称
if (StringUtils.isNotEmpty(dimSkuDO.getMdm_category_one_name()) &&
!dimSkuDO.getMdm_category_one_name().equals(syncDO.getMdmCategoryOneName())) {
return true;
}
// 二级分类 id
if (StringUtils.isNotEmpty(dimSkuDO.getMdm_category_two_code()) &&
!dimSkuDO.getMdm_category_two_code().equals(syncDO.getMdmCategoryTwoCode())) {
return true;
}
// 二级分类名称
if (StringUtils.isNotEmpty(dimSkuDO.getMdm_category_two_name()) &&
!dimSkuDO.getMdm_category_two_name().equals(syncDO.getMdmCategoryTwoName())) {
return true;
}
// 三级分类 id
if (StringUtils.isNotEmpty(dimSkuDO.getMdm_category_three_code()) &&
!dimSkuDO.getMdm_category_three_code().equals(syncDO.getMdmCategoryThreeCode())) {
return true;
}
// 三级分类名称
if (StringUtils.isNotEmpty(dimSkuDO.getMdm_category_three_name()) &&
!dimSkuDO.getMdm_category_three_name().equals(syncDO.getMdmCategoryThreeName())) {
return true;
}
// 四级分类 id
if (StringUtils.isNotEmpty(dimSkuDO.getMdm_category_four_code()) &&
!dimSkuDO.getMdm_category_four_code().equals(syncDO.getMdmCategoryFourCode())) {
return true;
}
// 四级分类名称
if (StringUtils.isNotEmpty(dimSkuDO.getMdm_category_four_name()) &&
!dimSkuDO.getMdm_category_four_name().equals(syncDO.getMdmCategoryFourName())) {
return true;
}
// 是否同步成功
if (Objects.isNull(syncDO.getSyncStatus()) || syncDO.getSyncStatus() == 0) {
return true;
}
return false;
}
}
//package com.rome.order.domain.task;
//
//import com.rome.order.domain.service.SyncEmployeService;
//import lombok.extern.slf4j.Slf4j;
//import org.apache.shardingsphere.elasticjob.api.ShardingContext;
//import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.Resource;
//import java.util.ArrayList;
//import java.util.List;
//
///**
// * @Author: zhoupeng
// * @createTime: 2023年04月27日 10:42:35
// * @version: 1.0
// * @Description: 全量同步查询员工(基础信息),每天凌晨12点钟执行
// * @copyright:
// */
//@Component
//@Slf4j
//public class SynAllEmpBaseJob implements DataflowJob {
//
// @Resource
// private SyncEmployeService syncEmployeService;
//
// @Override
// public List fetchData(ShardingContext shardingContext) {
// List list=new ArrayList();
// list.add("ceshi");
// return list;
// }
//
// @Override
// public void processData(ShardingContext shardingContext, List list) {
// log.info("------synAllEmpBaseJob全量同步查询员工(基础信息) 定时器-------");
// syncEmployeService.synAllEmpBase(null);
// }
//}
package com.rome.order.domain.task;
import com.rome.order.domain.service.SyncEmployeService;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* @Author: zhoupeng
* @createTime: 2023年04月27日 10:47:44
* @version: 1.0
* @Description: 同步根据更新时间查询员工(基础信息) 1小时一次
* @copyright:
*/
@Component
@Slf4j
public class SynByStartDateEmpBaseJob implements DataflowJob {
@Autowired
private SyncEmployeService syncEmployeService;
@Override
public List fetchData(ShardingContext shardingContext) {
List list=new ArrayList();
list.add("ceshi");
return list;
}
@Override
public void processData(ShardingContext shardingContext, List list) {
log.info("------synByStartDateEmpBaseJob同步根据更新时间查询员工(基础信息) 定时器-------");
syncEmployeService.synByStartDateEmpBase(null);
}
}
package com.rome.order.domain.task;
import com.rome.order.domain.service.SyncEmployeService;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* @Author: zhoupeng
* @createTime: 2023年04月27日 10:52:04
* @version: 1.0
* @Description: 同步失败员工信息补偿 半个小时一次
* @copyright:
*/
@Component
@Slf4j
public class SynFailEmployeeJob implements DataflowJob {
@Autowired
private SyncEmployeService syncEmployeService;
@Override
public List fetchData(ShardingContext shardingContext) {
List list=new ArrayList();
list.add("ceshi");
return list;
}
@Override
public void processData(ShardingContext shardingContext, List list) {
log.info("------synFailEmployeeJob同步失败员工信息补偿 定时器-------");
syncEmployeService.synFailEmployee();
}
}
package com.rome.order.domain.task;
import com.rome.order.domain.service.SyncEmployeService;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* @Author: zhoupeng
* @createTime: 2023年04月27日 10:50:17
* @version: 1.0
* @Description: 同步失败绑定第三方账号进行补偿 半个小时一次
* @copyright:
*/
@Component
@Slf4j
public class SynFailGenerateAccountJob implements DataflowJob {
@Autowired
private SyncEmployeService syncEmployeService;
@Override
public List fetchData(ShardingContext shardingContext) {
List list=new ArrayList();
list.add("ceshi");
return list;
}
@Override
public void processData(ShardingContext shardingContext, List list) {
log.info("------synFailGenerateAccountJob同步失败绑定第三方账号进行补偿 定时器-------");
syncEmployeService.synFailGenerateAccount();
}
}
package com.rome.order.domain.task;
import com.rome.order.domain.service.OrganizationService;
import com.rome.order.domain.service.OrganizationV2Service;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* @Author: zhoupeng
* @createTime: 2023年04月27日 10:56:53
* @version: 1.0
* @Description:
* @copyright:
*/
@Component
@Slf4j
public class SynFailOrgJob implements DataflowJob {
@Autowired
private OrganizationService organizationService;
@Autowired
private OrganizationV2Service organizationV2Service;
@Override
public List fetchData(ShardingContext shardingContext) {
List list=new ArrayList();
list.add("ceshi");
return list;
}
@Override
public void processData(ShardingContext shardingContext, List list) {
log.info("------synFailOrgJob同步失败的组织机构任务补偿 定时器-------");
organizationV2Service.synFailOrg();
}
}
package com.rome.order.domain.task;
import com.rome.order.domain.service.OrganizationService;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* @Author: zhoupeng
* @createTime: 2023年04月27日 10:53:57
* @version: 1.0
* @Description: syncAllOrg同步组织架构 每天晚上11点
* @copyright:
*/
@Component
@Slf4j
public class SyncAllOrgJob implements DataflowJob {
@Autowired
private OrganizationService organizationService;
@Override
public List fetchData(ShardingContext shardingContext) {
List list=new ArrayList();
list.add("ceshi");
return list;
}
@Override
public void processData(ShardingContext shardingContext, List list) {
log.info("------syncAllOrgJob同步组织架构 定时器-------");
organizationService.synAllOrg(null);
}
}
package com.rome.order.domain.task;
import com.rome.order.domain.service.SyncTsaProductV2Service;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
/**
* @Author: zhoupeng
* @createTime: 2023年04月27日 10:53:57
* @version: 1.0
* @Description: syncAllOrg同步组织架构 每天晚上11点
* @copyright:
*/
@Component
@Slf4j
public class SyncProductJob implements DataflowJob {
@Autowired
private SyncTsaProductV2Service syncTsaProductV2Service;
@Override
public List fetchData(ShardingContext shardingContext) {
List list=new ArrayList();
list.add("ceshi");
return list;
}
@Override
public void processData(ShardingContext shardingContext, List list) {
log.info("------SyncProductJob同步商品 定时器-------");
syncTsaProductV2Service.syncAllProduct();
}
}
package com.rome.order.domain.util;
import org.apache.commons.net.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.util.Date;
/**
* @Author: zhoupeng
* @createTime: 2023年08月28日 17:39:12
* @version: 1.0
* @Description:
* @copyright:
*/
public class HmacSHA256Util {
/**
* HmacSHA256算法,返回的结果始终是32位
*
* @param key 加密的键,可以是任何数据
* @param content 待加密的内容
* @return 加密后的内容
* @throws Exception
*/
public static byte[] hmacSHA256(byte[] key, byte[] content) throws Exception {
Mac hmacSha256 = Mac.getInstance("HmacSHA256");
hmacSha256.init(new SecretKeySpec(key, 0, key.length, "HmacSHA256"));
byte[] hmacSha256Bytes = hmacSha256.doFinal(content);
return hmacSha256Bytes;
}
/**
* 将加密后的字节数组转换成字符串
*
* @param b 字节数组
* @return 字符串
*/
public static String byteArrayToHexString(byte[] b) {
StringBuilder hs = new StringBuilder();
String stmp;
for (int n = 0; b != null && n < b.length; n++) {
stmp = Integer.toHexString(b[n] & 0XFF);
if (stmp.length() == 1)
hs.append('0');
hs.append(stmp);
}
return hs.toString().toLowerCase();
}
/**
* sha256_HMAC加密
*
* @param message 消息
* @param secret 秘钥
* @return 加密后字符串
*/
public static String hmacSHA256(String secret, String message) throws Exception {
String hash = "";
Mac hmacSha256 = Mac.getInstance("HmacSHA256");
SecretKeySpec secret_key = new SecretKeySpec(secret.getBytes(), "HmacSHA256");
hmacSha256.init(secret_key);
byte[] bytes = hmacSha256.doFinal(message.getBytes());
hash = byteArrayToHexString(bytes);
return hash;
}
/**
* 获取请求头sign
* @return
*/
public static String getSign(String secret,String message){
try {
String str = HmacSHA256Util.hmacSHA256(secret, message);
String encodeStr = Base64.encodeBase64String(str.getBytes());
return encodeStr;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static void main(String[] args) {
String enterpriseId = "10000001";
String timestamp = DateUtils.convert(new Date(), DateUtils.YYYYMMDDHHMMSS);
// HMAC-SHA256(enterpriseId+ timestamp+去除节点间的空格及换行符的请求体)
String message = enterpriseId + timestamp;
System.out.println("加密前 =========>" + message);
String str = "";
try {
str = HmacSHA256Util.hmacSHA256(enterpriseId, timestamp);
String encodeStr = Base64.encodeBase64String(str.getBytes());
System.out.println("加密后 =========>" + str);
} catch (Exception e) {
System.out.println("Error HmacSHA256 ===========>" + e.getMessage());
}
}
}
package com.rome.order.infrastructure.message.consumer;
import com.alibaba.fastjson.JSON;
import com.rome.arch.core.exception.RomeException;
import com.rome.order.domain.constant.MQConstants;
import com.rome.order.domain.enums.RomeExceptionEnum;
import com.rome.order.domain.service.CusOffAccWecomService;
import com.rome.order.infrastructure.message.dto.MqOffAccAndWecomDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author 沉煜科技
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MQConstants.CRM_TOPIC, selectorExpression = MQConstants.WX_LYF_CUS_OFFACC_FOLLOW_OR_CANCEL_TAG, consumerGroup = MQConstants.LYF_QIDIAN_API_OFFACC_FOLLOW_OR_CANCEL)
public class CusOffAccFollowOrCancleConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
@Autowired
private CusOffAccWecomService offAccFollowOrCancelService;
@Override
public void onMessage(MessageExt messageExt) {
MqOffAccAndWecomDTO offAccDTO = JSON.parseObject(messageExt.getBody(), MqOffAccAndWecomDTO.class);
log.info("公众号关注取关消息体offAccDTO:{}",JSON.toJSONString(offAccDTO));
try {
offAccFollowOrCancelService.offAccFollowOrCancel(offAccDTO);
} catch (Exception e) {
log.error("公众号关注取关处理异常,{}", JSON.toJSONString(offAccDTO));
throw new RomeException(RomeExceptionEnum.MQ_EXCEPTION.getCode(),RomeExceptionEnum.MQ_EXCEPTION.getMsg());
}
}
@Override
public void prepareStart(DefaultMQPushConsumer defaultMqPushConsumer) {
defaultMqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
defaultMqPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
}
}
package com.rome.order.infrastructure.message.consumer;
import com.alibaba.fastjson.JSON;
import com.rome.arch.core.exception.RomeException;
import com.rome.order.domain.constant.MQConstants;
import com.rome.order.domain.enums.RomeExceptionEnum;
import com.rome.order.domain.service.CusOffAccWecomService;
import com.rome.order.infrastructure.message.dto.MqOffAccAndWecomDTO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author 沉煜科技
*/
@Slf4j
@Component
@RocketMQMessageListener(topic = MQConstants.CRM_TOPIC, selectorExpression = MQConstants.WX_LYF_CUS_WECOM_ADD_OR_DELETE_TAG, consumerGroup = MQConstants.LYF_QIDIAN_API_WECOM_ADD_OR_DELETE)
public class WecomAddOrDeleteConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
@Autowired
private CusOffAccWecomService cusOffAccWecomService;
@Override
public void onMessage(MessageExt messageExt) {
MqOffAccAndWecomDTO offAccDTO = JSON.parseObject(messageExt.getBody(), MqOffAccAndWecomDTO.class);
log.info("用户添加or删除来伊份门店导购企微消息体offAccDTO:{}",JSON.toJSONString(offAccDTO));
try {
cusOffAccWecomService.wecomAddOrDelete(offAccDTO);
} catch (Exception e) {
log.error("用户添加or删除来伊份门店导购企微处理异常,{}", JSON.toJSONString(offAccDTO));
throw new RomeException(RomeExceptionEnum.MQ_EXCEPTION.getCode(),RomeExceptionEnum.MQ_EXCEPTION.getMsg());
}
}
@Override
public void prepareStart(DefaultMQPushConsumer defaultMqPushConsumer) {
defaultMqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
defaultMqPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
}
}
...@@ -54,44 +54,8 @@ server: ...@@ -54,44 +54,8 @@ server:
io-threads: 16 io-threads: 16
buffer-size: 1024 buffer-size: 1024
port: 8089 port: 8089
elasticjob:
jobs:
SynByStartDateEmpBaseJob:
shardingItemParameters: 0=0
elasticJobClass: com.rome.order.domain.task.SynByStartDateEmpBaseJob
cron: 0 0 0/1 * * ?
shardingTotalCount: 1
# SynFailEmployeeJob:
# shardingItemParameters: 0=0
# elasticJobClass: com.rome.order.domain.task.SynFailEmployeeJob
# cron: 0 0/30 * * * ?
# shardingTotalCount: 1
# SyncAllOrgJob:
# shardingItemParameters: 0=0
# elasticJobClass: com.rome.order.domain.task.SyncAllOrgJob
# cron: 0 0 23 * * ?
# shardingTotalCount: 1
# SynFailOrgJob:
# shardingItemParameters: 0=0
# elasticJobClass: com.rome.order.domain.task.SynFailOrgJob
# cron: 0 0/30 * * * ?
# shardingTotalCount: 1
# SyncProductJob:
# shardingItemParameters: 0=0
# elasticJobClass: com.rome.order.domain.task.SyncProductJob
# cron: 0 0 23 * * ?
# shardingTotalCount: 1
reg-center:
max-sleep-time-milliseconds: 8000
server-lists: '10.6.5.66:2181,10.6.5.67:2181,10.6.5.68:2181'
base-sleep-time-milliseconds: 8000
namespace: lyf-beidou-cdp-api
rocketmq:
nameServer: '10.6.5.12:9876'
defaultBigRetryNum: 30
producer:
group: lyf-beidou-cdp-api
defaultSmallRetryNum: 20
eureka: eureka:
client: client:
serviceUrl: serviceUrl:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment