// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
/// \brief Implementation of Flight RPC client.
namespace arrow {

// Forward declarations: only the names are needed by the declarations below.
class RecordBatch;
class Schema;

namespace flight {
/// \brief A duration type for Flight call timeouts.
///
/// The representation is a double-precision count of seconds, so
/// fractional seconds can be expressed directly.
using TimeoutDuration = std::chrono::duration<double, std::chrono::seconds::period>;
/// \brief Hints to the underlying RPC layer for Arrow Flight calls. | |
class ARROW_FLIGHT_EXPORT FlightCallOptions { | |
public: | |
/// Create a default set of call options. | |
FlightCallOptions(); | |
/// \brief An optional timeout for this call. Negative durations | |
/// mean an implementation-defined default behavior will be used | |
/// instead. This is the default value. | |
TimeoutDuration timeout; | |
/// \brief IPC reader options, if applicable for the call. | |
ipc::IpcReadOptions read_options; | |
/// \brief IPC writer options, if applicable for the call. | |
ipc::IpcWriteOptions write_options; | |
/// \brief Headers for client to add to context. | |
std::vector<std::pair<std::string, std::string>> headers; | |
/// \brief A token to enable interactive user cancellation of long-running requests. | |
StopToken stop_token; | |
/// \brief An optional memory manager to control where to allocate incoming data. | |
std::shared_ptr<MemoryManager> memory_manager; | |
}; | |
/// \brief Indicate that the client attempted to write a message
/// larger than the soft limit set via write_size_limit_bytes.
class ARROW_FLIGHT_EXPORT FlightWriteSizeStatusDetail : public arrow::StatusDetail {
 public:
  /// \brief Construct a detail from the two sizes it reports.
  /// \param[in] limit the value later returned by limit()
  /// \param[in] actual the value later returned by actual()
  explicit FlightWriteSizeStatusDetail(int64_t limit, int64_t actual)
      : limit_(limit), actual_(actual) {}

  const char* type_id() const override;
  std::string ToString() const override;

  /// \brief The limit supplied at construction (presumably the soft
  /// write limit, in bytes).
  int64_t limit() const { return limit_; }

  /// \brief The actual size supplied at construction (presumably the
  /// size of the rejected message, in bytes).
  int64_t actual() const { return actual_; }

  /// \brief Extract this status detail from a status, or return
  /// nullptr if the status doesn't contain this status detail.
  static std::shared_ptr<FlightWriteSizeStatusDetail> UnwrapStatus(
      const arrow::Status& status);

 private:
  int64_t limit_;
  int64_t actual_;
};
struct ARROW_FLIGHT_EXPORT FlightClientOptions { | |
/// \brief Root certificates to use for validating server | |
/// certificates. | |
std::string tls_root_certs; | |
/// \brief Override the hostname checked by TLS. Use with caution. | |
std::string override_hostname; | |
/// \brief The client certificate to use if using Mutual TLS | |
std::string cert_chain; | |
/// \brief The private key associated with the client certificate for Mutual TLS | |
std::string private_key; | |
/// \brief A list of client middleware to apply. | |
std::vector<std::shared_ptr<ClientMiddlewareFactory>> middleware; | |
/// \brief A soft limit on the number of bytes to write in a single | |
/// batch when sending Arrow data to a server. | |
/// | |
/// Used to help limit server memory consumption. Only enabled if | |
/// positive. When enabled, FlightStreamWriter.Write* may yield a | |
/// IOError with error detail FlightWriteSizeStatusDetail. | |
int64_t write_size_limit_bytes = 0; | |
/// \brief Generic connection options, passed to the underlying | |
/// transport; interpretation is implementation-dependent. | |
std::vector<std::pair<std::string, std::variant<int, std::string>>> generic_options; | |
/// \brief Use TLS without validating the server certificate. Use with caution. | |
bool disable_server_verification = false; | |
/// \brief Get default options. | |
static FlightClientOptions Defaults(); | |
}; | |
/// \brief A RecordBatchReader exposing Flight metadata and cancel
/// operations.
class ARROW_FLIGHT_EXPORT FlightStreamReader : public MetadataRecordBatchReader {
 public:
  /// \brief Try to cancel the call.
  virtual void Cancel() = 0;

