`
DiaoCow
  • 浏览: 241576 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

TinyMQ学习(2) 源码

阅读更多
上一节(http://diaocow.iteye.com/blog/1734253)我们对TinyMQ进行了概述,这一节我们将着重看一下作者是如何实现消息的发布/订阅


在看源代码之前我们需要了解一些module以及函数的作用:

dict:new()
创建一个 Key-Value Dictionary(可以理解为就是一个map)

dict:find(Key, Dict) -> {ok, Value} | error
从Dict1查询Key对应的Value,若存在返回 {ok, Value} 否则返回error

dict:store(Key, Value, Dict1) -> Dict2
往Dict1中插入一条Key-Value,并且返回新的Dict2(若Key已经存在,则只是更新下相应的Value)

-----------------------------------------------------------------------------
lists:foldl(Fun, Acc0, List) -> Acc1
参数Fun是一个函数,它有两个参数:第一个参数是List中的元素(从左->右依次取),第二个参数是Fun函数执行完后的返回值(第一次调用返回值取Acc0),还是来看一个例子:
> lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]).
15
执行过程:
首先,Fun第一次执行时,X的值取列表List的第一个元素1,Sum取0,
Fun第二次执行时,X的值取列表List的第二个元素2,Sum取Fun第一次的返回值1
依次轮推,直到List中每个元素执行完,foldl返回最后一次函数调用的结果。

lists:foldr(Fun, Acc0, List) -> Acc1
同lists:foldl,唯一区别就是从List中去元素时从右->左

-----------------------------------------------------------------------------
gb_trees module
gb_trees实现了Prof. Arne Anderssons General Balanced Trees,它是是一种比AVL效率更高的平衡树.作者基于gb_trees写了一个tiny_pq.erl(优先级队列)作为消息的存储结构;该优先级队列中的每一个节点代表一条消息,消息的时间戳代表这个节点的权值或者说是priority


看完上面的函数说明,我们看下tinymq的源代码目录,它主要有以下几个文件:

tinymq_app.erl
tinymq_sup.erl
tinymq_channel_sup.erl
tinymq_controller.erl
tinymq_channel_controller.erl
tinymq.erl
tiny_pq.erl (这个是我后来自己加进去的)

tinymq_app.erl 是一个Application Callback Module,用来启动整个应用

start(_StartType, _StartArgs) ->
    tinymq_sup:start_link().   

tinymq_sup:start_link()又做了些什么?

%% 创建supervisor(tinymq_supervisor是一个顶级监控者(可以参看上一节的监控树图))
start_link() ->
    supervisor:start_link(?MODULE, []).

%% tinymq_supervisor监控两个child process:
%% 一个是worker proces 用来控制执行mq的相关功能
%% 另一个是supervisor process 用来监控所有Channel服务
init([]) ->

	%% child process属性(字段具体含义可以查阅erlang文档)
    MqWorker = {mq_controller, {tinymq_controller, start_link, []},
           permanent, 2000, worker, [tinymq_controller]},

    ChannelSup = {tinymq_channel_sup, {tinymq_channel_sup, start_link, []},
                  permanent, 2000, supervisor, [tinymq_channel_sup]},

    Children = [MqWorker, ChannelSup],

    %% 重启策略是:one_for_one,重启极限是10秒中内重启了10次
    RestartStrategy = {one_for_one, 10, 10},
    {ok, {RestartStrategy, Children}}.

至此顶级supervisor就创建好了,它初始化了监控树(创建了一个两个child process:一个是work process,另一个是supervisor process),现在我们就来先看看这两个child process:

1.worker process

tinymq_controller.erl文件

-record(state, {dict, max_age}).

%% 采用gen_server模式,注册了一个名为tinymq的服务
start_link() ->
    gen_server:start_link({local, tinymq}, ?MODULE, [], []).

init([]) ->
    %% 获取环境变量:消息超时时间(Application Resource File配置的是60s)
    {ok, MaxAgeSeconds} = application:get_env(max_age), 
    %% 初始化tinymq服务状态(这个状态包含两部分,一部分是Channel和ChannelPid关系的一组列表,另一个是消息默认超时时间)
    {ok, #state{dict = dict:new(), max_age = MaxAgeSeconds}}.
 
下面我们看一下另一个child process:

2.supervisor process

tinymq_channel_sup.erl文件

%% 创建一个supervisor(tinymq_channel_supervisor)
start_link() ->
    supervisor:start_link({local, tinymq_channel_sup}, ?MODULE, []).

%% 该supervior监控的child process 是动态添加的
init(_StartArgs) ->
	%% child process 属性
    {ok, {{simple_one_for_one, 0, 10},
          [
           {mq_channel_controller, {tinymq_channel_controller, start_link, []},
            temporary, 2000, worker, [tinymq_channel_controller]}
          ]}}.


现在监控树已经初始化完毕,tinymq服务也已经启动,我们来看看tinymq是如何处理客户端的订阅请求(subscribe request)

tinymq_controller文件:

%% 接收到客户端发出的订阅请求
handle_call({subscribe, Channel, Timestamp, Subscriber}, From, State) ->    
    %% 获取该Channel服务对应的的pid(若是一个新的Channel则为其创建一个新的Process)
    {ChannelPid, NewState} = find_or_create_channel(Channel, State),
    %% 将客户端的订阅请求交给这个Channle服务处理
    gen_server:cast(ChannelPid, {From, subscribe, Timestamp, Subscriber}),
    {noreply, NewState};

find_or_create_channel(Channel, #state{dict = Chan2Pid, max_age = MaxAge} = State) ->
    %% 检查该Channel名对应的Channel服务是否存在,若存在返回相应的Pid
    case dict:find(Channel, Chan2Pid) of
        {ok, Pid} ->
            {Pid, State};
        _ ->
            %% 若不存在则创建该Channel服务并添加到 tinymq_channel_sup监控树下
            {ok, ChannelPid} = supervisor:start_child(tinymq_channel_sup, [MaxAge, tinymq_channel_sup, Channel]),
            %% 维护新的Channel和ChannelPid关系列表(add) 
            {ChannelPid, State#state{
                    dict = dict:store(Channel, ChannelPid, Chan2Pid)
                }} 
    end.

接下来我们看看Channel服务是如何处理这个订阅请求的

tinymq_channel_controller文件

%% 启动一个Channel服务
start_link(MaxAge, ChannelSup, Channel) ->
    gen_server:start_link(?MODULE, [MaxAge, ChannelSup, Channel], []).

%% 当我们新创建一个Channel服务的时候,它会初始化一些状态,这个状态主要包含以下几部分:
%% ---Channel名(其实可以理解为topic名)
%% ---消息队列(实际上是一个平衡树,每一个节点的权值就是消息的放入时间)
%% ---订阅者列表(通过这个订阅者列表可以将消息派送给每一个订阅者)
%% ---消息超时时间(当消息超时后,将会从消息队列中移除)
%% ---上一次清除超时消息的时间(默认情况下清除超时消息间隔不小于1秒)
%% ---上一次派发消息时间
init([MaxAge, ChannelSup, Channel]) ->
    {ok, #state{
            max_age = MaxAge,
            supervisor = ChannelSup,       
            channel = Channel,             
            messages = gb_trees:empty(),   
            last_pull = now_to_micro_seconds(erlang:now()),    
            last_purge = now_to_micro_seconds(erlang:now()) },  
     MaxAge * 1000}.

当Channel服务接收到订阅请求后:

tinymq_channel_controller文件

%%  处理订阅请求
handle_cast({From, subscribe, 'now', Subscriber}, State) ->
    %% 更新订阅者列表(若是一个新的订阅者则添加到订阅者列表,否则直接返回)
    NewSubscribers = add_subscriber(Subscriber, State#state.subscribers), 
    %% 告诉客户端订阅成功了(至此客户端就可以接受发送到这个Channle(topic)中的消息了)
    gen_server:reply(From, {ok, now_to_micro_seconds(erlang:now())}),     
    %% 删除消息队列中的超时消息,返回新的Channel state
    %% 文档原文如下:Purging old messages occurs after any channel activity (but no more than once per second)
    {noreply, purge_old_messages(State#state{ subscribers = NewSubscribers })};  

%% 添加订阅者
add_subscriber(NewSubscriber, Subscribers) ->
        case lists:keymember(NewSubscriber, 2, Subscribers) of
        %% 若在订阅者列表中存在,则什么也不做
        true -> Subscribers; 
        %% 否则添加到订阅者列表中{monitorSubscriberPid, SubscriberPid}
        false -> [{erlang:monitor(process, NewSubscriber), NewSubscriber} | Subscribers]  
    end.

%% 清理消息队列中已经超时的消息(操作间隔不小于1秒)
purge_old_messages(State) ->
    Now = now_to_micro_seconds(erlang:now()),	%% now_to_micro_seconds,这个方法就是把系统时间转换成秒
    LastPurge = State#state.last_purge,     	%% 上一次清理消息队列时间
    Duration = seconds_to_micro_seconds(1), 	%% 操作间隔(默认是1秒)
    if
        Now - LastPurge > Duration ->    
            State#state{ 
                %% @tiny_pq:prune_old(Tree, Priority) -> Tree1  Remove nodes with priority less than or equal to `Priority'
                %% 这里的Tree就是消息队列messages, Priority就是Time(当前时间-消息超时时间,若某个消息的Priority小于这个,说明已经超时了)
                messages = tiny_pq:prune_old(State#state.messages, 
                    Now - seconds_to_micro_seconds(State#state.max_age)),
                %% 更新上一次清理消息队列时间
                last_purge = Now };
        true ->
            State
    end.

至此tinymq处理了客户端的订阅请求,我们可以回顾上一节列出的订阅端流程图,加深理解。

现在我们来看看tinymq如何响应客户端的发消息请求(publish request).

记住:客户端所有请求都是发送给tinymq server,然后由它派发给每一个Channel服务处理

tinymq_controller文件:

%% 接受到客户端的发布消息请求
handle_call({push, Channel, Message}, From, State) ->
    %% 获取该Channel服务对应的的pid,同之前分析
    {ChannelPid, NewState} = find_or_create_channel(Channel, State),
    %% 将该消息转发给响应的Channel process处理
    gen_server:cast(ChannelPid, {From, push, Message}),
    {noreply, NewState};

当Channel服务接收到发消息请求后:

tinymq_channel_controller文件
(代码我做了一点修改,作者原来只要订阅者成功接受到一条消息,就自动取消订阅,我改成可以持续订阅):

%% 处理发布消息请求
handle_cast({From, push, Message}, State) ->
    Now = now_to_micro_seconds(erlang:now()),  

    %% 向每个订阅者发送这条消息
    LastPull = lists:foldr(fun({Ref, Sub}, _) -> 
                Sub ! {self(), Now, [Message]},   
                %% erlang:demonitor(Ref), 取消monitor 
                Now    %% 函数调用的返回值:上一次派发消息时间
        end, State#state.last_pull, State#state.subscribers), %% State#state.subscribers是一个List,维护着该Channel的所有订阅者

    %% 告诉发布者(publisher)消息发送成功
    gen_server:reply(From, {ok, Now}),  
    %% 删除消息队列中超时的消息
    State2 = purge_old_messages(State),
    %% 将该条消息插入消息队列(priorit,message, tree)                                    
    NewMessages = tiny_pq:insert_value(Now, Message, State2#state.messages), 
    %% 作者原来设计:Subscribers are removed from the channel as soon as the first message is delivered 他会:subscribers = []
    {noreply, State2#state{messages = NewMessages, last_pull = LastPull}, State#state.max_age * 1000};


发现没,发消息代码也是比较容易的,同样,我们可以回顾上一节列出的发送图逻辑图来加深理解

至此,我们已经查看了tinymq发布/订阅的主要实现代码,现在我们就来写一段测试代码:

订阅者:

-module(subscriber).
-export([start/1]).

start(Num) ->
	create_subscriber(Num).

create_subscriber(0) -> ok;
create_subscriber(Num) ->
	spawn(fun subscribe/0),
	create_subscriber(Num - 1).

subscribe() ->
	tinymq:subscribe("hello_channel", now, self()),
	loop().

loop() ->
	receive
   		{_From, _Timestamp, Messages} ->
        	io:format("Pid: ~p received messages: ~p~n", [self(), Messages])
	end,
	loop().

start函数中的Num表示要创建几个订阅者,代码比较简单不在赘述...

发布者

-module(publisher).
-export([start/0]).

start() ->
	spawn(fun publish/0).

publish() ->
	Messages = {hello_erlang, erlang:now()},
    io:format("Pid: ~p send messages: ~p~n", [self(), Messages]),
	tinymq:push("hello_channel", Messages),

	receive
	after 5000 ->
		true
	end,

	publish().

发布者每个5秒向Channel里发送消息

最后,在看一下执行效果图



关于TinyMQ的源码学习就到这里,但是里面还有一些细节值得我去挖掘(譬如:monitorSubscriber,优先级队列实现算法),以及新增些改进功能(譬如服务状态的持久化,以及添加unsubscribe请求处理等等),还有自己的erlang基础需要再补补,官方文档再看看



ps:iteye的erlang代码高亮实在是...,辛苦各位看官了....






  • 大小: 85.8 KB
0
0
分享到:
评论
3 楼 standalone 2012-12-28  
原来是这样。。。受教了!谢谢!
2 楼 DiaoCow 2012-12-28  
standalone 写道
分析的很好。

问一下,我看到
tinymq__channel_controller.erl里面的函数

purge_old_messages(State) ->
    Now = now_to_micro_seconds(erlang:now()),
    LastPurge = State#state.last_purge,
    Duration = seconds_to_micro_seconds(1),
    if
        Now - LastPurge > Duration ->
            State#state{ 
                messages = tiny_pq:prune_old(State#state.messages, 
                    Now - seconds_to_micro_seconds(State#state.max_age)),
                last_purge = Now };
        true ->
            State
    end.


用到了tiny_pq这个module,为什么找不到,你说你后来加的那么是自己写的么。。。?



不是,这个同样是作者写的,只不过作者没有放到同一个project中,而是通过依赖拉下来,我当时rebar使用有问题,所以我就去作者网站直接下载了下来放到源代码目录中,
你可以看下http://diaocow.iteye.com/blog/1734253 这篇文章我最后说发现的问题,以及别人提供的解决方案 
1 楼 standalone 2012-12-28  
分析的很好。

问一下,我看到
tinymq__channel_controller.erl里面的函数

purge_old_messages(State) ->
    Now = now_to_micro_seconds(erlang:now()),
    LastPurge = State#state.last_purge,
    Duration = seconds_to_micro_seconds(1),
    if
        Now - LastPurge > Duration ->
            State#state{ 
                messages = tiny_pq:prune_old(State#state.messages, 
                    Now - seconds_to_micro_seconds(State#state.max_age)),
                last_purge = Now };
        true ->
            State
    end.


用到了tiny_pq这个module,为什么找不到,你说你后来加的那么是自己写的么。。。?

相关推荐

Global site tag (gtag.js) - Google Analytics