isPowerfulBlog

[Logstash] Forwarding messages from Kafka to Elasticsearch 본문

Data Engineering

[Logstash] Forwarding messages from Kafka to Elasticsearch

왕밤빵도라에몽 2022. 12. 18. 15:00

Logstash의 Pipeline Config를 Kafka input, ElasticSearch output으로 구성해
kafka메세지를 logstash를 통해 elasticsearch로 전달하고자 한다.

logstash plugin install

logstash-input-kafka

kafka로부터 인풋을 받을거니까 input kafka 플러그인을 설치해준다.

-

$ ./bin/logstash-plugin install logstash-input-kafka

스크린샷, 2022-11-25 23-55-55

  • 난 이미 플러그인이 있긴 했다

logstash-output-elasticsearch

logstash output을 es에 넣을거니까 output elasticsearch 플러그인을 설치해준다.

-

$ ./bin/logstash-plugin install logstash-output-elasticsearch

스크린샷, 2022-11-25 23-56-12

  • 설치 완료

logstash config

스크린샷, 2022-12-04 22-03-17

input {
   kafka {
      bootstrap_servers => "localhost:9092"
      topics => ["kafka_topic"]
      group_id => "logstash"
      consumer_threads => 1
   }
}
  • kafka로 input 설정
  • 부트스트랩 서버, 데이터 가져올 토픽, 컨슈머 그룹 아이디 등 설정
output {
    elasticsearch {
        hosts => "localhost:9200"
        index => "kafka-%{+YYYY.MM.dd}"
        user => "elastic"
        document_type => "_doc"
    }
}
  • output으로 elasticsearch 설정
  • es 서버, 인덱스 방식, 유저, doc type 등 설정

Consumer group 생성

~/kafka$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka_topic --group logstash
  • logstash라는 이름의 consumer group 생성

Topic 생성

스크린샷, 2022-12-04 20-07-24

~/kafka$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic kafka_topic
  • kafka_topic 이라는 Topic 생성

스크린샷, 2022-12-04 20-09-43

~/kafka$ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  • 토픽 리스트 확인

Producer 메세지 publishing

스크린샷, 2022-12-18 14-52-27

~/kafka$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_topic
  • 아무 메세지 publish

Consumer로 메세지 consume

스크린샷, 2022-12-18 14-53-50

~/kafka$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka_topic --from-beginning
  • 메세지 잘 comsume 해옴

logstash로 소비되었는지 확인하기

스크린샷, 2022-12-18 14-56-40

~/kafka$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group logstash
  • consumer group 확인
  • CURRENT-OFFSET consumer group이 kafka에서 소비한 offset -> 메세지를 4개 보냈는데 4개 소비했으니 잘 소비한걸로 보인다!

ElasticSearch에 인덱싱 되었는지 확인하기

스크린샷, 2022-12-18 14-59-02

http://localhost:9200/kafka-*/_search?pretty
  • 잘 되었다!

References

https://log-laboratory.tistory.com/169
https://yookeun.github.io/elasticsearch/2019/02/17/logstash-kafka/
https://17billion.github.io/elastic,/kafka/2017/10/05/elastic_stack_elk_kafka.html