[Fluentd] Kafka 로컬 파이프라인 구축

by 비타민찌 2024. 8. 8.

Docker Container로 간단히 구동하기

Docker Image Pull

docker pull fluent/fluentd:edge-debian


Fluentd 설정 파일 생성

http input을 stdout으로 출력하는 형식



$ sudo vi fluentd.conf
  @type http
  port 9880
<match **>
  @type stdout


Docker Run

$ sudo docker run -d --name fluentd -p 9880:9880 -v $(pwd):/fluentd/etc fluent/fluentd:edge-debian -c /fluentd/etc/fluentd.conf
# http call
$ curl -X POST -d 'json={"json":"message"}'
# 로그 확인
$ sudo docker logs -f fluentd


Kafka 플러그인 설치

# root 권한 bash로 진입
$ docker exec -it -u root fluentd bash
#현재 설치된 플러그인 확인
$ gem list
#플러그인 설치
$ gem install fluent-plugin-kafka
docker exec -u root fluentd sh -c "apt update; apt install gcc make -y ; gem install fluent-plugin-kafka"

fluentd와 kafka를 연동하려면 플러그인 설치가 필요하다.

그리고 이를 이미지화 하려면 Dockerfile을 작성하여 커스텀 이미지로 만들어야 한다. 


Fluentd와 Kafka를 연동하기 위한 Dockerfile 생성


FROM fluent/fluentd:edge-debian
# Use root account to use apt
USER root
RUN buildDeps="sudo make gcc g++ libc-dev" \
 && apt-get update \
 && apt-get install -y --no-install-recommends $buildDeps \
 && sudo gem install fluent-plugin-kafka



# Docker Build
$ docker build {경로} -t {이미지명}
# 예시
$ docker build ./ -t jaeshim-fluentd-kafka
# Docker Image 목록 확인
$ docker image ls


플러그인 설치 확인

# fluentd cotainer root 권한으로 접근
$ docker exec -it -u root fluentd bash
# 플러그인 설치 확인
$ gem list | grep kafka
fluent-plugin-kafka (0.18.1)
ruby-kafka (1.5.0)


설치된 플러그인:

  • fluent-plugin-kafka (0.18.1)
  • ruby-kafka (1.5.0)



version: '2.2'
    container_name: fluentd
    image: jaeshim-fluent-kafka
      - FLUENTD_CONF=fluentd.conf
      - 9880:9880
      - ./:/fluentd/etc
    image: confluentinc/cp-zookeeper:7.1.2
    container_name: zoo1
      - "2181:2181"
      ZOOKEEPER_SERVERS: zoo1:2888:3888
    image: confluentinc/cp-kafka:7.1.2
    hostname: kafka1
    container_name: kafka1
      - "9092:9092"
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      - zoo1
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
      - 8080:8080
      - zoo1
      - kafka1
      KAFKA_CLUSTERS_0_NAME: local


예제1) 카프카에서 메시지 Consume 후 stdout

  • input: http (fluentd)
  • output: kafka 



  @type kafka_group
  brokers kafka1:19092,kafka2:19093,kafka3:19094,kafka4:19095
  topics "mytopic"
  consumer_group mygroup
  add_prefix kafka
  start_from_beginning true
# 태그는 kafka.[topic 이름]이 된다.
# 여기서는 kafka.mytopic
<match kafka.mytopic>
  @type stdout


Consumer Group 확인


Kafka Admin에서 메시지 생성



-> 파싱이 되지 않는다는 오류 발생



JSON 형식으로 생성

{"test": "vitamin"}


-> 파싱 성공



예제2) http로 메시지를 받고 카프카에 Produce

  • input: kafka
  • output: stdout


# http msg -> fluentd -> kafka
# http message 를 받는다.
  @type http
  port 9880
# http.msg 태그로 match 한다.
<match http.msg>
  @type kafka
  # list of seed brokers
  # 여기서 brokers 의 ip 가 kafka 인 이유는, Docker Swarm 때문이다.
  brokers kafka1:19092
  use_event_time true
 # <buffer>
 #   @type file
 #   path /fluentd/buffer
 #   flush_interval 3s
 # </buffer>
  # topic settings
  # topic_key 는 토픽명
  # default_topic 은 topic_key 가 없을 때 사용되는 default 토픽명
  topic_key mytopic
  default_topic mytopic
    @type json
  # producer settings
  required_acks -1
# curl로 JSON 메시지 생성
$ curl -X POST -d 'json={"json":"test", "sample":"hello world!"}' http://localhost:9880/http.msg


Kafka Admin에서 메시지 생성 확인






Docker fluentd : https://hub.docker.com/r/fluent/fluentd/ 

docker deployment 공식 fluentd 문서 : https://docs.fluentd.org/container-deployment/install-by-docker 

kafka output plugin 공식 fluentd 문서 : https://docs.fluentd.org/output/kafka 

fluent-plugin-kafka plugin : https://github.com/fluent/fluent-plugin-kafka 

kafka 기초 명령어 : https://kafka.apache.org/quickstart 



