From e247bdc929a2d821c8a0a1ee2a60f3fccf0146dc Mon Sep 17 00:00:00 2001 From: Eden Kirin Date: Thu, 23 Mar 2023 00:09:07 +0100 Subject: [PATCH] Async python client --- src/funnel/main.py | 69 ++++++++++++++++++++++++-------------- src/serve_segments/main.go | 32 ++++++++++-------- 2 files changed, 62 insertions(+), 39 deletions(-) diff --git a/src/funnel/main.py b/src/funnel/main.py index 36d9946..15bb375 100644 --- a/src/funnel/main.py +++ b/src/funnel/main.py @@ -1,19 +1,22 @@ -from contextlib import contextmanager +import asyncio import logging import time -from typing import Generator, Optional +from contextlib import asynccontextmanager +from typing import AsyncGenerator, Optional + import grpc -from stubs.serve_segments_pb2_grpc import ServeSegmentsStub + from stubs.serve_segments_pb2 import ( GetHoursRequest, GetHoursResponse, + GetMillisecondsRequest, + GetMillisecondsResponse, GetMinutesRequest, GetMinutesResponse, GetSecondsRequest, GetSecondsResponse, - GetMillisecondsRequest, - GetMillisecondsResponse, ) +from stubs.serve_segments_pb2_grpc import ServeSegmentsStub SERVE_CURRENTTIME_HOST = "localhost" SERVE_CURRENTTIME_PORT = 50000 @@ -23,46 +26,62 @@ SERVE_SEGMENTS_PORT = 50001 TIMEZONE = "Europe/Zagreb" -@contextmanager -def get_serve_segments_stub() -> Generator[ServeSegmentsStub, None, None]: - with grpc.insecure_channel( +@asynccontextmanager +async def get_serve_segments_stub() -> AsyncGenerator[ServeSegmentsStub, None]: + """Connect to segments server and create stub""" + async with grpc.aio.insecure_channel( f"{SERVE_SEGMENTS_HOST}:{SERVE_SEGMENTS_PORT}" ) as channel: - stub = ServeSegmentsStub(channel) - yield stub + yield ServeSegmentsStub(channel) -def get_hours(stub: ServeSegmentsStub) -> Optional[int]: - response: GetHoursResponse = stub.GetHours(GetHoursRequest(timezone=TIMEZONE)) +async def get_hours(stub: ServeSegmentsStub) -> Optional[int]: + response: GetHoursResponse = await stub.GetHours(GetHoursRequest(timezone=TIMEZONE)) return response.hours -def get_minutes(stub: ServeSegmentsStub) -> Optional[int]: - response: GetMinutesResponse = stub.GetMinutes(GetMinutesRequest(timezone=TIMEZONE)) +async def get_minutes(stub: ServeSegmentsStub) -> Optional[int]: + response: GetMinutesResponse = await stub.GetMinutes( + GetMinutesRequest(timezone=TIMEZONE) + ) return response.minutes -def get_seconds(stub: ServeSegmentsStub) -> Optional[int]: - response: GetSecondsResponse = stub.GetSeconds(GetSecondsRequest(timezone=TIMEZONE)) +async def get_seconds(stub: ServeSegmentsStub) -> Optional[int]: + response: GetSecondsResponse = await stub.GetSeconds( + GetSecondsRequest(timezone=TIMEZONE) + ) return response.seconds -def get_milliseconds(stub: ServeSegmentsStub) -> Optional[int]: - response: GetMillisecondsResponse = stub.GetMilliseconds( +async def get_milliseconds(stub: ServeSegmentsStub) -> Optional[int]: + response: GetMillisecondsResponse = await stub.GetMilliseconds( GetMillisecondsRequest(timezone=TIMEZONE) ) return response.milliseconds -def run(): - with get_serve_segments_stub() as stub: +async def main(): + async with get_serve_segments_stub() as stub: while True: t = time.perf_counter() - hours = get_hours(stub) - minutes = get_minutes(stub) - seconds = get_seconds(stub) - milliseconds = get_milliseconds(stub) + # create tasks + task_hours = asyncio.create_task(get_hours(stub)) + task_minutes = asyncio.create_task(get_minutes(stub)) + task_seconds = asyncio.create_task(get_seconds(stub)) + task_milliseconds = asyncio.create_task(get_milliseconds(stub)) + + # exec tasks asynchronously + await asyncio.gather( + task_hours, task_minutes, task_seconds, task_milliseconds + ) + + # get results + hours = task_hours.result() + minutes = task_minutes.result() + seconds = task_seconds.result() + milliseconds = task_milliseconds.result() t = time.perf_counter() - t @@ -73,4 +92,4 @@ def run(): if __name__ == "__main__": logging.basicConfig() - run() + asyncio.run(main()) diff --git a/src/serve_segments/main.go b/src/serve_segments/main.go index 4282dc2..65e7421 100644 --- a/src/serve_segments/main.go +++ b/src/serve_segments/main.go @@ -31,21 +31,12 @@ type currentTimeResponse struct { formatted_time string } -func GetCurrentTime(timezone string) currentTimeResponse { - conn, err := grpc.Dial( - fmt.Sprintf("%s:%d", SERVE_CURRENTTIME_HOST, SERVE_CURRENTTIME_PORT), - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) - if err != nil { - log.Fatalf("did not connect: %v", err) - } - defer conn.Close() - c := pb_get_currenttime.NewServeCurrentTimeServiceClient(conn) +var serveCurrentTimeServiceClient pb_get_currenttime.ServeCurrentTimeServiceClient - // Contact the server and print out its response. +func GetCurrentTime(timezone string) currentTimeResponse { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - r, err := c.GetCurrentTime( + r, err := serveCurrentTimeServiceClient.GetCurrentTime( ctx, &pb_get_currenttime.GetCurrentTimeRequest{Timezone: timezone}, ) @@ -53,14 +44,13 @@ func GetCurrentTime(timezone string) currentTimeResponse { log.Fatalf("could not greet: %v", err) } - result := currentTimeResponse{ + return currentTimeResponse{ hours: r.Hours, minutes: r.Minutes, seconds: r.Seconds, milliseconds: r.Milliseconds, formatted_time: r.FormattedTime, } - return result } func (s *grpc_server) GetHours( @@ -105,9 +95,23 @@ func serve() { log.Fatalf("failed to listen: %v", err) } + // create ServeSegments server server := grpc.NewServer() pb_serve_segments.RegisterServeSegmentsServer(server, &grpc_server{}) log.Printf("server listening at %v", lis.Addr()) + + // create connection to ServeCurrentTime server + getCurrentTimeConn, err := grpc.Dial( + fmt.Sprintf("%s:%d", SERVE_CURRENTTIME_HOST, SERVE_CURRENTTIME_PORT), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + defer getCurrentTimeConn.Close() + // create client + serveCurrentTimeServiceClient = pb_get_currenttime.NewServeCurrentTimeServiceClient(getCurrentTimeConn) + if err := server.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) }