Skip to content

Commit 0a3cf00

Browse files
authored
Merge pull request #55 from stackql/feature/refactor
output formatting
2 parents 48de707 + 0026fda commit 0a3cf00

File tree

7 files changed

+232
-185
lines changed

7 files changed

+232
-185
lines changed

pystackql/core/output.py

Lines changed: 84 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@ def _format_error(self, error_msg):
9696
def _format_data(self, data):
    """Parse StackQL JSON output and return it in the configured format.

    SQL type objects emitted by StackQL are unwrapped by
    ``_process_sql_types`` before the result is returned:

    - SQL NULL values: {'String': '', 'Valid': False} → None
    - Regular values: {'String': 'value', 'Valid': True} → 'value'
    - Empty strings: {'String': '', 'Valid': True} → '' (preserved as empty string)

    Parsing and processing failures are not raised; they are reported as a
    single ``{"error": ...}`` record in the configured output format.

    Args:
        data (str): The data string. Returned untouched for ``csv``
            output; parsed as JSON for every other output format.

    Returns:
        str | list | dict | pandas.DataFrame: ``data`` unchanged for
        ``csv`` output; a DataFrame for ``pandas`` output (empty when the
        result has no rows); otherwise the processed JSON value, with any
        falsy result normalised to ``[]``.
    """
    if self.output_format == 'csv':
        return data

    as_pandas = self.output_format == 'pandas'
    if as_pandas:
        # Bind ``pd`` before any branch can use it. A function-level import
        # makes ``pd`` a local name for the whole function, so importing it
        # only on the success path left the error and empty-result branches
        # raising UnboundLocalError.
        import pandas as pd

    def as_output(records):
        # Wrap records in a DataFrame for pandas output, else pass through.
        return pd.DataFrame(records) if as_pandas else records

    try:
        raw_json_data = json.loads(data)
    except json.JSONDecodeError as e:
        # Malformed JSON: report where the parser gave up.
        return as_output([{
            "error": f"Invalid JSON format: {e}",
            "position": e.pos,
            "line": e.lineno,
            "column": e.colno,
        }])
    except TypeError as e:
        # json.loads only accepts str, bytes or bytearray.
        return as_output([{
            "error": f"Invalid data type for JSON parsing: {e}",
            "data_type": str(type(data)),
        }])
    except Exception as e:
        # Deliberate best-effort: any other parsing failure becomes a record.
        return as_output([{
            "error": f"Unexpected error parsing JSON: {e}",
            "exception_type": type(e).__name__,
        }])

    try:
        # Clean up SQL type objects before handing the data back.
        processed_json_data = self._process_sql_types(raw_json_data)

        if not processed_json_data:
            return pd.DataFrame() if as_pandas else []

        # DataFrame construction can itself fail (for example on a dict of
        # scalars), so it stays inside the try block.
        return as_output(processed_json_data)
    except Exception as e:
        return as_output([{"error": f"Error processing data: {e}"}])
152+
153+
def _process_sql_types(self, data):
    """Recursively unwrap SQL type objects found in parsed JSON data.

    A dictionary counts as an SQL type object when it has a 'Valid' key,
    at most two keys in total, and one of 'String', 'Int64' or 'Float64'.
    Such an object is replaced by its wrapped value when 'Valid' is truthy
    and by None (SQL NULL) otherwise.

    Args:
        data: The parsed JSON data

    Returns:
        The processed data with SQL type objects transformed; lists and
        regular dictionaries are rebuilt, all other values are returned
        as is
    """
    if isinstance(data, dict):
        value_keys = [key for key in ('String', 'Int64', 'Float64') if key in data]
        is_sql_type = bool(value_keys) and 'Valid' in data and len(data) <= 2

        if not is_sql_type:
            # Regular dictionary (a record or nested object): process each value
            return {key: self._process_sql_types(value) for key, value in data.items()}

        if not data['Valid']:
            # Valid: False -> SQL NULL
            return None

        # Valid: True -> the wrapped value, returned without further processing
        return data[value_keys[0]]

    if isinstance(data, list):
        return [self._process_sql_types(item) for item in data]

    # Strings, numbers, booleans and None need no processing
    return data
