/**
 * Builds the query for one page of a keyset-style walk over a collection.
 *
 * <p>The query matches documents whose {@code _id} is strictly greater than
 * {@code startId}, orders them by {@code _id} ascending, and caps the page at
 * 5000 documents.
 *
 * @param startId exclusive lower bound for {@code _id}; pass the last {@code _id}
 *                of the previous page to fetch the next one
 * @return the assembled query
 */
public static Query getQuery(long startId) {
    Criteria afterStartId = Criteria.where("_id").gt(startId);
    Sort ascendingById = Sort.by(new Sort.Order(Sort.Direction.ASC, "_id"));
    return new Query()
            .addCriteria(afterStartId)
            .with(ascendingById)
            .limit(5000);
}
```java
// Phone-number whitelist: one of the listed 3-digit prefixes followed by exactly 8 digits
// (presumably mainland-China mobile numbers — confirm the prefix list against requirements).
// Character classes must not contain commas: "[5,7]" also accepts a literal ',' and would let
// values such as "14,12345678" through. The former trailing "(147)" alternative was redundant
// because "14[57]" already covers it.
private static final String REGEX = "^((13[0-9])|(14[57])|(15[0-35-9])|(17[035-8])|(18[0-9])|166|198|199)\\d{8}$";
// Number of records handed to the mapper for saving; shared by all worker threads.
private static final AtomicInteger saveCount = new AtomicInteger();
// Number of documents skipped because their phone number was blank; shared by all worker threads.
private static final AtomicInteger phoneIsNullCount = new AtomicInteger();
// Date-only formatter (yyyy-MM-dd); DateTimeFormatter is immutable, so a shared constant is safe.
private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
// Source of the documents that are read page by page.
@Autowired
MongoTemplate mongoTemplate;
// Mapper that receives the records whose phone number passed validation.
@Autowired
SendMessageImMapper sendMessageImMapper;
/**
 * Reads documents from MongoDB page by page in ascending {@code _id} order and processes
 * every document on a 50-thread worker pool.
 *
 * <p>For each document the worker checks the phone number: blank numbers are counted in
 * {@code phoneIsNullCount}; numbers matching {@code REGEX} are passed to
 * {@code sendMessageImMapper.saveMongoDBData} and counted in {@code saveCount}. A page is
 * fully awaited (up to 10 seconds per task, after which the task is cancelled) before the
 * next page is fetched, using the last {@code _id} of the page as the next lower bound.
 *
 * <p>The walk ends when a page comes back empty, when the fixed page budget is exhausted,
 * or when the calling thread is interrupted. The worker pool is always shut down before
 * returning.
 *
 * @return the fixed status string {@code "数据处理结束"}
 */
public String selectUserByProperty() {
    // Pattern is immutable and thread-safe, so compile it once instead of once per document.
    final Pattern phonePattern = Pattern.compile(REGEX);
    final ExecutorService serviceExecutor = Executors.newFixedThreadPool(50);
    try {
        long lastDataResultId = 0;
        int dealDataCount = 0;
        // Page budget: 1676955 is presumably the collection size when this was written and
        // 5000 mirrors the page size used by getQuery — TODO confirm / derive dynamically.
        final int maxBatches = (1676955 / 5000) + 1;
        for (int k = 0; k < maxBatches; k++) {
            List<CompanyInfoFromMongoDocument> mongoDocuments = mongoTemplate.find(
                    getQuery(lastDataResultId), CompanyInfoFromMongoDocument.class);
            log.info("====>数据信息分批处理-mongoDocuments数据大小:{}", mongoDocuments.size());
            dealDataCount += mongoDocuments.size();
            // Report the running total; the line above already reports the page size.
            log.info("====>数据处理大小:{}", dealDataCount);
            if (!CollectionUtils.isNotEmpty(mongoDocuments)) {
                // An empty page means the collection is exhausted; further queries with the
                // same start id would only return the same empty result.
                break;
            }

            // One list per page: keeping a single list across pages would retain every
            // Future ever created and re-wait on all earlier ones after each page.
            List<Future<Boolean>> futureList = new ArrayList<>(mongoDocuments.size());
            for (CompanyInfoFromMongoDocument po : mongoDocuments) {
                Future<Boolean> future = serviceExecutor.submit(() -> {
                    // NOTE(review): mongodpo is created but never populated or used, while
                    // tianYanChaPO is not declared anywhere in this method — confirm which
                    // object is meant to be saved and how it is filled from po.
                    MongoDBPO mongodpo = new MongoDBPO();
                    String phoneNumber = po.getPhoneNumber();
                    if (StringUtils.isNotBlank(phoneNumber)) {
                        if (phonePattern.matcher(phoneNumber).find()) {
                            sendMessageImMapper.saveMongoDBData(tianYanChaPO);
                            // Count the save; without the increment the modulo check below
                            // was always 0 % 2000 == 0 and logged on every single record.
                            int saved = saveCount.incrementAndGet();
                            if (saved % 2000 == 0) {
                                log.info("====>已保存用户数量:{}", saved);
                            }
                        }
                    } else {
                        phoneIsNullCount.incrementAndGet();
                    }
                }, true);
                futureList.add(future);
            }
            // The page is sorted by _id ascending, so its last element is the next lower bound.
            lastDataResultId = mongoDocuments.get(mongoDocuments.size() - 1).getId();

            boolean interrupted = false;
            for (Future<Boolean> f : futureList) {
                try {
                    f.get(10, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    f.cancel(true);
                } catch (ExecutionException e) {
                    log.info("处理异常:{}", e);
                } catch (InterruptedException e) {
                    // Restore the flag so callers can see the interruption, and stop waiting:
                    // every further get() would fail immediately anyway.
                    Thread.currentThread().interrupt();
                    log.info("处理异常:{}", e);
                    interrupted = true;
                    break;
                }
            }
            // NOTE(review): dateFormat is not declared in the visible part of the file; the
            // visible formatter field is dtf — confirm which one is intended.
            log.info("时间:{}, 起始查询id:{}, phone是空的数量:{}", dateFormat.format(new Date()), lastDataResultId, phoneIsNullCount.get());
            if (interrupted) {
                break;
            }
        }
    } finally {
        // Without this every call leaks 50 idle threads. Anything still queued or running at
        // this point has already timed out or been abandoned after an interrupt.
        serviceExecutor.shutdownNow();
    }
    return "数据处理结束";
}
```
原文链接: https://onlyou.blog.csdn.net//article/details/102505314