카프카를 이용한 로그 전송 애플리케이션 구현: Spring Boot, Kafka

728x90
반응형

카프카를 이용한 로그 전송 애플리케이션 구현하기

Github 실습 레포: https://github.com/bestlalala/spring-logging-with-kafka.git > LogSend 폴더

이번 섹션에서는 Spring 애플리케이션에서 카프카를 사용하여 로그 메시지를 전송하는 방법을 알아보겠습니다.
이 실습에서는 Spring Boot와 Apache Kafka를 활용하며, 로그를 특정 카프카 토픽으로 전송하는 기능을 구현합니다.
이를 위해 필요한 의존성과 카프카 설정을 추가하고, 실제 메시지를 전송하는 애플리케이션을 구현하겠습니다.

 

애플리케이션 아키텍처


1) 의존성 추가

먼저, 로그 전송을 위한 의존성을 pom.xml 파일 또는 build.gradle 파일에 추가한다.
필요한 의존성은 다음과 같다:

Dependencies

Spring Web: RESTful 웹 애플리케이션을 만들기 위한 의존성
Spring DevTools: 개발 중 편리한 기능을 제공하는 의존성
Lombok: 코드 간소화를 위한 라이브러리
Spring for Apache Kafka: Kafka와의 통합을 위한 의존성

 

2) application.yml 설정

카프카 설정을 application.yml 파일에 추가한다. 이 파일에서 카프카의 bootstrap-servers와 producer 설정을 정의할 수 있다. bootstrap-servers에는 EC2 인스턴스의 퍼블릭 IP를 입력하여 카프카 클러스터와의 연결을 설정한다.

spring:
  kafka:
    bootstrap-servers: <public-ip>:9092
    producer:
      key-deserializer: org.apache.kafka.common.StringSerializer
      value-deserializer: org.apache.kafka.common.serialization.StringSerializer
    template:
      default-topic: log-topic

 

3) Kafka 환경 설정 클래스 추가

Kafka와 Spring을 연결하기 위해, KafkaConfiguration 클래스를 생성하여 카프카 프로듀서 설정을 진행한다. 이 클래스는 application.yml에서 설정한 값을 읽어 KafkaTemplate을 빈으로 등록한다.

프로젝트에 카프카 환경 설정 클래스를 추가(KafkaConfiguration)

import org.apache.kafka.common.serialization.StringSerializer;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfiguration {
   //application.properties 나 application.yaml 파일에 있는
   //키의 값을 가져와서 설정
   @Value("${spring.kafka.bootstrap-servers}")
   private String bootstrapServers;

   @Bean
   public ProducerFactory<String, String> producerFactory(){
       Map<String, Object> configs = new HashMap<>();
       configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
       configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       return new DefaultKafkaProducerFactory(configs);
   }

@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
   return new KafkaTemplate<>(producerFactory());
}


}

 

4) 로그 전송 클래스

KafkaProducer 클래스는 카프카의 KafkaTemplate을 사용하여 메시지를 카프카 토픽에 전송하는 기능을 담당한다. 로그 메시지의 내용을 생성하여 지정된 토픽(log-topic)으로 전송한다.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;


@Service
public class KafkaProducer {
   //토픽이름
   private static final String TOPIC = "log-topic";


   @Autowired
   private KafkaTemplate<String, String> kafkaTemplate;



   //토픽을 전송하는 메서드
   public void sendTopic(String timestamp){
       StringBuilder sb = new StringBuilder("time:");
       sb.append(timestamp);
       kafkaTemplate.send(TOPIC, sb.toString());
   }
}

 

5) Controller 클래스

웹 애플리케이션에서 카프카 로그를 전송하려면, 간단한 REST API를 구현하여 클라이언트 요청에 따라 로그를 전송한다. 아래는 FrontController 클래스의 예시로, index 엔드포인트에서 카프카로 로그를 전송한다.

import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;


import java.util.GregorianCalendar;
import java.util.Calendar;

@RestController
@RequiredArgsConstructor
public class FrontController {
   private final KafkaProducer kafkaProducer;

   @GetMapping("/health")
   public String health() {
       return "OK";
   }

   @GetMapping("/")
   public String index() {
       Calendar cal = new GregorianCalendar();
       kafkaProducer.sendTopic(cal.toString());	// 로그 전송
       return "MyWeb";
   }
}

 

6) 애플리케이션 실행 및 확인

애플리케이션을 실행한 후, 브라우저에서 localhost:8080을 호출하여 로그 메시지가 카프카 토픽으로 정상적으로 전송되는지 확인한다. 또한, 카프카 클러스터의 메시지 수신 여부를 터미널에서 확인할 수 있다.

애플리케이션 실행 후, 브라우저에서 localhost:8080을 호출해서 확인한다.

터미널 창에서 메시지가 잘 수신되는지 확인한다.

터미널에서 메시지 수신 확인:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic log-topic --from-beginning

위 명령어로, 카프카에서 log-topic 토픽에 전송된 메시지를 실시간으로 확인할 수 있다.


다음 글에서는 이 애플리케이션에서 전송한 메시지를 수신하여 파일로 만들고, S3 버킷에 업로드하는 애플리케이션을 만들어보겠습니다.

728x90
반응형