189+
120190
def _format_empty(self):
121191
"""Format an empty result.
122192
@@ -154,8 +224,4 @@ def format_statement_result(self, result):
154224
elif self.output_format == 'csv':
155225
return message
156226
else: # dict
157-
# Count number of rows in the message
158-
try:
159-
return {'message': message, 'rowsaffected': message.count('\n')}
160-
except Exception:
161-
return {'message': message, 'rowsaffected': 0}
227+
return {'message': message.rstrip('\n')}

pystackql/core/query.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -151,24 +151,21 @@ def execute(self, query, custom_auth=None, env_vars=None):
151151
os.remove(script_path)
152152
return output
153153

154-
155154
class AsyncQueryExecutor:
156-
"""Executes StackQL queries asynchronously.
155+
"""Executes StackQL queries asynchronously in local mode.
157156
158157
This class provides methods for executing multiple StackQL queries
159-
concurrently using asyncio.
158+
concurrently using asyncio. Server mode is not supported for async queries.
160159
"""
161160

162-
def __init__(self, sync_query_func, output_format='dict'):
    """Store the query function and output format used for async execution.

    Args:
        sync_query_func (callable): Function to execute a single query synchronously
        output_format (str, optional): Output format (dict or pandas). Defaults to 'dict'.
    """
    self.output_format = output_format
    self.sync_query_func = sync_query_func
173170

174171
async def execute_queries(self, queries):
@@ -188,15 +185,12 @@ async def execute_queries(self, queries):
188185

