isPowerfulBlog
[Logstash] Forwarding messages from Kafka to Elasticsearch 본문
Data Engineering
[Logstash] Forwarding messages from Kafka to Elasticsearch
왕밤빵도라에몽 2022. 12. 18. 15:00Logstash의 Pipeline Config를 Kafka input, ElasticSearch output으로 구성해
kafka메세지를 logstash를 통해 elasticsearch로 전달하고자 한다.
logstash plugin install
logstash-input-kafka
kafka로부터 인풋을 받을거니까 input kafka 플러그인을 설치해준다.
-
$ ./bin/logstash-plugin install logstash-input-kafka
- 난 이미 플러그인이 있긴 했다
logstash-output-elasticsearch
logstash output을 es에 넣을거니까 output elasticsearch 플러그인을 설치해준다.
-
$ ./bin/logstash-plugin install logstash-output-elasticsearch
- 설치 완료
logstash config
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["kafka_topic"]
group_id => "logstash"
consumer_threads => 1
}
}
- kafka로 input 설정
- 부트스트랩 서버, 데이터 가져올 토픽, 컨슈머 그룹 아이디 등 설정
output {
elasticsearch {
hosts => "localhost:9200"
index => "kafka-%{+YYYY.MM.dd}"
user => "elastic"
document_type => "_doc"
}
}
- output으로 elasticsearch 설정
- es 서버, 인덱스 방식, 유저, doc type 등 설정
Consumer group 생성
~/kafka$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka_topic --group logstash
- logstash라는 이름의 consumer group 생성
Topic 생성
~/kafka$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic kafka_topic
kafka_topic
이라는 Topic 생성
~/kafka$ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
- 토픽 리스트 확인
Producer 메세지 publishing
~/kafka$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_topic
- 아무 메세지 publish
Consumer로 메세지 consume
~/kafka$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka_topic --from-beginning
- 메세지 잘 comsume 해옴
logstash로 소비되었는지 확인하기
~/kafka$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group logstash
- consumer group 확인
CURRENT-OFFSET
consumer group이 kafka에서 소비한 offset -> 메세지를 4개 보냈는데 4개 소비했으니 잘 소비한걸로 보인다!
ElasticSearch에 인덱싱 되었는지 확인하기
http://localhost:9200/kafka-*/_search?pretty
- 잘 되었다!
References
https://log-laboratory.tistory.com/169
https://yookeun.github.io/elasticsearch/2019/02/17/logstash-kafka/
https://17billion.github.io/elastic,/kafka/2017/10/05/elastic_stack_elk_kafka.html
'Data Engineering' 카테고리의 다른 글
[PostgreSQL] psql 기본 명령어 (0) | 2023.01.24 |
---|---|
[ElasticSearch] Document API: GET, POST, DELETE (0) | 2023.01.11 |
[Kafka] Consumer not receiving messages (0) | 2022.12.02 |
[Kafka] Executing consumer group command failed due to org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups (0) | 2022.12.02 |
[Kafka] kafka.common.InconsistentClusterIdException (0) | 2022.11.26 |