TEST

This content originally appeared on DEV Community and was authored by RUDRA SHARMA

import streamlit as st
import pandas as pd
import numpy as np
from pathlib import Path
import time
from openpyxl import load_workbook
from filelock import FileLock
import base64
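# Third-party packages used below (install with pip): streamlit, pandas, numpy, openpyxl, filelock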

# Configuration
MAX_FILE_SIZE_MB = 100
DEFAULT_PREVIEW_ROWS = 50

# Define COLORS for highlighting
COLORS = ['#ff6666', '#66b3ff', '#99ff99', '#ffcc99', '#cc99ff']

# Helper functions
def get_subdirs(base_path):
    return [d for d in base_path.iterdir() if d.is_dir()]

def process_excel_header(header):
    if not header:
        return []
    return [str(col).strip() if col is not None else f"Unnamed_{i}" for i, col in enumerate(header)]

# Build a lightweight index of searchable files (path, size, columns, 1,000-row preview).
# Cached by Streamlit for 5 minutes (ttl=300) so reruns don't re-read every file.
@st.cache_data(ttl=300, show_spinner="Indexing files...")
def build_search_index(subdir):
    index = []
    for ext in ["*.csv", "*.xlsx", "*.xls"]:
        for fp in subdir.rglob(ext):
            try:
                file_size = fp.stat().st_size / (1024 * 1024)
                if file_size > MAX_FILE_SIZE_MB:
                    continue

                # Per-file lock; keep the original extension so e.g. report.csv and report.xlsx don't share a lock
                with FileLock(str(fp) + '.lock'):
                    if fp.suffix.lower() == '.csv':
                        df_chunk = pd.read_csv(fp, nrows=1000, engine='c')
                    else:
                        # Let pandas pick the reader engine so legacy .xls files are not forced through openpyxl
                        df_chunk = pd.read_excel(fp, nrows=1000)

                index.append({
                    'path': str(fp),
                    'name': fp.name,
                    'size_mb': file_size,
                    'columns': list(df_chunk.columns),
                    'preview': df_chunk,
                    'mtime': fp.stat().st_mtime
                })
            except Exception as e:
                st.error(f"Error indexing {fp.name}: {str(e)}")
    return index

# Sidebar Configuration
def sidebar_config():
    st.sidebar.title("Configuration")
    st.sidebar.markdown("---")

    # Folder selection at the top
    with st.sidebar.expander("📁 Folder Selection", expanded=True):
        base_dir = Path(__file__).parent if "__file__" in globals() else Path.cwd()
        subdirs = get_subdirs(base_dir)
        if not subdirs:
            st.sidebar.error("No subdirectories found to search.")
            st.stop()
        selected_subdir = st.selectbox("Choose Directory", subdirs, format_func=lambda x: x.name)

    # File selection dropdown
    with st.sidebar.expander("📄 File Selection", expanded=True):
        search_index = build_search_index(selected_subdir)
        if search_index:
            file_names = [entry['name'] for entry in search_index]
            selected_file = st.selectbox("Select a specific file to search", ["All Files"] + file_names)
        else:
            selected_file = st.selectbox("Select a specific file to search", ["No files found"])

    return {
        'selected_subdir': selected_subdir,
        'selected_file': selected_file
    }

def search_in_file(file_entry, search_terms):
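    # All non-empty search terms must match somewhere in the same row (AND logic),
    # using case-insensitive substring matching against every cell.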
    results = []
    try:
        file_name = file_entry['name']

        if file_entry['path'].lower().endswith('.csv'):
            # CSV handling
            reader = pd.read_csv(file_entry['path'], engine='c', chunksize=10000, low_memory=False)
            for chunk in reader:
                chunk = chunk.astype(str)
                # Align the mask with the chunk's own index so chunks after the first filter correctly
                mask = pd.Series(True, index=chunk.index)
                for term in search_terms:
                    if not term.strip():
                        continue
                    term_mask = chunk.apply(lambda col: col.str.contains(term, case=False, regex=False)).any(axis=1)
                    mask &= term_mask
                filtered = chunk[mask]
                if not filtered.empty:
                    filtered.insert(0, 'source_file', file_name)
                    results.append(filtered)
        else:
            # Excel handling (read-only streaming via openpyxl; legacy .xls workbooks are not supported by openpyxl)
            wb = load_workbook(filename=file_entry['path'], read_only=True)

            # Process all sheets
            for sheet_name in wb.sheetnames:
                ws = wb[sheet_name]

                if ws.max_row is None or ws.max_row < 2:  # Skip empty sheets (max_row may be None in read-only mode)
                    continue

                header = [cell.value for cell in ws[1]]
                header = process_excel_header(header)
                if not header:
                    continue

                rows = []
                for row in ws.iter_rows(min_row=2, values_only=True):
                    rows.append(row)
                    if len(rows) == 10000:  # Process in chunks
                        chunk_df = pd.DataFrame(rows, columns=header)
                        chunk_df = chunk_df.astype(str)

                        mask = pd.Series(True, index=chunk_df.index)
                        for term in search_terms:
                            if not term.strip():
                                continue
                            term_mask = chunk_df.apply(lambda col: col.str.contains(term, case=False, regex=False)).any(axis=1)
                            mask &= term_mask
                        filtered = chunk_df[mask]

                        if not filtered.empty:
                            filtered.insert(0, 'source_file', file_name)
                            filtered.insert(1, 'sheet_name', sheet_name)
                            results.append(filtered)
                        rows = []

                # Process remaining rows
                if rows:
                    chunk_df = pd.DataFrame(rows, columns=header)
                    chunk_df = chunk_df.astype(str)

                    mask = pd.Series(True, index=chunk_df.index)
                    for term in search_terms:
                        if not term.strip():
                            continue
                        term_mask = chunk_df.apply(lambda col: col.str.contains(term, case=False, regex=False)).any(axis=1)
                        mask &= term_mask
                    filtered = chunk_df[mask]

                    if not filtered.empty:
                        filtered.insert(0, 'source_file', file_name)
                        filtered.insert(1, 'sheet_name', sheet_name)
                        results.append(filtered)

            wb.close()

        return pd.concat(results, ignore_index=True) if results else pd.DataFrame()
    except Exception as e:
        st.error(f"Error searching {file_entry['name']}: {str(e)}")
        return pd.DataFrame()

def process_results(results, search_terms, show_highlights):
    if not results:
        return pd.DataFrame()

    final_df = pd.concat(results, ignore_index=True)

    # Ensure proper column order
    if 'sheet_name' in final_df.columns:
        cols = ['source_file', 'sheet_name'] + [col for col in final_df.columns if col not in ['source_file', 'sheet_name']]
        final_df = final_df[cols]

    return final_df

def highlight_matches(df, search_terms):
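    # Return a DataFrame of CSS strings for pandas Styler: cells containing a search term
    # get that term's color from COLORS (cycling when there are more terms than colors).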
    styles = pd.DataFrame('', index=df.index, columns=df.columns)
    for i, term in enumerate(search_terms):
        if term.strip():
            mask = df.apply(lambda col: col.map(lambda x: term.lower() in str(x).lower() if pd.notnull(x) else False))
            styles = styles.where(~mask, f'background-color: {COLORS[i % len(COLORS)]}')
    return styles

def show_results(df, elapsed, stats, search_terms):
    st.subheader("Results")

    if isinstance(df, pd.DataFrame) and df.empty:
        st.info("No matches found")
        return

    # Helper function for checking empty columns - moved to top level
    def is_all_empty(series):
        if series.dtype.kind in 'iuf':
            return series.isnull().all()
        elif series.dtype.kind == 'O':
            return series.apply(lambda x: x.lower() in ['nan', 'none', ''] if not pd.isnull(x) else True).all()
        else:
            return True

    # Set up display columns
    if 'source_file' in df.columns and 'sheet_name' in df.columns:
        display_cols = ['source_file', 'sheet_name'] + [col for col in df.columns if col not in ['source_file', 'sheet_name']]
        df = df[display_cols]

    # Display options
    cols = st.columns([2, 1, 1, 1, 1])
    with cols[0]:
        show_stats = st.checkbox("Show Statistics", False)
    with cols[1]:
        show_highlights = st.checkbox("Highlight Matches", True)
    with cols[2]:
        hide_empty = st.checkbox("Hide Empty Columns", True)
    with cols[3]:
        show_keyword_columns = st.checkbox("Show Only Columns with Keywords", False)
    with cols[4]:
        show_separate_files = st.checkbox("Show Files Separately", False)

    # Show statistics if requested
    if show_stats:
        stats_df = pd.DataFrame({
            'Metric': ['Total Matches', 'Files Processed', 'Search Time (s)', 'Avg Time/File (s)'],
            'Value': [
                stats['matches_found'],
                stats['files_processed'],
                f"{elapsed:.2f}",
                f"{(elapsed / stats['files_processed']) if stats['files_processed'] else 0:.2f}"
            ]
        })
        st.dataframe(stats_df)

    if show_separate_files:
        # Group data by source file
        for source_file in df['source_file'].unique():
            st.markdown(f"### File: {source_file}")
            file_df = df[df['source_file'] == source_file].copy()

            # Get columns specific to this file
            file_cols = ['source_file']
            if 'sheet_name' in file_df.columns:
                file_cols.append('sheet_name')

            # Add non-empty columns for this specific file
            if hide_empty:
                for col in file_df.columns:
                    if col not in file_cols and not is_all_empty(file_df[col]):
                        file_cols.append(col)
            else:
                file_cols.extend([col for col in file_df.columns if col not in file_cols])

            # Filter keyword columns if requested
            if show_keyword_columns:
                keyword_cols = ['source_file']
                if 'sheet_name' in file_df.columns:
                    keyword_cols.append('sheet_name')
                keyword_cols.extend([
                    col for col in file_cols 
                    if col not in ['source_file', 'sheet_name'] and 
                    file_df[col].apply(lambda x: any(term.lower() in str(x).lower() 
                                                   for term in search_terms if term.strip())).any()
                ])
                file_cols = keyword_cols

            file_df = file_df[file_cols]

            if show_highlights:
                file_df = file_df.style.apply(lambda df: highlight_matches(df, search_terms), axis=None)

            st.dataframe(file_df, use_container_width=True)
            st.markdown("---")
    else:
        # Original display logic for combined view
        if hide_empty or show_keyword_columns:
            if show_keyword_columns:
                filtered_cols = []
                for col in df.columns:
                    if col in ['source_file', 'sheet_name']:
                        filtered_cols.append(col)
                        continue
                    if df[col].apply(lambda x: any(term.lower() in str(x).lower() for term in search_terms if term.strip())).any():
                        filtered_cols.append(col)
                df_filtered = df[filtered_cols]
            else:
                if hide_empty:
                    non_empty_cols = [col for col in df.columns if not is_all_empty(df[col])]
                    df_filtered = df[non_empty_cols]
                else:
                    df_filtered = df

            if show_highlights:
                df_filtered = df_filtered.style.apply(lambda df: highlight_matches(df, search_terms), axis=None)
            st.dataframe(df_filtered, use_container_width=True, height=600)
        else:
            if show_highlights:
                df_styled = df.style.apply(lambda df: highlight_matches(df, search_terms), axis=None)
                st.dataframe(df_styled, use_container_width=True, height=600)
            else:
                st.dataframe(df, use_container_width=True, height=600)

def show_overview(search_index):
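    # One row per indexed file; shown from main() when no search terms have been entered.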
    st.subheader("Directory Overview")
    files_df = pd.DataFrame([{
        'File': entry['name'],
        'Type': Path(entry['path']).suffix[1:].upper(),
        'Size (MB)': f"{entry['size_mb']:.1f}",
        'Columns': len(entry['columns']),
        'Rows (preview)': len(entry['preview']),
        'Last Modified': pd.to_datetime(entry['mtime'], unit='s')
    } for entry in search_index])

    st.dataframe(files_df[['File', 'Type', 'Size (MB)', 'Columns', 'Rows (preview)', 'Last Modified']], height=600)

def main():
    st.set_page_config(page_title="Data Search Pro", layout="wide", page_icon="🔍")
    st.title("📂 Data Search Pro")

    config = sidebar_config()
    selected_subdir = config['selected_subdir']
    selected_file = config['selected_file']

    search_index = build_search_index(selected_subdir)

    if not search_index:
        st.warning("No searchable files found in selected directory!")
        st.stop()

    st.subheader("Search Filters")
    cols = st.columns([3, 3, 3])
    with cols[0]:
        term1 = st.text_input("Keyword 1", help="First search term (required)")
    with cols[1]:
        term2 = st.text_input("Keyword 2", help="Second search term")
    with cols[2]:
        term3 = st.text_input("Keyword 3", help="Third search term")

    search_terms = [t.strip() for t in [term1, term2, term3] if t.strip()]

    if not search_terms:
        show_overview(search_index)
        return

    start_time = time.time()
    results = []
    stats = {'files_processed': 0, 'matches_found': 0}

    progress_bar = st.progress(0)
    status_text = st.empty()

    if selected_file == "All Files":
        files_to_search = search_index
    else:
        files_to_search = [entry for entry in search_index if entry['name'] == selected_file]

    for i, file_entry in enumerate(files_to_search):
        status_text.text(f"🔎 Searching: {file_entry['name']}")
        progress_bar.progress((i + 1) / len(files_to_search))

        file_results = search_in_file(file_entry, search_terms)
        if not file_results.empty:
            results.append(file_results)
            stats['matches_found'] += len(file_results)
        stats['files_processed'] += 1

    final_df = process_results(results, search_terms, show_highlights=True)
    elapsed = time.time() - start_time

    show_results(final_df, elapsed, stats, search_terms)

    progress_bar.empty()
    status_text.empty()

if __name__ == "__main__":
    main()
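
To try this locally, save the script (the filename data_search_pro.py below is just an example), install the third-party packages it imports, and launch it with Streamlit:

pip install streamlit pandas numpy openpyxl filelock
streamlit run data_search_pro.py

The app lists the subdirectories next to the script, indexes the CSV/XLSX/XLS files inside the one you pick, and filters rows by up to three keywords combined with AND logic.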

