Java: 使用Netty实现Socket和WebSocket服务器

pom.xml中增加依赖:

1
2
3
4
5
6
7
8
9
10
 <!-- Lombok: supplies the @Slf4j / @Getter / @Setter annotations used by the classes below -->
 <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.22</version>
 </dependency>
<!-- Netty (all-in-one artifact). NOTE(review): no <version> is declared here; presumably it is
     resolved by the parent POM / dependencyManagement - confirm, or add an explicit version. -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>

java标志
image-3415

Netty服务基类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author puruidong
 * @version 2022/7/2
 */

@Slf4j
public abstract class NettyBaseService {

  private Map<String, EventLoopGroup> eventLoopGroupMap = new ConcurrentHashMap<>();

  protected abstract ChannelHandler[] createHandlers();

  /**
   * 端口
   *
   * @return 端口
   */

  public abstract int getPort();

  /**
   * 名称
   *
   * @return 名称
   */

  public abstract String getName();

  @PostConstruct
  public void start() throws Exception {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
      serverBootstrap
          .group(bossGroup, workerGroup)
          .channel(NioServerSocketChannel.class)
          .childHandler(
              new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                  ChannelHandler[] handlers = createHandlers();
                  for (ChannelHandler handler : handlers) {
                    ch.pipeline().addLast(handler);
                  }
                }
              })
          .option(ChannelOption.SO_BACKLOG, 128)
          .option(ChannelOption.SO_REUSEADDR, true)
          .childOption(ChannelOption.SO_KEEPALIVE, true)
          .childOption(ChannelOption.SO_REUSEADDR, true);
      ChannelFuture cf = serverBootstrap.bind(getPort()).await();
      if (!cf.isSuccess()) {
        log.error("绑定端口失败:{}", getPort());
        throw new Exception("绑定端口失败:" + getPort());
      }
      eventLoopGroupMap.put(getPort() + "-" + getName() + "-1", bossGroup);
      eventLoopGroupMap.put(getPort() + "-" + getName() + "-2", workerGroup);
      log.info("服务[{}]启动完毕,监听端口: [{}]", getName(), getPort());
      // cf.channel().closeFuture().await();
    } catch (Exception e) {
      workerGroup.shutdownGracefully().sync();
      bossGroup.shutdownGracefully().sync();
    } /*finally {
        workerGroup.shutdownGracefully().sync();
        bossGroup.shutdownGracefully().sync();
      }*/

  }

  @PreDestroy
  public void stop() {
    eventLoopGroupMap.values().forEach((item) -> item.shutdownGracefully().syncUninterruptibly());
    log.info("全部Socket服务: [{}],关闭成功", eventLoopGroupMap.keySet());
  }
}

Socket服务定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

/**
 * @author puruidong
 * @version 2022/7/2
 */

@Setter
@Getter
@Slf4j
public class TcpServer extends NettyBaseService {
  private int port;
  private String name = "String Server";

  public TcpServer(int port) {
    this.port = port;
    log.info("Socket上报数据时,数据需要以\\r\\n结尾.请求地址为(127.0.0.1改为实际ip): 127.0.0.1:{}", port);
  }

  @Override
  protected ChannelHandler[] createHandlers() {
    return new ChannelHandler[] {
      new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()),
      new StringDecoder(),
      new StringEncoder(),
      new TcpServerHandler()
    };
  }
}

Socket服务实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.net.InetSocketAddress;

/**
 * @author puruidong
 * @version 2022/7/2
 */

@Slf4j
public class TcpServerHandler extends SimpleChannelInboundHandler<String> {

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
    String clientIp = socketAddress.getAddress().getHostAddress();
    int clientPort = socketAddress.getPort();
    ChannelId channelId = ctx.channel().id();
    log.info("Socket客户端: {},IP: {},端口: {} ,上线!", channelId, clientIp, clientPort);
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    log.info("---------------------socket断线检测-------------------------");
    InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
    String clientIp = socketAddress.getAddress().getHostAddress();
    int clientPort = socketAddress.getPort();
    ChannelId channelId = ctx.channel().id();
    log.info("Socket客户端: {},IP: {},端口: {} ,成功下线!", channelId, clientIp, clientPort);
  }

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {}

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    log.info("Socket消息读取: {}", msg);
    // 依赖后面的SpringUtils工具类.如不需要可注释,注释后不影响Socket正常使用.
    StringRedisTemplate stringRedisTemplate = SpringUtils.getBean(StringRedisTemplate.class);
    String stringValue = stringRedisTemplate.opsForValue().get("order-test");
    ctx.writeAndFlush("返回内容:" + stringValue + msg + "\r\n");
  }

  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    log.info("channelReadComplete");
    ctx.flush();
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    String socketString = ctx.channel().remoteAddress().toString();
    if (evt instanceof IdleStateEvent) {
      IdleStateEvent event = (IdleStateEvent) evt;
      if (event.state() == IdleState.READER_IDLE) {
        log.info("Socket Client: {} READER_IDLE 读超时", socketString);
        ctx.disconnect();
      } else if (event.state() == IdleState.WRITER_IDLE) {
        log.info("Socket Client: {} WRITER_IDLE 写超时", socketString);
        ctx.disconnect();
      } else if (event.state() == IdleState.ALL_IDLE) {
        log.info("Socket Client: {} ALL_IDLE 总超时", socketString);
        ctx.disconnect();
      }
    } else {
      super.userEventTriggered(ctx, evt);
    }
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    log.error("Socket消息处理异常!");
    if (cause != null) {
      cause.printStackTrace();
    }
    if (ctx != null) {
      ctx.close();
    }
  }
}

WebSocket服务定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
 * @author puruidong
 * @version 2022/7/2
 */

@Getter
@Slf4j
public class WebSocketServer extends NettyBaseService {
  private int port;
  private String name = "WebSocket";

  public WebSocketServer(int port) {
    this.port = port;
    log.info("WebSocket上报数据时,请求地址为(127.0.0.1改为实际ip): ws://127.0.0.1:{}/ws", port);
  }

  @Override
  protected ChannelHandler[] createHandlers() {
    return new ChannelHandler[] {
      new HttpServerCodec(),
      new ChunkedWriteHandler(),
      new HttpObjectAggregator(1048576),
      new WebSocketServerProtocolHandler("/ws"),
      new WebSocketServerHandler()
    };
  }
}

WebSocket服务实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

/**
 * @author puruidong
 * @version 2022/7/2
 */

@Slf4j
@Component
public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
    String clientIp = socketAddress.getAddress().getHostAddress();
    int clientPort = socketAddress.getPort();
    ChannelId channelId = ctx.channel().id();
    log.info("WebSocket客户端: {},IP: {},端口: {} ,上线!", channelId, clientIp, clientPort);
  }

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {}

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) msg;
    log.info("WebSocket读取数据: {} ", textWebSocketFrame.text());
    ctx.writeAndFlush("返回数据: " + msg);
  }

  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    log.info("WebSocket用户:{}上线", ctx.channel().id().asLongText());
  }

  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    log.info("WebSocket用户:{}下线", ctx.channel().id().asLongText());
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    log.info("---------------------socket断线检测-------------------------");
    InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
    String clientIp = socketAddress.getAddress().getHostAddress();
    int clientPort = socketAddress.getPort();
    ChannelId channelId = ctx.channel().id();
    log.info("WebSocket客户端: {},IP: {},端口: {} ,成功下线!", channelId, clientIp, clientPort);
  }

  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    log.info("channelReadComplete");
  }

  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    String socketString = ctx.channel().remoteAddress().toString();
    if (evt instanceof IdleStateEvent) {
      IdleStateEvent event = (IdleStateEvent) evt;
      if (event.state() == IdleState.READER_IDLE) {
        log.info("WebSocket Client: {} READER_IDLE 读超时", socketString);
        ctx.disconnect();
      } else if (event.state() == IdleState.WRITER_IDLE) {
        log.info("WebSocket Client: {} WRITER_IDLE 写超时", socketString);
        ctx.disconnect();
      } else if (event.state() == IdleState.ALL_IDLE) {
        log.info("WebSocket Client: {} ALL_IDLE 总超时", socketString);
        ctx.disconnect();
      }
    } else {
      super.userEventTriggered(ctx, evt);
    }
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    log.error("WebSocket消息处理异常!");
    if (cause != null) {
      cause.printStackTrace();
    }
    if (ctx != null) {
      ctx.close();
    }
  }
}

启动类(监听Spring Boot的ApplicationReadyEvent事件,在应用就绪后启动上面的Socket和WebSocket服务):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

/**
 * @author puruidong
 * @version 2022/7/2
 */

@Component
public class SocketServerReadyEventListener
    implements ApplicationListener<ApplicationReadyEvent>, Ordered {

  private int socketServerPort=5602;

  private int webSocketServerPort=5603;

  @Override
  public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
    TcpServer tcpServer = new TcpServer(socketServerPort);
    WebSocketServer webSocketServer = new WebSocketServer(webSocketServerPort);
    try {
      tcpServer.start();
      webSocketServer.start();
    } catch (Exception ex) {
      ex.printStackTrace();
    }
  }

  @Override
  public int getOrder() {
    return Ordered.LOWEST_PRECEDENCE;
  }
}

Spring工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * Spring工具类
 *
 *
 * 在非spring管理环境中获取bean
 *
 */

@Component
public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware
{
    /** Spring应用上下文环境 */
    private static ConfigurableListableBeanFactory beanFactory;

    private static ApplicationContext applicationContext;

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException
    {
        SpringUtils.beanFactory = beanFactory;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
    {
        SpringUtils.applicationContext = applicationContext;
    }

    /**
     * 获取对象
     *
     * @param name
     * @return Object 一个以所给名字注册的bean的实例
     * @throws org.springframework.beans.BeansException
     *
     */

    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) throws BeansException
    {
        return (T) beanFactory.getBean(name);
    }

    /**
     * 获取类型为requiredType的对象
     *
     * @param clz
     * @return
     * @throws org.springframework.beans.BeansException
     *
     */

    public static <T> T getBean(Class<T> clz) throws BeansException
    {
        T result = (T) beanFactory.getBean(clz);
        return result;
    }

    /**
     * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
     *
     * @param name
     * @return boolean
     */

    public static boolean containsBean(String name)
    {
        return beanFactory.containsBean(name);
    }

    /**
     * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
     *
     * @param name
     * @return boolean
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */

    public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.isSingleton(name);
    }

    /**
     * @param name
     * @return Class 注册对象的类型
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */

    public static Class<?> getType(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.getType(name);
    }

    /**
     * 如果给定的bean名字在bean定义中有别名,则返回这些别名
     *
     * @param name
     * @return
     * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException
     *
     */

    public static String[] getAliases(String name) throws NoSuchBeanDefinitionException
    {
        return beanFactory.getAliases(name);
    }

    /**
     * 获取aop代理对象
     *
     * @param invoker
     * @return
     */

    @SuppressWarnings("unchecked")
    public static <T> T getAopProxy(T invoker)
    {
        return (T) AopContext.currentProxy();
    }

    /**
     * 获取当前的环境配置,无配置返回null
     *
     * @return 当前的环境配置
     */

    public static String[] getActiveProfiles()
    {
        return applicationContext.getEnvironment().getActiveProfiles();
    }

    /**
     * 获取当前的环境配置,当有多个环境配置时,只获取第一个
     *
     * @return 当前的环境配置
     */

    public static String getActiveProfile()
    {
        final String[] activeProfiles = getActiveProfiles();
        return StringUtils.isNotEmpty(activeProfiles) ? activeProfiles[0] : null;
    }
}

WebSocket访问地址(在线调试: http://www.websocket-test.com/): ws://127.0.0.1:5603/ws
Socket访问地址: 127.0.0.1:5602

image-3416

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

*

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据