2022/2022-2

[오픈소스 소프트웨어 프로젝트] 2. Stomp Protocol 기반 웹 소켓 채팅방 구현

JWonK 2023. 1. 11. 20:01
728x90
반응형

https://wonsjung.tistory.com/463

[2022년 2학기] 오픈소스 소프트웨어 프로젝트

2022년도 2학기 교내 수업으로 오픈소스 소프트웨어 프로젝트 수업을 수강하였다. 하나의 주제를 정해 한 학기 동안 팀 프로젝트를 진행하는 수업이었다. 비록 성적은 30명 중 30등을 하였지만 얻

wonsjung.tistory.com

위 게시글에서 소개했듯이 오픈소스 소프트웨어 프로젝트로 진행했던 프로젝트 중 Stomp Protocol 기반 웹 소켓 채팅방 구현에 대해 정리하고자 한다.
 
https://wonsjung.tistory.com/403

[Protocol] STOMP Protocol

기본적으로 채팅 서비스를 구현하기 위해서는 Half Duplex 방식의 HTTP보다 Full Duplex 방식의 Socket 통신을 기반으로 구현한다. 그리고 이 위에 메시징 전송을 효율적으로 하기 위해 프로토콜 STOMP Proto

wonsjung.tistory.com

 
https://wonsjung.tistory.com/405

[Java / Spring] Redis - Pub/Sub

지난 게시글이 Spring에서 채팅 서버를 구현하기 위한 STOMP 프로토콜이 무엇인지 알아보는 게시글이었다. 이번 게시글은 레디스의 Pub / Sub 기능이 무엇인지 알아보고 이를 어떻게 Spring에 적용하는

wonsjung.tistory.com

전에 채팅방 구현을 위해 공부하고 정리해뒀던 글 목록이다. 간단하게 살펴볼 수 있도록 정리해두었었다.
 


결론적으로 이야기하면 아래 사진과 같이 동작할 수 있도록 구현해야한다. Pub를 할 때 하나의 Topic에 Sub되어있는 모든 Chat Room에 메세지를 전송해주면 된다.

동작 방식

 
Stomp Protocol 특징 중 하나인 헤더 값을 기반으로 인증 시스템을 구현할 수 있다. 나는 본 프로젝트를 진행하면서 인증 및 인가 기능을 JWT(Json Web Token)으로 구현하였고 채팅방 사용을 위해서도 JWT를 이용하여 인가 기능을 적용시켰다.
 
 
 
 

구현 파트


◎ build.gradle

가장 먼저 Spring Project에 필요한 라이브러리를 모두 추가해준다.

(본 프로젝트에서 사용한 build.gradle 그대로 가져옴, redis 및 socket, jwt가 기본적으로 채팅방 구현을 위해 필요한 라이브러리이다.)

plugins {
	id 'org.springframework.boot' version '2.7.4'
	id 'io.spring.dependency-management' version '1.0.14.RELEASE'
	id 'java'
}

group = 'com.url'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'

configurations {
	compileOnly {
		extendsFrom annotationProcessor
	}
}

repositories {
	mavenCentral()
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
	implementation 'org.springframework.boot:spring-boot-starter-data-redis'
	implementation 'org.springframework.boot:spring-boot-starter-security'
	implementation 'org.springframework.boot:spring-boot-starter-web'
	implementation 'org.springframework.boot:spring-boot-starter-websocket'
	implementation group: 'org.webjars', name: 'stomp-websocket', version: '2.3.3-1'
	implementation 'org.springframework.boot:spring-boot-devtools'

	implementation 'org.springframework.boot:spring-boot-starter-data-redis'
	// embedded-redis
	implementation 'it.ozimov:embedded-redis:0.7.2'
	implementation 'io.jsonwebtoken:jjwt:0.9.1'

	implementation 'org.springframework.boot:spring-boot-starter-webflux'

	compileOnly 'org.projectlombok:lombok'
	runtimeOnly 'com.h2database:h2'
	runtimeOnly 'org.postgresql:postgresql'
	annotationProcessor 'org.projectlombok:lombok'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
	testImplementation 'org.springframework.security:spring-security-test'
}

tasks.named('test') {
	useJUnitPlatform()
}

 
 
 
 

  Socket 통신 및 Stomp 프로토콜 적용을 위한 config 작성

  • Stomp를 사용하기 위해 @EnableWebSocketMessageBroker를 선언해준다.
  • WebSocketMessageBrokerConfigurer를 상속받아 configureMessageBroker를 구현한다.
  • 아까 위에서 소개한 사진처럼 pub/sub 메시징을 구현하기 위해 메시지를 발행하는 요청의 prefix는 /pub로 시작, 메시지를 구독하는 요청의 prefix는 /sub로 시작하도록 설정한다.
  • stomp websocket의 연결 endpoint는 /ws-stomp로 설정한다

따라서, 개발서버의 접속 주소는 ws://localhost:8080/ws-stomp가 되며 FrontEnd와 협업을 위해 setAllowedOrigins을 이용하여 localhost:3000도 추가해주었다. 이 부분은 프론트엔드와 협업이 필요한 부분이다.

@RequiredArgsConstructor
@Configuration
@EnableWebSocketMessageBroker
public class WebSockConfig implements WebSocketMessageBrokerConfigurer {

    private final StompHandler stompHandler;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/sub");
        config.setApplicationDestinationPrefixes("/pub");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws-stomp").setAllowedOrigins("http://localhost:3000")
                .withSockJS();
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(stompHandler);
    }
}

 
 
 
 
 

◎ StompHandler 구현

StompHandler에서 헤더에 담긴 JWT를 이용하여 인가를 확인하고 확인이 된 경우에만 다음 과정을 처리한다.
연결을 시도하는 CONNECT / 구독을 요청하는 SUBSCRIBE / 연결해제를 요청하는 DISCONNECT
로 나누어 처리해준다.

@Log4j2
@RequiredArgsConstructor
@Component
public class StompHandler implements ChannelInterceptor {

    private static final int JWT_SUBSTRING_INDEX = 7;
    private final TokenUtils tokenUtils;
    private final ChatRoomRepository chatRoomRepository;
    private final MemberRepository memberRepository;
    private final ChatService chatService;
    private final MemberService memberService;

    @Override // websocket을 통해 들어온 요청이 처리 되기 전 실행된다.
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        if(StompCommand.CONNECT == accessor.getCommand()){
            String jwt = accessor.getFirstNativeHeader(AuthConstants.AUTHORIZATION_HEADER);
            if(StringUtils.hasText(jwt) && jwt.startsWith(AuthConstants.TOKEN_TYPE)){
                String accessToken = jwt.substring(JWT_SUBSTRING_INDEX, jwt.length());

                boolean validToken = tokenUtils.isValidToken(accessToken);

                String uid = tokenUtils.getUid(accessToken);
                if(!validToken) {
                    return null;
                }
            }
        }
        if(StompCommand.SUBSCRIBE == accessor.getCommand()){
            // 채팅방에 들어온 클라이언트 sessionId를 roomId와 맵핑해 놓는다.(나중에 특정 세션이 어떤 채팅방에 들어가 있는지 알기 위함)
            String roomId = chatService.getRoomId(Optional.ofNullable((String) message.getHeaders().get("simpDestination")).orElse("InvalidRoomId"));

            String jwt = accessor.getFirstNativeHeader(AuthConstants.AUTHORIZATION_HEADER);
            String accessToken = jwt.substring(JWT_SUBSTRING_INDEX, jwt.length());
            String userEmail = tokenUtils.getUid(accessToken);

            chatRoomRepository.setUserEnterInfo(userEmail, roomId);

            Member member = memberService.connectMemberAndChatRoom(roomId, userEmail);

            // 클라이언트 입장 메시지를 채팅방에 발송한다.(redis publish)
            chatService.sendChatMessage(ChatMessage.builder().type(ChatMessage.MessageType.ENTER).roomId(roomId).sender(member.getName()).build());

        } else if (StompCommand.DISCONNECT == accessor.getCommand()) { // Websocket 연결 종료
            // 연결이 종료된 클라이언트 sesssionId로 채팅방 id를 얻는다.
            String sessionId = (String) message.getHeaders().get("simpSessionId");
            String roomId = chatRoomRepository.getUserEnterRoomId(sessionId);
            // 클라이언트 퇴장 메시지를 채팅방에 발송한다.(redis publish)
            String name = Optional.ofNullable((Principal) message.getHeaders().get("simpUser")).map(Principal::getName).orElse("UnknownUser");
            chatService.sendChatMessage(ChatMessage.builder().type(ChatMessage.MessageType.QUIT).roomId(roomId).sender(name).build());
            // 퇴장한 클라이언트의 roomId 맵핑 정보를 삭제한다.
            chatRoomRepository.removeUserEnterInfo(sessionId);
        }
        return message;
    }

}

 
 
 
 

 

  Redis 적용을 위한 config 작성

Spring 채팅 서버가 실행될 때 Embedded Redis 서버도 동시에 실행될 수 있도록 설정을 추가해야한다.
local 환경에서만 실행되도록 @Profile("local")을 상단에 선언해준다.

