This content originally appeared on DEV Community and was authored by RUDRA SHARMA
import streamlit as st
import pandas as pd
import numpy as np
from pathlib import Path
import time
from openpyxl import load_workbook
from filelock import FileLock
import base64
# Configuration
MAX_FILE_SIZE_MB = 100  # files larger than this (in MB) are skipped during indexing
DEFAULT_PREVIEW_ROWS = 50  # default preview length (not referenced in this chunk — TODO confirm use elsewhere)
# Define COLORS for highlighting
COLORS = ['#ff6666', '#66b3ff', '#99ff99', '#ffcc99', '#cc99ff']  # one colour per search term, cycled
# Helper functions
def get_subdirs(base_path):
    """Return every immediate child of *base_path* that is a directory.

    Order follows ``Path.iterdir`` (filesystem order, unspecified).
    """
    subdirs = []
    for entry in base_path.iterdir():
        if entry.is_dir():
            subdirs.append(entry)
    return subdirs
def process_excel_header(header):
    """Normalise a worksheet header row into a list of column-name strings.

    Each cell is stringified and whitespace-stripped; ``None`` cells become
    ``Unnamed_<index>``.  A falsy *header* (None or empty) yields ``[]``.
    """
    if not header:
        return []
    names = []
    for idx, cell in enumerate(header):
        if cell is None:
            names.append(f"Unnamed_{idx}")
        else:
            names.append(str(cell).strip())
    return names
# Search index function
@st.cache_data(ttl=300, show_spinner="Indexing files...")
def build_search_index(subdir):
    """Scan *subdir* recursively and build a lightweight index of data files.

    For every CSV/XLSX/XLS file under ``MAX_FILE_SIZE_MB``, reads up to the
    first 1000 rows and records path, name, size (MB), column names, the
    preview DataFrame and the modification time.  Unreadable files are
    reported via ``st.error`` and skipped.  Results are cached by Streamlit
    for 5 minutes (ttl=300).

    Returns:
        list[dict]: one entry per indexed file.
    """
    index = []
    for ext in ["*.csv", "*.xlsx", "*.xls"]:
        for fp in subdir.rglob(ext):
            try:
                stat = fp.stat()  # hoisted: avoid a second stat() call below
                file_size = stat.st_size / (1024 * 1024)
                if file_size > MAX_FILE_SIZE_MB:
                    continue
                with FileLock(fp.with_suffix('.lock')):
                    suffix = fp.suffix.lower()
                    if suffix == '.csv':
                        df_chunk = pd.read_csv(fp, nrows=1000, engine='c')
                    elif suffix == '.xlsx':
                        df_chunk = pd.read_excel(fp, nrows=1000, engine='openpyxl')
                    else:
                        # BUG FIX: openpyxl cannot read legacy .xls files, so the
                        # original engine='openpyxl' raised for every .xls match.
                        # Let pandas pick the appropriate engine (xlrd) instead.
                        df_chunk = pd.read_excel(fp, nrows=1000)
                index.append({
                    'path': str(fp),
                    'name': fp.name,
                    'size_mb': file_size,
                    'columns': list(df_chunk.columns),
                    'preview': df_chunk,
                    'mtime': stat.st_mtime
                })
            except Exception as e:
                # best-effort indexing: report and keep scanning other files
                st.error(f"Error indexing {fp.name}: {str(e)}")
    return index
# Sidebar Configuration
def sidebar_config():
    """Render the sidebar widgets and return the user's selections.

    Returns:
        dict with keys:
            'selected_subdir': Path of the chosen directory.
            'selected_file':   chosen file name, "All Files", or
                               "No files found" when the index is empty.
    """
    st.sidebar.title("Configuration")
    st.sidebar.markdown("---")
    # Folder selection at the top
    with st.sidebar.expander("📁 Folder Selection", expanded=True):
        # BUG FIX: __file__ is a module-level global, so the original check
        # `"__file__" in locals()` was always False inside this function and
        # the app silently fell back to Path.cwd().  Check globals() instead.
        base_dir = Path(__file__).parent if "__file__" in globals() else Path.cwd()
        subdirs = get_subdirs(base_dir)
        selected_subdir = st.selectbox("Choose Directory", subdirs, format_func=lambda x: x.name)
    # File selection dropdown
    with st.sidebar.expander("📄 File Selection", expanded=True):
        search_index = build_search_index(selected_subdir)
        if search_index:
            file_names = [entry['name'] for entry in search_index]
            selected_file = st.selectbox("Select a specific file to search", ["All Files"] + file_names)
        else:
            selected_file = st.selectbox("Select a specific file to search", ["No files found"])
    return {
        'selected_subdir': selected_subdir,
        'selected_file': selected_file
    }
