- 浏览: 241576 次
- 性别:
- 来自: 南京
文章分类
最新评论
-
phplife:
写的很棒,对我很有帮助,thx!
Erlang OTP学习(1):gen_server -
longshaohang:
写的不错,不过最后markLimit的讨论太少,我的理解是bu ...
BufferedInputStream实现原理分析 -
lin_464025910:
Erlang 很牛逼的 开发语言, 最近才关注 性能确实 不 ...
Erlang 文件处理(读书笔记) -
jaychang:
写的很不错MARK下
BufferedInputStream实现原理分析 -
vavi:
while (count == items.length) ...
Java锁相关总结
上一节(http://diaocow.iteye.com/blog/1734253)我们对TinyMQ进行了概述,这一节我们将着重看一下作者是如何实现消息的发布/订阅
在看源代码之前我们需要了解一些module以及函数的作用:
看完上面的函数说明,我们看下tinymq的源代码目录,它主要有以下几个文件:
tinymq_app.erl
tinymq_sup.erl
tinymq_channel_sup.erl
tinymq_controller.erl
tinymq_channel_controller.erl
tinymq.erl
tiny_pq.erl (这个是我后来自己加进去的)
tinymq_app.erl 是一个Application Callback Module,用来启动整个应用:
tinymq_sup:start_link()又做了些什么?
至此顶级supervisor就创建好了,它初始化了监控树(创建了一个两个child process:一个是work process,另一个是supervisor process),现在我们就来先看看这两个child process:
1.worker process
下面我们看一下另一个child process:
2.supervisor process
现在监控树已经初始化完毕,tinymq服务也已经启动,我们来看看tinymq是如何处理客户端的订阅请求(subscribe request)
接下来我们看看Channel服务是如何处理这个订阅请求的
当Channel服务接收到订阅请求后:
至此tinymq处理了客户端的订阅请求,我们可以回顾上一节列出的订阅端流程图,加深理解。
现在我们来看看tinymq如何响应客户端的发消息请求(publish request).
记住:客户端所有请求都是发送给tinymq server,然后由它派发给每一个Channel服务处理
当Channel服务接收到发消息请求后:
发现没,发消息代码也是比较容易的,同样,我们可以回顾上一节列出的发送图逻辑图来加深理解
至此,我们已经查看了tinymq发布/订阅的主要实现代码,现在我们就来写一段测试代码:
订阅者:
start函数中的Num表示要创建几个订阅者,代码比较简单不在赘述...
发布者
发布者每个5秒向Channel里发送消息
最后,在看一下执行效果图
关于TinyMQ的源码学习就到这里,但是里面还有一些细节值得我去挖掘(譬如:monitorSubscriber,优先级队列实现算法),以及新增些改进功能(譬如服务状态的持久化,以及添加unsubscribe请求处理等等),还有自己的erlang基础需要再补补,官方文档再看看
ps:iteye的erlang代码高亮实在是...,辛苦各位看官了....
不是,这个同样是作者写的,只不过作者没有放到同一个project中,而是通过依赖拉下来,我当时rebar使用有问题,所以我就去作者网站直接下载了下来放到源代码目录中,
你可以看下http://diaocow.iteye.com/blog/1734253 这篇文章我最后说发现的问题,以及别人提供的解决方案
在看源代码之前我们需要了解一些module以及函数的作用:
dict:new() 创建一个 Key-Value Dictionary(可以理解为就是一个map) dict:find(Key, Dict) -> {ok, Value} | error 从Dict1查询Key对应的Value,若存在返回 {ok, Value} 否则返回error dict:store(Key, Value, Dict1) -> Dict2 往Dict1中插入一条Key-Value,并且返回新的Dict2(若Key已经存在,则只是更新下相应的Value) ----------------------------------------------------------------------------- lists:foldl(Fun, Acc0, List) -> Acc1 参数Fun是一个函数,它有两个参数:第一个参数是List中的元素(从左->右依次取),第二个参数是Fun函数执行完后的返回值(第一次调用返回值取Acc0),还是来看一个例子: > lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]). 15 执行过程: 首先,Fun第一次执行时,X的值取列表List的第一个元素1,Sum取0, Fun第二次执行时,X的值取列表List的第二个元素2,Sum取Fun第一次的返回值1 依次轮推,直到List中每个元素执行完,foldl返回最后一次函数调用的结果。 lists:foldr(Fun, Acc0, List) -> Acc1 同lists:foldl,唯一区别就是从List中去元素时从右->左 ----------------------------------------------------------------------------- gb_trees module gb_trees实现了Prof. Arne Anderssons General Balanced Trees,它是是一种比AVL效率更高的平衡树.作者基于gb_trees写了一个tiny_pq.erl(优先级队列)作为消息的存储结构;该优先级队列中的每一个节点代表一条消息,消息的时间戳代表这个节点的权值或者说是priority
看完上面的函数说明,我们看下tinymq的源代码目录,它主要有以下几个文件:
tinymq_app.erl
tinymq_sup.erl
tinymq_channel_sup.erl
tinymq_controller.erl
tinymq_channel_controller.erl
tinymq.erl
tiny_pq.erl (这个是我后来自己加进去的)
tinymq_app.erl 是一个Application Callback Module,用来启动整个应用:
start(_StartType, _StartArgs) -> tinymq_sup:start_link().
tinymq_sup:start_link()又做了些什么?
%% 创建supervisor(tinymq_supervisor是一个顶级监控者(可以参看上一节的监控树图)) start_link() -> supervisor:start_link(?MODULE, []). %% tinymq_supervisor监控两个child process: %% 一个是worker proces 用来控制执行mq的相关功能 %% 另一个是supervisor process 用来监控所有Channel服务 init([]) -> %% child process属性(字段具体含义可以查阅erlang文档) MqWorker = {mq_controller, {tinymq_controller, start_link, []}, permanent, 2000, worker, [tinymq_controller]}, ChannelSup = {tinymq_channel_sup, {tinymq_channel_sup, start_link, []}, permanent, 2000, supervisor, [tinymq_channel_sup]}, Children = [MqWorker, ChannelSup], %% 重启策略是:one_for_one,重启极限是10秒中内重启了10次 RestartStrategy = {one_for_one, 10, 10}, {ok, {RestartStrategy, Children}}.
至此顶级supervisor就创建好了,它初始化了监控树(创建了一个两个child process:一个是work process,另一个是supervisor process),现在我们就来先看看这两个child process:
1.worker process
tinymq_controller.erl文件 -record(state, {dict, max_age}). %% 采用gen_server模式,注册了一个名为tinymq的服务 start_link() -> gen_server:start_link({local, tinymq}, ?MODULE, [], []). init([]) -> %% 获取环境变量:消息超时时间(Application Resource File配置的是60s) {ok, MaxAgeSeconds} = application:get_env(max_age), %% 初始化tinymq服务状态(这个状态包含两部分,一部分是Channel和ChannelPid关系的一组列表,另一个是消息默认超时时间) {ok, #state{dict = dict:new(), max_age = MaxAgeSeconds}}.
下面我们看一下另一个child process:
2.supervisor process
tinymq_channel_sup.erl文件 %% 创建一个supervisor(tinymq_channel_supervisor) start_link() -> supervisor:start_link({local, tinymq_channel_sup}, ?MODULE, []). %% 该supervior监控的child process 是动态添加的 init(_StartArgs) -> %% child process 属性 {ok, {{simple_one_for_one, 0, 10}, [ {mq_channel_controller, {tinymq_channel_controller, start_link, []}, temporary, 2000, worker, [tinymq_channel_controller]} ]}}.
现在监控树已经初始化完毕,tinymq服务也已经启动,我们来看看tinymq是如何处理客户端的订阅请求(subscribe request)
tinymq_controller文件: %% 接收到客户端发出的订阅请求 handle_call({subscribe, Channel, Timestamp, Subscriber}, From, State) -> %% 获取该Channel服务对应的的pid(若是一个新的Channel则为其创建一个新的Process) {ChannelPid, NewState} = find_or_create_channel(Channel, State), %% 将客户端的订阅请求交给这个Channle服务处理 gen_server:cast(ChannelPid, {From, subscribe, Timestamp, Subscriber}), {noreply, NewState}; find_or_create_channel(Channel, #state{dict = Chan2Pid, max_age = MaxAge} = State) -> %% 检查该Channel名对应的Channel服务是否存在,若存在返回相应的Pid case dict:find(Channel, Chan2Pid) of {ok, Pid} -> {Pid, State}; _ -> %% 若不存在则创建该Channel服务并添加到 tinymq_channel_sup监控树下 {ok, ChannelPid} = supervisor:start_child(tinymq_channel_sup, [MaxAge, tinymq_channel_sup, Channel]), %% 维护新的Channel和ChannelPid关系列表(add) {ChannelPid, State#state{ dict = dict:store(Channel, ChannelPid, Chan2Pid) }} end.
接下来我们看看Channel服务是如何处理这个订阅请求的
tinymq_channel_controller文件 %% 启动一个Channel服务 start_link(MaxAge, ChannelSup, Channel) -> gen_server:start_link(?MODULE, [MaxAge, ChannelSup, Channel], []). %% 当我们新创建一个Channel服务的时候,它会初始化一些状态,这个状态主要包含以下几部分: %% ---Channel名(其实可以理解为topic名) %% ---消息队列(实际上是一个平衡树,每一个节点的权值就是消息的放入时间) %% ---订阅者列表(通过这个订阅者列表可以将消息派送给每一个订阅者) %% ---消息超时时间(当消息超时后,将会从消息队列中移除) %% ---上一次清除超时消息的时间(默认情况下清除超时消息间隔不小于1秒) %% ---上一次派发消息时间 init([MaxAge, ChannelSup, Channel]) -> {ok, #state{ max_age = MaxAge, supervisor = ChannelSup, channel = Channel, messages = gb_trees:empty(), last_pull = now_to_micro_seconds(erlang:now()), last_purge = now_to_micro_seconds(erlang:now()) }, MaxAge * 1000}.
当Channel服务接收到订阅请求后:
tinymq_channel_controller文件 %% 处理订阅请求 handle_cast({From, subscribe, 'now', Subscriber}, State) -> %% 更新订阅者列表(若是一个新的订阅者则添加到订阅者列表,否则直接返回) NewSubscribers = add_subscriber(Subscriber, State#state.subscribers), %% 告诉客户端订阅成功了(至此客户端就可以接受发送到这个Channle(topic)中的消息了) gen_server:reply(From, {ok, now_to_micro_seconds(erlang:now())}), %% 删除消息队列中的超时消息,返回新的Channel state %% 文档原文如下:Purging old messages occurs after any channel activity (but no more than once per second) {noreply, purge_old_messages(State#state{ subscribers = NewSubscribers })}; %% 添加订阅者 add_subscriber(NewSubscriber, Subscribers) -> case lists:keymember(NewSubscriber, 2, Subscribers) of %% 若在订阅者列表中存在,则什么也不做 true -> Subscribers; %% 否则添加到订阅者列表中{monitorSubscriberPid, SubscriberPid} false -> [{erlang:monitor(process, NewSubscriber), NewSubscriber} | Subscribers] end. %% 清理消息队列中已经超时的消息(操作间隔不小于1秒) purge_old_messages(State) -> Now = now_to_micro_seconds(erlang:now()), %% now_to_micro_seconds,这个方法就是把系统时间转换成秒 LastPurge = State#state.last_purge, %% 上一次清理消息队列时间 Duration = seconds_to_micro_seconds(1), %% 操作间隔(默认是1秒) if Now - LastPurge > Duration -> State#state{ %% @tiny_pq:prune_old(Tree, Priority) -> Tree1 Remove nodes with priority less than or equal to `Priority' %% 这里的Tree就是消息队列messages, Priority就是Time(当前时间-消息超时时间,若某个消息的Priority小于这个,说明已经超时了) messages = tiny_pq:prune_old(State#state.messages, Now - seconds_to_micro_seconds(State#state.max_age)), %% 更新上一次清理消息队列时间 last_purge = Now }; true -> State end.
至此tinymq处理了客户端的订阅请求,我们可以回顾上一节列出的订阅端流程图,加深理解。
现在我们来看看tinymq如何响应客户端的发消息请求(publish request).
记住:客户端所有请求都是发送给tinymq server,然后由它派发给每一个Channel服务处理
tinymq_controller文件: %% 接受到客户端的发布消息请求 handle_call({push, Channel, Message}, From, State) -> %% 获取该Channel服务对应的的pid,同之前分析 {ChannelPid, NewState} = find_or_create_channel(Channel, State), %% 将该消息转发给响应的Channel process处理 gen_server:cast(ChannelPid, {From, push, Message}), {noreply, NewState};
当Channel服务接收到发消息请求后:
tinymq_channel_controller文件 (代码我做了一点修改,作者原来只要订阅者成功接受到一条消息,就自动取消订阅,我改成可以持续订阅): %% 处理发布消息请求 handle_cast({From, push, Message}, State) -> Now = now_to_micro_seconds(erlang:now()), %% 向每个订阅者发送这条消息 LastPull = lists:foldr(fun({Ref, Sub}, _) -> Sub ! {self(), Now, [Message]}, %% erlang:demonitor(Ref), 取消monitor Now %% 函数调用的返回值:上一次派发消息时间 end, State#state.last_pull, State#state.subscribers), %% State#state.subscribers是一个List,维护着该Channel的所有订阅者 %% 告诉发布者(publisher)消息发送成功 gen_server:reply(From, {ok, Now}), %% 删除消息队列中超时的消息 State2 = purge_old_messages(State), %% 将该条消息插入消息队列(priorit,message, tree) NewMessages = tiny_pq:insert_value(Now, Message, State2#state.messages), %% 作者原来设计:Subscribers are removed from the channel as soon as the first message is delivered 他会:subscribers = [] {noreply, State2#state{messages = NewMessages, last_pull = LastPull}, State#state.max_age * 1000};
发现没,发消息代码也是比较容易的,同样,我们可以回顾上一节列出的发送图逻辑图来加深理解
至此,我们已经查看了tinymq发布/订阅的主要实现代码,现在我们就来写一段测试代码:
订阅者:
-module(subscriber). -export([start/1]). start(Num) -> create_subscriber(Num). create_subscriber(0) -> ok; create_subscriber(Num) -> spawn(fun subscribe/0), create_subscriber(Num - 1). subscribe() -> tinymq:subscribe("hello_channel", now, self()), loop(). loop() -> receive {_From, _Timestamp, Messages} -> io:format("Pid: ~p received messages: ~p~n", [self(), Messages]) end, loop().
start函数中的Num表示要创建几个订阅者,代码比较简单不在赘述...
发布者
-module(publisher). -export([start/0]). start() -> spawn(fun publish/0). publish() -> Messages = {hello_erlang, erlang:now()}, io:format("Pid: ~p send messages: ~p~n", [self(), Messages]), tinymq:push("hello_channel", Messages), receive after 5000 -> true end, publish().
发布者每个5秒向Channel里发送消息
最后,在看一下执行效果图
关于TinyMQ的源码学习就到这里,但是里面还有一些细节值得我去挖掘(譬如:monitorSubscriber,优先级队列实现算法),以及新增些改进功能(譬如服务状态的持久化,以及添加unsubscribe请求处理等等),还有自己的erlang基础需要再补补,官方文档再看看
ps:iteye的erlang代码高亮实在是...,辛苦各位看官了....
评论
3 楼
standalone
2012-12-28
原来是这样。。。受教了!谢谢!
2 楼
DiaoCow
2012-12-28
standalone 写道
分析的很好。
问一下,我看到
tinymq__channel_controller.erl里面的函数
用到了tiny_pq这个module,为什么找不到,你说你后来加的那么是自己写的么。。。?
问一下,我看到
tinymq__channel_controller.erl里面的函数
purge_old_messages(State) -> Now = now_to_micro_seconds(erlang:now()), LastPurge = State#state.last_purge, Duration = seconds_to_micro_seconds(1), if Now - LastPurge > Duration -> State#state{ messages = tiny_pq:prune_old(State#state.messages, Now - seconds_to_micro_seconds(State#state.max_age)), last_purge = Now }; true -> State end.
用到了tiny_pq这个module,为什么找不到,你说你后来加的那么是自己写的么。。。?
不是,这个同样是作者写的,只不过作者没有放到同一个project中,而是通过依赖拉下来,我当时rebar使用有问题,所以我就去作者网站直接下载了下来放到源代码目录中,
你可以看下http://diaocow.iteye.com/blog/1734253 这篇文章我最后说发现的问题,以及别人提供的解决方案
1 楼
standalone
2012-12-28
分析的很好。
问一下,我看到
tinymq__channel_controller.erl里面的函数
用到了tiny_pq这个module,为什么找不到,你说你后来加的那么是自己写的么。。。?
问一下,我看到
tinymq__channel_controller.erl里面的函数
purge_old_messages(State) -> Now = now_to_micro_seconds(erlang:now()), LastPurge = State#state.last_purge, Duration = seconds_to_micro_seconds(1), if Now - LastPurge > Duration -> State#state{ messages = tiny_pq:prune_old(State#state.messages, Now - seconds_to_micro_seconds(State#state.max_age)), last_purge = Now }; true -> State end.
用到了tiny_pq这个module,为什么找不到,你说你后来加的那么是自己写的么。。。?
发表评论
-
Erlang rebar源码学习(二)
2013-01-28 11:27 2051之前说了rebar编译的核心部分(rebar_base_com ... -
Erlang rebar源码学习(一)
2013-01-27 17:34 5481最近看霸爷的微博(http ... -
Eralng ets学习总结
2013-01-13 13:57 10235ets是什么? ets是Erlang Te ... -
Erlang 文件处理(读书笔记)
2013-01-08 12:57 12281今天看了下erlang file章节,内容感觉比较散,现在做个 ... -
Erlang OTP学习(3):supervisor
2013-01-05 21:11 10496今天细致的看了下supervisor,现在做个总结: 其中, ... -
Erlang 并发错误处理
2013-01-04 19:51 1701这一章有三个关键的概 ... -
Erlang receive代码块
2012-12-31 11:28 7121receive代码块是如何执行的呢? process会尝 ... -
Erlang OTP学习(2):gen_event
2012-12-30 17:26 4495说完了gen_server,今天我们来看看gen_event。 ... -
Erlang OTP学习(1):gen_server
2012-12-28 19:20 12522在《Programming Erlang》的OTP intro ... -
Erlang 单元测试
2012-12-24 23:29 4779今天学习了下Erlang单元测试,发现非常有用,现在做个总结: ... -
TinyMQ学习(1) 概述
2012-11-24 00:27 2514最近在学习erlang,了解了下它的基本语法以及相关特性,但是 ...
相关推荐
网上很多深度学习C++源码下载后都无法使用,总是有这样活那样的错误,但本源码可定好用,只要装了VS 2013就行,没用第三方库。 这是一个深度学习--深度信念网络(Deep Belief Network)的C++源码,例子中带有数据,...
在线学习平台(易学堂学习系统)源码,php版,可运行,无bug
springMvc学习指南源码
Mir2源码完整版 Mir2源码完整版 Mir2源码完整版 Mir2源码完整版
这是一个C#编写的深度学习源码例子。一般网上流传的深度学习Windows源码都是Linux转过来的,下载后配置非常困难,本人多天配置环境都未成功,最后不得不放弃。原因是需要配置第三方库,这是一个困难。本源码是基于VS...
cocos2d-x+lua游戏demo源码 xcode工程
Java Web开发学习手册随书光盘源码
cocos2dx 游戏开发系列之三 的 android工程源码
一个学院内部管理的网站源码。用ASP.NET编写。
在线学习系统 v1.0 在线学习系统 网页版 多功能 源码
Java实现HTTP连接与浏览,Java源码下载,输入html文件地址或网址,显示页面和HTML源文件,一步步的实现过程请下载本实例的Java源码,代码中包括丰富的注释,对学习有帮助。 Java实现的FTP连接与数据浏览程序 1个...
2、 精准的楼宇定位系统 配送范围、配送价格、配送时间随需变化。 3、 强大的菜单管理系统 单品、套餐均适用,图片展示更直观。 4、 人性化的订单处理系统 订单提示、处理、打印、对账,一站式完成。 5、 快捷流畅的...
Android4.4图库Gallery2源码,变化很大,不可独立运行
0001-2科技发展有限公司升级版源码 0001科技发展有限公司修正版源码 0002机械配件制造销售公司修正版源码 0003家具地板公司修正版源码 0004-1机械有限公司修正版源码 0004机械有限公司修正版源码 0005机械产品公司...
laravel商场项目源码、laravel商场项目源码、可用于个人学习,十分适合新手。基本上商城基本功能都是有的
YY多开源码YY多开器源码源码YY多开器源码源码YY多开器源码源码
SSH2框架搭建实例(spring3.2+strust2.3.4+hibernate4.2)全部采用最新版本.演示了用户登录和用户管理
本源码为2012-12下载的最新版(www.aleph1.co.uk/yaffs2),支持2.6.x.内核。 We'll start by assuming you have a building linux 2.6.x source tree called linux-dir and have the yaffs2 source code in a ...
Cocos2dx2.2游戏源码:奇怪大冒险,(原作熊同学) 质量很好,骨头使用vs2013和cocos2dx2.2打开即可编译运行,感谢原作者熊同学,文档参考http://blog.csdn.net/iamlazybone 相关笔记
Android 自定义相机Camera2