假設(shè)您在Linux上運(yùn)行Django,并且您有一個視圖,并且您希望該視圖從名為cmd的子進(jìn)程返回數(shù)據(jù),該子進(jìn)程對視圖創(chuàng)建的文件進(jìn)行操作,例如:likeo: def call_subprocess(request):
response = HttpResponse()
with tempfile.NamedTemporaryFile("W") as f:
f.write(request.GET['data']) # i.e. some data
# cmd operates on fname and returns output
p = subprocess.Popen(["cmd", f.name],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = p.communicate()
response.write(p.out) # would be text/plain...
return response
現(xiàn)在,假設(shè)cmd的啟動時間非常慢,但運(yùn)行時間非???并且它本身沒有守護(hù)進(jìn)程模式.我想改善這種觀點(diǎn)的響應(yīng)時間. 我想通過在工作池中啟動一些cmd實(shí)例,讓它們等待輸入,并讓call_process請求其中一個工作池進(jìn)程處理數(shù)據(jù),使整個系統(tǒng)運(yùn)行得更快. 這實(shí)際上是兩個部分: 第1部分.調(diào)用cmd和cmd的函數(shù)等待輸入.這可以通過管道完成,即 def _run_subcmd():
p = subprocess.Popen(["cmd", fname],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
# write 'out' to a tmp file
o = open("out.txt", "W")
o.write(out)
o.close()
p.close()
exit()
def _run_cmd(data):
f = tempfile.NamedTemporaryFile("W")
pipe = os.mkfifo(f.name)
if os.fork() == 0:
_run_subcmd(fname)
else:
f.write(data)
r = open("out.txt", "r")
out = r.read()
# read 'out' from a tmp file
return out
def call_process(request):
response = HttpResponse()
out = _run_cmd(request.GET['data'])
response.write(out) # would be text/plain...
return response
第2部分.在后臺運(yùn)行的一組正在等待數(shù)據(jù)的工作者.即,我們希望擴(kuò)展上述內(nèi)容,以便子進(jìn)程已經(jīng)運(yùn)行,例如當(dāng)Django實(shí)例初始化時,或者首次調(diào)用此call_process時,會創(chuàng)建一組這些worker WORKER_COUNT = 6
WORKERS = []
class Worker(object):
def __init__(index):
self.tmp_file = tempfile.NamedTemporaryFile("W") # get a tmp file name
os.mkfifo(self.tmp_file.name)
self.p = subprocess.Popen(["cmd", self.tmp_file],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.index = index
def run(out_filename, data):
WORKERS[self.index] = Null # qua-mutex??
self.tmp_file.write(data)
if (os.fork() == 0): # does the child have access to self.p??
out, err = self.p.communicate()
o = open(out_filename, "w")
o.write(out)
exit()
self.p.close()
self.o.close()
self.tmp_file.close()
WORKERS[self.index] = Worker(index) # replace this one
return out_file
@classmethod
def get_worker() # get the next worker
# ... static, incrementing index
應(yīng)該在某處對工作人員進(jìn)行一些初始化,如下所示: def init_workers(): # create WORKERS_COUNT workers
for i in xrange(0, WORKERS_COUNT):
tmp_file = tempfile.NamedTemporaryFile()
WORKERS.push(Worker(i))
現(xiàn)在,我上面的內(nèi)容變成了以下內(nèi)容: def _run_cmd(data):
Worker.get_worker() # this needs to be atomic & lock worker at Worker.index
fifo = open(tempfile.NamedTemporaryFile("r")) # this stores output of cmd
Worker.run(fifo.name, data)
# please ignore the fact that everything will be
# appended to out.txt ... these will be tmp files, too, but named elsewhere.
out = fifo.read()
# read 'out' from a tmp file
return out
def call_process(request):
response = HttpResponse()
out = _run_cmd(request.GET['data'])
response.write(out) # would be text/plain...
return response
現(xiàn)在,問題: >這會有用嗎? (我只是把它從頭頂輸入StackOverflow,所以我確定存在問題,但從概念上講,它會起作用) >要尋找的問題是什么? >有更好的替代品嗎?例如線程是否也能正常運(yùn)行(它是Debian Lenny Linux)?是否有任何庫可以處理這樣的并行流程工作池? >我應(yīng)該注意與Django的交互嗎? 謝謝閱讀!我希望你覺得這和我一樣有趣. 布賴恩 解決方法: 看起來我正在推銷這款產(chǎn)品,因?yàn)檫@是我第二次回復(fù)推薦. 但似乎您需要一個Message Queing服務(wù),特別是一個分布式消息隊(duì)列. 它是如何工作的: >您的Django應(yīng)用程序請求CMD > CMD被添加到隊(duì)列中 > CMD被推到了幾個作品 >它被執(zhí)行并且結(jié)果返回上游 大多數(shù)代碼都存在,您不必去構(gòu)建自己的系統(tǒng). 看看最初用Django構(gòu)建的Celery. http://www./ http:///blog/2009/09/10/rabbitmq-celery-and-django/ 來源:http://www./content-3-227951.html
|