@@ -136,6 +136,7 @@ public class KafkaProducerConfig {
136136하지만 실제 운영 환경에서는 메시지 유실이나 중복을 방지하기 위해 더 많은 옵션을 고려해야 한다.
137137이어지는 '프로듀서 설정하기' 섹션에서 이러한 세부 옵션들을 알아보고, 글의 마지막에서 이 모든 것을 종합한 최종 설정 예시를 다시 살펴보자.
138138
139+
139140## 메시지 전송 방법
140141
141142프로듀서는 메시지를 보내는 세 가지 주요 방식을 제공한다.
@@ -187,55 +188,45 @@ Spring Boot 환경에서는 앞서 설정한 `KafkaTemplate`을 이용하여 편
187188> CouponIssueProducer.java
188189
189190``` java
190- /**
191- * CouponIssueProducer
192- * <p >
193- * 쿠폰 발급 요청을 Kafka 토픽("coupon_issue")으로 비동기 전송합니다.
194- * 이 메시지는 coupon-consumer 모듈의 CouponIssueConsumer 가 수신하여 DB에 저장합니다.
195- */
196191@Component
197192public class CouponIssueProducer {
198193
199- private final KafkaTemplate<String , Object > kafkaTemplate;
200- private static final String GLOBAL_TRACE_ID_HEADER = " globalTraceId" ;
201- private static final Logger log = LoggerFactory . getLogger(CouponIssueProducer . class);
202-
203- public CouponIssueProducer (final KafkaTemplate<String , Object > kafkaTemplate ) {
204- this . kafkaTemplate = kafkaTemplate;
205- }
206-
207- public void issue (final UUID userId , final UUID couponId ) {
208- // 1. 카프카로 보낼 데이터를 담은 객체(Payload) 생성
209- CouponIssueMessage payload = new CouponIssueMessage (userId, couponId);
210- String globalTraceId = MDC . get(GLOBAL_TRACE_ID_HEADER ); // MDC에서 추적 ID 가져오기
211-
212- // 2. 토픽, 헤더 정보 등을 포함한 Message 객체 생성
213- Message<CouponIssueMessage > message = MessageBuilder
214- .withPayload(payload)
215- .setHeader(KafkaHeaders . TOPIC , " coupon_issue" )
216- .setHeader(GLOBAL_TRACE_ID_HEADER , globalTraceId) // 추적 ID
217- .build();
218-
219- // 3. KafkaTemplate을 이용해 메시지 비동기 전송 및 결과 처리
220- CompletableFuture<SendResult<String , Object > > future = kafkaTemplate. send(message);
221- future. whenComplete((result, exception) - > {
222- if (exception == null ) {
223- log. info(" 메시지 전송 성공. GlobalTraceId: {}, Topic: {}, Partition: {}, Offset: {}, Payload: {}" ,
224- globalTraceId,
225- result. getRecordMetadata(). topic(),
226- result. getRecordMetadata(). partition(),
227- result. getRecordMetadata(). offset(),
228- payload);
229- } else {
230- log. error(" 메시지 전송 실패 GlobalTraceId: {}, Payload: {}" ,
231- globalTraceId,
232- payload,
233- exception);
234- // 여기에 실패 시 필요한 추가 로직을 구현 (e.g., 재시도, 알림, DB에 실패 기록 등)
194+ private final KafkaTemplate<String , Object > kafkaTemplate;
195+ private static final String GLOBAL_TRACE_ID_HEADER = " globalTraceId" ;
196+ private static final Logger log = LoggerFactory . getLogger(CouponIssueProducer . class);
197+
198+ // ...
199+
200+ /**
201+ * 쿠폰 발급 요청 메시지를 동기적으로 Kafka에 발행합니다.
202+ * .join()을 호출하여 메시지 전송이 완료될 때까지 현재 스레드를 블로킹합니다.
203+ */
204+ public void issue (final UUID userId , final UUID couponId ) {
205+ CouponIssueMessage payload = new CouponIssueMessage (userId, couponId);
206+ String globalTraceId = MDC . get(" globalTraceId" );
207+
208+ ProducerRecord<String , Object > record = new ProducerRecord<> (KafkaTopic . COUPON_ISSUE. getTopicName(), payload);
209+
210+ if (globalTraceId != null ) {
211+ record. headers(). add(GLOBAL_TRACE_ID_HEADER , globalTraceId. getBytes(StandardCharsets . UTF_8 ));
235212 }
236- });
237- }
213+
214+ kafkaTemplate. send(record). whenComplete((result, ex) - > {
215+ if (ex != null ) {
216+ log. error(" 메시지 전송 실패. GlobalTraceId: {}, Record: {}" ,
217+ globalTraceId, record, ex);
218+ } else {
219+ log. info(" 메시지 전송 성공. GlobalTraceId: {}, Topic: {}, Partition: {}, Offset: {}, Payload: {}" ,
220+ globalTraceId,
221+ result. getRecordMetadata(). topic(),
222+ result. getRecordMetadata(). partition(),
223+ result. getRecordMetadata(). offset(),
224+ payload);
225+ }
226+ }). join(); // 이 메서드가 호출되면, whenComplete의 콜백이 실행될 때까지 스레드가 블로킹됩니다.
227+ }
238228}
229+
239230```
240231
241232이 클래스는 ` KafkaProducerConfig ` 에서 Bean으로 등록한 ` KafkaTemplate ` 을 의존성 주입 받는다.
@@ -244,59 +235,44 @@ public class CouponIssueProducer {
244235
2452361 . 페이로드 생성: 카프카로 보낼 실제 데이터(` CouponIssueMessage ` )를 만든다.
246237
247- 2 . ` Message ` 객체 빌드: ` MessageBuilder ` 를 이용해 페이로드와 함께 전송할 토픽 , 추적 ID 등 메타데이터를 담은 ` Message ` 객체를 생성한다.
238+ 2 . ` ProducerRecord ` 객체 생성: 토픽 이름, 페이로드 , 추적 ID가 담긴 ` ProducerRecord ` 객체를 직접 생성한다.
248239
249- 3 . 비동기 전송: ` kafkaTemplate.send() ` 를 호출하여 카프카로 메시지를 최종 발행한다.
250- 이때 반환되는 ` CompletableFuture ` 를 사용하여 메시지 전송의 성공/실패 여부를 ` whenComplete ` 콜백을 통해 비동기적으로 처리한다.
251- 이를 통해 애플리케이션의 블로킹 없이 전송 결과를 로깅하거나 추가적인 실패 처리 로직을 구현할 수 있다 .
240+ 3 . 동기적 전송 및 대기 : ` kafkaTemplate.send() ` 를 호출하여 메시지를 발행하고, 반환된 ` CompletableFuture ` 에 ` .join() ` 을 호출한다.
241+ 이로 인해 메시지 전송이 완료될 때까지(성공 또는 실패) 현재 스레드는 블로킹된다.
242+ 전송 결과는 ` whenComplete ` 콜백을 통해 로그로 남겨진다 .
252243
253244
254- ---
245+ > 앞서 비동기 전송이 가장 널리 쓰인다고 설명했는데, 왜 코드에서는 .join()을 호출하여 동기 방식으로 처리했을까?
255246
256- 그렇다면 이 ` CouponIssueProducer ` 는 언제 호출될까?
247+ 이는 ** 응답 신뢰도 ** 와 ** 순서 보장 ** 이라는 두 가지 중요한 목표를 달성하기 위한 설계적 선택이다.
257248
258- 아래는 주기적으로 쿠폰 발급 대기열을 확인하여 메시지를 발행하는 ` CouponIssueScheduler ` 의 코드다.
249+ * 응답 신뢰도
259250
251+ * ` .join() ` 을 사용하면, API 서버는 Kafka 브로커가 메시지를 성공적으로 받았다는 확신을 얻기 전까지 사용자에게 최종 성공 응답을 보내지 않는다.
260252
261- > CouponIssueScheduler.java
253+ * 만약 Kafka 전송 단계에서 문제가 발생하면, API 서버는 사용자에게 즉시 에러를 반환하여 '요청이 실패했음'을 명확히 알릴 수 있다.
262254
263- ``` java
264- @Component
265- public class CouponIssueScheduler {
266- private final CouponWaitingQueueRepository waitingQueueRepository;
267- private final CouponIssueProducer couponIssueProducer;
268- // ...
255+ * 이는 '요청 성공' 응답을 받았는데 실제로는 발급 처리가 시작조차 되지 않는 상황을 방지할 수 있는 안전장치다.
269256
270- @Scheduled (fixedRate = 3000 )
271- public void issueCoupons () {
272- // ... (생략) ...
273- for (Coupon coupon : activeCoupons) {
274- // ... (생략) ...
275- processQueueForCoupon(coupon. getId());
276- }
277- }
257+ * 순서 보장(선착순)
278258
279- private void processQueueForCoupon (final UUID couponId ) {
280- // 1. 대기열에서 처리할 사용자 조회
281- Set<String > userIds = waitingQueueRepository. getWaitingUsers(couponId, batchSize);
259+ * 선착순 쿠폰 시스템에서는 ** 요청이 들어온 순서대로 처리** 되는 것이 매우 중요하다.
282260
283- // ... (생략) ...
261+ * 만약 프로듀서가 메시지를 비동기적으로 보낸다면 네트워크 상황에 따라 나중에 보낸 메시지가 먼저 도착할 수 있
284262
285- // 2. Kafka 토픽으로 메시지 발행 위임
286- for (String userIdStr : userIds) {
287- UUID userId = UUID . fromString(userIdStr);
288- couponIssueProducer. issue(userId, couponId); // Producer 호출
289- }
263+ * ` .join() ` 을 통해 각 요청 스레드가 Kafka로부터 ** 전송 완료 응답을 받을 때까지 대기** 하게 함으로써,
290264
291- // 3. 처리 시작한 사용자들을 대기열에서 제거
292- waitingQueueRepository. remove(couponId, userIds);
293- }
294- }
295- ```
265+ * API 서버에 도달한 순서대로 Kafka에 메시지가 기록되는 것을 최대한 보장할 수 있다.
266+
267+ 이렇게 프로듀서가 메시지를 Kafka에 안전하게 전달하고 나면, 그 책임은 이제 컨슈머(Consumer)에게로 넘어간다.
296268
297- 이 스케줄러는 3초마다 실행되어, Redis와 같은 대기열에서 쿠폰을 발급받을 사용자 목록을 가져온다. 그리고 각 사용자에 대해 앞서 살펴본 ` couponIssueProducer.issue() ` 를 호출하여 실제 메시지 발행을 위임한다.
269+ 컨슈머는 ` coupon_issue ` 토픽을 구독하고 있다가, 새로운 메시지가 들어오면 이를 가져와 최종적으로 데이터베이스에 쿠폰 발급 내역을 저장하는 역할을 수행한다.
298270
299- 이처럼 ** 스케줄러(비즈니스 로직) -> 전용 프로듀서(발행 로직) -> ` KafkaTemplate ` (실제 전송)** 의 흐름으로 역할을 분리하면, 코드가 깔끔해지고 테스트와 유지보수가 쉬워지는 장점이 있다.
271+ (컨슈머에 대한 자세한 내용은 다른 포스팅에서 설명할 예정이다)
272+
273+ 결론적으로, 비동기 전송이 가장 널리 쓰이지만, 비즈니스 상황에 따라 동기적 전송을 사용할 수 있다.
274+
275+ ---
300276
301277
302278## 프로듀서 설정하기
0 commit comments