@Profile("local")
@Configuration
public class EmbeddedRedisConfig {

    @Value("${spring.redis.port}")
    private int redisPort;
    private RedisServer redisServer;

    @PostConstruct
    public void redisServer() throws IOException {
        redisServer = new RedisServer(redisPort);
        redisServer.start();
    }

    @PreDestroy
    public void stopRedis(){
        if(redisServer != null){
            redisServer.stop();
        }
    }

}

 
 
 
redis의 pub/sub 기능을 이용할 것이므로 MessageListener 설정을 추가해준다. 그리고 어플리케이션에서 redis 사용을 위해 redisTemplate 설정도 추가해준다.

@Log4j2
@RequiredArgsConstructor
@Configuration
public class RedisConfig {

    @Bean
    public ChannelTopic channelTopic(){
        return new ChannelTopic("chatroom");
    }

    @Bean // 메세지 리스너 단일화
    public RedisMessageListenerContainer redisMessageListener(RedisConnectionFactory connectionFactory,
                                                              MessageListenerAdapter listenerAdapter,
                                                              ChannelTopic channelTopic){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, channelTopic);
        return container;
    }

    @Bean // 메세지를 구독자게에 보내는 역할
    public MessageListenerAdapter listenerAdapter(RedisSubscriber subscriber){
        return new MessageListenerAdapter(subscriber, "sendMessage");
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory){
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
        return redisTemplate;
    }
}

 
 
 
 
 

◎ ChatController 구현

우선 Chat 관련 Controller는 2개 구현해주었다. 
1) 채팅 메시지에 관여하는 Controller : ChatController
2) 채팅방 생성 및 조회에 관여하는 Controller : ChatRoomController
 

▶ ChatController

메세지 처리를 위해 존재하는 Controller이다. /chat/message로 메세지가 오게 되면 해당 메세지를 ChatService를 이용하여 처리하는 일을 수행한다.

@RequiredArgsConstructor
@RestController
@Log4j2
public class ChatController {

    private final TokenUtils tokenUtils;
    private final ChannelTopic channelTopic;
    private final RedisTemplate<String, Object> redisTemplate;
    private final ChatService chatService;

    @MessageMapping("/chat/message")
    public void message(ChatMessage message){
        String name = message.getSender();
        message.setSender(name);
        if(ChatMessage.MessageType.ENTER.equals(message.getType())) {
            // message.setSender("[알림]");
            message.setMessage(name + "님이 입장하셨습니다.");
        }
        chatService.sendChatMessage(message);
    }
}

 

  ChatRoomController

크게 
1) 모든 채팅방 조회
2) 채팅방 생성
두 가지 기능을 담당한다. 프로젝트를 진행할 때 해당 부분을 어떻게 처리할까 고민이 많았는데 채팅방 사용의 목적이 개인 채팅을 위한 목적이 아닌 URL 공유를 위한 목적이 더 중요하다고 생각하였고 주제 또한 URL 공유이기 때문에 인가가 허가된 모든 사용자들에게 동등하게 채팅방에 입장할 수 있도록 하자는 방향으로 결정되었다. 
 
생성과 조회를 위한 API 호출 과정에서는 모두 엔티티를 참조하는 것이 아닌 DTO 생성을 통해 처리해주었다. 
 
 
https://wonsjung.tistory.com/421

회원 조회 API 개발 / Result 클래스로 유연한 JSON 반환

회원 조회는 값을 가져와 화면에 보여주기만 하면 된다. 즉, 생성 / 수정 없이 조회만 하면 된다 -> REST API : GET Method 사용 회원 조회 V1 : 응답 값으로 엔티티를 직접 외부에 노출 @RestController @Require

wonsjung.tistory.com

해당 게시글에서 정리한 엔티티를 외부에 노출하지 않는 이유를 확인할 수 있다.
 
 

@Log4j2
@RequiredArgsConstructor
@Controller
@RequestMapping("/chat")
public class ChatRoomController{

    private final TokenUtils tokenUtils;
    private final FileRepository fileRepository;
    private final ChatService chatService;
    private final ServletContext servletContext;
    private final ChatRoomRepository chatRoomRepository;
    private final ChatRepository chatRepository;
    private final ImageUrlRepository imageUrlRepository;
    private final MemberService memberService;
    private final FileStore fileStore;

    @GetMapping("/rooms")
    @ResponseBody
    public List<ChatRoomDto> room(){
        List<ChatRoom> allRoom = chatRoomRepository.findAllRoom();
        List<ChatRoomDto> all = new ArrayList<>();
        for (ChatRoom chatRoom : allRoom) {
            ChatRoomDto chatRoomDto = ChatRoomDto.builder()
                    .name(chatRoom.getName())
                    .roomId(chatRoom.getRoomId())
                    .imageUrl(chatRoom.getImageUrl().getFilePath())
                    .build();

            all.add(chatRoomDto);
        }

        return all;
    }

