[修改]修改了错误的exporter名称
This commit is contained in:
399
opcua-exporter/client/client.cpp
Normal file
399
opcua-exporter/client/client.cpp
Normal file
@@ -0,0 +1,399 @@
|
||||
//
|
||||
// Created by 闫鹏宇 on 2022/7/28.
|
||||
//
|
||||
|
||||
#include "client.h"
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <open62541/client_config_default.h>
|
||||
#include <cmath>
|
||||
|
||||
#include <utility>
|
||||
#include <open62541/client_highlevel_async.h>
|
||||
|
||||
|
||||
namespace std {
|
||||
template<>
|
||||
struct hash <std::vector<std::string>>{
|
||||
size_t operator()(const std::vector<std::string> &rhs) const noexcept {
|
||||
size_t res = 0;
|
||||
for (auto const &str: rhs) {
|
||||
res ^= std::hash<std::string>()(str);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
inline static spdlog::level::level_enum get_spd_level(UA_LogLevel level) {
|
||||
switch (level) {
|
||||
case UA_LogLevel::UA_LOGLEVEL_TRACE:
|
||||
return spdlog::level::trace;
|
||||
case UA_LogLevel::UA_LOGLEVEL_DEBUG:
|
||||
return spdlog::level::debug;
|
||||
case UA_LogLevel::UA_LOGLEVEL_INFO:
|
||||
return spdlog::level::info;
|
||||
case UA_LogLevel::UA_LOGLEVEL_WARNING:
|
||||
return spdlog::level::warn;
|
||||
case UA_LogLevel::UA_LOGLEVEL_ERROR:
|
||||
return spdlog::level::err;
|
||||
case UA_LogLevel::UA_LOGLEVEL_FATAL:
|
||||
return spdlog::level::critical;
|
||||
default:
|
||||
return spdlog::level::trace;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void opc_ua_log(void *logContext, UA_LogLevel level, UA_LogCategory category,
|
||||
const char *msg, va_list args) {
|
||||
(void) args;
|
||||
thread_local static char const *last_log_msg = nullptr;
|
||||
|
||||
auto c = static_cast<client *>(logContext);
|
||||
if (!c) {
|
||||
return;
|
||||
}
|
||||
if (msg != last_log_msg) {
|
||||
char buff[256] = {0};
|
||||
vsprintf(buff, msg, args);
|
||||
spdlog::log(get_spd_level(level), buff);
|
||||
}
|
||||
last_log_msg = msg;
|
||||
}
|
||||
|
||||
static void OPC_UA_Client_DeleteSubscriptionCallback(
|
||||
UA_Client *client,
|
||||
UA_UInt32 subId,
|
||||
void *subContext
|
||||
) {
|
||||
spdlog::trace(
|
||||
"OPC_UA_Client_DeleteSubscriptionCallback subId{}",
|
||||
subId
|
||||
);
|
||||
}
|
||||
|
||||
static void OPC_UA_Client_StatusChangeNotificationCallback(
|
||||
UA_Client *client,
|
||||
UA_UInt32 subId,
|
||||
void *subContext,
|
||||
UA_StatusChangeNotification *notification
|
||||
) {
|
||||
spdlog::trace(
|
||||
"OPC_UA_Client_StatusChangeNotificationCallback subId{}",
|
||||
subId
|
||||
);
|
||||
}
|
||||
|
||||
static void OPC_UA_Client_DataChangeNotificationCallback(
|
||||
UA_Client *client,
|
||||
UA_UInt32 subId,
|
||||
void *subContext,
|
||||
UA_UInt32 monId,
|
||||
void *monContext,
|
||||
UA_DataValue *value
|
||||
) {
|
||||
spdlog::trace(
|
||||
"OPC_UA_Client_DataChangeNotificationCallback subId{} monId",
|
||||
subId,
|
||||
monId
|
||||
);
|
||||
}
|
||||
|
||||
static void OPC_UA_Client_DeleteMonitoredItemCallback(
|
||||
UA_Client *client,
|
||||
UA_UInt32 subId,
|
||||
void *subContext,
|
||||
UA_UInt32 monId,
|
||||
void *monContext) {
|
||||
spdlog::trace(
|
||||
"OPC_UA_Client_DeleteMonitoredItemCallback subId{} monId",
|
||||
subId,
|
||||
monId
|
||||
);
|
||||
}
|
||||
|
||||
static void OPC_UA_ClientAsyncReadCallback(
|
||||
UA_Client *c, void *userdata,
|
||||
UA_UInt32 requestId,
|
||||
UA_ReadResponse *rr
|
||||
) {
|
||||
auto cc = static_cast<client *>(userdata);
|
||||
auto hash_key = cc->getAsyncReadRequestHashKey(requestId);
|
||||
cc->updateReadValueCache(hash_key, *rr);
|
||||
}
|
||||
|
||||
|
||||
client::client(std::string endpoint) :
|
||||
ua_endpoint(std::move(endpoint)) {
|
||||
ua_config.logger.context = this;
|
||||
ua_config.logger.log = &opc_ua_log;
|
||||
UA_ClientConfig_setDefault(&ua_config);
|
||||
ua_client = UA_Client_newWithConfig(&ua_config);
|
||||
for (int i = 0; i < async_read_request_ids.size(); ++i) {
|
||||
async_read_request_ids[i].first = i;
|
||||
async_read_request_ids[i].second = 0;
|
||||
}
|
||||
}
|
||||
|
||||
client::~client() {
|
||||
stop();
|
||||
UA_Client_delete(ua_client);
|
||||
}
|
||||
|
||||
void client::createSubscription() {
|
||||
if (create_subscription_response) {
|
||||
return;
|
||||
}
|
||||
auto request = UA_CreateSubscriptionRequest_default();
|
||||
create_subscription_response = UA_Client_Subscriptions_create(
|
||||
ua_client,
|
||||
request,
|
||||
this,
|
||||
&OPC_UA_Client_StatusChangeNotificationCallback,
|
||||
&OPC_UA_Client_DeleteSubscriptionCallback
|
||||
);
|
||||
}
|
||||
|
||||
void client::stop() {
|
||||
running = false;
|
||||
disconnect();
|
||||
}
|
||||
|
||||
void client::run() {
|
||||
UA_StatusCode retval = UA_STATUSCODE_GOOD;
|
||||
int error_time = 0;
|
||||
while (running) {
|
||||
if (!connect()) {
|
||||
//disconnect();
|
||||
spdlog::error("OPC-UA Server({}) not connected({}). Retrying to connect in 1 second.", ua_endpoint,
|
||||
retval);
|
||||
UA_sleep_ms(1000);
|
||||
continue;
|
||||
} else if (MONITOR == read_mode) {
|
||||
createSubscription();
|
||||
}
|
||||
while (running && UA_STATUSCODE_GOOD == retval) {
|
||||
std::unique_lock lock(ua_mutex);
|
||||
retval = UA_Client_run_iterate(ua_client, 100);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool client::connect() {
|
||||
std::unique_lock lock(ua_mutex);
|
||||
return UA_STATUSCODE_GOOD == UA_Client_connect(ua_client, ua_endpoint.c_str());
|
||||
}
|
||||
|
||||
void client::disconnect() {
|
||||
std::unique_lock lock(ua_mutex);
|
||||
UA_Client_disconnect(ua_client);
|
||||
}
|
||||
|
||||
|
||||
std::vector<UA_ReadValueId> client::makeReadValueIds(std::vector<std::string> const &node_ids) {
|
||||
std::vector<UA_ReadValueId> ids;
|
||||
if (!node_ids.empty()) {
|
||||
for (auto const &node_id: node_ids) {
|
||||
ids.emplace_back(
|
||||
UA_ReadValueId{
|
||||
UA_NODEID(node_id.c_str()),
|
||||
UA_ATTRIBUTEID_VALUE,
|
||||
{},
|
||||
{}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
return ids;
|
||||
}
|
||||
|
||||
UA_ReadRequest &client::makeReadRequest(size_t hash_key) {
|
||||
|
||||
std::vector<UA_ReadValueId> &read_value_ids = ua_read_value_request_cache_[hash_key].read_value_ids;
|
||||
auto &request = ua_read_value_request_cache_[hash_key].read_request;
|
||||
request.nodesToRead = const_cast<UA_ReadValueId *>(read_value_ids.data());
|
||||
request.nodesToReadSize = read_value_ids.size();
|
||||
return request;
|
||||
}
|
||||
|
||||
void client::cachePreCheckAndInit(size_t hash_key, std::vector<std::string> const &node_ids) {
|
||||
auto ua_read_value_ids_iter = ua_read_value_request_cache_.find(hash_key);
|
||||
if (ua_read_value_ids_iter == ua_read_value_request_cache_.end()) {
|
||||
auto ids = makeReadValueIds(node_ids);
|
||||
if (ids.empty()) {
|
||||
return;
|
||||
}
|
||||
ua_read_value_request_cache_[hash_key].read_value_ids = std::move(ids);
|
||||
ua_read_value_request_cache_[hash_key].node_ids = node_ids;
|
||||
UA_ReadRequest_init(&ua_read_value_request_cache_[hash_key].read_request);
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t *client::getAsyncReadRequestId(size_t hash_key) {
|
||||
auto &ids = async_read_request_ids[(current_async_read_request_pos++) % async_read_request_ids.size()];
|
||||
ids.second = hash_key;
|
||||
return &ids.first;
|
||||
}
|
||||
|
||||
size_t client::getAsyncReadRequestHashKey(uint32_t req_id) {
|
||||
for (auto const &iter: async_read_request_ids) {
|
||||
if (iter.first == req_id) {
|
||||
return iter.second;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void client::sendReadRequest(size_t hash_key, bool force_sync) {
|
||||
ua_read_value_request_cache_[hash_key].last_read_request_time = std::chrono::system_clock::now();
|
||||
if (SYNC == read_mode || force_sync) {
|
||||
UA_ReadResponse response;
|
||||
{
|
||||
std::shared_lock lock(ua_mutex);
|
||||
response = UA_Client_Service_read(
|
||||
ua_client,
|
||||
makeReadRequest(
|
||||
hash_key
|
||||
)
|
||||
);
|
||||
}
|
||||
if (UA_STATUSCODE_GOOD == response.responseHeader.serviceResult) {
|
||||
updateReadValueCache(hash_key, response);
|
||||
}
|
||||
UA_ReadResponse_clear(&response);
|
||||
}
|
||||
switch (read_mode) {
|
||||
case ASYNC: {
|
||||
UA_Client_sendAsyncReadRequest(
|
||||
ua_client,
|
||||
&makeReadRequest(hash_key),
|
||||
&OPC_UA_ClientAsyncReadCallback,
|
||||
this,
|
||||
getAsyncReadRequestId(hash_key)
|
||||
);
|
||||
break;
|
||||
}
|
||||
case MONITOR: {
|
||||
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::chrono::system_clock::duration client::getReadRequestDuration(size_t hash_key) {
|
||||
return std::chrono::system_clock::now() -
|
||||
ua_read_value_request_cache_[hash_key].last_read_request_time;
|
||||
}
|
||||
|
||||
std::chrono::system_clock::duration client::getCacheUpdateDuration(size_t hash_key) {
|
||||
return std::chrono::system_clock::now() -
|
||||
ua_read_value_request_cache_[hash_key].last_read_response_time;
|
||||
}
|
||||
|
||||
std::map<std::string, double> client::readValue(std::vector<std::string> const &node_ids) {
|
||||
if (!node_ids.empty()) {
|
||||
auto node_ids_hash_value = std::hash<std::vector<std::string>>()(node_ids);
|
||||
cachePreCheckAndInit(node_ids_hash_value, node_ids);
|
||||
|
||||
std::lock_guard<std::mutex> lock(ua_read_mutex);
|
||||
|
||||
auto duration = getCacheUpdateDuration(node_ids_hash_value);
|
||||
if (duration > cache_timeout) {
|
||||
sendReadRequest(node_ids_hash_value, true);
|
||||
} else if (duration > cache_update_time) {
|
||||
sendReadRequest(node_ids_hash_value);
|
||||
}
|
||||
if (getCacheUpdateDuration(node_ids_hash_value) < cache_timeout) {
|
||||
return getCacheValue(node_ids_hash_value);
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
std::map<std::string, double> client::getCacheValue(size_t hash_key) {
|
||||
std::map<std::string, double> values;
|
||||
|
||||
auto iter = ua_read_value_request_cache_.find(hash_key);
|
||||
if (iter != ua_read_value_request_cache_.end()) {
|
||||
for (auto const &node_id: iter->second.node_ids) {
|
||||
values[node_id] = ua_read_value_cache[node_id];
|
||||
}
|
||||
}
|
||||
return values;
|
||||
}
|
||||
|
||||
void client::updateReadValueCache(size_t hash_key, UA_ReadResponse const &response) {
|
||||
auto iter = ua_read_value_request_cache_.find(hash_key);
|
||||
if (iter == ua_read_value_request_cache_.end()) {
|
||||
return;
|
||||
}
|
||||
if (response.resultsSize != iter->second.read_value_ids.size()) {
|
||||
return;
|
||||
}
|
||||
iter->second.last_read_response_time = std::chrono::system_clock::now();
|
||||
|
||||
for (int i = 0; i < response.resultsSize; ++i) {
|
||||
auto &data_value = response.results[i];
|
||||
setValueCache(iter->second.node_ids[i], getValueAsDouble(data_value));
|
||||
}
|
||||
}
|
||||
|
||||
void client::setValueCache(std::string const &node_id, double value) {
|
||||
ua_read_value_cache[node_id] = value;
|
||||
}
|
||||
|
||||
double client::getValueAsDouble(UA_DataValue const &ua_value) {
|
||||
if (ua_value.hasValue) {
|
||||
switch (ua_value.value.type->typeKind) {
|
||||
case UA_DATATYPEKIND_BOOLEAN: {
|
||||
return static_cast<double>(*static_cast<UA_Boolean *>(ua_value.value.data));
|
||||
}
|
||||
case UA_DATATYPEKIND_SBYTE: {
|
||||
return static_cast<double>(*static_cast<UA_SByte *>(ua_value.value.data));
|
||||
}
|
||||
case UA_DATATYPEKIND_BYTE: {
|
||||
return static_cast<double>(*static_cast<UA_Byte *>(ua_value.value.data));
|
||||
}
|
||||
case UA_DATATYPEKIND_INT16: {
|
||||
return static_cast<double>(*static_cast<UA_Int16 *>(ua_value.value.data));
|
||||
}
|
||||
case UA_DATATYPEKIND_UINT16: {
|
||||
return static_cast<double>(*static_cast<UA_UInt16 *>(ua_value.value.data));
|
||||
}
|
||||
case UA_DATATYPEKIND_INT32: {
|
||||
return static_cast<double>(*static_cast<UA_Int32 *>(ua_value.value.data));
|
||||
}
|
||||
case UA_DATATYPEKIND_UINT32: {
|
||||
return static_cast<double>(*static_cast<UA_UInt32 *>(ua_value.value.data));
|
||||
}
|
||||
case UA_DATATYPEKIND_INT64: {
|
||||
return static_cast<double>(*static_cast<UA_Int64 *>(ua_value.value.data));
|
||||
}
|
||||
case UA_DATATYPEKIND_UINT64: {
|
||||
return static_cast<double>(*static_cast<UA_UInt64 *>(ua_value.value.data));
|
||||
}
|
||||
case UA_DATATYPEKIND_FLOAT: {
|
||||
return static_cast<double>(*static_cast<UA_Float *>(ua_value.value.data));
|
||||
}
|
||||
case UA_DATATYPEKIND_DOUBLE: {
|
||||
return static_cast<double>(*static_cast<UA_Double *>(ua_value.value.data));
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
return std::nan(nullptr);
|
||||
}
|
||||
|
||||
void client::readTest() {
|
||||
auto read_request = UA_ReadRequest_new();
|
||||
}
|
||||
|
||||
|
||||
UA_Client *client::getClient() {
|
||||
return ua_client;
|
||||
}
|
||||
83
opcua-exporter/client/client.h
Normal file
83
opcua-exporter/client/client.h
Normal file
@@ -0,0 +1,83 @@
|
||||
//
|
||||
// Created by 闫鹏宇 on 2022/7/28.
|
||||
//
|
||||
|
||||
#ifndef OPCUA_EXPORTER_CLIENT_H
|
||||
#define OPCUA_EXPORTER_CLIENT_H
|
||||
|
||||
#include <string>
|
||||
#include <map>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
#include <open62541/client.h>
|
||||
#include <shared_mutex>
|
||||
#include <chrono>
|
||||
#include <array>
|
||||
#include <atomic>
|
||||
#include <open62541/client_subscriptions.h>
|
||||
#include <optional>
|
||||
|
||||
class client final {
|
||||
public:
|
||||
explicit client(std::string endpoint);
|
||||
~client();
|
||||
void stop();
|
||||
void run();
|
||||
bool connect();
|
||||
void disconnect();
|
||||
std::map<std::string, double> readValue(std::vector<std::string> const &node_ids);
|
||||
void readTest();
|
||||
UA_Client *getClient();
|
||||
size_t getAsyncReadRequestHashKey(uint32_t req_id);
|
||||
void updateReadValueCache(size_t hash_key, UA_ReadResponse const &response);
|
||||
|
||||
protected:
|
||||
std::vector<UA_ReadValueId> makeReadValueIds(std::vector<std::string> const &node_ids);
|
||||
UA_ReadRequest &makeReadRequest(size_t hash_key);
|
||||
std::map<std::string, double> getCacheValue(size_t hash_key);
|
||||
double getValueAsDouble(UA_DataValue const &ua_value);
|
||||
void cachePreCheckAndInit(size_t hash_key, std::vector<std::string> const &node_ids);
|
||||
void sendReadRequest(size_t hash_key, bool force_sync = false);
|
||||
std::chrono::system_clock::duration getCacheUpdateDuration(size_t hash_key);
|
||||
std::chrono::system_clock::duration getReadRequestDuration(size_t hash_key);
|
||||
void setValueCache(std::string const &node_id, double value);
|
||||
uint32_t *getAsyncReadRequestId(size_t hash_key);
|
||||
void createSubscription();
|
||||
protected:
|
||||
enum enum_read_mode {
|
||||
SYNC,
|
||||
ASYNC,
|
||||
MONITOR
|
||||
};
|
||||
|
||||
struct st_read_value_request_cache {
|
||||
std::chrono::system_clock::time_point last_read_response_time;
|
||||
std::chrono::system_clock::time_point last_read_request_time;
|
||||
std::vector<UA_ReadValueId> read_value_ids;
|
||||
UA_ReadRequest read_request;
|
||||
std::vector<std::string> node_ids;
|
||||
};
|
||||
private:
|
||||
UA_Client *ua_client{nullptr};
|
||||
UA_ClientConfig ua_config{};
|
||||
std::string ua_endpoint;
|
||||
std::shared_mutex ua_mutex;
|
||||
std::mutex ua_read_mutex;
|
||||
std::chrono::system_clock::duration cache_timeout{std::chrono::milliseconds(1000)};
|
||||
std::chrono::system_clock::duration cache_update_time{std::chrono::milliseconds(400)};
|
||||
|
||||
enum_read_mode read_mode{ASYNC};
|
||||
|
||||
std::unordered_map<std::size_t, st_read_value_request_cache> ua_read_value_request_cache_;
|
||||
std::map<std::string, double> ua_read_value_cache;
|
||||
|
||||
bool running{true};
|
||||
|
||||
std::array<std::pair<uint32_t, size_t>, 1024> async_read_request_ids;
|
||||
std::atomic_int64_t current_async_read_request_pos{0};
|
||||
|
||||
std::optional<UA_CreateSubscriptionResponse> create_subscription_response;
|
||||
};
|
||||
|
||||
|
||||
#endif //OPCUA_EXPORTER_CLIENT_H
|
||||
Reference in New Issue
Block a user