1. 為何要用分布式集群
分布式就是為了解決單點故障問題,想象一下,如果一個服務器承載了1000個大佬同時聊天,服務器突然掛了,1000個大佬瞬間全部掉線,大概明天你就被大佬們吊起來打了。
當聊天室改為集群后,就算服務器A掛了,服務器B上聊天的大佬們還可以愉快的聊天,并且在前端還能通過代碼,讓連接A的大佬們快速重連至存活的服務器B,繼續和大家愉快的聊天,豈不美哉!
總結一下:實現了分布式后,我們可以將流量負載均衡到不同的服務器上并提供一種通信機制讓各個服務器能進行消息同步(不然用戶A連上服務器A,用戶B臉上服務器B,它們發消息的時候對方都沒法收到)。
2. 如何改造為分布式集群
當我們要實現分布式的時候,我們則需要在各個機器上共享這些信息,所以我們需要一個/的中間件。我們現在使用Redis作為我們的解決方案。
2.1 用戶在聊天室集群如何發消息
假設我們的聊天室集群有服務器A和B,用戶Alice連接在A上,Bob連接在B上、
Alice向聊天室的服務器A發送消息,A服務器必須要將收到的消息轉發到Redis,才能保證聊天室集群的所有服務器(也就是A和B)能夠拿到消息。否則,只有Alice在的服務器A能夠讀到消息,用戶Bob在的服務器B并不能收到消息,A和B也就無法聊天了。
2.2 用戶在聊天室集群如何接收消息
說完了發送消息,那么如何保證Alice發的消息,其他所有人都能收到呢,前面我們知道了Alice發送的消息已經被傳到了Redis的頻道qq群聊對話生成器在線制作,那么所有服務器都必須訂閱這個Redis頻道,然后把這個頻道的消息轉發到自己的用戶那里,這樣自己服務器所管轄的用戶就能收到消息。
3. 使用Redis的發布/訂閱功能進行消息的發送
如果你不熟悉Redis的sub/pub(訂閱/發布)功能,請看這里進行簡單了解它的用法,很簡單:
3.1 添加redis依賴
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-data-redisartifactId>
dependency>
3.2 添加yml文件配置
spring:
# redis 連接配置
redis:
database: 0
host: 127.0.0.1
password:
port: 6379
ssl: false
jedis:
pool:
max-idle: 10 # 空閑連接最大數
max-wait: 60000 # 獲取連接最大等待時間(s)
# 頻道名稱定義
redis:
channel:
msgToAll: websocket.msgToAll
3.3 新建
/**
* Redis訂閱頻道屬性類
*/
@Component
public class RedisListenerBean {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerBean.class);
@Value("${server.port}")
private String serverPort;
@Value("${redis.channel.msgToAll}")
private String msgToAll;
/**
* redis消息監聽器容器
* 可以添加多個監聽不同話題的redis監聽器,只需要把消息監聽器和相應的消息訂閱處理器綁定,該消息監聽器
* 通過反射技術調用消息訂閱處理器的相關方法進行一些業務處理
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 監聽msgToAll
container.addMessageListener(listenerAdapter, new PatternTopic(msgToAll));
LOGGER.info("Subscribed Redis channel: " + msgToAll);
return container;
}
}
可以看到,我們在代碼里監聽了redis頻道,這個是在.定義的,當然如果你懶得定義,這里可以寫死。
3.4 發消息改造
我們單機聊天室的發送消息是這樣的:
@MessageMapping("/chat.sendMessage")
@SendTo("/topic/public")
public ChatMessage sendMessage(@Payload ChatMessage chatMessage) {
return chatMessage;
}
我們前端發給我們消息后,直接給/topic/轉發這個消息,讓其他用戶收到。
在集群中,我們需要把消息轉發給Redisqq群聊對話生成器在線制作,并且不轉發給前端,而是讓服務端監聽Redis消息,在進行消息發送。
將改為:
@Value("${redis.channel.msgToAll}")
private String msgToAll;
@Autowired
private RedisTemplate<String, String> redisTemplate;
@MessageMapping("/chat.sendMessage")
public void sendMessage(@Payload ChatMessage chatMessage) {
try {
redisTemplate.convertAndSend(msgToAll, JsonUtil.parseObjToJson(chatMessage));
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
你會發現我們在代碼中使用了將實體類轉為了Json發送給了Redis,這個Json工具類需要使用到依賴:
3.4.1 添加json工具類 引入依賴
<dependency>
<groupId>com.alibabagroupId>
<artifactId>fastjsonartifactId>
<version>1.2.58version>
dependency>
添加工具類,提供對象轉json,json轉對象功能
/**
* JSON 轉換
*/
public final class JsonUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(JsonUtil.class);
/**
* 把Java對象轉換成json字符串
*
* @param object 待轉化為JSON字符串的Java對象
* @return json 串 or null
*/
public static String parseObjToJson(Object object) {
String string = null;
try {
string = JSONObject.toJSONString(object);
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
return string;
}
/**
* 將Json字符串信息轉換成對應的Java對象
*
* @param json json字符串對象
* @param c 對應的類型
*/
public static <T> T parseJsonToObj(String json, Class<T> c) {
try {
JSONObject jsonObject = JSON.parseObject(json);
return JSON.toJavaObject(jsonObject, c);
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
return null;
}
}
這樣,我們接收到用戶發送消息的請求時,就將消息轉發給了redis的頻道.
3.5 接收消息改造
單機的聊天室,我們接收消息是通過直接把消息轉發到所有人的頻道上,這樣就能在所有人的聊天框顯示。
在集群中,我們需要服務器把消息從Redis中拿出來,并且推送到自己管的用戶那邊,我們在層實現消息的推送。
我們在實現發送,需要使用上述第二種方法。
新建類/:
@Service
public class ChatService {
private static final Logger LOGGER = LoggerFactory.getLogger(ChatService.class);
@Autowired
private SimpMessageSendingOperations simpMessageSendingOperations;
public void sendMsg(@Payload ChatMessage chatMessage) {
LOGGER.info("Send msg by simpMessageSendingOperations:" + chatMessage.toString());
simpMessageSendingOperations.convertAndSend("/topic/public", chatMessage);
}
}
我們在哪里調用這個呢,我們需要在監聽到消息后調用,所以我們就要有下面的Redis監聽消息處理專用類
新建類redis/:
@Component
public class RedisListenerHandle extends MessageListenerAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisListenerHandle.class);
@Value("${redis.channel.msgToAll}")
private String msgToAll;
@Value("${server.port}")
private String serverPort;
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private ChatService chatService;
/**
* 收到監聽消息
* @param message
* @param bytes
*/
@Override
public void onMessage(Message message, byte[] bytes) {
byte[] body = message.getBody();
byte[] channel = message.getChannel();
String rawMsg;
String topic;
try {
// 反序列化數據
rawMsg = redisTemplate.getStringSerializer().deserialize(body);
topic = redisTemplate.getStringSerializer().deserialize(channel);
LOGGER.info("Received raw message from topic:" + topic + ", raw message content:" + rawMsg);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
return;
}
if (msgToAll.equals(topic)) {
LOGGER.info("Send message to all users:" + rawMsg);
ChatMessage chatMessage = JsonUtil.parseJsonToObj(rawMsg, ChatMessage.class);
if (chatMessage != null) {
chatService.sendMsg(chatMessage);
}
} else {
LOGGER.warn("No further operation with this topic!");
}
}
}