Splitting large files by rows in GCP

I have some very large files in GCP that I'd like to split up before copying them across to AWS to be processed by a Lambda function.

Files can be as big as 50 GB with millions of rows. I'm trying to split them into chunks of, say, 100k rows each for the Lambda to process.

As far as I'm aware, there's nothing in gsutil that can do this.

I've tried writing a file splitter and deploying it both as a Cloud Function and on App Engine, but I've hit memory issues in testing. I went up to an F4 instance, but that still didn't have enough memory. This was the error I got when processing only a 500 MB file:

Exceeded hard memory limit of 1024 MB with 1787 MB after servicing 0 requests total. Consider setting a larger instance class in app.yaml

This was the code deployed to App Engine to do the file splitting:

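# Assumed setup: a Flask app on App Engine using the google-cloud-storage client.
import logging

from flask import request
from google.cloud import storage

LOGGER = logging.getLogger(__name__)


def run():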
    LOGGER.info(f"Request received with the following arguments: {request.args}")

    # Request args
    bucket_name = request.args.get('bucket_name')
    file_location = request.args.get('file_location')
    chunk_size = int(request.args.get('chunk_size', 100000))

    LOGGER.info(f"Getting files in bucket: [{bucket_name}] with prefix: [{file_location}]")
    storage_client = storage.Client()

    for blob in storage_client.list_blobs(bucket_name, prefix=file_location):
        blob_name = str(blob.name)
        if "json" in blob_name:
            LOGGER.info(f"Found blob: [{blob_name}]")
            blob_split = blob_name.split("/")
            file_name = blob_split[-1]

            bucket = storage_client.get_bucket(bucket_name)
            LOGGER.info(f"Downloading file: [{file_name}]")
            download_blob = bucket.get_blob(blob_name)
            downloaded_blob_string = download_blob.download_as_string()
            downloaded_json_data = downloaded_blob_string.decode("utf-8").splitlines()
            LOGGER.info(f"Got blob: [{file_name}]")
            file_count = len(downloaded_json_data)
            LOGGER.info(f"Blob [{file_name}] has {file_count} rows")

            # Step through the rows one chunk at a time (0-based, inclusive indices).
            for file_number in range(0, file_count, chunk_size):
                range_min = file_number
                # Clamp the last chunk to the final row.
                range_max = min(file_number + chunk_size, file_count) - 1
                LOGGER.info(f"Generating file for rows: {range_min+1} - {range_max+1}")
                split_file = "\n".join(downloaded_json_data[range_min:range_max+1]).encode("utf-8")
                LOGGER.info(f"Attempting upload of file for rows: {range_min+1} - {range_max+1}")
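                upload_blob = bucket.blob(f"{file_location}split/{file_name}_split_{range_min+1}-{range_max+1}")
                # Write the chunk to the new blob; without this call nothing is uploaded.
                upload_blob.upload_from_string(split_file)
                LOGGER.info(f"Upload complete for rows: {range_min+1} - {range_max+1}")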
            LOGGER.info(f"Successfully split file: {file_name}")
    LOGGER.info(f"Completed all file splits for {file_location}")
    return "success"

Is there a more efficient way to do this? What other alternatives do I have?

I want to automate this file-splitting process, which we have to run a couple of times a month. Is my best bet to spin up a GCE instance each time so that I can just run the following:

 split -l 100000 file.json

Then shut it down after splitting is completed?
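
For comparison, here is a rough sketch of what I think a streaming equivalent of `split -l` would look like in Python. It reads one line at a time, so memory use should stay near the size of a single row rather than the whole file. The function name `split_by_lines` and the `open_chunk` callback are hypothetical names of mine, not part of any library, and the sketch only assumes that the source yields lines as bytes and that each chunk is written to a writable binary stream.

```python
from itertools import chain, islice
from typing import BinaryIO, Callable, Iterable, List


def split_by_lines(
    source: Iterable[bytes],
    chunk_size: int,
    open_chunk: Callable[[int], BinaryIO],
) -> List[int]:
    """Stream `source` into chunks of at most `chunk_size` lines.

    `open_chunk(index)` is a hypothetical callback that returns a writable
    binary stream for the chunk with the given 0-based index.
    Returns the number of lines written to each chunk.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")

    lines = iter(source)
    counts: List[int] = []
    while True:
        # Peek at one line so that no empty trailing chunk is created.
        first = next(lines, None)
        if first is None:
            break
        count = 0
        with open_chunk(len(counts)) as out:
            # Only the current line is held in memory at any time.
            for line in chain([first], islice(lines, chunk_size - 1)):
                out.write(line)
                count += 1
        counts.append(count)
    return counts
```

If I understand the client library correctly, recent versions of google-cloud-storage expose `blob.open("rb")` and `blob.open("wb")`, so the source could be something like `bucket.blob(blob_name).open("rb")` and `open_chunk` could return `bucket.blob(f"{file_location}split/{file_name}_split_{index}").open("wb")`. I haven't verified how that behaves on a 50 GB file, so treat it as an assumption.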