2021/01/22 - [Java/Spring] - SQS와 SSE를 이용한 Proxy 서버 만들기
위 글에서 SQS를 Listen 하는 과정에서 Parrparallel 하게 EC2 인스턴스가 생성되게 되면 어느 쪽에서 Queue item을 consume 하게 되는지 알 수 없어서 접속해 있는 고객용 앱에 Event를 내려줄 수 없는 상황에 직면하게 된다.
이를 해결하기 위해 명시적인 샤딩 및 고객번호를 통해 특정 서버로 접속할 수 있도록 분기를 쳐야 하는데, 이렇게 하기 위해서는 일단 Eureka에 등록되는 instance-id를 필요에 맞게 설정하고, consume을 할 수 있는 queue name도 설정해야 한다.
보통 1개의 yml 파일에서 셋팅을 하게 될 경우 동적으로 값을 바꿔 가며 올릴 수 없는데 이와 관련하여 내용을 찾다 보니 아래와 같은 내용을 찾게 되었다.
www.latera.kr/reference/java/2019-09-29-spring-boot-config-externalize/
검색 결과 페이지에서 처음에 확인해 본 내용은 "4.2.3. 애플리케이션 프로퍼티 파일" 이였다.
이를 적용하기 위해 yml 파일을 샤딩하는 ec2에 맵핑하여 만들 생각을 했다.
java -jar myproject.jar --spring.config.name=application-local2.yml
하지만 instance-id 및 queue name 2개의 값을 바꾸기 위해 yml 파일 전체를 바꿔서 셋팅한다는 것은 오버스럽다는 생각에 다른 내용을 더 찾아 보니 "4.2.2. 커맨드 라인 프로퍼티 접근" 부분이 눈에 들어왔다.
그래서 intellij 기준으로 다음과 같이 셋팅하여 테스트 해 보니 적용이 잘 되는 것을 볼 수 있었다.
queue name과 관련된 변수명은 임의로 잡았으며, eureka instance-id와 관련된 내용도 셋팅해 주었다. 필요한 변수가 더 있을 경우 오른쪽 플러스 버튼을 이용해 추가로 셋팅 가능하다.
아마도 Shell Console 화면에서 실행을 하게 된다면 "--sqs.queue-name.route-proxy=macaront-rtc-router-proxy-1.fifo --eureka.instance.instance-id=rtc-inst001" 과 같이 주면 될 것으로 보인다.
아직 Gateway에 적용은 안했지만 다음과 같은 방법으로 instance-id를 가져와서 분기를 치는 Route Rule을 만들어 적용하면 될 것으로 보인다.
@GetMapping("/intance-list")
public void getRtcListInEureka(@RequestParam String serviceId) {
List<ServiceInstance> serviceInstanceList = discoveryClient.getInstances(serviceId);
log.debug("serviceInstanceList : " + serviceInstanceList);
serviceInstanceList.stream().forEach(serviceInstance -> {
log.debug("Host : {}", serviceInstance.getHost());
log.debug("InstanceId : {}", serviceInstance.getInstanceId());
log.debug("Metadata : {}", serviceInstance.getMetadata());
log.debug("Port : {}", serviceInstance.getPort());
log.debug("Scheme : {}", serviceInstance.getScheme());
log.debug("ServiceId : {}", serviceInstance.getServiceId());
log.debug("Uri : {}", serviceInstance.getUri());
});
}
2021.01.27 추가 내용
Gateway에서 샤딩 된 Instance로 Routing 하기 위해서 다음과 같이 작업 하였다.
AbstractRoutePredicateFactory 를 상속받아 RouteProxyPredicateFactory 를 구현하였고, apply method에서 test를 아래와 같이 작성해 주었다.
@Override
public Predicate<ServerWebExchange> apply(Config config) {
return new GatewayPredicate() {
@Override
public boolean test(ServerWebExchange serverWebExchange) {
String path = serverWebExchange.getRequest().getURI().getRawPath();
List<String> pathList = Arrays.asList(StringUtils.tokenizeToStringArray(path, "/"));;
log.debug("pathList : {}", pathList);
if (CollectionUtils.isEmpty(pathList) || pathList.size() < 3) {
return false;
}
Long allocationId = Long.parseLong(pathList.get(pathList.size() - 1));
int instanceId = (int) (allocationId % 2) + 1;
log.debug("allocationId : {}", allocationId);
log.debug("instanceId : {}", instanceId);
log.debug("rtcInstance : {}", config.getRtcInstance());
return instanceId == config.getRtcInstance();
}
};
}
@AllArgsConstructor
@Getter
public static class Config {
private int rtcInstance;
}
기본적인 내용은 StripPrefixGatewayFilterFactory 에 나와 있는 내용을 참고 삼아 path를 계산하였고, 유효성 체크해서 적합하지 않을 경우 false를 리턴하도록 했다.
Config에서는 현재 어떤 instance로 할당할지 받게 하였고, route path에서 받게 된 내용을 기초로 해당 인스턴스 번호랑 맞게 된다면 true를 리턴하는 방식이다.
실제 사용하는 곳의 소스를 보면 아래와 같다.
@Bean
public RouteLocator rtcRoutes(RouteLocatorBuilder builder, UriConfiguration uriConfiguration, RouteProxyPredicateFactory routeProxyPredicateFactory) {
String rtcUrl = uriConfiguration.getRtc();
List<ServiceInstance> serviceInstanceList = discoveryClient.getInstances("RTC");
String defaultUrl = "http://localhost:8100";
String rtc001Url = serviceInstanceList.stream()
.filter(serviceInstance -> StringUtils.equals(serviceInstance.getInstanceId(), "rtc-inst001"))
.map(serviceInstance -> serviceInstance.getUri().toString())
.findFirst()
.orElse(defaultUrl);
String rtc002Url = serviceInstanceList.stream()
.filter(serviceInstance -> StringUtils.equals(serviceInstance.getInstanceId(), "rtc-inst002"))
.map(serviceInstance -> serviceInstance.getUri().toString())
.findFirst()
.orElse(defaultUrl);
log.debug("rtc001Url : {}", rtc001Url);
log.debug("rtc002Url : {}", rtc002Url);
return builder.routes()
.route("chauffer-sqs-send", p -> p
.path("/rtc/route")
.filters(f -> f
.stripPrefix(1)
)
.uri(rtcUrl)
)
.route("proxy-sse-1", p -> p
.path("^/rtc/route-proxy/^[0-9]+$")
.filters(f -> f
.stripPrefix(1)
)
.uri(rtc001Url)
.predicate(routeProxyPredicateFactory.apply(new RouteProxyPredicateFactory.Config(1)))
)
.route("proxy-sse-2", p -> p
.path("^/rtc/route-proxy/^[0-9]+$")
.filters(f -> f
.stripPrefix(1)
)
.uri(rtc002Url)
.predicate(routeProxyPredicateFactory.apply(new RouteProxyPredicateFactory.Config(2)))
)
.build();
}
discoverClient는 org.springframework.cloud.client.discovery.DiscoveryClient 를 사용하였다.
eureka에서 service에 해당하는 인스턴스 목록을 가지고 온 후 uri를 001, 002로 만들어 두었고, 실제 Route rule을 설정 하는 곳에서는 1번 인스턴스에 해당 되는 route라면 1번 URL로, 2번 인스턴스에 해당 하는 경우라면 2번 URL로 가도록 해 두었다.
실제 인스턴스가 안 떠 있는 경우라면 localhost의 /route-proxy/{allocationId} controller를 만들어 두어 fallback 비슷하게 구현을 해 두었다.
'Java > Spring' 카테고리의 다른 글
JUnit Test Code에서 Slf4j logger 이용하기 (0) | 2021.04.01 |
---|---|
Exception을 활용하여 클린코드 작성하기 (0) | 2021.03.11 |
SQS와 SSE를 이용한 Proxy 서버 만들기 (0) | 2021.01.22 |
Asynchronous Methods 만들기 (0) | 2021.01.14 |
API 호출 테스트를 위한 간단한 팁 (0) | 2020.12.21 |