dist. system

nats streaming server (NATS.io)

식피두 2020. 9. 2. 01:06

NATS에 대해서...

서브 프로젝트에서 NATS(넷츠)라는 메시지 큐를 사용하게 되었다.

 

https://docs.nats.io/nats-concepts/intro

 

What is NATS

 

docs.nats.io

프로젝트의 메인 언어인 Golang으로 구현이 되어있고,

다양한 언어로 작성가능한 클라이언트 라이브러리를 지원하고 있으며,

무엇보다 사용이 간편, 가볍고, 설정이 쉬운 장점이 있어 쓰기로 했다.

 

기본적으로 Pub/Sub 구조를 지원하며,

데이터는 인코드 되어 전송되고, 받은 메시지는 디코드되어 처리된다.

 

NATS의 기본 코어는 'at most once' QoS를 지원한다는 것을 기억해야한다.

subscriber가 특정 subject(topic)을 구독하고 있지 않다면, 메시지를 받을 수 없다.

 

NATS의 코어는 'fire-and-forger' 메시지 시스템이어서 메모리에만 메시지를 기록한다.

 

따라서 kafka 같은 메시징 플랫폼이 지원하는 일반적인 기능
(at least once QoS, 이전에 쌓인 메시지들 읽기, 디스크에 메시지 저장 등)들이 필요하다면

'NATS Streaming(STAN ; 넷츠를 거꾸로)'을 사용해야한다. 

 

 

STAN에 대해서...

https://docs.nats.io/nats-streaming-concepts/intro

 

Introduction

 

docs.nats.io

NATS Streaming(STAN)은 NATS 코어에 기반을 둔 데이터 스트리밍 시스템이다.

제공하는 기능 리스트는 다음과 같다.

 

- ProtoBuf에 기반을 둔 메시지 포멧

- persistence한 메시지 ; 메모리 이외에 파일, 디비에 메시지 유지가 가능

- At-least-once-delivery ; 스탠에선 message ACK 개념이 있어서, 제대로 메시지가 전송되지 못한 경우 스토어에 저장해 놓은 메시지를 재전송 할 수 있다.

- 메시지 전송 Rate 제한 가능

- 메시지 리플레이 가능 ; 새로 들어온 구독자가 메시지를 읽을 때 starting point 지정이 가능하다. 아마도 시퀀스 넘버 혹은 시간대 지정이 가능한 것으로 암

- Durable subscription ; 구독자가 'durable name'을 명시하면 클라이언트가 재시작 되었을 때도 세션을 유지할 수 있다. '상태'를 유지할 수 있다는 말.

 

개발 과정에서 알아야할 개념이 몇 가지 있다.

주저리 주저리 번역하여 정리해본다.

설명이 워낙 장황해서 개발에서 실제로 세팅해줘야할 부분은 빨간색으로 표기한다.

NATS와의 관계에 대해...

출처: nats.io docs

NATS 스트리밍 서버는 사실은 서버가 아니고, (그럼 이름을 왜 이렇게 지어 놓은 건데;)

내부적으로 NATS 서버와 상호작용할 뿐인 'NATS 서버의 클라이언트'이다.

 

따라서 실제로는, 스트리밍 클라이언트NATS 스트리밍 서버와 직접 연결되지 않고,

(여기서 스트리밍 서버는 그림과 다르게, 맥락상 스트리밍 모듈을 의미하는듯)

NATS 서버를 통해서 스트리밍 서버로 간접적으로 연결된다.

(스트리밍 서버는 클라이언트와 통신하기 위해서 internal subject를 구독하여 통신한다고 함)

 

직접적인 연결이 존재하지 않기 때문에, 스트리밍 서버 입장에선 스트리밍 클라이언트의 존재여부를

알 방법이 없어서, 결국엔 Heartbeat를 이용해서 클라이언트의 존재 여부를 체크하고 있다.

 

이러한 이유로 클라이언트 개발할 때

애플리케이션이 종료되는 상황에서 connection close를 명시적으로 해주는 것을 권장한다.

그렇지 않으면, Missing Heartbeat Count를 통해 알 때까지 스트리밍 서버 입장에서는 클라이언트의 부재를 알 수 없다.

 

 

Client Connection에 관해...

구조적으로 클라이언트가 직접적으로 스트리밍 서버에 연결되지 않기 때문에,

연결 요청시 'ClientID'가 필요하다. (라이브러리 사용시 명시해야함)

 

같은 클라이언트를 가진 두 개의 커넥션은 존재할 수 없다.

(유니크하게 알아서 명시해야하는 듯)

 

클라이언트 아이디는 'durable subscription'에서 특히 유용하게 사용되는데,

클라이언트 아이디듀어러블 네임을 묶어서 서버에 기록하면

클라이언트가 껐다 켜져도 세션 유지가 가능하다 (메시지 이어서 읽기)

 

이 외에도,

클라이언트가 갑자기 꺼져서 제대로된 closing connection 절차를 밟지 못한 상황에서

클라이언트가 같은 아이디로 재시작 되어 커넥션을 요청했다면,

