จากบทความที่แล้ว “สร้าง Reactive RESTful Web Service ด้วย Spring Boot 3 WebFlux” ได้อธิบายเกี่ยวกับ Reactive (Webflux) และแนะนำวิธีการสร้างโปรเจคเบื้องต้นของ Reactive RESTful Web Service แบบง่าย ๆ สำหรับบทความนี้จะมาเพิ่มความสามารถของโปรเจคในกรณีต้องการที่จะเรียกข้อมูลภายนอก เราจำเป็นต้องมี http client เพื่อใช้เป็นตัวเรียกข้อมูลที่สามารถกำหนด GET, POST, PUT, DELETE และข้อมูลร้องขอ WebClient จะทำหน้าที่นี้ บทความนี้จะพามาตั้งค่าเพิ่มเติมให้โปรเจคตั้งต้นของเรา
สร้าง Mock Data ด้วย Mockoon
ก่อนที่จะไปโด๊ดเพิ่มในโปรเจค เราจำเป็นต้องจำลองข้อมูลเพื่อให้ Reactive Service ของเราเรียกใช้งาน Mockoon จะมาช่วยทำเรื่องนี้
[
"Project Reactor",
"RxJava",
"RxJS",
"Rx.NET",
"RxScala",
"RxClojure",
"RxKotlin",
"RxSwift",
"RxGo",
"RxPHP"
]
Configuration WebClient เพิ่มในโปรเจค
– src/main/java/com/poolsawat/reactivewebflux/configs/WebClientConfiguration.java
package com.poolsawat.reactivewebflux.configs;
import com.poolsawat.reactivewebflux.loggers.ClientCallLogger;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import lombok.val;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import java.time.Duration;
@Configuration
@EnableWebFlux
public class WebClientConfiguration implements WebFluxConfigurer {
@Value("${clients.endpoints.reactive.host}")
private String reactiveHost;
@Bean
public WebClient createWebClient(HttpClient httpClient){
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.baseUrl(reactiveHost)
.build();
}
@Bean
public HttpClient createHttpClient(ClientCallLogger clientLogger){
val connectionProvider = ConnectionProvider.builder("reactive-tcp-connection-pool")
.maxConnections(10)
.pendingAcquireTimeout(Duration.ofMillis(15000)) // 15 sec
.maxIdleTime(Duration.ofMillis(30000)) // 30 sec
.build();
return HttpClient.create(connectionProvider)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 15000) // 30 sec
.doOnConnected( it -> {
it.addHandlerLast(new ReadTimeoutHandler(30000)) // 30 sec
.addHandlerLast(new WriteTimeoutHandler(30000)); // 30 sec
})
.doOnRequest( (x,conn) -> conn.addHandlerFirst(clientLogger))
.doOnResponse( (x, conn) -> conn.addHandlerFirst(clientLogger));
}
}
– src/main/java/com/poolsawat/reactivewebflux/clients/proxy/ReactiveClientProxy.java
package com.poolsawat.reactivewebflux.clients.proxy;
import reactor.core.publisher.Mono;
import java.util.List;
public interface ReactiveClientProxy {
Mono<List<String>> getReactiveItems();
}
– src/main/java/com/poolsawat/reactivewebflux/clients/ReactiveClient.java
package com.poolsawat.reactivewebflux.clients;
import com.poolsawat.reactivewebflux.clients.proxy.ReactiveClientProxy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Repository;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.util.List;
@Repository
public class ReactiveClient implements ReactiveClientProxy {
@Value("${clients.endpoints.reactive.path}")
private String reactivePath;
@Autowired
private WebClient webClient;
@Override
public Mono<List<String>> getReactiveItems() {
return webClient.get()
.uri(reactivePath)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<List<String>>() {});
}
}
– src/main/java/com/poolsawat/reactivewebflux/loggers/ClientCallLogger.java
package com.poolsawat.reactivewebflux.loggers;
import com.poolsawat.reactivewebflux.constants.HeaderKeys;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.springframework.stereotype.Component;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import static io.netty.handler.codec.http.LastHttpContent.EMPTY_LAST_CONTENT;
@Component
@Slf4j
public class ClientCallLogger extends LoggingHandler {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
super.connect(ctx, remoteAddress, localAddress, promise);
}
private void log(String content) {
if (content != null)
log.info(content);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log(format(ctx, "READ", msg));
super.channelRead(ctx, msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log(format(ctx, "WRITE", msg));
super.write(ctx, msg, promise);
}
@Override
protected String format(ChannelHandlerContext ctx, String eventName, Object arg) {
if (arg instanceof ByteBuf) {
return super.format(ctx, eventName, arg);
} else {
if (arg instanceof HttpRequest i) {
return formatRequestHeadersAndBody((HttpRequest) arg);
} else if (arg instanceof HttpResponse l) {
return formatResponseMetaData((HttpResponse) arg);
} else if (arg instanceof ByteBufHolder d) {
return formatResponseBody((ByteBufHolder) arg);
} else {
return super.format(ctx, eventName, arg);
}
}
}
private String tryExtractBody(Object arg) {
try {
return ((ByteBufHolder)arg).content().toString(StandardCharsets.UTF_8);
} catch (Exception ex ) {
log.warn("error occurred while extracting body from http, ignored with empty", ex);
return "";
}
}
private String formatRequestHeadersAndBody(HttpRequest req) {
val builder = new StringBuilder("HTTPClient Request --> method=[");
if (req instanceof DefaultFullHttpRequest) {
return builder.append(req.method()).append(HeaderKeys.URI.getKey()).append(req.uri()).append(HeaderKeys.HEADERS.getKey()).append(req.headers())
.append("] body=[").append(tryExtractBody(req)).append("]").toString();
} else {
return builder.append(req.method()).append(HeaderKeys.URI.getKey()).append(req.uri()).append(HeaderKeys.HEADERS.getKey()).append(req.headers())
.append("]").toString();
}
}
private String formatResponseMetaData(HttpResponse res) {
return new StringBuilder("HTTPClient Response Meta Data <-- status=[").append(res.status()).append(HeaderKeys.HEADERS.getKey())
.append(res.headers()).append("]").toString();
}
private String formatResponseBody(ByteBufHolder res){
if (res == EMPTY_LAST_CONTENT)
return null;
return new StringBuilder("HTTPClient Response Body <-- body=[").append(tryExtractBody(res)).append("]").toString();
}
}
– src/main/resources/application.properties
server.port=8443
spring.webflux.base-path=/reactive
clients.endpoints.reactive.host=http://localhost:3000
clients.endpoints.reactive.path=/reactive/items
– src/main/java/com/poolsawat/reactivewebflux/services/impl/ReactiveServiceImpl.java
package com.poolsawat.reactivewebflux.services.impl;
import com.poolsawat.reactivewebflux.clients.ReactiveClient;
import com.poolsawat.reactivewebflux.services.ReactiveService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.util.List;
@Service
public class ReactiveServiceImpl implements ReactiveService {
@Autowired
private ReactiveClient reactiveClient;
@Override
public Mono<List<String>> getReactiveListItems() {
return reactiveClient.getReactiveItems();
}
}
ทำการ start application server ด้วยคำสั่ง `mvn spring-boot:run`
ทดสอบเรียก http://localhost:8443/reactive/welcome
curl --silent --location 'http://localhost:8443/reactive/welcome'
สังเกตุ logging ของโปรเจค
2023-08-21T23:11:15.128+07:00 INFO 94946 --- [ctor-http-nio-3] c.p.r.loggers.ClientCallLogger : HTTPClient Request --> method=[GET] uri=[/reactive/items] headers=[DefaultHttpHeaders[user-agent: ReactorNetty/1.1.9, host: localhost:3000, Content-Type: application/json, Accept: application/json]]
2023-08-21T23:11:15.138+07:00 INFO 94946 --- [ctor-http-nio-3] c.p.r.loggers.ClientCallLogger : HTTPClient Response Meta Data <-- status=[200 OK] headers=[DefaultHttpHeaders[Content-Type: application/json; charset=utf-8, Content-Length: 113, Date: Mon, 21 Aug 2023 16:11:15 GMT, Connection: keep-alive, Keep-Alive: timeout=5]]
2023-08-21T23:11:15.172+07:00 INFO 94946 --- [ctor-http-nio-3] c.p.r.loggers.ClientCallLogger : HTTPClient Response Body <-- body=[[
"Project Reactor",
"RxJava",
"RxJS",
"Rx.NET",
"RxScala",
"RxClojure",
"RxKotlin",
"RxSwift",
"RxGo",
"RxPHP"
]]
สรุปท้ายบทความ
หวังว่าบทความนี้จะเป็นประโยชน์กับผู้อ่าน รอติดตามบทความหน้าจะมีอะไรมาแบ่งปัน คอยติดตามด้วยนะครับ
Github Source code -> https://github.com/pool13433/spring-reactive-webflux