Оптимизация обработки данных

创建于:2024年11月21日

使用 OpenAI 01 Mini 回答 OpenAI01

问题

import pandas as pd
import numpy as np

# Предполагается, что response_status и last_sunday уже определены
# response_status = get_data.get_status_data()
# last_sunday = ... (определите дату последнего воскресенья)

columns = ['CARD_CODE', 'query_date', 'first_visit_days', 'last_visit_days', 'is_churn']
w_number = 13

# Создаём диапазон дат, идущих назад с шагом в одну неделю
date_range = pd.date_range(end=last_sunday, periods=w_number, freq='-1W-SUN')

# Преобразуем SHIFT_DATE в datetime, если это ещё не сделано
response_status['SHIFT_DATE'] = pd.to_datetime(response_status['SHIFT_DATE'])

# Находим минимальную и максимальную дату визита для каждого CARD_CODE
agg_df = response_status.groupby('CARD_CODE').agg(
min_shift_date=('SHIFT_DATE', 'min'),
max_shift_date=('SHIFT_DATE', 'max')
).reset_index()

# Создаём DataFrame с каждой комбинацией CARD_CODE и query_date
status_df = agg_df.merge(pd.DataFrame({'query_date': date_range}), how='cross')

# Фильтруем записи, где SHIFT_DATE <= query_date
status_df = status_df.merge(response_status[['CARD_CODE', 'SHIFT_DATE']], on='CARD_CODE')
status_df = status_df[status_df['SHIFT_DATE'] <= status_df['query_date']]

# Вычисляем первые и последние дни визита относительно query_date
status_df_grouped = status_df.groupby(['CARD_CODE', 'query_date']).agg(
first_visit_days=('SHIFT_DATE', lambda x: (status_df['query_date'].iloc[0] - x.min()).days),
last_visit_days=('SHIFT_DATE', lambda x: (status_df['query_date'].iloc[0] - x.max()).days)
).reset_index()

# Фильтруем по условию first_visit_days >= 70
status_df_filtered = status_df_grouped[status_df_grouped['first_visit_days'] >= 70].copy()

# Определяем is_churn
status_df_filtered['is_churn'] = (status_df_filtered['last_visit_days'] >= 70).astype(int)

# Сортируем данные
status_df_filtered = status_df_filtered.sort_values(['CARD_CODE', 'query_date'])

# Отметка изменения статуса
status_df_filtered['status_changed'] = status_df_filtered.groupby('CARD_CODE')['is_churn'].diff().fillna(0).ne(0).astype(int)

# Итоговая информация и вывод
status_df_filtered = status_df_filtered[columns + ['status_changed']]
status_df_filtered.info()
status_df_filtered.head()
как можно еще ускорить код пиши на русском

思考

Создаю временной интервал

В коде выясняется, что w_number задаёт количество недель, и создаётся временной интервал, который сдвигается назад на одну неделю после last_sunday.

Осуществляю объединение

Я работаю с DataFrame, объединяя агрегированный датафрейм с датафреймом query_dates через cross join, чтобы создать все комбинации CARD_CODE и query_date.

Фильтрую и группирую

Фильтрую строки по условию SHIFT_DATE <= query_date и группирую по CARD_CODE и query_date. Вычисляю первые и последние дни посещений, фильтрую по первым дням посещений >= 70.

Определение и сортировка

Определяю is_churn на основе последних дней посещений, сортирую по CARD_CODE и query_date.

Оптимизация

Рассматриваю пути улучшения: избегание излишних объединений, оптимизация группировок, векторизация lambda-функций, раннее уменьшение размера данных, использование категориальных типов данных, правильная установка индексов.

回答

Ваш код уже хорошо структурирован, однако есть несколько способов его оптимизировать для повышения производительности. Вот некоторые рекомендации:

1. Избегайте ненужных операций merge

Каждое объединение (merge) может быть дорогостоящим, особенно на больших наборах данных. Попробуйте минимизировать количество объединений или использовать более эффективные методы объединения.

Оптимизация: Используйте pd.merge только там, где это действительно необходимо. Например, при создании перекрестного соединения можно рассмотреть альтернативные подходы, такие как использование индексов или более эффективных библиотек для больших данных, если это применимо.

2. Используйте более эффективные методы группировки

Группировка данных может занимать значительное время. Оптимизируйте группировки, минимизируя количество агрегируемых столбцов и предварительно фильтруя данные.

Оптимизация:

# Предварительно отфильтруйте данные до группировки filtered_response = response_status[response_status['SHIFT_DATE'] <= response_status['query_date']] agg_df = filtered_response.groupby('CARD_CODE').agg( min_shift_date=('SHIFT_DATE', 'min'), max_shift_date=('SHIFT_DATE', 'max') ).reset_index()

3. Векторизируйте операции вместо использования lambda

