Associate parameterized fixture with parameterized test in pytest
I want to do something similar to this in pytest (this is just a simplified example):
import pytest
@pytest.fixture(params=['A', 'B'])
def fixture1(request):
return request.param.lower()
@pytest.mark.parametrize("extra_arg1", ["1", "2"])
def test_xxx(fixture1, extra_arg1):
assert fixture1 + extra_arg1 == .....
# the test is expected to run 4 times and the results should be "a1", "b1", "a2", "b2"
How can I associate the parameterized fixture with the parameters in the test? Note: I have to use fixture because it has been already in place in the project repo.
Thank you!
See also questions close to this topic
-
Print Variables in For Loop
from matplotlib.pyplot import figure T10,T11,T12,T13 = nx.random_tree(10),nx.random_tree(11),nx.random_tree(12),nx.random_tree(13) T = [T10,T11,T12,T13] for t in T: print("The Pruefer Code for -", pruefer_code(t)) fig = figure() axis = fig.add_subplot(111) nx.draw(t, **opts, ax = axis)
Which results in:
The Pruefer Code for - [2, 8, 1, 9, 5, 5, 6, 8] The Pruefer Code for - [9, 6, 10, 7, 4, 8, 10, 4, 6] The Pruefer Code for - [4, 1, 4, 8, 11, 11, 8, 3, 4, 8] The Pruefer Code for - [8, 7, 11, 4, 2, 7, 9, 1, 5, 10, 7]
Plus the graphs - how would I amend the code so it'd say:
The Pruefer Code for - T10 [2, 8, 1, 9, 5, 5, 6, 8] The Pruefer Code for - T11 [9, 6, 10, 7, 4, 8, 10, 4, 6] The Pruefer Code for - T12 [4, 1, 4, 8, 11, 11, 8, 3, 4, 8] The Pruefer Code for - T13 [8, 7, 11, 4, 2, 7, 9, 1, 5, 10, 7]
Any help appreciated :)
-
Get number of packets received by ping command in python
I need a function that can return the number of packets received or loss percentage. Before I used the code below to get true/false if I receive any of the packets. This should work in Windows, but if somebody can do it suitable for Linux too, I would be thankful.
def ping_ip(ip): current_os = platform.system().lower() parameter = "-c" if current_os == "windows": parameter = "-n" command = ['ping', parameter, '4', ip] res = subprocess.call(command) return res == 0
-
Tkinter GUI unresponsive
I'm pretty new to python and tkinter, so pardon me if I'm being naive. I'm trying to make a GUI for solving an engineering design problem using a widely accepted method (implying the method is seamless). The code for this method takes 0.537909984588623 seconds when run independently (not in tkinter but normal code), and its not too complex or tangled. When I tried to modify this code to fit into a GUI using tkinter, it becomes unresponsive after I enter all the inputs and select a button, even though the program keeps running in the background. Also, when I forcefully close the GUI window, the jupyter kernel becomes dead. Heres a brief outline of my code:
-----------------------------------------------code begins------------------------------------------
from tkinter import * from scipy.optimize import fsolve import matplotlib import numpy as np import threading from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from matplotlib.figure import Figure import matplotlib.pyplot as plt matplotlib.use('TkAgg') import math class MyWindow(): def __init__(self, win): self.lbl1=Label(win, text='Alpha') self.lbl2=Label(win, text='xd') self.lbl3=Label(win, text='xw') self.lbl4=Label(win, text='xf') self.lbl5=Label(win, text='q') self.lbl6=Label(win, text='Reflux Factor') self.lbl7=Label(win, text='Tray Efficiency') self.lbl8=Label(win, text='Total Number of Stages') self.lbl9=Label(win, text='Feed Stage') self.t1=Entry(bd=3) self.t2=Entry(bd=3) self.t3=Entry(bd=3) self.t4=Entry(bd=3) self.t5=Entry(bd=8) self.t6=Entry(bd=8) self.t7=Entry(bd=8) self.t8=Entry(bd=8) self.t9=Entry(bd=8) self.btn1=Button(win, text='Total Number of Stages ', command=self.stagesN) self.lbl1.place(x=100, y=80) self.t1.place(x=300, y=80) self.lbl2.place(x=100, y=130) self.t2.place(x=300, y=130) self.lbl3.place(x=100, y=180) self.t3.place(x=300, y=180) self.lbl4.place(x=100, y=230) self.t4.place(x=300, y=230) self.lbl5.place(x=100, y=280) self.t5.place(x=300, y=280) self.lbl6.place(x=100, y=330) self.t6.place(x=300, y=330) self.lbl7.place(x=100, y=380) self.t7.place(x=300, y=380) self.lbl8.place(x=800, y=130) self.t8.place(x=790, y=170) self.lbl9.place(x=800, y=210) self.t9.place(x=790, y=260) self.btn1.place(x= 500, y= 75) def originalEq(self,xa,relative_volatility): ya=(relative_volatility*xa)/(1+(relative_volatility-1)*xa) return ya def equilibriumReal(self,xa,relative_volatility,nm): ya=(relative_volatility*xa)/(1+(relative_volatility-1)*xa) ya=((ya-xa)*nm)+xa return ya def equilibriumReal2(self,ya,relative_volatility,nm): a=((relative_volatility*nm)-nm-relative_volatility+1) b=((ya*relative_volatility)-ya+nm-1-(relative_volatility*nm)) c=ya xa=(-b-np.sqrt((b**2)-(4*a*c)))/(2*a) return xa def stepping_ESOL(self,x1,y1,relative_volatility,R,xd,nm): x2=self.equilibriumReal2(y1,relative_volatility,nm) y2=(((R*x2)/(R+1))+(xd/(R+1))) return x1,x2,y1,y2 def stepping_SSOL(self,x1,y1,relative_volatility,\ ESOL_q_x,ESOL_q_y,xb,nm): x2=self.equilibriumReal2(y1,relative_volatility,nm) m=((xb-ESOL_q_y)/(xb-ESOL_q_x)) c=ESOL_q_y-(m*ESOL_q_x) y2=(m*x2)+c return x1,x2,y1,y2 def stagesN(self): relative_volatility=float(self.t1.get()) nm=float(self.t7.get()) xd=float(self.t2.get()) xb=float(self.t3.get()) xf=float(self.t4.get()) q=float(self.t5.get()) R_factor=float(self.t6.get()) xa=np.linspace(0,1,100) ya_og=self.originalEq(xa[:],relative_volatility) ya_eq=self.equilibriumReal(xa[:],relative_volatility,nm) x_line=xa[:] y_line=xa[:] al=relative_volatility a=((al*q)/(q-1))-al+(al*nm)-(q/(q-1))+1-nm b=(q/(q-1))-1+nm+((al*xf)/(1-q))-(xf/(1-q))-(al*nm) c=xf/(1-q) if q>1: q_eqX=(-b+np.sqrt((b**2)-(4*a*c)))/(2*a) else: q_eqX=(-b-np.sqrt((b**2)-(4*a*c)))/(2*a) q_eqy=self.equilibriumReal(q_eqX,relative_volatility,nm) theta_min=xd*(1-((xd-q_eqy)/(xd-q_eqX))) R_min=(xd/theta_min)-1 R=R_factor*R_min theta=(xd/(R+1)) ESOL_q_x=((theta-(xf/(1-q)))/((q/(q-1))-((xd-theta)/xd))) ESOL_q_y=(ESOL_q_x*((xd-theta)/xd))+theta x1,x2,y1,y2=self.stepping_ESOL(xd,xd,relative_volatility,R,xd,nm) step_count=1 while x2>ESOL_q_x: x1,x2,y1,y2=self.stepping_ESOL(x2,y2,relative_volatility,R,xd,nm) step_count+=1 feed_stage=step_count x1,x2,y1,y2=self.stepping_SSOL(x1,y1,relative_volatility\ ,ESOL_q_x,ESOL_q_y,xb,nm) step_count+=1 while x2>xb: x1,x2,y1,y2=self.stepping_SSOL(x2,y2,relative_volatility\ ,ESOL_q_x,ESOL_q_y,xb,nm) step_count+=1 xb_actual=x2 stagesN=step_count-1 self.t8.insert(END, str(stagesN)) return window=Tk() mywin=MyWindow(window) window.title('DColumn') window.geometry("1500x1500") window.mainloop()
-------------------------------------------Code end--------------------------------------------
I read on other articles that using multiple threads brings down the load on mainloop and prevents freezing. But like I said, the code isnt very complex. Is it still because of everythings running on the mainloop? Or is there something more than meets the eye? Is multithreading the only way to go past this point?
Thanks in advance.
-
Why test file can't run tests against app file?
I'm practising tdd with Flask and funny enough my first test file can't seem to detect the main flask_app file:
test_basics.py :
from flask_testing import TestCase from flask_app import app import unittest class TestBasics(TestCase): def test_home_route(self): with self.client: response = self.client.get('/') self.assertEqual(response.status_code, 200) self.assertTrue(response.data == b'hakuna matata') if __name__ == '__main__': unittest.main()
I created
setup.py
so the root directory is treated as a project directory:from distutils.core import setup from setuptools import find_packages setup( version='0.0.1', description='training', name='flask' )
project structure:
├── flask_app.py ├── __pycache__ │ └── flask_app.cpython-38.pyc ├── setup.py ├── tests │ └── test_basics.py └── venv export FLASK_APP=flask_app.py export FLASK_ENV=development flask run
What do I need to do so the tests can run against the main flask_app.py file ?
python tests/test_basics.py
returns:ModuleNotFoundError: No module named 'flask_app'
-
Unit test DAO classes with JUnit using a Connection Pool
Hi everyone i'm new here and i need your help to unit test with junit a DAO class with a connection pool. This is the ConPool:
public class ConPool { private static DataSource datasource; /** * {@return} Connection * {@throws} SQLException * Ritorna la connessione al db. */ public static Connection getConnection() throws SQLException { if (datasource == null) { PoolProperties p = new PoolProperties(); p.setUrl("jdbc:mysql://localhost:3306/GameLand?serverTimezone=" + TimeZone.getDefault().getID()); p.setDriverClassName("com.mysql.cj.jdbc.Driver"); p.setUsername("root"); p.setPassword("basedidati"); p.setMaxActive(100); p.setInitialSize(10); p.setMinIdle(10); p.setRemoveAbandonedTimeout(60); p.setRemoveAbandoned(true); datasource = new DataSource(); datasource.setPoolProperties(p); } return datasource.getConnection(); } }
and this is the DAO class that i want to test:
public class OrdineDAO { /** * {@return} ArrayList of Ordine. */ public synchronized ArrayList<Ordine> doRetrieveAll() { String query = "SELECT * FROM ordine"; ArrayList<Ordine> result = new ArrayList<Ordine>(); try (Connection conn = ConPool.getConnection()) { PreparedStatement ps = conn.prepareStatement(query); ResultSet rs = ps.executeQuery(); while (rs.next()) { Ordine ord = new Ordine(); ord.setConsegnato(rs.getBoolean("consegnato")); ord.setDataOra(rs.getString("dataOra")); ord.setIdOrdine(rs.getInt("idOrdine")); ord.setIdProdotto(rs.getInt("idProdotto")); ord.setPrezzoFis(rs.getDouble("prezzoFis")); ord.setPrezzoDig(rs.getDouble("prezzoDig")); ord.setIva(rs.getDouble("iva")); ord.setQuantitaDigitale(rs.getInt("quantitaDigitale")); ord.setQuantitaFisico(rs.getInt("quantitaFisico")); ord.setIdUtente(rs.getInt("idUtente")); result.add(ord); } } catch (SQLException e) { e.printStackTrace(); } return result; } /** * {@param} id: int. * {@return} ArrayList of Ordine. */ public synchronized ArrayList<Ordine> doRetrieveByUser(int id) { PreparedStatement ps = null; String query = "SELECT * FROM ordine WHERE idUtente = ?"; ArrayList<Ordine> result = new ArrayList<Ordine>(); try (Connection conn = ConPool.getConnection()) { ps = conn.prepareStatement(query); ps.setInt(1, id); ResultSet rs = ps.executeQuery(); while (rs.next()) { Ordine ord = new Ordine(); ord.setConsegnato(rs.getBoolean("consegnato")); ord.setDataOra(rs.getString("dataOra")); ord.setIdOrdine(rs.getInt("idOrdine")); ord.setIdProdotto(rs.getInt("idProdotto")); ord.setPrezzoFis(rs.getDouble("prezzoFis")); ord.setPrezzoDig(rs.getDouble("prezzoDig")); ord.setIva(rs.getDouble("iva")); ord.setQuantitaDigitale(rs.getInt("quantitaDigitale")); ord.setQuantitaFisico(rs.getInt("quantitaFisico")); ord.setIdUtente(rs.getInt("idUtente")); result.add(ord); } } catch (SQLException e) { e.printStackTrace(); } return result; } /** * {@param} data1: String. * {@param} data2: String. * {@return} ArrayList of Ordine. */ public synchronized ArrayList<Ordine> doRetrieveByDate(String data1, String data2) { PreparedStatement ps = null; String query = "SELECT * FROM ordine WHERE dataOra >= ? AND dataOra <= ?"; ArrayList<Ordine> result = new ArrayList<Ordine>(); try (Connection conn = ConPool.getConnection()) { ps = conn.prepareStatement(query); ps.setString(1, data1); ps.setString(2, data2); ResultSet rs = ps.executeQuery(); while (rs.next()) { Ordine ord = new Ordine(); ord.setConsegnato(rs.getBoolean("consegnato")); ord.setDataOra(rs.getString("dataOra")); ord.setIdOrdine(rs.getInt("idOrdine")); ord.setIdProdotto(rs.getInt("idProdotto")); ord.setPrezzoFis(rs.getDouble("prezzoFis")); ord.setPrezzoDig(rs.getDouble("prezzoDig")); ord.setIva(rs.getDouble("iva")); ord.setQuantitaDigitale(rs.getInt("quantitaDigitale")); ord.setQuantitaFisico(rs.getInt("quantitaFisico")); ord.setIdUtente(rs.getInt("idUtente")); result.add(ord); } } catch (SQLException e) { e.printStackTrace(); } return result; } }
starting first with doRetrieveAll method, i tried to do a
@Test public void doRetrieveAll_Success() throws SQLException { assertNotNull(ordineDAO.doRetrieveAll()); }
But i need to know how to set the @BeforeAll in order to test this method. Can you please help me understanding how to set properly the test class? Thank you
-
NSubstitute : Can not return value of type Task`1 for IMapperBase.Map (expected type List`1)
I write a unit test, in the test I want to test my
CashCapitalService
.in my service, I used
AutoMapper
for mapping my model to dto and reverse.My Service is:
public class CashCapitalService : ICashCapitalService { private readonly IUnitOfWork _unitOfWork; private readonly IMapper _mapper; public CashCapitalService(IUnitOfWork unitOfWork, IMapper mapper) { _unitOfWork = unitOfWork; _mapper = mapper; } public async Task<List<CashCapitalInsertAndUpdateDto>> GetAllAsync() { var allAsync = await _unitOfWork.CashCapitalRepository.GetAllAsync(); List<CashCapitalInsertAndUpdateDto> cashCapitalInsertAndUpdateDtos = _mapper.Map<List<CashCapitalInsertAndUpdateDto>>(source:allAsync); return cashCapitalInsertAndUpdateDtos; } public class CashCapitalService : ICashCapitalService { private readonly IUnitOfWork _unitOfWork; private readonly IMapper _mapper; public CashCapitalService(IUnitOfWork unitOfWork, IMapper mapper) { _unitOfWork = unitOfWork; _mapper = mapper; } public async Task<List<CashCapitalInsertAndUpdateDto>> GetAllAsync() { var allAsync = await _unitOfWork.CashCapitalRepository.GetAllAsync(); List<CashCapitalInsertAndUpdateDto> cashCapitalInsertAndUpdateDtos = _mapper.Map<List<CashCapitalInsertAndUpdateDto>>(source:allAsync); return cashCapitalInsertAndUpdateDtos; }
as you can see Automapper profile is:
public class CashCapitalInsertDtoProfile : Profile { public CashCapitalInsertDtoProfile() { CreateMap<CashCapitalInsertAndUpdateDto, CashCapital>().ReverseMap(); } }
and in my unit test:
public class CashCapitalServiceTest { private readonly CashCapitalService _cashCapitalService; private readonly IUnitOfWork _unitOfWork; private readonly IMapper _mapper; public CashCapitalServiceTest() { _unitOfWork = Substitute.For<IUnitOfWork>(); _mapper = Substitute.For<IMapper>(); _cashCapitalService = new CashCapitalService(_unitOfWork, _mapper); } [Fact] public async Task GetAllAsync_Should_GetAllCashCapital() { //Arrange _unitOfWork.CashCapitalRepository.GetAllAsync().Returns(new List<CashCapital>()); var cashCapitals = new List<CashCapital>() { new CashCapital() { InsertDateTime = DateTime.Now, AmountRial = 250000000, Description = "tv,a lj,v" } }; var cashCapitalInsertAndUpdateDtos = new List<CashCapitalInsertAndUpdateDto>() { new CashCapitalInsertAndUpdateDto() { AmountRial = 250000000, Description = "tv,a lj,v" } }; //var returnValue = _mapper.Map(Arg.Any<List<CashCapital>>(), Arg.Any<List<CashCapitalInsertAndUpdateDto>>()); //_cashCapitalService.GetAllAsync().Returns(returnValue); _mapper.Map(cashCapitals, cashCapitalInsertAndUpdateDtos).Returns(new List<CashCapitalInsertAndUpdateDto>() { new CashCapitalInsertAndUpdateDto() { AmountRial = 250000000, Description = "tv,a lj,v" } }); var resultValue = _mapper.Map(cashCapitals, cashCapitalInsertAndUpdateDtos); _cashCapitalService.GetAllAsync().ReturnsForAnyArgs(resultValue); //Act var all = await _cashCapitalService.GetAllAsync(); //Assert all.Should().BeOfType<List<CashCapitalInsertAndUpdateDto>>(); await _unitOfWork.CashCapitalRepository.Received(1).GetAllAsync(); } }
When I run this test, failed and give me an error:
Message: NSubstitute.Exceptions.CouldNotSetReturnDueToTypeMismatchException : Can not return value of type Task
1 for IMapperBase.Map (expected type List
1).I'm new in unit testing and the NSubstitute library, I don't know how to mock the AutoMapper.
-
How to pass data to a monkeypatch in pytest
I have a simple test, where I would like to test the correct creation of the user data. The
Data
creation is depending on the user's week (you can consider it as a week joined). This means that users withweek=2
should have allData
objects which are also haveweek=2
. Since I have users with a different week, I want my test to cover several weeks. My problem here is thatUser.week
is a property that uses calculations to get a real week number and I don't know how to pass theweek
parameter to my monkeypatch fixture to make static value be "dynamic".Now I have an ugly solution that works for me - I have created two the same mocks with different return values.
test_tasks.py
from users.tasks import create_default_data_for_user class TestUserTasks: @pytest.mark.parametrize( "week", ["mock_user_week_1", "mock_user_week_2"] ) def test_create_default_data_for_user(self, user, rf, data, week): """ Check creation of user data using create_default_data_for_user task. :param user: a user object fixture :param rf: instance of Django test RequestFactory :param data: fixture that creates default data object :param week: mock object of user's week property :return: """ # Make sure the user has no user data assert len(user.data_set.all()) == 0 create_default_data_for_user() # Now the user should be given data according to his week expected_user_data_count = Data.objects.filter(week=user.week).count() assert len(user.data_set.all()) == expected_user_data_count # Make sure that for the current user the second run will not cause user data duplication create_default_data_for_user() assert len(user.data_set.all()) == expected_user_data_count
fixtures.py
import pytest @pytest.fixture def mock_user_week_1(monkeypatch): """ Mock the User object's property week. The mock object will prevent calculation of the real User's week and just return a given number. """ @property def user_week(*args, **kwargs): """ A mock object that overrides the method and returns a static value. :param args: args of the original object :param kwargs: kwargs of the original object :return: static value indicating week number 1 """ return 1 monkeypatch.setattr( User, "week", user_week ) @pytest.fixture def mock_user_week_2(monkeypatch): """ Mock the User object's property week. The mock object will prevent calculation of the real User's week and just return a given number. """ # pylint: disable=unused-argument @property def user_week(*args, **kwargs): """ A mock object that overrides the method and returns a static value. :param args: args of the original object :param kwargs: kwargs of the original object :return: static value indicating week number 2 """ return 2 monkeypatch.setattr( User, "week", user_week )
-
In pytest-html plugin, capture log message for all but passed tests in report.html
I am trying generate a html report for my pytest tests using pytest-html plugin using the below command
pytest -v tests/ --html=report.html
This is generating the report with log capture for all the tests but I don't want to include the logs for passed tests. Is there a way to achieve this?
-
pytest stack parametrize decorators with dynamic parameters
i'm new to pytest so please bear with me.
i'm trying to use stack parametrize decorators to test multiple combination permutations, but the question is how can i use values from other parametrize decorators in the stack.i found the following: but it is not exactly what i'm looking for
stacked parametrize
Using fixtures in pytest.mark.parametrize\this is what i'm trying to achieve:
@pytest.mark.parametrize("environment", ["main", "develop", "ci"]) @pytest.mark.parametrize("model", get_models()) @pytest.mark.parametrize("id", get_ids(environment, model)) #here i tried to use the returned values of environment and model from the above decorators def test_ids(environment, model, id): another_method(environment, model, id) # some logic here
get_ids()
returns a list of ids based on a givenenvironment
andmodel
.
this solution doesn't work, since it raising an unresolved reference error forenvironment
andmodel
the reason that i want to use parametrize decorator is because i need to test all the permutations of
environments
,models
andids
, but want pytest to generate a separate test for each combination.my current solution is:
@pytest.mark.parametrize("environment", ["main", "develop", "ci"]) @pytest.mark.parametrize("model", get_models()) def test_ids(environment, model): ids = get_ids(environment, model) for id in ids: another_method(environment, model, id) # some logic here
this solution works, but each test is very long since it's looping over a long list of ids, i prefer running multiple small tests, rather than fewer tests but very long.
it makes it harder to understand what happen in the tests.
any suggestion? -
Use a pytest fixture to download data once, but use new copies for each test
I have test code that downloads a set of images, does some processing on them, and asserts that the processing worked as expected:
@pytest.fixture def image_dir(tmp_path): test_imgs = [ # ... list of img URLs ] for idx, im_url in enumerate(test_imgs): urllib.request.urlretrieve(im_url, tmp_path / f"{idx}.png") yield tmp_path def test_op_A(image_dir: Path): for im_path in image_dir.iterdir(): # load the image # modify the image # save the image back to disk # assert that the modification worked as expected def test_op_B(image_dir: Path): for im_path in image_dir.iterdir(): # load the image # modify the image # save the image back to disk # assert that the modification worked as expected # ... more tests with a similar format
This works but is incredibly slow. I suspect that this is because the images are downloaded anew for each test.
Is there a clean way to create the temporary directory once, cache it, and use a copy of the directory for each test? This way each test can modify the images as desired, without influencing the other tests and while remaining performant.