我想使用RxPy打開一個(csv)文件並逐行處理文件。我正是我設想有以下步驟RxPy讀取csv文件和進程行
- 提供一個文件名到流
- 打開由線
- 文件
- 讀取文件中的行刪除與註釋(例如#開頭的行.. 。)
- 申請CSV閱讀
- 篩選記錄滿足某種條件
到目前爲止,我有:
def to_file(filename):
f = open(filename)
return Observable.using(
lambda: AnonymousDisposable(lambda: f.close()),
lambda d: Observable.just(f)
)
def to_reader(f):
return csv.reader(f)
def print_rows(reader):
for row in reader:
print(row)
這工作
Observable.from_(["filename.csv", "filename2.csv"])
.flat_map(to_file).**map**(to_reader).subscribe(print_rows)
這不:ValueError異常:I/O操作上關閉的文件
Observable.from_(["filename.csv", "filename2.csv"])
.flat_map(to_file).**flat_map**(to_rows).subscribe(print)
第二屆不會因爲工作(見https://github.com/ReactiveX/RxPY/issues/69)
When the observables from the first flatmap is merged by the second flatmap, the inner subscriptions will be disposed when they complete. Thus the files will be closed, even if the file handles are on_next'ed into the new observable set up by the second flatmap.
任何想法我如何能實現: 喜歡的東西:
Observable.from_(["filename.csv", "filename2.csv"]
).flat_map(to_file
).filter(comment_lines
).filter(empty_lines
).map(to_csv_reader
).filter(filter_by..)
).do whatever
非常感謝您的幫助
克林斯曼