2017-03-08 16 views
2

我正在使用配有STH的Fiware Cygnus。當天鵝座在任何實體收到兩次或更多的變化通知時,它不能將新值發送到STH。它記錄錯誤collection already exists並且更改未保存。在第一次通知中,一切順利。Fiware Cygnus - 錯誤:當Cygnus收到兩次或多次通知時收集已存在

我已執行以下步驟:

  • 創建獵戶通知任何實體
  • 變化的實體屬性

在第一時間,Cygnus的正常工作,所以我再次改變屬性,Cygnus向我顯示錯誤。

如何解決這個問題?

全信息是:

*我正在使用的容器搬運工天鵝如文檔中所述。

*我用mongodb-ip:27017代替真實IP在這裏發帖。

time=2017-03-08T11:51:05.164Z | lvl=ERROR | 
corr=53d86140-03f5-11e7-a70e-080027f6529d | 
trans=236416c2-776e-4cc0-91dc-29bca203ea2a | srv=red | subsrv=/red/red | 
comp=cygnus-ngsi | op=processRollbackedBatches |msg=com.telefonica.iot.cygnus.sinks.NGSISink[394] : 
Persistence error. Message: -, Command failed with error -1: 
'collection already exists' on server <mongodb-ip>:27017. The full response is 
{ "ok" : 0.0, "errmsg" : "collection already exists" }, Stack trace: 
[com.telefonica.iot.cygnus.sinks.NGSISTHSink.persistOne(NGSISTHSink.java:158), 
com.telefonica.iot.cygnus.sinks.NGSISTHSink.persistBatch(NGSISTHSink.java:93), 
com.telefonica.iot.cygnus.sinks.NGSISink.processRollbackedBatches(NGSISink.java:387), 
com.telefonica.iot.cygnus.sinks.NGSISink.process(NGSISink.java:370), 
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68), 
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147), 
java.lang.Thread.run(Thread.java:745)] 
time=2017-03-08T11:51:05.164Z | lvl=INFO | corr=53d86140-03f5-11e7-a70e-080027f6529d | trans=236416c2-776e-4cc0-91dc-29bca203ea2a | srv=red | subsrv=/red/red | comp=cygnus-ngsi | op=doRollbackAgain | msg=com.telefonica.iot.cygnus.sinks.NGSISink[464] : Finishing internal transaction (53d86140-03f5-11e7-a70e-080027f6529d), this was retry #10 

,我用我的agent.conf(天鵝)的配置:

cygnus-ngsi.sources = http-source 
cygnus-ngsi.sinks = sth-sink 
cygnus-ngsi.channels = sth-channel 

cygnus-ngsi.sources.http-source.type = org.apache.flume.source.http.HTTPSource 
cygnus-ngsi.sources.http-source.channels = sth-channel 
cygnus-ngsi.sources.http-source.port = 5050 
cygnus-ngsi.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler 
cygnus-ngsi.sources.http-source.handler.notification_target = /notify 
cygnus-ngsi.sources.http-source.handler.default_service = default 
cygnus-ngsi.sources.http-source.handler.default_service_path =/
cygnus-ngsi.sources.http-source.interceptors = ts gi 
cygnus-ngsi.sources.http-source.interceptors.ts.type = timestamp 
cygnus-ngsi.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor$Builder 
cygnus-ngsi.sources.http-source.interceptors.gi.grouping_rules_conf_file = /opt/apache-flume/conf/grouping_rules.conf 

cygnus-ngsi.sinks.sth-sink.type = com.telefonica.iot.cygnus.sinks.NGSISTHSink 
cygnus-ngsi.sinks.sth-sink.channel = sth-channel 
#cygnus-ngsi.sinks.sth-sink.enable_encoding = false 
#cygnus-ngsi.sinks.sth-sink.enable_grouping = false 
#cygnus-ngsi.sinks.sth-sink.enable_name_mappings = false 
#cygnus-ngsi.sinks.sth-sink.enable_lowercase = false 
cygnus-ngsi.sinks.sth-sink.data_model = dm-by-entity 
cygnus-ngsi.sinks.sth-sink.mongo_hosts = <mongodb-ip>:27017 
cygnus-ngsi.sinks.sth-sink.mongo_username = 
cygnus-ngsi.sinks.sth-sink.mongo_password = 
cygnus-ngsi.sinks.sth-sink.db_prefix = sth_ 
cygnus-ngsi.sinks.sth-sink.collection_prefix = sth_ 
cygnus-ngsi.sinks.sth-sink.resolutions = day,hour,minute 
#cygnus-ngsi.sinks.sth-sink.batch_size = 1 
#cygnus-ngsi.sinks.sth-sink.batch_timeout = 30 
#cygnus-ngsi.sinks.sth-sink.batch_ttl = 10 
#cygnus-ngsi.sinks.sth-sink.data_expiration = 0 
#cygnus-ngsi.sinks.sth-sink.ignore_white_spaces = true 


cygnus-ngsi.channels.sth-channel.type = com.telefonica.iot.cygnus.channels.CygnusMemoryChannel 
cygnus-ngsi.channels.sth-channel.capacity = 1000 
cygnus-ngsi.channels.sth-channel.transactionCapacity = 100 

STH側,我有以下的conf:

var config = {}; 

// STH server configuration 
//-------------------------- 
config.server = { 

    host: '10.0.2.15', 

    port: '8666', 


    defaultService: 'testservice', 


    defaultServicePath: '/testservicepath', 


    filterOutEmpty: 'true', 

    aggregationBy: ['day', 'hour', 'minute'], 

    temporalDir: 'temp' 
}; 

// Database configuration 
//------------------------ 
config.database = { 

    dataModel: 'collection-per-entity', 
user: '', 

    password: '', 

    URI: 'localhost:27017', 

    replicaSet: '', 

    prefix: 'sth_', 


    collectionPrefix: 'sth_', 

    poolSize: '5', 


    shouldStore: 'both', 
    truncation: { 


    expireAfterSeconds: '0', 
    size: '0', 

    max: '0' 
    }, 

    ignoreBlankSpaces: 'true', 
    nameMapping: { 

    enabled: 'false', 

    configFile: './name-mapping.json' 
    }, 

    nameEncoding: 'false' 
}; 

// Logging configuration 
//------------------------ 
config.logging = { 
    level: 'info', 
    NODE_ENV variable is set to 'development'. 

    format: 'pipe', 


    proofOfLifeInterval: '60' 
}; 

module.exports = config; 
+0

您使用的是哪個版本的MongoDB? – frb

+0

我使用的是v2.6.12 –

+1

https://github.com/telefonicaid/fiware-cygnus/blob/master/doc/cygnus-ngsi/flume_extensions_catalogue/ngsi_mongo_sink.md#section2.3.4 :) – frb

回答

2

MongoDB的錯誤處理是基於可能因版本不同而異的異常消息。

天鵝已經被編碼來檢測這些異常信息中包含字符串"code" : 48,這意味着收集已經存在,並且不採取任何措施。 「沒有完成」意味着沒有創建集合(因爲它已經存在)並且原始異常沒有進展。

問題是,當一個MongoDB的版本返回不包含"code" : 48串的消息。在這種情況下,假定例外情況與「收集已經存在」不同,因此它被推進。這就是你的情況。

+0

謝謝@frb!代碼中的貢獻值得歡迎嗎? –

+0

當然!你在想什麼? – frb

+0

試圖支持舊版本。你認爲這是一個好主意嗎? –

相關問題