NSQ Source Analyze

Written by with ♥ on in IT

编程技巧收集

判断某个错误是不是文件不存在错误

data, err := ioutil.ReadFile("./xxx")
fmt.Println(err) // open ./xxx: no such file or directory
fmt.Println(os.IsNotExist(err)) // true
fmt.Println(data) // []
fmt.Println(len(data)) // 0
fmt.Println(string(data) == "") // true
// 判断某个错误是不是文件不存在错误
os.IsNotExist(err)

Topic

messagePump

	// do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
	for {
		select {
		case <-t.channelUpdateChan:
			continue
		case <-t.pauseChan:
			continue
		case <-t.exitChan:
			goto exit
		case <-t.startChan:
		}
		break
	}

Channel

指定的 topic 对于所有 channel 进行投递,一个 channel 可以被多个消费者消费,类似于 kafka 的 consumer_group,每个 consumer_group 拥有独立的 position,不同 group 之间可以消费同一份内容的多个副本。也就是说,消费者使用不同的 channel 可以重复消费一个消息。

但是 nsq 没有持久化机制,消息消费完了就会删除,没有 kafka 的重复消费机制。只要客户端发送 FIN 后,消息就会销毁。

创建 Channel

SUB 订阅 Channel 消息

PUB 发送消息到 Channel