2025년 1월 6일 작성
ksqlDB Data Model - Stream과 Table
ksqlDB의 Stream과 Table은 data를 처리하는 두 가지 주요 추상화 model입니다.
Stream & Table : ksqlDB Data Model
- ksqlDB의 Stream과 Table은 data를 처리하는 두 가지 주요 추상화 model입니다.
특성 | Stream | Table |
---|---|---|
Data 성격 | event 기반 (event-based) | 상태 기반 (state-based) |
Data 처리 방식 | 순차적 event 처리 | 상태 update 및 조회 |
Data 저장 | 모든 event 기록 | 각 key의 최신 값만 유지 |
Data 변경 가능성 | 변경 불가능 (immutable) | 변경 가능 (Mutable) |
시간 관점 | 시간에 따른 모든 변경 이력 보존 | 현재 시점의 상태만 표현 |
지원 연산 | INSERT만 가능 | INSERT, UPDATE, DELETE 가능 |
사용 사례 | transaction log, sensor data, log data | 사용자 정보, 재고 현황, 계좌 잔액 |
다른 유형으로 변환 | Table로 변환 가능 | Stream으로 변환 불가 |
- data의 전체 이력과 시간별 변화를 추적하고 분석할 때는 Stream을 사용하고, 특정 시점의 최신 상태만 유지하고 관리할 때는 Table을 사용합니다.
- append-only model인 Stream은 event의 전체 history를 보관하기 때문에, 시간 기반 처리에 적합합니다.
- update model인 Table은 최신 상태만 유지하기 때문에, 상태 기반 처리에 적합합니다.
Stream : 무한히 지속되는 Event의 흐름
- Stream은 ksqlDB에서 시간에 따라 연속적으로 발생하는 event들을 표현하는 data model입니다.
- 무한히 지속되는 event의 흐름을 나타내며, 한 번 저장된 data는 수정이나 삭제가 불가능한 append-only 형태로 관리됩니다.
-
각 record는 고유한 timestamp를 가지며, 이를 통해 시간 기반의 data 처리와 분석이 가능합니다.
- Stream은 실시간 data 처리와 event 기반 system 구축에 적합한 model입니다.
- 신용카드 거래 내역 추적, IoT sensor data 수집 및 분석, application log 처리, 사용자 행동 분석, 실시간 monitoring system 등.
Stream의 특징
- 불변성 (Immutability) : 한 번 저장된 record는 수정하거나 삭제할 수 없습니다.
- INSERT 연산만 지원되며, UPDATE나 DELETE 연산은 불가능합니다.
- 시간 기반 처리 : 모든 record는 고유한 timestamp를 가집니다.
- 시간을 기준으로 event를 처리하고 분석할 수 있습니다.
- Stream 간의 Join 연산 시 timestamp를 기준으로 matching됩니다.
- 무한한 Data Sequence : 시간에 따라 계속해서 새로운 data가 추가됩니다.
- 또한 과거의 모든 event 기록이 보관됩니다.
Stream 생성
-- Kafka topic으로부터 Stream 생성
CREATE STREAM stream_name (
column1 TYPE,
column2 TYPE,
...
) WITH (
KAFKA_TOPIC='topic_name',
VALUE_FORMAT='FORMAT_TYPE',
PARTITIONS=1,
REPLICAS=1
);
-- 기존 Stream으로부터 새로운 Stream 생성
CREATE STREAM new_stream WITH (
KAFKA_TOPIC='new_topic_name'
) AS
SELECT column1, column2
FROM existing_stream
WHERE condition;
Stream 수정
- Stream의 data는 수정이 불가능하지만, Stream의 metadata는 수정할 수 있습니다.
-- Stream 속성 변경
ALTER STREAM stream_name WITH (KAFKA_TOPIC='new_topic_name');
Stream 삭제
-- Stream 삭제
DROP STREAM stream_name;
-- Stream과 연관된 topic도 함께 삭제
DROP STREAM stream_name DELETE TOPIC;
Stream Data 삽입
-- Stream에 data 삽입
INSERT INTO stream_name (column1, column2)
VALUES (value1, value2);
Stream Query Example
-- Stream data 조회
SELECT column1, column2
FROM stream_name
EMIT CHANGES;
-- 시간 Window 기반 집계
SELECT column1, COUNT(*) AS count
FROM stream_name
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY column1
EMIT CHANGES;
Table : 특정 시점의 상태
- Table은 ksqlDB에서 특정 시점의 상태를 나타내는 data model입니다.
- 각 key에 대한 최신 값을 유지하며, 새로운 data가 들어올 때마다 해당 key의 기존 값이 update되는 변경 가능한(mutable) 특성을 가집니다.
- Table은 상태 기반의 data 관리와 현재 상태 조회가 필요한 system 구축에 적합한 model입니다.
- 사용자 프로필 관리, 상품 재고 현황 추적, 계좌 잔액 관리, system 설정값 관리, 실시간 대시보드 상태 표시 등.
Table의 특징
- 변경 가능성 (Mutability) : record는 변경 가능하며 update될 수 있습니다.
- INSERT, UPDATE, DELETE 연산이 모두 지원됩니다.
- 동일한 key에 대한 새로운 값이 들어오면 기존 값이 update됩니다.
- 상태 관리 : 각 key에 대한 최신 상태만을 유지합니다.
- snapshot 형태로 현재 상태를 저장합니다.
- Table 간의 Join 연산 시 현재 상태값을 기준으로 matching됩니다.
- Key 기반 처리 : 모든 record는 고유한 key를 가집니다.
- key를 기준으로 data가 관리되고 update됩니다.
Table 생성
-- Kafka topic으로부터 Table 생성
CREATE TABLE table_name (
column1 TYPE,
column2 TYPE,
...
PRIMARY KEY (column1)
) WITH (
KAFKA_TOPIC='topic_name',
VALUE_FORMAT='FORMAT_TYPE',
PARTITIONS=1,
REPLICAS=1
);
-- 기존 Table이나 Stream으로부터 새로운 Table 생성
CREATE TABLE new_table WITH (
KAFKA_TOPIC='new_topic_name'
) AS
SELECT column1, column2
FROM existing_table
GROUP BY column1;
Table 수정
-- Table 속성 변경
ALTER TABLE table_name WITH (KAFKA_TOPIC='new_topic_name');
Table 삭제
-- Table 삭제
DROP TABLE table_name;
-- Table과 연관된 topic도 함께 삭제
DROP TABLE table_name DELETE TOPIC;
Table Data 조작
- ksqlDB에서는 일반적인 UPDATE 구문을 사용하지 않고, INSERT를 통해 update를 수행합니다.
- 같은 key 값을 가진 새로운 record가 들어오면 기존 값을 덮어쓰게 됩니다.
- Table 생성 시 key column을 지정할 수 있습니다.
-- data 삽입
INSERT INTO table_name (column1, column2)
VALUES (value1, value2);
-- data update (column1 is primary key)
INSERT INTO table_name (column1, column2)
VALUES (value1, new_value2);
-- data 삭제
DELETE FROM table_name
WHERE column1 = value1;
Table Query Example
-- Table data 조회
SELECT column1, column2
FROM table_name
EMIT CHANGES;
-- Table 집계
SELECT column1, COUNT(*) AS count
FROM table_name
GROUP BY column1
EMIT CHANGES;
-- Table과 Stream Join
SELECT t.column1, s.column2
FROM table_name t
JOIN stream_name s ON t.id = s.id
EMIT CHANGES;
Stream을 Table로 변환하기
- CREATE 뒤에 Table 이름을, FROM 뒤에 Stream 이름을 넣어, Stream으로부터 Tatble을 생성할 수 있습니다.
- Stream의 모든 event는 Table에 반영되며, Table은 항상 각 key에 대한 최신 상태만을 유지합니다.
- 동일한 key에 대한 새로운 data가 들어오면 Table의 해당 record가 update됩니다.
CREATE TABLE table_name AS
SELECT
column1,
column2,
...,
columnN,
aggregation_function(value_column) AS aggregated_value
FROM stream_name
GROUP BY column1, column2, ..., columnN
EMIT CHANGES;
- Stream을 Table로 전환할 때, GROUP BY 절은 필수입니다.
- Table은 각 key에 대한 최신 상태를 유지해야 하므로, 어떤 column을 기준으로 grouping할지 명시해야 합니다.
- GROUP BY를 사용하기 때문에, 집계 함수(COUNT, SUM, MAX 등)도 함께 사용하여, Stream의 event들을 어떻게 집계할지 정의해야 합니다.
- 반대로, Table을 Stream으로 변환하는 것은 근본적인 특성 차이로 인해 불가능합니다.
- Table은 상태 기반으로 각 key의 최신 값만 유지하며, 이전 상태를 저장하지 않고 덮어씁니다.
- Stream은 event 기반으로 모든 변경 event를 순서대로 기록하며, 전체 이력을 보존하고 있습니다.
- 최종 값인 최신 정보(Table)만을 가지고, 모든 변경 event(Stream)를 추측해낼 수는 없습니다.
- 또한 Stream으로부터 만들어진 Table을 다시 Stream으로 변환해야 할 이유도 없습니다.
Stream to Table 변환 예시
- Stream to Table 변환은 실시간 집계나 현재 상태를 추적해야 하는 경우에 유용합니다.
- 실시간 집계가 필요할 때.
- 최신 상태만 유지하면 될 때.
- 시간 Window 기반의 분석이 필요할 때.
- 상태 기반의 monitoring이 필요할 때.
- event data(Stream)를 집계하여 상태(Table)로 저장해야할 때, 변환이 필요합니다.
flowchart TD
subgraph kafka_events[Kafka Events]
user_purchase_event[User Purchase Event]
settings_change_event[Settings Change Event]
page_view_event[Page View Event]
error_event[Error Event]
end
subgraph ksqldb_streams[ksqlDB Streams]
purchase_stream[Purchase Stream]
settings_stream[Settings Stream]
page_view_stream[Page View Stream]
error_stream[Error Stream]
end
subgraph ksqldb_tables[ksqlDB Tables]
purchase_totals_table[Purchase Totals Table]
user_preferences_table[User Preferences Table]
traffic_stats_table[Traffic Stats Table]
error_counts_table[Error Counts Table]
end
user_purchase_event --> purchase_stream
settings_change_event --> settings_stream
page_view_event --> page_view_stream
error_event --> error_stream
purchase_stream -->|GROUP BY user_id| purchase_totals_table
settings_stream -->|LATEST_BY_OFFSET| user_preferences_table
page_view_stream -->|WINDOW TUMBLING| traffic_stats_table
error_stream -->|WINDOW HOPPING| error_counts_table
실시간 집계가 필요한 경우
-- 구매 Event Stream 생성
CREATE STREAM purchase_stream (
user_id VARCHAR,
purchase_amount DECIMAL,
timestamp TIMESTAMP
) WITH (
kafka_topic = 'purchases',
value_format = 'JSON',
timestamp = 'timestamp'
);
-- 사용자별 총 구매액을 추적하는 Table 생성
CREATE TABLE user_purchase_totals AS
SELECT user_id,
SUM(purchase_amount) AS total_spent
FROM purchase_stream
GROUP BY user_id
EMIT CHANGES;
- 사용자별 총 구매액을 실시간으로 추적합니다.
- 새로운 구매가 발생할 때마다 자동으로 합계가 update됩니다.
최신 상태 관리가 필요한 경우
-- 사용자 설정 변경 Stream 생성
CREATE STREAM user_settings_stream (
user_id VARCHAR,
theme_color VARCHAR,
language VARCHAR,
update_time TIMESTAMP
) WITH (
kafka_topic = 'user_settings',
value_format = 'JSON',
timestamp = 'update_time'
);
-- 사용자별 최신 설정을 유지하는 Table 생성
CREATE TABLE user_preferences AS
SELECT user_id,
LATEST_BY_OFFSET(theme_color) AS current_theme,
LATEST_BY_OFFSET(language) AS current_language
FROM user_settings_stream
GROUP BY user_id
EMIT CHANGES;
- 사용자의 최신 설정 정보만 유지합니다.
- 새로운 설정 변경이 있을 때 자동으로 최신 상태로 갱신합니다.
기간별 통계를 보는 경우
-- Page View Event Stream 생성
CREATE STREAM pageview_stream (
url_path VARCHAR,
user_id VARCHAR,
view_time TIMESTAMP
) WITH (
kafka_topic = 'pageviews',
value_format = 'JSON',
timestamp = 'view_time'
);
-- 시간별 Traffic 집계 Table 생성
CREATE TABLE hourly_traffic AS
SELECT url_path,
COUNT(*) AS visit_count,
WINDOWSTART AS window_start
FROM pageview_stream
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY url_path
EMIT CHANGES;
- website 각 page의 시간당 방문자 수를 집계합니다.
- 1시간 단위로 traffic 통계를 자동으로 update합니다.
이상 탐지나 임계값 Monitoring
-- Error Event Stream 생성
CREATE STREAM error_stream (
service_name VARCHAR,
error_code VARCHAR,
error_message VARCHAR,
error_time TIMESTAMP
) WITH (
kafka_topic = 'service_errors',
value_format = 'JSON',
timestamp = 'error_time'
);
-- 각 service에 대한 error count Table 생성
CREATE TABLE error_counts AS
SELECT
service_name,
COUNT(*) AS error_count
FROM error_stream
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
GROUP BY service_name
EMIT CHANGES;
- 각 service의 error 발생 횟수를 실시간으로 monitoring합니다.
- 특정 임계값을 넘으면 경고를 발생시키는 용도로 활용합니다.