作者BlgAtlfans (BLG_Eric)
看板Python
標題[問題] django celery問題
時間Thu Oct 27 15:37:47 2016
各位大大好
最近在用celery處理
csv,xlsx檔案寫入postgresql的功能
但是有一些問題想請教
P.S下面的程式碼沒加入celery時都可以正常執行(大檔案例外)
1. csv檔寫入時celery debug有以下錯誤
http://imgur.com/8Fg3dmJ
http://imgur.com/DSpZU1s
附上task.py程式碼
# -*- coding: utf-8 -*-
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.http import HttpResponseRedirect
from django.core.urlresolvers import reverse
from django.contrib import messages
from django.conf import settings
from django.db import connection
from django.views.decorators.csrf import csrf_exempt
from celery import Celery
from celery import task
import json
import csv
import sys
import random
import psycopg2
import xlrd
import openpyxl as pyxl
from .models import Document
from .forms import DocumentForm
app = Celery(
'tasks',
broker='amqp://guest:guest@localhost:5672//',
backend='rpc://'
)
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_RESULT_PERSISTENT = False
@app.task()
def csvwritein(doc):# Transform csv to table
doc = doc
conn = psycopg2.connect("dbname='apidb' user='api' host='localhost'
password='eric40502' port='5432'")
readcur = conn.cursor()
readcur.execute("select exists(select * from
information_schema.tables where table_name='%s')" % doc.tablename) # check if
same file is already in database
check = readcur.fetchone()[0]
try:
fr = open(doc.path,encoding = 'utf-8-sig')
dr.delay(fr,doc,check)
fr.close()
except Exception as e:
fr = open(doc.path,encoding = 'big5')
dr.delay(fr,doc,check)
fr.close()
conn.commit()
readcur.close()
@app.task()
def dr(fr,doc,check): # make datareader as function to keep code 'dry'
csvt = 0 #count csv reader loop time
row_id = 1 # used for following id field
conn = psycopg2.connect("dbname='apidb' user='api' host='localhost'
password='eric40502' port='5432'")
maincur = conn.cursor()
writecur = conn.cursor()
datareader = csv.reader(fr, delimiter=',')
for row in datareader:
if csvt == 0: # first time in loop(create field) and check no
same file exists
if check == True:
app =
''.join([random.SystemRandom().choice('abcdefghijklmnopqrstuvwxyz0123456789')
for i in range(6)])
tname = '%s-%s' % (doc.tablename,app
tablename = '"%s-%s"' % (doc.tablename,app)
doc.tablename = tname
doc.save()
else:
tablename = '"%s"' % doc.tablename
maincur.execute("CREATE TABLE %s (id SERIAL PRIMARY
KEY);" % tablename)
row_count = sum(1 for line in datareader)
col_count = len(row)
frow = row
for i in range(0,col_count,1):
row[i] = '"%s"' % row[i] # change number to
string
maincur.execute("ALTER TABLE %s ADD %s
CITEXT;" % (tablename,row[i]))
csvt = csvt+1
fr.seek(0)
next(datareader)
elif csvt > 0: # not first time(insert data) and check no
same file exists
for j in range(0,col_count,1):
if j == 0:
writecur.execute("INSERT INTO %s (%s)
VALUES ('%s');" % (tablename,frow[j],row[j]))
else:
writecur.execute("UPDATE %s SET %s =
'%s' WHERE id = '%d';" %(tablename,frow[j],row[j],row_id))
csvt = csvt+1
row_id = row_id+1
else:
break
conn.commit()
maincur.close()
writecur.close()
conn.close()
csvt = 0
doc = Document.objects.all()
呼叫時是用csvwritein.delay(doc)
2. xlsx 檔案(13萬筆資料)寫入時 worker卡了兩三分鐘後跑出以下錯誤
http://imgur.com/rC1uun0
以下是task.py xlsx寫入函數
@app.task()
def xlsxwritein(doc): # write into database for file type xlsx
xlsxt = 0
conn = psycopg2.connect("dbname='apidb' user='api' host='localhost'
password='eric40502' port='5432'")
maincur = conn.cursor()
readcur = conn.cursor()
writecur = conn.cursor()
readcur.execute("select exists(select * from
information_schema.tables where table_name='%s')" % doc.tablename) # check if
same file is already in database
check = readcur.fetchone()[0]
row_id = 1 # used for following id field
wb = pyxl.load_workbook(doc.path)
sheetnames = wb.get_sheet_names()
ws = wb.get_sheet_by_name(sheetnames[0])
for rown in range(ws.get_highest_row()):
if xlsxt == 0:
if check == True:
app =
''.join([random.SystemRandom().choice('abcdefghijklmnopqrstuvwxyz0123456789')
for i in range(6)])
tname = '%s-%s' % (doc.tablename,app)
tablename = '"%s-%s"' % (doc.tablename,app)
doc.tablename = tname
doc.save()
else:
tablename = '"%s"' % doc.tablename
field = [ws.cell(row=1,column=col_index).value for
col_index in range(1,ws.get_highest_column()+1)]
maincur.execute("CREATE TABLE %s (id SERIAL PRIMARY
KEY);" % tablename)
for coln in range(ws.get_highest_column()):
field[coln] = '"%s"' % field[coln] # change
number to string
if field[coln] == 'ID':
field[coln] = 'original_id'
maincur.execute("ALTER TABLE %s ADD %s
CITEXT;" % (tablename,field[coln]))
xlsxt = xlsxt+1
elif xlsxt > 0 and check == False: # not first time(insert
data) and check no same file exists
for coln in range(ws.get_highest_column()):
if coln == 0:
writecur.execute("INSERT INTO %s (%s)
VALUES ('%s');"
%(tablename,field[coln],str(ws.cell(row=rown,column=coln+1).value)))
else:
writecur.execute("UPDATE %s SET %s =
'%s' WHERE id = '%d';"
%(tablename,field[coln],str(ws.cell(row=rown+1,column=coln+1).value),row_id))
xlsxt = xlsxt+1
row_id = row_id+1
else:
break
conn.commit()
maincur.close()
readcur.close()
writecur.close()
conn.close()
xlsxt = 0
--
※ 發信站: 批踢踢實業坊(ptt.cc), 來自: 114.32.19.185
※ 文章網址: https://www.ptt.cc/bbs/Python/M.1477553869.A.5F0.html
→ kenduest: 好像是被作業系統 kernel 踢出去了? 10/27 16:12
→ kenduest: 比方吃太多記憶體等,被 linux OOM killer 處理掉 10/27 16:12
→ BlgAtlfans: 那應該要怎麼樣處理 10/27 16:52
→ kenduest: 你先獨立把那個處理task寫成獨立檔案單獨終端跑看看 10/27 18:33
→ kenduest: 後續用 free 與 top 看一下記憶體使用情況 10/27 18:33
→ kenduest: 或許是實際那個 server 本來記憶體就不多所以就爆掉了 10/27 18:34
→ kenduest: 題外話你的程式碼貼這邊很亂很難看清楚 10/27 18:40
→ kenduest: 另外建議請用 4 個空白代替 tab, 建議這樣在 python 上 10/27 18:41
→ uranusjr: 先試試看 DEBUG = False, 這兩個記憶體用量差很多 10/27 20:24
→ uranusjr: 第一個問題要看你 doc 到底是什麼 10/27 20:24
→ BlgAtlfans: 感謝各位回答 我的doc是個django的model 10/27 21:26
→ BlgAtlfans: 內容是上傳檔案的一些資料 10/27 21:27
→ BlgAtlfans: 像是tablename path id...之類的 10/27 21:28
→ BlgAtlfans: 這裡主要是用來傳遞tablename來做為建table的依據 10/27 21:30
→ BlgAtlfans: 多問一個 一般來說寫入一個13萬行的資料需要很多記憶 10/27 21:32
→ BlgAtlfans: 體嗎? 10/27 21:32