기사용앱에서 특정 데이터를 승객용 앱에 내려주기 위해서 Server에서 정보를 Proxy 해 주어야 하는데 이와 관련하여 기사용 앱에서 내용을 올려주는 거야 단순히 API 하나를 호출하면 끝나지만, 승객용 앱 입장에서는 언제 내용이 변경되는지 알 수 없어서 고민하던 차에 SQS와 SSE를 사용해서 prototype을 만들어 봤다.
물론 승객용 앱에서 주기적으로 Polling 해서 만들 수도 있지만 조금 더 좋은 방법이 무엇인지 고민하면서 개발을 하게 되었다.
아직 오픈이 안된 상태이므로 잘못된 소스일 수도 있다.
간단한 구조는 아래와 같다.
일단 SQS는 AWS를 쓴다는 입장에서 사용할 수 있는 간단한 Queue 다.
기사용 App에서 특정 API를 호출하게 되면 해당 내용을 SQS에 담는다.
@Slf4j
@RestController
public class SQSController {
@Autowired
private SQSClient sqsClient;
@PostMapping("/route")
public void routeSend(@RequestBody RouteParam routeParam) {
sqsClient.routeSend(routeParam);
}
}
public void routeSend(RouteParam routeParam) {
String messageDeduplicationId = messageGroupId + "_" + new Date().getTime();
Map<String, Object> headers = new HashMap<>();
headers.put("message-group-id", messageGroupId);
headers.put("message-deduplication-id", messageDeduplicationId);
queueMessagingTemplate.convertAndSend(AWSConfig.getDestination(routeParam.getAllocationId()), routeParam, headers);
}
위와 같이 하게 되면 SQS에 RouteParam 이라는 객체가 담기게 된다.
SQS 에서 해당 내용을 가지고 오게 되는 Listener는 아래와 같이 만든다.
@SqsListener(deletionPolicy = SqsMessageDeletionPolicy.NEVER, value = "SQS queue name")
public void receiveMessage1(String message, @Header("SenderId") String senderId, Acknowledgment acknowledgment) throws JsonProcessingException {
log.debug("receiveMessage1 : " + message);
this.routeToLocalQueue(message);
acknowledgment.acknowledge();
}
Queue에서 메시지를 가지고 오게 되면 로컬에 만든 큐에 담는다.
로컬 큐에 담긴 이후 acknowledgment를 호출하여 SQS의 내용을 지워 준다. deletionPolicy를 ALWAYS를 주어도 되지만 로직상 삭제하면 안되는 경우를 고려하여 NEVER로 두고 명시적으로 호출하였다.
처음 생각으로는 Listener에서 직접 SSE 쪽에 이벤트를 주어서 동작시키게 하려 했지만 SSE 구현 예제 소스들이 다 Time Interval로 돌리는 것 밖에 없어서 Local Queue에 담고 SSE 쪽에서 해당 내용을 가지고 가는 방식으로 구현 하였다.
LocalQueue는 단순한 LinkedList를 이용하였다.
아래는 LocalQueue의 소스이다.
public class LocalQueue<T> {
private LinkedList<T> queue;
public LocalQueue() {
this.queue = new LinkedList<>();
}
public void addQueue(T object) {
queue.offer(object);
}
public T consumeQueue() {
try {
return queue.pop();
} catch (NoSuchElementException e) {
return null;
}
}
}
그리고 이를 이용해서 큐에 값을 담고 빼는 부분은 아래와 같다.
private void routeToLocalQueue(String message) throws JsonProcessingException {
RouteParam routeParam = mapper.readValue(message, RouteParam.class);
Long allocationId = routeParam.getAllocationId();
LocalQueue<RouteParam> localQueue = Optional.ofNullable(localQueueMap.get(allocationId)).orElse(new LocalQueue());
localQueue.addQueue(routeParam);
localQueueMap.put(allocationId, localQueue);
}
public String consumeFromLocalQueuee(Long allocationId) {
LocalQueue<RouteParam> localQueue = Optional.ofNullable(localQueueMap.get(allocationId)).orElse(new LocalQueue());
String routeParam = null;
try {
routeParam = mapper.writeValueAsString(localQueue.consumeQueue());
} catch (JsonProcessingException e) {
routeParam = null;
}
return routeParam;
}
다음은 SSE를 구현한 Controller 소스이다. 내용은 특별할 것은 없고 구글링 하면 나오는 소스 그대로 사용 하였다.
@GetMapping("/route-proxy/{allocationId}")
public Flux<ServerSentEvent<String>> routeProxy(@PathVariable Long allocationId) throws InterruptedException {
return Flux.interval(Duration.ofSeconds(2))
.map(sequence -> ServerSentEvent.<String> builder()
// .id(String.valueOf(sequence))
// .event("periodic-event")
.data(sqsClient.consumeFromLocalQueuee(allocationId))
.build());
}
이렇게 만들어서 돌리게 되면 다음과 같이 동작하게 된다.
위의 내용처럼 호출을 하게 되면 승객용 입장에서는 아래와 같이 동작하게 된다.
Postman에서 넘긴 값이 SQS > LocalQueue를 통해 브라우져에서 보여지게 된다.
간단하게 Prototype을 만들어 봤는데 조금 더 손봐야 할 곳들을 찾아 봐야 할 것 같다.
'Java > Spring' 카테고리의 다른 글
Exception을 활용하여 클린코드 작성하기 (0) | 2021.03.11 |
---|---|
명시적인 instance-id 셋팅으로 instance 샤딩하기 (0) | 2021.01.25 |
Asynchronous Methods 만들기 (0) | 2021.01.14 |
API 호출 테스트를 위한 간단한 팁 (0) | 2020.12.21 |
AttributeConverter class registered multiple times 에러가 발생할 경우 (0) | 2020.12.16 |