// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #pragma once #include #include #include #include "arrow/filesystem/filesystem.h" #include "arrow/util/macros.h" #include "arrow/util/uri.h" namespace Aws { namespace Auth { class AWSCredentialsProvider; class STSAssumeRoleCredentialsProvider; } // namespace Auth namespace STS { class STSClient; } } // namespace Aws namespace arrow { namespace fs { /// Options for using a proxy for S3 struct ARROW_EXPORT S3ProxyOptions { std::string scheme; std::string host; int port = -1; std::string username; std::string password; /// Initialize from URI such as http://username:password@host:port /// or http://host:port static Result FromUri(const std::string& uri); static Result FromUri(const ::arrow::util::Uri& uri); bool Equals(const S3ProxyOptions& other) const; }; enum class S3CredentialsKind : int8_t { /// Anonymous access (no credentials used) Anonymous, /// Use default AWS credentials, configured through environment variables Default, /// Use explicitly-provided access key pair Explicit, /// Assume role through a role ARN Role, /// Use web identity token to assume role, configured through environment variables WebIdentity }; /// Pure virtual class for describing custom S3 retry strategies class ARROW_EXPORT S3RetryStrategy { public: virtual ~S3RetryStrategy() = default; /// Simple struct where each field corresponds to a field in Aws::Client::AWSError struct AWSErrorDetail { /// Corresponds to AWSError::GetErrorType() int error_type; /// Corresponds to AWSError::GetMessage() std::string message; /// Corresponds to AWSError::GetExceptionName() std::string exception_name; /// Corresponds to AWSError::ShouldRetry() bool should_retry; }; /// Returns true if the S3 request resulting in the provided error should be retried. virtual bool ShouldRetry(const AWSErrorDetail& error, int64_t attempted_retries) = 0; /// Returns the time in milliseconds the S3 client should sleep for until retrying. virtual int64_t CalculateDelayBeforeNextRetry(const AWSErrorDetail& error, int64_t attempted_retries) = 0; /// Returns a stock AWS Default retry strategy. static std::shared_ptr GetAwsDefaultRetryStrategy( int64_t max_attempts); /// Returns a stock AWS Standard retry strategy. static std::shared_ptr GetAwsStandardRetryStrategy( int64_t max_attempts); }; /// Options for the S3FileSystem implementation. struct ARROW_EXPORT S3Options { /// \brief AWS region to connect to. /// /// If unset, the AWS SDK will choose a default value. The exact algorithm /// depends on the SDK version. Before 1.8, the default is hardcoded /// to "us-east-1". Since 1.8, several heuristics are used to determine /// the region (environment variables, configuration profile, EC2 metadata /// server). std::string region; /// \brief Socket connection timeout, in seconds /// /// If negative, the AWS SDK default value is used (typically 1 second). double connect_timeout = -1; /// \brief Socket read timeout on Windows and macOS, in seconds /// /// If negative, the AWS SDK default value is used (typically 3 seconds). /// This option is ignored on non-Windows, non-macOS systems. double request_timeout = -1; /// If non-empty, override region with a connect string such as "localhost:9000" // XXX perhaps instead take a URL like "http://localhost:9000"? std::string endpoint_override; /// S3 connection transport, default "https" std::string scheme = "https"; /// ARN of role to assume std::string role_arn; /// Optional identifier for an assumed role session. std::string session_name; /// Optional external identifier to pass to STS when assuming a role std::string external_id; /// Frequency (in seconds) to refresh temporary credentials from assumed role int load_frequency = 900; /// If connection is through a proxy, set options here S3ProxyOptions proxy_options; /// AWS credentials provider std::shared_ptr credentials_provider; /// Type of credentials being used. Set along with credentials_provider. S3CredentialsKind credentials_kind = S3CredentialsKind::Default; /// Whether to use virtual addressing of buckets /// /// If true, then virtual addressing is always enabled. /// If false, then virtual addressing is only enabled if `endpoint_override` is empty. /// /// This can be used for non-AWS backends that only support virtual hosted-style access. bool force_virtual_addressing = false; /// Whether OutputStream writes will be issued in the background, without blocking. bool background_writes = true; /// Whether to allow creation of buckets /// /// When S3FileSystem creates new buckets, it does not pass any non-default settings. /// In AWS S3, the bucket and all objects will be not publicly visible, and there /// will be no bucket policies and no resource tags. To have more control over how /// buckets are created, use a different API to create them. bool allow_bucket_creation = false; /// Whether to allow deletion of buckets bool allow_bucket_deletion = false; /// Whether to allow pessimistic directory creation in CreateDir function /// /// By default, CreateDir function will try to create the directory without checking its /// existence. It's an optimization to try directory creation and catch the error, /// rather than issue two dependent I/O calls. /// Though for key/value storage like Google Cloud Storage, too many creation calls will /// breach the rate limit for object mutation operations and cause serious consequences. /// It's also possible you don't have creation access for the parent directory. Set it /// to be true to address these scenarios. bool check_directory_existence_before_creation = false; /// Whether to allow file-open methods to return before the actual open. /// /// Enabling this may reduce the latency of `OpenInputStream`, `OpenOutputStream`, /// and similar methods, by reducing the number of roundtrips necessary. It may also /// allow usage of more efficient S3 APIs for small files. /// The downside is that failure conditions such as attempting to open a file in a /// non-existing bucket will only be reported when actual I/O is done (at worse, /// when attempting to close the file). bool allow_delayed_open = false; /// \brief Default metadata for OpenOutputStream. /// /// This will be ignored if non-empty metadata is passed to OpenOutputStream. std::shared_ptr default_metadata; /// Optional retry strategy to determine which error types should be retried, and the /// delay between retries. std::shared_ptr retry_strategy; /// Optional customer-provided key for server-side encryption (SSE-C). /// /// This should be the 32-byte AES-256 key, unencoded. std::string sse_customer_key; /// Optional path to a single PEM file holding all TLS CA certificates /// /// If empty, global filesystem options will be used (see FileSystemGlobalOptions); /// if the corresponding global filesystem option is also empty, the underlying /// TLS library's defaults will be used. /// /// Note this option may be ignored on some systems (Windows, macOS). std::string tls_ca_file_path; /// Optional path to a directory holding TLS CA /// /// The given directory should contain CA certificates as individual PEM files /// named along the OpenSSL "hashed" format. /// /// If empty, global filesystem options will be used (see FileSystemGlobalOptions); /// if the corresponding global filesystem option is also empty, the underlying /// TLS library's defaults will be used. /// /// Note this option may be ignored on some systems (Windows, macOS). std::string tls_ca_dir_path; /// Whether to verify the S3 endpoint's TLS certificate /// /// This option applies if the scheme is "https". bool tls_verify_certificates = true; S3Options(); /// Configure with the default AWS credentials provider chain. void ConfigureDefaultCredentials(); /// Configure with anonymous credentials. This will only let you access public buckets. void ConfigureAnonymousCredentials(); /// Configure with explicit access and secret key. void ConfigureAccessKey(const std::string& access_key, const std::string& secret_key, const std::string& session_token = ""); /// Configure with credentials from an assumed role. void ConfigureAssumeRoleCredentials( const std::string& role_arn, const std::string& session_name = "", const std::string& external_id = "", int load_frequency = 900, const std::shared_ptr& stsClient = NULLPTR); /// Configure with credentials from role assumed using a web identity token void ConfigureAssumeRoleWithWebIdentityCredentials(); std::string GetAccessKey() const; std::string GetSecretKey() const; std::string GetSessionToken() const; bool Equals(const S3Options& other) const; /// \brief Initialize with default credentials provider chain /// /// This is recommended if you use the standard AWS environment variables /// and/or configuration file. static S3Options Defaults(); /// \brief Initialize with anonymous credentials. /// /// This will only let you access public buckets. static S3Options Anonymous(); /// \brief Initialize with explicit access and secret key. /// /// Optionally, a session token may also be provided for temporary credentials /// (from STS). static S3Options FromAccessKey(const std::string& access_key, const std::string& secret_key, const std::string& session_token = ""); /// \brief Initialize from an assumed role. static S3Options FromAssumeRole( const std::string& role_arn, const std::string& session_name = "", const std::string& external_id = "", int load_frequency = 900, const std::shared_ptr& stsClient = NULLPTR); /// \brief Initialize from an assumed role with web-identity. /// Uses the AWS SDK which uses environment variables to /// generate temporary credentials. static S3Options FromAssumeRoleWithWebIdentity(); static Result FromUri(const ::arrow::util::Uri& uri, std::string* out_path = NULLPTR); static Result FromUri(const std::string& uri, std::string* out_path = NULLPTR); }; /// S3-backed FileSystem implementation. /// /// Some implementation notes: /// - buckets are special and the operations available on them may be limited /// or more expensive than desired. class ARROW_EXPORT S3FileSystem : public FileSystem { public: ~S3FileSystem() override; std::string type_name() const override { return "s3"; } /// Return the original S3 options when constructing the filesystem S3Options options() const; /// Return the actual region this filesystem connects to std::string region() const; bool Equals(const FileSystem& other) const override; Result PathFromUri(const std::string& uri_string) const override; /// \cond FALSE using FileSystem::CreateDir; using FileSystem::DeleteDirContents; using FileSystem::DeleteDirContentsAsync; using FileSystem::GetFileInfo; using FileSystem::OpenAppendStream; using FileSystem::OpenOutputStream; /// \endcond Result GetFileInfo(const std::string& path) override; Result> GetFileInfo(const FileSelector& select) override; FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) override; Status CreateDir(const std::string& path, bool recursive) override; Status DeleteDir(const std::string& path) override; Status DeleteDirContents(const std::string& path, bool missing_dir_ok) override; Future<> DeleteDirContentsAsync(const std::string& path, bool missing_dir_ok) override; Status DeleteRootDirContents() override; Status DeleteFile(const std::string& path) override; Status Move(const std::string& src, const std::string& dest) override; Status CopyFile(const std::string& src, const std::string& dest) override; /// Create a sequential input stream for reading from a S3 object. /// /// NOTE: Reads from the stream will be synchronous and unbuffered. /// You way want to wrap the stream in a BufferedInputStream or use /// a custom readahead strategy to avoid idle waits. Result> OpenInputStream( const std::string& path) override; /// Create a sequential input stream for reading from a S3 object. /// /// This override avoids a HEAD request by assuming the FileInfo /// contains correct information. Result> OpenInputStream(const FileInfo& info) override; /// Create a random access file for reading from a S3 object. /// /// See OpenInputStream for performance notes. Result> OpenInputFile( const std::string& path) override; /// Create a random access file for reading from a S3 object. /// /// This override avoids a HEAD request by assuming the FileInfo /// contains correct information. Result> OpenInputFile( const FileInfo& info) override; /// Create a sequential output stream for writing to a S3 object. /// /// NOTE: Writes to the stream will be buffered. Depending on /// S3Options.background_writes, they can be synchronous or not. /// It is recommended to enable background_writes unless you prefer /// implementing your own background execution strategy. Result> OpenOutputStream( const std::string& path, const std::shared_ptr& metadata) override; Result> OpenAppendStream( const std::string& path, const std::shared_ptr& metadata) override; /// Create a S3FileSystem instance from the given options. static Result> Make( const S3Options& options, const io::IOContext& = io::default_io_context()); protected: explicit S3FileSystem(const S3Options& options, const io::IOContext&); class Impl; std::shared_ptr impl_; }; enum class S3LogLevel : int8_t { Off, Fatal, Error, Warn, Info, Debug, Trace }; struct ARROW_EXPORT S3GlobalOptions { /// The log level for S3-originating messages. S3LogLevel log_level; /// The number of threads to configure when creating AWS' I/O event loop /// /// Defaults to 1 as recommended by AWS' doc when the # of connections is /// expected to be, at most, in the hundreds /// /// For more details see Aws::Crt::Io::EventLoopGroup int num_event_loop_threads = 1; /// Whether to install a process-wide SIGPIPE handler /// /// The AWS SDK may sometimes emit SIGPIPE signals for certain errors; /// by default, they would abort the current process. /// This option, if enabled, will install a process-wide signal handler /// that logs and otherwise ignore incoming SIGPIPE signals. /// /// This option has no effect on Windows. bool install_sigpipe_handler = false; /// \brief Initialize with default options /// /// For log_level, this method first tries to extract a suitable value from the /// environment variable ARROW_S3_LOG_LEVEL. static S3GlobalOptions Defaults(); }; /// \brief Initialize the S3 APIs with the specified set of options. /// /// It is required to call this function at least once before using S3FileSystem. /// /// Once this function is called you MUST call FinalizeS3 before the end of the /// application in order to avoid a segmentation fault at shutdown. ARROW_EXPORT Status InitializeS3(const S3GlobalOptions& options); /// \brief Ensure the S3 APIs are initialized, but only if not already done. /// /// If necessary, this will call InitializeS3() with some default options. ARROW_EXPORT Status EnsureS3Initialized(); /// Whether S3 was initialized, and not finalized. ARROW_EXPORT bool IsS3Initialized(); /// Whether S3 was finalized. ARROW_EXPORT bool IsS3Finalized(); /// \brief Shutdown the S3 APIs. /// /// This can wait for some S3 concurrent calls to finish so as to avoid /// race conditions. /// After this function has been called, all S3 calls will fail with an error. /// /// Calls to InitializeS3() and FinalizeS3() should be serialized by the /// application (this also applies to EnsureS3Initialized() and /// EnsureS3Finalized()). ARROW_EXPORT Status FinalizeS3(); /// \brief Ensure the S3 APIs are shutdown, but only if not already done. /// /// If necessary, this will call FinalizeS3(). ARROW_EXPORT Status EnsureS3Finalized(); ARROW_EXPORT Result ResolveS3BucketRegion(const std::string& bucket); } // namespace fs } // namespace arrow