微信公眾號:bugstack蟲洞棧 | 關注獲得源碼 沉淀、分享、成長,專注于原創(chuàng)專題案例,以最易學習編程的方式分享知識,讓自己和他人都能有所收獲。目前已完成的專題有;Netty4.x實戰(zhàn)專題案例、用Java實現JVM、基于JavaAgent的全鏈路監(jiān)控、手寫RPC框架、架構設計專題案例[Ing]等。
前言介紹
在物聯(lián)網開發(fā)中,常常需要通過網頁端來控制設備,包括;獲取信息、執(zhí)行操作、啟動停止等,就像我們在手機上會控制家里的小米盒子、路由器、電飯煲或者在線養(yǎng)狗等一些設備一樣。在這里所有的下層設備都可以通過socket通信鏈接到服務端,而用戶一端在通過http鏈接或者websocket鏈接到服務端,通過發(fā)送和接收數據來做出相應的行為操作。如下圖;
案例目標
本章節(jié)整合Springboot+Netty,通過部署nettySocket與webSocket兩套服務端,來接收轉發(fā)行為消息。 客戶端采用js鏈接websocket,用于接收服務端反饋與發(fā)送指令,用于獲取下位機信息。 在test中啟動一個模擬下位機,用于反饋信息數據。在真實開發(fā)中下位機與服務端通信通常是定義好的字節(jié)碼,需要自己編寫解碼器。
環(huán)境準備
jdk 1.8.0 IntelliJ IDEA Community Edition 2018.3.1 x64 Netty 4.1.36.Final
代碼示例
itstack- demo- netty- 3 - 01
└── src
├── main
│ ├── java
│ │ └── org. itstack. demo. ark
│ │ ├── domain
│ │ │├── msgobj
│ │ ││ ├── Feedback. java
│ │ ││ ├── QueryInfoReq. java
│ │ ││ └── Text. java
│ │ │├── Device. java
│ │ │├── EasyResult. java
│ │ │├── InfoProtocol. java
│ │ │└── ServerInfo. java
│ │ ├── server
│ │ │├── socket
│ │ ││ ├── MyChannelInitializer. java
│ │ ││ ├── MyServerHandler. java
│ │ ││ └── NettyServer. java
│ │ │└── websocket
│ │ │ ├── MyWsChannelInitializer. java
│ │ │ ├── MyWsServerHandler. java
│ │ │ └── WsNettyServer. java
│ │ ├── util
│ │ │├── CacheUtil. java
│ │ │├── MsgUtil. java
│ │ │└── MsgUtil. java
│ │ ├── web
│ │ │└── NettyController. java
│ │ └── Application. java
│ └── resources
│ │ └── application. yml
│ └── webapp
│ ├── arkWs
│ │├── js
│ ││ └── ws. js
│ │ └── arkWsControlCenter. html
│ ├── res
│ └── WEB- INF
│ └── index. jsp
│
└── test
└── java
└── org. itstack. demo. test
└── ApiTest. java
演示部分重點代碼塊,完整代碼下載關注公眾號;bugstack蟲洞棧,回復Netty案例
server/socket/MyServerHandler.java & socket數據處理
當有下位機鏈接服務端時,構建下位機信息,實際使用可以通過注冊方式進行鏈接驗證。 當收到下位機信息后轉發(fā)到websocket端,使網頁端收到下位機信息反饋
public class MyServerHandler extends ChannelInboundHandlerAdapter {
private Logger logger = LoggerFactory. getLogger ( MyServerHandler. class ) ;
/**
* 當客戶端主動鏈接服務端的鏈接后,這個通道就是活躍的了。也就是客戶端與服務端建立了通信通道并且可以傳輸數據
*/
@Override
public void channelActive ( ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = ( SocketChannel) ctx. channel ( ) ;
String channelId = channel. id ( ) . toString ( ) ;
System. out. println ( "鏈接報告開始" ) ;
System. out. println ( "鏈接報告信息:有一客戶端鏈接到本服務端。channelId:" + channelId) ;
System. out. println ( "鏈接報告IP:" + channel. localAddress ( ) . getHostString ( ) ) ;
System. out. println ( "鏈接報告Port:" + channel. localAddress ( ) . getPort ( ) ) ;
System. out. println ( "鏈接報告完畢" ) ;
//構建設備信息{下位機、中繼器、IO板卡}
Device device = new Device ( ) ;
device. setChannelId ( channelId) ;
device. setNumber ( UUID. randomUUID ( ) . toString ( ) ) ;
device. setIp ( channel. remoteAddress ( ) . getHostString ( ) ) ;
device. setPort ( channel. remoteAddress ( ) . getPort ( ) ) ;
device. setConnectTime ( new Date ( ) ) ;
//添加設備信息
deviceGroup. put ( channelId, device) ;
CacheUtil. cacheClientChannel. put ( channelId, channel) ;
}
@Override
public void channelRead ( ChannelHandlerContext ctx, Object objMsgJsonStr) throws Exception {
//接收設備發(fā)來信息
System. out. println ( new SimpleDateFormat ( "yyyy-MM-dd HH:mm:ss" ) . format ( new Date ( ) ) + " 接收到消息內容:" + objMsgJsonStr) ;
//轉發(fā)消息到Ws
CacheUtil. wsChannelGroup. writeAndFlush ( new TextWebSocketFrame ( objMsgJsonStr. toString ( ) ) ) ;
}
}
server/websocket/MyWsServerHandler.java & websocket數據處理
websocket數據需要轉換后使用,可以支持文本消息,本案例中使用json字符串,方便對象傳輸 channelRead轉發(fā)數據,當收到數據后發(fā)送給下位機,主要通過內容中channel控制
public class MyWsServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead ( ChannelHandlerContext ctx, Object msg) throws Exception {
. . .
//ws
if ( msg instanceof WebSocketFrame ) {
WebSocketFrame webSocketFrame = ( WebSocketFrame) msg;
//關閉請求
if ( webSocketFrame instanceof CloseWebSocketFrame ) {
handshaker. close ( ctx. channel ( ) , ( CloseWebSocketFrame) webSocketFrame. retain ( ) ) ;
return ;
}
//ping請求
if ( webSocketFrame instanceof PingWebSocketFrame ) {
ctx. channel ( ) . write ( new PongWebSocketFrame ( webSocketFrame. content ( ) . retain ( ) ) ) ;
return ;
}
//只支持文本格式,不支持二進制消息
if ( ! ( webSocketFrame instanceof TextWebSocketFrame ) ) {
throw new Exception ( "僅支持文本格式" ) ;
}
String request = ( ( TextWebSocketFrame) webSocketFrame) . text ( ) ;
System. out. println ( "服務端收到:" + request) ;
InfoProtocol infoProtocol = JSON. parseObject ( request, InfoProtocol. class ) ;
//socket轉發(fā)消息
String channelId = infoProtocol. getChannelId ( ) ;
Channel channel = CacheUtil. cacheClientChannel. get ( channelId) ;
if ( null == channel) return ;
channel. writeAndFlush ( MsgUtil. buildMsg ( infoProtocol) ) ;
//websocket消息反饋發(fā)送成功
ctx. writeAndFlush ( MsgUtil. buildWsMsgText ( ctx. channel ( ) . id ( ) . toString ( ) , "向下位機傳達消息success!" ) ) ;
}
}
}
web/NettyController.java & 控制層方便獲取服務端信息
控制層提供了查詢服務列表、鏈接設備信息、以及主動觸發(fā)信息發(fā)送 另外如果需要將服務端的啟動關閉進行手動控制,可以在這里面提供方法供調用
@Controller
public class NettyController {
private Logger logger = LoggerFactory. getLogger ( NettyController. class ) ;
@RequestMapping ( "/index" )
public String index ( ) {
return "index" ;
}
@RequestMapping ( "/queryNettyServerList" )
@ResponseBody
public Collection< ServerInfo> queryNettyServerList ( ) {
try {
Collection< ServerInfo> serverInfos = CacheUtil. serverInfoMap. values ( ) ;
logger. info ( "查詢服務端列表。{}" , JSON. toJSONString ( serverInfos) ) ;
return serverInfos;
} catch ( Exception e) {
logger. info ( "查詢服務端列表失敗。" , e) ;
return null;
}
}
@RequestMapping ( "/queryDeviceList" )
@ResponseBody
public Collection< Device> queryDeviceList ( ) {
try {
Collection< Device> deviceInfos = CacheUtil. deviceGroup. values ( ) ;
logger. info ( "查詢設備鏈接列表。{}" , JSON. toJSONString ( deviceInfos) ) ;
return deviceInfos;
} catch ( Exception e) {
logger. info ( "查詢設備鏈接列表失敗。" , e) ;
return null;
}
}
@RequestMapping ( "/doSendMsg" )
@ResponseBody
public EasyResult doSendMsg ( String reqStr) {
try {
logger. info ( "向設備發(fā)送信息[可以通過另外一個Web服務調用本接口發(fā)送信息]:{}" , reqStr) ;
InfoProtocol infoProtocol = MsgUtil. getMsg ( reqStr) ;
String channelId = infoProtocol. getChannelId ( ) ;
Channel channel = CacheUtil. cacheClientChannel. get ( channelId) ;
channel. writeAndFlush ( MsgUtil. buildMsg ( infoProtocol) ) ;
return EasyResult. buildSuccessResult ( ) ;
} catch ( Exception e) {
logger. error ( "向設備發(fā)送信息失敗:{}" , reqStr, e) ;
return EasyResult. buildErrResult ( e) ;
}
}
}
Application.java & 啟動層
通過繼承CommandLineRunner方法,在服務就緒后啟動socket服務 啟動后需要循環(huán)驗證是否啟動完成
@SpringBootApplication
@ComponentScan ( "org.itstack.demo.ark" )
public class Application implements CommandLineRunner {
private Logger logger = LoggerFactory. getLogger ( Application. class ) ;
@Value ( "${netty.socket.port}" )
private int nettyServerPort;
@Value ( "${netty.websocket.port}" )
private int nettyWsServerPort;
//默認線程池
private static ExecutorService executorService = Executors. newFixedThreadPool ( 2 ) ;
public static void main ( String[ ] args) {
SpringApplication. run ( Application. class , args) ;
}
@Override
public void run ( String. . . args) throws Exception {
//啟動NettyServer-socket
logger. info ( "啟動NettyServer服務,啟動端口:{}" , nettyServerPort) ;
NettyServer nettyServer = new NettyServer ( new InetSocketAddress ( nettyServerPort) ) ;
Future< Channel> future = executorService. submit ( nettyServer) ;
Channel channel = future. get ( ) ;
if ( null == channel) {
throw new RuntimeException ( "netty server-s open error channel is null" ) ;
}
while ( ! channel. isActive ( ) ) {
logger. info ( "啟動NettyServer服務,循環(huán)等待啟動..." ) ;
Thread. sleep ( 500 ) ;
}
logger. info ( "啟動NettyServer服務,完成:{}" , channel. localAddress ( ) ) ;
CacheUtil. serverInfoMap. put ( nettyServerPort, new ServerInfo ( "NettySocket" , NetUtil. getHost ( ) , nettyServerPort, new Date ( ) ) ) ;
//啟動NettyServer-websocket
logger. info ( "啟動NettyWsServer服務,啟動端口:{}" , nettyWsServerPort) ;
WsNettyServer wsNettyServer = new WsNettyServer ( new InetSocketAddress ( nettyWsServerPort) ) ;
Future< Channel> wsFuture = executorService. submit ( wsNettyServer) ;
Channel wsChannel = wsFuture. get ( ) ;
if ( null == wsChannel) {
throw new RuntimeException ( "netty server-ws open error channel is null" ) ;
}
while ( ! wsChannel. isActive ( ) ) {
logger. info ( "啟動NettyWsServer服務,循環(huán)等待啟動..." ) ;
Thread. sleep ( 500 ) ;
}
logger. info ( "啟動NettyWsServer服務,完成:{}" , wsChannel. localAddress ( ) ) ;
CacheUtil. serverInfoMap. put ( nettyServerPort, new ServerInfo ( "NettyWsSocket" , NetUtil. getHost ( ) , nettyServerPort, new Date ( ) ) ) ;
}
}
webapp/arkWs/js/ws.js & 鏈接websocket服務端
socket = new WebSocket ( "ws://localhost:7398/websocket" ) ;
socket. onmessage = function ( event) {
var msg = JSON. parse ( event. data) ;
console. info ( msg) ;
$( "#msgBox" ) . val ( $( "#msgBox" ) . val ( ) + event. data + "\r\n" ) ;
} ;
案例測試
分別啟動如下內容;
Application.java,Plugins/spring-boot/spring-boot:run ApiTest.java,右鍵啟動模擬下位機 打開服務端鏈接;http://localhost:8080/ http://localhost:8080/arkWs/arkWsControlCenter.html
發(fā)送模擬信息,觀察執(zhí)行結果;
2019 - 12 - 01 15 : 11 : 49.965 INFO 7620 -- - [ nio- 8080 - exec- 1 ] o. a. c. c. C. [ Tomcat] . [ localhost] . [ / ] : Initializing Spring FrameworkServlet 'dispatcherServlet'
2019 - 12 - 01 15 : 11 : 49.965 INFO 7620 -- - [ nio- 8080 - exec- 1 ] o. s. web. servlet. DispatcherServlet : FrameworkServlet 'dispatcherServlet' : initialization started
2019 - 12 - 01 15 : 11 : 49.980 INFO 7620 -- - [ nio- 8080 - exec- 1 ] o. s. web. servlet. DispatcherServlet : FrameworkServlet 'dispatcherServlet' : initialization completed in 15 ms
2019 - 12 - 01 15 : 11 : 51.157 INFO 7620 -- - [ nio- 8080 - exec- 3 ] o. itstack. demo. ark. web. NettyController : 查詢設備鏈接列表。[ { "channelId" : "281d1279" , "connectTime" : 1575184302964 , "ip" : "127.0.0.1" , "number" : "74de0967-c0b4-4426-a9d1-183feaff2acf" , "port" : 3972 } ]
2019 - 12 - 01 15 : 11 : 51.162 INFO 7620 -- - [ io- 8080 - exec- 10 ] o. itstack. demo. ark. web. NettyController : 查詢服務端列表。[ { "ip" : "10.13.70.50" , "openDate" : 1575184290501 , "port" : 7397 , "typeInfo" : "NettyWsSocket" } ]
2019 - 12 - 01 15 : 11 : 58.476 INFO 7620 -- - [ ntLoopGroup- 7 - 1 ] o. i. d. a. s. websocket. MyWsServerHandler : 鏈接報告開始
2019 - 12 - 01 15 : 11 : 58.476 INFO 7620 -- - [ ntLoopGroup- 7 - 1 ] o. i. d. a. s. websocket. MyWsServerHandler : 鏈接報告信息:有一客戶端鏈接到本服務端
2019 - 12 - 01 15 : 11 : 58.476 INFO 7620 -- - [ ntLoopGroup- 7 - 1 ] o. i. d. a. s. websocket. MyWsServerHandler : 鏈接報告IP: 0 : 0 : 0 : 0 : 0 : 0 : 0 : 1
2019 - 12 - 01 15 : 11 : 58.476 INFO 7620 -- - [ ntLoopGroup- 7 - 1 ] o. i. d. a. s. websocket. MyWsServerHandler : 鏈接報告Port: 7398
2019 - 12 - 01 15 : 11 : 58.476 INFO 7620 -- - [ ntLoopGroup- 7 - 1 ] o. i. d. a. s. websocket. MyWsServerHandler : 鏈接報告完畢
服務端收到:{ "channelId" : "281d1279" , "msgType" : 2 , "msgObj" : { "stateType" : "1" } }
2019 - 12 - 01 15 : 12 : 05 接收到消息內容:{ "msgObj" : { "stateMsg" : "溫度信息:91.31334894176383°C" , "stateType" : 1 , "channelId" : "93c1120a" } , "msgType" : 3 , "channelId" : "93c1120a" }
服務端收到:{ "channelId" : "281d1279" , "msgType" : 2 , "msgObj" : { "stateType" : "1" } }
2019 - 12 - 01 15 : 12 : 05 接收到消息內容:{ "msgObj" : { "stateMsg" : "溫度信息:44.83794772946285°C" , "stateType" : 1 , "channelId" : "93c1120a" } , "msgType" : 3 , "channelId" : "93c1120a" }
綜上總結
在使用springboot與netty結合,開發(fā)一個簡便的服務端還是很方便的,而且在集合一些springcloud的服務,可以使項目工程更加完善。 可以嘗試做一些設備控制服務,在我們不在家的時候也可以通過一個h5鏈接控制家里的設備,比如快到家將熱水器打開。 本案例還偏向于模擬,在實際開發(fā)過程中還是會出現很多業(yè)務問題需要解決,尤其是服務端與下位機的通信,需要編寫編碼器與解碼器。