    @PostMapping("/room")
    @ResponseBody
    public ChatRoomDto createRoom(@RequestBody NewChatRoomDto newChatRoomDto, HttpServletRequest request, HttpServletResponse response) throws Exception{
        Member member = getMemberThroughRequest(request);
        ImageUrl imageUrl = ImageUrl.builder()
                .filePath(newChatRoomDto.getImageUrl())
                .build();
        imageUrlRepository.save(imageUrl);

        ChatRoomDto chatRoomDto = chatRoomRepository.createChatRoom(newChatRoomDto.getName(), imageUrl);
        memberService.connectMemberAndChatRoom(chatRoomDto.getRoomId(), member.getEmail());

        return ChatRoomDto.builder()
                .name(chatRoomDto.getName())
                .roomId(chatRoomDto.getRoomId())
                .imageUrl(chatRoomDto.getImageUrl())
                .build();
    }

    private Member getMemberThroughRequest(HttpServletRequest request) {
        String author = request.getHeader(AuthConstants.AUTHORIZATION_HEADER);
        String token = author.substring(7, author.length());
        String email = tokenUtils.getUid(token);

        return memberService.findByEmail(email);
    }
}

 
 
 
 
 

◎ ChatRoomRepository 구현

Redis 적용을 위해 SpingDataJpa가 아닌 Hash를 이용한 Repository 구현이다.
해당 부분은 어떻게 해야할 지 막막해서 다른 분이 구현한 걸 참고했다. 다른 방법이 분명 존재할텐데 아직은 어려워서 변경할 수가 없었다. 
다른 방법을 찾아 적용시키게 되면 게시글을 수정 할 예정이다.

@RequiredArgsConstructor
@Repository
public class ChatRoomRepository {

    private static final String CHAT_ROOMS = "CHAT_ROOM"; // 채팅룸 저장
    public static final String USER_COUNT = "USER_COUNT"; // 채팅룸에 입장한 클라이언트수 저장
    public static final String ENTER_INFO = "ENTER_INFO"; // 채팅룸에 입장한 클라이언트의 sessionId와 채팅룸 id를 맵핑한 정보 저장
    private final ChatService chatService;
    private final FileStore fileStore;

    @Resource(name = "redisTemplate")
    private HashOperations<String, String, ChatRoom> hashOpsChatRoom;
    @Resource(name = "redisTemplate")
    private HashOperations<String, String, String> hashOpsEnterInfo;
    @Resource(name = "redisTemplate")
    private ValueOperations<String, String> valueOps;

    // 모든 채팅방 조회
    public List<ChatRoom> findAllRoom() {
        return hashOpsChatRoom.values(CHAT_ROOMS);
    }

    // 특정 채팅방 조회
    public ChatRoom findRoomById(String id) {
        return hashOpsChatRoom.get(CHAT_ROOMS, id);
    }

    // 채팅방 생성 : 서버간 채팅방 공유를 위해 redis hash에 저장한다.
    public ChatRoomDto createChatRoom(String name, ImageUrl imageUrl) throws MalformedURLException {
        ChatRoom chatRoom = ChatRoom.create(name, imageUrl);

        chatService.saveChatRoom(chatRoom, imageUrl);

        hashOpsChatRoom.put(CHAT_ROOMS, chatRoom.getRoomId(), chatRoom);
        return ChatRoomDto.builder()
                .name(chatRoom.getName())
                .roomId(chatRoom.getRoomId())
                .imageUrl(imageUrl.getFilePath())
                .build();
    }

    // 유저가 입장한 채팅방ID와 유저 세션ID 맵핑 정보 저장
    public void setUserEnterInfo(String sessionId, String roomId) {
        hashOpsEnterInfo.put(ENTER_INFO, sessionId, roomId);
    }

    // 유저 세션으로 입장해 있는 채팅방 ID 조회
    public String getUserEnterRoomId(String sessionId) {
        return hashOpsEnterInfo.get(ENTER_INFO, sessionId);
    }

    // 유저 세션정보와 맵핑된 채팅방ID 삭제
    public void removeUserEnterInfo(String sessionId) {
        hashOpsEnterInfo.delete(ENTER_INFO, sessionId);
    }
}

 
 
 
 

 

 

◎ ChatService 구현

