dist. system

NATS streaming client example

식피두 2020. 9. 25. 22:24

https://github.com/nats-io/stan.py

 

nats-io/stan.py

Python Asyncio NATS Streaming Client. Contribute to nats-io/stan.py development by creating an account on GitHub.

github.com

 

STAN을 쓰는 방법을 정리해보았다.

문서와는 다르게, ip 주소 및 포트를 명시해야 했다 (커스텀 포트를 써서 그런듯)

 

stan은 nats를 기반으로 돌아가기 때문에 nats client를 생성해서 stan client에 전달해줘야한다.

 

stan의 첫 인자는 클러스터 아이디로 클러스터를 실행시킬 때 명시를 하지 않았다면,

디폴트가 test-cluster라고 한다. 

 

중요 포인트중 하나는 각 stan 클라이언트를 durable subscription로 먼저 등록을 해놔야,

클라이언트의 session이 끊기고 나서 다시 연결 되었을 때, unAck 메시지를 이어 받을 수 있다는 것이다.

 

그리고 구독시 manual_acks를 True로 설정하게 되면

클라이언트 객체.ack(메시지) 메서드를 통해서 서버에 Ack을 직접 보낼 수 있다. (원래는 자동으로 보내짐)

 

그래서 테스트를 해보면 알겠지만,

manual_acks를 True로 놓고, ack을 하지 않으면, 재실행할 때마다 계속해서 메시지를 누적해서 받는 것을 확인할 수 있다.

 

특이한 점은 또 다른 종류의? ack이 있는 듯하다.

ack을 보내지 않아도, ack 핸들러가 한 번은 실행 되는 것을 확인할 수 있음..

이놈의 정체는 천천히 밝혀내야할 듯.

"""durable 'queue group' example with manual acks
"""
import time
import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN

async def run(loop):
    nc1 = NATS()
    sc1 = STAN()
    await nc1.connect("127.0.0.1:4171", io_loop=loop)
    await sc1.connect("test-cluster", "client-1", nats=nc1)

    nc2 = NATS()
    sc2 = STAN()
    await nc2.connect("127.0.0.1:4171", io_loop=loop)
    await sc2.connect("test-cluster", "client-2", nats=nc2)

    nc3 = NATS()
    sc3 = STAN()
    await nc3.connect("127.0.0.1:4171", io_loop=loop)
    await sc3.connect("test-cluster", "client-3", nats=nc3)

    # Generate messages
    async def ack_handler(ack):
        print("Received ack: {}".format(ack.guid))

    for i in range(111, 115):
        await sc3.publish("foo", 'hello-{}'.format(i).encode(), ack_handler=ack_handler)

    group = [sc1, sc2, sc3]
    
    # Subscribe
    for sc in group:
        async def queue_cb(msg):
            nonlocal sc
            print("[{}] Received a message on queue subs: {}".format(msg.sequence, msg.data))
            await sc.ack(msg) # without this line => all published messages are re-delivered 

        await sc.subscribe(
            "foo",
            queue="owl",
            durable_name="owl",
            cb=queue_cb,
            manual_acks=True,
            ack_wait=30)

    await asyncio.sleep(1, loop=loop)

    # Close NATS Streaming session
    for sc in group:
        await sc.close()
    await nc1.close()
    await nc2.close()
    await nc3.close()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.close()

Queue Group

https://github.com/nats-io/stan.py#queue-groups

 

nats-io/stan.py

Python Asyncio NATS Streaming Client. Contribute to nats-io/stan.py development by creating an account on GitHub.

github.com

client.connect()

https://github.com/nats-io/stan.py/blob/master/stan/aio/client.py#L106

 

nats-io/stan.py

Python Asyncio NATS Streaming Client. Contribute to nats-io/stan.py development by creating an account on GitHub.

github.com

client.subscribe()

https://github.com/nats-io/stan.py/blob/master/stan/aio/client.py#L407

 

nats-io/stan.py

Python Asyncio NATS Streaming Client. Contribute to nats-io/stan.py development by creating an account on GitHub.

github.com

 

파이썬 코딩 도장: 47.10 asyncio 사용하기

asyncio(Asynchronous I/O)는 비동기 프로그래밍을 위한 모듈이며 CPU 작업과 I/O를 병렬로 처리하게 해줍니다. 동기(synchronous) 처리는 특정 작업이 끝나면 다음 작업을 처리하는 순차처리 방식이고, 비동

dojang.io