Spring/Chat

[Java / Spring] Redis - Pub/Sub

JWonK 2022. 5. 25. 23:36
728x90
반응형

지난 게시글이 Spring에서 채팅 서버를 구현하기 위한 STOMP 프로토콜이 무엇인지 알아보는 게시글이었다.

이번 게시글은 레디스의 Pub / Sub 기능이 무엇인지 알아보고 이를 어떻게 Spring에 적용하는지 알아보자.


▶ Request / Response Method vs Messaging Method

 

메시징 방법 중 Pub / Sub에 대해 알아보기 전에, 기본적인 웹의 통신 방법에 대해 간단히 짚고 넘어가보자.

대부분의 웹 서버는 HTTP Request / Response 방식을 많이 사용한다. 대표적인 예로 REST API, GraphQL이 존재한다.

HTTP Request / Response 방법

Request / Response 방법은 HTTP 프로토콜을 기반으로 구현되기 떄문에 Stateless 한 특징이 있으며, 심플하고 단순하여 구현하기 쉬운 장점이 있다. 하지만, Client - Server 사이에 강한 의존성이 존재하며, 서버가 실행 중일 때만 데이터 송/수신이 가능하다. 또한, 클라이언트는 서버가 다음 응답을 보내기 전까지 기다리고 있다. 즉, 동기식 통신 방식이다.

 

 

반대로, HTTP 방식보다 느슨하게 결합하여 비동기 시스템 통합 방식인 기술은 메시징이 있다.

메시징은 중간 시스템을 통해 발신자에서 수신자로 데이터를 전송하는 포괄적 용어이다.

중간 시스템에 대한 것은 지난 게시글 마지막 부분에 짧게 언급했었다.

Messaging System

위 그림처럼 중간 시스템을 통해 전송이 이루어지기 때문에, 서버는 데이터를 전송받는 수신자(클라이언트)에 대해 전혀 알지 못하도 상관 없다. 채널만 연결되어 있으면 된다.

 


▶ Message Queuing vs Pub / Sub

 

"메시징"은 매우 포괄적인 용어이다. "메시징"을 구현하는 대표적인 두 가지 방식에 대해 알아보자.

 

  • 메시지 큐잉 (Point-to-Point Channel)
  • Publish-Subscribe (Pub / Sub)

 

메시지 큐잉Point-to-Point Channel 방식으로, 오직 한 수신자만 메시지를 수신할 수 있다. 말그대로 Point-to-Point 방식, 즉 1:1 통신으로 이루어진다.

Point-to-Point Method

 

위 처럼 1:1로 통신이 이루어지게 된다면 순차적으로 정보를 처리할 때, 정보의 양이 많아질 경우 시간이 오래 걸리게 되어 병목현상이 발생할 수 있다. 그래서 이와 같은 단점을 해결하기 위해 1:1 통신이 아닌 클라이언트의 수를 늘려 동시에 여러 메시지를 처리할 수 있게 구축하는 방법이 있다.

위 사진의 단점을 보완하는 방법

 

 

메시지 큐잉과는 반대로 Pub / Sub은 수신자(클라이언트) 모두에게 메시지를 전송하게 된다. 이 말이 조금 이해하기 어려울 수 있는데 간단히 이야기하면 서버와 클라이언트를 연결하는 Messaging System을 공유하는 모든 클라이언트에게 서버측에서 같은 메시지를 한 번에 전송한다고 이해하면 된다. 사진으로 쉽게 이해할 수 있다.

Messaging System을 공유하는 Sub / Pub 관계

 


▶ Redis Pub/Sub

 

위 글들을 읽었다면 이제 Pub / Sub가 무엇인지는 이해할 수 있을 것이다. 그럼 왜 Redis이냐?

Redis는 Pub/Sub 기능을 기본적으로 제공하고 있다. 그렇기 때문에 Spring에서 채팅 서비스를 개발하고자 할 때 가장 먼저 접할 수 있는 것이 바로 Redis이다.

 

Redis Pub/Sub는 다른 메시지 브로커와는 다르게, Redis Pub/Sub 메시지 지속성이 없다. 즉, 메시지를 전송한 후 해당 메시지는 삭제되는데, Redis어디에도 저장되지 않는다. 실시간으로 데이터를 처리해야할 때는 매우 적합하다고 볼 수 있으나 메시지가 저장되지 않는다는 점은 반드시 인지하고 개발을 진행해야할 것이다.

 

 


▶ Spring에 Redis Pub/Sub 적용하기

 

◎ build.gradle

가장 먼저 Spring에서 Redis의 Pub/Sub 기능을 이용하기 위해 dependencies에 라이브러리를 추가해야한다.

implementation 'org.springframework.boot:spring-boot-starter-data-redis'
implementation group: 'it.ozimov', name: 'embedded-redis', version: '0.7.2'

 

 Embedded Redis 서버 사용을 위한 설정