def _filter_chunk(chunk_df, search_terms):
    """Return rows of *chunk_df* matching every non-empty term.

    Matching is an AND across terms, case-insensitive substring, tested
    against the string form of every cell in the row.
    """
    chunk_df = chunk_df.astype(str)
    # BUG FIX: build the mask on the chunk's OWN index.  The original used
    # pd.Series([True] * len(chunk)), whose default 0..n-1 RangeIndex
    # mis-aligns with every CSV chunk after the first (whose index starts
    # at 10000), silently corrupting the filter.
    mask = pd.Series(True, index=chunk_df.index)
    for term in search_terms:
        if term.strip() == '':
            continue  # empty terms match everything
        term_mask = chunk_df.apply(
            lambda col: col.str.contains(term, case=False, regex=False)).any(axis=1)
        mask &= term_mask
    return chunk_df[mask]


def search_in_file(file_entry, search_terms):
    """Search one indexed file for rows matching all *search_terms*.

    CSV files are streamed in 10000-row chunks; Excel workbooks are read
    sheet-by-sheet (also chunked at 10000 rows).  Matching rows get a
    'source_file' column (and 'sheet_name' for Excel) prepended.

    Returns:
        DataFrame of matches (empty on no match or on error; errors are
        reported via st.error).
    """
    results = []
    try:
        file_name = file_entry['name']
        if file_entry['path'].endswith('.csv'):
            # CSV handling: stream in chunks to bound memory
            reader = pd.read_csv(file_entry['path'], engine='c', chunksize=10000, low_memory=False)
            for chunk in reader:
                filtered = _filter_chunk(chunk, search_terms)
                if not filtered.empty:
                    filtered.insert(0, 'source_file', file_name)
                    results.append(filtered)
        else:
            # Excel handling: read-only mode streams rows lazily
            wb = load_workbook(filename=file_entry['path'], read_only=True)
            try:
                # Process all sheets
                for sheet_name in wb.sheetnames:
                    ws = wb[sheet_name]
                    if ws.max_row < 2:  # Skip empty sheets
                        continue
                    header = process_excel_header([cell.value for cell in ws[1]])
                    if not header:
                        continue
                    rows = []
                    for row in ws.iter_rows(min_row=2, values_only=True):
                        rows.append(row)
                        if len(rows) == 10000:  # Process in chunks
                            filtered = _filter_chunk(pd.DataFrame(rows, columns=header), search_terms)
                            if not filtered.empty:
                                filtered.insert(0, 'source_file', file_name)
                                filtered.insert(1, 'sheet_name', sheet_name)
                                results.append(filtered)
                            rows = []
                    # Process remaining rows
                    if rows:
                        filtered = _filter_chunk(pd.DataFrame(rows, columns=header), search_terms)
                        if not filtered.empty:
                            filtered.insert(0, 'source_file', file_name)
                            filtered.insert(1, 'sheet_name', sheet_name)
                            results.append(filtered)
            finally:
                # FIX: close the workbook even when a sheet raises (resource leak)
                wb.close()
        return pd.concat(results, ignore_index=True) if results else pd.DataFrame()
    except Exception as e:
        st.error(f"Error searching {file_entry['name']}: {str(e)}")
        return pd.DataFrame()
def process_results(results, search_terms, show_highlights):
    """Concatenate per-file result frames into a single DataFrame.

    Returns an empty DataFrame when *results* is empty.  When a
    ``sheet_name`` column is present, ``source_file`` and ``sheet_name``
    are moved to the front.  (*search_terms* and *show_highlights* are
    accepted for interface compatibility; they are not used here.)
    """
    if not results:
        return pd.DataFrame()
    combined = pd.concat(results, ignore_index=True)
    # Ensure proper column order
    lead = ['source_file', 'sheet_name']
    if 'sheet_name' in combined.columns:
        trailing = [name for name in combined.columns if name not in lead]
        combined = combined[lead + trailing]
    return combined
def highlight_matches(df, search_terms):
    """Build a Styler-compatible frame of CSS strings marking matching cells.

    Each non-empty term is assigned a colour from COLORS (cycling by term
    position); a cell matching several terms keeps the colour of the LAST
    matching term, as in the original implementation.

    Returns:
        DataFrame of '' / 'background-color: ...' strings, same shape as *df*.
    """
    styles = pd.DataFrame('', index=df.index, columns=df.columns)
    for i, term in enumerate(search_terms):
        if not term.strip():
            continue
        needle = term.lower()
        # FIX: DataFrame.applymap is deprecated since pandas 2.1; do the same
        # element-wise test column-by-column with Series.map instead.
        mask = df.apply(lambda col: col.map(
            lambda x: needle in str(x).lower() if pd.notnull(x) else False))
        styles = styles.where(~mask, f'background-color: {COLORS[i % len(COLORS)]}')
    return styles
