PersonMergeV2Impl.java 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package com.winhc.service.impl;
  2. import com.winhc.common.enums.CompanyEnum;
  3. import com.winhc.config.ConfigConstant;
  4. import com.winhc.kafka.KafkaProduce;
  5. import com.winhc.pojo.MergePerson;
  6. import com.winhc.service.RelationService;
  7. import com.winhc.utils.CompanyUtils;
  8. import lombok.AllArgsConstructor;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.apache.commons.lang3.StringUtils;
  11. import org.neo4j.driver.Driver;
  12. import org.neo4j.driver.Session;
  13. import org.springframework.beans.factory.annotation.Autowired;
  14. import org.springframework.beans.factory.annotation.Qualifier;
  15. import org.springframework.stereotype.Service;
  16. import java.util.Collection;
  17. import java.util.HashMap;
  18. import java.util.List;
  19. import java.util.Map;
  20. /**
  21. * @author π
  22. * @Description:人员合并
  23. * @date 2021/1/11 10:03
  24. */
  25. @Slf4j
  26. @Service("personMergeV2Impl")
  27. @AllArgsConstructor
  28. public class PersonMergeV2Impl implements RelationService {
  29. @Autowired
  30. @Qualifier("DriverV1")
  31. Driver driver;
  32. @Autowired
  33. KafkaProduce kafkaProduce;
  34. @Autowired
  35. ConfigConstant configConstant;
  36. @Override
  37. public String save(List<Map<String, Object>> batch_list) {
  38. if (batch_list.isEmpty()) return null;
  39. long start = System.currentTimeMillis();
  40. Session session = driver.session();
  41. final String cql =
  42. "\nWITH {batch_list} AS batch_list \n" +
  43. "UNWIND batch_list AS row \n" +
  44. "MATCH (p:" + CompanyEnum.Lable.PERSON.code + "{person_id: row.person_id})-[*0..3]-(q:" + CompanyEnum.Lable.PERSON.code + "{name:row.name})\n" +
  45. "WHERE ID(p) <> ID(q)\n" +
  46. "WITH p.person_id as person_id, p as first_node, apoc.coll.sort(collect(distinct ID(q))) as all_ids\n" +
  47. "UNWIND all_ids as all_id\n" +
  48. "MATCH(m:" + CompanyEnum.Lable.PERSON.code + ")-[r]-(x:" + CompanyEnum.Lable.COMPANY.code + ")\n" +
  49. "WHERE ID(m) = all_id\n" +
  50. "WITH person_id, first_node, m as other_node, x, r\n" +
  51. "CALL apoc.merge.relationship.eager(first_node, TYPE(r), properties(r),{}, x,{}) YIELD rel\n" +
  52. "SET first_node:" + CompanyUtils.getIncrPersonLabelV2("新增", CompanyEnum.SPLIT_HOUR) + "\n" +
  53. "SET other_node:" + CompanyUtils.getIncrPersonLabelV2("删除", CompanyEnum.SPLIT_HOUR) + "\n" +
  54. "DELETE r" + "\n" +
  55. "RETURN first_node.person_id as new_human_pid,first_node.name as new_human_name,other_node.person_id as old_human_pid,other_node.name as old_human_name" + "\n";
  56. log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
  57. String data = CompanyUtils.writeNeo4j2(session, cql, new HashMap<String, Object>() {{
  58. put("batch_list", batch_list);
  59. }});
  60. session.close();
  61. log.info("class:{} | save size:{} | cost:{}", PersonMergeV2Impl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
  62. //发送变更人员记录
  63. if (StringUtils.isNotBlank(data)) {
  64. kafkaProduce.produce(configConstant.topic_pid_update_v1, data);
  65. }
  66. return data;
  67. }
  68. }