From 60d9a2bf8209c24ed1c70c6fea2e638d798ba299 Mon Sep 17 00:00:00 2001 From: "Haoyu (Daniel)" Date: Fri, 13 Sep 2024 15:50:04 +0800 Subject: [PATCH] Fix line ending handling in reverse readline --- src/monty/io.py | 31 +++++++++++-------------------- tests/test_io.py | 17 +++++++++++------ 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/src/monty/io.py b/src/monty/io.py index bd61f4dc..2c482321 100644 --- a/src/monty/io.py +++ b/src/monty/io.py @@ -11,12 +11,11 @@ import io import mmap import os -import platform import subprocess import time import warnings from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, cast try: import lzma @@ -169,7 +168,6 @@ def reverse_readline( Cases where file would be read forwards and reversed in RAM: - If file size is smaller than RAM usage limit (max_mem). - - In Windows. TODO: explain reason. - For Gzip files, as reverse seeks are not supported. # TODO: now supported Files larger than max_mem are read one block each time. @@ -193,7 +191,7 @@ def reverse_readline( """ # Generate line ending l_end: Literal["\r\n", "\n"] = _get_line_ending(m_file) - len_l_end: Literal[1, 2] = len(l_end) + len_l_end: Literal[1, 2] = cast(Literal[1, 2], len(l_end)) # Check if the file stream is a buffered text stream (text instead of binary) is_text: bool = isinstance(m_file, io.TextIOWrapper) @@ -207,15 +205,7 @@ def reverse_readline( # If the file size is within desired RAM limit, just reverse it in memory. # Gzip files must use this method because there is no way to negative seek. - # For windows, we also read the whole file. - if ( - platform.system() == "Windows" # TODO: platform is not important, len_l_end is - or ( - len_l_end != 1 - ) # TODO: the following code wouldn't work for "\r\n" as its len is 2 - or file_size < max_mem - or isinstance(m_file, gzip.GzipFile) - ): + if file_size < max_mem or isinstance(m_file, gzip.GzipFile): for line in reversed(m_file.readlines()): yield (line if isinstance(line, str) else line.decode("utf-8")) @@ -230,20 +220,21 @@ def reverse_readline( buffer: str = "" m_file.seek(0, 2) + eof_pos = m_file.tell() # need end of file to skip first empty line while True: l_end_pos: int = buffer.rfind(l_end) - pt_pos: int = ( - m_file.tell() - ) # pointer position (also size of remaining file to read) + # Pointer position (also size of remaining file to read) + pt_pos: int = m_file.tell() # Line ending found within buffer if l_end_pos != -1: line = buffer[l_end_pos + len_l_end :] buffer = buffer[:l_end_pos] # buffer doesn't include l_end - if pt_pos != 0 or l_end_pos != 0: # TODO: why is this condition needed? - line += l_end - yield line + + # Skip first match (the last line ending) + if l_end_pos != eof_pos: + yield line + l_end # Line ending not in current buffer, load next block into the buffer elif pt_pos > 0: @@ -254,7 +245,7 @@ def reverse_readline( else: buffer += m_file.read(to_read).decode("utf-8") - # Move pointer forward # TODO: why pointer is moved forward again? + # Move pointer forward m_file.seek(pt_pos - to_read) # Add a l_end to the start of file diff --git a/tests/test_io.py b/tests/test_io.py index 7f2b9b5e..d13cf32d 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -143,11 +143,11 @@ def test_file_with_empty_lines(self, l_end): with ScratchDir("."): # Test text file - with open(filename, "w", newline="", encoding="utf-8") as file: + with open(filename, "wb") as file: for line in contents: - file.write(line) + file.write(line.encode()) - with zopen(filename, mode="r") as file: + with open(filename, mode="r", newline="") as file: revert_contents = tuple(reverse_readline(file)) assert revert_contents[::-1] == contents @@ -175,14 +175,19 @@ def test_line_ending(self, l_end): file_name = "test_file.txt" with ScratchDir("."): - with open(file_name, "w", newline="", encoding="utf-8") as file: + with open(file_name, "wb") as file: for line in contents: - file.write(line) + file.write(line.encode()) # Test text mode with open(file_name, "r", encoding="utf-8") as file: for idx, line in enumerate(reverse_readline(file)): - assert line == contents[len(contents) - idx - 1] + # Open text in "r" mode would trigger OS + # line ending handing + assert ( + line.rstrip(os.linesep) + l_end + == contents[len(contents) - idx - 1] + ) assert isinstance(line, str) # # TODO: Test binary mode