2016-02-10 47 views
0

我有這樣一段代碼(問題:multiprocessing.Pool 報告「操作超時」):

import urllib2, json, csv 
import requests 
import itertools 
import multiprocessing 
import numpy 
from datetime import datetime, date, timedelta 

def _monthRange(start_y, start_m, end_y, end_m):
    """Yield (year, month) pairs from start to end inclusive; empty if start is later."""
    y, m = start_y, start_m
    # Compare (year, month) together so the walk stops at the right year,
    # not at the first month whose number matches the end month.
    while (y, m) <= (end_y, end_m):
        yield y, m
        if m == 12:
            y, m = y + 1, 1
        else:
            m += 1


def _urlExists(url, timeout=30):
    """Return True if a HEAD request for url gets a non-error (< 400) status."""
    # HEAD checks availability without starting a multi-GB GET download.
    # Network failures (connection errors, timeouts) propagate to the caller.
    return requests.head(url, timeout=timeout).ok


def getTaxiTrips(date, processes=None):
    """
    Gets the taxi trips occurred in NY from a starting date.

    Builds the monthly TLC trip-data CSV URLs (green then yellow cabs for
    each month) from the month of ``date`` up to and including the current
    month, keeps the ones the server reports as available, and processes
    them in parallel with ``consumeTaxiData``.

    NOTE(review): each worker streams an entire CSV over HTTP, so one
    dropped connection (e.g. ``socket.error: Operation timed out``) aborts
    the whole ``pool.map``. For many large files consider downloading to
    disk with retries first.

    :param date: starting date as a 'Y-m-d' string.
    :param processes: number of worker processes; defaults to one less than
        the CPU count, but never fewer than 1.
    :return: list of tuples (long, lat, drop off hour) from all files.
    """
    today = datetime.now()
    start_y, start_m = [int(part) for part in date.split('-')[:2]]

    print("%02d-%d/%02d-%d" % (start_m, start_y, today.month, today.year))

    url_template = ("https://storage.googleapis.com/tlc-trip-data/"
                    "%d/%s_tripdata_%d-%02d.csv")
    urls = []
    for y, m in _monthRange(start_y, start_m, today.year, today.month):
        for cab in ('green', 'yellow'):
            url = url_template % (y, cab, y, m)
            if _urlExists(url):
                urls.append(url)

    if not urls:
        return []

    if processes is None:
        # Leave one core for the parent process, but never request a
        # zero-size pool (Pool(0) raises ValueError).
        processes = max(1, multiprocessing.cpu_count() - 1)

    pool = multiprocessing.Pool(processes)
    try:
        results = pool.map(consumeTaxiData, urls)
    except BaseException:
        # Do not leave the other workers downloading after one has failed.
        pool.terminate()
        raise
    else:
        pool.close()
    finally:
        pool.join()

    return list(itertools.chain.from_iterable(results))


def _firstValue(row, keys):
    """Return the first non-None value of row[key] for key in keys, else None."""
    for key in keys:
        value = row.get(key)
        if value is not None:
            return value
    return None


def consumeTaxiData(url, start_date=None):
    """
    Given a url, reads its content and extracts drop-off points.

    A row is kept only if it meets all of these conditions:
    - it has both drop-off coordinates;
    - it has a drop-off time;
    - that time is on or after the start date;
    - that time is not on a Monday.

    :param url: the url to be read.
    :param start_date: 'Y-m-d' string; rows dropped off before it are
        skipped. When omitted, falls back to the module-level ``date``
        global (set by the script's ``__main__`` block), which is what
        single-argument ``pool.map`` callers rely on.
    :return: a list of tuples (long, lat, hour), where hour is the drop-off
        time rounded to the nearest hour; an empty list if readCSV reports
        an HTTP error for the url.
    """
    print("Processing " + url)
    if start_date is None:
        start_date = date  # module-level global; see docstring
    # Parse the cut-off once rather than once per row.
    cutoff = datetime.strptime(start_date, '%Y-%m-%d')

    rows = readCSV(url)
    if rows is None:
        # HTTP error: nothing to process (iterating None would raise).
        return []

    points = []
    for row in rows:
        # The files use two spellings per column; presumably one per cab
        # type (yellow/green) — verify against the TLC schemas.
        latitude = _firstValue(row, ('dropoff_latitude', 'Dropoff_latitude'))
        longitude = _firstValue(row, ('dropoff_longitude', 'Dropoff_longitude'))
        dropoff = _firstValue(row, ('tpep_dropoff_datetime', 'Lpep_dropoff_datetime'))
        if latitude is None or longitude is None or dropoff is None:
            continue

        dropoff = datetime.strptime(dropoff, '%Y-%m-%d %H:%M:%S')
        # NOTE(review): weekday() is 0 for Monday, so this keeps every day
        # except Monday (weekends included). If "weekdays only" was the
        # intent, the test should be `dropoff.weekday() < 5` — confirm.
        if dropoff >= cutoff and dropoff.weekday() != 0:
            hour = roundTime(dropoff, roundTo=60 * 60).hour
            points.append((float(longitude), float(latitude), hour))

    return points

