Commit 00ab4be

Use TF filesystem API to read HDFS
1 parent 517a99a commit 00ab4be
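
This commit drops liborc's bundled libhdfspp client and reads HDFS through TensorFlow's filesystem layer instead: WORKSPACE now applies a checked-in patch to the ORC sources, liborc.BUILD links against the TensorFlow framework targets, and the new third_party/liborc.patch rewrites OrcHdfsFile.cc's HdfsFileInputStream on top of tensorflow::Env and tensorflow::RandomAccessFile.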

File tree

3 files changed: +201 -22

  WORKSPACE
  third_party/liborc.BUILD
  third_party/liborc.patch

WORKSPACE (+3 -2)
@@ -587,8 +587,9 @@ http_archive(
 http_archive(
     name = "liborc",
     build_file = "//third_party:liborc.BUILD",
-    patch_cmds = [
-        "tar -xzf c++/libs/libhdfspp/libhdfspp.tar.gz -C c++/libs/libhdfspp",
+    patch_args = ["-p1"],
+    patches = [
+        "//third_party:liborc.patch",
     ],
     sha256 = "39d983f4c7feb8ea1e8ab8e3e53e9afc643282b7a500b3a93c91aa6490f65c17",
     strip_prefix = "orc-rel-release-1.6.14",
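
Replacing patch_cmds with patches plus patch_args = ["-p1"] moves the source tweak into a reviewable patch file: Bazel applies //third_party:liborc.patch with one leading path component stripped (standard -p1 semantics for a/ and b/ prefixes) instead of running an ad-hoc shell command during the repository fetch.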

third_party/liborc.BUILD (+3 -20)
@@ -39,6 +39,7 @@ cc_library(
     ],
     copts = [],
     defines = [],
+    local_defines = ["BUILD_LIBHDFSPP"],
     includes = [
         "c++/include",
         "c++/src",
@@ -49,34 +50,16 @@ cc_library(
     linkopts = [],
     visibility = ["//visibility:public"],
     deps = [
-        ":libhdfspp",
         ":orc_cc_proto",
+        "@local_config_tf//:libtensorflow_framework",
+        "@local_config_tf//:tf_header_lib",
         "@lz4",
         "@snappy",
         "@zlib",
         "@zstd",
     ],
 )
 
-cc_library(
-    name = "libhdfspp",
-    srcs = glob(
-        [
-            "c++/libs/libhdfspp/include/hdfspp/*.h",
-        ],
-        exclude = [
-        ],
-    ),
-    hdrs = [
-    ],
-    copts = [],
-    defines = [],
-    includes = [
-        "c++/libs/libhdfspp/include",
-    ],
-    deps = [],
-)
-
 proto_library(
     name = "orc_proto",
     srcs = ["proto/orc_proto.proto"],
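
ORC only compiles its HDFS reader when BUILD_LIBHDFSPP is defined, so the new local_defines entry keeps readHdfsFile wired into the build while its implementation now comes from the patched, TF-backed OrcHdfsFile.cc; the @local_config_tf dependencies supply the TensorFlow headers and framework library that file needs.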

third_party/liborc.patch (new file, +195)
--- a/c++/src/OrcHdfsFile.cc	2022-04-11 04:30:41.000000000 +0800
+++ b/c++/src/OrcHdfsFile.cc	2022-04-11 19:56:37.206680217 +0800
@@ -1,4 +1,5 @@
 /**
+ * 1
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -29,145 +30,57 @@
 #include <sys/types.h>
 #include <unistd.h>
 
-#include "hdfspp/hdfspp.h"
+#include "tensorflow/core/platform/env.h"
+#include "tensorflow/core/platform/file_system.h"
+#include "tensorflow/core/platform/logging.h"
+#include "tensorflow/core/platform/status.h"
+#include "tensorflow/core/platform/types.h"
 
 namespace orc {
 
-  class HdfsFileInputStream : public InputStream {
-  private:
-    std::string filename;
-    std::unique_ptr<hdfs::FileHandle> file;
-    std::unique_ptr<hdfs::FileSystem> file_system;
-    uint64_t totalLength;
-    const uint64_t READ_SIZE = 1024 * 1024; //1 MB
-
-  public:
-    HdfsFileInputStream(std::string _filename) {
-      filename = _filename ;
-
-      //Building a URI object from the given uri_path
-      hdfs::URI uri;
-      try {
-        uri = hdfs::URI::parse_from_string(filename);
-      } catch (const hdfs::uri_parse_error&) {
-        throw ParseError("Malformed URI: " + filename);
-      }
-
-      //This sets conf path to default "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
-      //and loads configs core-site.xml and hdfs-site.xml from the conf path
-      hdfs::ConfigParser parser;
-      if(!parser.LoadDefaultResources()){
-        throw ParseError("Could not load default resources. ");
-      }
-      auto stats = parser.ValidateResources();
-      //validating core-site.xml
-      if(!stats[0].second.ok()){
-        throw ParseError(stats[0].first + " is invalid: " + stats[0].second.ToString());
-      }
-      //validating hdfs-site.xml
-      if(!stats[1].second.ok()){
-        throw ParseError(stats[1].first + " is invalid: " + stats[1].second.ToString());
-      }
-      hdfs::Options options;
-      if(!parser.get_options(options)){
-        throw ParseError("Could not load Options object. ");
-      }
-      hdfs::IoService * io_service = hdfs::IoService::New();
-      //Wrapping file_system into a unique pointer to guarantee deletion
-      file_system = std::unique_ptr<hdfs::FileSystem>(
-          hdfs::FileSystem::New(io_service, "", options));
-      if (file_system.get() == nullptr) {
-        throw ParseError("Can't create FileSystem object. ");
-      }
-      hdfs::Status status;
-      //Checking if the user supplied the host
-      if(!uri.get_host().empty()){
-        //Using port if supplied, otherwise using "" to look up port in configs
-        std::string port = uri.has_port() ?
-            std::to_string(uri.get_port()) : "";
-        status = file_system->Connect(uri.get_host(), port);
-        if (!status.ok()) {
-          throw ParseError("Can't connect to " + uri.get_host()
-              + ":" + port + ". " + status.ToString());
-        }
-      } else {
-        status = file_system->ConnectToDefaultFs();
-        if (!status.ok()) {
-          if(!options.defaultFS.get_host().empty()){
-            throw ParseError("Error connecting to " +
-                options.defaultFS.str() + ". " + status.ToString());
-          } else {
-            throw ParseError(
-                "Error connecting to the cluster: defaultFS is empty. "
-                + status.ToString());
-          }
-        }
-      }
-
-      if (file_system.get() == nullptr) {
-        throw ParseError("Can't connect the file system. ");
-      }
-
-      hdfs::FileHandle *file_raw = nullptr;
-      status = file_system->Open(uri.get_path(), &file_raw);
-      if (!status.ok()) {
-        throw ParseError("Can't open "
-            + uri.get_path() + ". " + status.ToString());
-      }
-      //Wrapping file_raw into a unique pointer to guarantee deletion
-      file.reset(file_raw);
-
-      hdfs::StatInfo stat_info;
-      status = file_system->GetFileInfo(uri.get_path(), stat_info);
-      if (!status.ok()) {
-        throw ParseError("Can't stat "
-            + uri.get_path() + ". " + status.ToString());
-      }
-      totalLength = stat_info.length;
+class HdfsFileInputStream : public InputStream {
+ private:
+  std::string filename_;
+  std::unique_ptr<tensorflow::RandomAccessFile> file_;
+  uint64_t total_length_;
+  const uint64_t READ_SIZE = 1024 * 1024;  // 1 MB
+
+ public:
+  HdfsFileInputStream(std::string filename) {
+    filename_ = filename;
+    tensorflow::Status status =
+        tensorflow::Env::Default()->NewRandomAccessFile(filename_, &file_);
+    if (!status.ok()) {
+      LOG(FATAL) << status.ToString();
     }
 
-    uint64_t getLength() const override {
-      return totalLength;
-    }
+    tensorflow::Env::Default()->GetFileSize(filename_, &total_length_);
+  }
 
-    uint64_t getNaturalReadSize() const override {
-      return READ_SIZE;
-    }
+  uint64_t getLength() const override { return total_length_; }
 
-    void read(void* buf,
-              uint64_t length,
-              uint64_t offset) override {
-
-      if (!buf) {
-        throw ParseError("Buffer is null");
-      }
-
-      hdfs::Status status;
-      size_t total_bytes_read = 0;
-      size_t last_bytes_read = 0;
-
-      do {
-        status = file->PositionRead(buf,
-            static_cast<size_t>(length) - total_bytes_read,
-            static_cast<off_t>(offset + total_bytes_read), &last_bytes_read);
-        if(!status.ok()) {
-          throw ParseError("Error reading the file: " + status.ToString());
-        }
-        total_bytes_read += last_bytes_read;
-      } while (total_bytes_read < length);
-    }
+  uint64_t getNaturalReadSize() const override { return READ_SIZE; }
 
-    const std::string& getName() const override {
-      return filename;
+  void read(void* buf, uint64_t length, uint64_t offset) override {
+    if (!buf) {
+      LOG(FATAL) << " Null buf";
+    }
+    tensorflow::StringPiece sp;
+    tensorflow::Status s =
+        file_->Read(offset, length, &sp, static_cast<char*>(buf));
+    if (!(s.ok() || tensorflow::errors::IsOutOfRange(s))) {
+      LOG(FATAL) << s.ToString();
     }
+  }
 
-    ~HdfsFileInputStream() override;
-  };
+  const std::string& getName() const override { return filename_; }
 
-  HdfsFileInputStream::~HdfsFileInputStream() {
-  }
+  ~HdfsFileInputStream() override;
+};
 
-  std::unique_ptr<InputStream> readHdfsFile(const std::string& path) {
-    return std::unique_ptr<InputStream>(new HdfsFileInputStream(path));
-  }
+HdfsFileInputStream::~HdfsFileInputStream() {}
+
+std::unique_ptr<InputStream> readHdfsFile(const std::string& path) {
+  return std::unique_ptr<InputStream>(new HdfsFileInputStream(path));
 }
+}  // namespace orc
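
For reference, here is a minimal standalone sketch (not part of the commit) of the read pattern the patched HdfsFileInputStream relies on. The path /tmp/example.orc is a placeholder, and the program assumes a build linked against the TF headers and libtensorflow_framework (the @local_config_tf targets above); hdfs:// URIs go through the exact same calls once a filesystem is registered for that scheme.

// read_sketch.cc -- hypothetical example, not part of the commit.
#include <iostream>
#include <memory>
#include <vector>

#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/file_system.h"
#include "tensorflow/core/platform/types.h"

int main() {
  // Placeholder path: local paths work out of the box; hdfs:// URIs resolve
  // through the same API once an HDFS filesystem plugin is registered.
  const std::string path = "/tmp/example.orc";

  // Env::Default() dispatches on the URI scheme to a registered FileSystem.
  tensorflow::Env* env = tensorflow::Env::Default();

  tensorflow::uint64 size = 0;
  tensorflow::Status status = env->GetFileSize(path, &size);
  if (!status.ok()) {
    std::cerr << status.ToString() << std::endl;
    return 1;
  }

  std::unique_ptr<tensorflow::RandomAccessFile> file;
  status = env->NewRandomAccessFile(path, &file);
  if (!status.ok()) {
    std::cerr << status.ToString() << std::endl;
    return 1;
  }

  // Positional read into caller-owned scratch; OutOfRange only means the
  // file ended before n bytes, so it is not treated as an error (the patch
  // above handles it the same way).
  const size_t n = 64;
  std::vector<char> scratch(n);
  tensorflow::StringPiece result;
  status = file->Read(/*offset=*/0, n, &result, scratch.data());
  if (!status.ok() && !tensorflow::errors::IsOutOfRange(status)) {
    std::cerr << status.ToString() << std::endl;
    return 1;
  }
  std::cout << "read " << result.size() << " of " << size << " bytes\n";
  return 0;
}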