서버는 해당 아이디 원래 사용되고 있던 아이디라는 것을 인지하고

이전에 알고 있던 클라이언트에 컨택을 시도해본다.

 

이 때,

만약 정상 응답이 오면 해당 아이디를 가지고 있던 클라이언트에 문제가 없었던 것이므로

해당 아이디를 달고 새로운 연결 요청이 온 것을 거부한다.

만약 정상 응답이 오지 않는다면, 해당 아이디를 가지고 있던 클라이언트에 문제가 있었던 것이므로

새로운 연결 요청을 승인, 그리고 이전 클라이언트를 대체하여 기록한다.

 

Reconnection에 관해...

만약 NATS TCP 연결에 문제가 생기면, NATS 코어에 의해서 재연결 로직이 실행된다.

그런데 스트리밍 서버 입장에선 이런일이 일어나는지 여부를 알 수가 없다. (아키텍쳐 때문에)

하지만 문제 될게 없는게, 만약 클라이언트가 죽은 상황에서 스트리밍 서버가 메시지를 전송했더라도

클라이언트로 부터 ACK을 받지 못했을 것이므로, AckWait 시간이 흐르고 난 뒤에 해당 메시지들은

재전송 될 것이기 때문이다.

 

추가적으로, 주의해야할 사항 중 하나는

NATS 스트리밍 서버를 실행시킬 때 기본 옵션이 'memory store'이므로 모든 state를 메모리에 기록한다.

따라서 스트리밍 서버가 종료되면 모든 상태 정보는 날아가 버림...

(만약 state를 안정적으로 유지하길 바란다면 persistence mode를 설정하자)

 

위와 같은 상황에서는

서버가 재시작 되더라도, 모든 연결에 대한 상태 정보도 없어진 상태이므로

연결되어 있던 구독자들은 읽기를 멈추고,

새로 퍼블리시된 메시지들은 'invalid publish request' 에러를 띄우게 된다.

이 때 Connection Lost 핸들러를 구현했다면 다음 에러와 함께 핸들러가 실행이 된다.

'client has been replaced or is no longer registered'

 

스트리밍 연결 상태(세션)을 유지하기 위해선 서버와 클라이언트가 heartbeat 혹은 PING을 주기적으로 공유해야한다.

만약 서버가 클라이언트로 부터 설정된 수치 만큼의 heartbeat을 받지 못했다면 서버에선 연결을 종료해버리고,

non-durable subscription도 모두 지워버린다. 이 때 클라이언트는 어떤 일이 일어났는지 모른다. (아키텍쳐 때문에)

이 문제를 해결하기 위해서, 클라이언트는 서버로 핑을 보내는 것!

만약 핑이 충분히 missing 되었음을 확인 했다면, 연결이 종료되고(close the connection)

ConnectionLost 핸들러가 실행된다.

=> 이러한 상황이 오면 클라이언트의 connection과 subscription이 더 이상 유효하지 않기 때문에 다시 생성해야함.

 

여기서 중요한 것!

client-to-server PING 설정 값이 다소 공격적으로 세팅되어 있기 때문에

일반적인 세팅으로 쓰기 위해선 값을 높여줘야 될 수도 있다.

아마 디폴트 값은 15초만 참는 것으로 되어 있어서

(클라이언트 입장에서 서버가 다운 되어 있다고 판단하는 시간)

Pings() 옵션을 통해 클라이언트가 얼마나 오랫동안 서버와 통신이 안되는 상황을 참게 할지 설정 가능하다!

 

 

Channel에 관해... (여기가 본론이다)

채널은 넷츠 스트리밍 서버의 심장과도 같은 것이라고함 ;;

클라이언트가 데이터를 퍼블리쉬 하는 대상이자, 컨슈밍 하는 대상이다.

 

채널로 생성되는 메시지는 채널 안에 있는 '메시지 로그'에 기록된다.

 

채널은 크게

- 메시지 로그(message log)

- 구독(subscription)

 

 두 가지로 구성된다.

 

메시지 로그는 FIFO 큐이고. 새로 도착한 메시지는 뒤에 쌓인다.

만약 채널에 대한 limit이 설정되면 해당 limit에 도달할 경우, 가장 오래된 메시지부터 삭제된다.

설정에 의한 queue size/age limit에 의한 것을 제외하곤 메시지가 삭제될 일은 없다.

즉, 컨슈밍을 한다고 메시지가 삭제되진 않는다.

메시지는 저장되는 채널에 대한 구독의 존재 여부와 상관없이 저장된다.

 

구독(subscription)은 클라이언트가 특정 채널에 대해 구독을 함으로써 생성된다.

서버에선 클라이언트 대신에 구독 상태를 유지한다. (내부 스토어에)

클라이언트가 커넥션을 종료하거나 구독을 종료할 경우 서버에 저장된 구독 상태 정보가 삭제된다.

 

만약 특정 채널에 대한 구독자가 있는 상황에서

새로운 메시지가 채널의 메시지 로그에 쌓인다면,

