diff --git a/news/fix_sql_injection_in_history_delete.rst b/news/fix_sql_injection_in_history_delete.rst new file mode 100644 index 000000000..43c3d349b --- /dev/null +++ b/news/fix_sql_injection_in_history_delete.rst @@ -0,0 +1,7 @@ +**Security:** + +* The ``history delete`` action on the sqlite backend used to + pass matched history lines to a SQL statement without sanitization. + This could lead to unexpected SQL being run on the history database. + This is now fixed. Security risk: low. + diff --git a/xonsh/history/sqlite.py b/xonsh/history/sqlite.py index 7b63d7313..cfc59fd1a 100644 --- a/xonsh/history/sqlite.py +++ b/xonsh/history/sqlite.py @@ -136,7 +136,7 @@ def _xh_sqlite_insert_command(cursor, cmd, sessionid, store_stdout, remove_dupli def _xh_sqlite_get_count(cursor, sessionid=None): - sql = "SELECT count(*) FROM xonsh_history " + sql = f"SELECT count(*) FROM {XH_SQLITE_TABLE_NAME} " params = [] if sessionid is not None: sql += "WHERE sessionid = ? " @@ -146,7 +146,7 @@ def _xh_sqlite_get_count(cursor, sessionid=None): def _xh_sqlite_get_records(cursor, sessionid=None, limit=None, newest_first=False): - sql = "SELECT inp, tsb, rtn, frequency, cwd FROM xonsh_history " + sql = f"SELECT inp, tsb, rtn, frequency, cwd FROM {XH_SQLITE_TABLE_NAME} " params = [] if sessionid is not None: sql += "WHERE sessionid = ? " @@ -162,14 +162,14 @@ def _xh_sqlite_get_records(cursor, sessionid=None, limit=None, newest_first=Fals def _xh_sqlite_delete_records(cursor, size_to_keep): sql = "SELECT min(tsb) FROM (" - sql += "SELECT tsb FROM xonsh_history ORDER BY tsb DESC " + sql += f"SELECT tsb FROM {XH_SQLITE_TABLE_NAME} ORDER BY tsb DESC " sql += "LIMIT %d)" % size_to_keep cursor.execute(sql) result = cursor.fetchone() if not result: return max_tsb = result[0] - sql = "DELETE FROM xonsh_history WHERE tsb < ?" + sql = f"DELETE FROM {XH_SQLITE_TABLE_NAME} WHERE tsb < ?" result = cursor.execute(sql, (max_tsb,)) return result.rowcount @@ -211,11 +211,15 @@ def xh_sqlite_pull(filename, last_pull_time, current_sessionid, src_sessionid=No if src_sessionid: sql = ( - "SELECT inp FROM xonsh_history WHERE tsb > ? AND sessionid = ? ORDER BY tsb" + f"SELECT inp FROM {XH_SQLITE_TABLE_NAME} WHERE tsb > ?" + " AND sessionid = ? ORDER BY tsb" ) params = [last_pull_time, src_sessionid] else: - sql = "SELECT inp FROM xonsh_history WHERE tsb > ? AND sessionid != ? ORDER BY tsb" + sql = ( + f"SELECT inp FROM {XH_SQLITE_TABLE_NAME} WHERE tsb > ?" + " AND sessionid != ? ORDER BY tsb" + ) params = [last_pull_time, current_sessionid] with _xh_sqlite_get_conn(filename=filename) as conn: @@ -226,7 +230,7 @@ def xh_sqlite_pull(filename, last_pull_time, current_sessionid, src_sessionid=No def xh_sqlite_wipe_session(sessionid=None, filename=None): """Wipe the current session's entries from the database.""" - sql = "DELETE FROM xonsh_history WHERE sessionid = ?" + sql = f"DELETE FROM {XH_SQLITE_TABLE_NAME} WHERE sessionid = ?" with _xh_sqlite_get_conn(filename=filename) as conn: c = conn.cursor() _xh_sqlite_create_history_table(c) @@ -238,10 +242,13 @@ def xh_sqlite_delete_input_matching(pattern, filename=None): with _xh_sqlite_get_conn(filename=filename) as conn: c = conn.cursor() _xh_sqlite_create_history_table(c) + deleted = 0 for inp, *_ in _xh_sqlite_get_records(c): if pattern.match(inp): - sql = f"DELETE FROM xonsh_history WHERE inp = '{inp}'" - c.execute(sql) + sql = f"DELETE FROM {XH_SQLITE_TABLE_NAME} WHERE inp=?" + c.execute(sql, (inp,)) + deleted += 1 + return deleted class SqliteHistoryGC(threading.Thread): @@ -415,6 +422,6 @@ class SqliteHistory(History): def delete(self, pattern): """Deletes all entries in the database where the input matches a pattern.""" - xh_sqlite_delete_input_matching( + return xh_sqlite_delete_input_matching( pattern=re.compile(pattern), filename=self.filename )