Chat 관련 Service 로직을 작성하였다. 많은 기능이 없는 걸보니 Service 로직 분리를 잘 못한 것 같다. 수정이 필요할 것 같다.
그리고 마지막에 EntityManager로 persist로 Entity를 저장하는 부분이 있는데 위에서 작성했듯이 ChatRoomRepository를 Redis 사용을 위해 Hash를 사용했었다. 데이터베이스에도 똑같이 ChatRoom을 저장하기 위해 엔티티를 생성해준 후 아래와 같은 방법으로 저장해주는 방법을 사용하였다.

@Log4j2
@RequiredArgsConstructor
@Service
public class ChatService {

    @PersistenceContext
    private EntityManager em;

    private final ChannelTopic channelTopic;
    private final RedisTemplate redisTemplate;
    private final ChatRepository chatRepository;

    /**
     * destination정보에서 roomId 추출
     */
    public String getRoomId(String destination) {
        int lastIndex = destination.lastIndexOf('/');
        if (lastIndex != -1) {
            String substring = destination.substring(lastIndex + 1);
            log.info("SubString : " + substring);
            return substring;
        }
        else
            return "";
    }

    /**
     * 채팅방에 메시지 발송
     */
    public void sendChatMessage(ChatMessage chatMessage) {
        if (ChatMessage.MessageType.ENTER.equals(chatMessage.getType())) {
            chatMessage.setMessage(chatMessage.getSender() + "님이 방에 입장했습니다.");
            chatMessage.setSender("[알림]");
        } else if (ChatMessage.MessageType.QUIT.equals(chatMessage.getType())) {
            chatMessage.setMessage(chatMessage.getSender() + "님이 방에서 나갔습니다.");
            chatMessage.setSender("[알림]");
        }
        redisTemplate.convertAndSend(channelTopic.getTopic(), chatMessage);
    }

    @Transactional
    public void saveChatRoom(ChatRoom chatRoom, ImageUrl imageUrl) {
        chatRoom.setImageUrl(imageUrl);
        em.persist(chatRoom);
    }
}

 
 
 
 
 

Redis 구독 서비스 구현

메시지 리스너에 메시지가 수신되면 아래 RedisSubscriber.sendMessage가 수행된다.
수신된 메시지는 /sub/chat/room/{roomId}를 구독한 websocket 클라이언트에게 메시지가 발송된다.

@Log4j2
@RequiredArgsConstructor
@Service
public class RedisSubscriber{

    private final ObjectMapper objectMapper;
    private final SimpMessageSendingOperations messagingTemplate;

   public void sendMessage(String publishMessage){
       try{
           ChatMessage chatMessage = objectMapper.readValue(publishMessage, ChatMessage.class);
           messagingTemplate.convertAndSend("/sub/chat/room/" + chatMessage.getRoomId(), chatMessage);
       } catch (Exception e){
           log.error("Exception {}", e);
       }
   }
}

 

 

 

정리하자면,


Stomp Protocol을 이해해야 동작과정을 정확히 이해할 수 있다. 나도 처음 해당 부분을 구현할 때 이해가 완전히 이루어지지 못한 채 참고하는 코드를 보고 이해하는 방법으로 진행하였는데 오류가 발생하면 어디서 무슨 오류가 발생했는지 정확히 찾기 조차 힘들었다. 그래서 Log를 출력해서 확인해보며 진행했었는데 이제는 조금 감이 잡힌다.
 

  1. 채팅방에 입장하게 되면 Stomp Server에 연결된다. Stomp Server에 연결되었기 때문에 Pub / Sub 기능이 활성화되고, 채팅방 입장은 Sub에 해당된다. 따라서, CONNECT-SUBSCRIBE가 [채팅방 입장]이라는 행위에 의해 거의 동시에 이루어지게 된다.
  2. 채팅방에 메세지를 보내게 되면 ChatController(/chat/message)에서 수신하게 되고 이를 ChatService 로직(sendChatMessage메서드)에 의해 수행된다. ChatService ChatMessage 정보를 최신화한 후 이를 redisTemplate로 넘겨준다.
  3. RedisTemplate는 Stomp Protocol의 Pub / Sub 기능을 담당하고 redisTemplate가 가진 Topic에 해당하는 모든 채팅방에 메세지를 전송하게 된다(=Publish 기능)

 
 
 


https://github.com/CSID-DGU/2022-2-OSSProj-KKJ-5/tree/main/ossproj-spring-server

GitHub - CSID-DGU/2022-2-OSSProj-KKJ-5

Contribute to CSID-DGU/2022-2-OSSProj-KKJ-5 development by creating an account on GitHub.

github.com

프로젝트 Spring Server에 해당하는 코드이다. 
 
 
 
 

728x90
반응형