2026년 4월 8일 작성
ksqlDB Stream-Table Join의 Key Mismatch - Stream Key와 Join Key가 다를 때 발생하는 Lookup Miss
Stream-Table Join에서 stream의 Kafka key와 join key가 다르면 자동 repartition의 상태 동기화 문제로 table lookup miss가 발생할 수 있으며, 명시적 rekey 파생 stream으로 해결합니다.
Stream Key와 Join Key가 다른 문제
- Stream-Table Join에서 stream의 Kafka key와
ON절에 사용하는 join key가 서로 다른 column이면, table lookup miss가 발생할 수 있습니다.- stream의 Kafjjjka key는 stream 생성 시
KEYcolumn이나PARTITION BY로 지정된 column입니다. - join key는
ON stream.column = table.column에서 사용하는 column입니다. - 이 두 column이 다르면, 동일한 join key 값을 가진 stream record와 table record가 서로 다른 partition에 위치하게 됩니다.
- stream의 Kafjjjka key는 stream 생성 시
- ksqlDB는 stream key와 join key가 다른 경우 자동 repartition을 수행합니다.
- 내부적으로 join key 기준의 intermediate topic을 생성하고, stream data를 해당 topic으로 재분배합니다.
- 문법적으로는 허용되며, query 생성 시 error가 발생하지 않습니다.
- 그러나 자동 repartition이 항상 안정적으로 동작하지는 않습니다.
- ksqlDB server 재시작, 상태 복구(state restoration), offset reset 등의 상황에서 repartition된 stream과 table의 상태가 동기화되지 않을 수 있습니다.
- 이 경우 stream record가 도착하는 시점에 table에 해당 key의 상태가 아직 load되지 않아 lookup이 실패합니다.
증상 : INNER JOIN에서 Row 소실, LEFT JOIN에서 Null 발생
- key mismatch로 인한 lookup miss는 join 유형에 따라 다른 증상으로 나타납니다.
| Join 유형 | 증상 | 설명 |
|---|---|---|
| INNER JOIN | row가 결과에서 사라짐 | lookup miss된 record는 양쪽 모두 match 실패로 판정되어 출력되지 않음 |
| LEFT JOIN | 오른쪽(table) 값이 모두 null | stream record는 유지되지만 table 조회가 실패하여 null로 채워짐 |
INNER JOIN (row dropped on lookup miss)
┌─────────────────────────┐ ┌─────────────────────────┐
│ Stream │ │ Table │
│ (key: order_id) │ │ (key: customer_id) │
├─────────────────────────┤ ├─────────────────────────┤
│ ORD-001, customer_id=42 │──── ✗ ──│ customer_id=42, Alice │
│ ORD-002, customer_id=99 │──── ✗ ──│ customer_id=99, Bob │
└─────────────────────────┘ └─────────────────────────┘
partition mismatch -> lookup miss -> result : 0 rows
LEFT JOIN (null on lookup miss)
┌─────────────────────────┐ ┌─────────────────────────┐
│ Stream │ │ Table │
│ (key: order_id) │ │ (key: customer_id) │
├─────────────────────────┤ ├─────────────────────────┤
│ ORD-001, customer_id=42 │──── ✗ ──│ customer_id=42, Alice │
│ ORD-002, customer_id=99 │──── ✗ ──│ customer_id=99, Bob │
└─────────────────────────┘ └─────────────────────────┘
partition mismatch -> lookup miss -> result : ORD-001 | null | null
ORD-002 | null | null
- LEFT JOIN으로 바꿨을 때 오른쪽 값이 null로 나타나는 것은 문제를 해결한 것이 아니라, lookup miss를 눈에 보이게 만든 것입니다.
- INNER JOIN에서는 실패한 lookup이 조용히 row를 삭제합니다.
- LEFT JOIN에서는 실패한 lookup이 null로 드러나므로, 문제의 존재를 확인할 수 있습니다.
예시 : Stream Key(ORDER_ID)와 Join Key(CUSTOMER_ID)가 다른 경우
- 주문 stream의 Kafka key는
order_id이고, join 조건에는customer_id를 사용하여 고객 table과 join하는 상황입니다.
-- stream의 Kafka key : order_id
CREATE STREAM order_stream WITH (
KAFKA_TOPIC='orders',
VALUE_FORMAT='JSON',
PARTITIONS=3
) AS
SELECT *
FROM raw_order_stream
PARTITION BY order_id
EMIT CHANGES;
-- table의 primary key : customer_id
CREATE TABLE customer_table (
customer_id BIGINT PRIMARY KEY,
name VARCHAR,
region VARCHAR
) WITH (
KAFKA_TOPIC='customers',
VALUE_FORMAT='JSON'
);
-- stream key(order_id) != join key(customer_id) -> lookup miss 가능
CREATE STREAM enriched_order_stream AS
SELECT
o.order_id,
o.customer_id,
o.amount,
c.name AS customer_name,
c.region
FROM order_stream o
INNER JOIN customer_table c
ON o.customer_id = c.customer_id
EMIT CHANGES;
order_stream은order_id기준으로 partition되어 있지만, join은customer_id기준으로 수행됩니다.- ksqlDB가
customer_id기준으로 자동 repartition을 시도하지만, 상태 동기화 문제로 lookup miss가 발생할 수 있습니다. - INNER JOIN에서는 결과가 나오지 않습니다.
- LEFT JOIN으로 변경하면
customer_name과region이 null로 출력됩니다.
- ksqlDB가
원인 : Partition 기반 Join과 Key Mismatch
- ksqlDB의 Stream-Table Join은 partition 단위로 독립적으로 처리됩니다.
- partition 0을 처리하는 worker는 stream의 partition 0과 table의 partition 0만 접근합니다.
- 따라서 join이 성공하려면, 같은 join key를 가진 stream record와 table record가 같은 번호의 partition에 위치해야 합니다.
- Kafka는 record의 key에 hash function을 적용하여 partition을 결정합니다.
hash(key) % partition_count = partition_number로 계산합니다.- 같은 key 값은 항상 같은 partition에 저장됩니다.
- stream key와 join key가 다르면, stream record와 table record가 서로 다른 partition에 위치하게 됩니다.
- stream의 Kafka key가
order_id이면,hash(order_id)로 partition이 결정됩니다. - table의 primary key가
customer_id이면,hash(customer_id)로 partition이 결정됩니다. - join 조건이
ON o.customer_id = c.customer_id여도, stream record는order_id기준으로 배치되어 있으므로 같은customer_id를 가진 두 record가 다른 partition에 있을 수 있습니다.
- stream의 Kafka key가
order_stream (key : order_id) :
order_id=ORD-001, customer_id=42 -> hash(ORD-001) % 3 = partition 0
order_id=ORD-002, customer_id=42 -> hash(ORD-002) % 3 = partition 2
customer_table (key : customer_id) :
customer_id=42 -> hash(42) % 3 = partition 1
결과 : customer_id=42로 join하려 하지만,
stream record는 partition 0, 2에, table record는 partition 1에 위치.
partition 0을 처리하는 worker는 partition 1의 table data에 접근 불가 -> lookup miss.
- ksqlDB는 이 문제를 감지하면 자동 repartition을 수행하여, join key 기준으로 stream data를 재분배합니다.
- 그러나 이 자동 repartition이 항상 안정적으로 동작하지는 않습니다.
자동 Repartition의 한계
- ksqlDB의 자동 repartition은 query 실행 중에 내부적으로 처리되므로, 외부에서 직접 제어하거나 상태를 확인하기 어렵습니다.
flowchart LR
subgraph problem["Key Mismatch 상황"]
stream_order["Stream\n(key : order_id)"]
auto_repart["자동 Repartition\n(key : customer_id)"]
table_customer["Table\n(key : customer_id)"]
stream_order --> auto_repart
auto_repart -. "lookup miss 가능" .-> table_customer
end
subgraph solution["명시적 Rekey 해결"]
stream_order2["Stream\n(key : order_id)"]
derived_stream["파생 Stream\n(key : customer_id)"]
table_customer2["Table\n(key : customer_id)"]
stream_order2 --> derived_stream
derived_stream --> table_customer2
end
- 자동 repartition에서 lookup miss가 발생하는 주요 원인은 intermediate topic과 table state store의 동기화 실패입니다.
- 상태 복구 timing : ksqlDB server 재시작 시, intermediate topic과 table의 state store가 서로 다른 속도로 복구됩니다.
- stream record가 먼저 도착하면 table에 아직 해당 key가 없어 lookup이 실패합니다.
- offset reset :
auto.offset.reset이latest로 설정된 경우, intermediate topic의 기존 data를 건너뛰고 새로운 data만 처리합니다.- table이 아직 해당 key를 가지고 있지 않은 상태에서 stream event가 도착하면 miss가 발생합니다.
- partition 할당 변경 : consumer group rebalancing이나 ksqlDB instance 추가/제거 시, partition 할당이 변경되면서 state store의 일부가 재구축됩니다.
- 재구축이 완료되기 전에 도착하는 stream record는 lookup에 실패합니다.
- 상태 복구 timing : ksqlDB server 재시작 시, intermediate topic과 table의 state store가 서로 다른 속도로 복구됩니다.
해결 : 명시적 Rekey 파생 Stream 생성
- join key를 Kafka key로 갖는 파생 stream을 명시적으로 생성한 후, 해당 stream으로 table을 join합니다.
- 자동 repartition에 의존하지 않고, join key 기준으로 partition된 독립적인 stream을 만드는 방식입니다.
- 파생 stream은 별도의 Kafka topic을 가지므로, state store와의 동기화가 안정적입니다.
해결 방법
PARTITION BY절로 join key를 Kafka key로 지정한 파생 stream을 생성합니다.
-- Step 1. join key(customer_id) 기준으로 rekey된 파생 stream 생성
CREATE STREAM order_by_customer_stream WITH (
KAFKA_TOPIC='orders-by-customer',
VALUE_FORMAT='JSON',
PARTITIONS=3
) AS
SELECT *
FROM order_stream
PARTITION BY customer_id
EMIT CHANGES;
-- Step 2. 파생 stream으로 table join (INNER JOIN 정상 동작)
CREATE STREAM enriched_order_stream AS
SELECT
o.order_id,
o.customer_id,
o.amount,
c.name AS customer_name,
c.region
FROM order_by_customer_stream o
INNER JOIN customer_table c
ON o.customer_id = c.customer_id
EMIT CHANGES;
- 파생 stream의 Kafka key가
customer_id이므로, table의 primary keycustomer_id와 동일한 기준으로 partition됩니다.- stream과 table이 같은 key 기준으로 partition되어 있으므로, 동일한 key 값을 가진 record가 같은 partition에 위치합니다.
- 자동 repartition이 필요 없어지므로, 상태 복구 timing에 의한 lookup miss가 발생하지 않습니다.
기존 Pipeline에 적용하는 절차
- 기존에 자동 repartition에 의존하던 pipeline을 명시적 rekey 방식으로 전환하는 절차입니다.
-- 1. 기존 join stream 제거
DROP STREAM IF EXISTS enriched_order_stream DELETE TOPIC;
-- 2. rekey 파생 stream 생성
CREATE STREAM order_by_customer_stream WITH (
KAFKA_TOPIC='orders-by-customer',
VALUE_FORMAT='JSON',
PARTITIONS=3
) AS
SELECT *
FROM order_stream
PARTITION BY customer_id
EMIT CHANGES;
-- 3. 파생 stream 기반으로 join stream 재생성
CREATE STREAM enriched_order_stream AS
SELECT
o.order_id,
o.customer_id,
o.amount,
c.name AS customer_name,
c.region
FROM order_by_customer_stream o
INNER JOIN customer_table c
ON o.customer_id = c.customer_id
EMIT CHANGES;
auto.offset.reset이latest인 경우, 파생 stream 생성 후 새로운 event가 유입되어야 join 결과가 출력됩니다.- 기존 data로 test하려면
SET 'auto.offset.reset' = 'earliest';를 실행한 후 pipeline을 재생성합니다.
- 기존 data로 test하려면
Debugging : LEFT JOIN으로 Lookup Miss 확인하기
- INNER JOIN에서 결과가 나오지 않을 때, LEFT JOIN으로 변경하면 lookup miss 여부를 확인할 수 있습니다.
- INNER JOIN은 lookup이 실패한 record를 조용히 제거하므로, 문제의 원인을 파악하기 어렵습니다.
- LEFT JOIN은 lookup이 실패해도 stream record를 유지하고 table 쪽 값을 null로 채우므로, 어떤 record에서 miss가 발생하는지 확인할 수 있습니다.
-- INNER JOIN : 결과가 나오지 않음 (원인 파악 불가)
SELECT o.customer_id, c.name, c.region
FROM order_stream o
INNER JOIN customer_table c
ON o.customer_id = c.customer_id
EMIT CHANGES;
-- LEFT JOIN : null로 lookup miss 확인 가능
SELECT o.customer_id, c.name, c.region
FROM order_stream o
LEFT JOIN customer_table c
ON o.customer_id = c.customer_id
EMIT CHANGES;
-- 결과 예시 : 1042 | null | null
-- -> customer_id=1042인 record가 table에서 조회되지 않았음을 의미
- LEFT JOIN 결과에서 table 쪽 column이 null이면, 두 가지 가능성을 순서대로 확인합니다.
- table에 해당 key의 row가 존재하는지 :
SELECT * FROM customer_table WHERE customer_id = 1042;pull query로 확인합니다. - row가 존재하는데 null이면 key mismatch : stream key와 join key가 다르거나, partition 정렬이 맞지 않아 lookup miss가 발생한 것입니다.
- 명시적 rekey 파생 stream을 생성하여 해결합니다.
- table에 해당 key의 row가 존재하는지 :
Reference
- https://docs.confluent.io/platform/current/ksqldb/developer-guide/joins/join-streams-and-tables.html
- https://docs.ksqldb.io/en/latest/developer-guide/joins/partition-data/