#7075 Retry grpc commands on "connection reset by peer" errors.

Grpc connection is sometimes reset on flaky networks, and this is not
handled by the python grpc library. Solved by intercepting UNAVAILABLE
responses and retrying the command.

Adapted from this issue in python grpc repo:
https://github.com/grpc/grpc/issues/19514
This commit is contained in:
Kristian Bendiksen
2020-12-18 17:20:15 +01:00
parent 401389bd80
commit d1c0226fd2
2 changed files with 60 additions and 1 deletions

View File

@@ -0,0 +1,44 @@
import grpc
class RetryOnRpcErrorClientInterceptor(
grpc.UnaryUnaryClientInterceptor, grpc.StreamUnaryClientInterceptor
):
def __init__(
self,
*,
retry_policy,
status_for_retry,
):
self.retry_policy = retry_policy
self.status_for_retry = status_for_retry
def _intercept_call(self, continuation, client_call_details, request_or_iterator):
for retry_num in range(self.retry_policy.num_retries()):
response = continuation(client_call_details, request_or_iterator)
if isinstance(response, grpc.RpcError):
# Return if it was last attempt
if retry_num == (self.retry_policy.num_retries() - 1):
return response
# If status code is not in retryable status codes
if (
self.status_for_retry
and response.code() not in self.status_for_retry
):
return response
self.retry_policy.sleep(retry_num)
else:
return response
def intercept_unary_unary(self, continuation, client_call_details, request):
return self._intercept_call(continuation, client_call_details, request)
def intercept_stream_unary(
self, continuation, client_call_details, request_iterator
):
return self._intercept_call(continuation, client_call_details, request_iterator)

View File

@@ -22,6 +22,8 @@ import RiaVersionInfo
from .project import Project
from .retry_policy import ExponentialBackoffRetryPolicy
from .grpc_retry_interceptor import RetryOnRpcErrorClientInterceptor
class Instance:
"""The ResInsight Instance class. Use to launch or find existing ResInsight instances
@@ -204,8 +206,21 @@ class Instance:
self.version_string(), " ",
self.client_version_string())
# Intercept UNAVAILABLE errors and retry on failures
interceptors = (
RetryOnRpcErrorClientInterceptor(
retry_policy=ExponentialBackoffRetryPolicy(min_backoff=100, max_backoff=5000, max_num_retries=20),
status_for_retry=(grpc.StatusCode.UNAVAILABLE,),
),
)
intercepted_channel = grpc.intercept_channel(self.channel, *interceptors)
# Recreate ommand stubs with the retry policy
self.commands = Commands_pb2_grpc.CommandsStub(intercepted_channel)
# Service packages
self.project = Project.create(self.channel)
self.project = Project.create(intercepted_channel)
path = os.getcwd()
self.set_start_dir(path=path)