题目如下:
spam系统,假设我们可以获得线上的实时请求(按时间顺序)
每个请求包含如下信息:
时间(unix时间戳)
用户名
动作(提问、回答、评论)
内容
依次考虑如何解决以下问题:
1.当发现动作频率较高的用户时,输出报警(1分钟内超过10次评论、回答、提问)
2.当发现一个用户连续发重复的内容时,输出报警(连续发了10个相同的评论、回答、提问)
3.使用你觉得最优美的方法将上面的策略与程序分离开。使得上面的策略可以随时更新而不用修改程序。
要求:
服务监听一个端口,使用测试程序模拟用户行为不断向服务发送请求,
请求格式为json,如:
{"time":1323333"user":"xxx","action":"answer","content":"abcde"}
服务输出报警格式如下
xxx,"频繁提问"
一下是我的代码:
client.py
import socket
import json
import time
import gevent
import random
def generateContents():
"""
随机生成内容
"""
chars = [chr(c) for c in range(ord('a'), ord('z')+1)]
result = [''.join(random.sample(chars, 5)) for i in range(3)]
return result
class Worker():
def __init__(self, name, socket):
self.name = name
self.socket = socket
self.actions = ['question', 'answer', 'comment']
self.contents = generateContents()
def __call__(self):
while 1:
data = self._generateData()
data = json.dumps(data)
self._request(data)
def _request(self, data):
s = self.socket.socket()
socket = self.socket.gethostname()
port = 1234
s.connect((socket, port))
s.recv(1024)
s.send(data)
s.close()
gevent.sleep(random.randint(1, 3) * 0.5)
def _generateData(self):
data = {}
data['time'] = time.time()
data['user'] = self.name
data['action'] = random.choice(self.actions)
data['content'] = random.choice(self.contents)
return data
if __name__ == '__main__':
threads = [gevent.spawn(Worker(chr(i), socket))
for i in range(ord('a'), ord('f'))]
gevent.joinall(threads)
server.py
import socket
import json
from functools import wraps
from MyExamine import MyExamine
def run():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = ''
port = 1234
s.bind((host, port))
s.listen(10)
while 1:
c, addr = s.accept()
c.send('This is a simple server'.encode('utf-8'))
rec = c.recv(1024)
do_something(rec)
c.close()
def format2Json(func):
def f(*args):
data = json.loads(args[0])
func(data)
return f
@format2Json
@MyExamine
def do_something(data):
"""
模拟后台处理用户发来的请求
"""
print data
if __name__ == '__main__':
run()
MyExamine.py
from baseExamine import BaseExamine
from collections import deque
class MyExamine(BaseExamine):
def __init__(self, func):
self.rateCache = {}
self.timesCache = {}
self.func = func
def examineRate(self, data):
userQueue = self.rateCache.setdefault(data['user'], deque())
while userQueue:
item = userQueue.pop()
if data['time'] - item['time'] < 60:
userQueue.append(item)
break
userQueue.appendleft(data)
if len(userQueue) > 10:
self.showWarning('User: %s 频繁操作'.decode('utf8') % data['user'])
def examineContentTimes(self, data):
userActions = self.timesCache.setdefault(data['user'], {})
contentTimes = self.timesCache.setdefault(data['action'], {})
key = data['content']
contentTimes[key] = contentTimes.get(key, 0) + 1
if contentTimes[key] > 10:
self.showWarning('User: %s %s %s 超过10次'.decode('utf8') %
(data['user'], data['action'], key))
def showWarning(self, msg):
print '\033[;31m' + msg + '\033[0m'
def __call__(self, *args):
data = args[0]
self.examineRate(data)
self.examineContentTimes(data)
self.func(*args)
baseExamine.py
class BaseExamine():
"""
模板方法的基类
"""
def __init__(self, func):
pass
def __call__(self, *args):
pass
求大神指点,谢谢!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号
这个没有最佳解法吧,看个人理解……
下面是我的理解:
1. 不同的策略分为不同的类,提供一个统一的接口,比如
#strategy.py class UserRate(object): def __init__(self, comment_per_user_min=10): #init def check(self, userdata): #检查用户数据,超过限制即报警 class UserDupContent(object): def __init__(self, content_send_per_user=10): #init def check(self, userdata): #检查用户数据2. 使用依赖注入将策略注入到检查程序:
class Guarder(object): def addStrategy(self, strategy): #添加一个策略 def check(self, userdata): #使用已经添加的策略逐个检查 #返回检查结果 def reload_from(self, conf): #解析配置并创建相应对象 self.addStrategy(strategy) @classmethod def create(cls, conf=''): obj = cls() if conf: obj.reload_from(conf)3. 调用Guarder实例检查
guarder=Guarder.create('antispam.ini') def index(): if guarder.check(userdata): pass else: #error def admin_reload_guarder(): '''根据web请求运行时重载配置''' import strategy reload(strategy) guarder.reload(conf)示例配置文件:
以上能够完成的功能如下:
1. 隔离了策略代码,使用依赖注入的方式完成
2. 策略本身是duck typing,灵活扩充
3. 策略代码文件strategy.py不停止服务器热部署
当然,配置文件可以调整格式,直接用python代码写都可以
关于“最优美的方法将上面的策略与程序分开”:引入轻量级的信号/事件机制,降低耦合度。更进一步,使用诸如 rabbitmq 这样的消息队列,策略和程序可以分开布署。
关于这个问题实现上的优化:使用类似 redis 这样的服务,写入自动过期的数据,只要检查相应用户的数据数量是不是超过阈值即可。这样系统在多台设备上负载均衡也方便。