Connection failure when using Airflow to run a Python script that connects to a Neo4j database

I'm trying to use Airflow to orchestrate a workflow that first starts a Neo4j Docker container and then runs a Python script that queries data from the Neo4j database (everything runs on an AWS EC2 instance). The Neo4j container starts successfully, but when the querying task runs I get this connection error:

neo4j.exceptions.ServiceUnavailable: Connection to 127.0.0.1:7687 closed without handshake response

If I run the Python script manually on the EC2 instance, it connects to the Neo4j database without any issue. The code I am using is below:

class Neo4jConnection:
    """Thin wrapper around a neo4j ``GraphDatabase`` driver.

    One driver is created per instance; every :meth:`query` call opens its own
    session and closes it before returning.
    """

    def __init__(self, uri, user, pwd):
        """Create the underlying driver.

        :param uri: Bolt URI of the server, e.g. ``bolt://localhost:7687``.
        :param user: user name for basic authentication.
        :param pwd: password for basic authentication.
        :raises SystemExit: if creating the driver raises.
        """
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
            # NOTE(review): the neo4j driver appears to connect lazily, so this
            # only means the driver object was created. An unreachable server
            # (e.g. "localhost" resolving inside a different container) is only
            # reported by the first query. Consider
            # ``self.__driver.verify_connectivity()`` here to fail fast.
            logger.info('SUCCESS: Connected to the Neo4j Database.')
        except Exception as e:
            # This is a failure path, so log at ERROR rather than INFO.
            logger.error('ERROR: Could not connect to the Neo4j Database. See console for details.')
            raise SystemExit(e) from e

    def close(self):
        """Close the driver if one was created; safe to call more than once."""
        if self.__driver is not None:
            self.__driver.close()

    def query(self, query, parameters=None, db=None):
        """Run a Cypher statement and return all result records.

        This is best-effort: any exception raised while running the query is
        logged (with traceback) and swallowed.

        :param query: Cypher text to execute.
        :param parameters: optional mapping of query parameters.
        :param db: optional database name; when None the session is opened
            without an explicit database.
        :returns: a list of the result records, or None if the query raised.
        """
        assert self.__driver is not None, "Driver not initialized!"
        session = None
        response = None
        try:
            session = self.__driver.session(database=db) if db is not None else self.__driver.session()
            # Materialize while the session is still open.
            response = list(session.run(query, parameters))
        except Exception as e:
            # '%s' placeholder so the error text is actually formatted into the
            # message; logger.exception also records the traceback.
            logger.exception("Query failed: %s", e)
        finally:
            if session is not None:
                session.close()
        return response