Использование lambda функций внутри agg может замедлить выполнение. Вместо этого воспользуйтесь векторизованными операциями pandas.

Оптимизация:

status_df_grouped = status_df.groupby(['CARD_CODE', 'query_date']).agg( first_visit_days=('SHIFT_DATE', 'min'), last_visit_days=('SHIFT_DATE', 'max') ).reset_index() status_df_grouped['first_visit_days'] = (status_df_grouped['query_date'] - status_df_grouped['first_visit_days']).dt.days status_df_grouped['last_visit_days'] = (status_df_grouped['query_date'] - status_df_grouped['last_visit_days']).dt.days

4. Сократите объем данных как можно раньше

Чем меньше данных обрабатывается на следующих этапах, тем быстрее выполняется код. Фильтруйте ненужные данные как можно раньше в процессе обработки.

Оптимизация:

# Фильтрация до объединения status_df = agg_df.merge(pd.DataFrame({'query_date': date_range}), how='cross') status_df = status_df[status_df['max_shift_date'] <= status_df['query_date']]

5. Используйте типы данных с экономией памяти

Преобразование типов данных на этапе загрузки может значительно снизить использование памяти и ускорить операции.

Оптимизация:

response_status['CARD_CODE'] = response_status['CARD_CODE'].astype('category')

6. Установите индексы для ускорения операций

Установка индексов на часто используемые столбцы может ускорить операции merge и groupby.

Оптимизация:

response_status.set_index('CARD_CODE', inplace=True)

7. Параллелизация операций

Если у вас есть возможность использовать многопоточность или многопроцессорность, можно распараллелить некоторые операции.

Оптимизация: Используйте библиотеку dask, которая позволяет работать с большими данными и распараллеливать операции:

import dask.dataframe as dd dd_response_status = dd.from_pandas(response_status, npartitions=4) # Продолжайте обработку с использованием Dask

8. Используйте более быстрые альтернативы pandas для специфичных задач

В некоторых случаях, использование специализированных библиотек может ускорить выполнение.

Оптимизация: Например, для операций группировки и агрегирования можно использовать библиотеку vaex, которая оптимизирована для больших данных.

Пример оптимизированного кода

Ниже приведен пример оптимизированного кода с учетом вышеперечисленных рекомендаций:

import pandas as pd import numpy as np # Предполагается, что response_status и last_sunday уже определены columns = ['CARD_CODE', 'query_date', 'first_visit_days', 'last_visit_days', 'is_churn'] w_number = 13 # Создаём диапазон дат, идущих назад с шагом в одну неделю date_range = pd.date_range(end=last_sunday, periods=w_number, freq='-1W-SUN') # Преобразуем SHIFT_DATE в datetime, если это ещё не сделано response_status['SHIFT_DATE'] = pd.to_datetime(response_status['SHIFT_DATE']) # Используем категориальные типы для CARD_CODE response_status['CARD_CODE'] = response_status['CARD_CODE'].astype('category') # Находим минимальную и максимальную дату визита для каждого CARD_CODE agg_df = response_status.groupby('CARD_CODE').agg( min_shift_date=('SHIFT_DATE', 'min'), max_shift_date=('SHIFT_DATE', 'max') ).reset_index() # Создаём DataFrame с каждой комбинацией CARD_CODE и query_date agg_df['key'] = 1 date_df = pd.DataFrame({'query_date': date_range, 'key': 1}) status_df = pd.merge(agg_df, date_df, on='key').drop('key', axis=1) # Фильтруем записи, где SHIFT_DATE <= query_date status_df = status_df[status_df['max_shift_date'] <= status_df['query_date']] # Вычисляем первые и последние дни визита относительно query_date status_df['first_visit_days'] = (status_df['query_date'] - status_df['min_shift_date']).dt.days status_df['last_visit_days'] = (status_df['query_date'] - status_df['max_shift_date']).dt.days # Фильтруем по условию first_visit_days >= 70 status_df_filtered = status_df[status_df['first_visit_days'] >= 70].copy() # Определяем is_churn status_df_filtered['is_churn'] = (status_df_filtered['last_visit_days'] >= 70).astype(int) # Сортируем данные status_df_filtered = status_df_filtered.sort_values(['CARD_CODE', 'query_date']) # Отметка изменения статуса status_df_filtered['status_changed'] = status_df_filtered.groupby('CARD_CODE')['is_churn'].diff().fillna(0).ne(0).astype(int) # Итоговая информация и вывод status_df_filtered = status_df_filtered[columns + ['status_changed']] status_df_filtered.info() status_df_filtered.head()

Заключение

Оптимизация кода с использованием pandas часто сводится к минимизации количества операций объединения и группировки, использованию векторизованных операций вместо lambda, правильному выбору типов данных и эффективному управлению памятью. Применение этих методов поможет значительно ускорить выполнение вашего кода.

分享这个问答