2025년 2월 27일 작성
ksqlDB Collection - Source와 Derived로 나뉘는 Data 저장소
ksqlDB의 collection은 Kafka topic에서 직접 읽는 source collection과 query 결과로 생성되는 derived collection으로 나뉘며, 각각 stream과 table 형태로 존재합니다.
Collection : ksqlDB의 Data 저장소
- ksqlDB에서 data를 저장하고 처리하는 단위를 collection이라고 합니다.
- collection은 data 원천에 따라 Source Collection과 Derived Collection으로, data model에 따라 Stream과 Table로 나뉩니다.
| Source Collection | Derived Collection | |
|---|---|---|
| Stream | Source Stream | Derived Stream |
| Table | Source Table | Derived Table |
flowchart LR
subgraph kafka["Kafka Cluster"]
topic_a[Topic A]
topic_b[Topic B]
derived_topic_1[Derived Topic 1]
derived_topic_2[Derived Topic 2]
end
subgraph ksqldb["ksqlDB"]
subgraph source["Source Collection"]
source_stream[Source Stream]
source_table[Source Table]
end
subgraph derived["Derived Collection"]
derived_stream[Derived Stream]
derived_table[Derived Table]
end
end
topic_a --> source_stream
topic_b --> source_table
source_stream --> derived_stream
source_stream --> derived_table
derived_stream --> derived_topic_1
derived_table --> derived_topic_2
Source Collection : Kafka Topic에서 직접 읽는 Collection
- source collection은 Kafka topic의 data를 직접 읽어와서 생성하는 collection입니다.
CREATE STREAM또는CREATE TABLE문으로 Kafka topic과 연결하여 생성합니다.- source collection 자체는 새로운 Kafka topic을 생성하지 않습니다.
- source collection은 외부 system에서 Kafka로 유입된 원본 data를 ksqlDB에서 처리하기 위한 진입점 역할을 합니다.
Source Stream
- Kafka topic의 event를 시간 순서대로 읽어오는 append-only collection입니다.
- 모든 event가 그대로 보존되며, 수정이나 삭제가 불가능합니다.
CREATE STREAM order_stream (
order_id VARCHAR KEY,
product_name VARCHAR,
amount DECIMAL,
order_time TIMESTAMP
) WITH (
KAFKA_TOPIC = 'orders',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'order_time'
);
Source Table
- Kafka topic의 data를 key 기준으로 최신 상태만 유지하는 collection입니다.
- 동일한 key에 새로운 값이 들어오면 기존 값을 덮어씁니다.
CREATE TABLE user_table (
user_id VARCHAR PRIMARY KEY,
name VARCHAR,
email VARCHAR
) WITH (
KAFKA_TOPIC = 'users',
VALUE_FORMAT = 'JSON'
);
Derived Collection : Query 결과로 생성되는 Collection
- derived collection은 기존 collection에 query를 실행하여 생성하는 collection입니다.
CREATE STREAM AS SELECT또는CREATE TABLE AS SELECT문으로 생성합니다.- source collection과 달리 새로운 Kafka topic이 자동으로 생성되며, query 결과가 해당 topic에 기록됩니다.
- derived collection은 persistent query로 동작하여, 원본 collection에 새로운 data가 들어올 때마다 자동으로 결과가 갱신됩니다.
Derived Stream
- 기존 stream이나 table에 filtering, 변환, join 등의 연산을 적용하여 새로운 event stream을 생성합니다.
CREATE STREAM high_value_orders AS
SELECT order_id, product_name, amount, order_time
FROM order_stream
WHERE amount > 10000
EMIT CHANGES;
Derived Table
- 기존 stream이나 table에 집계 연산을 적용하여 상태를 유지하는 table을 생성합니다.
GROUP BY절이 필수이며, 집계 함수(COUNT,SUM,AVG등)와 함께 사용합니다.
CREATE TABLE product_sales AS
SELECT
product_name,
COUNT(*) AS order_count,
SUM(amount) AS total_amount
FROM order_stream
GROUP BY product_name
EMIT CHANGES;
Source Collection과 Derived Collection의 차이
| 특성 | Source Collection | Derived Collection |
|---|---|---|
| 생성 방법 | CREATE STREAM/TABLE |
CREATE STREAM/TABLE AS SELECT |
| data 원천 | Kafka topic 직접 연결 | 기존 collection의 query 결과 |
| Kafka topic 생성 | 기존 topic 사용 (새로 생성하지 않음) | 새로운 topic 자동 생성 |
| query 실행 | 없음 (data 읽기만 수행) | persistent query 상시 실행 |
| data 갱신 | Kafka topic에 data가 들어올 때 | 원본 collection에 변경이 발생할 때 |
- source collection은 data 유입의 시작점이고, derived collection은 data 가공의 결과물입니다.
- 하나의 source collection에서 여러 derived collection을 생성할 수 있으며, derived collection에서 또 다른 derived collection을 생성하는 것도 가능합니다.