I'm trying to use Airflow to orchestrate a workflow that first starts a Neo4j Docker container and then runs a Python script that queries data from the Neo4j database (the code runs on an AWS EC2 instance). The Neo4j container starts successfully, but when the task that queries the database runs, I get the following connection error:
neo4j.exceptions.ServiceUnavailable: Connection to 127.0.0.1:7687 closed without handshake response
If I run the Python script manually on the EC2 instance, it connects to the Neo4j database without any issue.
The code I am using is as below:
class Neo4jConnection:
    """Small wrapper around a Neo4j driver for running Cypher queries.

    The driver is created in ``__init__`` and must be released with
    :meth:`close` once the caller is done with it.
    """

    def __init__(self, uri, user, pwd):
        """Create a driver for *uri* authenticated with *user* / *pwd*.

        Raises:
            SystemExit: if creating the driver raises any exception.
        """
        self.__uri = uri
        self.__user = user
        self.__pwd = pwd
        self.__driver = None
        try:
            self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
            # NOTE(review): this only shows that the driver object was
            # created. Depending on the driver version the first network
            # round trip may happen later, on the first query -- TODO confirm.
            logger.info('SUCCESS: Connected to the Neo4j Database.')
        except Exception as e:
            # logger.exception records the traceback, so the cause is
            # visible in the task log and not only on the console.
            logger.exception('ERROR: Could not connect to the Neo4j Database. See console for details.')
            raise SystemExit(e) from e

    def close(self):
        """Close the underlying driver if one was created."""
        if self.__driver is not None:
            self.__driver.close()

    def query(self, query, parameters=None, db=None):
        """Run a Cypher *query* and return its records as a list.

        Args:
            query: Cypher statement to execute.
            parameters: optional parameters passed through to ``session.run``.
            db: optional database name; when ``None`` the session is opened
                without a ``database`` argument.

        Returns:
            A list built from the result of ``session.run``, or ``None`` when
            the query raised. The failure is logged, not propagated.

        Raises:
            AssertionError: if no driver is available.
        """
        # Explicit raise instead of `assert` so the guard survives `python -O`;
        # the exception type is unchanged for existing callers.
        if self.__driver is None:
            raise AssertionError("Driver not initialized!")
        session = None
        response = None
        try:
            session = self.__driver.session(database=db) if db is not None else self.__driver.session()
            response = list(session.run(query, parameters))
        except Exception as e:
            # The original passed `e` without a %s placeholder, which breaks
            # log formatting and loses the error text.
            logger.exception("Query failed: %s", e)
        finally:
            if session is not None:
                session.close()
        return response
class LoadWikiPathway2Neo4j:
    """Import the WikiPathways Turtle files into Neo4j via the n10s procedures."""

    def __init__(self, uri="bolt://localhost:7687", user="neo4j", pwd="test"):
        """Open the Neo4j connection used by :meth:`loadData`.

        The defaults are the values that were previously hard-coded, so
        ``LoadWikiPathway2Neo4j()`` behaves as before. Pass *uri* explicitly
        when the database is not reachable as ``localhost`` from the process
        that runs this code.

        Args:
            uri: Bolt URI of the Neo4j server.
            user: Neo4j user name.
            pwd: Neo4j password.
        """
        self.connection = Neo4jConnection(uri=uri, user=user, pwd=pwd)

    def loadData(self):
        """Reset the graph, register the RDF prefixes and import every file.

        File names are listed from the host-side directory, but each import
        call is given the corresponding path as seen by the Neo4j server.
        The connection is closed when the method finishes, including when an
        exception is raised part-way through.
        """
        WIKIPATHWAY_MOUNTED_DATA_VOLUME = "/home/ec2-user/wikipathway_neo4j/data/Human/"  # the volume is mounted to neo4j docker
        WIKIPATHWAY_DATA_DOCKER_VOLUME = "file:///var/lib/neo4j/data/Human"  # file path in neo4j docker
        # connect db
        graph = self.connection
        try:
            # only run once
            graph.query('''MATCH (n) DETACH DELETE n''')
            graph.query('''CALL n10s.graphconfig.init()''')
            graph.query('''CREATE CONSTRAINT n10s_unique_uri IF NOT EXISTS ON (r:Resource) ASSERT r.uri IS UNIQUE''')
            graph.query('''call n10s.nsprefixes.removeAll()''')
            # Adjacent literals reproduce the original single-line Cypher text
            # exactly, including the two places with no space after the '.'.
            cypher = (
                "WITH '@prefix biopax: <http://www.biopax.org/release/biopax-level3.owl#> . "
                "@prefix cito: <http://purl.org/spar/cito/> . "
                "@prefix dc: <http://purl.org/dc/elements/1.1/> . "
                "@prefix dcat: <http://www.w3.org/ns/dcat#> . "
                "@prefix dcterms: <http://purl.org/dc/terms/> . "
                "@prefix foaf: <http://xmlns.com/foaf/0.1/> . "
                "@prefix freq: <http://purl.org/cld/freq/> . "
                "@prefix gpml: <http://vocabularies.wikipathways.org/gpml#> . "
                "@prefix hmdb: <https://identifiers.org/hmdb/> . "
                "@prefix ncbigene: <https://identifiers.org/ncbigene/> ."
                "@prefix owl: <http://www.w3.org/2002/07/owl#> . "
                "@prefix pav: <http://purl.org/pav/> . "
                "@prefix prov: <http://www.w3.org/ns/prov#> . "
                "@prefix pubmed: <http://www.ncbi.nlm.nih.gov/pubmed/> ."
                "@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> . "
                "@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> . "
                "@prefix skos: <http://www.w3.org/2004/02/skos/core#> . "
                "@prefix void: <http://rdfs.org/ns/void#> . "
                "@prefix wp: <http://vocabularies.wikipathways.org/wp#> . "
                "@prefix wprdf: <http://rdf.wikipathways.org/> . "
                "@prefix xsd: <http://www.w3.org/2001/XMLSchema#> . ' as txt "
                "CALL n10s.nsprefixes.addFromText(txt) yield prefix, namespace RETURN prefix, namespace"
            )
            graph.query(cypher)
            fileLst = os.listdir(WIKIPATHWAY_MOUNTED_DATA_VOLUME)
            # load data
            for filename in fileLst:
                filePath = f'{WIKIPATHWAY_DATA_DOCKER_VOLUME}/{filename}'
                print(filePath)
                cypher = f'''CALL n10s.rdf.import.fetch("{filePath}","Turtle")'''
                logger.info(cypher)
                # query() returns None when the statement raised, so only
                # report success when a result actually came back.
                if graph.query(cypher) is None:
                    logger.warning("%s could not be loaded (query failed)", filename)
                else:
                    logger.info(f"{filename} is loaded")
        finally:
            # Release the driver even if listing the directory or a query raised.
            graph.close()
def load_wikipathway_files():
    """Task callable: build the loader and run the WikiPathways import."""
    LoadWikiPathway2Neo4j().loadData()
# DAG named after this file, containing a single task that runs the
# WikiPathways import callable.
with DAG(
    dag_id=os.path.basename(__file__).replace(".py", ""),
    default_args=default,
    start_date=days_ago(1),
    schedule_interval=None,  # no schedule interval is configured
    catchup=False,
    tags=['s3', 'download_wikipathway'],
) as dag:
    # The only task: call load_wikipathway_files through a PythonOperator.
    loadWikipathwayFile = PythonOperator(
        python_callable=load_wikipathway_files,
        task_id="load_wikipathway",
    )