def readCSV(url, timeout=None):
    """
    Open a remote CSV file and return a reader over its rows.

    :param url: url to be read.
    :param timeout: optional socket timeout in seconds for the connection
        and for blocking reads on it. When omitted, urlopen's own default
        (the global socket timeout) applies, exactly as before.
    :return: a csv.DictReader yielding one dict per row (keys taken from
        the header line), or None if the server answered with an HTTP
        error status.
    """
    kwargs = {} if timeout is None else {'timeout': timeout}
    try:
        response = urllib2.urlopen(url, **kwargs)
    except urllib2.HTTPError:
        # Reported to the caller as None ("not available").
        return None
    # Rows are pulled from the socket while the caller iterates, so network
    # errors such as read timeouts surface during iteration, not here.
    return csv.DictReader(response, delimiter=',')

def roundTime(dt=None, roundTo=60):
    """
    Round a datetime object to the nearest multiple of roundTo seconds.

    The multiple is counted from midnight of dt's own day, so roundTo should
    divide 86400 evenly to line up with clock boundaries (60 = minute,
    3600 = hour). An exact half interval rounds up. Microseconds are not
    considered when rounding and are removed from the result.

    :param dt: datetime.datetime object (naive or tz-aware), default now.
    :param roundTo: closest number of seconds to round to, default 1 minute.
    :return: the rounded time; may roll over into the next day.
    """
    if dt is None:
        dt = datetime.now()
    # Whole seconds since midnight. Computed from the fields directly
    # (instead of dt - dt.min) so tz-aware datetimes work too.
    seconds = dt.hour * 3600 + dt.minute * 60 + dt.second
    # Floor division keeps the half-interval integral, so Python 2 and
    # Python 3 round identically.
    rounding = (seconds + roundTo // 2) // roundTo * roundTo
    return dt + timedelta(seconds=rounding - seconds,
                          microseconds=-dt.microsecond)

if __name__ == '__main__':
    # CPU count, kept as a module-level global because pool-sizing code may
    # read it from there.
    mps = multiprocessing.cpu_count()

    # Start 8 * 31 = 248 days before today. This intentionally rebinds the
    # module-level name ``date`` (shadowing the ``datetime.date`` import):
    # consumeTaxiData reads the start date from that global.
    start_day = datetime.now().date() - timedelta(days=8 * 31)
    date = str(start_day)
    print("-----> Inital date: " + date)

    print("-----> Getting taxi data...")
    taxi_dropoffs = getTaxiTrips(date)
    print("%d taxi trips" % len(taxi_dropoffs))

這段代碼在處理以下數據時運行得很好:

https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-06.csv 
https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-06.csv 

現在,我想處理更多的數據:

https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-06.csv 
https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-07.csv 
https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-08.csv 
https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-08.csv 
https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-07.csv 
https://storage.googleapis.com/tlc-trip-data/2015/yellow_tripdata_2015-06.csv 
https://storage.googleapis.com/tlc-trip-data/2015/green_tripdata_2015-09.csv 

但我不斷收到以下錯誤信息:

Traceback (most recent call last): 
    File "noiseInference.py", line 489, in <module> 
    taxi_dropoffs = getTaxiTrips(date) 
    File "noiseInference.py", line 300, in getTaxiTrips 
    result = pool.map(consumeTaxiData, data) 
    File "/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 251, in map 
    return self.map_async(func, iterable, chunksize).get() 
    File "/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 567, in get 
    raise self._value 
socket.error: [Errno 60] Operation timed out 

因為每個 .csv 文件都很大,而且我要處理很多個文件,所以我預料處理會花一些時間。但是,處理過程卻中途終止了。我該如何解決這個問題?

本數據來源於此:http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml

+0

1.6GB 並不算大。如果非得用 CSV 格式,那確實算多了……但數據量達到 1.6GB 時,本來就不應該再用 CSV。不過話說回來,1.6GB 本身並不算多。 –

+0

我明白,@MarcusMüller。問題是數據在這裡就是以這種格式存放的:http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml。 – pceccon

+0

嗯,重點是:問題可能出在下載所需的時間上? –

回答

0

multiprocessing.Pool 報出的錯誤輸出往往非常容易引起誤解。

回溯:

return self.map_async(func, iterable, chunksize).get() 
File "/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 567, in get 
    raise self._value 
    socket.error: [Errno 60] Operation timed out 

表明 map_async(...).get() 重新拋出了存儲在其 _value 屬性中的錯誤。該屬性保存的是在工作進程中拋出的錯誤。錯誤本身很明確:套接字操作超時。

為了更好地了解問題,我建議您在 multiprocessing.Pool.map_async 之外重現它。