3
沒有人有大量查詢創建一個新的插入工作的任何例子同時使用:使用POST請求和Java客戶端庫加載到BigQuery的任何示例?
- 的BigQuery的Java客戶端庫
- 創建從POST請求,這裏記錄負載工作:https://developers.google.com/bigquery/loading-data-into-bigquery#loaddatapostrequest
沒有人有大量查詢創建一個新的插入工作的任何例子同時使用:使用POST請求和Java客戶端庫加載到BigQuery的任何示例?
您需要致電bigquery.jobs().insert(...)方法。
我不知道你做了什麼還沒有,但你應該有一個驗證的客戶端的API至少像:
bigquery = new Bigquery.Builder(HTTP_TRANSPORT, JSON_FACTORY, credentials)
.setApplicationName("...").build();
這是一個insertRows方法我用google-http-client library for java和寫的簡化版本bigquery-api(你應該檢查該數據集的存在,驗證IDS等):
public Long insertRows(String projectId,
String datasetId,
String tableId,
InputStream schema,
AbstractInputStreamContent data) {
try {
// Defining table fields
ObjectMapper mapper = new ObjectMapper();
List<TableFieldSchema> schemaFields = mapper.readValue(schema, new TypeReference<List<TableFieldSchema>>(){});
TableSchema tableSchema = new TableSchema().setFields(schemaFields);
// Table reference
TableReference tableReference = new TableReference()
.setProjectId(projectId)
.setDatasetId(datasetId)
.setTableId(tableId);
// Load job configuration
JobConfigurationLoad loadConfig = new JobConfigurationLoad()
.setDestinationTable(tableReference)
.setSchema(tableSchema)
// Data in Json format (could be CSV)
.setSourceFormat("NEWLINE_DELIMITED_JSON")
// Table is created if it does not exists
.setCreateDisposition("CREATE_IF_NEEDED")
// Append data (not override data)
.setWriteDisposition("WRITE_APPEND");
// If your data are coming from Google Cloud Storage
//.setSourceUris(...);
// Load job
Job loadJob = new Job()
.setJobReference(
new JobReference()
.setJobId(Joiner.on("-").join("INSERT", projectId, datasetId,
tableId, DateTime.now().toString("dd-MM-yyyy_HH-mm-ss-SSS")))
.setProjectId(projectId))
.setConfiguration(new JobConfiguration().setLoad(loadConfig));
// Job execution
Job createTableJob = bigquery.jobs().insert(projectId, loadJob, data).execute();
// If loading data from Google Cloud Storage
//createTableJob = bigquery.jobs().insert(projectId, loadJob).execute();
String jobId = createTableJob.getJobReference().getJobId();
// Wait for job completion
createTableJob = waitForJob(projectId, createTableJob);
Long rowCount = createTableJob != null ? createTableJob.getStatistics().getLoad().getOutputRows() : 0l;
log.info("{} rows inserted in table '{}' (dataset: '{}', project: '{}')", rowCount, tableId, datasetId, projectId);
return rowCount;
}
catch (IOException e) { throw Throwables.propagate(e); }
}
我不知道你的數據的格式,但如果你正在使用的文件,你可以添加一個功能,如:
public Long insertRows(String projectId, String datasetId, String tableId, File schema, File data) {
try {
return insertRows(projectId, datasetId, tableId, new FileInputStream(schema),
new FileContent(MediaType.OCTET_STREAM.toString(), data));
}
catch (FileNotFoundException e) { throw Throwables.propagate(e); }
}