class LoadWikiPathway2Neo4j:
    """Load WikiPathways Turtle files into Neo4j using the n10s procedures."""

    # Directory on the host that is mounted into the neo4j container; it is
    # listed here only to discover the file names.
    WIKIPATHWAY_MOUNTED_DATA_VOLUME = "/home/ec2-user/wikipathway_neo4j/data/Human/"
    # The same directory as seen from inside the neo4j container. The import
    # URLs are built from this path because the server side opens the files.
    WIKIPATHWAY_DATA_DOCKER_VOLUME = "file:///var/lib/neo4j/data/Human"

    def __init__(self, uri="bolt://localhost:7687", user="neo4j", pwd="test"):
        """Open a connection to the target Neo4j server.

        :param uri: Bolt URI of the server. If the calling process (e.g. an
            Airflow worker) runs in its own container, "localhost" refers to
            that container, not to the host running neo4j; pass a reachable
            host name instead.
        :param user: user name for basic authentication.
        :param pwd: password for basic authentication.
        """
        self.connection = Neo4jConnection(uri=uri, user=user, pwd=pwd)

    def loadData(self):
        """Reset the graph, configure n10s, then import every data file.

        WARNING: the first statement (``MATCH (n) DETACH DELETE n``) deletes all
        nodes and relationships in the database on every call.

        The connection is closed before returning, even if a step raises, so
        each instance can load only once. Individual query failures are logged
        by ``query`` and do not stop the loop.
        """
        graph = self.connection
        # Built before the try block so the continuation lines keep the exact
        # indentation (and therefore the exact text) of the original literal.
        prefix_cypher = '''WITH '@prefix biopax:  <http://www.biopax.org/release/biopax-level3.owl#> . \
        @prefix cito:    <http://purl.org/spar/cito/> . \
        @prefix dc:      <http://purl.org/dc/elements/1.1/> . \
        @prefix dcat:     <http://www.w3.org/ns/dcat#> . \
        @prefix dcterms: <http://purl.org/dc/terms/> . \
        @prefix foaf:    <http://xmlns.com/foaf/0.1/> . \
        @prefix freq:    <http://purl.org/cld/freq/> . \
        @prefix gpml:    <http://vocabularies.wikipathways.org/gpml#> . \
        @prefix hmdb:     <https://identifiers.org/hmdb/> . \
        @prefix ncbigene: <https://identifiers.org/ncbigene/> .\
        @prefix owl:     <http://www.w3.org/2002/07/owl#> . \
        @prefix pav:     <http://purl.org/pav/> . \
        @prefix prov:    <http://www.w3.org/ns/prov#> . \
        @prefix pubmed:   <http://www.ncbi.nlm.nih.gov/pubmed/> .\
        @prefix rdf:     <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . \
        @prefix rdfs:    <http://www.w3.org/2000/01/rdf-schema#> . \
        @prefix skos:    <http://www.w3.org/2004/02/skos/core#> . \
        @prefix void:    <http://rdfs.org/ns/void#> . \
        @prefix wp:      <http://vocabularies.wikipathways.org/wp#> . \
        @prefix wprdf:   <http://rdf.wikipathways.org/> . \
        @prefix xsd:     <http://www.w3.org/2001/XMLSchema#> . ' as txt \
        CALL n10s.nsprefixes.addFromText(txt) yield prefix, namespace RETURN prefix, namespace'''
        # The file URL is passed as a parameter rather than interpolated, so a
        # file name containing a quote cannot break the statement.
        fetch_cypher = 'CALL n10s.rdf.import.fetch($url, "Turtle")'
        try:
            # Setup steps; these run on every call.
            graph.query('''MATCH (n) DETACH DELETE n''')
            graph.query('''CALL n10s.graphconfig.init()''')
            # NOTE(review): "ON ... ASSERT" is Neo4j 4.x constraint syntax;
            # Neo4j 5 expects "FOR ... REQUIRE" — confirm the server version.
            graph.query('''CREATE CONSTRAINT n10s_unique_uri IF NOT EXISTS ON (r:Resource) ASSERT r.uri IS UNIQUE''')
            graph.query('''call n10s.nsprefixes.removeAll()''')
            graph.query(prefix_cypher)

            # Sorted for a deterministic, reproducible load order.
            for filename in sorted(os.listdir(self.WIKIPATHWAY_MOUNTED_DATA_VOLUME)):
                file_url = f'{self.WIKIPATHWAY_DATA_DOCKER_VOLUME}/{filename}'
                logger.info("%s with url=%s", fetch_cypher, file_url)
                # query() returns None when the statement raised.
                if graph.query(fetch_cypher, {"url": file_url}) is None:
                    logger.error("%s failed to load", filename)
                else:
                    logger.info("%s is loaded", filename)
        finally:
            graph.close()

def load_wikipathway_files():
    """Airflow task entry point: connect with default settings and load all files."""
    LoadWikiPathway2Neo4j().loadData()

# Manually triggered DAG (no schedule) with a single task that loads the
# WikiPathways files into Neo4j. The DAG id is this file's base name with
# ".py" removed.
with DAG(
    dag_id=os.path.basename(__file__).replace(".py", ""),
    default_args=default,
    start_date=days_ago(1),
    schedule_interval=None,
    catchup=False,
    tags=['s3', 'download_wikipathway'],
) as dag:
    loadWikipathwayFile = PythonOperator(
        task_id="load_wikipathway",
        python_callable=load_wikipathway_files,
    )
How many English words
do you know?
Test your English vocabulary size, and measure
how many words do you know
Online Test
Powered by Examplum