Apache Thrift:多任务单服务器和客户端
我读过this和this。但是,我的情况是不同的。我不需要服务器上的多路复用服务,也不需要多个连接到服务器。
背景:
我的大数据项目,我需要计算一个给定的大数据的coreset。 Coreset是保留大数据最重要的数学关系的大数据的子集。
工作流程:Apache Thrift:多任务单服务器和客户端
- 片庞大的数据,以更小的块
- 客户端解析块并将其发送到服务器
- 服务器计算coreset并保存结果
我的问题:
整个事情作为单个执行线程工作。 客户端解析一个块,然后等待服务器完成计算核心集,然后解析另一个块,等等。
目标:
利用多处理。客户端同时分析多个块,并且对于每个请求,服务器任务一个线程来处理它。线程数量有限的地方。就像一个池。
我知道我需要使用不同的协议,然后TSimpleServer和TThreadPoolServer或TThreadedServer。我不能让我明白哪一个可以选择,因为它们都不适合我。
TThreadedServer为每个客户端连接生成一个新线程,并且每个线程保持活动状态,直到客户端连接关闭。
在TThreadedServer每个客户端连接都有自己的专用服务器线程。在客户端关闭连接以供重用之后,服务器线程返回到线程池。
我不需要每个连接的线程,我想要一个连接,并且服务器同时处理多个服务请求。 Visiualization:
Client:
Thread1: parses(chunk1) --> Request compute coreset
Thread2: parses(chunk2) --> Request compute coreset
Thread3: parses(chunk3) --> Request compute coreset
Server: (Pool of 2 threads)
Thread1: Handle compute Coreset
Thread2: handle compute Coreset
.
.
Thread1 becomes available and handles another compute coreset
代码:
API。节俭:
struct CoresetPoint {
1: i32 row,
2: i32 dim,
}
struct CoresetAlgorithm {
1: string path,
}
struct CoresetWeightedPoint {
1: CoresetPoint point,
2: double weight,
}
struct CoresetPoints {
1: list<CoresetWeightedPoint> points,
}
service CoresetService {
void initialize(1:CoresetAlgorithm algorithm, 2:i32 coresetSize)
oneway void compressPoints(1:CoresetPoints message)
CoresetPoints getTotalCoreset()
}
服务器:(执行情况更好看移除)
class CoresetHandler:
def initialize(self, algorithm, coresetSize):
def _add(self, leveledSlice):
def compressPoints(self, message):
def getTotalCoreset(self):
if __name__ == '__main__':
logging.basicConfig()
handler = CoresetHandler()
processor = CoresetService.Processor(handler)
transport = TSocket.TServerSocket(port=9090)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
server = TServer.TThreadedServer(processor, transport, tfactory, pfactory)
# You could do one of these for a multithreaded server
# server = TServer.TThreadedServer(processor, transport, tfactory, pfactory)
# server = TServer.TThreadPoolServer(processor, transport, tfactory, pfactory)
print 'Starting the server...'
server.serve()
print 'done.'
客户:
try:
# Make socket
transport = TSocket.TSocket('localhost', 9090)
# Buffering is critical. Raw sockets are very slow
transport = TTransport.TBufferedTransport(transport)
# Wrap in a protocol
protocol = TBinaryProtocol.TBinaryProtocol(transport)
# Create a client to use the protocol encoder
client = CoresetService.Client(protocol)
# Connect!
transport.open()
// Here data is sliced, and in a loop I move on all files
Saved in the directory I specified, then they are parsed and
client.compressPoints(data) is invoked.
SliceFile(...)
p = CoresetAlgorithm(...)
client.initialize(p, 200)
for filename in os.listdir('/home/tony/DanLab/slicedFiles'):
if filename.endswith(".txt"):
data = _parse(filename)
client.compressPoints(data)
compressedData = client.getTotalCoreset()
# Close!
transport.close()
except Thrift.TException, tx:
print '%s' % (tx.message)
问题: 在Thrift有可能吗?我应该使用什么协议? 我解决了客户端等待服务器完成计算的部分问题,方法是在函数声明 中加入oneway
来表示客户端只发出请求,根本不等待任何响应。
从本质上讲,这更像是一个架构问题,与其说是一个节俭问题。鉴于房屋
我不需要每个连接线程,我想要一个连接,并在同一时间处理多个服务请求的服务器。 Visiualization:
和
我加入单向到函数声明解决了客户的等待服务器来完成计算的部分问题,以表示该客户端只发出请求,不等待任何响应在所有。
准确地描述使用的情况下,你想这样:
+---------------------+
| Client |
+---------+-----------+
|
|
+---------v-----------+
| Server |
+---------+-----------+
|
|
+---------v-----------+ +---------------------+
| Queue<WorkItems> <----------+ Worker Thread Pool |
+---------------------+ +---------------------+
的服务器唯一的任务是让请求,并尽快将其插入到工作项目排队。这些工作项目由独立的工作线程池处理,该工作线程池另外完全独立于服务器部分。唯一的共享部分是工作项目队列,这当然需要正确同步的访问方法。
关于serevr的选择:如果服务器足够快地提供请求,即使是TSimpleServer
也可以。
好吧,你的方法确实比我的好。服务器对工作进行排队,然后工人接受一个块并进行计算。还有一个问题,如果我在客户端多任务解析(这很耗时,所以我想多任务处理),我可能同一个客户端的不同线程同时拥有'client.compressPoints(message)'。这是否容易出现问题? –
通常,客户端不是线程安全的,也不是基础的物理连接处理。 IOW,每个线程一个客户端。如果客户端断开连接,那么您将需要足够数量的服务器端点,因此'TSimpleSever'将不再适合这种情况。 – JensG