公司里目前用的是RocketMQ,用的过程中遇到一些问题,逐渐将一些业务转到 kafka 上,正好目前项目是spring boot项目,所以就来试试 spring cloud stream,本地环境有 rabbitmq,所以使用它了。

依赖

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<dependencies>  
    <dependency>  
        <groupId>org.springframework.boot</groupId>  
        <artifactId>spring-boot-starter-web</artifactId>  
    </dependency>  
    <dependency>  
        <groupId>org.springframework.cloud</groupId>  
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>  
    </dependency>  
    <dependency>  
        <groupId>org.projectlombok</groupId>  
        <artifactId>lombok</artifactId>  
    </dependency>  
  
    <dependency>  
        <groupId>org.springframework.boot</groupId>  
        <artifactId>spring-boot-starter-test</artifactId>  
        <scope>test</scope>  
    </dependency>  
    <dependency>  
        <groupId>org.springframework.cloud</groupId>  
        <artifactId>spring-cloud-stream-test-support</artifactId>  
        <scope>test</scope>  
    </dependency>  
</dependencies>  

定义接收器

1
2
3
4
5
public interface MyReceiver {  
    @Input("my-channel")  
    SubscribableChannel input();  
}  

定义发送器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public interface MySender {  
    @Output("my-channle")  
    MessageChannel output();  
}  

```上面的 `@Input()`  `@Output()` 里的参数就是绑定的名字需要使用一致的名字  

## 接受发送消息

```java
@EnableBinding(value = {MyReceiver.class, MySender.class})  
@RestController  
@Slf4j  
public class MessageController {  
  
     @Autowired  
    private MySender sender;  
      
    /**  
     * 发送  
     */  
    @PostMapping("/send")  
    public void send(@RequestParam("message") String message) {  
        sender.output().send(MessageBuilder.withPayload(message).build());  
    }  
  
     /**  
     * 接收  
     */  
    @StreamListener("my-channel")  
    public void receive(String message) {  
        log.info("Received: {}", message);  
    }  
}  

```必须加上 `@EnableBinding(value = {MyReceiver.class})`,使接收生效在接收方法上加上 `@StreamListener("my-channel")`。  
和接收一样需要加上 `@EnableBinding(value = {MySender.class})` 使发送生效(应该是创建代理类吧)  
但上面接收和发送在一个项目里面启动的时候会失败spring会把 `@Input`  `@Output` 绑定的名字作为Bean的名字创建Bean所以就重复了解决方法是在代码里设置成不同的名字然后在配置文件里指定 `destination`。  

```yaml
spring:  
  cloud:  
    stream:  
      bindings:  
        input:  
          destination: my-channel  
        output:  
          destination: my-channel  

最终的代码是

1
2
3
4
5
public interface MyReceiver {  
    @Input(Sink.INPUT)  
    SubscribableChannel input();  
}  

public interface MySender {
@Output(Source.OUTPUT)
MessageChannel output();
}

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@EnableBinding(value = {MyReceiver.class, MySender.class})  
@RestController  
@Slf4j  
public class MessageController {  
  
    @Autowired  
    private MySender sender;  
  
    @PostMapping("/send")  
    public void send(@RequestParam("message") String message) {  
        sender.output().send(MessageBuilder.withPayload(message).build());  
    }  
  
    @StreamListener(Sink.INPUT)  
    public void receive(String message) {  
        log.info("Received: {}", message);  
    }  
}  
```

调用接口可以看到控制台的打印  

```bash
curl -XPOST 'localhost:8080/send?message=hello-world'  
```

---
## Comments:
### 测试评论
[shellj](https://www.blogger.com/profile/12177127586046544427 "noreply@blogger.com") - <time datetime="2018-12-06T17:36:53.917+08:00">Dec 4, 2018</time>

测试评论
<hr />