GitLab Connector: TypeError with async generator #12594

@VenutNSA

Description

The GitLab connector fails during synchronization with the following error:

TypeError: 'async for' requires an object with __aiter__ method, got generator

This occurs when the sync task tries to iterate over documents returned by the GitLab connector.

Error Details

Error Message:

TypeError: 'async for' requires an object with __aiter__ method, got generator

Location:

  • File: ragflow/rag/svr/sync_data_source.py
  • Method: SyncBase._run_task_logic() at line 113
  • Code: async for document_batch in document_batch_generator:

Stack Trace:

File "ragflow/rag/svr/sync_data_source.py", line 113, in _run_task_logic
    async for document_batch in document_batch_generator:
TypeError: 'async for' requires an object with __aiter__ method, got generator

Root Cause

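The _generate() coroutine of the Gitlab class in ragflow/rag/svr/sync_data_source.py (lines 985-1023) returns the regular synchronous generator produced by GitlabConnector.load_from_state() or poll_source() unchanged. _run_task_logic() awaits that result and consumes it with async for, which requires an asynchronous iterator (an object with an __aiter__ method), so the loop fails before the first batch is read.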

Current Implementation (Broken):

class Gitlab(SyncBase):
    SOURCE_NAME: str = FileSource.GITLAB

    async def _generate(self, task: dict):
        # ... connector setup ...
        
        if task["reindex"] == "1" or not task["poll_range_start"]:
            document_generator = self.connector.load_from_state()
            begin_info = "totally"
        else:
            # ... poll logic ...
            document_generator = self.connector.poll_source(...)
            begin_info = "from {}".format(poll_start)
        
        logging.info("Connect to Gitlab: ({}) {}".format(self.conf["project_name"], begin_info))
        return document_generator  # ❌ Returns regular generator

Expected Behavior

The _generate() method should return an async generator, as the Github and Confluence connectors already do.
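Fixing each connector is the approach this issue proposes. A complementary, hypothetical option is to make the consumer tolerant of both kinds of iterables, so a connector that forgets the wrapper no longer crashes the sync task. as_async_iterator is an illustrative name, not an existing RAGFlow function.

```python
import asyncio
from collections.abc import AsyncIterable, Iterable


async def as_async_iterator(source):
    """Yield every item of `source`, whether it is an async iterable or a plain one.

    Hypothetical helper: a consumer such as _run_task_logic() could iterate over
    as_async_iterator(document_batch_generator) instead of the raw object.
    """
    if isinstance(source, AsyncIterable):
        async for item in source:
            yield item
    elif isinstance(source, Iterable):
        for item in source:
            yield item
    else:
        raise TypeError(f"expected an iterable or async iterable, got {type(source).__name__}")


def sync_batches():
    yield ["a"]
    yield ["b"]


async def async_batches():
    yield ["c"]


async def collect(source):
    return [batch async for batch in as_async_iterator(source)]


print(asyncio.run(collect(sync_batches())))   # [['a'], ['b']]
print(asyncio.run(collect(async_batches())))  # [['c']]
```

The synchronous branch still runs each next() call on the event loop thread, so this only removes the crash; it does not make a blocking connector non-blocking.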

Working Examples

GitHub Connector (Correct Implementation)

class Github(SyncBase):
    async def _generate(self, task: dict):
        # ... setup code ...

        def document_batches():
            # ... synchronous generator logic ...
            yield doc_batch

        # Async generator that re-yields each batch, so the caller can use `async for`.
        async def async_wrapper():
            for batch in document_batches():
                yield batch

        return async_wrapper()  # ✅ Returns async generator

Confluence Connector (Correct Implementation)

class Confluence(SyncBase):
    async def _generate(self, task: dict):
        # ... setup code ...

        def document_batches():
            # ... synchronous generator logic ...
            yield pending_docs

        # Async generator that re-yields each batch, so the caller can use `async for`.
        async def async_wrapper():
            for batch in document_batches():
                yield batch

        return async_wrapper()  # ✅ Returns async generator

Proposed Fix

Wrap the synchronous generator in an async wrapper function, similar to the Github and Confluence implementations:

import logging
from datetime import datetime, timezone

# SyncBase, FileSource and GitlabConnector are assumed to be available in
# sync_data_source.py, as they are for the other connectors.


class Gitlab(SyncBase):
    SOURCE_NAME: str = FileSource.GITLAB

    async def _generate(self, task: dict):
        """
        Sync documents from a GitLab project.
        """
        self.connector = GitlabConnector(
            project_owner=self.conf.get("project_owner"),
            project_name=self.conf.get("project_name"),
            include_mrs=self.conf.get("include_mrs", False),
            include_issues=self.conf.get("include_issues", False),
            include_code_files=self.conf.get("include_code_files", False),
        )

        credentials = self.conf.get("credentials", {})
        self.connector.load_credentials(
            {
                "gitlab_access_token": credentials.get("gitlab_access_token"),
                "gitlab_url": credentials.get("gitlab_url"),
            }
        )

        # Full reindex when requested or when there is no previous poll point;
        # in the else branch poll_range_start is therefore always set.
        if task["reindex"] == "1" or not task["poll_range_start"]:
            document_generator = self.connector.load_from_state()
            begin_info = "totally"
        else:
            poll_start = task["poll_range_start"]
            document_generator = self.connector.poll_source(
                poll_start.timestamp(),
                datetime.now(timezone.utc).timestamp(),
            )
            begin_info = "from {}".format(poll_start)

        # _run_task_logic() consumes the result with `async for`, so wrap the
        # synchronous generator in an async generator.
        async def async_wrapper():
            for batch in document_generator:
                yield batch

        logging.info("Connect to Gitlab: ({}) {}".format(self.conf["project_name"], begin_info))
        return async_wrapper()
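One caveat shared by this async_wrapper and the Github and Confluence versions: each next() on the synchronous generator runs on the event loop thread. Assuming the connector makes blocking HTTP calls to the GitLab API inside its generator (likely, since it is synchronous), other coroutines on the same loop stall while it waits. If that matters, a variant can offload each step to a worker thread with asyncio.to_thread (Python 3.9+). This is a sketch under that assumption; async_wrapper_in_thread and the time.sleep stand-in for network latency are hypothetical.

```python
import asyncio
import time


def document_batches():
    """Hypothetical stand-in for GitlabConnector.load_from_state(): a blocking sync generator."""
    for i in range(3):
        time.sleep(0.05)  # simulates a blocking HTTP call to the GitLab API
        yield [f"doc-{i}"]


async def async_wrapper_in_thread(sync_gen):
    """Yield from a synchronous generator without blocking the event loop.

    Each next() call runs in a worker thread via asyncio.to_thread. A private
    sentinel marks exhaustion, because a StopIteration raised in the worker
    cannot be delivered through an asyncio Future.
    """
    sentinel = object()
    while True:
        # Calls are awaited one at a time, so the generator is never resumed concurrently.
        batch = await asyncio.to_thread(next, sync_gen, sentinel)
        if batch is sentinel:
            break
        yield batch


async def main():
    batches = []
    async for batch in async_wrapper_in_thread(document_batches()):
        batches.append(batch)
    return batches


print(asyncio.run(main()))  # [['doc-0'], ['doc-1'], ['doc-2']]
```

The trade-off is one thread hop per batch; for connectors that yield large batches this overhead is small compared with the API calls themselves.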

Related Issues

This appears to be the same issue as described in:

Environment

  • RAGFlow Version: v0.23.1 (or current version)
  • Python Version: (as per your environment)
  • GitLab Version: (on-premise or GitLab.com)

Steps to Reproduce

  1. Configure a GitLab connector in RAGFlow UI with:
    • GitLab URL
    • Project owner and project name
    • Personal Access Token with read_repository and read_api scopes
    • Enable include_code_files (or other options)
  2. Link the connector to a dataset
  3. Trigger synchronization (Rebuild or wait for scheduled sync)
  4. Observe the error in container logs
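The steps above need a live GitLab instance. The fix can also be checked offline with a stub connector. The sketch below assumes, as in the proposed fix, that task carries reindex and poll_range_start (a timezone-aware datetime) and that the connector exposes load_from_state() and poll_source(start, end). StubGitlabConnector, generate and consume are hypothetical test helpers; connector setup and logging are omitted.

```python
import asyncio
import inspect
from datetime import datetime, timezone


class StubGitlabConnector:
    """Hypothetical stand-in for GitlabConnector that records which method was called."""

    def __init__(self):
        self.calls = []

    @staticmethod
    def _batches(items):
        yield from items  # plain synchronous generator, like the real connector

    def load_from_state(self):
        self.calls.append("load_from_state")
        return self._batches([["full-1"], ["full-2"]])

    def poll_source(self, start, end):
        self.calls.append(("poll_source", start, end))
        return self._batches([["delta-1"]])


async def generate(connector, task):
    """Simplified copy of the branch logic and wrapper from the proposed Gitlab._generate()."""
    if task["reindex"] == "1" or not task["poll_range_start"]:
        document_generator = connector.load_from_state()
    else:
        poll_start = task["poll_range_start"]
        document_generator = connector.poll_source(
            poll_start.timestamp(), datetime.now(timezone.utc).timestamp()
        )

    async def async_wrapper():
        for batch in document_generator:
            yield batch

    return async_wrapper()


async def consume(task):
    connector = StubGitlabConnector()
    document_batch_generator = await generate(connector, task)
    assert inspect.isasyncgen(document_batch_generator)
    # Same consumption pattern as SyncBase._run_task_logic().
    batches = [batch async for batch in document_batch_generator]
    return connector.calls, batches


full_calls, full_batches = asyncio.run(consume({"reindex": "1", "poll_range_start": None}))
poll_start = datetime(2024, 1, 1, tzinfo=timezone.utc)
poll_calls, poll_batches = asyncio.run(consume({"reindex": "0", "poll_range_start": poll_start}))
print(full_calls, full_batches)
print(poll_calls[0][0], poll_batches)
```

Both branches now produce an async generator that the async for loop accepts, and the stub records whether a full load or an incremental poll was chosen.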

Additional Context

  • The connector configuration appears correct (credentials, project settings)
  • The error occurs immediately when the sync task starts processing
  • Other connectors (GitHub, Confluence) work correctly because they already wrap their synchronous generators in an async generator
  • The fix is straightforward and follows the established pattern used by other connectors

Metadata

Assignees

No one assigned

Labels

🐞 bug: Something isn't working, pull request that fix bug.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests
