How to use _services method in localstack

Best Python code snippet using localstack_python

users.py

Source:users.py Github

copy

Full Screen

"""Receive request parameters as schemas, call the service functions, and respond as schemas."""
import datetime as _dt
from typing import List

import fastapi as _fastapi
import fastapi.security as _security
import sqlalchemy.orm as _orm

import backend.models.model as _model
import backend.models.schemas as _schemas
import backend.services.users as _services
from backend.config import settings

app = _fastapi.FastAPI()


@app.post("/api/v1/users")
async def create_user(
    user: _schemas.UserCreate, db: _orm.Session = _fastapi.Depends(_services.get_db)
):
    """Create a user and return the result of ``_services.create_access_token``.

    Raises:
        HTTPException: 400 when ``_services.get_user_by_email`` returns a
            truthy value for ``user.email``.
    """
    db_user = await _services.get_user_by_email(user.email, db)
    if db_user:
        raise _fastapi.HTTPException(status_code=400, detail="Email already in use")

    await _services.create_user(user, db)
    # NOTE(review): generate_token below returns ``_services.create_token(user)``
    # while this handler uses ``create_access_token`` -- confirm which is intended.
    return await _services.create_access_token(user)


@app.post("/api/v1/token")
async def generate_token(
    form_data: _security.OAuth2PasswordRequestForm = _fastapi.Depends(),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Authenticate the submitted form credentials and return a token.

    Raises:
        HTTPException: 401 when ``_services.authenticate_user`` returns a
            falsy value.
    """
    user = await _services.authenticate_user(form_data.username, form_data.password, db)
    if not user:
        raise _fastapi.HTTPException(status_code=401, detail="Invalid Credentials")

    return await _services.create_token(user)


@app.get("/api/v1/users/me", response_model=_schemas.User)
async def get_user(user: _schemas.User = _fastapi.Depends(_services.get_current_user)):
    """Return the user resolved by the ``get_current_user`` dependency."""
    return user


@app.post("/api/v1/leads", response_model=_schemas.Lead)
async def create_lead(
    lead: _schemas.LeadCreate,
    user: _schemas.User = _fastapi.Depends(_services.get_current_user),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Delegate lead creation for the current user to ``_services.create_lead``."""
    return await _services.create_lead(user=user, db=db, lead=lead)


@app.get("/api/v1/leads", response_model=List[_schemas.Lead])
async def get_leads(
    user: _schemas.User = _fastapi.Depends(_services.get_current_user),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Return the result of ``_services.get_leads`` for the current user."""
    return await _services.get_leads(user=user, db=db)


@app.get("/api/v1/leads/{lead_id}", status_code=200)
async def get_lead(
    lead_id: int,
    user: _schemas.User = _fastapi.Depends(_services.get_current_user),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Return the result of ``_services.get_lead`` for ``lead_id``."""
    return await _services.get_lead(lead_id=lead_id, user=user, db=db)


@app.delete("/api/v1/leads/{lead_id}", status_code=204)
async def delete_lead(
    lead_id: int,
    user: _schemas.User = _fastapi.Depends(_services.get_current_user),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Delete the lead via ``_services.delete_lead`` and answer 204 with no body."""
    await _services.delete_lead(lead_id, user, db)
    # A 204 response must not carry a body; the original returned a set literal
    # ({"message", "..."}) here, which was neither a dict nor allowed for 204.
    return _fastapi.Response(status_code=204)


@app.put("/api/v1/leads/{lead_id}", status_code=200)
async def update_lead(
    lead_id: int,
    lead: _schemas.LeadCreate,
    user: _schemas.User = _fastapi.Depends(_services.get_current_user),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Update the lead via ``_services.update_lead`` and return a confirmation message."""
    await _services.update_lead(lead_id, lead, user, db)
    # Dict (key: value), not the set literal the original built with a comma.
    return {"message": "Successfully Updated"}


@app.get("/api/v1")
async def root():
    """Return a static greeting payload."""
    return {"message": "Awesome Leads Manager"}


@app.post("/api/users", response_model=_schemas.User)
async def signup(
    user_data: _schemas.UserCreate,
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Add a new user.

    Raises:
        HTTPException: 409 when ``_services.get_user_by_email`` returns a
            truthy value for ``user_data.email``.
    """
    # The service functions are awaited everywhere else in this module; calling
    # them without ``await`` would make ``user`` a coroutine (always truthy).
    # NOTE(review): argument order aligned with create_user above
    # (``(email, db)`` / ``(user, db)``); the original passed ``db`` first --
    # confirm against the service signatures.
    user = await _services.get_user_by_email(user_data.email, db)
    if user:
        raise _fastapi.HTTPException(
            status_code=409, detail="Email already registered."
        )
    signedup_user = await _services.create_user(user_data, db)
    return signedup_user


@app.post("/api/v1/login", response_model=_schemas.TokenData)
async def login_for_access_token(
    db: _orm.Session = _fastapi.Depends(_services.get_db),
    form_data: _security.OAuth2PasswordRequestForm = _fastapi.Depends(),
):
    """Authenticate the form credentials and return a bearer-token payload.

    Returns:
        A dict with ``access_token``, ``token_type`` and ``username`` keys.

    Raises:
        HTTPException: 401 (with a ``WWW-Authenticate: Bearer`` header) when
            ``_services.authenticate_user`` returns a falsy value.
    """
    # Awaited and given ``db`` to match generate_token; the original left the
    # injected session unused and never awaited the call.
    user = await _services.authenticate_user(form_data.username, form_data.password, db)
    if not user:
        raise _fastapi.HTTPException(
            status_code=_fastapi.status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expires = _dt.timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    # NOTE(review): create_user awaits ``create_access_token(user)`` with one
    # positional argument, while this call is un-awaited and uses
    # ``data=``/``expires_delta=`` -- confirm the real signature.
    access_token = _services.create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {
        "access_token": access_token,
        "token_type": "bearer",
        "username": user.username,
    }


# NOTE(review): other handlers resolve the caller via
# ``_services.get_current_user``; confirm ``authenticate_user`` is the intended
# dependency here.
@app.get("/api/me", response_model=_schemas.User)
async def read_logged_in_user(
    current_user: _model.User = _fastapi.Depends(_services.authenticate_user),
):
    ...  # body is cut off in the source listing

Full Screen

Full Screen

app.py

Source:app.py Github

copy

Full Screen

from typing import List

import fastapi as _fastapi
import fastapi.security as _security
import sqlalchemy.orm as _orm
from fastapi.middleware.cors import CORSMiddleware

import schemas as _schemas
import services as _services

app = _fastapi.FastAPI()

# CORS is configured with wildcard origins, methods and headers, and with
# credentials allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.post("/api/v1/users")
async def register_user(
    user: _schemas.UserRequest,
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Register a user and return the result of ``_services.create_token``.

    Raises:
        HTTPException: 400 when ``_services.getUserByEmail`` returns a truthy
            value for ``user.email``.
    """
    # Reject the request when the email is already present.
    existing_user = await _services.getUserByEmail(email=user.email, db=db)
    if existing_user:
        raise _fastapi.HTTPException(
            status_code=400,
            detail="Email already exist, try with another email!",
        )

    # Persist the user, then hand back a token for it.
    created_user = await _services.create_user(user=user, db=db)
    return await _services.create_token(user=created_user)


@app.post("/api/v1/login")
async def login_user(
    form_data: _security.OAuth2PasswordRequestForm = _fastapi.Depends(),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Log in with form credentials and return the result of ``_services.create_token``.

    Raises:
        HTTPException: 401 when ``_services.login`` returns a falsy value.
    """
    # The form's ``username`` field is passed to the service as the email.
    account = await _services.login(
        email=form_data.username, password=form_data.password, db=db
    )
    if not account:
        raise _fastapi.HTTPException(
            status_code=401, detail="Wrong Login Credentials!"
        )

    return await _services.create_token(account)


@app.get("/api/v1/users/current-user", response_model=_schemas.UserResponse)
async def current_user(
    user: _schemas.UserResponse = _fastapi.Depends(_services.current_user),
):
    """Return the user resolved by the ``_services.current_user`` dependency."""
    return user


@app.post("/api/v1/posts", response_model=_schemas.PostResponse)
async def create_post(
    post_request: _schemas.PostRequest,
    user: _schemas.UserRequest = _fastapi.Depends(_services.current_user),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Delegate post creation for the current user to ``_services.create_post``."""
    return await _services.create_post(user=user, db=db, post=post_request)


@app.get("/api/v1/posts/user", response_model=List[_schemas.PostResponse])
async def get_posts_by_user(
    user: _schemas.UserRequest = _fastapi.Depends(_services.current_user),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Return the result of ``_services.get_posts_by_user`` for the current user."""
    return await _services.get_posts_by_user(user=user, db=db)


@app.get("/api/v1/posts/all", response_model=List[_schemas.PostResponse])
async def get_posts_by_all(
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Return the result of ``_services.get_posts_by_all``."""
    return await _services.get_posts_by_all(db=db)


@app.get("/api/v1/posts/{post_id}/", response_model=_schemas.PostResponse)
async def get_post_detail(
    post_id: int,
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Return the result of ``_services.get_post_detail`` for ``post_id``."""
    return await _services.get_post_detail(post_id=post_id, db=db)


@app.get("/api/v1/users/{user_id}/", response_model=_schemas.UserResponse)
async def get_user_detail(
    user_id: int,
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Return the result of ``_services.get_user_detail`` for ``user_id``."""
    return await _services.get_user_detail(user_id=user_id, db=db)


@app.delete("/api/v1/posts/{post_id}/")
async def delete_post(
    post_id: int,
    db: _orm.Session = _fastapi.Depends(_services.get_db),
    user: _schemas.UserRequest = _fastapi.Depends(_services.current_user),
):
    """Look the post up, delete it via ``_services.delete_post``, and confirm.

    ``user`` is injected through the ``current_user`` dependency but is not
    otherwise used in this handler.
    """
    target_post = await _services.get_post_detail(post_id=post_id, db=db)
    await _services.delete_post(post=target_post, db=db)
    return "Post deleted successfully"


@app.put("/api/v1/posts/{post_id}/", response_model=_schemas.PostResponse)
async def update_post(
    post_id: int,
    post_request: _schemas.PostRequest,
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    db_post = await _services.get_post_detail(post_id=post_id, db=db)
    ...  # remainder of this handler is cut off in the source listing

Full Screen

Full Screen

main.py

Source:main.py Github

copy

Full Screen

from typing import List

import fastapi as _fastapi
import fastapi.security as _security
import sqlalchemy.orm as _orm

import schemas as _schemas
import services as _services

app = _fastapi.FastAPI()


@app.post("/api/users")
async def create_user(
    user: _schemas.UserCreate, db: _orm.Session = _fastapi.Depends(_services.get_db)
):
    """Create a user and return the result of ``_services.create_token``.

    Raises:
        HTTPException: 400 when ``_services.get_user_by_email`` returns a
            truthy value for ``user.email``.
    """
    db_user = await _services.get_user_by_email(user.email, db)
    if db_user:
        raise _fastapi.HTTPException(status_code=400, detail="Email already in use")

    user = await _services.create_user(user, db)
    return await _services.create_token(user)


@app.post("/api/token")
async def generate_token(
    from_data: _security.OAuth2PasswordRequestForm = _fastapi.Depends(),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Authenticate the submitted form credentials and return a token.

    Raises:
        HTTPException: 401 when ``_services.authenticate_user`` returns a
            falsy value.
    """
    user = await _services.authenticate_user(from_data.username, from_data.password, db)
    if not user:
        raise _fastapi.HTTPException(status_code=401, detail="Invalid Credentials")

    return await _services.create_token(user)


# Leading slash added: the original path "api/users/me" was not rooted like
# every other route in this module.
@app.get("/api/users/me", response_model=_schemas.User)
async def get_user(user: _schemas.User = _fastapi.Depends(_services.get_current_user)):
    """Return the user resolved by the ``get_current_user`` dependency."""
    return user


@app.post("/api/leads", response_model=_schemas.Lead)
async def create_lead(
    lead: _schemas.LeadCreate,
    user: _schemas.User = _fastapi.Depends(_services.get_current_user),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Delegate lead creation for the current user to ``_services.create_lead``."""
    return await _services.create_lead(user=user, db=db, lead=lead)


@app.get("/api/leads", response_model=List[_schemas.Lead])
async def get_leads(
    user: _schemas.User = _fastapi.Depends(_services.get_current_user),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Return the result of ``_services.get_leads`` for the current user."""
    return await _services.get_leads(user=user, db=db)


@app.get("/api/leads/{lead_id}", status_code=200)
async def get_lead(
    lead_id: int,
    user: _schemas.User = _fastapi.Depends(_services.get_current_user),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Return the result of ``_services.get_lead`` for ``lead_id``."""
    return await _services.get_lead(lead_id, user, db)


@app.delete("/api/leads/{lead_id}", status_code=204)
async def delete_lead(
    lead_id: int,
    user: _schemas.User = _fastapi.Depends(_services.get_current_user),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Delete the lead via ``_services.delete_lead`` and answer 204 with no body."""
    await _services.delete_lead(lead_id, user, db)
    # A 204 response must not carry a body; the original returned a set literal
    # ({"Message", "..."}) here, which was neither a dict nor allowed for 204.
    return _fastapi.Response(status_code=204)


# Registered as PUT: the original reused @app.delete on the same path as
# delete_lead, so the two handlers collided on one method/path pair.
@app.put("/api/leads/{lead_id}", status_code=200)
async def update_lead(
    lead_id: int,
    lead: _schemas.LeadCreate,
    user: _schemas.User = _fastapi.Depends(_services.get_current_user),
    db: _orm.Session = _fastapi.Depends(_services.get_db),
):
    """Update the lead via ``_services.update_lead`` and return a confirmation message.

    The original injected ``user`` from ``get_db`` and referenced an undefined
    ``db``; both dependencies are now wired the same way as in delete_lead.
    """
    await _services.update_lead(lead_id, lead, user, db)
    # Dict payload instead of the original 2-tuple; message typo corrected.
    return {"Message": "Successfully updated leads"}


@app.get("/api")
async def root():
    ...  # body is cut off in the source listing

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful