Spaces:
Sleeping
Sleeping
| """Unit tests for Pinecone client.""" | |
| from __future__ import annotations | |
| import tempfile | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from unittest.mock import MagicMock, patch | |
| import pytest | |
| from pinecone.exceptions import PineconeException | |
| from tools.pinecone_client import PineconeClient | |
| from tools.pinecone_models import PineconeRecord | |
class TestUploadTracking:
    """Tests for upload tracking marker file operations.

    Each test runs inside its own temporary directory (``set_dir``) and
    exercises ``PineconeClient.is_uploaded``, ``mark_uploaded`` and
    ``get_upload_timestamp`` against the marker file named by
    ``MARKER_NAME``.
    """

    # File name of the upload marker that the tests create and inspect.
    MARKER_NAME = ".pinecone_uploaded"

    @staticmethod
    def _is_readable(path: Path) -> bool:
        """Return True if *path* can actually be read by the current process."""
        try:
            path.read_bytes()
        except OSError:
            return False
        return True

    def test_is_uploaded_returns_false_when_marker_missing(self):
        """is_uploaded() returns False when marker file doesn't exist."""
        with tempfile.TemporaryDirectory() as tmpdir:
            set_dir = Path(tmpdir)

            assert PineconeClient.is_uploaded(set_dir) is False

    def test_is_uploaded_returns_true_when_marker_exists(self):
        """is_uploaded() returns True when marker file exists."""
        with tempfile.TemporaryDirectory() as tmpdir:
            set_dir = Path(tmpdir)
            marker_file = set_dir / self.MARKER_NAME
            marker_file.write_text("2025-01-15T14:30:00Z", encoding="utf-8")

            assert PineconeClient.is_uploaded(set_dir) is True

    def test_mark_uploaded_creates_marker_file(self):
        """mark_uploaded() creates marker file with ISO 8601 timestamp."""
        with tempfile.TemporaryDirectory() as tmpdir:
            set_dir = Path(tmpdir)
            marker_file = set_dir / self.MARKER_NAME
            assert not marker_file.exists()

            PineconeClient.mark_uploaded(set_dir)

            assert marker_file.exists()
            timestamp = marker_file.read_text(encoding="utf-8").strip()
            # fromisoformat() raises ValueError on malformed input, which
            # fails the test. A trailing "Z" is rewritten first because older
            # Python versions do not accept it.
            datetime.fromisoformat(timestamp.replace("Z", "+00:00"))

    def test_mark_uploaded_writes_utc_timestamp(self):
        """mark_uploaded() writes UTC timestamp in ISO 8601 format."""
        with tempfile.TemporaryDirectory() as tmpdir:
            set_dir = Path(tmpdir)

            PineconeClient.mark_uploaded(set_dir)

            marker_file = set_dir / self.MARKER_NAME
            timestamp_str = marker_file.read_text(encoding="utf-8").strip()
            # Normalise the "Z" suffix to an explicit offset before parsing.
            if timestamp_str.endswith("Z"):
                timestamp_str = timestamp_str[:-1] + "+00:00"
            parsed = datetime.fromisoformat(timestamp_str)
            assert parsed.tzinfo == timezone.utc

    def test_get_upload_timestamp_returns_none_when_marker_missing(self):
        """get_upload_timestamp() returns None when marker file doesn't exist."""
        with tempfile.TemporaryDirectory() as tmpdir:
            set_dir = Path(tmpdir)

            assert PineconeClient.get_upload_timestamp(set_dir) is None

    def test_get_upload_timestamp_returns_timestamp_when_marker_exists(self):
        """get_upload_timestamp() returns timestamp string when marker exists."""
        with tempfile.TemporaryDirectory() as tmpdir:
            set_dir = Path(tmpdir)
            expected_timestamp = "2025-01-15T14:30:00Z"
            marker_file = set_dir / self.MARKER_NAME
            marker_file.write_text(expected_timestamp, encoding="utf-8")

            result = PineconeClient.get_upload_timestamp(set_dir)

            assert result == expected_timestamp

    def test_get_upload_timestamp_handles_read_error(self):
        """get_upload_timestamp() returns None if marker file can't be read."""
        with tempfile.TemporaryDirectory() as tmpdir:
            set_dir = Path(tmpdir)
            marker_file = set_dir / self.MARKER_NAME
            marker_file.write_text("test", encoding="utf-8")
            # Strip all permissions so that reading the marker fails.
            marker_file.chmod(0o000)
            try:
                # As root, or on platforms where chmod does not revoke read
                # access, the file is still readable and the error path
                # cannot be triggered, so skip instead of passing vacuously.
                if self._is_readable(marker_file):
                    pytest.skip("cannot make the marker file unreadable here")

                assert PineconeClient.get_upload_timestamp(set_dir) is None
            finally:
                # Restore permissions so the temporary directory can be removed.
                marker_file.chmod(0o644)
class TestPineconeClientCore:
    """Tests for core Pinecone client functionality.

    Covers client construction, index validation, the exponential backoff
    retry helper and record serialisation via ``_record_to_dict``.

    NOTE(review): several tests take ``mock_get_settings`` or ``mock_sleep``
    arguments. Nothing in this view of the file supplies them, so they are
    presumably pytest fixtures or ``@patch`` decorators defined elsewhere;
    confirm before running.
    """

    @staticmethod
    def _settings(**attrs: object) -> MagicMock:
        """Return a mock settings object with *attrs* assigned as attributes."""
        settings = MagicMock()
        for name, value in attrs.items():
            setattr(settings, name, value)
        return settings

    @staticmethod
    def _pinecone_sdk(*, has_index: bool) -> tuple[MagicMock, MagicMock]:
        """Return ``(sdk_class, sdk_instance)`` mocks for the Pinecone SDK.

        ``sdk_class()`` returns ``sdk_instance``, whose ``has_index()`` call
        returns *has_index*.
        """
        sdk_instance = MagicMock()
        sdk_instance.has_index.return_value = has_index
        return MagicMock(return_value=sdk_instance), sdk_instance

    @staticmethod
    def _status_error(message: str, status: int) -> PineconeException:
        """Return a PineconeException carrying the given HTTP-style status."""
        error = PineconeException(message)
        error.status = status
        return error

    def test_init_raises_error_when_api_key_missing(self, mock_get_settings):
        """__init__() raises ValueError when API key is not set."""
        mock_get_settings.return_value = self._settings(pinecone_api_key="")

        with pytest.raises(ValueError, match="PINECONE_API_KEY"):
            PineconeClient()

    def test_init_initializes_pinecone_client(self, mock_get_settings):
        """__init__() initializes Pinecone SDK with API key."""
        mock_get_settings.return_value = self._settings(
            pinecone_api_key="test-api-key",
            pinecone_index_name="test-index",
            pinecone_namespace="test-namespace",
        )
        mock_pinecone_class, mock_pc = self._pinecone_sdk(has_index=True)
        mock_pc.Index.return_value = MagicMock()

        with patch("tools.pinecone_client.Pinecone", mock_pinecone_class):
            client = PineconeClient()

            assert client.pc == mock_pc
            assert client.index_name == "test-index"
            assert client.namespace == "test-namespace"

    def test_validate_index_raises_error_when_index_missing(self, mock_get_settings):
        """validate_index() raises ValueError when index doesn't exist."""
        mock_get_settings.return_value = self._settings(
            pinecone_api_key="test-api-key",
            pinecone_index_name="missing-index",
        )
        mock_pinecone_class, _ = self._pinecone_sdk(has_index=False)

        with patch("tools.pinecone_client.Pinecone", mock_pinecone_class):
            client = PineconeClient()

            with pytest.raises(ValueError, match="Index 'missing-index' not found"):
                client.validate_index()

    def test_validate_index_succeeds_when_index_exists(self, mock_get_settings):
        """validate_index() succeeds when index exists."""
        mock_get_settings.return_value = self._settings(
            pinecone_api_key="test-api-key",
            pinecone_index_name="existing-index",
        )
        mock_pinecone_class, _ = self._pinecone_sdk(has_index=True)

        with patch("tools.pinecone_client.Pinecone", mock_pinecone_class):
            client = PineconeClient()

            # Passing means no exception was raised.
            client.validate_index()

    def test_exponential_backoff_retry_succeeds_on_first_attempt(self):
        """exponential_backoff_retry() succeeds when function succeeds immediately."""
        func = MagicMock(return_value="success")

        result = PineconeClient.exponential_backoff_retry(func)

        assert result == "success"
        func.assert_called_once()

    def test_exponential_backoff_retry_retries_on_429(self, mock_sleep):
        """exponential_backoff_retry() retries on 429 rate limit errors."""
        error_429 = self._status_error("Rate limited", 429)
        func = MagicMock(side_effect=[error_429, "success"])

        result = PineconeClient.exponential_backoff_retry(func, max_retries=2)

        assert result == "success"
        assert func.call_count == 2
        mock_sleep.assert_called_once_with(1)  # 2^0 = 1

    def test_exponential_backoff_retry_retries_on_5xx(self, mock_sleep):
        """exponential_backoff_retry() retries on 5xx server errors."""
        error_500 = self._status_error("Server error", 500)
        func = MagicMock(side_effect=[error_500, "success"])

        result = PineconeClient.exponential_backoff_retry(func, max_retries=2)

        assert result == "success"
        assert func.call_count == 2
        mock_sleep.assert_called_once_with(1)

    def test_exponential_backoff_retry_fails_on_4xx(self):
        """exponential_backoff_retry() fails immediately on 4xx client errors."""
        error_400 = self._status_error("Bad request", 400)
        func = MagicMock(side_effect=error_400)

        with pytest.raises(PineconeException):
            PineconeClient.exponential_backoff_retry(func, max_retries=3)

        # Should only try once (no retries for 4xx)
        assert func.call_count == 1

    def test_exponential_backoff_retry_caps_delay_at_60s(self, mock_sleep):
        """exponential_backoff_retry() caps delay at 60 seconds."""
        error_500 = self._status_error("Server error", 500)
        # Seven consecutive failures: the uncapped delays would be
        # 1, 2, 4, 8, 16, 32, 64, so the last one must be clamped to 60 for
        # the cap to be observable.
        func = MagicMock(side_effect=[error_500] * 7 + ["success"])

        result = PineconeClient.exponential_backoff_retry(func, max_retries=8)

        assert result == "success"
        assert func.call_count == 8
        delays = [call.args[0] for call in mock_sleep.call_args_list]
        assert delays == [1, 2, 4, 8, 16, 32, 60]

    def test_record_to_dict_omits_none_optional_fields(self):
        """_record_to_dict() omits None values for optional fields."""
        record = PineconeRecord(
            _id="test-id",
            content="test content",
            standard_set_id="set-id",
            standard_set_title="Test Set",
            subject="Math",
            education_levels=["01"],
            document_id="doc-id",
            document_valid="2021",
            jurisdiction_id="jur-id",
            jurisdiction_title="Test Jurisdiction",
            depth=0,
            is_leaf=True,
            is_root=True,
            root_id="test-id",
            ancestor_ids=[],
            child_ids=[],
            sibling_count=0,
            # Optional fields set to None
            normalized_subject=None,
            publication_status=None,
            asn_identifier=None,
            statement_notation=None,
            statement_label=None,
            parent_id=None,
        )

        record_dict = PineconeClient._record_to_dict(record)

        # Verify _id is serialized (not id)
        assert "_id" in record_dict
        assert record_dict["_id"] == "test-id"
        assert "id" not in record_dict

        # Optional fields should be omitted
        for omitted in (
            "asn_identifier",
            "statement_notation",
            "statement_label",
            "normalized_subject",
            "publication_status",
        ):
            assert omitted not in record_dict

        # parent_id is the exception: it should be present as null
        assert "parent_id" in record_dict
        assert record_dict["parent_id"] is None

    def test_record_to_dict_includes_present_optional_fields(self):
        """_record_to_dict() includes optional fields when they have values."""
        record = PineconeRecord(
            _id="test-id",
            content="test content",
            standard_set_id="set-id",
            standard_set_title="Test Set",
            subject="Math",
            normalized_subject="Math",
            education_levels=["01"],
            document_id="doc-id",
            document_valid="2021",
            publication_status="Published",
            jurisdiction_id="jur-id",
            jurisdiction_title="Test Jurisdiction",
            asn_identifier="ASN123",
            statement_notation="1.2.3",
            statement_label="Standard",
            depth=1,
            is_leaf=True,
            is_root=False,
            parent_id="parent-id",
            root_id="root-id",
            ancestor_ids=["root-id"],
            child_ids=[],
            sibling_count=0,
        )

        record_dict = PineconeClient._record_to_dict(record)

        # Verify _id is serialized (not id)
        assert "_id" in record_dict
        assert record_dict["_id"] == "test-id"
        assert "id" not in record_dict

        # Optional fields should be included when present
        assert record_dict["asn_identifier"] == "ASN123"
        assert record_dict["statement_notation"] == "1.2.3"
        assert record_dict["statement_label"] == "Standard"
        assert record_dict["normalized_subject"] == "Math"
        assert record_dict["publication_status"] == "Published"
        assert record_dict["parent_id"] == "parent-id"