python 爬虫 多线程用queue做队列,消费者线程无法从queue中取出数据。
高洛峰
高洛峰 2017-04-18 09:26:42
[Python讨论组]
高洛峰
高洛峰

拥有18年软件开发和IT教学经验。曾任多家上市公司技术总监、架构师、项目经理、高级软件工程师等职务。 网络人气名人讲师,...

全部回复(2)
大家讲道理

消费者join一下试试,然后你判断一下条件,如果queue为空的时候,循环break

阿神

如果self.Queue为空的话, self.Queue.get() 会抛出 Queue.Empty 异常,这时候线程会退出,这时候都没有工作线程了。
在run()方法的while循环里面增加try...except捕获该异常试试。代码大概如下:

while True:
    try:
       self.Queue.get(timeout=5) #这里的timeout可以根据情况设置为合适的值
    except Queue.Empty:  # 任务队列空的时候结束此线程
        break
    except:
        raise
        

=======================================================================
首先:对于你说的mysql不支持多线程写入问题,我简单写了个验证程序,结论是mysql支持多线程写入的(注:实际工作中在多线程中往db中写入数据,需要加入锁机制,这里简化了),代码如下:


#coding: utf-8

import MySQLdb
import MySQLdb.cursors
import threading

class MySql(object):
    def __init__(self, host, user, pwd, db_name, port=3306):
        self.host = host
        self.user = user
        self.pwd = pwd
        self.db_name = db_name
        self.port = port
        self.conn = None
        self.cursor = None
        self.connect()

    def connect(self):
        try:
            self.conn = MySQLdb.connect(host=self.host, user=self.user, passwd=self.pwd, db=self.db_name,
                                        port=self.port)
            self.cursor = self.conn.cursor(cursorclass=MySQLdb.cursors.DictCursor)
        except Exception, err:
            print 'connect: %s' % err
        return self.conn
            
    def execute(self, sql):
        rs = ()
        try:
            self.cursor.execute(sql)
            rs = self.cursor.fetchall()
        except Exception, err:
            pass
        return rs

    def exec_and_result(self, sql):
        ret_id = -1
        try:
            self.cursor.execute(sql)
            self.conn.commit()
            ret_id = self.cursor.lastrowid
        except Exception, err:
            print 'exec_and_result: %s' % err
        return ret_id

    def close(self):
        try:
            self.cursor.close()
            self.conn.close()
        except Exception, err:
            pass

db = {
    'ip': 'xxx.xxx.xxx.xxx',
    'port': xxx,
    'user': 'xxx',
    'pwd': 'xxx',
    'db_name': 'xxx'
}             
mysql = MySql(db['ip'], db['user'], db['pwd'], db['db_name'], int(db['port']))
threads = []

def do(name):
    sql = "insert into site(name, status, create_time, update_time, update_user_account, comment) values('{0}', 0, NOW(), NOW(), 'daiyapeng', 'test');"
    rid = mysql.exec_and_result(sql.format(name))
    print rid

for i in ['test-0','test-1','test-2']:
    t = threading.Thread(target=do, args=(i, ))
    threads.append(t)

for t in threads:
    t.start()

for t in threads:
    t.join()
            
mysql.close()
            
            
            

另外:因为不清楚你的代码的具体细节,所以不能完全定位问题,我自己写了个模拟程序,没有出现你的那种情况,希望对你有帮助,代码如下:


#coding: utf-8

import Queue
import threading

class MyThread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self.queue = queue

    def run(self):
        while True:
            try:
                task = self.queue.get(timeout=2)
                print 'task: %s' % task
                # 这里可以处理task
            except Exception, err:
                break

if __name__ == '__main__':
    threads = []
    q = Queue.Queue()
    for i in xrange(3):
        thread = MyThread(q)
        threads.append(thread)

    for t in threads:
        t.start()

    for i in xrange(30):
        q.put(i)
    
    for t in threads:
        t.join()

    print '====== done ======'      
            
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号