var net = require('net');
|
var redis = require('redis');
|
var moment = require('moment');
|
var kafka = require('kafka-node');
|
var isEmpty = require('is-empty');
|
var config = require('./config');
|
var method = require('./method');
|
var log4js = require('log4js');
|
var crc16 = require('./hj212-2017/crc16_2017_Complement');
|
|
// Shared Redis connection; the handlers below read the "is_start" flag from it.
const redisClient = redis.createClient(config.REDIS_OPTS);

// File/console logging is configured from log4js.json; everything in this
// module logs through the 'log_date' category.
log4js.configure('./log4js.json');
const logger1 = log4js.getLogger('log_date');

// Kafka producer used to publish parsed device data.
const { Producer } = kafka;
const client = new kafka.KafkaClient({ kafkaHost: config.KAFKA_HOST });
const producerOption = {
    requireAcks: 1,
    ackTimeoutMs: 100,
    // The original comment here said "defaults to the first partition".
    // In kafka-node, 3 selects the keyed partitioner (0 = default, 1 = random,
    // 2 = cyclic, 3 = keyed, 4 = custom), so the partition follows the message key.
    partitionerType: 3,
};
const producer = new Producer(client, producerOption);

// Process-wide socket registries, keyed by lower-cased device MN:
//   dev_sockets - latest socket seen for a '##' (device) frame
//   app_sockets - latest socket seen for a '@@' (control) frame
global.dev_sockets = {};
global.app_sockets = {};

// Every distinct device MN that has sent a frame since startup.
let macs = [];
|
|
/**
 * Publishes one parsed device record to Kafka.
 *
 * The message body is `{ mac, time, entryTime }` merged with every field of
 * `cp_stores` (fields in `cp_stores` win on a name clash). Records with
 * CN '2011' go to the 'uav_second_data' topic, everything else to 'test'.
 *
 * The previous implementation reformatted `cp_stores.DataTime` into a
 * timestamp that was never used; that code threw a TypeError whenever
 * DataTime was missing, so it has been removed.
 *
 * @param {string} mac        Lower-cased device identifier (MN); also used as the Kafka key.
 * @param {Object} cp_stores  Parsed CP fields; may be empty, null or undefined.
 * @param {Object} socket     Originating socket (unused; kept for interface compatibility).
 * @param {number} entryTime  Epoch milliseconds at which the raw chunk was received.
 * @param {string} cn         Command code (CN) of the frame.
 */
function pushData(mac, cp_stores, socket, entryTime, cn) {
    // Object.assign ignores null/undefined sources, so a frame without CP
    // fields still produces a valid message.
    const data = Object.assign(
        {
            mac: mac,
            time: moment().valueOf(),
            entryTime: entryTime
        },
        cp_stores
    );

    redisClient.get("is_start", function (err, reply) {
        if (err) {
            console.log(err);
        }
        // NOTE(review): when reply == 1 the data was meant to be forwarded to a
        // debug page topic ("devices_data_3rd_party"); that forwarding is not
        // implemented, so the reply is currently ignored.
    });

    const topic = cn === '2011' ? 'uav_second_data' : 'test';
    const payloads = [{
        topic: topic,
        messages: JSON.stringify(data),
        key: JSON.stringify(mac)
    }];

    producer.send(payloads, function (err) {
        if (err) {
            // Previously swallowed; a failed publish means lost device data.
            logger1.error(moment().format('YYYY-MM-DD HH:mm:ss') + ': kafka send to "' + topic + '" failed: ' + err);
        }
    });
}
|
|
/**
 * Hands one decoded device frame over to pushData.
 *
 * The frame's "CP" section is expanded with method.get_cp_data_store and
 * passed on together with the frame's "CN" command code.
 *
 * @param {Object} socket     Socket the frame arrived on.
 * @param {string} mac        Lower-cased device identifier (MN).
 * @param {Object} stores     Top-level fields of the frame; "CP" and "CN" are read.
 * @param {number} entryTime  Epoch milliseconds at which the raw chunk was received.
 */
function handleData(socket, mac, stores, entryTime) {
    pushData(
        mac,
        method.get_cp_data_store(stores.CP),
        socket,
        entryTime,
        stores.CN
    );
}
|
|
/**
 * Parses and dispatches every complete frame found in one chunk of socket data.
 *
 * Frame layout as read by this function:
 *   2-char header ('##' device, '@@' control) + 4-char decimal payload length
 *   + payload + 4-char CRC + 2 trailing characters (length + 12 in total;
 *   the trailing 2 characters may be missing on the last frame).
 *
 * Processing stops silently at the first frame that is malformed, truncated,
 * has no MN, or fails the CRC check; anything after it in the chunk is dropped.
 *
 * Control frames ('@@') are re-headed with '##' and forwarded to the device
 * socket registered for the same MN; the sending socket is remembered in
 * app_sockets. Device frames ('##') register the socket in dev_sockets, are
 * acknowledged when Flag is '5', and are handed to handleData.
 *
 * Changes from the previous version:
 *   - the length field is parsed with radix 10 and NaN/negative values are rejected
 *     (a negative length made slice() index from the end of the string);
 *   - only the current frame is forwarded for a control command, instead of the
 *     whole remaining buffer (which resent later frames a second time);
 *   - frames are processed in a loop rather than one recursion level per frame.
 *
 * @param {Object} socket     Socket the chunk arrived on.
 * @param {string} data       Chunk contents as a string.
 * @param {number} entryTime  Epoch milliseconds at which the chunk was received.
 */
