Post

[Nest에서 Kafka 해보기] Kafka로 받아온 데이터를 실시간 통신으로 FE에 전달하기

[Nest에서 Kafka 해보기] Kafka로 받아온 데이터를 실시간 통신으로 FE에 전달하기

kafka-websoket.gif

이번 게시글에선 받은 kafka 데이터를 websocket으로 FE에 전달하는 로직을 구현하려한다.

구현한 코드 Github 링크

Nestjs 소켓통신 환경 구축하기

yarn add

1
yarn add @nestjs/websockets @nestjs/platform-socket.io socket.io

버전맞추기

websocket의 경우 버전이 다르면 오류가 쉽게 발생한다고함. @nestjs/....이런 애들은 버전을 맞춰줘야함.

버전이 낮은 것들을 yarn add 활용해서 업데이트해서 버전을 맞춰준다.

gateway.ts

Nest.js에서 websocket은 gateway.ts가 담당한다.

gateway.ts에 다음과 같이 기능을 구현했다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import {
  OnGatewayConnection,
  WebSocketGateway,
  WebSocketServer
} from "@nestjs/websockets";
import {Server, Socket} from "socket.io";

@WebSocketGateway({
  // ws://localhost:3001/chart_monitoring
  namespace: 'chart_monitoring',
  cors: {
    origin: ['http://localhost:5173'],
    credentials: true,
  }
})
export class AppGateway implements OnGatewayConnection {
  @WebSocketServer()
  server : Server

  // 연결되면 동작하는 함수
  handleConnection(socket: Socket): any {
    console.log('on connected called :', socket.id)
  }

  sendKafka(data : any) {
    // kafka라는 이벤트에 data를 보낼게
    this.server.emit('kafka', data)
  }
}

그리고 기존에 구현했던 kafka 메세지를 받고 console을 표시하는 로직에 gateway의 sendKafka를 실행하는 로직을 추가적으로 구현했다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import { Injectable } from '@nestjs/common';
import {EachMessagePayload, Kafka, logLevel} from "kafkajs";
import {AppGateway} from "./app.gateway";

@Injectable()
export class AppService {
  // kafka 객체 생성
  private kafka = new Kafka({
    clientId: process.env.CLIENT_ID,
    brokers: [process.env.KAFKA_CLIENT_BOOTSTRAP_SERVER],
    logLevel: logLevel.INFO,
  })

  private consumer = this.kafka.consumer({
    groupId: process.env.GROUP_ID
  })

  constructor(
    private readonly appGateway: AppGateway
  ) {
    this.consumer.connect().then(r => console.log('connected'));
    this.consumer.subscribe({ topics: ['tradeData']}).then(r => console.log('subscribe'))
    this.consumer.run({
      eachMessage: async(payload: EachMessagePayload) => {
        await this.consumerCallback(payload)
      }
    }).then(r => console.log('run'));
  }

  async consumerCallback(payload: EachMessagePayload) {
    console.log('✨✨✨✨✨✨✨')
    console.log('kafka message 도착')
    console.log(`topic : ${payload.topic}, Message: ${payload.message.value.toString()}`)
    console.log('✨✨✨✨✨✨✨')
    // 도착했으니 저장하고 FE로 전달하는 과정 필요함.

    /**
     * 추후 DB에 저장하는 과정을 구현할 위치
     */

      // FE로 전달하는 과정
    const messageData = JSON.parse(payload.message.value.toString());
    this.appGateway.sendKafka(messageData);
  }
}

React에서 값을 받아오는 과정

단순하게 값이 넘어올 때 마다 해당 값을 표시하는 로직으로 이를 구현해볼까한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import kafka from './assets/kafka.svg'
import './App.css'
import {io} from "socket.io-client";
import {useEffect, useState} from "react";

function App() {
  const URL = import.meta.env.VITE_SOCKET_URL
  const socket = io(URL)
  const [receivedData, setReceivedData] = useState<any>(null)
  useEffect(() => {
    socket.on('connect', () => console.log('Connected!'))
    socket.on('kafka', (data) => {
      setReceivedData(data)
    })
  })
  return (
    <>
      <div>
        <a href="https://scorchedrice.github.io" target="_blank">
          <img src={kafka} className="logo" alt="Kafka logo" />
        </a>
      </div>
      <h1>Kafka Data Receive</h1>
      {receivedData ? <div className="card">
        <p>코인명 : {receivedData.coinName}</p>
        <p>거래된 코인 수 : {receivedData.numOfCoin}코인</p>
        <p>총 거래액 : {receivedData.price * receivedData.numOfCoin}포인트</p>
      </div> : null}
    </>
  )
}

export default App

결론

내가 구현한 기능의 아키텍처는 다음과 같다.

flowchart TD
  subgraph Kafka[Kafka Broker]
    topic[Topic]
  end

  subgraph Services
    producer[Producer localhost:3000]
    consumer[Consumer localhost:3001]
    frontend[React localhost:5173]
    db[(Database)]
  end

%% Current data flow
  producer --> topic
  topic --> consumer
  consumer --> frontend

%% Future DB implementation
  consumer -.-|DB 저장 - 구현예정|db

%% Styling with universal colors
  classDef kafkaStyle fill:#ff9966,stroke:#666,stroke-width:2px,color:#333
  classDef serviceStyle fill:#90EE90,stroke:#666,color:#333
  classDef topicStyle fill:#FFF,stroke:#ff9966,color:#333
  classDef dbStyle fill:#87CEEB,stroke:#666,stroke-dasharray: 5 5,color:#333

  class Kafka kafkaStyle
  class producer,consumer,frontend serviceStyle
  class topic topicStyle
  class db dbStyle

이런 나만의 개인 프로젝트를 진행한 이유는 계획중인 프로젝트에 앞서 Kafka 사용법을 알고자 함이였다.

비록 단순히 데이터를 주고 표시하는 단순한 구현이지만 사용법을 알게되어 좋았다.

DB 저장하는 기능은 아직 구현하지 않았지만, 추후 진행할 프로젝트에서 이를 구현하고 포스트로 공유해보겠다~!

This post is licensed under CC BY 4.0 by the author.