Async python client

This commit is contained in:
Eden Kirin
2023-03-23 00:09:07 +01:00
parent 1542bd98c2
commit e247bdc929
2 changed files with 62 additions and 39 deletions

View File

@ -1,19 +1,22 @@
from contextlib import contextmanager import asyncio
import logging import logging
import time import time
from typing import Generator, Optional from contextlib import asynccontextmanager
from typing import AsyncGenerator, Optional
import grpc import grpc
from stubs.serve_segments_pb2_grpc import ServeSegmentsStub
from stubs.serve_segments_pb2 import ( from stubs.serve_segments_pb2 import (
GetHoursRequest, GetHoursRequest,
GetHoursResponse, GetHoursResponse,
GetMillisecondsRequest,
GetMillisecondsResponse,
GetMinutesRequest, GetMinutesRequest,
GetMinutesResponse, GetMinutesResponse,
GetSecondsRequest, GetSecondsRequest,
GetSecondsResponse, GetSecondsResponse,
GetMillisecondsRequest,
GetMillisecondsResponse,
) )
from stubs.serve_segments_pb2_grpc import ServeSegmentsStub
SERVE_CURRENTTIME_HOST = "localhost" SERVE_CURRENTTIME_HOST = "localhost"
SERVE_CURRENTTIME_PORT = 50000 SERVE_CURRENTTIME_PORT = 50000
@ -23,46 +26,62 @@ SERVE_SEGMENTS_PORT = 50001
TIMEZONE = "Europe/Zagreb" TIMEZONE = "Europe/Zagreb"
@contextmanager @asynccontextmanager
def get_serve_segments_stub() -> Generator[ServeSegmentsStub, None, None]: async def get_serve_segments_stub() -> AsyncGenerator[ServeSegmentsStub, None]:
with grpc.insecure_channel( """Connect to segments server and create stub"""
async with grpc.aio.insecure_channel(
f"{SERVE_SEGMENTS_HOST}:{SERVE_SEGMENTS_PORT}" f"{SERVE_SEGMENTS_HOST}:{SERVE_SEGMENTS_PORT}"
) as channel: ) as channel:
stub = ServeSegmentsStub(channel) yield ServeSegmentsStub(channel)
yield stub
def get_hours(stub: ServeSegmentsStub) -> Optional[int]: async def get_hours(stub: ServeSegmentsStub) -> Optional[int]:
response: GetHoursResponse = stub.GetHours(GetHoursRequest(timezone=TIMEZONE)) response: GetHoursResponse = await stub.GetHours(GetHoursRequest(timezone=TIMEZONE))
return response.hours return response.hours
def get_minutes(stub: ServeSegmentsStub) -> Optional[int]: async def get_minutes(stub: ServeSegmentsStub) -> Optional[int]:
response: GetMinutesResponse = stub.GetMinutes(GetMinutesRequest(timezone=TIMEZONE)) response: GetMinutesResponse = await stub.GetMinutes(
GetMinutesRequest(timezone=TIMEZONE)
)
return response.minutes return response.minutes
def get_seconds(stub: ServeSegmentsStub) -> Optional[int]: async def get_seconds(stub: ServeSegmentsStub) -> Optional[int]:
response: GetSecondsResponse = stub.GetSeconds(GetSecondsRequest(timezone=TIMEZONE)) response: GetSecondsResponse = await stub.GetSeconds(
GetSecondsRequest(timezone=TIMEZONE)
)
return response.seconds return response.seconds
def get_milliseconds(stub: ServeSegmentsStub) -> Optional[int]: async def get_milliseconds(stub: ServeSegmentsStub) -> Optional[int]:
response: GetMillisecondsResponse = stub.GetMilliseconds( response: GetMillisecondsResponse = await stub.GetMilliseconds(
GetMillisecondsRequest(timezone=TIMEZONE) GetMillisecondsRequest(timezone=TIMEZONE)
) )
return response.milliseconds return response.milliseconds
def run(): async def main():
with get_serve_segments_stub() as stub: async with get_serve_segments_stub() as stub:
while True: while True:
t = time.perf_counter() t = time.perf_counter()
hours = get_hours(stub) # create tasks
minutes = get_minutes(stub) task_hours = asyncio.create_task(get_hours(stub))
seconds = get_seconds(stub) task_minutes = asyncio.create_task(get_minutes(stub))
milliseconds = get_milliseconds(stub) task_seconds = asyncio.create_task(get_seconds(stub))
task_milliseconds = asyncio.create_task(get_milliseconds(stub))
# exec tasks asynchronously
await asyncio.gather(
task_hours, task_minutes, task_seconds, task_milliseconds
)
# get results
hours = task_hours.result()
minutes = task_minutes.result()
seconds = task_seconds.result()
milliseconds = task_milliseconds.result()
t = time.perf_counter() - t t = time.perf_counter() - t
@ -73,4 +92,4 @@ def run():
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig() logging.basicConfig()
run() asyncio.run(main())

View File

@ -31,21 +31,12 @@ type currentTimeResponse struct {
formatted_time string formatted_time string
} }
func GetCurrentTime(timezone string) currentTimeResponse { var serveCurrentTimeServiceClient pb_get_currenttime.ServeCurrentTimeServiceClient
conn, err := grpc.Dial(
fmt.Sprintf("%s:%d", SERVE_CURRENTTIME_HOST, SERVE_CURRENTTIME_PORT),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb_get_currenttime.NewServeCurrentTimeServiceClient(conn)
// Contact the server and print out its response. func GetCurrentTime(timezone string) currentTimeResponse {
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel() defer cancel()
r, err := c.GetCurrentTime( r, err := serveCurrentTimeServiceClient.GetCurrentTime(
ctx, ctx,
&pb_get_currenttime.GetCurrentTimeRequest{Timezone: timezone}, &pb_get_currenttime.GetCurrentTimeRequest{Timezone: timezone},
) )
@ -53,14 +44,13 @@ func GetCurrentTime(timezone string) currentTimeResponse {
log.Fatalf("could not greet: %v", err) log.Fatalf("could not greet: %v", err)
} }
result := currentTimeResponse{ return currentTimeResponse{
hours: r.Hours, hours: r.Hours,
minutes: r.Minutes, minutes: r.Minutes,
seconds: r.Seconds, seconds: r.Seconds,
milliseconds: r.Milliseconds, milliseconds: r.Milliseconds,
formatted_time: r.FormattedTime, formatted_time: r.FormattedTime,
} }
return result
} }
func (s *grpc_server) GetHours( func (s *grpc_server) GetHours(
@ -105,9 +95,23 @@ func serve() {
log.Fatalf("failed to listen: %v", err) log.Fatalf("failed to listen: %v", err)
} }
// create ServeSegments server
server := grpc.NewServer() server := grpc.NewServer()
pb_serve_segments.RegisterServeSegmentsServer(server, &grpc_server{}) pb_serve_segments.RegisterServeSegmentsServer(server, &grpc_server{})
log.Printf("server listening at %v", lis.Addr()) log.Printf("server listening at %v", lis.Addr())
// create connection to ServeCurrentTime server
getCurrentTimeConn, err := grpc.Dial(
fmt.Sprintf("%s:%d", SERVE_CURRENTTIME_HOST, SERVE_CURRENTTIME_PORT),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer getCurrentTimeConn.Close()
// create client
serveCurrentTimeServiceClient = pb_get_currenttime.NewServeCurrentTimeServiceClient(getCurrentTimeConn)
if err := server.Serve(lis); err != nil { if err := server.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err) log.Fatalf("failed to serve: %v", err)
} }