  // Declaring an overload below would otherwise hide the base-class
  // overloads of the same name, so re-export them explicitly.
  using MetadataRecordBatchReader::ToRecordBatches;
  /// \brief Consume entire stream as a vector of record batches
  /// \param[in] stop_token a token the caller can use to request cancellation
  virtual arrow::Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
      const StopToken& stop_token) = 0;

  // Same reason as above: keep the base-class ToTable overloads visible.
  using MetadataRecordBatchReader::ToTable;
  /// \brief Consume entire stream as a Table
  /// \param[in] stop_token a token the caller can use to request cancellation
  arrow::Result<std::shared_ptr<Table>> ToTable(const StopToken& stop_token);
};
// Silence warning
// "non dll-interface class RecordBatchReader used as base for dll-interface class"
/// \brief A RecordBatchWriter that also allows sending
/// application-defined metadata via the Flight protocol.
class ARROW_FLIGHT_EXPORT FlightStreamWriter : public MetadataRecordBatchWriter {
 public:
  /// \brief Indicate that the application is done writing to this stream.
  ///
  /// The application may not write to this stream after calling
  /// this. This differs from closing the stream because this writer
  /// may represent only one half of a readable and writable stream.
  ///
  /// \return Status of the operation
  virtual Status DoneWriting() = 0;
};
/// \brief A reader for application-specific metadata sent back to the | |
/// client during an upload. | |
class ARROW_FLIGHT_EXPORT FlightMetadataReader { | |
public: | |
virtual ~FlightMetadataReader(); | |
/// \brief Read a message from the server. | |
virtual Status ReadMetadata(std::shared_ptr<Buffer>* out) = 0; | |
}; | |
/// \brief Client class for Arrow Flight RPC services. | |
class ARROW_FLIGHT_EXPORT FlightClient { | |
public: | |
~FlightClient(); | |
/// \brief Connect to an unauthenticated flight service | |
/// \param[in] location the URI | |
/// \return Arrow result with the created FlightClient, OK status may not indicate that | |
/// the connection was successful | |
static arrow::Result<std::unique_ptr<FlightClient>> Connect(const Location& location); | |
/// \brief Connect to an unauthenticated flight service | |
/// \param[in] location the URI | |
/// \param[in] options Other options for setting up the client | |
/// \return Arrow result with the created FlightClient, OK status may not indicate that | |
/// the connection was successful | |
static arrow::Result<std::unique_ptr<FlightClient>> Connect( | |
const Location& location, const FlightClientOptions& options); | |
/// \brief Authenticate to the server using the given handler. | |
/// \param[in] options Per-RPC options | |
/// \param[in] auth_handler The authentication mechanism to use | |
/// \return Status OK if the client authenticated successfully | |
Status Authenticate(const FlightCallOptions& options, | |
std::unique_ptr<ClientAuthHandler> auth_handler); | |
/// \brief Authenticate to the server using basic HTTP style authentication. | |
/// \param[in] options Per-RPC options | |
/// \param[in] username Username to use | |
/// \param[in] password Password to use | |
/// \return Arrow result with bearer token and status OK if client authenticated | |
/// successfully | |
arrow::Result<std::pair<std::string, std::string>> AuthenticateBasicToken( | |
const FlightCallOptions& options, const std::string& username, | |
const std::string& password); | |
/// \brief Perform the indicated action, returning an iterator to the stream | |
/// of results, if any | |
/// \param[in] options Per-RPC options | |
/// \param[in] action the action to be performed | |
/// \return Arrow result with an iterator object for reading the returned results | |
arrow::Result<std::unique_ptr<ResultStream>> DoAction(const FlightCallOptions& options, | |
const Action& action); | |
arrow::Result<std::unique_ptr<ResultStream>> DoAction(const Action& action) { | |
return DoAction({}, action); | |
} | |
/// \brief Perform the CancelFlightInfo action, returning a | |
/// CancelFlightInfoResult | |
/// | |
/// \param[in] options Per-RPC options | |
/// \param[in] request The CancelFlightInfoRequest | |
/// \return Arrow result with a CancelFlightInfoResult | |
arrow::Result<CancelFlightInfoResult> CancelFlightInfo( | |
const FlightCallOptions& options, const CancelFlightInfoRequest& request); | |
arrow::Result<CancelFlightInfoResult> CancelFlightInfo( | |
const CancelFlightInfoRequest& request) { | |
return CancelFlightInfo({}, request); | |
} | |
/// \brief Perform the RenewFlightEndpoint action, returning a renewed | |
/// FlightEndpoint | |
/// | |
/// \param[in] options Per-RPC options | |
/// \param[in] request The RenewFlightEndpointRequest | |
/// \return Arrow result with a renewed FlightEndpoint | |
arrow::Result<FlightEndpoint> RenewFlightEndpoint( | |
const FlightCallOptions& options, const RenewFlightEndpointRequest& request); | |
arrow::Result<FlightEndpoint> RenewFlightEndpoint( | |
const RenewFlightEndpointRequest& request) { | |
return RenewFlightEndpoint({}, request); | |
} | |
/// \brief Retrieve a list of available Action types | |
/// \param[in] options Per-RPC options | |
/// \return Arrow result with the available actions | |
arrow::Result<std::vector<ActionType>> ListActions(const FlightCallOptions& options); | |
arrow::Result<std::vector<ActionType>> ListActions() { | |
return ListActions(FlightCallOptions()); | |
} | |
/// \brief Request access plan for a single flight, which may be an existing | |
/// dataset or a command to be executed | |
/// \param[in] options Per-RPC options | |
/// \param[in] descriptor the dataset request, whether a named dataset or | |
/// command | |
/// \return Arrow result with the FlightInfo describing where to access the dataset | |
arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfo( | |
const FlightCallOptions& options, const FlightDescriptor& descriptor); | |
arrow::Result<std::unique_ptr<FlightInfo>> GetFlightInfo( | |
const FlightDescriptor& descriptor) { | |
return GetFlightInfo({}, descriptor); | |
} | |
/// \brief Asynchronous GetFlightInfo. | |
/// \param[in] options Per-RPC options | |
/// \param[in] descriptor the dataset request | |
/// \param[in] listener Callbacks for response and RPC completion | |
void GetFlightInfoAsync(const FlightCallOptions& options, | |
const FlightDescriptor& descriptor, | |
std::shared_ptr<AsyncListener<FlightInfo>> listener); | |
void GetFlightInfoAsync(const FlightDescriptor& descriptor, | |
std::shared_ptr<AsyncListener<FlightInfo>> listener) { | |
return GetFlightInfoAsync({}, descriptor, std::move(listener)); | |
} | |
/// \brief Asynchronous GetFlightInfo returning a Future. | |
/// \param[in] options Per-RPC options | |
/// \param[in] descriptor the dataset request | |
arrow::Future<FlightInfo> GetFlightInfoAsync(const FlightCallOptions& options, | |
const FlightDescriptor& descriptor); | |
arrow::Future<FlightInfo> GetFlightInfoAsync(const FlightDescriptor& descriptor) { | |
return GetFlightInfoAsync({}, descriptor); | |
} | |
/// \brief Request and poll a long running query | |
/// \param[in] options Per-RPC options | |
/// \param[in] descriptor the dataset request or a descriptor returned by a | |
/// prior PollFlightInfo call | |
/// \return Arrow result with the PollInfo describing the status of | |
/// the requested query | |
arrow::Result<std::unique_ptr<PollInfo>> PollFlightInfo( | |
const FlightCallOptions& options, const FlightDescriptor& descriptor); | |
arrow::Result<std::unique_ptr<PollInfo>> PollFlightInfo( | |
const FlightDescriptor& descriptor) { | |
return PollFlightInfo({}, descriptor); | |
} | |
/// \brief Request schema for a single flight, which may be an existing | |
/// dataset or a command to be executed | |
/// \param[in] options Per-RPC options | |
/// \param[in] descriptor the dataset request, whether a named dataset or | |
/// command | |
/// \return Arrow result with the SchemaResult describing the dataset schema | |
arrow::Result<std::unique_ptr<SchemaResult>> GetSchema( | |
const FlightCallOptions& options, const FlightDescriptor& descriptor); | |
arrow::Result<std::unique_ptr<SchemaResult>> GetSchema( | |
const FlightDescriptor& descriptor) { | |
return GetSchema({}, descriptor); | |
} | |
/// \brief List all available flights known to the server | |
/// \return Arrow result with an iterator that returns a FlightInfo for each flight | |
arrow::Result<std::unique_ptr<FlightListing>> ListFlights(); | |
/// \brief List available flights given indicated filter criteria | |
/// \param[in] options Per-RPC options | |
/// \param[in] criteria the filter criteria (opaque) | |
/// \return Arrow result with an iterator that returns a FlightInfo for each flight | |
arrow::Result<std::unique_ptr<FlightListing>> ListFlights( | |
const FlightCallOptions& options, const Criteria& criteria); | |
/// \brief Given a flight ticket and schema, request to be sent the | |
/// stream. Returns record batch stream reader | |
/// \param[in] options Per-RPC options | |
/// \param[in] ticket The flight ticket to use | |
/// \return Arrow result with the returned RecordBatchReader | |
arrow::Result<std::unique_ptr<FlightStreamReader>> DoGet( | |
const FlightCallOptions& options, const Ticket& ticket); | |
arrow::Result<std::unique_ptr<FlightStreamReader>> DoGet(const Ticket& ticket) { | |
return DoGet({}, ticket); | |
} | |
/// \brief DoPut return value | |
struct DoPutResult { | |
/// \brief a writer to write record batches to | |
std::unique_ptr<FlightStreamWriter> writer; | |
/// \brief a reader for application metadata from the server | |
std::unique_ptr<FlightMetadataReader> reader; | |
}; | |
/// \brief Upload data to a Flight described by the given | |
/// descriptor. The caller must call Close() on the returned stream | |
/// once they are done writing. | |
/// | |
/// The reader and writer are linked; closing the writer will also | |
/// close the reader. Use \a DoneWriting to only close the write | |
/// side of the channel. | |
/// | |
/// \param[in] options Per-RPC options | |
/// \param[in] descriptor the descriptor of the stream | |
/// \param[in] schema the schema for the data to upload | |
/// \return Arrow result with a DoPutResult struct holding a reader and a writer | |
arrow::Result<DoPutResult> DoPut(const FlightCallOptions& options, | |
const FlightDescriptor& descriptor, | |
const std::shared_ptr<Schema>& schema); | |
arrow::Result<DoPutResult> DoPut(const FlightDescriptor& descriptor, | |
const std::shared_ptr<Schema>& schema) { | |
return DoPut({}, descriptor, schema); | |
} | |
struct DoExchangeResult { | |
std::unique_ptr<FlightStreamWriter> writer; | |
std::unique_ptr<FlightStreamReader> reader; | |
}; | |
arrow::Result<DoExchangeResult> DoExchange(const FlightCallOptions& options, | |
const FlightDescriptor& descriptor); | |
arrow::Result<DoExchangeResult> DoExchange(const FlightDescriptor& descriptor) { | |
return DoExchange({}, descriptor); | |
} | |
/// \brief Set server session option(s) by name/value. Sessions are generally | |
/// persisted via HTTP cookies. | |
/// \param[in] options Per-RPC options | |
/// \param[in] request The server session options to set | |
::arrow::Result<SetSessionOptionsResult> SetSessionOptions( | |
const FlightCallOptions& options, const SetSessionOptionsRequest& request); | |
/// \brief Get the current server session options. The session is generally | |
/// accessed via an HTTP cookie. | |
/// \param[in] options Per-RPC options | |
/// \param[in] request The (empty) GetSessionOptions request object. | |
::arrow::Result<GetSessionOptionsResult> GetSessionOptions( | |
const FlightCallOptions& options, const GetSessionOptionsRequest& request); | |
/// \brief Close/invalidate the current server session. The session is generally | |
/// accessed via an HTTP cookie. | |
/// \param[in] options Per-RPC options | |
/// \param[in] request The (empty) CloseSession request object. | |
::arrow::Result<CloseSessionResult> CloseSession(const FlightCallOptions& options, | |
const CloseSessionRequest& request); | |
/// \brief Explicitly shut down and clean up the client. | |
/// | |
/// For backwards compatibility, this will be implicitly called by | |
/// the destructor if not already called, but this gives the | |
/// application no chance to handle errors, so it is recommended to | |
/// explicitly close the client. | |
/// | |
/// \since 8.0.0 | |
Status Close(); | |
/// \brief Whether this client supports asynchronous methods. | |
bool supports_async() const; | |
/// \brief Check whether this client supports asynchronous methods. | |
/// | |
/// This is like supports_async(), except that a detailed error message | |
/// is returned if async support is not available. If async support is | |
/// available, this function returns successfully. | |
Status CheckAsyncSupport() const; | |
private: | |
FlightClient(); | |
Status CheckOpen() const; | |
std::unique_ptr<internal::ClientTransport> transport_; | |
bool closed_; | |
int64_t write_size_limit_bytes_; | |
}; | |
}  // namespace flight
}  // namespace arrow