rtc::Thread介绍
rtc::Thread类不仅仅实现了线程这个执行器(比如posix底层调用pthread相关接口创建线程,管理线程等),还包括消息队列(message_queue)的实现,rtc::Thread启动后就作为一个永不停止的event loop,没有任务待执行就阻塞等待,添加任务后就唤醒event loop,去执行任务,周而复始,直到调用stop退出event loop,退出线程(线程join)。
在WebRTC内部,可以将消息队列等同于event loop,消息队列为空,就进行阻塞等待。
class RTC_LOCKABLE Thread : public MessageQueue {
Thread关键接口
public:
// Starts the execution of the thread.
bool Start(Runnable* runnable = nullptr);
// Tells the thread to stop and waits until it is joined.
// Never call Stop on the current thread. Instead use the inherited Quit
// function which will exit the base MessageQueue without terminating the
// underlying OS thread.
virtual void Stop();
virtual void Send(const Location& posted_from,
MessageHandler* phandler,
uint32_t id = 0,
MessageData* pdata = nullptr);
// Convenience method to invoke a functor on another thread. Caller must
// provide the |ReturnT| template argument, which cannot (easily) be deduced.
// Uses Send() internally, which blocks the current thread until execution
// is complete.
// Ex: bool result = thread.Invoke(RTC_FROM_HERE,
// &MyFunctionReturningBool);
// NOTE: This function can only be called when synchronous calls are allowed.
// See ScopedDisallowBlockingCalls for details.
template
ReturnT Invoke(const Location& posted_from, FunctorT&& functor) {
FunctorMessageHandler handler(
std::forward(functor));
InvokeInternal(posted_from, &handler);
return handler.MoveResult();
}
// ProcessMessages will process I/O and dispatch messages until:
// 1) cms milliseconds have elapsed (returns true)
// 2) Stop() is called (returns false)
bool ProcessMessages(int cms);
protected:
// Blocks the calling thread until this thread has terminated.
void Join();
MessageQueue关键接口
public:
virtual void Quit();
// Get() will process I/O until:
// 1) A message is available (returns true)
// 2) cmsWait seconds have elapsed (returns false)
// 3) Stop() is called (returns false)
virtual bool Get(Message* pmsg,
int cmsWait = kForever,
bool process_io = true);
virtual void Post(const Location& posted_from,
MessageHandler* phandler,
uint32_t id = 0,
MessageData* pdata = nullptr,
bool time_sensitive = false);
virtual void PostDelayed(const Location& posted_from,
int cmsDelay,
MessageHandler* phandler,
uint32_t id = 0,
MessageData* pdata = nullptr);
virtual void PostAt(const Location& posted_from,
int64_t tstamp,
MessageHandler* phandler,
uint32_t id = 0,
MessageData* pdata = nullptr);
virtual void Dispatch(Message* pmsg);
virtual void ReceiveSends();
protected:
void WakeUpSocketServer();
MessageList msgq_ RTC_GUARDED_BY(crit_);
PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_);
线程启动Start
调用Start接口启动底层线程,同时进入一个永不停止的event loop(除非调用Stop接口)
流程如下:
Start->pthread_create->PreRun->Run
void Thread::Run() {
ProcessMessages(kForever);
}

最终通过Get接口获取消息去执行(Dispatch),Get获取不到消息就是进入阻塞状态(wait),等待有消息后被唤醒。

线程消息队列处理消息的流程ProcessMessage
- 1、处理从其他线程发送的要在本线程去执行的消息,即同步调用
接收者线程处理流程:


发送者线程流程:

-
2、处理延迟消息(存储在优先级队列)
延迟消息是通过PostDelayed和PostAt接口调用然后push到优先级队列中(dmsgq_,小根堆)

-
3、异步消息(存储在普通队列里)
延迟消息是通过Pos接口调用然后push到普通队列中(msgq_)

任务提交方式(Invoke/Post)
webrtc内部消息其实是对待执行任务的封装,消息和任务可以认为是一个意思
消息要继承MessageHandler,实现OnMessage
class MessageHandler { public: virtual ~MessageHandler(); virtual void OnMessage(Message* msg) = 0; protected: MessageHandler() {} private: RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandler); };因为执行消息,实际上就是执行OnMessage(详见Dispatch接口实现)

上一章节其实已经把三种任务提交方式介绍过了
1、同步阻塞调用(Send,Invoke)
Invoke其实最终也是调用Send,Invoke是个函数模版,可以非常方便在目标执行线程执行函数然后获得返回值,Invoke实现如下:
// Convenience method to invoke a functor on another thread. Caller must // provide the |ReturnT| template argument, which cannot (easily) be deduced. // Uses Send() internally, which blocks the current thread until execution // is complete. // Ex: bool result = thread.Invoke
(RTC_FROM_HERE, // &MyFunctionReturningBool); // NOTE: This function can only be called when synchronous calls are allowed. // See ScopedDisallowBlockingCalls for details. template ReturnT Invoke(const Location& posted_from, FunctorT&& functor) { FunctorMessageHandler handler( std::forward (functor)); InvokeInternal(posted_from, &handler); return handler.MoveResult(); } void Thread::InvokeInternal(const Location& posted_from, MessageHandler* handler) { TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file_and_line", posted_from.file_and_line(), "src_func", posted_from.function_name()); Send(posted_from, handler); } 调用方式举例:
bool result = thread.Invoke
(RTC_FROM_HERE, &MyFunctionReturningBool); 2、异步非阻塞延迟调用
PostDelayed和PostAt
3、异步非阻塞调用
Post
线程退出Stop
void Thread::Stop() { MessageQueue::Quit(); Join(); } void MessageQueue::Quit() { AtomicOps::ReleaseStore(&stop_, 1); WakeUpSocketServer(); } void Thread::Join() { if (!IsRunning()) return; RTC_DCHECK(!IsCurrent()); if (Current() && !Current()->blocking_calls_allowed_) { RTC_LOG(LS_WARNING) << "Waiting for the thread to join, " << "but blocking calls have been disallowed"; } #if defined(WEBRTC_WIN) RTC_DCHECK(thread_ != nullptr); WaitForSingleObject(thread_, INFINITE); CloseHandle(thread_); thread_ = nullptr; thread_id_ = 0; #elif defined(WEBRTC_POSIX) pthread_join(thread_, nullptr); thread_ = 0; #endif }
-
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章