def show_results(df, elapsed, stats, search_terms):
    """Render the search-results table plus its display-option widgets.

    Args:
        df: combined result DataFrame (may be empty).
        elapsed: total search wall time in seconds.
        stats: dict with 'matches_found' and 'files_processed' counters.
        search_terms: list of search strings (used for cell highlighting).
    """
    st.subheader("Results")
    if isinstance(df, pd.DataFrame) and df.empty:
        st.info("No matches found")
        return
    # Helper function for checking empty columns - moved to top level
    def is_all_empty(series):
        # Numeric columns are "empty" when all-null; object columns when every
        # value is null or a placeholder string ('nan', 'none', '').
        if series.dtype.kind in 'iuf':
            return series.isnull().all()
        elif series.dtype.kind == 'O':
            return series.apply(lambda x: x.lower() in ['nan', 'none', ''] if not pd.isnull(x) else True).all()
        else:
            # any other dtype (datetime, bool, ...) is treated as empty
            return True
    # Set up display columns
    if 'source_file' in df.columns and 'sheet_name' in df.columns:
        display_cols = ['source_file', 'sheet_name'] + [col for col in df.columns if col not in ['source_file', 'sheet_name']]
        df = df[display_cols]
    # Display options
    cols = st.columns([2, 1, 1, 1, 1])
    with cols[0]:
        show_stats = st.checkbox("Show Statistics", False)
    with cols[1]:
        show_highlights = st.checkbox("Highlight Matches", True)
    with cols[2]:
        hide_empty = st.checkbox("Hide Empty Columns", True)
    with cols[3]:
        show_keyword_columns = st.checkbox("Show Only Columns with Keywords", False)
    with cols[4]:
        show_separate_files = st.checkbox("Show Files Separately", False)
    # Show statistics if requested
    if show_stats:
        stats_df = pd.DataFrame({
            'Metric': ['Total Matches', 'Files Processed', 'Search Time (s)', 'Avg Time/File (s)'],
            'Value': [
                stats['matches_found'],
                stats['files_processed'],
                f"{elapsed:.2f}",
                f"{(elapsed / stats['files_processed']) if stats['files_processed'] else 0:.2f}"
            ]
        })
        st.dataframe(stats_df)
    if show_separate_files:
        # Group data by source file: one table per file, with per-file
        # column filtering so unrelated files' columns don't leak in.
        for source_file in df['source_file'].unique():
            st.markdown(f"### File: {source_file}")
            file_df = df[df['source_file'] == source_file].copy()
            # Get columns specific to this file
            file_cols = ['source_file']
            if 'sheet_name' in file_df.columns:
                file_cols.append('sheet_name')
            # Add non-empty columns for this specific file
            if hide_empty:
                for col in file_df.columns:
                    if col not in file_cols and not is_all_empty(file_df[col]):
                        file_cols.append(col)
            else:
                file_cols.extend([col for col in file_df.columns if col not in file_cols])
            # Filter keyword columns if requested
            if show_keyword_columns:
                keyword_cols = ['source_file']
                if 'sheet_name' in file_df.columns:
                    keyword_cols.append('sheet_name')
                keyword_cols.extend([
                    col for col in file_cols
                    if col not in ['source_file', 'sheet_name'] and
                    file_df[col].apply(lambda x: any(term.lower() in str(x).lower()
                                                     for term in search_terms if term.strip())).any()
                ])
                file_cols = keyword_cols
            file_df = file_df[file_cols]
            if show_highlights:
                file_df = file_df.style.apply(lambda df: highlight_matches(df, search_terms), axis=None)
            st.dataframe(file_df, use_container_width=True)
            st.markdown("---")
    else:
        # Original display logic for combined view
        if hide_empty or show_keyword_columns:
            if show_keyword_columns:
                # Keep the identifying columns plus any column containing a term
                filtered_cols = []
                for col in df.columns:
                    if col in ['source_file', 'sheet_name']:
                        filtered_cols.append(col)
                        continue
                    if df[col].apply(lambda x: any(term.lower() in str(x).lower() for term in search_terms if term.strip())).any():
                        filtered_cols.append(col)
                df_filtered = df[filtered_cols]
            else:
                if hide_empty:
                    non_empty_cols = [col for col in df.columns if not is_all_empty(df[col])]
                    df_filtered = df[non_empty_cols]
                else:
                    # NOTE(review): unreachable in practice — the outer condition
                    # guarantees hide_empty when show_keyword_columns is False
                    df_filtered = df
            if show_highlights:
                df_filtered = df_filtered.style.apply(lambda df: highlight_matches(df, search_terms), axis=None)
            st.dataframe(df_filtered, use_container_width=True, height=600)
        else:
            if show_highlights:
                df_styled = df.style.apply(lambda df: highlight_matches(df, search_terms), axis=None)
                st.dataframe(df_styled, use_container_width=True, height=600)
            else:
                st.dataframe(df, use_container_width=True, height=600)
