本文共 9641 字,大约阅读时间需要 32 分钟。
是Spring5添加新的模块,用于web开发的,功能和SpringMVC类似的,Webflux使用当前一种比较流行响应式编程出现的框架。
使用传统web框架,比如SpringMVC,这些基于Servlet容器,Webflux是一种异步非阻塞的框架,异步非阻塞的框架在Servlet3.1以后才支持,核心是基于Reactor的相关API实现的。
什么是异步非阻塞
Webflux特点:
比较SpringMVC
响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流。而相关的计算模型会自动将变化的值通过数据流进行传播。
例如,在命令式编程环境中,a=b+c表示将表达式的结果赋给a,而之后改变b或c的值不会影响a。但在响应式编程中,a的值会随着b或c的更新而更新。
提供的观察者模式两个类 Observer和Observable
public class ObserverDemo extends Observable { public static void main(String[] args) { ObserverDemo observer = new ObserverDemo(); //添加观察者 observer.addObserver((o,arg)->{ System.out.println("发生变化"); }); observer.addObserver((o,arg)->{ System.out.println("收到被观察者通知,准备改变"); }); observer.setChanged(); //数据变化 observer.notifyObservers(); //通知 }}
响应式编程操作中,Reactor是满足Reactive规范框架。
Reactor有两个核心类,Momo和Flux,这两个类实现接口Publisher,提供丰富操作符。Flux对象实现发布者,返回N个元素;Mono实现发布者,返回0或者1个元素。
Flux和Mono都是数据流的发布者,使用Flux和Mono都可以发出三种数据信号:元素值、错误信号、完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。
代表演示Flux和Mono 1.引入依赖io.projectreactor reactor-core 3.1.5.RELEASE
2.编写代码
public class TestReactor { public static void main(String[] args) { //just方法声明 发送数据流 Flux.just(1,2,3,4); Mono.just(1); //其他的方式 发送数据流 //数组 Integer[] array = { 1,2,3,4}; Flux.fromArray(array); //列表 Listlist = Arrays.asList(array); Flux.fromIterable(list); //Stream流 Stream stream = list.stream(); Flux.fromStream(stream); }}
三种信号的特点:
调用just或者其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流,不订阅什么都不会发生的。
//just方法声明 发送数据流Flux.just(1,2,3,4).subscribe(System.out::println);Mono.just(1).subscribe(System.out::println);
操作符:
对数据流进行一道道操作,成为操作符,比如工厂流水线。 第一:map 元素映射为新元素 比如在上图中,绿色圆圈的数值代表1,2,3,经过map操作符映射后,映射的规则是平方,最后变成蓝色方块1,4,9。第二: flatMap元素映射为流
把每个元素转换为流,把转换之后多个流合并为大的流。 大的流合并顺序是不确定的。SpringWebflux基于Reactor,默认使用容器是Netty,Netty是高性能的NIO框架,异步非阻塞的框架。
Netty
BIO(阻塞)
NIO(非阻塞)
SpringWebflux执行 过程和SpringMVC相似的
SpringWebflux实现函数式编程需要用到两个接口:
使用注解编程模型方式,和之前SpringMVC使用相似的,只需要把相关依赖配置到项目中,SpringBoot自动配置相关运行容器,默认情况下使用Netty服务器。
1.创建springboot工程,引入WebFlux依赖
org.springframework.boot spring-boot-starter-webflux
2.配置启动端口号
# 应用名称spring.application.name=demowebfluxserver.port=8081
3.创建包和实体类
3.1实体类//实体类public class User { private String name; private String gender; private Integer age; public User(String name, String gender, Integer age) { this.name = name; this.gender = gender; this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getGender() { return gender; } public void setGender(String gender) { this.gender = gender; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; }}
3.2创建接口定义操作的方法
//用户操作接口public interface UserService { //根据id查询用户 MonogetUserById(int id); //查询所有的用户 Flux getAllUser(); //添加用户 Mono saveUserInfo(Mono user);}
3.3接口实现类
@Servicepublic class UserServiceImpl implements UserService { //创建map集合存储数据 private final Mapusers = new HashMap<>(); //初始化map public UserServiceImpl(){ this.users.put(1,new User("lucy","nan",20)); this.users.put(2,new User("mary","nv",30)); this.users.put(3,new User("jack","nv",50)); } //根据id查询用 @Override public Mono getUserById(int id) { //如果查询不到,可以返回空 return Mono.justOrEmpty(this.users.get(id)); } //查询多个用户 @Override public Flux getAllUser() { return Flux.fromIterable(this.users.values()); } //添加用户 @Override public Mono saveUserInfo(Mono userMono) { //获取userMono中的User值 return userMono.doOnNext(person->{ //向map集合里面放值 int id = users.size()+1; users.put(id,person); //添加完之后,将Mono中的值清空 }).thenEmpty(Mono.empty()); }}
3.4创建controller
@RestControllerpublic class UserController { //注入service @Autowired private UserServiceImpl userService; //id查询 @GetMapping("/user/{id}") public MonogetUserId(@PathVariable int id){ return userService.getUserById(id); } //查询所有 @GetMapping("/user") public Flux getUsers(){ return userService.getAllUser(); } //添加 @PostMapping("/saveuser") public Mono saveUser(@RequestBody User user){ Mono userMono = Mono.just(user); return userService.saveUserInfo(userMono); }}
测试:
4.说明 SpringMVC方式实现,同步阻塞的方式,基于SpringMVC+Servlet+Tomcat SpringWebFlux方式实现:异步非阻塞方式,基于SpringWebflux+Reactor+Netty。在使用函数式编程模型操作时候,需要自己初始化服务器。
基于函数式编程模型时候,有两个核心接口:RouterFunction(实现路由功能,请求转发给对应的handler)和HandlerFunction(处理请求生成响应的函数)。核心任务定义两个函数式接口的实现并且启动需要的服务器。
SpringWebflux请求和响应不再是ServletRequest和ServletResponse,而是ServerRequest和ServerResponse
第一步:把注解编程模型工程复制一份,保留entity和service内容
第二步:创建Handler(具体实现方法)public class UserHandler { private final UserService userService; public UserHandler(UserService userService){ this.userService = userService; } //根据id查询 public MonogetUserById(ServerRequest request){ //获取id值 int userId = Integer.valueOf(request.pathVariable("id")); //空值处理 Mono notFound = ServerResponse.notFound().build(); //调用service方法得到数据 Mono userMono = this.userService.getUserById(userId); //把userMono进行转换返回 //使用Reactor操作符flatMap return userMono. flatMap(person-> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON) .body(fromObject(person))) //如果为空,就返回notFount .switchIfEmpty(notFound); } //查询所有 public Mono getAllUsers(ServerRequest request){ //调用service得到结果 Flux users = this.userService.getAllUser(); return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(users,User.class); } //添加 public Mono saveUser(ServerRequest request){ //得到User对象 Mono userMono = request.bodyToMono(User.class); return ServerResponse.ok().build(this.userService.saveUserInfo(userMono)); }}
第三步:初始化服务器,编写Router
创建路由的方法public class Server { //1.创建Router路由 public RouterFunctionroutingFunction(){ UserService userService = new UserServiceImpl(); UserHandler handler = new UserHandler(userService); return RouterFunctions.route( //accpet: 代码什么类型的数据 //handler:: 执行handler里面的哪个方法 GET("/users/{id}").and(accept(APPLICATION_JSON)),handler::getUserById) .andRoute(GET("/users").and(accept(APPLICATION_JSON)),handler::getAllUsers); }}
创建服务器完成适配
//2.创建服务器完成适配 public void createReactorServer(){ //路由和handler适配 RouterFunctionrouter = routingFunction(); HttpHandler httpHandler = toHttpHandler(router); ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler); //创建服务器 HttpServer httpServer = HttpServer.create(); httpServer.handle(adapter).bindNow(); }
最终调用
public static void main(String[] args) throws IOException { Server server = new Server(); server.createReactorServer(); System.out.println("enter to exit"); System.in.read(); }
测试:
使用WebClient调用
public class Client { public static void main(String[] args) { //调用服务器地址 WebClient webClient = WebClient.create("http://127.0.0.1:64238"); //根据id查询 String id = "1"; User userresult = webClient.get() //指定发送请求的路径 .uri("/users/{id}", id) //接收类型 .accept(MediaType.APPLICATION_JSON) //初始化类型 .retrieve() //结果映射 .bodyToMono(User.class) //执行 .block(); System.out.println(userresult.getName()); //查询所有 Fluxresults = webClient.get().uri("/users") .accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToFlux(User.class); results.map(stu->stu.getName()) .buffer() .doOnNext(System.out::println).blockFirst(); }}
转载地址:http://opiwi.baihongyu.com/