You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Hi, I am building an Apache Arrow Flight server. When I query the `do_get` endpoint, I get the following error: `Error, message length too large: found 5605265 bytes, the limit is: 4194304 bytes`
This is the code for the server.
use arrow_array::RecordBatch;use arrow_flight::flight_service_server::FlightServiceServer;use arrow_flight::PollInfo;use arrow_schema::{ArrowError,Schema};use chrono::Utc;use datafusion::common::tree_node::TreeNode;use std::net::SocketAddr;use std::sync::Arc;use std::time::Instant;use futures_util::{Future,TryFutureExt};use tonic::transport::{Identity,Server,ServerTlsConfig};use tonic_web::GrpcWebLayer;usecrate::option::CONFIG;usecrate::handlers::livetail::cross_origin_config;usecrate::handlers::http::query::{authorize_and_set_filter_tags, into_query};usecrate::query::{TableScanVisitor,QUERY_SESSION};usecrate::storage::object_storage::commit_schema_to_storage;usecrate::utils::arrow::flight::{get_query_from_ticket, run_do_get_rpc};use arrow_flight::{
flight_service_server::FlightService,Action,ActionType,Criteria,Empty,FlightData,FlightDescriptor,FlightInfo,HandshakeRequest,HandshakeResponse,PutResult,SchemaAsIpc,SchemaResult,Ticket,};use arrow_ipc::writer::{DictionaryTracker,IpcDataGenerator,IpcWriteOptions};use futures::stream::BoxStream;use tonic::{Request,Response,Status,Streaming};usecrate::handlers::livetail::extract_session_key;usecrate::metadata::STREAM_INFO;usecrate::rbac::Users;constL_CURLY:char = '{';constR_CURLY:char = '}';#[derive(Clone)]pubstructAirServiceImpl{}#[tonic::async_trait]implFlightServiceforAirServiceImpl{typeHandshakeStream = BoxStream<'static,Result<HandshakeResponse,Status>>;typeListFlightsStream = BoxStream<'static,Result<FlightInfo,Status>>;typeDoGetStream = BoxStream<'static,Result<FlightData,Status>>;typeDoPutStream = BoxStream<'static,Result<PutResult,Status>>;typeDoActionStream = BoxStream<'static,Result<arrow_flight::Result,Status>>;typeListActionsStream = BoxStream<'static,Result<ActionType,Status>>;typeDoExchangeStream = BoxStream<'static,Result<FlightData,Status>>;/// other methods are unimplementedasyncfndo_get(&self,req:Request<Ticket>) -> Result<Response<Self::DoGetStream>,Status>{let key = extract_session_key(req.metadata())?;let ticket = get_query_from_ticket(req)?;// get the query session_statelet session_state = QUERY_SESSION.state();// get the logical plan and extract the table namelet raw_logical_plan = session_state
.create_logical_plan(&ticket.query).await.map_err(|err| {
log::error!("Datafusion Error: Failed to create logical plan: {}", err);Status::internal("Failed to create logical plan")})?;// create a visitor to extract the table nameletmut visitor = TableScanVisitor::default();let _ = raw_logical_plan.visit(&mut visitor);let tables = visitor.into_inner();// map payload to queryletmut query = into_query(&ticket,&session_state).await.map_err(|_| Status::internal("Failed to parse query"))?;// if table name is not present it is a Malformed Querylet stream_name = query
.first_table_name().ok_or_else(|| Status::invalid_argument("Malformed Query"))?;let permissions = Users.get_permissions(&key);authorize_and_set_filter_tags(&mut query, permissions,&stream_name).map_err(|_| {Status::permission_denied("User Does not have permission to access this")})?;let(results, _) = query
.execute(stream_name.clone()).await.map_err(|err| Status::internal(err.to_string())).unwrap();let schemas = results
.iter().map(|batch| batch.schema()).map(|s| s.as_ref().clone()).collect::<Vec<_>>();let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?;let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();let schema_flight_data = SchemaAsIpc::new(&schema,&options);letmut flights = vec![FlightData::from(schema_flight_data)];let encoder = IpcDataGenerator::default();letmut tracker = DictionaryTracker::new(false);for batch in&results {let(flight_dictionaries, flight_batch) = encoder
.encoded_batch(batch,&mut tracker,&options).map_err(|e| Status::internal(e.to_string()))?;
flights.extend(flight_dictionaries.into_iter().map(Into::into));
flights.push(flight_batch.into());}let output = futures::stream::iter(flights.into_iter().map(Ok));Ok(Response::new(Box::pin(output)asSelf::DoGetStream))}}pubfnserver() -> implFuture<Output = Result<(),Box<dyn std::error::Error + Send>>> + Send{letmut addr:SocketAddr = CONFIG.parseable.address.parse().expect("valid socket address");
addr.set_port(CONFIG.parseable.flight_port);let service = AirServiceImpl{};let svc = FlightServiceServer::new(service).max_encoding_message_size(usize::MAX).max_decoding_message_size(usize::MAX).send_compressed(CompressionEncoding::Zstd).accept_compressed(CompressionEncoding::Zstd);let cors = cross_origin_config();let identity = match(&CONFIG.parseable.tls_cert_path,&CONFIG.parseable.tls_key_path,){(Some(cert),Some(key)) => {match(std::fs::read_to_string(cert), std::fs::read_to_string(key)){(Ok(cert_file),Ok(key_file)) => {let identity = Identity::from_pem(cert_file, key_file);Some(identity)}
_ => None,}}(_, _) => None,};let config = identity.map(|id| ServerTlsConfig::new().identity(id));let err_map_fn = |err| Box::new(err)asBox<dyn std::error::Error + Send>;Server::builder().accept_http1(true).max_frame_size((16*1024*1024) - 2)// 6MB ish.layer(cors).layer(GrpcWebLayer::new()).add_service(svc).serve(addr).map_err(err_map_fn),}
Even after updating the frame size, `max_encoding_message_size`, and `max_decoding_message_size`, it still gives me the same error.
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
-
Hi, I am building an Apache Arrow Flight server. When I am querying the
do_get
endpoint, I am getting an error: Error, message length too large: found 5605265 bytes, the limit is: 4194304 bytes
This is the code for the server.
Even after updating the frame size, max_encoding_message_size, and max_decoding_message_size, it still gives me the same error.
Beta Was this translation helpful? Give feedback.
All reactions