Skip to content

Commit c436267

Browse files
revert: Optimise metadata discovery for large databases (#536)
Related: #528, #535. Reverts #528; this reverts commit 9bb40d1.
1 parent ecd616d commit c436267

File tree

1 file changed

+0
-137
lines changed

1 file changed

+0
-137
lines changed

tap_postgres/client.py

-137
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
import sqlalchemy.types
2121
from psycopg2 import extras
2222
from singer_sdk import SQLConnector, SQLStream
23-
from singer_sdk import typing as th
24-
from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema
2523
from singer_sdk.connectors.sql import SQLToJSONSchema
2624
from singer_sdk.helpers._state import increment_state
2725
from singer_sdk.helpers._typing import TypeConformanceLevel
@@ -34,12 +32,6 @@
3432
from singer_sdk.helpers.types import Context
3533
from sqlalchemy.dialects import postgresql
3634
from sqlalchemy.engine import Engine
37-
from sqlalchemy.engine.interfaces import ( # type: ignore[attr-defined]
38-
ReflectedColumn,
39-
ReflectedIndex,
40-
ReflectedPrimaryKeyConstraint,
41-
TableKey,
42-
)
4335
from sqlalchemy.engine.reflection import Inspector
4436

4537

@@ -191,135 +183,6 @@ def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]:
191183
return self.config["filter_schemas"]
192184
return super().get_schema_names(engine, inspected)
193185

194-
# Uses information_schema for speed.
def discover_catalog_entry_optimized(  # noqa: PLR0913
    self,
    engine: Engine,
    inspected: Inspector,
    schema_name: str,
    table_name: str,
    is_view: bool,
    table_data: dict[TableKey, list[ReflectedColumn]],
    pk_data: dict[TableKey, ReflectedPrimaryKeyConstraint],
    index_data: dict[TableKey, list[ReflectedIndex]],
) -> CatalogEntry:
    """Create `CatalogEntry` object for the given table or a view.

    Unlike the base-class implementation, this variant receives the
    reflection results (columns, primary keys, indexes) pre-fetched in
    bulk per schema, avoiding one inspector round-trip per table.

    Args:
        engine: SQLAlchemy engine
        inspected: SQLAlchemy inspector instance for engine
        schema_name: Schema name to inspect
        table_name: Name of the table or a view
        is_view: Flag whether this object is a view, returned by `get_object_names`
        table_data: Cached inspector data for the relevant tables
        pk_data: Cached inspector data for the relevant primary keys
        index_data: Cached inspector data for the relevant indexes

    Returns:
        `CatalogEntry` object for the given table or a view
    """
    # NOTE(review): `engine` and `inspected` are not used in this body —
    # presumably kept to mirror the base-class signature; confirm.
    # Initialize unique stream name
    unique_stream_id = f"{schema_name}-{table_name}"
    table_key = (schema_name, table_name)

    # Detect key properties: the declared PK wins (appended first), with
    # unique indexes as fallback candidates.
    possible_primary_keys: list[list[str]] = []
    pk_def = pk_data.get(table_key, {})
    if pk_def and "constrained_columns" in pk_def:
        possible_primary_keys.append(pk_def["constrained_columns"])

    # An element of the columns list is ``None`` if it's an expression and is
    # returned in the ``expressions`` list of the reflected index.
    possible_primary_keys.extend(
        index_def["column_names"]
        for index_def in index_data.get(table_key, [])
        if index_def.get("unique", False)
    )

    # First candidate if any, else None (no usable key properties).
    key_properties = next(iter(possible_primary_keys), None)

    # Initialize columns list
    table_schema = th.PropertiesList()
    for column_def in table_data.get(table_key, []):
        column_name = column_def["name"]
        is_nullable = column_def.get("nullable", False)
        jsonschema_type: dict = self.to_jsonschema_type(column_def["type"])
        table_schema.append(
            th.Property(
                name=column_name,
                wrapped=th.CustomType(jsonschema_type),
                nullable=is_nullable,
                required=column_name in key_properties if key_properties else False,
            ),
        )
    schema = table_schema.to_dict()

    # Initialize available replication methods
    addl_replication_methods: list[str] = [""]  # By default an empty list.
    # Notes regarding replication methods:
    # - 'INCREMENTAL' replication must be enabled by the user by specifying
    #   a replication_key value.
    # - 'LOG_BASED' replication must be enabled by the developer, according
    #   to source-specific implementation capabilities.
    replication_method = next(reversed(["FULL_TABLE", *addl_replication_methods]))

    # Create the catalog entry object
    return CatalogEntry(
        tap_stream_id=unique_stream_id,
        stream=unique_stream_id,
        table=table_name,
        key_properties=key_properties,
        schema=Schema.from_dict(schema),
        is_view=is_view,
        replication_method=replication_method,
        metadata=MetadataMapping.get_standard_metadata(
            schema_name=schema_name,
            schema=schema,
            replication_method=replication_method,
            key_properties=key_properties,
            valid_replication_keys=None,  # Must be defined by user
        ),
        database=None,  # Expects single-database context
        row_count=None,
        stream_alias=None,
        replication_key=None,  # Must be defined by user
    )
287-
288-
def discover_catalog_entries(self) -> list[dict]:
    """Return a list of catalog entries from discovery.

    Returns:
        The discovered catalog entries as a list.
    """
    engine = self._engine
    inspected = sa.inspect(engine)
    entries: list[dict] = []
    for schema_name in self.get_schema_names(engine, inspected):
        # Bulk-fetch reflection data once per schema via get_multi_*
        # instead of querying the inspector per table.
        columns_by_table = inspected.get_multi_columns(schema=schema_name)
        pks_by_table = inspected.get_multi_pk_constraint(schema=schema_name)
        indexes_by_table = inspected.get_multi_indexes(schema=schema_name)

        # Build one catalog entry per table/view in this schema.
        entries.extend(
            self.discover_catalog_entry_optimized(
                engine,
                inspected,
                schema_name,
                table_name,
                is_view,
                columns_by_table,
                pks_by_table,
                indexes_by_table,
            ).to_dict()
            for table_name, is_view in self.get_object_names(
                engine,
                inspected,
                schema_name,
            )
        )

    return entries
322-
323186

324187
class PostgresStream(SQLStream):
325188
"""Stream class for Postgres streams."""

0 commit comments

Comments
 (0)