자동으로 컨슈머에게 전송되는 방식으로 동작한다,.

 

한 번에 전송하는 maximum inflight message의 갯수도 클라이언트가 구독을 생성할 때 설정 가능하다고 함!

 

클라이언트는 메시지를 정상적으로 받았을 때 '자동으로!' ACK을 서버에게 보내게 되고,

ACK을 받은 서버는 다음 메시지를 보낸다.

 

여기서, 클라이언트에서 메시지에 대해 일련의 처리가 필요하고, 해당 처리 까지 완벽히 마쳤을 때

ACK 처리를 하고 싶은 경우가 있을 수 있는데, 가능하다. (github의 advanced usage를 참고하면 됨)

 

https://github.com/nats-io/stan.go/

 

nats-io/stan.go

NATS Streaming System. Contribute to nats-io/stan.go development by creating an account on GitHub.

github.com

nats-io/stan.py

 

nats-io/stan.py

Python Asyncio NATS Streaming Client. Contribute to nats-io/stan.py development by creating an account on GitHub.

github.com

그리고 구독이 생성될 당시에, 메시지를 어디서 부터 읽을 것인가? 에 대한 명시도 가능하다.

메시지의 시퀀스 번호를 명시할 수 있고, 혹은 생성 시간을 명시할 수도 있다. 방법은 알아서 찾아서 하쇼.

 

구독은 3가지 방식을 지원한다.

Regular, Durable, QueueGroup (+Redelivery)

 

기본적으로 구독 시 starting point를 명시해서 그 부분부터 읽을 수 있다.

 

Regular 방식은 구독이 unsubscribed 혹은 closed 되었을 때 혹은 클라이언트의 connection이 closed 되었을 때 구독 상태가 삭제된다.

하지만, 서버 고장일 경우엔 살아남는다. (persistent store 모드일 때만)

 

Durable 방식은 클라이언트가 껐다 켜지는 경우, 이전에 중단되었던 지점부터 메시지 읽기를 재개하고 싶을 때 사용.

Durable Subscription을 생성해야 하며, 생성시 듀어러블 네임과 커넥션 생성 당시 사용했던 클라이언트 아이디가 필요하다.

이 두가지 정보를 서버에 등록하고, 클라이언트의 커넥션이 닫힌 이후에도 구독 정보는 유지된다.

(물론 커넥션이 재개될 때, starting point를 명시하는 것은 무의미하다)

Durable subscription 상황에서 메시지 수신을 단하고 싶은 경우엔 반드시 unsubscribe가 아닌 subscription close를 해야한다.

라이브러리에 subscription close 옵션이 없을 경우엔 connection을 close하자.

구독을 delete하고 싶은 경우에만 unsubscribe를 하자. 그 순간 서버에서의 durable subscription 정보가 지워진다.

 

Queue Group 방식은 같은 그룹 내의 컨슈머들이 큐의 메시지를 나누어 가질 수 있는 기능이다. (로드밸런싱)

그룹의 마지막 멤버가 구독을 닫거나/취소하거나 혹은 커넥션은 닫아 버림으로써 그룹을 떠날 때,

그룹의 상태 정보는 서버에서 삭제된다. 

Queue Group 방식도 물론 durable 이 가능하다. 이를 위해 클라이언트는 Queue name 및 Durable name을 제공해야한다.

여기서, 클라이언트 아이디는 필요가 없다. 왜냐면 하나의 그룹을 구성함에 있어서 유니크한 클라이언트 아이디까지 조합하는 경우에

여러 클라이언트가 공유하는 것이 불가능하니까!

한 가지 차이점이 더 있는데, 마지막 멤버가 그룹을 떠나도 서버에 저장되는 그룹 상태 정보는 삭제되지 않는다.

마지막 멤버가 unsubscribe 혹은 subscription close를 하는 경우에 삭제된다.

 

마지막으로 Redelivery에 대해서 정리하면서 마무리하자.

서버는 컨슈머에게 메시지를 보내면 컨슈머로 부터 ACK을 기대한다.

컨슈머는 처음에 서버가 얼마나 오랫동안 ACK을 기다리게 할지 정할 수 있다.

그 시간이 지나면, 서버에선 ACK을 받지 못한 메시지들을 재.전.송.한다!

 

서버가 재시작되는 경우, ACK을 받지 못한 메시지들을 복원하고

가장 먼저 복원된 메시지들을 재전송하고 나서야 새로운 메시지들을 전송한다.

 

따라서, 서버가 재시작 되는 경우, 컨슈머는 unAcked 메시지와 새로운 메시지를 섞어서 받을 여지가 있는데,

크게 상관 없을 듯하다. 굳이 구분해야할 이유도 없을테고... 어차피 클라이언트 입장에선 못받았던 메시지들이니까

 

Queue Subscription에서는 그룹의 멤버중 하나가 ACK을 못해서

AckWait 시간이 경과했을 때 서버가 재전송을 시작하는데,

이 때 그룹내의 다른 멤버에게 메시지가 재전송 될 수 있다! 브굳!