/**
 * TCP gateway for HJ212-style packets.
 *
 * Accepts device connections ("##" packets) and control connections ("@@"
 * packets), validates each frame (length field + CRC), forwards control
 * commands to the matching device socket and publishes device data to Kafka.
 */
const net = require('net');
const redis = require('redis');
const moment = require('moment');
const kafka = require('kafka-node');
const isEmpty = require('is-empty');
const config = require('./config');
const method = require('./method');
const log4js = require('log4js');
const crc16 = require('./hj212-2017/crc16_2017_Complement');

log4js.configure('./log4js.json');
const logger1 = log4js.getLogger('log_date');

const TIME_FORMAT = 'YYYY-MM-DD HH:mm:ss';

/** Current wall-clock time formatted for log lines. */
function timestamp() {
    return moment().format(TIME_FORMAT);
}

const redisClient = redis.createClient(config.REDIS_OPTS);
// Without an 'error' listener a connection problem surfaces as an uncaught
// exception and takes the whole gateway down.
redisClient.on('error', function (err) {
    logger1.error(timestamp() + ': redis error: ' + err);
});

const Producer = kafka.Producer;
const client = new kafka.KafkaClient({kafkaHost: config.KAFKA_HOST});
const producerOption = {
    requireAcks: 1,
    ackTimeoutMs: 100,
    // 3 selects kafka-node's keyed partitioner: the partition is derived from
    // the `key` passed to producer.send().
    partitionerType: 3
};
const producer = new Producer(client, producerOption);
// Same reasoning as for redis: an unhandled 'error' event would crash the process.
producer.on('error', function (err) {
    logger1.error(timestamp() + ': kafka producer error: ' + err);
});

// Sockets indexed by lower-cased MN; shared through `global`.
global.dev_sockets = {};
global.app_sockets = {};
// MNs that have sent at least one device packet since start-up.
let macs = [];

/**
 * Reads the "is_start" flag from Redis.
 *
 * NOTE(review): the branch taken when the flag equals 1 was meant to forward
 * data to a debug page, but that code is disabled, so today this only logs
 * Redis errors. Kept to preserve the existing lookups.
 */
function checkDebugForwarding() {
    redisClient.get("is_start", function (err, reply) {
        if (err) {
            console.log(err);
            return;
        }
        if (reply == 1) {
            // Forwarding to the debug page is currently disabled.
        }
    });
}

/**
 * Publishes one decoded device record to Kafka.
 *
 * @param {string} mac - Lower-cased MN of the sending device; also the message key.
 * @param {Object} cp_stores - Decoded CP fields, copied onto the outgoing record.
 * @param {net.Socket} socket - Originating socket (unused here).
 * @param {number} entryTime - Epoch milliseconds at which the raw chunk arrived.
 * @param {string} cn - Command number of the packet; selects the Kafka topic.
 */
function pushData(mac, cp_stores, socket, entryTime, cn) {
    // The record is stamped with the gateway's receive time. The previous
    // version also parsed cp_stores.DataTime into a value that was never used,
    // which threw whenever a packet carried no DataTime.
    const record = {
        mac: mac,
        time: moment().valueOf(),
        entryTime: entryTime
    };
    // CP fields are copied last, so they win on a name clash.
    for (const key in cp_stores) {
        record[key] = cp_stores[key];
    }

    checkDebugForwarding();

    const topic = cn === '2011' ? 'uav_second_data' : 'test';
    const payloads = [{
        topic: topic,
        messages: JSON.stringify(record),
        key: JSON.stringify(mac)
    }];
    producer.send(payloads, function (err) {
        if (err) {
            // Delivery failures used to be dropped silently.
            logger1.error(timestamp() + ': kafka send to ' + topic + ' failed: ' + err);
        }
    });
}

/**
 * Decodes the CP section of a packet and hands the result to pushData.
 *
 * @param {net.Socket} socket - Originating socket.
 * @param {string} mac - Lower-cased MN of the sending device.
 * @param {Object} stores - Header fields returned by method.get_data_store.
 * @param {number} entryTime - Epoch milliseconds at which the raw chunk arrived.
 */
function handleData(socket, mac, stores, entryTime) {
    const cp_stores = method.get_cp_data_store(stores["CP"]);
    pushData(mac, cp_stores, socket, entryTime, stores['CN']);
}

