开发手册 欢迎您!
软件开发者资料库

Python pandas.read_sql_query() 使用示例(demo)代码

本文主要介绍使用pandas.read_sql_query()一些示例demo代码。

1、参数查询示例demo代码

def get_classified_songs(self, telegram_id):    conn = sqlite3.connect(self._DATABASE)    sql = """          SELECT            danceability, energy, loudness, speechiness, acousticness,            instrumentalness, liveness, valence, tempo, activity          FROM songs s, users u, song_user su          WHERE            activity IS NOT NULL AND            s.id = su.song_id AND            su.user_id = u.id AND            u.telegram_user_id = {}    """.format(telegram_id)    resp = pd.read_sql_query(sql, conn)    conn.close()    return resp 

源代码:https://github.com/mongonauta/heydjbot/tree/master/demo3/server/database.py

2、创建Dataframe示例demo代码

def build_df(table: str = 'articles',
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None) -> pd.DataFrame:
"""Build dataframe with derived fields."""
with closing(sqlite3.connect(DB_FILE_NAME)) as conn:
articles = pd.read_sql_query(f'select * from {table}', conn)
articles['date'] = pd.to_datetime(articles['publish_date'])
if start_date:
articles = articles.loc[articles['date'] >= start_date]
if end_date:
articles = articles.loc[articles['date'] <= end_date]
articles = articles.replace([None], [''], regex=True)
articles['base_url'] = articles.apply(get_url_base, axis=1)
articles['word_count'] = articles.apply(count_words, axis=1)
return articles

源代码:https://github.com/bmassman/fake_news/blob/master/fake_news/pipeline/build_df.py

3、表连接查询示例demo代码

def SensitivityQuery(self, table, data_set):#Returns the number of times an analyte is found at each concentration and the#number of repetitions in a particular data set.sql_statement = "SELECT COUNT(%s.id) AS Count, %s.Concentration_pg AS Conc_pg, \DataSetConcentrations.Repetitions AS Repetitions \FROM \Sample \INNER JOIN %s ON \%s.id = Sample.%s_foreignkey \INNER JOIN DataSetConcentrations ON \DataSetConcentrations.id = Sample.DataSetConcentrations_foreignkey \WHERE \Sample.DataSetName = '%s' \GROUP BY \Conc_pg \ORDER BY \Conc_pg;" % (table, table, table, table, table, data_set)return pd.read_sql_query(sql_statement, self.conn) 

4、分组查询示例demo代码

def GetRepsAtEachConcentration(self, analyte_table_lst, data_set):df = pd.DataFrame()for table in analyte_table_lst:sql_statement = "SELECT \%s.Concentration_pg AS Conc, COUNT(%s.Concentration_pg) AS %s \FROM \Sample \Inner Join %s ON \%s.id = Sample.%s_foreignkey \WHERE \DataSetName = '%s' \GROUP BY 1 \ORDER BY 1 ASC;" % (table, table, table, table, table, table, data_set)df1 = pd.read_sql_query(sql_statement, self.conn)df1.set_index('Conc', inplace=True)df = pd.concat([df, df1], axis=1)return df

5、导出数据示例demo代码

def import_data_from_psql(user_id):    """Import data from psql; clean & merge dataframes."""    library = pd.read_sql_table(        'library',        con='postgres:///nextbook',        columns=['book_id', 'title', 'author', 'pub_year', 'original_pub_year', 'pages'])    book_subjects = pd.read_sql_table(        'book_subjects',        con='postgres:///nextbook')    subjects = pd.read_sql_table(        'subjects', con='postgres:///nextbook',        columns=['subject_id', 'subject'])    user_ratings = pd.read_sql_query(        sql=('SELECT book_id, user_id, status, rating FROM user_books WHERE user_id=%s' % user_id),        con='postgres:///nextbook')    library = library.merge(user_ratings, how='left', on='book_id')    library['pages'].fillna(0, inplace=True)    #merge subject names into book_subjects; drop uninteresting subjects from book_subjects table    book_subjects = book_subjects.merge(subjects, how='left', on='subject_id')    delete_values = ["protected daisy", "accessible book", "in library", "overdrive", "large type books", 'ficci\xc3\xb3n juvenil', 'ficci\xc3\xb3n', 'lending library']    book_subjects = book_subjects[~book_subjects['subject'].isin(delete_values)]    return [library, book_subjects, subjects] 

源代码:https://github.com/EmmaOnThursday/next-book/blob/master/app/recommendation_creation.py

6、数据库连接示例demo代码

def test_sql_open_close(self):        #Test if the IO in the database still work if the connection closed        #between the writing and reading (as in many real situations).        with tm.ensure_clean() as name:            conn = self.connect(name)            sql.to_sql(self.test_frame3, "test_frame3_legacy", conn,                       flavor="sqlite", index=False)            conn.close()            conn = self.connect(name)            result = sql.read_sql_query("SELECT * FROM test_frame3_legacy;",                                        conn)            conn.close()        tm.assert_frame_equal(self.test_frame3, result) 

源代码:https://github.com/SignalMedia/PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda/blob/master/pandas/io/tests/test_sql.py

7、有关时间类型的示例demo代码

def test_datetime(self):    df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),                    'B': np.arange(3.0)})    df.to_sql('test_datetime', self.conn)    # with read_table -> type information from schema used    result = sql.read_sql_table('test_datetime', self.conn)    result = result.drop('index', axis=1)    tm.assert_frame_equal(result, df)    # with read_sql -> no type information -> sqlite has no native    result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)    result = result.drop('index', axis=1)    if self.flavor == 'sqlite':        self.assertTrue(isinstance(result.loc[0, 'A'], string_types))        result['A'] = to_datetime(result['A'])        tm.assert_frame_equal(result, df)    else:        tm.assert_frame_equal(result, df)

源代码:https://github.com/SignalMedia/PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda/blob/master/pandas/io/tests/test_sql.py

8、时间类型处理的示例demo代码

def test_datetime_NaT(self):    df = DataFrame({'A': date_range('2013-01-01 09:00:00', periods=3),                    'B': np.arange(3.0)})    df.loc[1, 'A'] = np.nan    df.to_sql('test_datetime', self.conn, index=False)    # with read_table -> type information from schema used    result = sql.read_sql_table('test_datetime', self.conn)    tm.assert_frame_equal(result, df)    # with read_sql -> no type information -> sqlite has no native    result = sql.read_sql_query('SELECT * FROM test_datetime', self.conn)    if self.flavor == 'sqlite':        self.assertTrue(isinstance(result.loc[0, 'A'], string_types))        result['A'] = to_datetime(result['A'], errors='coerce')        tm.assert_frame_equal(result, df)    else:        tm.assert_frame_equal(result, df) 

源代码:https://github.com/SignalMedia/PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda/blob/master/pandas/io/tests/test_sql.py

9、相关文档:Python Pandas pandas.read_sql_query函数方法的使用