的數據來自Mysql數據庫中,所以當我們的MySQL發生改變時,也要跟著改變mysql數據庫同步工具mysql數據庫同步工具,這時候我們的es的數據就要和mysql同步了
同步實現思路
常見的數據同步方案有三種:
方案一:
也就是說MySQL修改完去修改es的數據
方案二
方式三:
在這里使用的是第二種實現方案:使用MQ來寫
同步案例代碼
用來操控ES的代碼(負責監聽MQ隊列)
/**
* 監聽增加和修改的隊列
* 因為我們的ES中可以進行全量修改,當有這個id的數據的時候那么就先刪除再新增,沒有這個數據那么就直接新增
* 所以隊列過來的id不管是新增還是修改es都可以判斷如果有這個數據id那么就先刪除再新增,如果沒有這個數據就直接新增,所以新增和修改他倆用一個方法就行了
*
* @param id 隊列中需要進行操作的id
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MqConstants.HOTEL_INSERT_QUEUE),

exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT),
key = MqConstants.HOTEL_INSERT_KEY
))
public void insertAndUpdate(Long id) {
if (id == null) {
return;
}
log.info("入參:{}", id);
//監聽到以后拿到id去數據庫查詢整個數據
Hotel hotel = iHotelService.getById(id);
//因為查的mysql數據和es的數據有些不一樣所以需要做轉換
HotelDoc hotelDoc = new HotelDoc(hotel);
//轉換為json
String hotelDocJson = JSON.toJSONString(hotelDoc);
System.out.println("hotelDocJson = " + hotelDocJson);
//發送到ES中,因為我們的ES中可以進行全量修改,當有這個id的數據的時候那么就先刪除再新增,沒有這個數據那么就直接新增
//創建請求語義對象 添加文檔數據
IndexRequest request = new IndexRequest("hotel");
//這個新增就是PUT在es中
request.id(hotel.getId().toString()).source(hotelDocJson, XContentType.JSON);
//發送請求

try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
RestStatus status = response.status();
log.info("響應結果為:{}", status);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 監聽刪除隊列
*
* @param id 隊列中需要進行操作的id
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MqConstants.HOTEL_DELETE_QUEUE),
exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT),
key = MqConstants.HOTEL_DELETE_KEY
))
public void deleteByMqId(Long id) {
if (id == null) {

return;
}
log.info("入參:{}", id);
//先創建語義對象,直接就可以給里面寫id的字段
DeleteRequest request = new DeleteRequest("hotel", id.toString());
//發送請求
try {
DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
RestStatus status = response.status();
log.info("響應結果為:{}", status);
} catch (IOException e) {
e.printStackTrace();
}
用來操作MySQL的代碼:
@RestController
@RequestMapping("hotel")
public class HotelController {
//注入和RabbitMQ鏈接
@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private IHotelService hotelService;
@GetMapping("/{id}")
public Hotel queryById(@PathVariable("id") Long id) {
return hotelService.getById(id);
}
@GetMapping("/list")
public PageResult hotelList(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "size", defaultValue = "1") Integer size
) {
Page result = hotelService.page(new Page<>(page, size));
return new PageResult(result.getTotal(), result.getRecords());
}
@PostMapping
public void saveHotel(@RequestBody Hotel hotel) {

hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
}
@PutMapping()
public void updateById(@RequestBody Hotel hotel) {
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能為空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);
}
}
當然也是可以不使用注解來寫,直接在配置文件中寫隊列綁定的交換機
小白要變大牛的博客-CSDN博客