def show_overview(search_index):
    """Render a summary table (one row per indexed file) for the directory."""
    st.subheader("Directory Overview")
    rows = []
    for entry in search_index:
        rows.append({
            'File': entry['name'],
            'Type': Path(entry['path']).suffix[1:].upper(),
            'Size (MB)': f"{entry['size_mb']:.1f}",
            'Columns': len(entry['columns']),
            'Rows': len(entry['preview']),
            'Last Modified': pd.to_datetime(entry['mtime'], unit='s'),
        })
    files_df = pd.DataFrame(rows)
    st.dataframe(files_df[['File', 'Type', 'Size (MB)', 'Columns', 'Rows', 'Last Modified']], height=600)
def main():
    """Streamlit entry point: wire up the sidebar, search inputs and results."""
    st.set_page_config(page_title="Data Search Pro", layout="wide", page_icon="🔍")
    st.title("📂 Data Search Pro")
    config = sidebar_config()
    selected_subdir = config['selected_subdir']
    selected_file = config['selected_file']
    # Cached by st.cache_data, so repeating the sidebar's call is cheap.
    search_index = build_search_index(selected_subdir)
    if not search_index:
        st.warning("No searchable files found in selected directory!")
        st.stop()
    st.subheader("Search Filters")
    cols = st.columns([3, 3, 3])
    with cols[0]:
        term1 = st.text_input("Keyword 1", help="First search term (required)")
    with cols[1]:
        term2 = st.text_input("Keyword 2", help="Second search term")
    with cols[2]:
        term3 = st.text_input("Keyword 3", help="Third search term")
    search_terms = [t.strip() for t in [term1, term2, term3] if t.strip()]
    if not search_terms:
        # No keywords entered yet: show a directory overview instead of searching.
        show_overview(search_index)
        return
    start_time = time.time()
    results = []
    stats = {'files_processed': 0, 'matches_found': 0}
    progress_bar = st.progress(0)
    status_text = st.empty()
    if selected_file == "All Files":
        files_to_search = search_index
    else:
        files_to_search = [entry for entry in search_index if entry['name'] == selected_file]
    for i, file_entry in enumerate(files_to_search):
        status_text.text(f"🔎 Searching: {file_entry['name']}")
        progress_bar.progress((i + 1) / len(files_to_search))
        file_results = search_in_file(file_entry, search_terms)
        if not file_results.empty:
            results.append(file_results)
            stats['matches_found'] += len(file_results)
        # counts every searched file, matching or not
        stats['files_processed'] += 1
    final_df = process_results(results, search_terms, show_highlights=True)
    elapsed = time.time() - start_time
    show_results(final_df, elapsed, stats, search_terms)
    progress_bar.empty()
    status_text.empty()

if __name__ == "__main__":
    main()
This content originally appeared on DEV Community and was authored by RUDRA SHARMA
Print
Share
Comment
Cite
Upload
Translate
Updates
There are no updates yet.
Click the Upload button above to add an update.

APA
MLA
RUDRA SHARMA | Sciencx (2025-01-29T13:29:51+00:00) TEST. Retrieved from https://www.scien.cx/2025/01/29/test-11/
" » TEST." RUDRA SHARMA | Sciencx - Wednesday January 29, 2025, https://www.scien.cx/2025/01/29/test-11/
HARVARD: RUDRA SHARMA | Sciencx Wednesday January 29, 2025 » TEST., viewed ,<https://www.scien.cx/2025/01/29/test-11/>
VANCOUVER: RUDRA SHARMA | Sciencx - » TEST. [Internet]. [Accessed ]. Available from: https://www.scien.cx/2025/01/29/test-11/
CHICAGO: " » TEST." RUDRA SHARMA | Sciencx - Accessed . https://www.scien.cx/2025/01/29/test-11/
IEEE: " » TEST." RUDRA SHARMA | Sciencx [Online]. Available: https://www.scien.cx/2025/01/29/test-11/. [Accessed: ]
rf:citation » TEST | RUDRA SHARMA | Sciencx | https://www.scien.cx/2025/01/29/test-11/ |
Please log in to upload a file.
There are no updates yet.
Click the Upload button above to add an update.