Spring 채팅 서버가 실행될 때 Embedded Redis 서버도 동시에 실행될 수 있도록 설정을 추가해야한다.

local 환경에서만 실행되도록 @Profile("local")을 상단에 선언해준다.

@Profile("local")
@Configuration
public class EmbeddedRedisConfig {

    @Value("${spring.redis.port}")
    private int redisPort;

    private RedisServer redisServer;

    @PostConstruct
    public void redisServer(){
        redisServer = new RedisServer(redisPort);
        redisServer.start();
    }

    @PreDestroy
    public void stopRedis(){
        if(redisServer != null){
            redisServer.stop();
        }
    }
}

 

 Redis 설정

redis의 Pub/Sub 기능을 이용할 것이므로 MessageListener 설정을 추가한다.

그리고 어플리케이션에서 redis 사용을 위해 redisTemplate 설정도 추가해준다.

@Configuration
public class RedisConfig {

    /**
     * redis pub / sub 메세지를 처리하는 listener 설정
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListener(RedisConnectionFactory connectionFactory){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }

    /**
     * 어플리케이션에서 사용할 redisTemplate 설정
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory){

        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();

        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));

        return redisTemplate;
    }

}

 

 환경 설정 수정 및 추가

application.yml

spring:
    profiles:
        active: local
    redis:
        host: localhost
        port: 6739

 

 

 Redis Pub/Sub 모델 구현을 위한 서비스 생성

위에서 알아보았던 Pub/Sub 기능을 구현해야한다. 공통으로 공유하는 Messaging System의 클라이언트에게 서버는 한 번에 메시지를 보낼 수 있다고 하였는데 채팅방 서비스에서 채팅방이 redis의 topic(공유 System)이라고 가정하고, Pub/Sub는 대화를 하거나 / 보는 행위라고 생각하면 된다.

 

Spring에서는 redis topic에 대하여 구독 및 발행을 처리할 수 있도록 아래와 같은 방법을 제공한다.

 

 

 Redis 발행 서비스 구현

채팅방에 입장하여 메시지를 작성하면 해당 메시지를 Redis Topic에 발행하는 기능의 서비스이다.

이 서비스를 통해 메시지를 발행하면 대기하고 있던 redis 구독 서비스가 메시지를 처리한다.

@Service
@RequiredArgsConstructor
public class RedisPublisher {

    private final RedisTemplate<String, Object> redisTemplate;

    public void publish(ChannelTopic topic, ChatMessage message){
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}

 

 

  Redis 구독 서비스 구현

Redis에 메시지 발행이 될 때까지 대기하였다가 메시지가 발행되면 해당 메시지를 읽어 처리하는 리스너이다.

MessageListener를 상속받아 onMessage 메서드를 작성한다.

 

아래 코드에서는 Redis에 메시지가 발행되면 해당 메시지를 ChatMessage로 변환하고 messaging Template을 이용하여 채팅방의 모든 webSocket 클라이언트들에게 메시지를 전달한다.

@Slf4j
@Service
@RequiredArgsConstructor
public class RedisSubscriber implements MessageListener {

    private final ObjectMapper objectMapper;
    private final RedisTemplate redisTemplate;
    private final SimpMessageSendingOperations messagingTemplate;

    /**
     * Redis에서 메세지가 발행(publish)되면 대기하고 있던 onMessage가 해당 메세지를 받아 처리
     */

    @Override
    public void onMessage(Message message, byte[] pattern) {
        try{
            // redis에서 발행된 데이터를 받아 deserialize
            String publishMessage = (String) redisTemplate.getStringSerializer().deserialize(message.getBody());
            // ChatMessage 객체로 매핑
            ChatMessage roomMessage = objectMapper.readValue(publishMessage, ChatMessage.class);
            // WebSocket 구독자에게 채팅 메세지 Send
            messagingTemplate.convertAndSend("/sub/chat/room/" + roomMessage.getRoomId(), roomMessage);
        } catch (Exception e){
            log.error(e.getMessage());
        }
    }
}

 

 

  ChatController 구현

클라이언트가 채팅방 입장시 채팅방(topic)에서 대화가 가능하도록 리스너를 연동하는 enterChatRoom 메서드를 세팅한다. 채팅방에 발행된 메시지는 서로 다른 서버에 공유하기 위해 redis의 Topic으로 발행한다.

@RequiredArgsConstructor
@Controller
public class ChatController {

    private final RedisPublisher redisPublisher;
    private final ChatRoomRepository chatRoomRepository;

