thrift实现订阅服务

it2024-03-27  55

问题提出

thrift正常的客户端调用服务端情况是直接返回的。这种情况对于订阅来说并不满足(客户端发起订阅, 服务器异步处理,再不断推送给客户端)。 客户端发起订阅(如行情,指标等),服务端实现推送。

解决

这里给出了一个比较笨的方法: https://www.cnblogs.com/liaocheng/p/4978371.html 也就是客户端也起了一个服务,这样和服务端互为客户端和服务端。实现服务端向客户端推送数据。 问题:客户端起服务不稳定。服务器未必能推送过去(测试过都在本机没问题,客户端放另一台机器就有连接问题)。同一台机器起多个客户端,还需找本机ip和一个可用的端口(本机起服务器),再发送给服务器让服务器连接过来。麻烦。这里给出了一个好一些的方法: http://www.360doc.com/content/20/0104/17/21698786_884131548.shtml 通过重写服务端类TBufferedTransportFactory,主要是重载getTransport函数。这个函数每当客户端有连接过来时都会调用到,从而获取到TTransport,根据TTransport构建client,实现服务端调用客户端的函数,以实现不断推送数据。

重写类:

class TBufferedTransportFactoryMy : public TTransportFactory { public: TBufferedTransportFactoryMy(std::shared_ptr<IndexServerHandler> serverHandler) :m_indexServerHandler(serverHandler) { }; virtual ~TBufferedTransportFactoryMy() {}; /** * Wraps the transport into a buffered one. */ virtual stdcxx::shared_ptr<TTransport> getTransport(stdcxx::shared_ptr<TTransport> trans); private: std::shared_ptr<IndexServerHandler> m_indexServerHandler; };

重载函数getTransport,用TTransport构建client放到全局变量中,在服务需用推送数据的时候用。

//IndexSubscribeClient thrift中定义的客户端接收服务,以接收服务端推送过来得数据。 std::map<stdcxx::shared_ptr<TTransport>, std::shared_ptr<IndexSubscribeClient> > g_lstIndexSubscribeClient; stdcxx::shared_ptr<TTransport> TBufferedTransportFactoryMy::getTransport(stdcxx::shared_ptr<TTransport> trans) { if (trans->isOpen() && g_lstIndexSubscribeClient.find(trans) == g_lstIndexSubscribeClient.end()) { std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(trans)); std::shared_ptr<IndexSubscribeClient> client(new IndexSubscribeClient(protocol)); { g_lstIndexSubscribeClient[trans]=client; } if (m_indexServerHandler) { m_indexServerHandler->setTransport(trans); } } return stdcxx::shared_ptr<TTransport>(new TBufferedTransport(trans)); }

服务端的main:

int main(int argc, char** argv) { const int workerCount = 100; //支持的连接数 std::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount); threadManager->threadFactory(std::make_shared<PlatformThreadFactory>()); threadManager->start(); std::shared_ptr<IndexServerHandler> handler(new IndexServerHandler()); //全局只有一个server,一个serverhandler。多个连接线程 std::shared_ptr<TProcessor> processor(new IndexServerProcessor(handler)); std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(9090)); std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactoryMy(handler)); std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); TThreadPoolServer server(processor, serverTransport, transportFactory,protocolFactory, threadManager); std::cout<<"Starting the server"<<std::endl; server.serve(); return 0; }

以上,整体代码恕不能放送,涉及到实际项目。上面连接的给的代码挺全的了。

第二种方式问题:这种方式需要客户端起一个不断阻塞监听的线程,这样相当于把客户端到服务端的连接独占了。会引起别的非推送方式直接调用服务端方法的异常。 处理1:加锁处理。但是一但服务端一直不推送数据,一直阻塞着,占用着锁,也有问题。 处理2:客户端起两个到服务端的连接。 大概能ok。 小问题,服务端并不知道这两个连接的区别,相应的就起了两个到客户端的client,都推送了。后续可以细分,删去不可用的那个。

结语

寻求更好thrift实现订阅服务方法!如像回调函数那样的?

最新回复(0)