2025년 4월 13일 작성
OpenSearch Sink Connector - Kafka Data를 OpenSearch로 전송하기
OpenSearch Sink Connector는 Kafka topic의 data를 OpenSearch index로 전송하는 Kafka Connect plugin입니다.
OpenSearch Sink Connector
- OpenSearch Sink Connector는 Kafka topic의 data를 OpenSearch index로 전송하는 Kafka Connect plugin입니다.
- Kafka에 저장된 event나 log data를 OpenSearch에 indexing하여 검색 및 분석 기능을 제공합니다.
- Aiven에서 개발하고 관리하며, Apache 2.0 license로 제공됩니다.
flowchart LR
subgraph kafka[Kafka]
topic[Kafka Topic]
end
subgraph connect[Kafka Connect]
connector[OpenSearch Sink Connector]
end
subgraph opensearch[OpenSearch]
index[OpenSearch Index]
end
topic --> connector
connector --> index
주요 기능
- Bulk Indexing : 여러 record를 batch로 묶어 OpenSearch에 전송합니다.
batch.size와linger.ms설정으로 batch 크기와 대기 시간을 조절합니다.
- Document ID 생성 : Kafka record key 또는 자동 생성 전략으로 document ID를 결정합니다.
key.ignore설정으로 key 무시 여부를 선택합니다.topic.partition.offset전략으로 고유한 ID를 자동 생성합니다.
- Upsert 지원 : 동일 ID의 document를 insert하거나 update합니다.
index.write.method설정으로 insert 또는 upsert mode를 선택합니다.
- Data Stream 지원 : OpenSearch Data Stream으로 시계열 data를 효율적으로 저장합니다.
data.stream.enabled설정으로 Data Stream mode를 활성화합니다.
- Tombstone 처리 : null value record(Kafka tombstone)를 처리하는 방식을 설정합니다.
behavior.on.null.values설정으로 무시, 삭제, 실패 중 선택합니다.
사용 사례
- Log 분석 : application log를 OpenSearch에 저장하여 검색 및 시각화합니다.
- 실시간 검색 : Kafka의 event data를 OpenSearch에 indexing하여 실시간 검색 기능을 제공합니다.
- CDC Pipeline : database 변경 사항을 Kafka를 거쳐 OpenSearch에 동기화합니다.
- Monitoring : metric data를 OpenSearch에 저장하여 dashboard로 monitoring합니다.
Connector 설정
- OpenSearch Sink Connector는 connection, batching, data conversion, data stream, authentication 관련 설정을 제공합니다.
기본 설정 예시
{
"name": "opensearch-sink-connector",
"config": {
"connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
"connection.url": "http://localhost:9200",
"connection.username": "admin",
"connection.password": "password",
"topics": "logs,events",
"key.ignore": "true",
"schema.ignore": "true",
"batch.size": 2000,
"tasks.max": "1"
}
}
Connection 설정
| 설정 | 설명 | 기본값 |
|---|---|---|
connection.url |
OpenSearch HTTP 연결 URL 목록 | 필수 |
connection.username |
인증 username | null |
connection.password |
인증 password | null |
connection.timeout.ms |
연결 timeout | 1000 |
read.timeout.ms |
읽기 timeout | 3000 |
Batching 설정
| 설정 | 설명 | 기본값 |
|---|---|---|
batch.size |
한 번에 처리할 record 수 | 2000 |
max.in.flight.requests |
동시에 진행 가능한 indexing 요청 수 | 5 |
max.buffered.records |
task당 buffer에 저장할 최대 record 수 | 20000 |
linger.ms |
batch 전송 전 대기 시간 | 1 |
flush.timeout.ms |
flush 작업 timeout | 10000 |
linger.ms를 늘리면 batch 효율이 높아지지만 지연 시간이 증가합니다.max.buffered.records로 memory 사용량을 제한합니다.
Retry 설정
| 설정 | 설명 | 기본값 |
|---|---|---|
max.retries |
실패한 indexing 요청의 최대 재시도 횟수 | 5 |
retry.backoff.ms |
재시도 간 대기 시간 | 100 |
- 재시도할 때마다 대기 시간이 최대 2배씩 증가합니다.
Data Conversion 설정
| 설정 | 설명 | 기본값 |
|---|---|---|
index.write.method |
쓰기 방식 (insert, upsert) |
insert |
key.ignore |
record key를 document ID로 사용하지 않음 | false |
key.ignore.id.strategy |
key 무시 시 ID 생성 전략 | topic.partition.offset |
schema.ignore |
schema 무시 여부 | false |
compact.map.entries |
map entry를 compact하게 저장 | true |
drop.invalid.message |
변환 실패 message 삭제 여부 | false |
key.ignore.id.strategy옵션은none,record.key,topic.partition.offset중 선택합니다.topic.partition.offset:topic+partition+offset조합으로 고유 ID를 생성합니다.
Error Handling 설정
| 설정 | 설명 | 기본값 |
|---|---|---|
behavior.on.null.values |
null value record 처리 방식 | ignore |
behavior.on.malformed.documents |
잘못된 document 처리 방식 | fail |
behavior.on.version.conflict |
version 충돌 처리 방식 | fail |
- 처리 방식 옵션은
ignore,warn,report,fail중 선택합니다.ignore: 무시하고 계속 진행합니다.warn: 경고 log를 남기고 계속 진행합니다.report: errant record reporter에 보고합니다.fail: task를 실패 처리합니다.
Data Stream 설정
| 설정 | 설명 | 기본값 |
|---|---|---|
data.stream.enabled |
Data Stream 사용 여부 | false |
data.stream.prefix |
Data Stream 이름 prefix | null |
data.stream.timestamp.field |
timestamp field 이름 | @timestamp |
- Data Stream을 사용하면
{data.stream.prefix}-{topic}형식으로 data stream 이름이 결정됩니다. - 시계열 data(log, metric 등)에 적합합니다.
장점과 한계점
- OpenSearch Sink Connector는 설정만으로 pipeline을 구축할 수 있지만, 복잡한 변환에는 한계가 있습니다.
장점
- code 작성 없이 configuration만으로 Kafka-OpenSearch pipeline을 구축합니다.
- bulk indexing으로 높은 처리량을 제공합니다.
- retry와 error handling 옵션으로 안정적인 data 전송이 가능합니다.
- Data Stream 지원으로 시계열 data를 효율적으로 관리합니다.
한계점
- 복잡한 data 변환은 SMT만으로 한계가 있어 별도 처리가 필요합니다.
- SMT(Single Message Transform)는 Kafka Connect에서 각 record를 개별적으로 변환하는 기능입니다.
- OpenSearch의 mapping 충돌 시 수동 개입이 필요할 수 있습니다.
- 대량 data 처리 시 batch size와 memory 설정 최적화가 필요합니다.