Loading...
Loading...
Compare original and translation side by side
undefinedundefined
**Docker Compose for entire stack:**
```yaml
**使用Docker Compose管理整个技术栈:**
```yaml
```bash
```bash
**Terraform GCP setup:**
```hcl
**Terraform GCP配置:**
```hcl
```hcl
```hcl
```bash
```bashundefinedundefinedundefinedundefined
**Python data ingestion script:**
```python
**Python数据 ingestion 脚本:**
```python# Download CSV
csv_name = 'output.csv'
os.system(f"wget {url} -O {csv_name}")
# Create engine
engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')
# Read CSV in chunks
df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000)
df = next(df_iter)
# Convert datetime columns
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
# Create table
df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')
# Insert first chunk
df.to_sql(name=table_name, con=engine, if_exists='append')
# Insert remaining chunks
while True:
try:
t_start = time()
df = next(df_iter)
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
df.to_sql(name=table_name, con=engine, if_exists='append')
t_end = time()
print(f'Inserted another chunk, took %.3f seconds' % (t_end - t_start))
except StopIteration:
print("Finished ingesting data")
breakparser.add_argument('--user', required=True, help='user name for postgres')
parser.add_argument('--password', required=True, help='password for postgres')
parser.add_argument('--host', required=True, help='host for postgres')
parser.add_argument('--port', required=True, help='port for postgres')
parser.add_argument('--db', required=True, help='database name for postgres')
parser.add_argument('--table_name', required=True, help='name of the table')
parser.add_argument('--url', required=True, help='url of the csv file')
args = parser.parse_args()
main(args)
```bash# Download CSV
csv_name = 'output.csv'
os.system(f"wget {url} -O {csv_name}")
# Create engine
engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')
# Read CSV in chunks
df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000)
df = next(df_iter)
# Convert datetime columns
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
# Create table
df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')
# Insert first chunk
df.to_sql(name=table_name, con=engine, if_exists='append')
# Insert remaining chunks
while True:
try:
t_start = time()
df = next(df_iter)
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
df.to_sql(name=table_name, con=engine, if_exists='append')
t_end = time()
print(f'Inserted another chunk, took %.3f seconds' % (t_end - t_start))
except StopIteration:
print("Finished ingesting data")
breakparser.add_argument('--user', required=True, help='user name for postgres')
parser.add_argument('--password', required=True, help='password for postgres')
parser.add_argument('--host', required=True, help='host for postgres')
parser.add_argument('--port', required=True, help='port for postgres')
parser.add_argument('--db', required=True, help='database name for postgres')
parser.add_argument('--table_name', required=True, help='name of the table')
parser.add_argument('--url', required=True, help='url of the csv file')
args = parser.parse_args()
main(args)
```bashundefinedundefined-- Create external table
CREATE OR REPLACE EXTERNAL TABLE `trips_data_all.external_yellow_tripdata`
OPTIONS (
format = 'CSV',
uris = ['gs://nyc-tl-data/trip data/yellow_tripdata_2019-*.csv',
'gs://nyc-tl-data/trip data/yellow_tripdata_2020-*.csv']
);
-- Create partitioned table
CREATE OR REPLACE TABLE `trips_data_all.yellow_tripdata_partitioned`
PARTITION BY
DATE(tpep_pickup_datetime) AS
SELECT * FROM `trips_data_all.external_yellow_tripdata`;
-- Create partitioned and clustered table
CREATE OR REPLACE TABLE `trips_data_all.yellow_tripdata_partitioned_clustered`
PARTITION BY DATE(tpep_pickup_datetime)
CLUSTER BY VendorID AS
SELECT * FROM `trips_data_all.external_yellow_tripdata`;
-- Query comparison
SELECT DISTINCT(VendorID)
FROM `trips_data_all.yellow_tripdata_partitioned`
WHERE DATE(tpep_pickup_datetime) BETWEEN '2020-06-01' AND '2020-06-30';
-- This query will process less data vs non-partitionedundefined-- Create external table
CREATE OR REPLACE EXTERNAL TABLE `trips_data_all.external_yellow_tripdata`
OPTIONS (
format = 'CSV',
uris = ['gs://nyc-tl-data/trip data/yellow_tripdata_2019-*.csv',
'gs://nyc-tl-data/trip data/yellow_tripdata_2020-*.csv']
);
-- Create partitioned table
CREATE OR REPLACE TABLE `trips_data_all.yellow_tripdata_partitioned`
PARTITION BY
DATE(tpep_pickup_datetime) AS
SELECT * FROM `trips_data_all.external_yellow_tripdata`;
-- Create partitioned and clustered table
CREATE OR REPLACE TABLE `trips_data_all.yellow_tripdata_partitioned_clustered`
PARTITION BY DATE(tpep_pickup_datetime)
CLUSTER BY VendorID AS
SELECT * FROM `trips_data_all.external_yellow_tripdata`;
-- Query comparison
SELECT DISTINCT(VendorID)
FROM `trips_data_all.yellow_tripdata_partitioned`
WHERE DATE(tpep_pickup_datetime) BETWEEN '2020-06-01' AND '2020-06-30';
-- This query will process less data vs non-partitionedundefinedundefinedundefineddbt_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│ ├── staging/
│ │ ├── stg_yellow_tripdata.sql
│ │ └── schema.yml
│ └── core/
│ ├── fact_trips.sql
│ └── dim_zones.sql
└── macros/
└── get_payment_type_description.sqlname: 'taxi_rides_ny'
version: '1.0.0'
config-version: 2
profile: 'default'
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target"
clean-targets:
- "target"
- "dbt_packages"
models:
taxi_rides_ny:
staging:
+materialized: view
core:
+materialized: tabledefault:
outputs:
dev:
type: bigquery
method: service-account
project: "{{ env_var('GCP_PROJECT_ID') }}"
dataset: dbt_dev
threads: 4
keyfile: "{{ env_var('GOOGLE_APPLICATION_CREDENTIALS') }}"
location: EU
prod:
type: bigquery
method: service-account
project: "{{ env_var('GCP_PROJECT_ID') }}"
dataset: production
threads: 4
keyfile: "{{ env_var('GOOGLE_APPLICATION_CREDENTIALS') }}"
location: EU
target: dev{{ config(materialized='view') }}
with tripdata as
(
select *,
row_number() over(partition by vendorid, tpep_pickup_datetime) as rn
from {{ source('staging','yellow_tripdata') }}
where vendorid is not null
)
select
-- identifiers
{{ dbt_utils.generate_surrogate_key(['vendorid', 'tpep_pickup_datetime']) }} as tripid,
cast(vendorid as integer) as vendorid,
cast(ratecodeid as integer) as ratecodeid,
cast(pulocationid as integer) as pickup_locationid,
cast(dolocationid as integer) as dropoff_locationid,
-- timestamps
cast(tpep_pickup_datetime as timestamp) as pickup_datetime,
cast(tpep_dropoff_datetime as timestamp) as dropoff_datetime,
-- trip info
store_and_fwd_flag,
cast(passenger_count as integer) as passenger_count,
cast(trip_distance as numeric) as trip_distance,
-- payment info
cast(fare_amount as numeric) as fare_amount,
cast(extra as numeric) as extra,
cast(mta_tax as numeric) as mta_tax,
cast(tip_amount as numeric) as tip_amount,
cast(tolls_amount as numeric) as tolls_amount,
cast(improvement_surcharge as numeric) as improvement_surcharge,
cast(total_amount as numeric) as total_amount,
cast(payment_type as integer) as payment_type,
{{ get_payment_type_description('payment_type') }} as payment_type_description
from tripdata
where rn = 1{{ config(materialized='table') }}
with green_data as (
select *,
'Green' as service_type
from {{ ref('stg_green_tripdata') }}
),
yellow_data as (
select *,
'Yellow' as service_type
from {{ ref('stg_yellow_tripdata') }}
),
trips_unioned as (
select * from green_data
union all
select * from yellow_data
),
dim_zones as (
select * from {{ ref('dim_zones') }}
where borough != 'Unknown'
)
select
trips_unioned.tripid,
trips_unioned.vendorid,
trips_unioned.service_type,
trips_unioned.ratecodeid,
trips_unioned.pickup_locationid,
pickup_zone.borough as pickup_borough,
pickup_zone.zone as pickup_zone,
trips_unioned.dropoff_locationid,
dropoff_zone.borough as dropoff_borough,
dropoff_zone.zone as dropoff_zone,
trips_unioned.pickup_datetime,
trips_unioned.dropoff_datetime,
trips_unioned.store_and_fwd_flag,
trips_unioned.passenger_count,
trips_unioned.trip_distance,
trips_unioned.fare_amount,
trips_unioned.extra,
trips_unioned.mta_tax,
trips_unioned.tip_amount,
trips_unioned.tolls_amount,
trips_unioned.total_amount,
trips_unioned.payment_type,
trips_unioned.payment_type_description
from trips_unioned
inner join dim_zones as pickup_zone
on trips_unioned.pickup_locationid = pickup_zone.locationid
inner join dim_zones as dropoff_zone
on trips_unioned.dropoff_locationid = dropoff_zone.locationid{#
This macro returns the description of the payment_type
#}
{% macro get_payment_type_description(payment_type) -%}
case {{ payment_type }}
when 1 then 'Credit card'
when 2 then 'Cash'
when 3 then 'No charge'
when 4 then 'Dispute'
when 5 then 'Unknown'
when 6 then 'Voided trip'
end
{%- endmacro %}version: 2
sources:
- name: staging
database: "{{ env_var('GCP_PROJECT_ID') }}"
schema: trips_data_all
tables:
- name: yellow_tripdata
- name: green_tripdata
models:
- name: stg_yellow_tripdata
description: >
Trip made by yellow taxis.
columns:
- name: tripid
description: Primary key for this table, generated with a concatenation of vendorid+pickup_datetime
tests:
- unique:
severity: warn
- not_null:
severity: warn
- name: vendorid
description: >
A code indicating the TPEP provider that provided the record.
tests:
- accepted_values:
values: [1, 2]
- name: pickup_datetime
description: The date and time when the meter was engaged.
tests:
- not_null:
severity: warn
- name: passenger_count
description: The number of passengers in the vehicle.
tests:
- accepted_values:
values: [1, 2, 3, 4, 5, 6]
severity: warnundefineddbt_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│ ├── staging/
│ │ ├── stg_yellow_tripdata.sql
│ │ └── schema.yml
│ └── core/
│ ├── fact_trips.sql
│ └── dim_zones.sql
└── macros/
└── get_payment_type_description.sqlname: 'taxi_rides_ny'
version: '1.0.0'
config-version: 2
profile: 'default'
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target"
clean-targets:
- "target"
- "dbt_packages"
models:
taxi_rides_ny:
staging:
+materialized: view
core:
+materialized: tabledefault:
outputs:
dev:
type: bigquery
method: service-account
project: "{{ env_var('GCP_PROJECT_ID') }}"
dataset: dbt_dev
threads: 4
keyfile: "{{ env_var('GOOGLE_APPLICATION_CREDENTIALS') }}"
location: EU
prod:
type: bigquery
method: service-account
project: "{{ env_var('GCP_PROJECT_ID') }}"
dataset: production
threads: 4
keyfile: "{{ env_var('GOOGLE_APPLICATION_CREDENTIALS') }}"
location: EU
target: dev{{ config(materialized='view') }}
with tripdata as
(
select *,
row_number() over(partition by vendorid, tpep_pickup_datetime) as rn
from {{ source('staging','yellow_tripdata') }}
where vendorid is not null
)
select
-- identifiers
{{ dbt_utils.generate_surrogate_key(['vendorid', 'tpep_pickup_datetime']) }} as tripid,
cast(vendorid as integer) as vendorid,
cast(ratecodeid as integer) as ratecodeid,
cast(pulocationid as integer) as pickup_locationid,
cast(dolocationid as integer) as dropoff_locationid,
-- timestamps
cast(tpep_pickup_datetime as timestamp) as pickup_datetime,
cast(tpep_dropoff_datetime as timestamp) as dropoff_datetime,
-- trip info
store_and_fwd_flag,
cast(passenger_count as integer) as passenger_count,
cast(trip_distance as numeric) as trip_distance,
-- payment info
cast(fare_amount as numeric) as fare_amount,
cast(extra as numeric) as extra,
cast(mta_tax as numeric) as mta_tax,
cast(tip_amount as numeric) as tip_amount,
cast(tolls_amount as numeric) as tolls_amount,
cast(improvement_surcharge as numeric) as improvement_surcharge,
cast(total_amount as numeric) as total_amount,
cast(payment_type as integer) as payment_type,
{{ get_payment_type_description('payment_type') }} as payment_type_description
from tripdata
where rn = 1{{ config(materialized='table') }}
with green_data as (
select *,
'Green' as service_type
from {{ ref('stg_green_tripdata') }}
),
yellow_data as (
select *,
'Yellow' as service_type
from {{ ref('stg_yellow_tripdata') }}
),
trips_unioned as (
select * from green_data
union all
select * from yellow_data
),
dim_zones as (
select * from {{ ref('dim_zones') }}
where borough != 'Unknown'
)
select
trips_unioned.tripid,
trips_unioned.vendorid,
trips_unioned.service_type,
trips_unioned.ratecodeid,
trips_unioned.pickup_locationid,
pickup_zone.borough as pickup_borough,
pickup_zone.zone as pickup_zone,
trips_unioned.dropoff_locationid,
dropoff_zone.borough as dropoff_borough,
dropoff_zone.zone as dropoff_zone,
trips_unioned.pickup_datetime,
trips_unioned.dropoff_datetime,
trips_unioned.store_and_fwd_flag,
trips_unioned.passenger_count,
trips_unioned.trip_distance,
trips_unioned.fare_amount,
trips_unioned.extra,
trips_unioned.mta_tax,
trips_unioned.tip_amount,
trips_unioned.tolls_amount,
trips_unioned.total_amount,
trips_unioned.payment_type,
trips_unioned.payment_type_description
from trips_unioned
inner join dim_zones as pickup_zone
on trips_unioned.pickup_locationid = pickup_zone.locationid
inner join dim_zones as dropoff_zone
on trips_unioned.dropoff_locationid = dropoff_zone.locationid{#
This macro returns the description of the payment_type
#}
{% macro get_payment_type_description(payment_type) -%}
case {{ payment_type }}
when 1 then 'Credit card'
when 2 then 'Cash'
when 3 then 'No charge'
when 4 then 'Dispute'
when 5 then 'Unknown'
when 6 then 'Voided trip'
end
{%- endmacro %}version: 2
sources:
- name: staging
database: "{{ env_var('GCP_PROJECT_ID') }}"
schema: trips_data_all
tables:
- name: yellow_tripdata
- name: green_tripdata
models:
- name: stg_yellow_tripdata
description: >
Trip made by yellow taxis.
columns:
- name: tripid
description: Primary key for this table, generated with a concatenation of vendorid+pickup_datetime
tests:
- unique:
severity: warn
- not_null:
severity: warn
- name: vendorid
description: >
A code indicating the TPEP provider that provided the record.
tests:
- accepted_values:
values: [1, 2]
- name: pickup_datetime
description: The date and time when the meter was engaged.
tests:
- not_null:
severity: warn
- name: passenger_count
description: The number of passengers in the vehicle.
tests:
- accepted_values:
values: [1, 2, 3, 4, 5, 6]
severity: warnundefinedundefinedundefinedundefinedundefined
**PySpark script for data processing:**
```python
**PySpark数据处理脚本:**
```python
**Spark with Google Cloud Storage:**
```python
import pyspark
from pyspark.sql import SparkSession
**Spark对接Google Cloud Storage:**
```python
import pyspark
from pyspark.sql import SparkSessionundefinedundefinedundefinedundefined
```bash
```bash
**Python Kafka producer:**
```python
**Python Kafka生产者:**
```pythonproducer.send('rides', value=message)
print(f"Sent message {i}")
time.sleep(1)
**Python Kafka consumer:**
```pythonproducer.send('rides', value=message)
print(f"Sent message {i}")
time.sleep(1)
**Python Kafka消费者:**
```python# Process message
trip_id = message.value['trip_id']
trip_distance = message.value['trip_distance']
print(f"Trip {trip_id}: {trip_distance} miles")
**Kafka Streams example:**
```python# Process message
trip_id = message.value['trip_id']
trip_distance = message.value['trip_distance']
print(f"Trip {trip_id}: {trip_distance} miles")
**Kafka Streams示例:**
```python# Send aggregated results
result = {
'location': location,
'count': location_counts[location]
}
producer.send('rides-pulocationid', value=result)
print(f"Location {location}: {location_counts[location]} rides")undefined# Send aggregated results
result = {
'location': location,
'count': location_counts[location]
}
producer.send('rides-pulocationid', value=result)
print(f"Location {location}: {location_counts[location]} rides")undefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefinedundefined