...34 """35 self.topic = topic36 self.default_version = default_version37 super(RpcProxy, self).__init__()38 def _set_version(self, msg, vers):39 """Helper method to set the version in a message.40 :param msg: The message having a version added to it.41 :param vers: The version number to add to the message.42 """43 msg['version'] = vers if vers else self.default_version44 def _get_topic(self, topic):45 """Return the topic to use for a message."""46 return topic if topic else self.topic47 @staticmethod48 def make_msg(method, **kwargs):49 return {'method': method, 'args': kwargs}50 def call(self, context, msg, topic=None, version=None, timeout=None):51 """ a remote method.52 :param context: The request context53 :param msg: The message to send, including the method and args.54 :param topic: Override the topic for this message.55 :param timeout: (Optional) A timeout to use when waiting for the56 response. If no timeout is specified, a default timeout will be57 used that is usually sufficient.58 :param version: (Optional) Override the requested API version in this59 message.60 :returns: The return value from the remote method.61 """62 self._set_version(msg, version)63 return, self._get_topic(topic), msg, timeout)64 def multicall(self, context, msg, topic=None, version=None, timeout=None):65 """rpc.multicall() a remote method.66 :param context: The request context67 :param msg: The message to send, including the method and args.68 :param topic: Override the topic for this message.69 :param timeout: (Optional) A timeout to use when waiting for the70 response. If no timeout is specified, a default timeout will be71 used that is usually sufficient.72 :param version: (Optional) Override the requested API version in this73 message.74 :returns: An iterator that lets you process each of the returned values75 from the remote method as they arrive.76 """77 self._set_version(msg, version)78 return rpc.multicall(context, self._get_topic(topic), msg, timeout)79 def cast(self, context, msg, topic=None, version=None):80 """rpc.cast() a remote method.81 :param context: The request context82 :param msg: The message to send, including the method and args.83 :param topic: Override the topic for this message.84 :param version: (Optional) Override the requested API version in this85 message.86 :returns: None. rpc.cast() does not wait on any return value from the87 remote method.88 """89 self._set_version(msg, version)90 rpc.cast(context, self._get_topic(topic), msg)91 def fanout_cast(self, context, msg, topic=None, version=None):92 """rpc.fanout_cast() a remote method.93 :param context: The request context94 :param msg: The message to send, including the method and args.95 :param topic: Override the topic for this message.96 :param version: (Optional) Override the requested API version in this97 message.98 :returns: None. rpc.fanout_cast() does not wait on any return value99 from the remote method.100 """101 self._set_version(msg, version)102 rpc.fanout_cast(context, self._get_topic(topic), msg)103 def cast_to_server(self, context, server_params, msg, topic=None,104 version=None):105 """rpc.cast_to_server() a remote method.106 :param context: The request context107 :param server_params: Server parameters. See rpc.cast_to_server() for108 details.109 :param msg: The message to send, including the method and args.110 :param topic: Override the topic for this message.111 :param version: (Optional) Override the requested API version in this112 message.113 :returns: None. rpc.cast_to_server() does not wait on any114 return values.115 """116 self._set_version(msg, version)117 rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)118 def fanout_cast_to_server(self, context, server_params, msg, topic=None,119 version=None):120 """rpc.fanout_cast_to_server() a remote method.121 :param context: The request context122 :param server_params: Server parameters. See rpc.cast_to_server() for123 details.124 :param msg: The message to send, including the method and args.125 :param topic: Override the topic for this message.126 :param version: (Optional) Override the requested API version in this127 message.128 :returns: None. rpc.fanout_cast_to_server() does not wait on any129 return values.130 """131 self._set_version(msg, version)132 rpc.fanout_cast_to_server(context, server_params,...

1# Copyright 2014-2015 MongoDB, Inc.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.1415"""Represent one server in the topology."""1617from pymongo.server_type import SERVER_TYPE18from pymongo.ismaster import IsMaster192021class ServerDescription(object):22 """Immutable representation of one server.2324 :Parameters:25 - `address`: A (host, port) pair26 - `ismaster`: Optional IsMaster instance27 - `round_trip_time`: Optional float28 - `error`: Optional, the last error attempting to connect to the server29 """3031 __slots__ = (32 '_address', '_server_type', '_all_hosts', '_tags', '_replica_set_name',33 '_primary', '_max_bson_size', '_max_message_size',34 '_max_write_batch_size', '_min_wire_version', '_max_wire_version',35 '_round_trip_time', '_me', '_is_writable', '_is_readable', '_error',36 '_set_version', '_election_id')3738 def __init__(39 self,40 address,41 ismaster=None,42 round_trip_time=None,43 error=None):44 self._address = address45 if not ismaster:46 ismaster = IsMaster({})4748 self._server_type = ismaster.server_type49 self._all_hosts = ismaster.all_hosts50 self._tags = ismaster.tags51 self._replica_set_name = ismaster.replica_set_name52 self._primary = ismaster.primary53 self._max_bson_size = ismaster.max_bson_size54 self._max_message_size = ismaster.max_message_size55 self._max_write_batch_size = ismaster.max_write_batch_size56 self._min_wire_version = ismaster.min_wire_version57 self._max_wire_version = ismaster.max_wire_version58 self._set_version = ismaster.set_version59 self._election_id = ismaster.election_id60 self._is_writable = ismaster.is_writable61 self._is_readable = ismaster.is_readable62 self._round_trip_time = round_trip_time63 self._me = ismaster.me64 self._error = error6566 @property67 def address(self):68 return self._address6970 @property71 def server_type(self):72 return self._server_type7374 @property75 def all_hosts(self):76 """List of hosts, passives, and arbiters known to this server."""77 return self._all_hosts7879 @property80 def tags(self):81 return self._tags8283 @property84 def replica_set_name(self):85 """Replica set name or None."""86 return self._replica_set_name8788 @property89 def primary(self):90 """This server's opinion about who the primary is, or None."""91 return self._primary9293 @property94 def max_bson_size(self):95 return self._max_bson_size9697 @property98 def max_message_size(self):99 return self._max_message_size100101 @property102 def max_write_batch_size(self):103 return self._max_write_batch_size104105 @property106 def min_wire_version(self):107 return self._min_wire_version108109 @property110 def max_wire_version(self):111 return self._max_wire_version112113 @property114 def set_version(self):115 return self._set_version116117 @property118 def election_id(self):119 return self._election_id120121 @property122 def election_tuple(self):123 return self._set_version, self._election_id124125 @property126 def me(self):127 return self._me128129 @property130 def round_trip_time(self):131 """The current average latency or None."""132 # This override is for unittesting only!133 if self._address in self._host_to_round_trip_time:134 return self._host_to_round_trip_time[self._address]135136 return self._round_trip_time137138 @property139 def error(self):140 """The last error attempting to connect to the server, or None."""141 return self._error142143 @property144 def is_writable(self):145 return self._is_writable146147 @property148 def is_readable(self):149 return self._is_readable150151 @property152 def is_server_type_known(self):153 return self.server_type != SERVER_TYPE.Unknown154155 # For unittesting only. Use under no circumstances! ...

