auto commit

This commit is contained in:
bangdk 2024-11-20 16:26:07 +09:00
parent d4109b792f
commit 0a87b2308f
2 changed files with 1800 additions and 10 deletions

File diff suppressed because it is too large Load Diff

View File

@ -9,34 +9,112 @@ class MQTTService {
this.redis = redis;
this.brokerUrl = brokerUrl;
this.options = options;
this.connected = false;
this.reconnectAttempts = 0;
}
async connect() {
logger.info("Connecting to MQTT broker...", {
broker: this.brokerUrl,
clientId: this.options.clientId,
});
this.client = mqtt.connect(this.brokerUrl, {
...this.options,
clientId: `fems-realtime-api-${process.pid}`,
will: {
topic: "clients/realtime-api/status",
payload: "offline",
qos: 1,
retain: true,
},
});
this.client.on("connect", () => {
this.connected = true;
this.reconnectAttempts = 0;
logger.info("Connected to MQTT broker");
this.publishStatus("online");
this.subscribeToTopics();
});
this.client.on("reconnect", () => {
this.reconnectAttempts++;
logger.warn(
`Reconnecting to MQTT broker (attempt ${this.reconnectAttempts})`
);
});
this.client.on("offline", () => {
this.connected = false;
logger.warn("MQTT client is offline");
});
this.client.on("message", this.handleMessage.bind(this));
this.client.on("error", this.handleError.bind(this));
this.client.on("error", (error) => {
logger.error("MQTT error:", error);
if (error.message.includes("Not authorized")) {
logger.error("MQTT authentication failed. Please check credentials.", {
username: this.options.username,
});
}
});
// 연결 성공 여부 확인
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error("MQTT connection timeout"));
}, this.options.connectTimeout || 30000);
this.client.once("connect", () => {
clearTimeout(timeout);
resolve();
});
this.client.once("error", (error) => {
clearTimeout(timeout);
reject(error);
});
});
}
publishStatus(status) {
if (this.connected) {
this.client.publish("clients/realtime-api/status", status, {
qos: 1,
retain: true,
});
}
}
subscribeToTopics() {
// 토픽 형식 변경: data/companyId/branchId/deviceId/dataType
this.client.subscribe("data/+/+/+/#", { qos: 1 });
const topics = ["data/+/+/+/#"];
topics.forEach((topic) => {
this.client.subscribe(topic, { qos: 1 }, (err) => {
if (err) {
logger.error(`Failed to subscribe to ${topic}:`, err);
} else {
logger.info(`Subscribed to ${topic}`);
}
});
});
}
async handleMessage(topic, message) {
try {
// 토픽 파싱 수정
// 토픽 파싱
const [, companyId, branchId, deviceId, dataType] = topic.split("/");
const data = JSON.parse(message.toString());
logger.debug("Received MQTT message:", {
topic,
companyId,
branchId,
deviceId,
dataType,
});
// 구독 상태 확인
const subscriptionStatus = await this.mainBackend.checkSubscription(
companyId
@ -49,6 +127,15 @@ class MQTTService {
return;
}
// 데이터 검증
if (!this.validateData(data)) {
logger.warn("Invalid data format received:", {
topic,
data,
});
return;
}
// 데이터 저장
await this.sensorData.insert({
time: new Date(data.timestamp),
@ -60,9 +147,10 @@ class MQTTService {
metadata: data.metadata,
});
// Redis 캐시 키 형식 변경
// Redis 캐시 갱신
const cacheKey = `latest:${companyId}:${branchId}:${deviceId}:${dataType}`;
await this.redis.set(
`latest:${companyId}:${branchId}:${deviceId}:${dataType}`,
cacheKey,
JSON.stringify({
...data,
receivedAt: new Date(),
@ -70,13 +158,93 @@ class MQTTService {
"EX",
300 // 5분 캐시
);
logger.debug("Successfully processed message:", {
topic,
cacheKey,
});
} catch (error) {
logger.error("Error handling message:", error);
logger.error("Error handling message:", {
error: error.message,
topic,
stack: error.stack,
});
}
}
handleError(error) {
logger.error("MQTT error:", error);
validateData(data) {
// 기본적인 데이터 형식 검증
if (!data || typeof data !== "object") return false;
if (!data.timestamp || isNaN(new Date(data.timestamp).getTime()))
return false;
if (data.value === undefined || data.value === null) return false;
return true;
}
async publish(topic, message, options = {}) {
return new Promise((resolve, reject) => {
if (!this.connected) {
reject(new Error("MQTT client is not connected"));
return;
}
this.client.publish(
topic,
JSON.stringify(message),
{
qos: 1,
...options,
},
(error) => {
if (error) {
logger.error("Failed to publish message:", {
topic,
error: error.message,
});
reject(error);
} else {
logger.debug("Message published successfully:", { topic });
resolve();
}
}
);
});
}
async disconnect() {
if (this.client) {
try {
await this.publishStatus("offline");
return new Promise((resolve) => {
this.client.end(true, {}, () => {
this.connected = false;
logger.info("Disconnected from MQTT broker");
resolve();
});
});
} catch (error) {
logger.error("Error during MQTT disconnect:", error);
throw error;
}
}
}
isConnected() {
return this.connected;
}
getReconnectAttempts() {
return this.reconnectAttempts;
}
getClientInfo() {
return {
connected: this.connected,
reconnectAttempts: this.reconnectAttempts,
clientId: this.options.clientId,
broker: this.brokerUrl,
};
}
}