Infra

메시징 큐에서의 Pub/Sub과 P2P: 이해와 활용

yoongrammer 2024. 10. 26. 13:41
728x90

메시징 큐에서 흔히 사용되는 두 가지 패턴, 퍼블리시/서브스크라이브(Pub/Sub)포인트 투 포인트(Point-to-Point, P2P)의 개념과 실제 사용 방법에 대해 알아보겠습니다.

각 패턴에 대한 설명과 함께 Python 코드 예시를 통해 실무에서의 적용 방식도 함께 살펴보겠습니다.

퍼블리시/서브스크라이브(Pub/Sub) 패턴


Pub/Sub 패턴에서는 메시지를 발행하는 퍼블리셔(Publisher)와 그 메시지를 구독하는 서브스크라이버(Subscriber)로 구성됩니다.

퍼블리셔는 특정 주제(Topic)에 메시지를 발행하고, 해당 주제를 구독한 서브스크라이버들은 그 메시지를 수신합니다.

Pub/Sub pattern

특징

  • 비동기 통신: 퍼블리셔와 서브스크라이버는 서로 직접 통신하지 않으며, 메시징 시스템이 중간에서 메시지를 전달합니다.
  • 확장성: 여러 서브스크라이버가 동일한 주제를 구독할 수 있어 메시지의 병렬 처리가 가능합니다.
  • 유연성: 새로운 주제나 서브스크라이버를 동적으로 추가하거나 제거할 수 있습니다.

사용 사례

  • 이벤트 방송 시스템: 실시간 뉴스, 소셜 미디어 알림 등.
  • 로그 및 모니터링: 시스템 상태 변화나 오류를 다양한 서비스에 전달.
  • IoT 디바이스 관리: 센서 데이터의 실시간 수집 및 배포.

파이썬 코드 예시 (RabbitMQ와 Pika 라이브러리 사용)


환경 설정

먼저, RabbitMQ를 설치해야 합니다. 설치 방법은 RabbitMQ 공식 문서를 참고하시면 됩니다.

또한, 파이썬용 RabbitMQ 클라이언트 라이브러리인 Pika를 설치합니다.

pip install pika

Publisher 코드

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 주제(Topic) 교환기 선언
channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = "Hello, Pub/Sub!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)

connection.close()
728x90

Subscriber 코드

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 교환기 선언
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 임시 큐 생성
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# 큐를 교환기에 바인딩
channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

설명

  • Publisherlogs라는 팬아웃(fanout) 교환기에 메시지를 발행합니다.
  • Subscriber는 해당 교환기에 바인딩된 임시 큐를 생성하여 메시지를 수신합니다.
  • 팬아웃 교환기는 수신한 메시지를 바인딩된 모든 큐에 브로드캐스트합니다.

포인트 투 포인트(P2P) 패턴


P2P 패턴에서는 메시지가 하나의 프로듀서(Producer)에서 하나의 컨슈머(Consumer)로 전달됩니다. 메시지는 큐에 저장되고, 한 번에 하나의 컨슈머만 해당 메시지를 수신하여 처리합니다.

P2P pattern

특징

  • 단일 수신자: 각 메시지는 하나의 컨슈머에게만 전달됩니다.
  • 순서 보장: 메시지가 큐에 저장된 순서대로 처리됩니다.
  • 로드 밸런싱: 여러 컨슈머가 있을 경우, 메시지가 컨슈머들 간에 균등하게 분배됩니다.

사용 사례

  • 작업 분산 처리: 대용량 데이터 처리, 이미지 렌더링 등.
  • 비동기 작업 처리: 이메일 발송, 보고서 생성 등 시간이 걸리는 작업.
  • 주문 처리 시스템: 전자상거래에서 주문 정보를 순차적으로 처리.

파이썬 코드 예시 (RabbitMQ와 Pika 라이브러리 사용)


Producer 코드

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 큐 선언
channel.queue_declare(queue='task_queue', durable=True)

message = "Hello, P2P!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2,  # 메시지를 영구적으로 저장
    ))
print(" [x] Sent %r" % message)

connection.close()

Consumer 코드

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 큐 선언
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))  # 메시지 내용에 따라 처리 시간 가정
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 공정한 분배를 위해 prefetch 설정
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

설명

  • 프로듀서task_queue라는 이름의 큐에 메시지를 보냅니다.
  • 컨슈머는 해당 큐에서 메시지를 가져와 처리합니다.
  • 여러 컨슈머가 실행되면 RabbitMQ가 메시지를 컨슈머들 사이에 로드 밸런싱합니다.
  • basic_qosprefetch_count=1 설정을 통해 한 번에 하나의 메시지만 처리하도록 합니다.

Pub/Sub과 P2P의 비교


특징 Pub/Sub P2P
메시지 수신자 여러 서브스크라이버 하나의 컨슈머
메시지 전달 방식 주제 기반으로 브로드캐스트 큐를 통해 단일 컨슈머에게 전달
통신 방향 1:N 1:1
사용 목적 이벤트 방송, 상태 업데이트 작업 큐잉, 로드 밸런싱

결론


메시징 큐에서의 Pub/Sub과 P2P 패턴은 시스템 요구 사항에 따라 적절하게 선택되어야 합니다.

이벤트를 여러 곳에 동시에 전달해야 한다면 Pub/Sub 패턴이, 작업을 분산 처리해야 한다면 P2P 패턴이 더 유용합니다.

728x90