function doWork(socket, data, entryTime) {
    let remaining = data;

    while (!isEmpty(remaining)) {
        const isControl = remaining.startsWith('@@');
        if (!isControl && !remaining.startsWith('##')) return;

        if (remaining.length < 6) return;
        const length = Number.parseInt(remaining.substr(2, 4), 10);
        if (Number.isNaN(length) || length < 0) return;

        // Header (6) + payload + CRC (4) must be present.
        if (remaining.length < length + 10) return;

        const stores = method.get_data_store(remaining, length);
        if (isEmpty(stores) || isEmpty(stores['MN'])) return;

        const mac = stores['MN'].toLowerCase();

        // CRC check over the payload only.
        const crc_data = remaining.substr(6 + length, 4);
        const data_val = remaining.substr(6, length);
        // Loose comparison kept on purpose: crc16's return type is defined elsewhere.
        if (isEmpty(crc_data) || crc16(data_val, data_val.length) != crc_data.toUpperCase()) return;

        const frameLength = length + 12;

        if (isControl) {
            // Reverse control from the cloud side: forward this frame (only) to the device.
            const command = '##' + remaining.slice(2, frameLength);
            if (!isEmpty(dev_sockets[mac]) && dev_sockets[mac].writable) {
                dev_sockets[mac].write(command);
            }
            app_sockets[mac] = socket;
        } else {
            // Frame from a hardware device.
            dev_sockets[mac] = socket;

            if (stores['Flag'] === '5') {
                const res = 'QN=' + stores['QN'] + ';ST=91;CN=9014;PW=123456;MN=' + mac + ';Flag=4;CP=&&&&\r\n';
                logger1.info(moment().format('YYYY-MM-DD HH:mm:ss') + ' ANSWER: ' + ':' + res);
                socket.write(res);
            }

            if (!macs.includes(mac)) {
                macs.push(mac);
            }

            handleData(socket, mac, stores, entryTime);
        }

        remaining = remaining.slice(frameLength);
    }
}
|
|
/**
 * TCP server accepting both device and control connections.
 *
 * Each 'data' chunk is logged and passed to doWork on its own; no
 * per-connection buffer is kept, so a frame split across two chunks is not
 * reassembled here.
 *
 * Changes from the previous version:
 *   - the peer address/port are captured at connect time, because Node may
 *     report them as undefined once the socket has been destroyed (the close
 *     log line used to read them after the fact);
 *   - on close, every registry entry pointing at the socket is removed from
 *     both dev_sockets and app_sockets (previously only the first dev_sockets
 *     match was removed and app_sockets kept stale sockets);
 *   - an exception thrown while processing a chunk is logged instead of
 *     propagating out of the 'data' listener.
 */
net.createServer().on('connection', function (socket) {
    const remoteAddress = socket.remoteAddress;
    const remotePort = socket.remotePort;
    const timestamp = function () {
        return moment().format('YYYY-MM-DD HH:mm:ss');
    };

    logger1.info(timestamp() + ' CONNECTED: ' + remoteAddress + ':' + remotePort);

    socket.on('data', function (data) {
        logger1.info(timestamp() + ' DATA: ' + ':' + data);
        const entryTime = moment().valueOf();

        redisClient.get("is_start", function (err, reply) {
            if (err) {
                console.log(err);
            }
            // NOTE(review): when reply == 1 the chunk was meant to be forwarded to
            // a debug page; that forwarding is not implemented, so the reply is ignored.
        });

        try {
            doWork(socket, data.toString(), entryTime);
        } catch (err) {
            // Input comes from the network; one bad chunk must not take the server down.
            logger1.error(timestamp() + ': ' + (err && err.stack ? err.stack : err));
        }
    });

    socket.on('error', function (error) {
        console.log(timestamp() + ': ' + error);
        logger1.error(timestamp() + ': ' + error);
    });

    socket.on('timeout', function () {
        logger1.error(timestamp() + ': ' + "timeout");
    });

    socket.on('close', function () {
        console.log(timestamp() + ' Closed socket: ' + remoteAddress + ' ' + remotePort);
        logger1.info(timestamp() + ' Closed socket: ' + remoteAddress + ' ' + remotePort);

        // Drop every reference to this socket so closed sockets are neither
        // written to nor kept alive by the registries.
        [dev_sockets, app_sockets].forEach(function (registry) {
            Object.keys(registry).forEach(function (mac) {
                if (registry[mac] === socket) {
                    delete registry[mac];
                }
            });
        });
    });
}).listen(config.UAV_PORT, config.HOST);
|