11 changes: 11 additions & 0 deletions Dockerfile
@@ -2,6 +2,8 @@

# Build argument for custom certificates directory
ARG CUSTOM_CERT_DIR="certs"
# Build argument for AST chunking (defaults to false)
ARG AST_CHUNKING=false

FROM node:20-alpine3.22 AS node_base

@@ -67,6 +69,15 @@ ENV PATH="/opt/venv/bin:$PATH"
COPY --from=py_deps /opt/venv /opt/venv
COPY api/ ./api/

# Configure AST chunking based on build argument
RUN if [ "$AST_CHUNKING" = "true" ]; then \
echo "🚀 Enabling AST chunking during build..."; \
cd /app/api && python enable_ast.py enable; \
else \
echo "📝 Using default text chunking..."; \
cd /app/api && python enable_ast.py disable; \
fi

# Copy Node app
COPY --from=node_builder /app/public ./public
COPY --from=node_builder /app/.next/standalone ./
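
The new build argument is supplied at image build time. A minimal sketch of both modes, assuming a hypothetical image tag and the repository root as the build context:

```sh
# Build with AST chunking enabled (image tag is a placeholder)
docker build --build-arg AST_CHUNKING=true -t deepwiki:ast .

# Default build keeps plain text chunking
docker build -t deepwiki:latest .
```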
282 changes: 253 additions & 29 deletions api/api.py
@@ -9,20 +9,215 @@
from pydantic import BaseModel, Field
import google.generativeai as genai
import asyncio
from collections import defaultdict
import fnmatch

# Configure logging
from api.logging_config import setup_logging
from api.config import load_repo_config

setup_logging()
logger = logging.getLogger(__name__)


# Initialize FastAPI app
app = FastAPI(
title="Streaming API",
description="API for streaming chat completions"
app = FastAPI()

# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, specify your frontend domain
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)


# Pydantic models for wiki pages
class WikiPage(BaseModel):
id: str
title: str
content: str
related_pages: List[str] = []


# ============================================================================
# INTELLIGENT FILE CHUNKING SYSTEM
# ============================================================================

def should_exclude_dir(dir_name: str, excluded_patterns: List[str]) -> bool:
"""Check if directory should be excluded based on patterns."""
# Always exclude hidden directories and common build/cache dirs
if dir_name.startswith('.'):
return True
if dir_name in ['__pycache__', 'node_modules', '.venv', 'venv', 'env',
'image-cache', 'dist', 'build', 'target', 'out']:
return True

# Check against user-defined patterns
for pattern in excluded_patterns:
pattern_clean = pattern.strip('./').rstrip('/')
if fnmatch.fnmatch(dir_name, pattern_clean):
return True
return False


def should_exclude_file(file_name: str, excluded_patterns: List[str]) -> bool:
"""Check if file should be excluded based on patterns."""
# Always exclude hidden files and common files
if file_name.startswith('.') or file_name == '__init__.py' or file_name == '.DS_Store':
return True

# Check against user-defined patterns
for pattern in excluded_patterns:
if fnmatch.fnmatch(file_name, pattern):
return True
return False
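
A quick sketch of how the two exclusion helpers behave; the patterns below are hypothetical stand-ins for what the repository configuration might contain:

```python
from api.api import should_exclude_dir, should_exclude_file

# Hypothetical patterns; real values come from the repository configuration
excluded_dirs = ["./.git/", "docs/"]
excluded_files = ["*.min.js", "*.lock"]

should_exclude_dir("node_modules", excluded_dirs)     # True  (hard-coded default)
should_exclude_dir("docs", excluded_dirs)             # True  (matches the "docs/" pattern)
should_exclude_dir("src", excluded_dirs)              # False
should_exclude_file("bundle.min.js", excluded_files)  # True  (fnmatch wildcard)
should_exclude_file("main.py", excluded_files)        # False
```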


def collect_all_files(path: str, config: Dict) -> tuple[List[str], str]:
"""
Collect all files from the repository, respecting the configured exclude patterns.
Also finds and reads README.md during the same walk.

Args:
path: Root directory path
config: Configuration with excluded_dirs and excluded_files

Returns:
Tuple of (list of relative file paths, README content string)
"""
all_files = []
readme_content = ""
excluded_dirs = config.get('excluded_dirs', [])
excluded_files = config.get('excluded_files', [])

logger.info(f"Collecting files from {path}")
logger.info(f"Excluded dirs: {len(excluded_dirs)} patterns")
logger.info(f"Excluded files: {len(excluded_files)} patterns")

for root, dirs, files in os.walk(path):
# Filter directories in-place
dirs[:] = [d for d in dirs if not should_exclude_dir(d, excluded_dirs)]

for file in files:
if not should_exclude_file(file, excluded_files):
rel_dir = os.path.relpath(root, path)
rel_file = os.path.join(rel_dir, file) if rel_dir != '.' else file
all_files.append(rel_file)

# Find README.md (case-insensitive) during the same walk
if file.lower() == 'readme.md' and not readme_content:
try:
with open(os.path.join(root, file), 'r', encoding='utf-8') as f:
readme_content = f.read()
logger.info(f"Found README.md at: {rel_file}")
except Exception as e:
logger.warning(f"Could not read README.md at {rel_file}: {str(e)}")

logger.info(f"Collected {len(all_files)} files after filtering")
return all_files, readme_content
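
The collector can also be driven directly; a minimal sketch, assuming a local checkout path and a filter dict shaped like the file_filters block this PR reads from repo.json (both hypothetical here):

```python
from api.api import collect_all_files

# Hypothetical path and filter configuration
file_filters = {
    "excluded_dirs": ["./.git/", "node_modules/"],
    "excluded_files": ["*.lock", "*.min.js"],
}

files, readme = collect_all_files("/path/to/local/repo", file_filters)
print(f"{len(files)} files collected; README length: {len(readme)}")
```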


def group_files_by_directory(files: List[str]) -> Dict[str, List[str]]:
"""Group files by their parent directory."""
by_dir = defaultdict(list)

for file_path in files:
dir_name = os.path.dirname(file_path)
if not dir_name:
dir_name = "root"
by_dir[dir_name].append(file_path)

return dict(by_dir)


def create_file_chunks(files: List[str], max_files_per_chunk: int = 500) -> List[Dict[str, Any]]:
"""
Create intelligent chunks of files grouped by directory.
Ensures no chunk exceeds max_files_per_chunk by splitting large directories.

Args:
files: List of all file paths
max_files_per_chunk: Maximum files per chunk

Returns:
List of chunk dictionaries with metadata
"""
# Group by directory
by_dir = group_files_by_directory(files)

chunks = []
current_chunk_files = []
current_chunk_dirs = []

for dir_name, dir_files in sorted(by_dir.items()):
# Handle large directories that exceed max_files_per_chunk on their own
if len(dir_files) > max_files_per_chunk:
# First, save current chunk if it has files
if current_chunk_files:
chunks.append({
'files': current_chunk_files[:],
'directories': current_chunk_dirs[:],
'file_count': len(current_chunk_files)
})
current_chunk_files = []
current_chunk_dirs = []

# Split large directory across multiple chunks
logger.warning(f"Directory '{dir_name}' has {len(dir_files)} files, splitting across multiple chunks")
for i in range(0, len(dir_files), max_files_per_chunk):
chunk_slice = dir_files[i:i + max_files_per_chunk]
chunks.append({
'files': chunk_slice,
'directories': [f"{dir_name} (part {i//max_files_per_chunk + 1})"],
'file_count': len(chunk_slice)
})
else:
# Normal case: check if adding this directory would exceed limit
if current_chunk_files and len(current_chunk_files) + len(dir_files) > max_files_per_chunk:
# Save current chunk and start new one
chunks.append({
'files': current_chunk_files[:],
'directories': current_chunk_dirs[:],
'file_count': len(current_chunk_files)
})
current_chunk_files = []
current_chunk_dirs = []

# Add directory to current chunk
current_chunk_files.extend(dir_files)
current_chunk_dirs.append(dir_name)

# Add final chunk if it has files
if current_chunk_files:
chunks.append({
'files': current_chunk_files,
'directories': current_chunk_dirs,
'file_count': len(current_chunk_files)
})

logger.info(f"Created {len(chunks)} chunks from {len(files)} files")
for i, chunk in enumerate(chunks):
logger.info(f" Chunk {i+1}: {chunk['file_count']} files across {len(chunk['directories'])} directories")

return chunks
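
For example, with the default limit of 500 files per chunk, a layout like the hypothetical one below produces one chunk for the small directories and splits the oversized directory into parts:

```python
from api.api import create_file_chunks

# Hypothetical file list: two small directories plus one oversized vendor directory
files = (
    [f"src/module_{i}.py" for i in range(10)]
    + [f"tests/test_{i}.py" for i in range(5)]
    + [f"vendor/dep_{i}.js" for i in range(1200)]
)

chunks = create_file_chunks(files, max_files_per_chunk=500)
for chunk in chunks:
    print(chunk["file_count"], chunk["directories"])
# src and tests fit together in one chunk; vendor is split into parts of at most 500 files
```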


def format_chunk_as_tree(chunk: Dict[str, Any]) -> str:
"""Format a chunk of files as a tree string."""
files = chunk['files']
tree_lines = sorted(files)

# Add chunk metadata
chunk_info = f"# Chunk contains {len(files)} files from {len(chunk['directories'])} directories\n"
chunk_info += f"# Directories: {', '.join(chunk['directories'][:5])}"
if len(chunk['directories']) > 5:
chunk_info += f" ... and {len(chunk['directories']) - 5} more"
chunk_info += "\n\n"

return chunk_info + '\n'.join(tree_lines)
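
Given one of those chunks, the formatter emits a short header followed by the sorted file list; a quick sketch of the output shape:

```python
from api.api import format_chunk_as_tree

chunk = {"files": ["src/config.py", "src/api.py"], "directories": ["src"], "file_count": 2}
print(format_chunk_as_tree(chunk))
# # Chunk contains 2 files from 1 directories
# # Directories: src
#
# src/api.py
# src/config.py
```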

# Configure CORS
app.add_middleware(
CORSMiddleware,
@@ -273,8 +468,19 @@ async def export_wiki(request: WikiExportRequest):
raise HTTPException(status_code=500, detail=error_msg)

@app.get("/local_repo/structure")
async def get_local_repo_structure(path: str = Query(None, description="Path to local repository")):
"""Return the file tree and README content for a local repository."""
async def get_local_repo_structure(
path: str = Query(None, description="Path to local repository"),
chunk_size: int = Query(500, description="Maximum files per chunk"),
return_chunks: bool = Query(False, description="Return chunked structure for large repos")
):
"""
Return the file tree and README content for a local repository.

Supports intelligent chunking for large repositories:
- Collects all files, respecting the configured exclude patterns
- Groups files by directory
- Returns chunked output when return_chunks is set or the file count exceeds chunk_size
"""
if not path:
return JSONResponse(
status_code=400,
@@ -288,30 +494,48 @@ async def get_local_repo_structure(path: str = Query(None, description="Path to
)

try:
logger.info(f"Processing local repository at: {path}")
file_tree_lines = []
readme_content = ""

for root, dirs, files in os.walk(path):
# Exclude hidden dirs/files and virtual envs
dirs[:] = [d for d in dirs if not d.startswith('.') and d != '__pycache__' and d != 'node_modules' and d != '.venv']
for file in files:
if file.startswith('.') or file == '__init__.py' or file == '.DS_Store':
continue
rel_dir = os.path.relpath(root, path)
rel_file = os.path.join(rel_dir, file) if rel_dir != '.' else file
file_tree_lines.append(rel_file)
# Find README.md (case-insensitive)
if file.lower() == 'readme.md' and not readme_content:
try:
with open(os.path.join(root, file), 'r', encoding='utf-8') as f:
readme_content = f.read()
except Exception as e:
logger.warning(f"Could not read README.md: {str(e)}")
readme_content = ""

file_tree_str = '\n'.join(sorted(file_tree_lines))
return {"file_tree": file_tree_str, "readme": readme_content}
logger.info(f"Processing local repository at: {path} (chunk_size={chunk_size}, return_chunks={return_chunks})")

# Load configuration from repo.json (imported at the top)
config_data = load_repo_config()
file_filters = config_data.get('file_filters', {})

# Collect ALL files respecting patterns and find README in one pass
all_files, readme_content = collect_all_files(path, file_filters)

# Decide whether to chunk based on repository size
total_files = len(all_files)
logger.info(f"Total files collected: {total_files}")

if return_chunks or total_files > chunk_size:
# Create intelligent chunks
chunks = create_file_chunks(all_files, max_files_per_chunk=chunk_size)

return {
"chunked": True,
"total_files": total_files,
"chunk_count": len(chunks),
"chunks": [
{
"chunk_id": i,
"file_count": chunk['file_count'],
"directories": chunk['directories'],
"file_tree": format_chunk_as_tree(chunk)
}
for i, chunk in enumerate(chunks)
],
"readme": readme_content
}
else:
# Small repo, return as single tree
file_tree_str = '\n'.join(sorted(all_files))
return {
"chunked": False,
"total_files": total_files,
"file_tree": file_tree_str,
"readme": readme_content
}

except Exception as e:
logger.error(f"Error processing local repository: {str(e)}")
return JSONResponse(
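
On the client side, the new query parameters can be exercised directly. A minimal sketch with httpx, where the host, port, and repository path are assumptions rather than values from this PR:

```python
import httpx

# Assumed dev server address and repository path; adjust to your setup
resp = httpx.get(
    "http://localhost:8001/local_repo/structure",
    params={"path": "/path/to/local/repo", "chunk_size": 500, "return_chunks": True},
)
data = resp.json()

if data["chunked"]:
    for chunk in data["chunks"]:
        print(chunk["chunk_id"], chunk["file_count"], chunk["directories"])
else:
    print(data["total_files"], "files in a single tree")
```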