怎么使用Python实现tail

其他教程   发布日期:2023年07月21日   浏览次数:522

本篇内容介绍了“怎么使用Python实现tail”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

1.第一版--从文件尾部读取实时数据

主要思路是: 打开文件, 把指针移动到文件最后, 然后有数据则输出数据, 无数据则休眠一段时间.

  1. import time
  2. import sys
  3. from typing import Callable, NoReturn
  4. class Tail(object):
  5. def __init__(
  6. self,
  7. file_name: str,
  8. output: Callable[[str], NoReturn] = sys.stdout.write,
  9. interval: int = 1
  10. ):
  11. self.file_name: str = file_name
  12. self.output: Callable[[str], NoReturn] = output
  13. self.interval: int = interval
  14. def __call__(self):
  15. with open(self.file_name) as f:
  16. f.seek(0, 2) # 从文件结尾处开始seek
  17. while True:
  18. line: str = f.readline()
  19. if line:
  20. self.output(line) # 使用print都会每次都打印新的一行
  21. else:
  22. time.sleep(self.interval)
  23. if __name__ == '__main__':
  24. filename: str = sys.argv[0]
  25. Tail(filename)()

之后只要做如下调用即可:

python xxx.py filename

2.第二版--实现tail -f

  1. tail -f
默认先读取最后10行数据,再从文件尾部读取实时数据.如果对于小文件,可以先读取所有文件内容,并输出最后10行, 但是读取全文再获取最后10行的性能不高, 而从后滚10行的边界条件也很复杂, 先看先读取全文再获取最后10行的实现:
  1. import time
  2. import sys
  3. from typing import Callable, NoReturn
  4. class Tail(object):
  5. def __init__(
  6. self,
  7. file_name: str,
  8. output: Callable[[str], NoReturn] = sys.stdout.write,
  9. interval: int = 1
  10. ):
  11. self.file_name: str = file_name
  12. self.output: Callable[[str], NoReturn] = output
  13. self.interval: int = interval
  14. def __call__(self):
  15. with open(self.file_name) as f:
  16. self.read_last_line(f)
  17. while True:
  18. line: str = f.readline()
  19. if line:
  20. self.output(line) # 使用print都会每次都打印新的一行
  21. else:
  22. time.sleep(self.interval)
  23. def read_last_line(self, f):
  24. last_lines = f.readlines()[-10:]
  25. for line in last_lines:
  26. self.output(line)
  27. if __name__ == '__main__':
  28. filename: str = sys.argv[0]
  29. Tail(filename)()

可以看到实现很简单, 相比第一版只多了个

  1. read_last_line的函数
, 接下来就要解决性能的问题了, 当文件很大的时候, 这个逻辑是不行的, 特别是有些日志文件经常有几个G大, 如果全读出来内存就爆了. 而在Linux系统中, 没有一个接口可以指定指针跳到倒数10行, 只能使用如下方法来模拟输出倒数10行:
  • 首先游标跳转到最新的字符, 保存当前游标, 然后预估一行数据的字符长度, 最好偏多, 这里我按1024字符长度为一行来处理

  • 然后利用seek的方法,跳转到seek(-1024 * 10, 2)的字符, 这就是我们预估的倒数10行内的内容

  • 接着对内容进行判断, 如果跳转的字符长度小于 10 * 1024, 则证明整个文件没有10行, 则采用原来的

    1. read_last_line
    方法.
  • 如果跳转到字符长度等于1024 * 10, 则利用换行符计算已取字符长度共有多少行,如果行数大于10,那只输出最后10行,如果只读了4行,则继续读6*1024,直到读满10行为止

通过以上步奏, 就把倒数10行的数据计算好了可以打印出来, 可以进入追加数据了, 但是这时候文件内容可能发生改变了, 我们的游标也发生改变了, 这时候要把游标跳回到刚才保存的游标,防止漏打或者重复打印数据.

分析完毕后, 就可以开始重构

  1. read_last_line
