日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

使用MQTT協(xié)議+Redis緩存實(shí)現(xiàn)APP登錄頂號(hào)功能 | jwcqc個(gè)人筆記 | IT癮

 WindySky 2020-01-15

大家在玩游戲或使用QQ等IM工具時(shí),想必都見(jiàn)到過(guò)彈出被頂號(hào)或者是您的賬號(hào)于xx時(shí)間在另一設(shè)備登錄,您已被迫下線(xiàn)這樣的提示,然后不得不點(diǎn)退出按鈕退出整個(gè)應(yīng)用,或者點(diǎn)擊重新登錄把另一設(shè)備再頂下來(lái)。最近我參與的一個(gè)項(xiàng)目,正好就有這樣的需求,而且,由于我們項(xiàng)目中已經(jīng)使用到了MQTT協(xié)議進(jìn)行消息推送,實(shí)現(xiàn)遠(yuǎn)程控制,后臺(tái)用Java實(shí)現(xiàn),緩存使用了Redis,因此,正好可以利用現(xiàn)有的技術(shù)來(lái)實(shí)現(xiàn)這個(gè)功能。

實(shí)現(xiàn)的思路大概如下:首先,登錄時(shí)不僅需要賬號(hào)密碼,還可以將設(shè)備關(guān)鍵信息記錄下來(lái),如設(shè)備型號(hào)(Android|iPhone)、登錄時(shí)間、登錄IP、設(shè)備唯一標(biāo)識(shí)(UUID)等,這就需要前臺(tái)登錄功能與后臺(tái)接口一起配合實(shí)現(xiàn),并在后臺(tái)把userId已經(jīng)相關(guān)設(shè)備信息保存到Redis中,當(dāng)在另外一臺(tái)新設(shè)備上登錄同一賬號(hào)時(shí),將userId對(duì)應(yīng)的相關(guān)登錄設(shè)備信息直接進(jìn)行覆蓋,此時(shí)如果舊設(shè)備進(jìn)行重連時(shí),因?yàn)樵搖uid已經(jīng)不是當(dāng)前服務(wù)端的uuid了,所以直接返回下線(xiàn)通知,為了進(jìn)行友好提示,也可以將新登錄設(shè)備的主要信息(設(shè)備型號(hào)、登錄時(shí)間)進(jìn)行返回。

下面簡(jiǎn)單介紹一下實(shí)現(xiàn)的方法。

軟件安裝

Linux下mqtt服務(wù)器Apollo的安裝

下載

選擇一個(gè)目錄用來(lái)下載保存
下載地址: http://activemq./apollo/download.html
官網(wǎng)教程: http://activemq./apollo/documentation/getting-started.html
目前版本是 apache-apollo-1.7.1-unix-distro.tar .gz

創(chuàng)建broker

一個(gè)broker實(shí)例是一個(gè)文件夾,其中包含所有的配置文件及運(yùn)行時(shí)的數(shù)據(jù),不如日志和消息數(shù)
據(jù)。Apollo強(qiáng)烈建議不要把實(shí)例同安裝文件放在一起。在linux操作系統(tǒng)下面,建議將實(shí)例建在
/var/lib/目錄下面

首先解壓:tar -zxvf apache-apollo-1.7.1-unix-distro.tar.gz
選擇一個(gè)目錄存放解壓后的文件,我放在了/server/下,解壓后的文件夾為 apache-apollo-1.7.1

開(kāi)始創(chuàng)建broker實(shí)例:

             
1
2
             
cd /var/lib
sudo /server/apache-apollo-1.7.1/bin/apollo create mybroker

下圖是Apache官方給的一些建議截圖:

啟動(dòng)broker實(shí)例

啟動(dòng)broker實(shí)例可以有兩種方法,如下圖中所示:

可以執(zhí)行

             
1
             
/var/lib/mybroker/bin/apollo-broker run

或者

             
1
2
             
sudo ln -s "/var/lib/mybroker/bin/apollo-broker-service" /etc/init.d/
/etc/init.d/apollo-broker-service start

使其作為一個(gè)service進(jìn)行啟動(dòng),以后系統(tǒng)重啟后只需運(yùn)行/etc/init.d/apollo-broker-service start

訪(fǎng)問(wèn)Apollo的監(jiān)控頁(yè)面: http://localhost:61680/默認(rèn)用戶(hù)名、密碼為為 admin/password

Linux下Redis的安裝與配置

Redis的安裝非常簡(jiǎn)單,已經(jīng)有現(xiàn)成的Makefile文件,解壓后在src目錄下使用make命令完成編譯即可,redis-benchmark、redis-cli、redis-server、redis-stat 這四個(gè)文件,加上一個(gè) redis.conf 就構(gòu)成了整個(gè)redis的最終可用包。它們的作用如下:

redis-server:Redis服務(wù)器的daemon啟動(dòng)程序
redis-cli:Redis命令行操作工具。當(dāng)然,你也可以用telnet根據(jù)其純文本協(xié)議來(lái)操作
redis-benchmark:Redis性能測(cè)試工具,測(cè)試Redis在你的系統(tǒng)及你的配置下的讀寫(xiě)性能
redis-stat:Redis狀態(tài)檢測(cè)工具,可以檢測(cè)Redis當(dāng)前狀態(tài)參數(shù)及延遲狀況

下載安裝:

            
1
2
3
4
5
            
wget http://download./redis-stable.tar.gz
tar xzf redis-stable.tar.gz
cd redis-stable
make
make install

啟動(dòng)

編譯后生成的可執(zhí)行文件:
redis-server 是Redis的服務(wù)器,啟動(dòng)Redis即運(yùn)行redis-server
redis-cli 是Redis自帶的Redis命令行客戶(hù)端,學(xué)習(xí)Redis的重要工具

./redis-server & 不指定配置直接運(yùn)行,這時(shí)采用默認(rèn)配置,無(wú)密碼
./redis-server –port 6379 僅指定端口
./redis-server ../redis.conf 指定配置文件

最好還是使用最后一種方式進(jìn)行啟動(dòng)

如果只是在本機(jī)連接,那麼使用默認(rèn)配置文件不會(huì)有什么問(wèn)題,但是,如果是連接遠(yuǎn)程服務(wù)器端的Redis,則需要對(duì)配置文件進(jìn)行一些修改:

             
1
2
3
             
requirepass foobared
#bind 127.0.0.1 ##注釋掉
protected-mode no ##從yes改成no

至于如何將Redis設(shè)置后臺(tái)服務(wù),開(kāi)機(jī)自啟等,這里就不介紹了,可以去搜索一下。

功能實(shí)現(xiàn)

后臺(tái)接口

Redis客戶(hù)端使用的是Jedis,如下代碼是一個(gè)對(duì)Jedis簡(jiǎn)單的封裝

            
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
            
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisException;
import java.util.ResourceBundle;
/**
* Jedis Cache 工具類(lèi)
*/
public class JedisUtils {
private static Logger logger = LoggerFactory.getLogger(JedisUtils.class);
private static JedisPool jedisPool;
/**
* 讀取相關(guān)的配置
*/
static {
ResourceBundle resourceBundle = ResourceBundle.getBundle("redis");
int maxActive = Integer.parseInt(resourceBundle.getString("redis.pool.maxActive"));
int maxIdle = Integer.parseInt(resourceBundle.getString("redis.pool.maxIdle"));
int maxWait = Integer.parseInt(resourceBundle.getString("redis.pool.maxWait"));
int port = Integer.parseInt(resourceBundle.getString("redis.port"));
int timeout = Integer.parseInt(resourceBundle.getString("redis.timeout"));
String ip = resourceBundle.getString("redis.ip");
String auth = resourceBundle.getString("redis.auth");
JedisPoolConfig config = new JedisPoolConfig();
//設(shè)置最大連接數(shù)
config.setMaxTotal(maxActive);
//設(shè)置最大空閑數(shù)
config.setMaxIdle(maxIdle);
//設(shè)置超時(shí)時(shí)間
config.setMaxWaitMillis(maxWait);
//初始化連接池
jedisPool = new JedisPool(config, ip, port, timeout, auth);
}
/**
* 獲取緩存
* @param key 鍵
* @return 值
*/
public static String get(String key) {
String value = null;
Jedis jedis = null;
try {
jedis = getResource();
if (jedis.exists(key)) {
value = jedis.get(key);
value = StringUtils.isNotBlank(value) && !"nil".equalsIgnoreCase(value) ? value : null;
logger.debug("get {} = {}", key, value);
}
} catch (Exception e) {
logger.warn("get {} = {}", key, value, e);
} finally {
returnResource(jedis);
}
return value;
}
/**
* 設(shè)置緩存
* @param key 鍵
* @param value 值
* @param cacheSeconds 超時(shí)時(shí)間,0為不超時(shí)
* @return
*/
public static String set(String key, String value, int cacheSeconds) {
String result = null;
Jedis jedis = null;
try {
jedis = getResource();
result = jedis.set(key, value);
if (cacheSeconds != 0) {
jedis.expire(key, cacheSeconds);
}
logger.debug("set {} = {}", key, value);
} catch (Exception e) {
logger.warn("set {} = {}", key, value, e);
} finally {
returnResource(jedis);
}
return result;
}
/**
* 刪除緩存
* @param key 鍵
* @return
*/
public static long del(String key) {
long result = 0;
Jedis jedis = null;
try {
jedis = getResource();
if (jedis.exists(key)){
result = jedis.del(key);
logger.debug("del {}", key);
}else{
logger.debug("del {} not exists", key);
}
} catch (Exception e) {
logger.warn("del {}", key, e);
} finally {
returnResource(jedis);
}
return result;
}
/**
* 緩存是否存在
* @param key 鍵
* @return
*/
public static boolean exists(String key) {
boolean result = false;
Jedis jedis = null;
try {
jedis = getResource();
result = jedis.exists(key);
logger.debug("exists {}", key);
} catch (Exception e) {
logger.warn("exists {}", key, e);
} finally {
returnResource(jedis);
}
return result;
}
/**
* 獲取資源
* @return
* @throws JedisException
*/
public static Jedis getResource() throws JedisException {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
} catch (JedisException e) {
logger.warn("getResource.", e);
returnBrokenResource(jedis);
throw e;
}
return jedis;
}
/**
* 歸還資源
* @param jedis
*/
public static void returnBrokenResource(Jedis jedis) {
if (jedis != null) {
jedisPool.returnBrokenResource(jedis);
}
}
/**
* 釋放資源
* @param jedis
*/
public static void returnResource(Jedis jedis) {
if (jedis != null) {
jedisPool.returnResource(jedis);
}
}
}

然后在登錄接口中,當(dāng)判斷完登錄的用戶(hù)名密碼正確后,可以參考如下代碼的思路去實(shí)現(xiàn),首先判斷Redis中是否已保存有這個(gè)userId對(duì)用的值,有的話(huà)說(shuō)明當(dāng)前已經(jīng)有登錄,需要被替換到,同時(shí)使用MQTT發(fā)送消息給客戶(hù)端使其退出,Redis中不存在則只需保存userId和uuidStr即可

            
1
2
3
4
5
6
7
8
9
10
11
12
            
String uuidStr = ""; //這個(gè)值從APP端傳過(guò)來(lái)
// 先判斷Redis中是否已經(jīng)有,有的話(huà)需要替換掉
if(JedisUtils.get(userId) != null && !JedisUtils .get(userId).equals(uuidStr)) {
MqttClient client = MyMqttClient.getInstance();
String topic = "TOPIC/LOGIN_LOGOUT";
client.subscribe(topic, 1);
MyMqttClient.sendMessage("Log out", topic);
client.unsubscribe(topic);
}
JedisUtils.set(userId, uuidStr, 0);

至于MQTT協(xié)議的實(shí)現(xiàn),這里使用的是Paho,如果后臺(tái)項(xiàng)目是使用Maven構(gòu)建的話(huà),在pom.xml中加入如下幾行即可:

            
1
2
3
4
5
            
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>

然后對(duì)其進(jìn)行了一個(gè)簡(jiǎn)單的封裝

            
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
            
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MyMqttClient {
private MyMqttClient() {}
private static MqttClient mqttClientInstance = null;
private static MqttConnectOptions options;
//靜態(tài)工廠方法
public static synchronized MqttClient getInstance() {
try {
if (mqttClientInstance == null) {
mqttClientInstance = new MqttClient("tcp://125.216.242.151:61613",
MqttClient.generateClientId(), new MemoryPersistence());
options = new MqttConnectOptions();
//設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶(hù)端的連接記錄,這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
options.setCleanSession(true);
//設(shè)置連接的用戶(hù)名
options.setUserName("admin");
//設(shè)置連接的密碼
options.setPassword("password".toCharArray());
// 設(shè)置超時(shí)時(shí)間 單位為秒
options.setConnectionTimeout(10);
// 設(shè)置會(huì)話(huà)心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔1.5*20秒的時(shí)間向客戶(hù)端發(fā)送個(gè)消息判斷客戶(hù)端是否在線(xiàn),但這個(gè)方法并沒(méi)有重連的機(jī)制
options.setKeepAliveInterval(20);
mqttClientInstance.connect(options);
}
return mqttClientInstance;
}catch (Exception e){
e.printStackTrace();
return null;
}
}
public static void sendMessage(String content, String myTopic) {
MqttTopic topic = getInstance().getTopic(myTopic);
MqttMessage message = new MqttMessage();
message.setQos(1);
message.setRetained(false);
message.setPayload(content.getBytes());
try {
MqttDeliveryToken token = topic.publish(message);
} catch (MqttException e) {
e.printStackTrace();
}
}
public static MqttConnectOptions getOptions(){
return options;
}
}

app端

客戶(hù)端的做法思路也很簡(jiǎn)單,由于使用了MQTT,因此客戶(hù)端和服務(wù)器端其實(shí)已經(jīng)保持了一個(gè)長(zhǎng)連接,可以為客戶(hù)端寫(xiě)一個(gè)MQTTService,隨時(shí)監(jiān)聽(tīng)服務(wù)器推送過(guò)來(lái)的消息進(jìn)行處理

            
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
            
//為MTQQ client設(shè)置回調(diào)
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
//連接丟失后,一般在這里面進(jìn)行重連
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//publish后會(huì)執(zhí)行到這里
}
@Override
public void messageArrived(String topicName, MqttMessage message) throws Exception {
if(message.toString().equals("Log out")) {
handler.post(new Runnable() {
@Override
public void run() {
AlertDialog.Builder builder = new AlertDialog.Builder(getApplicationContext());
builder.setMessage("被頂號(hào)了");
builder.setNegativeButton("退出", new DialogInterface.OnClickListener() {
@Override
public void onClick(DialogInterface dialog, int which) {
// TODO 退出當(dāng)前賬號(hào),在這里簡(jiǎn)單粗暴的結(jié)束了應(yīng)用
stopSelf();
android.os.Process.killProcess(android.os.Process.myPid());
}
});
Dialog dialog = builder.create();
dialog.setCanceledOnTouchOutside(false);
dialog.getWindow().setType(WindowManager.LayoutParams.TYPE_SYSTEM_ALERT);
dialog.show();
}
});
}
}
});

總結(jié)

上述代碼可能在嚴(yán)謹(jǐn)性和可靠性上還會(huì)存在一些問(wèn)題,還需要經(jīng)過(guò)不斷的完善,但思路是很明確的。在這里尤其要安利一下MTQQ,現(xiàn)在越來(lái)越多的產(chǎn)品都是基于這個(gè)協(xié)議進(jìn)行開(kāi)發(fā),進(jìn)行消息推送等。它開(kāi)銷(xiāo)很小,支持各種流行編程語(yǔ)言,能夠適應(yīng)不穩(wěn)定的網(wǎng)絡(luò)傳輸需求,在未來(lái)幾年,相信MQTT的應(yīng)用會(huì)越來(lái)越廣。

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶(hù)發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買(mǎi)等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶(hù) 評(píng)論公約

    類(lèi)似文章 更多