https://github.com/nats-io/stan.py
STAN을 쓰는 방법을 정리해보았다.
문서와는 다르게, ip 주소 및 포트를 명시해야 했다 (커스텀 포트를 써서 그런듯)
stan은 nats를 기반으로 돌아가기 때문에 nats client를 생성해서 stan client에 전달해줘야한다.
stan의 첫 인자는 클러스터 아이디로 클러스터를 실행시킬 때 명시를 하지 않았다면,
디폴트가 test-cluster라고 한다.
중요 포인트중 하나는 각 stan 클라이언트를 durable subscription로 먼저 등록을 해놔야,
클라이언트의 session이 끊기고 나서 다시 연결 되었을 때, unAck 메시지를 이어 받을 수 있다는 것이다.
그리고 구독시 manual_acks를 True로 설정하게 되면
클라이언트 객체.ack(메시지) 메서드를 통해서 서버에 Ack을 직접 보낼 수 있다. (원래는 자동으로 보내짐)
그래서 테스트를 해보면 알겠지만,
manual_acks를 True로 놓고, ack을 하지 않으면, 재실행할 때마다 계속해서 메시지를 누적해서 받는 것을 확인할 수 있다.
특이한 점은 또 다른 종류의? ack이 있는 듯하다.
ack을 보내지 않아도, ack 핸들러가 한 번은 실행 되는 것을 확인할 수 있음..
이놈의 정체는 천천히 밝혀내야할 듯.
"""durable 'queue group' example with manual acks
"""
import time
import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN
async def run(loop):
nc1 = NATS()
sc1 = STAN()
await nc1.connect("127.0.0.1:4171", io_loop=loop)
await sc1.connect("test-cluster", "client-1", nats=nc1)
nc2 = NATS()
sc2 = STAN()
await nc2.connect("127.0.0.1:4171", io_loop=loop)
await sc2.connect("test-cluster", "client-2", nats=nc2)
nc3 = NATS()
sc3 = STAN()
await nc3.connect("127.0.0.1:4171", io_loop=loop)
await sc3.connect("test-cluster", "client-3", nats=nc3)
# Generate messages
async def ack_handler(ack):
print("Received ack: {}".format(ack.guid))
for i in range(111, 115):
await sc3.publish("foo", 'hello-{}'.format(i).encode(), ack_handler=ack_handler)
group = [sc1, sc2, sc3]
# Subscribe
for sc in group:
async def queue_cb(msg):
nonlocal sc
print("[{}] Received a message on queue subs: {}".format(msg.sequence, msg.data))
await sc.ack(msg) # without this line => all published messages are re-delivered
await sc.subscribe(
"foo",
queue="owl",
durable_name="owl",
cb=queue_cb,
manual_acks=True,
ack_wait=30)
await asyncio.sleep(1, loop=loop)
# Close NATS Streaming session
for sc in group:
await sc.close()
await nc1.close()
await nc2.close()
await nc3.close()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.close()
Queue Group
https://github.com/nats-io/stan.py#queue-groups
client.connect()
https://github.com/nats-io/stan.py/blob/master/stan/aio/client.py#L106
client.subscribe()
https://github.com/nats-io/stan.py/blob/master/stan/aio/client.py#L407
'dist. system' 카테고리의 다른 글
Zookeeper를 이용한 분산 시스템 공용 데이터 관리 (1) | 2020.11.03 |
---|---|
Zookeeper 구조와 일관성에 대해 (0) | 2020.10.26 |
Zookeeper 컨셉 정리 (0) | 2020.10.26 |
Zookeeper 설치법 정리 (0) | 2020.10.26 |
nats streaming server (NATS.io) (2) | 2020.09.02 |