函数了.
  1. import time
  2. import sys
  3. from typing import Callable, List, NoReturn
  4. class Tail(object):
  5. def __init__(
  6. self,
  7. file_name: str,
  8. output: Callable[[str], NoReturn] = sys.stdout.write,
  9. interval: int = 1,
  10. len_line: int = 1024
  11. ):
  12. self.file_name: str = file_name
  13. self.output: Callable[[str], NoReturn] = output
  14. self.interval: int = interval
  15. self.len_line: int = len_line
  16. def __call__(self, n: int = 10):
  17. with open(self.file_name) as f:
  18. self.read_last_line(f, n)
  19. while True:
  20. line: str = f.readline()
  21. if line:
  22. self.output(line) # 使用print都会每次都打印新的一行
  23. else:
  24. time.sleep(self.interval)
  25. def read_last_line(self, file, n):
  26. read_len: int = self.len_line * n
  27. # 跳转游标到最后
  28. file.seek(0, 2)
  29. # 获取当前结尾的游标位置
  30. now_tell: int = file.tell()
  31. while True:
  32. if read_len > file.tell():
  33. # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来
  34. file.seek(0) # 由于read方法是按照游标进行打印, 所以要重置游标
  35. last_line_list: List[str] = file.read().split('
  36. ')[-n:]
  37. # 重新获取游标位置
  38. now_tell: int = file.tell()
  39. break
  40. # 跳转到我们预估的字符位置
  41. file.seek(-read_len, 2)
  42. read_str: str = file.read(read_len)
  43. cnt: int = read_str.count('
  44. ')
  45. if cnt >= n:
  46. # 如果获取的行数大于要求的行数,则获取前n行的行数
  47. last_line_list: List[str] = read_str.split('
  48. ')[-n:]
  49. break
  50. else:
  51. # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取
  52. if cnt == 0:
  53. line_per: int = read_len
  54. else:
  55. line_per: int = int(read_len / cnt)
  56. read_len = line_per * n
  57. for line in last_line_list:
  58. self.output(line + '
  59. ')
  60. # 重置游标,确保接下来打印的数据不重复
  61. file.seek(now_tell)
  62. if __name__ == '__main__':
  63. import argparse
  64. parser = argparse.ArgumentParser()
  65. parser.add_argument("-f", "--filename")
  66. parser.add_argument("-n", "--num", default=10)
  67. args, unknown = parser.parse_known_args()
  68. if not args.filename:
  69. raise RuntimeError('filename args error')
  70. Tail(args.filename)(int(args.num))

3.第三版--优雅的读取输出日志文件

可以发现实时读取那块的逻辑性能还是很差, 如果每秒读一次文件,实时性就太慢了,把间隔改小了,则处理器占用太多. 性能最好的情况是如果能得知文件更新再进行打印文件, 那性能就能得到保障了.庆幸的是,在Linux中

  1. inotify
提供了这样的功能. 此外,日志文件有一个特点就是会进行logrotate,如果日志被logrotate了,那我们就需要重新打开文件,并进一步读取数据, 这种情况也可以利用到
  1. inotify
, 当
  1. inotify
获取到文件重新打开的事件时,我们就重新打开文件,再读取.
  1. import os
  2. import sys
  3. from typing import Callable, List, NoReturn
  4. import pyinotify
  5. multi_event = pyinotify.IN_MODIFY | pyinotify.IN_MOVE_SELF # 监控多个事件
  6. class InotifyEventHandler(pyinotify.ProcessEvent): # 定制化事件处理类,注意继承
  7. """
  8. 执行inotify event的封装
  9. """
  10. f: 'open()'
  11. filename: str
  12. path: str
  13. wm: 'pyinotify.WatchManager'
  14. output: Callable
  15. def my_init(self, **kargs):
  16. """pyinotify.ProcessEvent要求不能直接继承__init__, 而是要重写my_init, 我们重写这一段并进行初始化"""
  17. # 获取文件
  18. filename: str = kargs.pop('filename')
  19. if not os.path.exists(filename):
  20. raise RuntimeError('Not Found filename')
  21. if '/' not in filename:
  22. filename = os.getcwd() + '/' + filename
  23. index = filename.rfind('/')
  24. if index == len(filename) - 1 or index == -1:
  25. raise RuntimeError('Not a legal path')
  26. self.f = None
  27. self.filename = filename
  28. self.output: Callable = kargs.pop('output')
  29. self.wm = kargs.pop('wm')
  30. # 只监控路径,这样就能知道文件是否移动
  31. self.path = filename[:index]
  32. self.wm.add_watch(self.path, multi_event)
  33. def read_line(self):
  34. """统一的输出方法"""
  35. for line in self.f.readlines():
  36. self.output(line)
  37. def process_IN_MODIFY(self, event):
  38. """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生变化, 进行文件读取"""
  39. if event.pathname == self.filename:
  40. self.read_line()
  41. def process_IN_MOVE_SELF(self, event):
  42. """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生重新打开, 进行文件读取"""
  43. if event.pathname == self.filename:
  44. # 检测到文件被移动重新打开文件
  45. self.f.close()
  46. self.f = open(self.filename)
  47. self.read_line()
  48. def __enter__(self) -> 'InotifyEventHandler':
  49. self.f = open(self.filename)
  50. return self
  51. def __exit__(self, exc_type, exc_val, exc_tb):
  52. self.f.close()
  53. class Tail(object):
  54. def __init__(
  55. self,
  56. file_name: str,
  57. output: Callable[[str], NoReturn] = sys.stdout.write,
  58. interval: int = 1,
  59. len_line: int = 1024
  60. ):
  61. self.file_name: str = file_name
  62. self.output: Callable[[str], NoReturn] = output
  63. self.interval: int = interval
  64. self.len_line: int = len_line
  65. wm = pyinotify.WatchManager() # 创建WatchManager对象
  66. inotify_event_handler = InotifyEventHandler(
  67. **dict(filename=file_name, wm=wm, output=output)
  68. ) # 实例化我们定制化后的事件处理类, 采用**dict传参数
  69. wm.add_watch('/tmp', multi_event) # 添加监控的目录,及事件
  70. self.notifier = pyinotify.Notifier(wm, inotify_event_handler) # 在notifier实例化时传入,notifier会自动执行
  71. self.inotify_event_handle: 'InotifyEventHandler' = inotify_event_handler
  72. def __call__(self, n: int = 10):
  73. """通过inotify的with管理打开文件"""
  74. with self.inotify_event_handle as i:
  75. # 先读取指定的行数
  76. self.read_last_line(i.f, n)
  77. # 启用inotify的监听
  78. self.notifier.loop()
  79. def read_last_line(self, file, n):
  80. read_len: int = self.len_line * n
  81. # 获取当前结尾的游标位置
  82. file.seek(0, 2)
  83. now_tell: int = file.tell()
  84. while True:
  85. if read_len > file.tell():
  86. # 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来
  87. file.seek(0)
  88. last_line_list: List[str] = file.read().split('
  89. ')[-n:]
  90. # 重新获取游标位置
  91. now_tell: int = file.tell()
  92. break
  93. file.seek(-read_len, 2)
  94. read_str: str = file.read(read_len)
  95. cnt: int = read_str.count('
  96. ')
  97. if cnt >= n:
  98. # 如果获取的行数大于要求的行数,则获取前n行的行数
  99. last_line_list: List[str] = read_str.split('
  100. ')[-n:]
  101. break
  102. else:
  103. # 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取
  104. if cnt == 0:
  105. line_per: int = read_len
  106. else:
  107. line_per: int = int(read_len / cnt)
  108. read_len = line_per * n
  109. for line in last_line_list:
  110. self.output(line + '
  111. ')
  112. # 重置游标,确保接下来打印的数据不重复
  113. file.seek(now_tell)
  114. if __name__ == '__main__':
  115. import argparse
  116. parser = argparse.ArgumentParser()
  117. parser.add_argument("-f", "--filename")
  118. parser.add_argument("-n", "--num", default=10)
  119. args, unknown = parser.parse_known_args()
  120. if not args.filename:
  121. raise RuntimeError('filename args error')
  122. Tail(args.filename)(int(args.num))

可以看到, 从原本的open打开文件改为用inotify打开文件(这时候会调用my_init方法进行初始化), 打开后还是运行我们打开原来n行的代码, 然后就交给inotify运行. 在inotify运行之前, 我们把重新打开文件方法和打印文件方法都挂载在inotifiy对应的事件里, 之后inotify运行时, 会根据对应的事件执行对应的方法。

以上就是怎么使用Python实现tail的详细内容,更多关于怎么使用Python实现tail的资料请关注九品源码其它相关文章!