189186
async def main():
190187
with ThreadPoolExecutor() as executor:
191-
# New connection is created for each query in server_mode, reused otherwise
192-
new_connection = self.server_mode
193-
194188
# Create tasks for each query
195189
loop = asyncio.get_event_loop()
196190
futures = [
197191
loop.run_in_executor(
198192
executor,
199-
lambda q=query: self.sync_query_func(q, new_connection),
193+
lambda q=query: self.sync_query_func(q),
200194
# Pass query as a default argument to avoid late binding issues
201195
)
202196
for query in queries
@@ -213,6 +207,7 @@ async def main():
213207
# Process results based on output format
214208
if self.output_format == 'pandas':
215209
import pandas as pd
210+
# Concatenate the DataFrames
216211
return pd.concat(results, ignore_index=True)
217212
else:
218213
# Flatten the list of results

pystackql/core/stackql.py

Lines changed: 76 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ def __init__(self,
152152
self.debug_log_file = None
153153

154154
# Setup output formatter
155-
self.output_formatter = OutputFormatter(output)
155+
self.local_output_formatter = OutputFormatter(output)
156156
self.output = output.lower()
157157

158158
# Server mode setup
@@ -179,62 +179,38 @@ def __init__(self,
179179
self.params = setup_local_mode(self, **local_params)
180180

181181
# Initialize query executor
182-
self.query_executor = QueryExecutor(
182+
self.local_query_executor = QueryExecutor(
183183
self.bin_path,
184184
self.params,
185185
self.debug,
186186
self.debug_log_file
187187
)
188188

189-
# Initialize async query executor
190-
self.async_executor = AsyncQueryExecutor(
191-
self._sync_query_wrapper,
192-
self.server_mode,
193-
self.output
194-
)
189+
# Initialize async query executor (only for local mode)
190+
if not self.server_mode:
191+
self.async_executor = AsyncQueryExecutor(
192+
self._sync_query_wrapper,
193+
output_format=self.output
194+
)
195195

196-
def _sync_query_wrapper(self, query):
    """Run a single query synchronously on behalf of AsyncQueryExecutor.

    This method is exclusively used for local mode async queries.
    Server mode is not supported for async queries.

    Args:
        query (str): The query to execute

    Returns:
        The formatted query result
    """
    # The local executor runs the query; the OutputFormatter then formats
    # the raw result (SQL type objects are handled through its
    # _format_data method).
    return self.local_output_formatter.format_query_result(
        self.local_query_executor.execute(query)
    )
238214

239215
def properties(self):
240216
"""Retrieves the properties of the StackQL instance.
@@ -257,7 +233,7 @@ def properties(self):
257233
props = {}
258234
for var in vars(self):
259235
# Skip internal objects
260-
if var.startswith('_') or var in ['output_formatter', 'query_executor', 'async_executor', 'binary_manager', 'server_connection']:
236+
if var.startswith('_') or var in ['local_output_formatter', 'local_query_executor', 'async_executor', 'binary_manager', 'server_connection']:
261237
continue
262238
props[var] = getattr(self, var)
263239
return props
@@ -329,10 +305,10 @@ def executeStmt(self, query, custom_auth=None, env_vars=None):
329305
return result
330306
else:
331307
# Execute the query
332-
result = self.query_executor.execute(query, custom_auth=custom_auth, env_vars=env_vars)
308+
result = self.local_query_executor.execute(query, custom_auth=custom_auth, env_vars=env_vars)
333309

334310
# Format the result
335-
return self.output_formatter.format_statement_result(result)
311+
return self.local_output_formatter.format_statement_result(result)
336312

337313
def execute(self, query, suppress_errors=True, custom_auth=None, env_vars=None):
338314
"""
@@ -387,11 +363,53 @@ def execute(self, query, suppress_errors=True, custom_auth=None, env_vars=None):
387363
suppress_errors = False
388364

389365
# Execute the query
390-
output = self.query_executor.execute(query, custom_auth=custom_auth, env_vars=env_vars)
366+
output = self.local_query_executor.execute(query, custom_auth=custom_auth, env_vars=env_vars)
391367

392368
# Format the result
393-
return self.output_formatter.format_query_result(output, suppress_errors)
394-
369+
return self.local_output_formatter.format_query_result(output, suppress_errors)
370+
371+
# async def executeQueriesAsync(self, queries):
372+
# """Executes multiple StackQL queries asynchronously using the current StackQL instance.
373+
374+
# This method utilizes an asyncio event loop to concurrently run a list of provided
375+
# StackQL queries. Each query is executed independently, and the combined results of
376+
# all the queries are returned as a list of JSON objects if 'dict' output mode is selected,
377+
# or as a concatenated DataFrame if 'pandas' output mode is selected.
378+
379+
# The order of the results in the returned list or DataFrame may not necessarily
380+
# correspond to the order of the queries in the input list due to the asynchronous nature
381+
# of execution.
382+
383+
# :param queries: A list of StackQL query strings to be executed concurrently.
384+
# :type queries: list[str], required
385+
# :return: A list of results corresponding to each query. Each result is a JSON object or a DataFrame.
386+
# :rtype: list[dict] or pd.DataFrame
387+
# :raises ValueError: If method is used in `server_mode` on an unsupported OS (anything other than Linux).
388+
# :raises ValueError: If an unsupported output mode is selected (anything other than 'dict' or 'pandas').
389+
390+
# Example:
391+
# >>> from pystackql import StackQL
392+
# >>> stackql = StackQL()
393+
# >>> queries = [
394+
# >>> \"\"\"SELECT '%s' as region, instanceType, COUNT(*) as num_instances
395+
# ... FROM aws.ec2.instances
396+
# ... WHERE region = '%s'
397+
# ... GROUP BY instanceType\"\"\" % (region, region)
398+
# >>> for region in regions ]
399+
# >>> result = stackql.executeQueriesAsync(queries)
400+
401+
# Note:
402+
# - When operating in `server_mode`, this method is not supported.
403+
# """
404+
# if self.server_mode:
405+
# raise ValueError(
406+
# "The executeQueriesAsync method is not supported in server mode. "
407+
# "Please use the standard execute method with individual queries instead, "
408+
# "or switch to local mode if you need to run multiple queries concurrently."
409+
# )
410+
411+
# return await self.async_executor.execute_queries(queries)
412+
395413
async def executeQueriesAsync(self, queries):
396414
"""Executes multiple StackQL queries asynchronously using the current StackQL instance.
397415
@@ -408,7 +426,7 @@ async def executeQueriesAsync(self, queries):
408426
:type queries: list[str], required
409427
:return: A list of results corresponding to each query. Each result is a JSON object or a DataFrame.
410428
:rtype: list[dict] or pd.DataFrame
411-
:raises ValueError: If method is used in `server_mode` on an unsupported OS (anything other than Linux).
429+
:raises ValueError: If server_mode is True (async is only supported in local mode).
412430
:raises ValueError: If an unsupported output mode is selected (anything other than 'dict' or 'pandas').
413431
414432
Example:
@@ -423,7 +441,7 @@ async def executeQueriesAsync(self, queries):
423441
>>> result = await stackql.executeQueriesAsync(queries)
424442
425443
Note:
426-
- When operating in `server_mode`, this method is not supported.
444+
- This method is only supported in local mode.
427445
"""
428446
if self.server_mode:
429447
raise ValueError(
@@ -432,8 +450,12 @@ async def executeQueriesAsync(self, queries):
432450
"or switch to local mode if you need to run multiple queries concurrently."
433451
)
434452

453+
# Verify that async_executor is available (should only be initialized in local mode)
454+
if not hasattr(self, 'async_executor'):
455+
raise RuntimeError("Async executor not initialized. This should not happen.")
456+
435457
return await self.async_executor.execute_queries(queries)
436-
458+
437459
def test_connection(self):
438460
"""Tests if the server connection is working by executing a simple query.
439461

0 commit comments

Comments
 (0)