Python使用Redis实现作业调度系统(超简单)
概述
Redis是一个开源,先进的key-value存储,并用于构建高性能,可扩展的Web应用程序的完美解决方案。
Redis从它的许多竞争继承来的三个主要特点:
Redis数据库完全在内存中,使用磁盘仅用于持久性。
相比许多键值数据存储,Redis拥有一套较为丰富的数据类型。
Redis可以将数据复制到任意数量的从服务器。
Redis 优势
异常快速:Redis的速度非常快,每秒能执行约11万集合,每秒约81000+条记录。
支持丰富的数据类型:Redis支持最大多数开发人员已经知道像列表,集合,有序集合,散列数据类型。这使得它非常容易解决各种各样的问题,因为我们知道哪些问题是可以处理通过它的数据类型更好。
操作都是原子性:所有Redis操作是原子的,这保证了如果两个客户端同时访问的Redis服务器将获得更新后的值。
多功能实用工具:Redis是一个多实用的工具,可以在多个用例如缓存,消息,队列使用(Redis原生支持发布/订阅),任何短暂的数据,应用程序,如Web应用程序会话,网页命中计数等。
步入主题:
Redis作为内存数据库的一个典型代表,已经在很多应用场景中被使用,这里仅就Redis的pub/sub功能来说说怎样通过此功能来实现一个简单的作业调度系统。这里只是想展现一个简单的想法,所以还是有很多需要考虑的东西没有包括在这个例子中,比如错误处理,持久化等。
下面是实现上的想法
MyMaster:集群的master节点程序,负责产生作业,派发作业和获取执行结果。
MySlave:集群的计算节点程序,每个计算节点一个,负责获取作业并运行,并将结果发送会master节点。
channel CHANNEL_DISPATCH:每个slave节点订阅一个channel,比如“CHANNEL_DISPATCH_[idx或机器名]”,master会向此channel中publish被dispatch的作业。
channel CHANNEL_RESULT:用来保存作业结果的channel,master和slave共享此channel,master订阅此channel来获取作业运行结果,每个slave负责将作业执行结果发布到此channel中。
Master代码
#!/usr/bin/env python # -*- coding: utf-8 -*- import time import threading import random import redis REDIS_HOST = 'localhost' REDIS_PORT = 6379 REDIS_DB = 0 CHANNEL_DISPATCH = 'CHANNEL_DISPATCH' CHANNEL_RESULT = 'CHANNEL_RESULT' class MyMaster(): def __init__(self): pass def start(self): MyServerResultHandleThread().start() MyServerDispatchThread().start() class MyServerDispatchThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB) for i in range(1, 100): channel = CHANNEL_DISPATCH + '_' + str(random.randint(1, 3)) print("Dispatch job %s to %s" % (str(i), channel)) ret = r.publish(channel, str(i)) if ret == 0: print("Dispatch job %s failed." % str(i)) time.sleep(5) class MyServerResultHandleThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB) p = r.pubsub() p.subscribe(CHANNEL_RESULT) for message in p.listen(): if message['type'] != 'message': continue print("Received finished job %s" % message['data']) if __name__ == "__main__": MyMaster().start() time.sleep(10000)
说明
MyMaster类 - master主程序,用来启动dispatch和resulthandler的线程
MyServerDispatchThread类 - 派发作业线程,产生作业并派发到计算节点
MyServerResultHandleThread类 - 作业运行结果处理线程,从channel里获取作业结果并显示
Slave代码
#!/usr/bin/env python # -*- coding: utf-8 -*- from datetime import datetime import time import threading import random import redis REDIS_HOST = 'localhost' REDIS_PORT = 6379 REDIS_DB = 0 CHANNEL_DISPATCH = 'CHANNEL_DISPATCH' CHANNEL_RESULT = 'CHANNEL_RESULT' class MySlave(): def __init__(self): pass def start(self): for i in range(1, 4): MyJobWorkerThread(CHANNEL_DISPATCH + '_' + str(i)).start() class MyJobWorkerThread(threading.Thread): def __init__(self, channel): threading.Thread.__init__(self) self.channel = channel def run(self): r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB) p = r.pubsub() p.subscribe(self.channel) for message in p.listen(): if message['type'] != 'message': continue print("%s: Received dispatched job %s " % (self.channel, message['data'])) print("%s: Run dispatched job %s " % (self.channel, message['data'])) time.sleep(2) print("%s: Send finished job %s " % (self.channel, message['data'])) ret = r.publish(CHANNEL_RESULT, message['data']) if ret == 0: print("%s: Send finished job %s failed." % (self.channel, message['data'])) if __name__ == "__main__": MySlave().start() time.sleep(10000)
说明
MySlave类 - slave节点主程序,用来启动MyJobWorkerThread的线程
MyJobWorkerThread类 - 从channel里获取派发的作业并将运行结果发送回master
测试
首先运行MySlave来定义派发作业channel。
然后运行MyMaster派发作业并显示执行结果。
有关Python使用Redis实现作业调度系统(超简单),小编就给大家介绍这么多,希望对大家有所帮助!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

Python适合数据科学、Web开发和自动化任务,而C 适用于系统编程、游戏开发和嵌入式系统。 Python以简洁和强大的生态系统着称,C 则以高性能和底层控制能力闻名。

Python在游戏和GUI开发中表现出色。1)游戏开发使用Pygame,提供绘图、音频等功能,适合创建2D游戏。2)GUI开发可选择Tkinter或PyQt,Tkinter简单易用,PyQt功能丰富,适合专业开发。

2小时内可以学会Python的基本编程概念和技能。1.学习变量和数据类型,2.掌握控制流(条件语句和循环),3.理解函数的定义和使用,4.通过简单示例和代码片段快速上手Python编程。

Python更易学且易用,C 则更强大但复杂。1.Python语法简洁,适合初学者,动态类型和自动内存管理使其易用,但可能导致运行时错误。2.C 提供低级控制和高级特性,适合高性能应用,但学习门槛高,需手动管理内存和类型安全。

两小时内可以学到Python的基础知识。1.学习变量和数据类型,2.掌握控制结构如if语句和循环,3.了解函数的定义和使用。这些将帮助你开始编写简单的Python程序。

要在有限的时间内最大化学习Python的效率,可以使用Python的datetime、time和schedule模块。1.datetime模块用于记录和规划学习时间。2.time模块帮助设置学习和休息时间。3.schedule模块自动化安排每周学习任务。

Python在自动化、脚本编写和任务管理中表现出色。1)自动化:通过标准库如os、shutil实现文件备份。2)脚本编写:使用psutil库监控系统资源。3)任务管理:利用schedule库调度任务。Python的易用性和丰富库支持使其在这些领域中成为首选工具。

Python在web开发、数据科学、机器学习、自动化和脚本编写等领域有广泛应用。1)在web开发中,Django和Flask框架简化了开发过程。2)数据科学和机器学习领域,NumPy、Pandas、Scikit-learn和TensorFlow库提供了强大支持。3)自动化和脚本编写方面,Python适用于自动化测试和系统管理等任务。
