// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
#include <string>
#include <vector>
#include "arrow/filesystem/filesystem.h"
#include "arrow/util/macros.h"
#include "arrow/util/uri.h"
namespace Aws {
namespace Auth {
class AWSCredentialsProvider;
class STSAssumeRoleCredentialsProvider;
} // namespace Auth
namespace STS {
class STSClient;
}
} // namespace Aws
namespace arrow {
namespace fs {
/// Options for using a proxy for S3
struct ARROW_EXPORT S3ProxyOptions {
std::string scheme;
std::string host;
int port = -1;
std::string username;
std::string password;
/// Initialize from a URI such as http://username:password@host:port
/// or http://host:port
static Result<S3ProxyOptions> FromUri(const std::string& uri);
static Result<S3ProxyOptions> FromUri(const ::arrow::util::Uri& uri);
bool Equals(const S3ProxyOptions& other) const;
};
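// Example: parsing proxy settings from a URI (a minimal usage sketch; the
// address and credentials below are placeholders, not part of this header):
//
//   ARROW_ASSIGN_OR_RAISE(S3ProxyOptions proxy,
//                         S3ProxyOptions::FromUri("http://user:pass@localhost:1080"));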
enum class S3CredentialsKind : int8_t {
/// Anonymous access (no credentials used)
Anonymous,
/// Use default AWS credentials, configured through environment variables
Default,
/// Use explicitly-provided access key pair
Explicit,
/// Assume role through a role ARN
Role,
/// Use web identity token to assume role, configured through environment variables
WebIdentity
};
/// Pure virtual class for describing custom S3 retry strategies
class ARROW_EXPORT S3RetryStrategy {
public:
virtual ~S3RetryStrategy() = default;
/// Simple struct where each field corresponds to a field in Aws::Client::AWSError
struct AWSErrorDetail {
/// Corresponds to AWSError::GetErrorType()
int error_type;
/// Corresponds to AWSError::GetMessage()
std::string message;
/// Corresponds to AWSError::GetExceptionName()
std::string exception_name;
/// Corresponds to AWSError::ShouldRetry()
bool should_retry;
};
/// Returns true if the S3 request resulting in the provided error should be retried.
virtual bool ShouldRetry(const AWSErrorDetail& error, int64_t attempted_retries) = 0;
/// Returns the time in milliseconds the S3 client should sleep before retrying.
virtual int64_t CalculateDelayBeforeNextRetry(const AWSErrorDetail& error,
int64_t attempted_retries) = 0;
/// Returns a stock AWS Default retry strategy.
static std::shared_ptr<S3RetryStrategy> GetAwsDefaultRetryStrategy(
int64_t max_attempts);
/// Returns a stock AWS Standard retry strategy.
static std::shared_ptr<S3RetryStrategy> GetAwsStandardRetryStrategy(
int64_t max_attempts);
};
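// Example: a custom retry strategy (a minimal sketch; the fixed backoff policy
// below is illustrative only, not a recommendation):
//
//   class FixedBackoffRetryStrategy : public S3RetryStrategy {
//    public:
//     bool ShouldRetry(const AWSErrorDetail& error,
//                      int64_t attempted_retries) override {
//       // Retry only errors the SDK considers retryable, at most 3 times
//       return error.should_retry && attempted_retries < 3;
//     }
//     int64_t CalculateDelayBeforeNextRetry(const AWSErrorDetail& error,
//                                           int64_t attempted_retries) override {
//       return 200;  // constant 200 ms delay between attempts
//     }
//   };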
/// Options for the S3FileSystem implementation.
struct ARROW_EXPORT S3Options {
/// \brief AWS region to connect to.
///
/// If unset, the AWS SDK will choose a default value. The exact algorithm
/// depends on the SDK version. Before 1.8, the default is hardcoded
/// to "us-east-1". Since 1.8, several heuristics are used to determine
/// the region (environment variables, configuration profile, EC2 metadata
/// server).
std::string region;
/// \brief Socket connection timeout, in seconds
///
/// If negative, the AWS SDK default value is used (typically 1 second).
double connect_timeout = -1;
/// \brief Socket read timeout on Windows and macOS, in seconds
///
/// If negative, the AWS SDK default value is used (typically 3 seconds).
/// This option is ignored on non-Windows, non-macOS systems.
double request_timeout = -1;
/// If non-empty, override the default AWS endpoint with a connect string such as
/// "localhost:9000"
// XXX perhaps instead take a URL like "http://localhost:9000"?
std::string endpoint_override;
/// S3 connection transport, default "https"
std::string scheme = "https";
/// ARN of role to assume
std::string role_arn;
/// Optional identifier for an assumed role session.
std::string session_name;
/// Optional external identifier to pass to STS when assuming a role
std::string external_id;
/// Frequency (in seconds) to refresh temporary credentials from assumed role
int load_frequency = 900;
/// If connection is through a proxy, set options here
S3ProxyOptions proxy_options;
/// AWS credentials provider
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;
/// Type of credentials being used. Set along with credentials_provider.
S3CredentialsKind credentials_kind = S3CredentialsKind::Default;
/// Whether to use virtual addressing of buckets
///
/// If true, then virtual addressing is always enabled.
/// If false, then virtual addressing is only enabled if `endpoint_override` is empty.
///
/// This can be used for non-AWS backends that only support virtual hosted-style access.
bool force_virtual_addressing = false;
/// Whether OutputStream writes will be issued in the background, without blocking.
bool background_writes = true;
/// Whether to allow creation of buckets
///
/// When S3FileSystem creates new buckets, it does not pass any non-default settings.
/// In AWS S3, the bucket and all objects will not be publicly visible, and there
/// will be no bucket policies and no resource tags. To have more control over how
/// buckets are created, use a different API to create them.
bool allow_bucket_creation = false;
/// Whether to allow deletion of buckets
bool allow_bucket_deletion = false;
/// Whether to use pessimistic directory creation in the CreateDir function
///
/// By default, CreateDir will try to create the directory without first checking
/// whether it already exists. This is an optimization: attempting the creation and
/// catching the error avoids two dependent I/O calls.
/// However, for key/value storage such as Google Cloud Storage, too many creation
/// calls can breach the rate limit for object mutation operations and cause serious
/// consequences. It is also possible that you lack creation access to the parent
/// directory. Set this to true to address these scenarios.
bool check_directory_existence_before_creation = false;
/// Whether to allow file-open methods to return before the actual open.
///
/// Enabling this may reduce the latency of `OpenInputStream`, `OpenOutputStream`,
/// and similar methods, by reducing the number of roundtrips necessary. It may also
/// allow usage of more efficient S3 APIs for small files.
/// The downside is that failure conditions such as attempting to open a file in a
/// non-existent bucket will only be reported when actual I/O is done (at worst,
/// when attempting to close the file).
bool allow_delayed_open = false;
/// \brief Default metadata for OpenOutputStream.
///
/// This will be ignored if non-empty metadata is passed to OpenOutputStream.
std::shared_ptr<const KeyValueMetadata> default_metadata;
/// Optional retry strategy to determine which error types should be retried, and the
/// delay between retries.
std::shared_ptr<S3RetryStrategy> retry_strategy;
/// Optional customer-provided key for server-side encryption (SSE-C).
///
/// This should be the 32-byte AES-256 key, unencoded.
std::string sse_customer_key;
/// Optional path to a single PEM file holding all TLS CA certificates
///
/// If empty, global filesystem options will be used (see FileSystemGlobalOptions);
/// if the corresponding global filesystem option is also empty, the underlying
/// TLS library's defaults will be used.
///
/// Note this option may be ignored on some systems (Windows, macOS).
std::string tls_ca_file_path;
/// Optional path to a directory holding TLS CA
///
/// The given directory should contain CA certificates as individual PEM files
/// named along the OpenSSL "hashed" format.
///
/// If empty, global filesystem options will be used (see FileSystemGlobalOptions);
/// if the corresponding global filesystem option is also empty, the underlying
/// TLS library's defaults will be used.
///
/// Note this option may be ignored on some systems (Windows, macOS).
std::string tls_ca_dir_path;
/// Whether to verify the S3 endpoint's TLS certificate
///
/// This option applies if the scheme is "https".
bool tls_verify_certificates = true;
S3Options();
/// Configure with the default AWS credentials provider chain.
void ConfigureDefaultCredentials();
/// Configure with anonymous credentials. This will only let you access public buckets.
void ConfigureAnonymousCredentials();
/// Configure with explicit access and secret key.
void ConfigureAccessKey(const std::string& access_key, const std::string& secret_key,
const std::string& session_token = "");
/// Configure with credentials from an assumed role.
void ConfigureAssumeRoleCredentials(
const std::string& role_arn, const std::string& session_name = "",
const std::string& external_id = "", int load_frequency = 900,
const std::shared_ptr<Aws::STS::STSClient>& stsClient = NULLPTR);
/// Configure with credentials from role assumed using a web identity token
void ConfigureAssumeRoleWithWebIdentityCredentials();
std::string GetAccessKey() const;
std::string GetSecretKey() const;
std::string GetSessionToken() const;
bool Equals(const S3Options& other) const;
/// \brief Initialize with default credentials provider chain
///
/// This is recommended if you use the standard AWS environment variables
/// and/or configuration file.
static S3Options Defaults();
/// \brief Initialize with anonymous credentials.
///
/// This will only let you access public buckets.
static S3Options Anonymous();
/// \brief Initialize with explicit access and secret key.
///
/// Optionally, a session token may also be provided for temporary credentials
/// (from STS).
static S3Options FromAccessKey(const std::string& access_key,
const std::string& secret_key,
const std::string& session_token = "");
/// \brief Initialize from an assumed role.
static S3Options FromAssumeRole(
const std::string& role_arn, const std::string& session_name = "",
const std::string& external_id = "", int load_frequency = 900,
const std::shared_ptr<Aws::STS::STSClient>& stsClient = NULLPTR);
/// \brief Initialize from an assumed role with web identity.
/// Uses the AWS SDK, which relies on environment variables to
/// generate temporary credentials.
static S3Options FromAssumeRoleWithWebIdentity();
static Result<S3Options> FromUri(const ::arrow::util::Uri& uri,
std::string* out_path = NULLPTR);
static Result<S3Options> FromUri(const std::string& uri,
std::string* out_path = NULLPTR);
};
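// Example: building S3Options (a minimal sketch; the keys, endpoint, region and
// URI below are placeholders):
//
//   S3Options options = S3Options::FromAccessKey("my_access_key", "my_secret_key");
//   options.region = "us-west-2";
//   options.endpoint_override = "localhost:9000";  // e.g. a local S3-compatible server
//   options.scheme = "http";
//
//   // Or, equivalently, from a URI; `out_path` receives the path component:
//   std::string path;
//   ARROW_ASSIGN_OR_RAISE(S3Options uri_options,
//                         S3Options::FromUri("s3://mybucket/mydir?region=us-west-2", &path));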
/// S3-backed FileSystem implementation.
///
/// Some implementation notes:
/// - buckets are special and the operations available on them may be limited
/// or more expensive than desired.
class ARROW_EXPORT S3FileSystem : public FileSystem {
public:
~S3FileSystem() override;
std::string type_name() const override { return "s3"; }
/// Return the original S3 options used when constructing the filesystem
S3Options options() const;
/// Return the actual region this filesystem connects to
std::string region() const;
bool Equals(const FileSystem& other) const override;
Result<std::string> PathFromUri(const std::string& uri_string) const override;
/// \cond FALSE
using FileSystem::CreateDir;
using FileSystem::DeleteDirContents;
using FileSystem::DeleteDirContentsAsync;
using FileSystem::GetFileInfo;
using FileSystem::OpenAppendStream;
using FileSystem::OpenOutputStream;
/// \endcond
Result<FileInfo> GetFileInfo(const std::string& path) override;
Result<std::vector<FileInfo>> GetFileInfo(const FileSelector& select) override;
FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override;
Status CreateDir(const std::string& path, bool recursive) override;
Status DeleteDir(const std::string& path) override;
Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override;
Future<> DeleteDirContentsAsync(const std::string& path, bool missing_dir_ok) override;
Status DeleteRootDirContents() override;
Status DeleteFile(const std::string& path) override;
Status Move(const std::string& src, const std::string& dest) override;
Status CopyFile(const std::string& src, const std::string& dest) override;
/// Create a sequential input stream for reading from an S3 object.
///
/// NOTE: Reads from the stream will be synchronous and unbuffered.
/// You may want to wrap the stream in a BufferedInputStream or use
/// a custom readahead strategy to avoid idle waits.
Result<std::shared_ptr<io::InputStream>> OpenInputStream(
const std::string& path) override;
/// Create a sequential input stream for reading from an S3 object.
///
/// This override avoids a HEAD request by assuming the FileInfo
/// contains correct information.
Result<std::shared_ptr<io::InputStream>> OpenInputStream(const FileInfo& info) override;
/// Create a random access file for reading from an S3 object.
///
/// See OpenInputStream for performance notes.
Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
const std::string& path) override;
/// Create a random access file for reading from an S3 object.
///
/// This override avoids a HEAD request by assuming the FileInfo
/// contains correct information.
Result<std::shared_ptr<io::RandomAccessFile>> OpenInputFile(
const FileInfo& info) override;
/// Create a sequential output stream for writing to an S3 object.
///
/// NOTE: Writes to the stream will be buffered. Depending on
/// S3Options.background_writes, they can be synchronous or not.
/// It is recommended to enable background_writes unless you prefer
/// implementing your own background execution strategy.
Result<std::shared_ptr<io::OutputStream>> OpenOutputStream(
const std::string& path,
const std::shared_ptr<const KeyValueMetadata>& metadata) override;
Result<std::shared_ptr<io::OutputStream>> OpenAppendStream(
const std::string& path,
const std::shared_ptr<const KeyValueMetadata>& metadata) override;
/// Create an S3FileSystem instance from the given options.
static Result<std::shared_ptr<S3FileSystem>> Make(
const S3Options& options, const io::IOContext& = io::default_io_context());
protected:
explicit S3FileSystem(const S3Options& options, const io::IOContext&);
class Impl;
std::shared_ptr<Impl> impl_;
};
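// Example: creating a filesystem and reading an object (a minimal sketch; the
// region, bucket and key names are placeholders, and the S3 APIs must already
// have been initialized, see InitializeS3 below):
//
//   arrow::Status ReadObjectHead() {
//     S3Options options = S3Options::Defaults();
//     options.region = "us-east-1";
//     ARROW_ASSIGN_OR_RAISE(std::shared_ptr<S3FileSystem> fs, S3FileSystem::Make(options));
//     ARROW_ASSIGN_OR_RAISE(auto stream, fs->OpenInputStream("mybucket/data.parquet"));
//     ARROW_ASSIGN_OR_RAISE(auto buffer, stream->Read(1024));  // first 1024 bytes
//     return stream->Close();
//   }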
enum class S3LogLevel : int8_t { Off, Fatal, Error, Warn, Info, Debug, Trace };
struct ARROW_EXPORT S3GlobalOptions {
/// The log level for S3-originating messages.
S3LogLevel log_level;
/// The number of threads to configure when creating AWS' I/O event loop
///
/// Defaults to 1, as recommended by the AWS documentation when the number of
/// connections is expected to be at most in the hundreds.
///
/// For more details see Aws::Crt::Io::EventLoopGroup
int num_event_loop_threads = 1;
/// Whether to install a process-wide SIGPIPE handler
///
/// The AWS SDK may sometimes emit SIGPIPE signals for certain errors;
/// by default, they would abort the current process.
/// This option, if enabled, will install a process-wide signal handler
/// that logs and otherwise ignores incoming SIGPIPE signals.
///
/// This option has no effect on Windows.
bool install_sigpipe_handler = false;
/// \brief Initialize with default options
///
/// For log_level, this method first tries to extract a suitable value from the
/// environment variable ARROW_S3_LOG_LEVEL.
static S3GlobalOptions Defaults();
};
/// \brief Initialize the S3 APIs with the specified set of options.
///
/// It is required to call this function at least once before using S3FileSystem.
///
/// Once this function is called, you MUST call FinalizeS3 before the end of the
/// application in order to avoid a segmentation fault at shutdown.
ARROW_EXPORT
Status InitializeS3(const S3GlobalOptions& options);
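// Example: typical application-level setup and teardown (a minimal sketch,
// pairing InitializeS3 with FinalizeS3, declared below):
//
//   int main() {
//     arrow::fs::S3GlobalOptions global_options = arrow::fs::S3GlobalOptions::Defaults();
//     global_options.log_level = arrow::fs::S3LogLevel::Error;
//     if (!arrow::fs::InitializeS3(global_options).ok()) return 1;
//     // ... create and use S3FileSystem instances ...
//     if (!arrow::fs::FinalizeS3().ok()) return 1;
//     return 0;
//   }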
/// \brief Ensure the S3 APIs are initialized, but only if not already done.
///
/// If necessary, this will call InitializeS3() with some default options.
ARROW_EXPORT
Status EnsureS3Initialized();
/// Whether S3 was initialized, and not finalized.
ARROW_EXPORT
bool IsS3Initialized();
/// Whether S3 was finalized.
ARROW_EXPORT
bool IsS3Finalized();
/// \brief Shut down the S3 APIs.
///
/// This may wait for some concurrent S3 calls to finish so as to avoid
/// race conditions.
/// After this function has been called, all S3 calls will fail with an error.
///
/// Calls to InitializeS3() and FinalizeS3() should be serialized by the
/// application (this also applies to EnsureS3Initialized() and
/// EnsureS3Finalized()).
ARROW_EXPORT
Status FinalizeS3();
/// \brief Ensure the S3 APIs are shutdown, but only if not already done.
///
/// If necessary, this will call FinalizeS3().
ARROW_EXPORT
Status EnsureS3Finalized();
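/// \brief Resolve the region the given bucket is located in.
///
/// This issues a network request to the S3 service.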
ARROW_EXPORT
Result<std::string> ResolveS3BucketRegion(const std::string& bucket);
} // namespace fs
} // namespace arrow