From 342f2395d02452ce1ddc06be43e280be174c1d71 Mon Sep 17 00:00:00 2001
From: Udi Meiri
Date: Tue, 26 Nov 2019 12:51:03 -0800
Subject: [PATCH] [BEAM-8399] Add --hdfs_full_urls option (wip)

---
 .../python/apache_beam/io/hadoopfilesystem.py | 79 +++++++++++------
 .../apache_beam/io/hadoopfilesystem_test.py   | 84 ++++++++++++++++---
 .../apache_beam/options/pipeline_options.py   |  7 ++
 3 files changed, 133 insertions(+), 37 deletions(-)

diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py b/sdks/python/apache_beam/io/hadoopfilesystem.py
index 0abdbaf8e24df..54c5344dfe0b6 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -41,6 +41,7 @@
 _HDFS_PREFIX = 'hdfs:/'
 _URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'(/.*)')
+_FULL_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'/([^/]*)(/.*)')
 _COPY_BUFFER_SIZE = 2 ** 16
 _DEFAULT_BUFFER_SIZE = 20 * 1024 * 1024
@@ -115,10 +116,12 @@ def __init__(self, pipeline_options):
       hdfs_host = hdfs_options.hdfs_host
       hdfs_port = hdfs_options.hdfs_port
       hdfs_user = hdfs_options.hdfs_user
+      self.full_urls = hdfs_options.hdfs_full_urls
     else:
       hdfs_host = pipeline_options.get('hdfs_host')
       hdfs_port = pipeline_options.get('hdfs_port')
       hdfs_user = pipeline_options.get('hdfs_user')
+      self.full_urls = pipeline_options.get('hdfs_full_urls', False)
 
     if hdfs_host is None:
       raise ValueError('hdfs_host is not set')
@@ -126,6 +129,8 @@ def __init__(self, pipeline_options):
       raise ValueError('hdfs_port is not set')
     if hdfs_user is None:
       raise ValueError('hdfs_user is not set')
+    if not isinstance(self.full_urls, bool):
+      raise ValueError('hdfs_full_urls should be bool, got: %s' % self.full_urls)
     self._hdfs_client = hdfs.InsecureClient(
         'http://%s:%s' % (hdfs_host, str(hdfs_port)), user=hdfs_user)
@@ -133,24 +138,37 @@ def scheme(cls):
     return 'hdfs'
 
-  @staticmethod
-  def _parse_url(url):
+  def _parse_url(self, url, full_urls=None):
     """Verifies that url begins with hdfs:// prefix, strips it and adds a
     leading /.
 
-    Raises:
-      ValueError if url doesn't begin with hdfs://.
+    Parsing behavior is determined by HadoopFileSystemOptions.hdfs_full_urls.
 
     Args:
-      url: A URL in the form hdfs://path/...
+      url: (str) A URL in the form hdfs://path/...
+        or in the form hdfs://server/path/...
+      full_urls: (bool) For testing purposes only. Overrides the full_urls
+        setting.
+
+    Raises:
+      ValueError if the URL doesn't match the expected format.
 
     Returns:
-      For an input of 'hdfs://path/...', will return '/path/...'.
+      (str or None, str) For an input of 'hdfs://[server/]path/...', returns a
+      (server, '/path/...') tuple. server is None when full_urls is False.
     """
-    m = _URL_RE.match(url)
-    if m is None:
-      raise ValueError('Could not parse url: %s' % url)
-    return m.group(1)
+    if full_urls is None:
+      full_urls = self.full_urls
+
+    if not full_urls:
+      m = _URL_RE.match(url)
+      if m is None:
+        raise ValueError('Could not parse url: %s' % url)
+      return None, m.group(1)
+    else:
+      m = _FULL_URL_RE.match(url)
+      if m is None:
+        raise ValueError('Could not parse url: %s' % url)
+      return m.groups()
 
   def join(self, base_url, *paths):
     """Join two or more pathname components.
 
     Args:
       base_url: string path of the first component of the path.
         Must start with hdfs://.
       paths: path components to be added
 
     Returns:
       Full url after combining all the passed components.
""" - basepath = self._parse_url(base_url) - return _HDFS_PREFIX + self._join(basepath, *paths) + server, basepath = self._parse_url(base_url) + # TODO full_urls check and test + return _HDFS_PREFIX + self._join(server, basepath, *paths) def _join(self, basepath, *paths): return posixpath.join(basepath, *paths) def split(self, url): - rel_path = self._parse_url(url) + server, rel_path = self._parse_url(url) + if server is None: + server = '' + else: + server = '/' + server head, tail = posixpath.split(rel_path) - return _HDFS_PREFIX + head, tail + # TODO full_urls check and test + return _HDFS_PREFIX + server + head, tail def mkdirs(self, url): - path = self._parse_url(url) + _, path = self._parse_url(url) if self._exists(path): raise BeamIOError('Path already exists: %s' % path) return self._mkdirs(path) @@ -188,9 +212,10 @@ def has_dirs(self): def _list(self, url): try: - path = self._parse_url(url) + server, path = self._parse_url(url) for res in self._hdfs_client.list(path, status=True): - yield FileMetadata(_HDFS_PREFIX + self._join(path, res[0]), + # TODO full_urls check and test + yield FileMetadata(_HDFS_PREFIX + self._join(server, path, res[0]), res[1][_FILE_STATUS_LENGTH]) except Exception as e: # pylint: disable=broad-except raise BeamIOError('List operation failed', {url: e}) @@ -213,7 +238,7 @@ def create(self, url, mime_type='application/octet-stream', Returns: A Python File-like object. """ - path = self._parse_url(url) + _, path = self._parse_url(url) return self._create(path, mime_type, compression_type) def _create(self, path, mime_type='application/octet-stream', @@ -230,7 +255,7 @@ def open(self, url, mime_type='application/octet-stream', Returns: A Python File-like object. """ - path = self._parse_url(url) + _, path = self._parse_url(url) return self._open(path, mime_type, compression_type) def _open(self, path, mime_type='application/octet-stream', @@ -289,8 +314,8 @@ def _copy_path(source, destination): exceptions = {} for source, destination in zip(source_file_names, destination_file_names): try: - rel_source = self._parse_url(source) - rel_destination = self._parse_url(destination) + _, rel_source = self._parse_url(source) + _, rel_destination = self._parse_url(destination) _copy_path(rel_source, rel_destination) except Exception as e: # pylint: disable=broad-except exceptions[(source, destination)] = e @@ -302,8 +327,8 @@ def rename(self, source_file_names, destination_file_names): exceptions = {} for source, destination in zip(source_file_names, destination_file_names): try: - rel_source = self._parse_url(source) - rel_destination = self._parse_url(destination) + _, rel_source = self._parse_url(source) + _, rel_destination = self._parse_url(destination) try: self._hdfs_client.rename(rel_source, rel_destination) except hdfs.HdfsError as e: @@ -324,7 +349,7 @@ def exists(self, url): Returns: True if url exists as a file or directory in HDFS. """ - path = self._parse_url(url) + _, path = self._parse_url(url) return self._exists(path) def _exists(self, path): @@ -336,7 +361,7 @@ def _exists(self, path): return self._hdfs_client.status(path, strict=False) is not None def size(self, url): - path = self._parse_url(url) + _, path = self._parse_url(url) status = self._hdfs_client.status(path, strict=False) if status is None: raise BeamIOError('File not found: %s' % url) @@ -351,7 +376,7 @@ def checksum(self, url): Returns: String describing the checksum. 
""" - path = self._parse_url(url) + _, path = self._parse_url(url) file_checksum = self._hdfs_client.checksum(path) return '%s-%d-%s' % ( file_checksum[_FILE_CHECKSUM_ALGORITHM], @@ -363,7 +388,7 @@ def delete(self, urls): exceptions = {} for url in urls: try: - path = self._parse_url(url) + _, path = self._parse_url(url) self._hdfs_client.delete(path, recursive=True) except Exception as e: # pylint: disable=broad-except exceptions[url] = e diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py index 42d1e2df5cd3f..003ee5c74fdd6 100644 --- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py +++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py @@ -211,16 +211,18 @@ def setUpClass(cls): cls.assertCountEqual = cls.assertItemsEqual def setUp(self): + # TODO: isn't this racey? self._fake_hdfs = FakeHdfs() hdfs.hdfs.InsecureClient = ( lambda *args, **kwargs: self._fake_hdfs) pipeline_options = PipelineOptions() hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions) - hdfs_options.hdfs_host = '' - hdfs_options.hdfs_port = 0 + hdfs_options.hdfs_host = 'host' + hdfs_options.hdfs_port = 123 hdfs_options.hdfs_user = '' self.fs = hdfs.HadoopFileSystem(pipeline_options) + # TODO full_urls check and test self.tmpdir = 'hdfs://test_dir' for filename in ['old_file1', 'old_file2']: @@ -231,6 +233,55 @@ def test_scheme(self): self.assertEqual(self.fs.scheme(), 'hdfs') self.assertEqual(hdfs.HadoopFileSystem.scheme(), 'hdfs') + def test_parse_url(self): + cases = [ + ('hdfs://', '/', auto), + ('hdfs://one_element', '/one_element', auto), + ('hdfs://path/to/file', '/path/to/file', auto), + ('hdfs://host:123/path/to/file', '/path/to/file', auto), + ('hdfs://host:123/', '/', auto), + ('hdfs://host:123', '/host:123', auto), + + # Using 'any_host' instead of host:123 since host:port values are not + # supposed to match those in HadoopFileSystemOptions. 
+        ('hdfs://path/to/file', ('path', '/to/file'), True),
+        ('hdfs://any_host/path/to/file', ('any_host', '/path/to/file'), True),
+        ('hdfs://any_host/', ('any_host', '/'), True),
+    ]
+    for url, expected, full_urls in cases:
+      self.assertEqual(expected, self.fs._parse_url(url, full_urls=full_urls),
+                       msg=(url, expected, full_urls))
+
+  def test_parse_url_fail(self):
+    cases = [
+        ('hdfs:/missing_slash', False),
+        ('invalid', False),
+
+        ('hdfs:/missing_slash', True),
+        ('invalid', True),
+        ('hdfs://', True),
+        ('hdfs://host_no_slash_at_end', True),
+    ]
+    for url, full_urls in cases:
+      with self.assertRaises(ValueError, msg=(url, full_urls)):
+        self.fs._parse_url(url, full_urls=full_urls)
+
   def test_url_join(self):
     self.assertEqual('hdfs://tmp/path/to/file',
                      self.fs.join('hdfs://tmp/path', 'to', 'file'))
@@ -298,6 +349,7 @@ def test_match_file_error(self):
         r'^Match operation failed .* %s' % bad_url):
       result = self.fs.match([bad_url, url])[0]
       files = [f.path for f in result.metadata_list]
-      self.assertEqual(files, [self.fs._parse_url(url)])
+      # TODO full_urls check and test
+      _, expected_path = self.fs._parse_url(url)
+      self.assertEqual(files, [expected_path])
 
   def test_match_directory(self):
@@ -323,7 +375,7 @@ def test_create_success(self):
     url = self.fs.join(self.tmpdir, 'new_file')
     handle = self.fs.create(url)
     self.assertIsNotNone(handle)
-    url = self.fs._parse_url(url)
+    _, url = self.fs._parse_url(url)
     expected_file = FakeFile(url, 'wb')
     self.assertEqual(self._fake_hdfs.files[url], expected_file)
 
@@ -332,7 +384,7 @@ def test_create_write_read_compressed(self):
     handle = self.fs.create(url)
     self.assertIsNotNone(handle)
 
-    path = self.fs._parse_url(url)
+    _, path = self.fs._parse_url(url)
     expected_file = FakeFile(path, 'wb')
     self.assertEqual(self._fake_hdfs.files[path], expected_file)
     data = b'abc' * 10
@@ -527,7 +579,7 @@ def test_delete_error(self):
     url2 = self.fs.join(self.tmpdir, 'old_file1')
     self.assertTrue(self.fs.exists(url2))
 
-    path1 = self.fs._parse_url(url1)
+    _, path1 = self.fs._parse_url(url1)
     with self.assertRaisesRegex(BeamIOError,
                                 r'^Delete operation failed .* %s' % path1):
       self.fs.delete([url1, url2])
@@ -538,10 +590,12 @@ class HadoopFileSystemRuntimeValueProviderTest(unittest.TestCase):
   """Tests pipeline_options, in the form of a
   RuntimeValueProvider.runtime_options object."""
 
-  def test_dict_options(self):
+  def setUp(self):
     self._fake_hdfs = FakeHdfs()
     hdfs.hdfs.InsecureClient = (
         lambda *args, **kwargs: self._fake_hdfs)
+
+  def test_dict_options(self):
     pipeline_options = {
         'hdfs_host': '',
         'hdfs_port': 0,
         'hdfs_user': '',
     }
 
     self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
+    self.assertFalse(self.fs.full_urls)
 
   def test_dict_options_missing(self):
-    self._fake_hdfs = FakeHdfs()
-    hdfs.hdfs.InsecureClient = (
-        lambda *args, **kwargs: self._fake_hdfs)
-
     with self.assertRaisesRegex(ValueError, r'hdfs_host'):
       self.fs = hdfs.HadoopFileSystem(
           pipeline_options={
@@ -579,6 +630,19 @@ def test_dict_options_missing(self):
           }
       )
 
+  def test_dict_options_full_urls(self):
+    pipeline_options = {
+        'hdfs_host': '',
+        'hdfs_port': 0,
+        'hdfs_user': '',
+        'hdfs_full_urls': 'invalid',
+    }
+
+    with self.assertRaisesRegex(ValueError, r'hdfs_full_urls'):
+      self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
+
+    pipeline_options['hdfs_full_urls'] = True
+    self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
+    self.assertTrue(self.fs.full_urls)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 8ab71374d76bd..ac8e707ae5525 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -580,6 +580,13 @@ def _add_argparse_args(cls, parser):
         default=None,
         help=
         ('HDFS username to use.'))
+    parser.add_argument(
+        '--hdfs_full_urls',
+        default=False,
+        action='store_true',
+        help=
+        ('If set, URLs will be parsed as "hdfs://server/path/...", instead of '
+         '"hdfs://path/...". The "server" part will be unused (use --hdfs_host '
+         'and --hdfs_port).'))
 
   def validate(self, validator):
     errors = []
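
A minimal usage sketch of the new option, mirroring the dict-form
pipeline_options accepted by HadoopFileSystem in the tests above; the host,
port, and user values below are placeholders, not defaults:

    # Sketch: exercise hdfs_full_urls through dict-style pipeline_options.
    from apache_beam.io.hadoopfilesystem import HadoopFileSystem

    fs = HadoopFileSystem(pipeline_options={
        'hdfs_host': 'namenode.example.com',  # placeholder WebHDFS host
        'hdfs_port': 50070,                   # placeholder WebHDFS port
        'hdfs_user': 'beam',                  # placeholder user
        'hdfs_full_urls': True,
    })

    # With hdfs_full_urls=True the first URL component is treated as the
    # server and the rest as the path; the server part is not used for the
    # connection (that still comes from hdfs_host/hdfs_port).
    print(fs._parse_url('hdfs://namenode.example.com/tmp/input.txt'))
    # -> ('namenode.example.com', '/tmp/input.txt')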