百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

Python中日志异步发送到远程服务器

off999 2024-11-23 20:49 29 浏览 0 评论

背景

在Python中使用日志最常用的方式就是在控制台和文件中输出日志了,logging模块也很好的提供的相应 地类,使用起来也非常方便,但是有时我们可能会有一些需求,如还需要将日志发送到远端,或者直接写入数 据悉,这种需求该如何实现呢?

StreamHandler和FileHandler

# -*- coding: utf-8 -*-
"""
-------------------------------------------------
 File Name:   loger
 Description :
 Author :    yangyanxing
 date:     2020/9/23
-------------------------------------------------
"""
import logging
import sys
import os
# 初始化logger
logger = logging.getLogger("yyx")
logger.setLevel(logging.DEBUG)
# 设置日志格式
fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d
%H:%M:%S')
# 添加cmd handler
cmd_handler = logging.StreamHandler(sys.stdout)
cmd_handler.setLevel(logging.DEBUG)
cmd_handler.setFormatter(fmt)
# 添加文件的handler
logpath = os.path.join(os.getcwd(), 'debug.log')
file_handler = logging.FileHandler(logpath)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)
# 将cmd和file handler添加到logger中
logger.addHandler(cmd_handler)
logger.addHandler(file_handler)
logger.debug("今天天气不错")

先初始化一个logger, 并且设置它的日志级别是DEBUG,然后添初始化了 cmd_handler和 file_handler,最后将它们添加到logger中, 运行脚本,会在cmd中打印出来

[2020-09-23 10:45:56] [DEBUG] 今天天气不错

添加HTTPHandler

# 添加一个httphandler
import logging.handlers
http_handler = logging.handlers.HTTPHandler(r"127.0.0.1:1987", '/api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天气不错")

结果在服务端我们收到了很多信息

{
'name': [b 'yyx'],
'msg': [b
'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'args': [b '()'],
'levelname': [b 'DEBUG'],
'levelno': [b '10'],
'pathname': [b 'I:/workplace/yangyanxing/test/loger.py'],
'filename': [b 'loger.py'],
'module': [b 'loger'],
'exc_info': [b 'None'],
'exc_text': [b 'None'],
'stack_info': [b 'None'],
'lineno': [b '41'],
'funcName': [b '<module>'],
'created': [b '1600831054.8881223'],
'msecs': [b '888.1223201751709'],
'relativeCreated': [b '22.99976348876953'],
'thread': [b '14876'],
'threadName': [b 'MainThread'],
'processName': [b 'MainProcess'],
'process': [b '8648'],
'message': [b
'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'asctime': [b '2020-09-23 11:17:34']
}

可以说是信息非常之多,但是却并不是我们想要的样子,我们只是想要类似于

[2020-09-23 10:45:56][DEBUG] 今天天气不错

logging.handlers.HTTPHandler 只是简单地将日志所有信息发送给服务端,至于服务端要怎么组织内 容是由服务端来完成. 所以我们可以有两种方法,一种是改服务端代码,根据传过来的日志信息重新组织一 下日志内容, 第二种是我们重新写一个类,让它在发送的时候将重新格式化日志内容发送到服务端。

我们采用第二种方法,因为这种方法比较灵活, 服务端只是用于记录,发送什么内容应该是由客户端来决定。

我们需要重新定义一个类,我们可以参考 logging.handlers.HTTPHandler 这个类,重新写一个httpHandler类

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  def emit(self, record):
    '''
   重写emit方法,这里主要是为了把初始化时的baseParam添加进来
   :param record:
   :return:
   '''
    msg = self.format(record)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      requests.get(url, timeout=1)
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      requests.post(self.url, data={'log': msg}, headers=headers,
timeout=1)

这行代码表示,将会根据日志对象设置的格式返回对应的内容。

{'log': [b'[2020-09-23 11:39:45] [DEBUG]
\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99']}

将bytes类型转一下就得到了

[2020-09-23 11:43:50] [DEBUG] 今天天气不错

异步的发送远程日志

async def post(self):
  print(self.getParam('log'))
  await asyncio.sleep(5)
  self.write({"msg": 'ok'})

此时我们再打印上面的日志

logger.debug("今天天气不错")
logger.debug("是风和日丽的")

得到的输出为

[2020-09-23 11:47:33] [DEBUG] 今天天气不错
[2020-09-23 11:47:38] [DEBUG] 是风和日丽的

那么现在问题来了,原本只是一个记录日志,现在却成了拖累整个脚本的累赘,所以我们需要异步的来 处理完远程写日志。

1

使用多线程处理

def emit(self, record):
  msg = self.format(record)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = '&'
    else:
      sep = '?'
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    t = threading.Thread(target=requests.get, args=(url,))
    t.start()
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    t = threading.Thread(target=requests.post, args=(self.url,), kwargs=
{"data":{'log': msg},

2

使用线程池处理

python 的 concurrent.futures 中有ThreadPoolExecutor, ProcessPoolExecutor类,是线程池和进程池, 就是在初始化的时候先定义几个线程,之后让这些线程来处理相应的函数,这样不用每次都需要重新创建线程

exector = ThreadPoolExecutor(max_workers=1) # 初始化一个线程池,只有一个线程
exector.submit(fn, args, kwargs) # 将函数submit到线程池中

exector = ThreadPoolExecutor(max_workers=1)
def emit(self, record):
  msg = self.format(record)
  timeout = aiohttp.ClientTimeout(total=6)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = '&'
    else:
      sep = '?'
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    exector.submit(requests.get, url, timeout=6)
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    exector.submit(requests.post, self.url, data={'log': msg},
headers=headers, timeout=6)

3

使用异步aiohttp库来发送请求

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  async def emit(self, record):
    msg = self.format(record)
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
      async with session.get(self.url) as resp:
          print(await resp.text())
      else:
        headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
      async with session.post(self.url, data={'log': msg}) as resp:
          print(await resp.text())

这时代码执行崩溃了

C:\Python37\lib\logging\__init__.py:894: RuntimeWarning: coroutine
'CustomHandler.emit' was never awaited
self.emit(record)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

究其原因是由于emit方法中使用 async with session.post 函数,它需要在一个使用async 修饰的函数 因为执行,所以修改emit函数,使用async来修饰,这里emit函数变成了异步的函数, 返回的是一个 coroutine 对象,要想执行coroutine对象,需要使用await, 但是脚本里却没有在那里调用 await emit() ,所以崩溃信息 中显示 coroutine ‘CustomHandler.emit’ was never awaited。

async def main():
  await logger.debug("今天天气不错")
  await logger.debug("是风和日丽的")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

执行依然报错

raise TypeError('An asyncio.Future, a coroutine or an awaitable is '

这似乎就没有办法了,想要使用异步库来发送,但是却没有可以调用await的地方。

import asyncio
async def test(n):
 while n > 0:
   await asyncio.sleep(1)
   print("test {}".format(n))
   n -= 1
 return n

async def test2(n):
 while n >0:
   await asyncio.sleep(1)
   print("test2 {}".format(n))
   n -= 1
def stoploop(task):
 print("执行结束, task n is {}".format(task.result()))
 loop.stop()
loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task2 = loop.create_task(test2(3))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
loop.run_forever()

注意看上面的代码,我们并没有在某处使用await来执行协程,而是通过将协程注册到某个事件循环对象上, 然后调用该循环的方法方法 run_forever() 函数,从而使该循环上的协程对象得以正常的执行。

test 5
test2 3
test 4
test2 2
test 3
test2 1
test 2
test 1
执行结束, task n is 0

可以看到,使用事件循环对象创建的task,在该循环执行run_forever() 以后就可以执行了如果不执行 loop.run_forever() 函数,则注册在它上面的协程也不会执行

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
time.sleep(5)
# loop.run_forever()

loop = asyncio.get_event_loop()
class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  # 使用aiohttp封装发送数据函数
  async def submit(self, data):
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if self.url.find("?") >= 0:
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
data}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as resp:
          print(await resp.text())
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
        async with session.post(self.url, data={'log': data}) as resp:
          print(await resp.text())
    return True
  def emit(self, record):
    msg = self.format(record)
    loop.create_task(self.submit(msg))
# 添加一个httphandler
http_handler = CustomHandler(r"http://127.0.0.1:1987", 'api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天气不错")
logger.debug("是风和日丽的")
loop.run_forever()

loop.create_task(self.submit(msg)) 也可以使用

asyncio.ensure_future(self.submit(msg), loop=loop) 来代替,目的都是将协程对象注册到事件循环中。

但这种方式有一点要注意,loop.run_forever() 将会一直阻塞,所以需要有个地方调用 loop.stop() 方法. 可以注册到某个task的回调中。

相关推荐

笔记本电脑选哪个品牌比较好

1、苹果APPLE/美国2、戴尔DELL/美国3、华为HUAWEI/中国4、小米MI/中国5、微软Microsoft/美国6、联想LENOVO/中国7、惠普HP/美国8、华硕ASUS/...

10系列显卡排名(10系显卡性能排行)

十系显卡指NVIDIAGeForce10系列,是英伟达研发并推出的图形处理器系列,被用以取代NVIDIAGeForce900系列图形处理器。新系列采用帕斯卡微架构来代替之前的麦克斯韦微架构,并...

最新win7系统下载(windows7最新版本下载)
最新win7系统下载(windows7最新版本下载)

最简单的方法就是,下载完镜像文件后,直接把镜像文件解压,解压到非C盘,然后在解压文件里面找到setup.exe,点击运行即可。安装系统完成后,在C盘找到一个Windows.old(好几个GB,是旧系统打包在这里,垃圾文件了)删除即可。扩展资...

2026-01-15 06:43 off999

哪个电脑管家软件好用(哪个电脑管家好用些)

腾讯电脑管家吧,因为这个是杀毒和管理合一的,占用内存小,因此显得更为简洁,使电脑运行更加流畅此外电脑诊所,工具箱以及4+1的杀毒模式让腾讯电脑管家也收到了广泛的关注4+1杀毒引擎,管家反病毒引擎、金山...

怎么进入win7安全模式(怎么进入win7安全模式界面)

方法如下:1、首先进入Win7系统,然后使用Win键+R组合键打开运行框,输入“Msconfig”回车进入系统配置。2、在打开的系统配置中,找到“引导”选项,然后单击,选择Win7的引导项,然后在“安...

怎么分区固态硬盘(怎样分区固态硬盘)

固态硬盘的分区方法与传统机械硬盘基本相同,以下是一个简单的步骤:1.打开磁盘管理工具:在Windows操作系统中,按下Win+X键,选择"磁盘管理"。或者打开控制面板,在"...

笔记本声卡驱动怎么下载(笔记本如何下载声卡)
笔记本声卡驱动怎么下载(笔记本如何下载声卡)

1、在浏览器中输入并搜索,然后下载并安装。2、安装完成后打开360驱动大师,它就会自动检测你的电脑需要安装或升级的驱动。3、检测完毕后,我们可以看到我们的声卡驱动需要安装或升级,点击安装或升级,就会开始自动安装或升级声卡了。4、升级过程中会...

2026-01-15 05:43 off999

win10加快开机启动速度(加快开机速度 win10)

一、启用快速启动功能1.按win+r键调出“运行”在输入框输入“gpedit.msc”按回车调出“组策略编辑器”?2.在“本地组策略编辑器”依次打开“计算机配置——管理模块——系统——关机”在右侧...

excel的快捷键一览表(excel的快捷键一览表超全)
excel的快捷键一览表(excel的快捷键一览表超全)

Excel快捷键大全的一些操作如下我在工作中经常使用诸如word或Excel之类的办公软件。我相信每个人都不太熟悉这些办公软件的快捷键。使用快捷键将提高办公效率,并使您的工作更加轻松快捷。。例如,在复制时,请使用CtrI+C进行复制,...

2026-01-15 05:03 off999

华硕u盘启动按f几(华硕u盘装系统按f几进入)

F8。1、开机的同时按F8进入BIOS。2、在Boot菜单中,置secure为disabled。3、BootListOption置为UEFI。4、在1stBootPriority中usb—HD...

bootmgr(bootmgrismissing开机不了怎么办)
  • bootmgr(bootmgrismissing开机不了怎么办)
  • bootmgr(bootmgrismissing开机不了怎么办)
  • bootmgr(bootmgrismissing开机不了怎么办)
  • bootmgr(bootmgrismissing开机不了怎么办)
手机云电脑怎么用(手机云端电脑)

使用手机云电脑,您首先需要安装相应的云电脑应用。例如,华为云电脑APP。在安装并打开应用后,您将看到一个显示器的图标,这就是您的云电脑。点击这个图标,您将被连接到一个预装有Windows操作系统和必要...

ie11浏览器怎么安装(ie11浏览器安装步骤)

如果IE浏览器11版本你发现无法正常安装,那么很可能是这样几个原因,一个就是电脑的存储空间不够到时无法安装,再有就是网络的问题,如果没有办法安装的话就不要再安装了,本身这个IE浏览器并不是多好用,你最...

台式机重装系统win7(台式机怎么重装win7)

下面主要介绍两种方法以重装系统:一、U盘重装系统准备:一台正常开机的电脑和一个U盘1、百度下载“U大师”(老毛桃、大白菜也可以),把这个软件下载并安装在电脑上。2、插上U盘,选择一键制作U盘启动(制作...

字母下划线怎么打出来(字母下的下划线怎么去不掉)

第一步,在电脑上找到文字处理软件WPS,双击即自动新建一个新文档。第二步,在文档录入需要处理的字母和数字,双击鼠标或拖动鼠标选择要处理的内容。第三步,在页面的左上方的横向菜单栏,找到字母U的按纽,点击...

取消回复欢迎 发表评论: