2012-05-17 28 views
1

我正在使用以下代碼來執行ssl服務器的ssl握手和證書驗證。如何通過python中的SSLsocket執行https請求

import ssl 
import socket 

s = socket.socket() 
print "connecting..." 
#logging.debug("Connecting") 
# Connect with SSL mutual authentication 
# We only trust our server's CA, and it only trusts user certificates signed by it 
c = ssl.wrap_socket(s, cert_reqs=ssl.CERT_REQUIRED, 
        ssl_version=ssl.PROTOCOL_SSLv3, ca_certs='ca.crt', 
        certfile='user.crt', keyfile='user.key') 
c.connect((constants.server_addr, constants.port)) 

我能夠獲取到服務器的連接和證書正確驗證,但是,我不知道該怎麼在這裏做。我需要在套接字上執行https操作,包括將XML發佈到REST API。我如何去做這件事?

回答

1

您可以使用您的wrap_socket代碼來擴展httplib.HTTPConnection,如this answer中所述。

(我還是會考慮使用類似PycURL,正如我在your previous question已經回答了。)

0

你可能想在urllib2.urlopen開始:http://docs.python.org/library/urllib2.html#urllib2.urlopen

,可以照顧HTTPS URL的,獲取,發佈,等你並不需要直接在低級別socketssl對象工作。但是,如果您使用的是Python 2.x,HTTPS連接將不會對服務器端證書進行任何驗證,這看起來就像您需要的(這很好)。不過,Python 3的urllib確實能做到這一點。

如果您使用的是Python 2,那麼您有幾個選項。一個是urllib2.HTTPSHandler的子類,以便它對其套接字進行適當的驗證。另一個是實現你自己需要的HTTP協議位(不推薦)。你也可以實例化各種不同的urllib2httplib對象,然後簡單地分配已經認證的ssl套接字來代替他們正在使用的套接字,儘管你需要非常小心他們的狀態不會混亂。儘管標準庫中的源代碼非常易讀,但如果您需要這樣做修補程序。

+1

你需要做一些手工SSL處理,如果你想使用httplib,httplib不會執行任何服務器證書驗證。同樣爲urllib2 ... – Bruno

+0

@布魯諾我通過另一個問題發現了。這就是爲什麼我必須使用我發佈的代碼。你知道如何使用該連接來獲得經過身份驗證的https連接嗎? – ewok

0

這正是我在我的項目做。這是我在項目中使用的REST客戶端模塊。它已被修改以適應我的需求,但我認爲您也可能會發現它也很有用。它需要使用httplib2:http://pypi.python.org/pypi/httplib2

""" 
    client.py 
    --------- 

    Modified to allow validation server's certificate with external cacert list. 
    -- Arif Widi Nugroho <[email protected]> 

    Copyright (C) 2008 Benjamin O'Steen 

    This file is part of python-fedoracommons. 

    python-fedoracommons is free software: you can redistribute it and/or modify 
    it under the terms of the GNU General Public License as published by 
    the Free Software Foundation, either version 3 of the License, or 
    (at your option) any later version. 

    python-fedoracommons is distributed in the hope that it will be useful, 
    but WITHOUT ANY WARRANTY; without even the implied warranty of 
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
    GNU General Public License for more details. 

    You should have received a copy of the GNU General Public License 
    along with python-fedoracommons. If not, see <http://www.gnu.org/licenses/>. 
""" 

__license__ = 'GPL http://www.gnu.org/licenses/gpl.txt' 
__author__ = "Benjamin O'Steen <[email protected]>, Arif Widi Nugroho <[email protected]>" 
__version__ = '0.1' 

import httplib2 
import urlparse 
import urllib 
import base64 
from base64 import encodestring 

from mime_types import * 

import mimetypes 

from cStringIO import StringIO 

class Connection(object): 
    def __init__(self, base_url, username=None, password=None, cache=None, ca_certs=None, user_agent_name=None): 
     self.base_url = base_url 
     self.username = username 
     m = MimeTypes() 
     self.mimetypes = m.get_dictionary() 

     self.url = urlparse.urlparse(base_url) 

     (scheme, netloc, path, query, fragment) = urlparse.urlsplit(base_url) 

     self.scheme = scheme 
     self.host = netloc 
     self.path = path 

     if user_agent_name is None: 
      self.user_agent_name = 'Basic Agent' 
     else: 
      self.user_agent_name = user_agent_name 

     # Create Http class with support for Digest HTTP Authentication, if necessary 
     # self.h = httplib2.Http(".cache") 
     self.h = httplib2.Http(cache=cache, ca_certs=ca_certs) 
     self.h.follow_all_redirects = True 
     if username and password: 
      self.h.add_credentials(username, password) 

    def request_get(self, resource, args = None, headers={}): 
     return self.request(resource, "get", args, headers=headers) 

    def request_delete(self, resource, args = None, headers={}): 
     return self.request(resource, "delete", args, headers=headers) 

    def request_head(self, resource, args = None, headers={}): 
     return self.request(resource, "head", args, headers=headers) 

    def request_post(self, resource, args = None, body = None, filename=None, headers={}): 
     return self.request(resource, "post", args , body = body, filename=filename, headers=headers) 

    def request_put(self, resource, args = None, body = None, filename=None, headers={}): 
     return self.request(resource, "put", args , body = body, filename=filename, headers=headers) 

    def get_content_type(self, filename): 
     extension = filename.split('.')[-1] 
     guessed_mimetype = self.mimetypes.get(extension, mimetypes.guess_type(filename)[0]) 
     return guessed_mimetype or 'application/octet-stream' 

    def request(self, resource, method = "get", args = None, body = None, filename=None, headers={}): 
     params = None 
     path = resource 
     headers['User-Agent'] = self.user_agent_name 

     BOUNDARY = u'00hoYUXOnLD5RQ8SKGYVgLLt64jejnMwtO7q8XE1' 
     CRLF = u'\r\n' 

     if filename and body: 
      #fn = open(filename ,'r') 
      #chunks = fn.read() 
      #fn.close() 

      # Attempt to find the Mimetype 
      content_type = self.get_content_type(filename) 
      headers['Content-Type']='multipart/form-data; boundary='+BOUNDARY 
      encode_string = StringIO() 
      encode_string.write(CRLF) 
      encode_string.write(u'--' + BOUNDARY + CRLF) 
      encode_string.write(u'Content-Disposition: form-data; name="file"; filename="%s"' % filename) 
      encode_string.write(CRLF) 
      encode_string.write(u'Content-Type: %s' % content_type + CRLF) 
      encode_string.write(CRLF) 
      encode_string.write(body) 
      encode_string.write(CRLF) 
      encode_string.write(u'--' + BOUNDARY + u'--' + CRLF) 

      body = encode_string.getvalue() 
      headers['Content-Length'] = str(len(body)) 
     elif body: 
      if not headers.get('Content-Type', None): 
       headers['Content-Type']='text/xml' 
      headers['Content-Length'] = str(len(body))   
     else: 
      headers['Content-Type']='text/xml' 

     if method.upper() == 'POST': 
      headers['Content-Type']='application/x-www-form-urlencoded' 

     if args: 
      path += u"?" + urllib.urlencode(args) 

     request_path = [] 
     if self.path != "/": 
      if self.path.endswith('/'): 
       request_path.append(self.path[:-1]) 
      else: 
       request_path.append(self.path) 
      if path.startswith('/'): 
       request_path.append(path[1:]) 
      else: 
       request_path.append(path) 

     resp, content = self.h.request(u"%s://%s%s" % (self.scheme, self.host, u'/'.join(request_path)), method.upper(), body=body, headers=headers) 

     return {u'headers':resp, u'body':content.decode('UTF-8')} 

用法示例(如果服務器證書不是由指定的CA簽署的連接會失敗):

c = client.Connection('https://localhost:8000', certs='/path/to/cacert.pem') 
# now post some data to the server 
response = c.request_post('rest/path/', body=some_urlencoded_data) 
if response['headers']['status'] == '200': 
    # do something...