thrift正常的客户端调用服务端情况是直接返回的。这种情况对于订阅来说并不满足(客户端发起订阅, 服务器异步处理,再不断推送给客户端)。 客户端发起订阅(如行情,指标等),服务端实现推送。
重写类:
class TBufferedTransportFactoryMy : public TTransportFactory { public: TBufferedTransportFactoryMy(std::shared_ptr<IndexServerHandler> serverHandler) :m_indexServerHandler(serverHandler) { }; virtual ~TBufferedTransportFactoryMy() {}; /** * Wraps the transport into a buffered one. */ virtual stdcxx::shared_ptr<TTransport> getTransport(stdcxx::shared_ptr<TTransport> trans); private: std::shared_ptr<IndexServerHandler> m_indexServerHandler; };重载函数getTransport,用TTransport构建client放到全局变量中,在服务需用推送数据的时候用。
//IndexSubscribeClient thrift中定义的客户端接收服务,以接收服务端推送过来得数据。 std::map<stdcxx::shared_ptr<TTransport>, std::shared_ptr<IndexSubscribeClient> > g_lstIndexSubscribeClient; stdcxx::shared_ptr<TTransport> TBufferedTransportFactoryMy::getTransport(stdcxx::shared_ptr<TTransport> trans) { if (trans->isOpen() && g_lstIndexSubscribeClient.find(trans) == g_lstIndexSubscribeClient.end()) { std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(trans)); std::shared_ptr<IndexSubscribeClient> client(new IndexSubscribeClient(protocol)); { g_lstIndexSubscribeClient[trans]=client; } if (m_indexServerHandler) { m_indexServerHandler->setTransport(trans); } } return stdcxx::shared_ptr<TTransport>(new TBufferedTransport(trans)); }服务端的main:
int main(int argc, char** argv) { const int workerCount = 100; //支持的连接数 std::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount); threadManager->threadFactory(std::make_shared<PlatformThreadFactory>()); threadManager->start(); std::shared_ptr<IndexServerHandler> handler(new IndexServerHandler()); //全局只有一个server,一个serverhandler。多个连接线程 std::shared_ptr<TProcessor> processor(new IndexServerProcessor(handler)); std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(9090)); std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactoryMy(handler)); std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); TThreadPoolServer server(processor, serverTransport, transportFactory,protocolFactory, threadManager); std::cout<<"Starting the server"<<std::endl; server.serve(); return 0; }以上,整体代码恕不能放送,涉及到实际项目。上面连接的给的代码挺全的了。
第二种方式问题:这种方式需要客户端起一个不断阻塞监听的线程,这样相当于把客户端到服务端的连接独占了。会引起别的非推送方式直接调用服务端方法的异常。 处理1:加锁处理。但是一但服务端一直不推送数据,一直阻塞着,占用着锁,也有问题。 处理2:客户端起两个到服务端的连接。 大概能ok。 小问题,服务端并不知道这两个连接的区别,相应的就起了两个到客户端的client,都推送了。后续可以细分,删去不可用的那个。
寻求更好thrift实现订阅服务方法!如像回调函数那样的?