    /**
     * websocket "/pub/chat/message"로 들어오는 메시징을 처리한다.
     */
    @MessageMapping("/chat/message")
    public void message(ChatMessage message) {
        if (ChatMessage.MessageType.ENTER.equals(message.getType())) {
            chatRoomRepository.enterChatRoom(message.getRoomId());
            message.setMessage(message.getSender() + "님이 입장하셨습니다.");
        }
        // Websocket에 발행된 메시지를 redis로 발행한다(publish)
        redisPublisher.publish(chatRoomRepository.getTopic(message.getRoomId()), message);
    }
}

 

 

  ChatRoomRepository 구현

채팅방 정보는 초기화 되지 않도록 생성 시 Redis Hash에 저장하도록 처리한다. 채팅방 정보를 조회할 때는 Redis Hash에 저장된 데이터를 불러오도록 메서드를 구현한다. 채팅방 입장 시에는 채팅방 id로 Redis Topic을 조회하여 Pub/Sub 메세지 리스너로 연동한다.

@RequiredArgsConstructor
@Repository
public class ChatRoomRepository {
    // 채팅방(topic)에 발행되는 메시지를 처리할 Listner
    private final RedisMessageListenerContainer redisMessageListener;
    // 구독 처리 서비스
    private final RedisSubscriber redisSubscriber;
    // Redis
    private static final String CHAT_ROOMS = "CHAT_ROOM";
    private final RedisTemplate<String, Object> redisTemplate;
    private HashOperations<String, String, ChatRoom> opsHashChatRoom;
    // 채팅방의 대화 메시지를 발행하기 위한 redis topic 정보. 서버별로 채팅방에 매치되는 topic정보를 Map에 넣어 roomId로 찾을수 있도록 한다.
    private Map<String, ChannelTopic> topics;

    @PostConstruct
    private void init() {
        opsHashChatRoom = redisTemplate.opsForHash();
        topics = new HashMap<>();
    }

    public List<ChatRoom> findAllRoom() {
        return opsHashChatRoom.values(CHAT_ROOMS);
    }

    public ChatRoom findRoomById(String id) {
        return opsHashChatRoom.get(CHAT_ROOMS, id);
    }

    /**
     * 채팅방 생성 : 서버간 채팅방 공유를 위해 redis hash에 저장한다.
     */
    public ChatRoom createChatRoom(String name) {
        ChatRoom chatRoom = ChatRoom.create(name);
        opsHashChatRoom.put(CHAT_ROOMS, chatRoom.getRoomId(), chatRoom);
        return chatRoom;
    }

    /**
     * 채팅방 입장 : redis에 topic을 만들고 pub/sub 통신을 하기 위해 리스너를 설정한다.
     */
    public void enterChatRoom(String roomId) {
        ChannelTopic topic = topics.get(roomId);
        if (topic == null) {
            topic = new ChannelTopic(roomId);
            redisMessageListener.addMessageListener(redisSubscriber, topic);
            topics.put(roomId, topic);
        }
    }

    public ChannelTopic getTopic(String roomId) {
        return topics.get(roomId);
    }
}

 

 

 

  ChatRoom Serialize

Redis에 저장되는 객체들은 Serialize(직렬화)가능해야 하므로 Serializable을 참조하도록 선언하고 serialVersionUID를 세팅한다.

@Getter
@Setter
public class ChatRoom implements Serializable {

    private static final long serialVersionUID = 6494678977089006639L;

    private String roomId;
    private String name;

    public static ChatRoom create(String name) {
        ChatRoom chatRoom = new ChatRoom();
        chatRoom.roomId = UUID.randomUUID().toString();
        chatRoom.name = name;
        return chatRoom;
    }
}

 

 


※ Serialize(직렬화)

  • 자바 시스템 내부에서 사용되는 Object 또는 Data를 외부의 자바 시스템에서도 사용할 수 있도록 byte형태로 데이터를 변환하는 기술
  • JVM(Java Virtual Machine 이하 JVM)의 메모리에 상주(힙 또는 스택)되어 있는 객체 데이터를 바이트 형태로 변환하는 기술

 

※ Deserialize(역직렬화)

  • byte로 변환된 Data를 원래대로 Object나 Data로 변환하는 기술
  • 직렬화된 바이트 형태의 데이터를 객체로 변환해서 JVM으로 상주시키는 상태

 

 


참고 : https://daddyprogrammer.org/post/4731/spring-websocket-chatting-server-redis-pub-sub/

 

Spring websocket chatting server(3) - 여러대의 채팅서버간에 메시지 공유하기 by Redis pub/sub

앞 장에서 실습을 통해 채팅을 구현해 보았습니다. websocket과 Stomp를 이용한 구현만으로도 채팅의 기본 기능은 충분히 구현할 수 있는 것을 확인할 수 있었습니다. 하지만 서비스에 사용하려면

daddyprogrammer.org

https://brunch.co.kr/@springboot/374

 

728x90
반응형

'Spring > Chat' 카테고리의 다른 글

[Protocol] STOMP Protocol  (2) 2023.10.09