Skip to content
Snippets Groups Projects
Commit 50acddbd authored by fz2907's avatar fz2907
Browse files

Optimised the code with parallel stream.

parent 235633c7
No related branches found
No related tags found
2 merge requests!34Sprint 2 done,!311. add vuex
/**
* This class contains the business logic about find matches between offers and wishlist items
*/
package vt.CS5934.SwitchRoom.jobs; package vt.CS5934.SwitchRoom.jobs;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -13,6 +17,8 @@ import vt.CS5934.SwitchRoom.utility.LookupTables; ...@@ -13,6 +17,8 @@ import vt.CS5934.SwitchRoom.utility.LookupTables;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static vt.CS5934.SwitchRoom.utility.UsefulTools.DATE_FORMAT; import static vt.CS5934.SwitchRoom.utility.UsefulTools.DATE_FORMAT;
...@@ -48,17 +54,22 @@ public class OfferWishlistMatchingJob { ...@@ -48,17 +54,22 @@ public class OfferWishlistMatchingJob {
lastPullTimeMap.put(LookupTables.OFFER_JOB_DB_ID, null); lastPullTimeMap.put(LookupTables.OFFER_JOB_DB_ID, null);
lastPullTimeMap.put(LookupTables.WISHLIST_JOB_DB_ID, null); lastPullTimeMap.put(LookupTables.WISHLIST_JOB_DB_ID, null);
lastPullTimeMap.put(LookupTables.MATCHING_JOB_DB_ID, null); lastPullTimeMap.put(LookupTables.MATCHING_JOB_DB_ID, null);
List<OfferWaitingMatchModel> newOfferWaitingRecordList = null;
List<WishlistWaitingMatchModel> newWishlistWaitingRecordList = null;
try{ try{
//check the job status table, get the last pull time //check the job status table, get the last pull time
getLastPullTime(lastPullTimeMap); getLastPullTime(lastPullTimeMap);
logger.info("getLastPullTime() finished"); logger.info("getLastPullTime() finished");
// Query and push the new offer records into its waiting list // Query and push the new offer records into its waiting list
pushNewOfferRecordsIntoWaitingTable(lastPullTimeMap.get(LookupTables.OFFER_JOB_DB_ID)); newOfferWaitingRecordList =
pushNewOfferRecordsIntoWaitingTable(lastPullTimeMap.get(LookupTables.OFFER_JOB_DB_ID));
logger.info("pushNewOfferRecordsIntoWaitingTable() finished"); logger.info("pushNewOfferRecordsIntoWaitingTable() finished");
// Query and push the new wishlist records into its waiting list // Query and push the new wishlist records into its waiting list
pushNewWishlistItemRecordsIntoWaitingTable(lastPullTimeMap.get(LookupTables.WISHLIST_JOB_DB_ID)); newWishlistWaitingRecordList =
pushNewWishlistItemRecordsIntoWaitingTable(lastPullTimeMap.get(LookupTables.WISHLIST_JOB_DB_ID));
logger.info("pushNewWishlistItemRecordsIntoWaitingTable() finished"); logger.info("pushNewWishlistItemRecordsIntoWaitingTable() finished");
}catch (Exception e){ }catch (Exception e){
...@@ -69,7 +80,8 @@ public class OfferWishlistMatchingJob { ...@@ -69,7 +80,8 @@ public class OfferWishlistMatchingJob {
try{ try{
//find the match records and push to the matched table //find the match records and push to the matched table
findMatchFromWaitingTable(lastPullTimeMap.get(LookupTables.MATCHING_JOB_DB_ID)); findMatchFromWaitingTable(lastPullTimeMap.get(LookupTables.MATCHING_JOB_DB_ID), newOfferWaitingRecordList,
newWishlistWaitingRecordList);
}catch (Exception e){ }catch (Exception e){
logger.error("Error occur when finding matches and push them to matched table: " +e.fillInStackTrace()); logger.error("Error occur when finding matches and push them to matched table: " +e.fillInStackTrace());
} }
...@@ -105,7 +117,7 @@ public class OfferWishlistMatchingJob { ...@@ -105,7 +117,7 @@ public class OfferWishlistMatchingJob {
* This function will pull the new Offer records from Offer_table and push them into offer_waiting_table * This function will pull the new Offer records from Offer_table and push them into offer_waiting_table
* @param offerJobRecord offer job information to start the task * @param offerJobRecord offer job information to start the task
*/ */
private void pushNewOfferRecordsIntoWaitingTable(MatchingJobInfoModel offerJobRecord){ private List<OfferWaitingMatchModel> pushNewOfferRecordsIntoWaitingTable(MatchingJobInfoModel offerJobRecord){
logger.info("Starting pushNewOfferRecordsIntoWaitingTable function with lastPullTime:{}" logger.info("Starting pushNewOfferRecordsIntoWaitingTable function with lastPullTime:{}"
,DATE_FORMAT.format(offerJobRecord.getLastPullTime())); ,DATE_FORMAT.format(offerJobRecord.getLastPullTime()));
...@@ -119,20 +131,29 @@ public class OfferWishlistMatchingJob { ...@@ -119,20 +131,29 @@ public class OfferWishlistMatchingJob {
.findAllByOfferingAndModifyDateAfter(true, lastPullOfferTime); .findAllByOfferingAndModifyDateAfter(true, lastPullOfferTime);
logger.info("Fetched {} new Offer records, and set the last pull time to {}", newOfferList.size(), logger.info("Fetched {} new Offer records, and set the last pull time to {}", newOfferList.size(),
DATE_FORMAT.format(offerJobRecord.getLastPullTime())); DATE_FORMAT.format(offerJobRecord.getLastPullTime()));
for(UserOfferModel newOffer : newOfferList){
OfferWaitingMatchModel offerWaitingRecord = new OfferWaitingMatchModel(newOffer.getUserId(), // for(UserOfferModel newOffer : newOfferList){
newOffer.getAvailableTimeStart(), newOffer.getAvailableTimeEnd(), newOffer.getStateCityCode()); // OfferWaitingMatchModel offerWaitingRecord = new OfferWaitingMatchModel(newOffer.getUserId(),
offerWaitingMatchRepository.save(offerWaitingRecord); // newOffer.getAvailableTimeStart(), newOffer.getAvailableTimeEnd(), newOffer.getStateCityCode());
} // offerWaitingMatchRepository.save(offerWaitingRecord);
// }
List<OfferWaitingMatchModel> newOfferWaitingRecordList = newOfferList.parallelStream().map(offerRecord->
new OfferWaitingMatchModel(offerRecord.getUserId(), offerRecord.getAvailableTimeStart(),
offerRecord.getAvailableTimeEnd(), offerRecord.getStateCityCode()))
.toList();
offerWaitingMatchRepository.saveAll(newOfferWaitingRecordList);
matchingJobInfoRepository.save(offerJobRecord); matchingJobInfoRepository.save(offerJobRecord);
return newOfferWaitingRecordList;
} }
/** /**
* This function will pull the new wishlist records from wishlist_table and push them into wishlist_waiting_table * This function will pull the new wishlist records from wishlist_table and push them into wishlist_waiting_table
* @param wishlistJobInfo wishlist job information to start the task * @param wishlistJobInfo wishlist job information to start the task
*/ */
private void pushNewWishlistItemRecordsIntoWaitingTable(MatchingJobInfoModel wishlistJobInfo){ private List<WishlistWaitingMatchModel> pushNewWishlistItemRecordsIntoWaitingTable(MatchingJobInfoModel
wishlistJobInfo){
logger.info("Starting pushNewWishlistItemRecordsIntoWaitingTable function with lastPullTime:{}" logger.info("Starting pushNewWishlistItemRecordsIntoWaitingTable function with lastPullTime:{}"
,DATE_FORMAT.format(wishlistJobInfo.getLastPullTime())); ,DATE_FORMAT.format(wishlistJobInfo.getLastPullTime()));
...@@ -146,14 +167,22 @@ public class OfferWishlistMatchingJob { ...@@ -146,14 +167,22 @@ public class OfferWishlistMatchingJob {
.findAllByModifyDateAfter(lastPullOfferTime); .findAllByModifyDateAfter(lastPullOfferTime);
logger.info("Fetched {} new Offer records, and set the last pull time to {}", newWishlistItemList.size(), logger.info("Fetched {} new Offer records, and set the last pull time to {}", newWishlistItemList.size(),
DATE_FORMAT.format(wishlistJobInfo.getLastPullTime())); DATE_FORMAT.format(wishlistJobInfo.getLastPullTime()));
for(WishlistItemModel newWishlistItem : newWishlistItemList){ // for(WishlistItemModel newWishlistItem : newWishlistItemList){
WishlistWaitingMatchModel wishlistWaitingRecord = new WishlistWaitingMatchModel( // WishlistWaitingMatchModel wishlistWaitingRecord = new WishlistWaitingMatchModel(
newWishlistItem.getWishlistItemId(), newWishlistItem.getStartTime() // newWishlistItem.getWishlistItemId(), newWishlistItem.getStartTime()
, newWishlistItem.getEndTime(), newWishlistItem.getStateCityCode()); // , newWishlistItem.getEndTime(), newWishlistItem.getStateCityCode());
wishlistWaitingMatchRepository.save(wishlistWaitingRecord); // wishlistWaitingMatchRepository.save(wishlistWaitingRecord);
} // }
List<WishlistWaitingMatchModel> newWishlistWaitingRecordList = newWishlistItemList
.parallelStream().map( newWishlistItem -> new WishlistWaitingMatchModel(
newWishlistItem.getWishlistItemId(), newWishlistItem.getStartTime()
, newWishlistItem.getEndTime(), newWishlistItem.getStateCityCode()))
.toList();
wishlistWaitingMatchRepository.saveAll(newWishlistWaitingRecordList);
matchingJobInfoRepository.save(wishlistJobInfo); matchingJobInfoRepository.save(wishlistJobInfo);
return newWishlistWaitingRecordList;
} }
/** /**
...@@ -161,7 +190,9 @@ public class OfferWishlistMatchingJob { ...@@ -161,7 +190,9 @@ public class OfferWishlistMatchingJob {
* into matched table. * into matched table.
* @param matchJobRecord job info * @param matchJobRecord job info
*/ */
private void findMatchFromWaitingTable(MatchingJobInfoModel matchJobRecord){ private void findMatchFromWaitingTable(MatchingJobInfoModel matchJobRecord,
List<OfferWaitingMatchModel> newOfferRecords,
List<WishlistWaitingMatchModel> newWishlistRecords){
logger.info("Starting findMatchFromWaitingTable function with lastPullTime:{}" logger.info("Starting findMatchFromWaitingTable function with lastPullTime:{}"
,DATE_FORMAT.format(matchJobRecord.getLastPullTime())); ,DATE_FORMAT.format(matchJobRecord.getLastPullTime()));
// update the job info record time // update the job info record time
...@@ -169,18 +200,23 @@ public class OfferWishlistMatchingJob { ...@@ -169,18 +200,23 @@ public class OfferWishlistMatchingJob {
matchJobRecord.setJobStatus(LookupTables.JOB_STATUS.Done); matchJobRecord.setJobStatus(LookupTables.JOB_STATUS.Done);
matchJobRecord.setLastPullTime(new Date(System.currentTimeMillis())); matchJobRecord.setLastPullTime(new Date(System.currentTimeMillis()));
// fetch both new records // if the list is null, fetch new records
List<OfferWaitingMatchModel> newOfferRecords = if(newOfferRecords == null) newOfferRecords =
offerWaitingMatchRepository.findAllByModifyDateAfter(lastPullOfferTime); offerWaitingMatchRepository.findAllByModifyDateAfter(lastPullOfferTime);
List<WishlistWaitingMatchModel> newWishlistRecords = if(newWishlistRecords == null) newWishlistRecords =
wishlistWaitingMatchRepository.findAllByModifyDateAfter(lastPullOfferTime); wishlistWaitingMatchRepository.findAllByModifyDateAfter(lastPullOfferTime);
// if one of them have new records // if one of them have new records
if(newOfferRecords.size() > 0 || newWishlistRecords.size() > 0){ if(newOfferRecords.size() > 0 || newWishlistRecords.size() > 0){
// load new Offers into map // load new Offers into map
HashMap<Long, List<OfferWaitingMatchModel>> newStateCityCodeOfferMap = new HashMap<>(); //HashMap<Long, List<OfferWaitingMatchModel>> newStateCityCodeOfferMap = new HashMap<>();
HashMap<Long, List<WishlistWaitingMatchModel>> newStateCityCodeWishlistMap = new HashMap<>(); //HashMap<Long, List<WishlistWaitingMatchModel>> newStateCityCodeWishlistMap = new HashMap<>();
loadListIntoMap(newOfferRecords, newWishlistRecords,newStateCityCodeOfferMap, newStateCityCodeWishlistMap); //loadListIntoMap(newOfferRecords, newWishlistRecords,newStateCityCodeOfferMap, newStateCityCodeWishlistMap);
ConcurrentMap<Long, List<OfferWaitingMatchModel>> newStateCityCodeOfferMap =
loadOfferListIntoMap(newOfferRecords);
ConcurrentMap<Long, List<WishlistWaitingMatchModel>> newStateCityCodeWishlistMap =
loadWishlistListIntoMap(newWishlistRecords);
// fetch both old records // fetch both old records
List<OfferWaitingMatchModel> oldOfferRecords = List<OfferWaitingMatchModel> oldOfferRecords =
...@@ -191,9 +227,14 @@ public class OfferWishlistMatchingJob { ...@@ -191,9 +227,14 @@ public class OfferWishlistMatchingJob {
newStateCityCodeOfferMap.keySet()); newStateCityCodeOfferMap.keySet());
// build maps by use the StateCityCode as the key so make the next pair step easier. // build maps by use the StateCityCode as the key so make the next pair step easier.
HashMap<Long, List<OfferWaitingMatchModel>> oldStateCityCodeOfferMap = new HashMap<>(); //HashMap<Long, List<OfferWaitingMatchModel>> oldStateCityCodeOfferMap = new HashMap<>();
HashMap<Long, List<WishlistWaitingMatchModel>> oldStateCityCodeWishlistMap = new HashMap<>(); //HashMap<Long, List<WishlistWaitingMatchModel>> oldStateCityCodeWishlistMap = new HashMap<>();
loadListIntoMap(oldOfferRecords, oldWishlistRecords,oldStateCityCodeOfferMap, oldStateCityCodeWishlistMap); //loadListIntoMap(oldOfferRecords, oldWishlistRecords,oldStateCityCodeOfferMap, oldStateCityCodeWishlistMap);
ConcurrentMap<Long, List<OfferWaitingMatchModel>> oldStateCityCodeOfferMap =
loadOfferListIntoMap(oldOfferRecords);
ConcurrentMap<Long, List<WishlistWaitingMatchModel>> oldStateCityCodeWishlistMap =
loadWishlistListIntoMap(oldWishlistRecords);
// after fetch all new Wishlist item records by unique stateCityCode // after fetch all new Wishlist item records by unique stateCityCode
// match them with all Offer records // match them with all Offer records
...@@ -235,40 +276,60 @@ public class OfferWishlistMatchingJob { ...@@ -235,40 +276,60 @@ public class OfferWishlistMatchingJob {
private void findMatchedOfferForNewWishlistItemToDB(List<OfferWaitingMatchModel> offerList, private void findMatchedOfferForNewWishlistItemToDB(List<OfferWaitingMatchModel> offerList,
List<WishlistWaitingMatchModel> wishlistItemList) { List<WishlistWaitingMatchModel> wishlistItemList) {
wishlistItemList.stream().forEach( wishlistItem ->{ wishlistItemList.parallelStream().forEach(wishlistItem ->
useWishlistItemAndOfferListBuildMatchedRecordDB(wishlistItem, offerList); useWishlistItemAndOfferListBuildMatchedRecordDB(wishlistItem, offerList)
}); );
} }
private void useWishlistItemAndOfferListBuildMatchedRecordDB(WishlistWaitingMatchModel wishlistItem, private void useWishlistItemAndOfferListBuildMatchedRecordDB(WishlistWaitingMatchModel wishlistItem,
List<OfferWaitingMatchModel> offerList) { List<OfferWaitingMatchModel> offerList) {
List<MatchedWishlistRecordModel> matchedRecordList = new ArrayList<>(); // List<MatchedWishlistRecordModel> matchedRecordList = new ArrayList<>();
offerList.stream().forEach( offer ->{ // offerList.stream().forEach( offer ->{
if(isOverlapping(wishlistItem.getStartTime(),wishlistItem.getEndTime(), // if(isOverlapping(wishlistItem.getStartTime(),wishlistItem.getEndTime(),
offer.getStartTime(),offer.getEndTime())) // offer.getStartTime(),offer.getEndTime()))
matchedRecordList.add(new MatchedWishlistRecordModel(wishlistItem.getWishlistItemId(), // matchedRecordList.add(new MatchedWishlistRecordModel(wishlistItem.getWishlistItemId(),
// offer.getOfferId(), LookupTables.MATCHING_RECORD_USER_RESULT.Waiting,
// LookupTables.MATCHING_RECORD_USER_RESULT.Waiting));
// });
List<MatchedWishlistRecordModel> matchedRecordList = offerList.parallelStream()
.filter(offer-> isOverlapping(wishlistItem.getStartTime(),wishlistItem.getEndTime(),
offer.getStartTime(),offer.getEndTime()))
.map(offer -> new MatchedWishlistRecordModel(wishlistItem.getWishlistItemId(),
offer.getOfferId(), LookupTables.MATCHING_RECORD_USER_RESULT.Waiting, offer.getOfferId(), LookupTables.MATCHING_RECORD_USER_RESULT.Waiting,
LookupTables.MATCHING_RECORD_USER_RESULT.Waiting)); LookupTables.MATCHING_RECORD_USER_RESULT.Waiting))
}); .toList();
matchedWishlistRecordRepository.saveAll(matchedRecordList); matchedWishlistRecordRepository.saveAll(matchedRecordList);
} }
private void loadListIntoMap(List<OfferWaitingMatchModel> offerRecords, // private void loadListIntoMap(List<OfferWaitingMatchModel> offerRecords,
List<WishlistWaitingMatchModel> wishlistRecords, // List<WishlistWaitingMatchModel> wishlistRecords,
HashMap<Long, List<OfferWaitingMatchModel>> stateCityCodeOfferMap, // HashMap<Long, List<OfferWaitingMatchModel>> stateCityCodeOfferMap,
HashMap<Long, List<WishlistWaitingMatchModel>> stateCityCodeWishlistMap){ // HashMap<Long, List<WishlistWaitingMatchModel>> stateCityCodeWishlistMap){
offerRecords.stream().forEach(a -> { // offerRecords.stream().forEach(a -> {
if(!stateCityCodeOfferMap.containsKey(a.getStateCityCode())) // if(!stateCityCodeOfferMap.containsKey(a.getStateCityCode()))
stateCityCodeOfferMap.put(a.getStateCityCode(), new ArrayList<>()); // stateCityCodeOfferMap.put(a.getStateCityCode(), new ArrayList<>());
stateCityCodeOfferMap.get(a.getStateCityCode()).add(a); // stateCityCodeOfferMap.get(a.getStateCityCode()).add(a);
}); // });
//
wishlistRecords.stream().forEach(a->{ // wishlistRecords.stream().forEach(a->{
if(!stateCityCodeWishlistMap.containsKey(a.getStateCityCode())) // if(!stateCityCodeWishlistMap.containsKey(a.getStateCityCode()))
stateCityCodeWishlistMap.put(a.getStateCityCode(), new ArrayList<>()); // stateCityCodeWishlistMap.put(a.getStateCityCode(), new ArrayList<>());
stateCityCodeWishlistMap.get(a.getStateCityCode()).add(a); // stateCityCodeWishlistMap.get(a.getStateCityCode()).add(a);
}); // });
// }
private ConcurrentMap<Long, List<OfferWaitingMatchModel>> loadOfferListIntoMap(
List<OfferWaitingMatchModel> offerRecords){
return offerRecords.parallelStream()
.collect(Collectors.groupingByConcurrent(OfferWaitingMatchModel::getStateCityCode));
}
private ConcurrentMap<Long, List<WishlistWaitingMatchModel>> loadWishlistListIntoMap(
List<WishlistWaitingMatchModel> wishlistRecords){
return wishlistRecords.parallelStream()
.collect(Collectors.groupingByConcurrent(WishlistWaitingMatchModel::getStateCityCode));
} }
private boolean isOverlapping(Date start1, Date end1, Date start2, Date end2) { private boolean isOverlapping(Date start1, Date end1, Date start2, Date end2) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment