若谷学院
互联网公司技术架构分享

NSQ高性能分布式实时消息队列

NSQ介绍

NSQ是Go编写的实时的分布式消息队列系统,用来处理大规模消息,一天可以处理数十亿的消息。NSQ 具有分布式和去中心化拓扑结构,无单点故障(SPOF)、故障容错(Fault Tolerant)、高可用性(HA)以及能够保证消息的可靠传递的特征,在bitly, path, strip, digg等产品中用到。
 

启动注册、查找服务

 
nsqlookupd主要负责服务发现 负责nsqd的心跳、状态监测,给客户端、nsqadmin提供nsqd地址与状态。
$nsqlookupd
nsqlookupd监听两个端口,一个 http 客户端的端口(默认4161),一个nsqd用作广播的TCP端口(4160)。
 
-broadcast-address string
    address of this lookupd node, (default to the OS hostname) (default “PROSNAKES.local“)
-http-address string
    <addr>:<port> to listen on for HTTP clients (default “0.0.0.0:4161”)
-tcp-address string
    <addr>:<port> to listen on for TCP clients (default “0.0.0.0:4160”)
 

启动消息服务进程

mkdir -p /data/nsqdata
nsqd  –lookupd-tcp-address=127.0.0.1:4160 –data-path=/data/nsqdata
 
nsqd–lookupd-tcp-address=127.0.0.1:4160–broadcast-address=127.0.0.1-tcp-address=127.0.0.1:4154-http-address=”0.0.0.0:4155″–data-path=/data/nsqdata 

nsqd -data-path=/data/nsq –lookupd-tcp-address=127.0.0.1:4160

nsqd 负责接收消息,存储队列和将消息发送给客户端,nsqd 可以多机部署,当你使用客户端向一个topic发送消息时,可以配置多个nsqd地址,消息会随机的分配到各个nsqd上,nsqd优先把消息存储到内存channel中,当内存channel满了之后,则把消息写到磁盘文件中。他监听了两个tcp端口,一个用来服务客户端,一个用来提供http的接口 ,nsqd 启动时置顶下nsqlookupd地址即可,也可以指定端口 与数据目录。

启动Web管理台(可选项)

 
nsqadmin是一个web管理界面 启动方式如下:
 
nsqadmin–lookupd-http-address=127.0.0.1:4161
 
可以通过web访问管理台:  http://127.0.0.1:4171/
 

命令行发送测试消息/消费消息

curl -d ‘hello world 1’ ‘http://127.0.0.1:4151/pub?topic=test’
curl -d ‘hello world 2’ ‘http://127.0.0.1:4151/pub?topic=test’
curl -d ‘hello world 3’ ‘http://127.0.0.1:4151/pub?topic=test’
 

如何部署分布式消息队列

启动2个nsqlookupd 主机实例, 5个 nsqd 节点,以及一个运行在9000端口上的nsqadmin 管理台。
#!/usr/bin/env bash
NSQLOOKUPD_LOG=/tmp/log/nsqlookupd.log
NSQD_LOG=/tmp/log/nsqd.log
NSQADMIN_LOG=/tmp/log/nsqadmin.log
for PROCESS in nsqlookupd nsqd nsqadmin
do
    pkill “$PROCESS”
done
 
for NODE in {1..2};
do
    /usr/local/bin/nsqlookupd –broadcast-address=”nsqlookupd-0$NODE” \
     –tcp-address=”127.0.0.1:900$NODE” –http-address=”127.0.0.1:901$NODE” >> “$NSQLOOKUPD_LOG” 2>&1 &
done
for NODE in {1..5};
do
    /usr/local/bin/nsqd –broadcast-address=”nsqd-0$NODE” –tcp-address=”127.0.0.1:903$NODE” \
        –http-address=”127.0.0.1:904$NODE” –lookupd-tcp-address=”127.0.0.1:9001″ \
        –lookupd-tcp-address=”127.0.0.1:9002″ >> “$NSQD_LOG” 2>&1 &
done
 
/usr/local/bin/nsqadmin –http-address=”0.0.0.0:9000″ \
    –lookupd-http-address=”127.0.0.1:9011″ \
    –lookupd-http-address=”127.0.0.1:9012″ >> “$NSQADMIN_LOG” 2>&1 &
 
 

Python客户端如何使用

Writer通过nsqd_tcp_addresses来制定多个nsqd的地址来发送消息,
Reader通过lookupd_http_addresses或者nsqd_tcp_addresses来监听消息。
 

管理台系统截图

topic详情页界面如下
channel详情页如下 ,empty可以清空当前channel的信息,delete删除当前channel, pause是暂停消息消费。
下方也有几个比较重要的参数 depth当前的积压量,in-flight代表已经投递还未消费掉的消息,deferred是未消费的定时(延时)消息数,ready count比较重要,go的客户端是通过设置max-in-flight 除以客户端连接数得到的,代表一次推给客户端多少条消息,或者客户端准备一次性接受多少条消息,谨慎设置其值,因为可能造成服务器压力,如果消费能力比较弱,ready count建议设置的低一点比如3。
 
Topic 和  Channel 
nsq的topic 可以设置多个channel,因为有可能有多个业务方需要定值topic的消息,这样互不影响,
当然一个消息会发送到topic下的所有channel,然后会分配到不同客户端的连接上,如下图。
 

Go client pub

 
package main
 
import(
)
 
varproducer *nsq.Producer
 
funcmain(){
nsqd :=”127.0.0.1:4150″
producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
producer.Publish(“test”, []byte(“nihao”))
iferr != nil {
panic(err)
}
}
 

Go client sub

 
package main
 
import (
“fmt”
“sync”
)
typeNSQHandlerstruct{
}
 
func (this*NSQHandler) HandleMessage(msg *nsq.Message) error {
fmt.Println(“receive”, msg.NSQDAddress,”message:”, string(msg.Body))
returnnil
}
 
func testNSQ() {
waiter := sync.WaitGroup{}
waiter.Add(1)
 
go func() {
defer waiter.Done()
config:=nsq.NewConfig()
config.MaxInFlight=9
 
//建立多个连接
fori :=0; i<10; i++ {
consumer, err := nsq.NewConsumer(“test”,”struggle”, config)
ifnil!= err {
fmt.Println(“err”, err)
return
}
 
consumer.AddHandler(&NSQHandler{})
err = consumer.ConnectToNSQD(“127.0.0.1:4150”)
ifnil!= err {
fmt.Println(“err”, err)
return
}
}
select{}
 
}()
 
waiter.Wait()
}
func main() {
testNSQ();
 
}
 
 
好烂呀没啥价值凑合看看还不错很精彩 (还没有人评分)
Loading...
本站文章来自互联网一线技术博客,若有侵权,请联系我们:若谷技术学院 » NSQ高性能分布式实时消息队列
关注若谷技术,获得个性化即时架构文章推送

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

全球互联网技术架构,前沿架构参考

联系我们博客/网站内容提交