/**
 * Processes every complete frame contained in one received chunk.
 *
 * Frame layout as checked below: 2-char header ("##" from devices, "@@" from
 * control clients), 4-char decimal payload length, payload, 4-char CRC, then
 * 2 trailing characters. Processing stops at the first frame that fails any
 * check; the rest of the chunk is discarded.
 *
 * NOTE(review): frames split across TCP chunks are not reassembled; a partial
 * frame is dropped.
 *
 * @param {net.Socket} socket - Socket the chunk arrived on.
 * @param {string} data - Chunk contents as text.
 * @param {number} entryTime - Epoch milliseconds at which the chunk arrived.
 */
function doWork(socket, data, entryTime) {
    let remaining = data;

    while (!isEmpty(remaining)) {
        const fromCloud = remaining.startsWith('@@');
        if (!fromCloud && !remaining.startsWith('##')) return;
        if (remaining.length < 6) return;

        const length = Number.parseInt(remaining.substr(2, 4), 10);
        // A non-numeric or negative length would make the slice at the bottom
        // of the loop stop advancing through the chunk.
        if (Number.isNaN(length) || length < 0) return;
        if (remaining.length < length + 10) return;

        const stores = method.get_data_store(remaining, length);
        if (isEmpty(stores) || isEmpty(stores['MN'])) return;
        const mac = stores['MN'].toLowerCase();

        // CRC check over the payload.
        const crcField = remaining.substr(6 + length, 4);
        const payload = remaining.substr(6, length);
        // Loose comparison kept: the return type of crc16 is not visible here.
        if (isEmpty(crcField) || crc16(payload, payload.length) != crcField.toUpperCase()) return;

        const frameLength = length + 12;

        if (fromCloud) {
            // Control command from the cloud: relay to the device with the
            // device header. Only the current frame is relayed; previously the
            // whole remaining chunk, including later frames, was written.
            const command = remaining.slice(0, frameLength).replace("@@", "##");
            const target = global.dev_sockets[mac];
            if (!isEmpty(target) && target.writable) {
                target.write(command);
            }
            if (!isEmpty(mac)) {
                global.app_sockets[mac] = socket;
            }
        } else {
            // Packet from a hardware device: remember which socket serves this MN.
            if (!isEmpty(mac)) {
                global.dev_sockets[mac] = socket;
            }

            if (stores['Flag'] === '5') {
                // NOTE(review): the reply is sent without the header, length
                // and CRC framing required of incoming packets, and with a
                // hard-coded PW — confirm devices accept this.
                const res = 'QN=' + stores['QN'] + ';ST=91;CN=9014;PW=123456;MN=' + mac + ';Flag=4;CP=&&&&\r\n';
                logger1.info(timestamp() + ' ANSWER: ' + ':' + res);
                socket.write(res);
            }

            if (!macs.includes(mac)) {
                macs.push(mac);
            }
            handleData(socket, mac, stores, entryTime);
        }

        remaining = remaining.slice(frameLength);
    }
}

/**
 * Removes every entry of a socket registry that points at the given socket.
 *
 * @param {Object} registry - Map of MN to socket.
 * @param {net.Socket} socket - Socket that has been closed.
 */
function forgetSocket(registry, socket) {
    for (const mac of Object.keys(registry)) {
        if (registry[mac] === socket) {
            delete registry[mac];
        }
    }
}

net.createServer().on('connection', function (socket) {
    logger1.info(timestamp() + ' CONNECTED: ' + socket.remoteAddress + ':' + socket.remotePort);

    socket.on('data', function (data) {
        logger1.info(timestamp() + ' DATA: ' + ':' + data);
        const entryTime = moment().valueOf();
        checkDebugForwarding();
        doWork(socket, data.toString(), entryTime);
    });

    socket.on('end', function () {
    });

    socket.on('error', function (error) {
        console.log(timestamp() + ': ' + error);
        logger1.error(timestamp() + ': ' + error);
    });

    socket.on('timeout', function () {
        logger1.error(timestamp() + ': ' + "timeout");
    });

    socket.on('close', function () {
        console.log(timestamp() + ' Closed socket: ' + socket.remoteAddress + ' ' + socket.remotePort);
        logger1.info(timestamp() + ' Closed socket: ' + socket.remoteAddress + ' ' + socket.remotePort);
        // One socket may be registered under several MNs, and control sockets
        // are tracked too; drop every reference so closed sockets are not retained.
        forgetSocket(global.dev_sockets, socket);
        forgetSocket(global.app_sockets, socket);
    });
}).listen(config.UAV_PORT, config.HOST);