Skip to content

物联网实战面试题

物联网概述:

物联网技术(IoT,Internet of Things)指的是通过互联网将各种物理设备和物品连接起来,实现数据交换和通信的技术。物联网技术涵盖了从传感器到云计算的一整套技术体系,旨在通过数据的收集、传输和分析来实现智能化控制和管理。以下是物联网技术的几个关键组成部分:

  1. 传感器和设备:这些是物联网的前端设备,负责采集环境数据,如温度、湿度、光照、压力等。传感器将这些物理现象转换为可处理的数字信号。

  2. 网络连接:这是物联网设备与服务器或其他设备之间的数据传输通道。常见的网络连接方式包括Wi-Fi、蓝牙、蜂窝网络(如4G、5G)、Zigbee、LoRa等。

  3. 数据处理和存储:传感器采集的数据通过网络传输到中央服务器或云端,进行数据的存储和初步处理。数据存储可以是本地存储,也可以是云存储。

  4. 数据分析:收集到的大量数据需要通过分析工具进行处理,以提取有用的信息。这些分析工具可能包括机器学习算法、大数据分析平台等,用于实现预测性维护、行为分析等应用。

  5. 控制和决策:根据数据分析的结果,物联网系统可以做出相应的控制和决策。例如,智能家居系统可以根据传感器数据自动调节温度和照明;工业物联网可以实时监控设备状态,预防故障。

  6. 用户接口和应用程序:这些是用户与物联网系统交互的界面。它们可以是移动应用程序、网页应用程序或专用的控制面板,用户可以通过这些接口监控和控制物联网设备。

物联网技术的应用非常广泛,涵盖了智能家居、智慧城市、智能交通、智能医疗、工业自动化、环境监测等多个领域。例如,在智能家居中,用户可以通过手机应用远程控制家中的电器设备;在智慧城市中,物联网技术可以用于交通管理、环境监测和公共安全等方面;在工业自动化中,物联网可以实现设备的远程监控和维护,提高生产效率和安全性。

物联网协议详解:

OSI参考模型 OSI(Open System Interconnect),即开放式系统互联。一般都叫OSI参考模型,是ISO(国际标准化组织)组织在1985年研究的网络互连模型。ISO为了更好的使网络应用更为普及,推出了OSI参考模型,这样所有的公司都按照统一的标准来指定自己的网络,就可以互通互联了。 OSI定义了网络互连的七层框架(物理层、数据链路层、网络层、传输层、会话层、表示层、应用层)。

img

一、TCP/IP:

TCP/IP四层协议(数据链路层、网络层、传输层、应用层)

1、应用层 应用层最靠近用户的一层,是为计算机用户提供应用接口,也为用户直接提供各种网络服务。我们常见应用层的网络服务协议有:HTTP,HTTPS,FTP,TELNET等。

2、传输层 建立了主机端到端的链接,传输层的作用是为上层协议提供端到端的可靠和透明的数据传输服务,包括处理差错控制和流量控制等问题。该层向高层屏蔽了下层数据通信的细节,使高层用户看到的只是在两个传输实体间的一条主机到主机的、可由用户控制和设定的、可靠的数据通路。我们通常说的,TCP UDP就是在这一层。端口号既是这里的“端”。

3、网络层 本层通过IP寻址来建立两个节点之间的连接,为源端的运输层送来的分组,选择合适的路由和交换节点,正确无误地按照地址传送给目的端的运输层。就是通常说的IP层。这一层就是我们经常说的IP协议层。IP协议是Internet的基础。

4、数据链路层 通过一些规程或协议来控制这些数据的传输,以保证被传输数据的正确性。实现这些规程或协议的硬件和软件加到物理线路,这样就构成了数据链路。

1、TCP 协议有哪些主要特点?

  1. 面向连接: TCP 是一种面向连接的协议,通信双方在传输数据之前需要先建立连接。连接建立后,数据的传输是可靠的。
  2. 可靠性: TCP 提供可靠的数据传输服务。它通过序号、确认和重传机制来确保数据的可靠性。如果发现数据包丢失或损坏,TCP 会重新传输数据。
  3. 流控制: TCP 使用流控制机制来防止快速发送方导致慢速接收方无法处理的情况。通过接收方发送的窗口大小,TCP 调整发送方的发送速率,以适应网络状况和接收方的处理能力。
  4. 拥塞控制: TCP 通过拥塞控制机制来防止网络拥塞。当网络拥塞时,TCP 会降低发送速率以减轻网络负担,从而保持整体网络的稳定性。
  5. 全双工通信: TCP 支持全双工通信,允许双方在连接建立后同时发送和接收数据。
  6. 面向字节流: TCP 是面向字节流的协议,它不关心数据的边界。发送方将数据划分为小的数据块,而接收方会根据需要重组这些数据块。
  7. 三次握手和四次挥手: 在建立连接和关闭连接的过程中,TCP 使用三次握手和四次挥手的机制,以确保双方同步状态,避免不必要的错误。
  8. 提供错误检测和纠正: TCP 使用校验和机制对数据进行错误检测,同时在发现错误时采取重传等措施进行纠正。
  9. 面向字节流: TCP 不关心应用层的消息边界,而是将数据视为一连续的字节流进行传输。这使得应用层可以以更灵活的方式使用TCP协议。

2、TCP、UDP的区别?

TCP与UDP区别总结:

  1. TCP面向连接(如打电话要先拨号建立连接);UDP是无连接的,即发送数据之前不需要建立连接。
  2. TCP提供可靠的服务。也就是说,通过TCP连接传送的数据,无差错,不丢失,不重复,且按序到达;UDP尽最大努力交付,即不保证可靠交付
  3. TCP面向字节流,实际上是TCP把数据看成一连串无结构的字节流;UDP是面向报文的 UDP没有拥塞控制,因此网络出现拥塞不会使源主机的发送速率降低(对实时应用很有用,如IP电话,实时视频会议等)
  4. 每一条TCP连接只能是点到点的;UDP支持一对一,一对多,多对一和多对多的交互通信
  5. TCP首部开销20字节;UDP的首部开销小,只有8个字节
  6. TCP的逻辑通信信道是全双工的可靠信道,UDP则是不可靠信道

3、TCP协议如何保证可靠传输?

校验和,确认应答和序列号

序列号:TCP传输时将每个字节的数据都进行了编号,这就是序列号。

确认应答:TCP传输的过程中,每次接收方收到数据后,都会对传输方进行确认应答。也就是发送ACK报文。

这个ACK报文当中带有对应的确认序列号,告诉发送方,接收到了哪些数据,下一次的数据从哪里发。

img

序列号的作用不仅仅是应答的作用,有了序列号能够将接收到的数据根据序列号排序,并且去掉重复序列号的数据。这也是TCP传输可靠性的保证之一。

超时重传:

当TCP发出一个段后,它启动一个定时器,等待目的端确认收到这个报文段。如果不能及时收到一个确认,将重发这个报文段。

流量控制:

TCP连接的每一方都有固定大小的缓冲空间,TCP的接收端只允许发送端发送接收端缓冲区能接纳的数据。当接收方来不及处理发送方的数据,能提示发送方降低发送的速率,防止包丢失。

TCP使用的流量控制协议是可变大小的滑动窗口协议。接收方有即时窗口(滑动窗口),随ACK报文发送

拥塞控制:

如果网络出现拥塞,分组将会丢失,此时发送方会继续重传,从而导致网络拥塞程度更高。因此当出现拥塞时,应当控制发送方的速率。

这一点和流量控制很像,但是出发点不同。流量控制是为了让接收方能来得及接收,而拥塞控制是为了降低整个网络的拥塞程度。

img

TCP 主要通过四个算法来进行拥塞控制:慢开始、拥塞避免、快重传、快恢复。

4、TCP的握手、挥手机制?

三次握手

所谓三次握手(Three-way Handshake),是指建立一个 TCP 连接时,需要客户端和服务器总共发送3个包。

三次握手的目的是连接服务器指定端口,建立 TCP 连接,并同步连接双方的序列号和确认号,交换 TCP 窗口大小信息。在 socket 编程中,客户端执行 connect() 时。将触发三次握手。

img
  • 第一次握手(SYN=1, seq=x):

客户端发送一个 TCP 的 SYN 标志位置 1 的包,指明客户端打算连接的服务器的端口,以及初始序号 X,保存在包头的序列号 (Sequence Number) 字段里。

发送完毕后,客户端进入 SYN_SEND 状态。

  • 第二次握手(SYN=1, ACK=1, seq=y, ACKnum=x+1):

服务器发回确认包(ACK)应答。即 SYN 标志位和 ACK 标志位均为 1。服务器端选择自己 ISN 序列号,放到 Seq 域里,同时将确认序号(Acknowledgement Number)设置为客户的 ISN 加1,即 X+1。 发送完毕后,服务器端进入 SYN_RCVD 状态。

  • 第三次握手(ACK=1,ACKnum=y+1)

客户端再次发送确认包(ACK),SYN 标志位为 0,ACK 标志位为 1,并且把服务器发来 ACK 的序号字段 +1,放在确定字段中发送给对方,并且在数据段放写 ISN 的 +1

发送完毕后,客户端进入 ESTABLISHED 状态,当服务器端接收到这个包时,也进入 ESTABLISHED 状态,TCP 握手结束。

四次挥手

TCP 的连接的拆除需要发送四个包,因此称为四次挥手(Four-way handshake),也叫做改进的三次握手。客户端或服务器均可主动发起挥手动作,在 socket 编程中,任何一方执行 close() 操作即可产生挥手操作。

img
  • 第一次挥手(FIN=1,seq=x)

假设客户端想要关闭连接,客户端发送一个 FIN 标志位置为1的包,表示自己已经没有数据可以发送了,但是仍然可以接受数据。

发送完毕后,客户端进入 FIN_WAIT_1 状态。

  • 第二次挥手(ACK=1,ACKnum=x+1)

服务器端确认客户端的 FIN 包,发送一个确认包,表明自己接受到了客户端关闭连接的请求,但还没有准备好关闭连接。

发送完毕后,服务器端进入 CLOSE_WAIT 状态,客户端接收到这个确认包之后,进入 FIN_WAIT_2 状态,等待服务器端关闭连接。

  • 第三次挥手(FIN=1,seq=y)

服务器端准备好关闭连接时,向客户端发送结束连接请求,FIN 置为1。

发送完毕后,服务器端进入 LAST_ACK 状态,等待来自客户端的最后一个ACK。

  • 第四次挥手(ACK=1,ACKnum=y+1)

客户端接收到来自服务器端的关闭请求,发送一个确认包,并进入 TIME_WAIT状态,等待可能出现的要求重传的 ACK 包。

服务器端接收到这个确认包之后,关闭连接,进入 CLOSED 状态。

客户端等待了某个固定时间(两个最大段生命周期,2MSL,2 Maximum Segment Lifetime)之后,没有收到服务器端的 ACK ,认为服务器端已经正常关闭连接,于是自己也关闭连接,进入 CLOSED 状态。

5、SYN攻击

  • 什么是 SYN 攻击(SYN Flood)?

    在三次握手过程中,服务器发送 SYN-ACK 之后,收到客户端的 ACK 之前的 TCP 连接称为半连接(half-open connect)。此时服务器处于 SYN_RCVD 状态。当收到 ACK 后,服务器才能转入 ESTABLISHED 状态.

    SYN 攻击指的是,攻击客户端在短时间内伪造大量不存在的IP地址,向服务器不断地发送SYN包,服务器回复确认包,并等待客户的确认。由于源地址是不存在的,服务器需要不断的重发直至超时,这些伪造的SYN包将长时间占用未连接队列,正常的SYN请求被丢弃,导致目标系统运行缓慢,严重者会引起网络堵塞甚至系统瘫痪。

    SYN 攻击是一种典型的 DoS/DDoS 攻击。

  • 如何检测 SYN 攻击?

    检测 SYN 攻击非常的方便,当你在服务器上看到大量的半连接状态时,特别是源IP地址是随机的,基本上可以断定这是一次SYN攻击。在 Linux/Unix 上可以使用系统自带的 netstats 命令来检测 SYN 攻击。

  • 如何防御 SYN 攻击?

    SYN攻击不能完全被阻止,除非将TCP协议重新设计。我们所做的是尽可能的减轻SYN攻击的危害,常见的防御 SYN 攻击的方法有如下几种:

    • 缩短超时(SYN Timeout)时间
    • 增加最大半连接数
    • 过滤网关防护
    • SYN cookies技术

6、TCP KeepAlive

TCP 的连接,实际上是一种纯软件层面的概念,在物理层面并没有“连接”这种概念。TCP 通信双方建立交互的连接,但是并不是一直存在数据交互,有些连接会在数据交互完毕后,主动释放连接,而有些不会。在长时间无数据交互的时间段内,交互双方都有可能出现掉电、死机、异常重启等各种意外,当这些意外发生之后,这些 TCP 连接并未来得及正常释放,在软件层面上,连接的另一方并不知道对端的情况,它会一直维护这个连接,长时间的积累会导致非常多的半打开连接,造成端系统资源的消耗和浪费,为了解决这个问题,在传输层可以利用 TCP 的 KeepAlive 机制实现来实现。主流的操作系统基本都在内核里支持了这个特性。

TCP KeepAlive 的基本原理是,隔一段时间给连接对端发送一个探测包,如果收到对方回应的 ACK,则认为连接还是存活的,在超过一定重试次数之后还是没有收到对方的回应,则丢弃该 TCP 连接。

TCP-Keepalive-HOWTO 有对 TCP KeepAlive 特性的详细介绍,有兴趣的同学可以参考。这里主要说一下,TCP KeepAlive 的局限。首先 TCP KeepAlive 监测的方式是发送一个 probe 包,会给网络带来额外的流量,另外 TCP KeepAlive 只能在内核层级监测连接的存活与否,而连接的存活不一定代表服务的可用。例如当一个服务器 CPU 进程服务器占用达到 100%,已经卡死不能响应请求了,此时 TCP KeepAlive 依然会认为连接是存活的。因此 TCP KeepAlive 对于应用层程序的价值是相对较小的。需要做连接保活的应用层程序,例如 QQ,往往会在应用层实现自己的心跳功能。

7、如何使用Springboot+Netty搭建基于TCP协议的服务端?

1、Netty结合Springboot快速开发框架搭建服务端程序
java
 <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.65.Final</version>
 </dependency>
2、Springboot启动类,Netty启动
java
package boot.netty.base.server;
 
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
//import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
 
@SpringBootApplication
@EnableAsync
public class BootNettyApplication implements CommandLineRunner{
    public static void main( String[] args )
    {
    	/**
    	 * 启动springboot
    	 */
		SpringApplication app = new SpringApplication(BootNettyApplication.class);
		//app.setWebApplicationType(WebApplicationType.NONE);//不启动web服务
		app.run(args);
 
        System.out.println( "Hello World!" );
    }
 
    @Async
	@Override
	public void run(String... args) throws Exception {
		/**
		 * 使用异步注解方式启动netty服务端服务
		 */
		new BootNettyServer().bind(8888);
	}
}
3、Netty的server类
java
@Slf4j
@Component
public class BootNettyServer {

    public void bind(int port) throws Exception {

        /**
         * 配置服务端的NIO线程组
         * NioEventLoopGroup 是用来处理I/O操作的Reactor线程组
         * bossGroup:用来接收进来的连接,workerGroup:用来处理已经被接收的连接,进行socketChannel的网络读写,
         * bossGroup接收到连接后就会把连接信息注册到workerGroup
         * workerGroup的EventLoopGroup默认的线程数是CPU核数的二倍
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            /**
             * ServerBootstrap 是一个启动NIO服务的辅助启动类
             */
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            /**
             * 设置group,将bossGroup, workerGroup线程组传递到ServerBootstrap
             */
            serverBootstrap = serverBootstrap.group(bossGroup, workerGroup);
            /**
             * ServerSocketChannel是以NIO的selector为基础进行实现的,用来接收新的连接,这里告诉Channel通过NioServerSocketChannel获取新的连接
             */
            serverBootstrap = serverBootstrap.channel(NioServerSocketChannel.class);
            /**
             * option是设置 bossGroup,childOption是设置workerGroup
             * netty 默认数据包传输大小为1024字节, 设置它可以自动调整下一次缓冲区建立时分配的空间大小,避免内存的浪费最小初始化最大 (根据生产环境实际情况来定)
             * 使用对象池,重用缓冲区
             */
            serverBootstrap = serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576));
            serverBootstrap = serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576));
            /**
             * 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息
             */
            serverBootstrap = serverBootstrap.childHandler(new BootNettyChannelInitializer<SocketChannel>());

            log.info("netty server start success!");
            /**
             * 绑定端口,同步等待成功
             */
            ChannelFuture f = serverBootstrap.bind(port).sync();
            /**
             * 等待服务器监听端口关闭
             */
            f.channel().closeFuture().sync();

        } catch (InterruptedException e) {

        } finally {
            /**
             * 退出,释放线程池资源
             */
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}
4、通道初始化
java
/**
 * 通道初始化
 */
public class BootNettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        //添加编解码,此处代码为解析tcp传过来的参数,为UTF-8格式,可以自定义解码格式
        // ChannelOutboundHandler,依照逆序执行
        ch.pipeline().addLast("encoder", new StringEncoder());

        // 属于ChannelInboundHandler,依照顺序执行
        ch.pipeline().addLast("decoder", new StringDecoder());
        /**
         * 自定义ChannelInboundHandlerAdapter
         */
        ch.pipeline().addLast(new BootNettyChannelInboundHandlerAdapter());

    }

}
5、I/O数据读写处理类
java
@Slf4j
@Component
public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{

    private static BootNettyChannelInboundHandlerAdapter nettyServerHandler;
    @PostConstruct
    public void init() {
        nettyServerHandler = this;
    }
    @Autowired
    private GatewayEquipmentMapper gatewayEquipmentMapper;
    /**
     * 从客户端收到新的数据时,这个方法会在收到消息时被调用
     *
     * @param ctx
     * @param msg
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
        try{
            if (msg == null) {
                log.error("======加载客户端报文为空======【" + ctx.channel().id() + "】" + " :" + msg);
            }
            log.info("======加载客户端报文======【" + ctx.channel().id() + "】" + " :" + msg);
            JSONObject jsonObject = JSON.parseObject(msg.toString());
            // 解析数据
            getGateWayData(jsonObject);
            //回应客户端
            ctx.write(ctx.channel().id()+"服务端已成功接收数据!");
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    private void getGateWayData(JSONObject jsonObject){
        Integer battery = null;//网关电池电量
        long time;// 数据生成时间
        String gateWayTime = null; // 时间戳转为日期
        if (jsonObject.getInteger("battery") != null) {
            battery = jsonObject.getInteger("battery");
        }
        // ......
        JSONArray devicesArray = jsonObject.getJSONArray("devices");
        log.info("==========解析网关广播数据:"+devicesArray);
    }
    /**
     * 从客户端收到新的数据、读取完成时调用
     *
     * @param ctx
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
        log.info("channelReadComplete");
        ctx.flush();
    }

    /**
     * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     *
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {
        log.info("exceptionCaught");
        cause.printStackTrace();
        ctx.close();//抛出异常,断开与客户端的连接
    }

    /**
     * 客户端与服务端第一次建立连接时 执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {
        super.channelActive(ctx);
        ctx.channel().read();
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        //此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接
        log.info("channelActive:"+clientIp+ctx.name());
    }

    /**
     * 客户端与服务端 断连时 执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
        super.channelInactive(ctx);
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        ctx.close(); //断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
        log.info("channelInactive:"+clientIp);
    }

    /**
     * 服务端当read超时, 会调用这个方法
     *
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
        super.userEventTriggered(ctx, evt);
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        ctx.close();//超时时断开连接
        log.info("userEventTriggered:"+clientIp);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception{
        log.info("channelRegistered");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception{
        //当一个客户端断开连接或一个服务器完成其服务并关闭连接时
        log.info("channelUnregistered");
    }
    /**
     * 流量控制是一种拥塞控制机制,
     * 用于防止发送方过多地发送数据,从而导致接收方过载。
     * 当通道变得不可写时,这可能意味着接收方的缓冲区已满,
     * 因此发送方需要停止发送数据,等待通道变得可写时再继续发送
     * @throws Exception
     */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception{
        // 通道变得可写,可以继续发送数据
        log.info("Channel is writable, resuming data transmission.");
    }
    /**
     * 广播数据十六进制,每两个字符为一组,
     *如:09 57 79 30 30 30 30 30 30 30 30 30 30 30 41 30 30 35 30 30
     * 再转为对应ASCII码,即可得到明文的uuid
     * @param uuid
     */
    public String hexToASCII(String uuid) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < uuid.length(); i += 2) {
            String hex = uuid.substring(i, i + 2);
            int decimal = Integer.parseInt(hex, 16);
            char asciiChar = (char) decimal;
            sb.append(asciiChar);
        }
        String result = sb.toString();
        return result;
    }
    /**
     * 将时间戳转换为yyyy-MM-dd HH:mm:ss格式的日期字符串。
     * @param timestamp 时间戳(以毫秒为单位)
     * @return 格式化后的日期字符串
     */
    public String timestampToDate(long timestamp) {
        Instant instant = Instant.ofEpochSecond(timestamp); // 将时间戳转换为Instant对象
        LocalDateTime dateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); // 将Instant对象转换为本地日期时间对象
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // 定义日期格式
        // 将日期时间对象格式化为字符串
        String time = dateTime.format(formatter);
        return time;
    }
    /**
     * 将LocalDateTime转换为yyyy-MM-dd HH:mm:ss格式的日期字符串。
     * @return 格式化后的日期字符串
     */
    public String LocalDateTimeToStr() {
        // 获取当前日期时间
        LocalDateTime now = LocalDateTime.now();
        // 定义日期时间的格式
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        // 将LocalDateTime格式化为字符串
        String formattedDateTime = now.format(formatter);
        return formattedDateTime;
    }
}

仅仅四个类就将Springboot和Netty结合,启动Springboot应用的同时也就启动了Netty,端口:8888

可以使用TCP客户端工具:

img

二、MQTT协议详解及应用

  • mqtt协议

  • 1 MQTT协议特点

    • 发布和订阅
    • QoS(Quality of Service levels)
  • 2 MQTT 协议数据包结构

    • MQTT固定头
    • MQTT可变头 / Variable header
    • Payload消息体
  • 3 环境搭建

    • MQTT服务器搭建
    • MQTT Client
  • 4 整合SpringBoot进行物联网企业级开发

mqtt协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。

MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务

作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

1、MQTT协议特点?

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。

MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。

其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

MQTT协议当前版本为,2014年发布的MQTT v3.1.1。除标准版外,还有一个简化版MQTT-SN,该协议主要针对嵌入式设备,这些设备一般工作于TCP/IP网络,如:ZigBee。

MQTT 与 HTTP 一样,MQTT 运行在传输控制协议/互联网协议 (TCP/IP) 堆栈之上。

image-20230825084352982

2、什么是发布和订阅?

MQTT使用的发布/订阅消息模式,它提供了一对多的消息分发机制,从而实现与应用程序的解耦。

这是一种消息传递模式,消息不是直接从发送器发送到接收器(即点对点),而是由MQTT server(或称为 MQTT Broker)分发的。

img

3、MQTT 服务器是发布-订阅架构的核心

它可以非常简单地在Raspberry Pi或NAS等单板计算机上实现,当然也可以在大型机或 Internet 服务器上实现。

服务器分发消息,因此必须是发布者,但绝不是订阅者!

客户端可以发布消息(发送方)、订阅消息(接收方)或两者兼而有之。

客户端(也称为节点)是一种智能设备,如微控制器或具有 TCP/IP 堆栈和实现 MQTT 协议的软件的计算机。

消息在允许过滤的主题下发布。主题是分层划分的 UTF-8 字符串。不同的主题级别用斜杠/作为分隔符号。

我们来看看下面的设置。

  • 光伏发电站是发布者(Publisher)。
  • 主要主题(Topic)级别是"PV",这个工厂发布两个子级别"sunshine""data"
  • "PV/sunshine"是一个布尔值(true/fault,也可以是 1/0),充电站需要它来知道是否应该装载电动汽车(仅在阳光普照时 😃)。
  • 充电站(EVSE)是订阅者,订阅"PV/sunshine"从服务器获取信息。
  • "PV/data" 另一方面,以 kW 为单位传输工厂产生的瞬时功率,并且该主题可以例如通过计算机或平板电脑订阅,以生成一天内传输功率的图表。

这就是一个简单的MQTT的应用场景,具体如下图所示;

image-20230825090515693

4、QoS(Quality of Service levels)是什么?

服务质量是 MQTT 的一个重要特性。当我们使用 TCP/IP 时,连接已经在一定程度上受到保护。但是在无线网络中,中断和干扰很频繁,MQTT 在这里帮助避免信息丢失及其服务质量水平。这些级别在发布时使用。如果客户端发布到 MQTT 服务器,则客户端将是发送者,MQTT 服务器将是接收者。当MQTT服务器向客户端发布消息时,服务器是发送者,客户端是接收者。

很多时候,使用MQTT协议的设备都运行在网络受限的环境下,而只依靠底层的TCP传输协议,并不能完全保证消息的可靠到达。因此,MQTT提供了QoS机制,其核心是设计了多种消息交互机制来提供不同的服务质量,来满足用户在各种场景下对消息可靠性的要求。

MQTT定义了三个QoS等级,分别为:

  • QoS 0,最多交付一次。
  • QoS 1,至少交付一次。
  • QoS 2,只交付一次。
1、QoS 0-最多交付一次

QoS 0是最低的QoS等级。QoS 0消息即发即弃,不需要等待确认,不需要存储和重传,因此对于接收方来说,永远都不需要担心收到重复的消息。

image-20230825090919852

2、为什么QoS 0消息会丢失?

当我们使用QoS 0传递消息时,消息的可靠性完全依赖于底层的TCP协议。

而TCP只能保证在连接稳定不关闭的情况下消息的可靠到达,一旦出现连接关闭、重置,仍有可能丢失当前处于网络链路或操作系统底层缓冲区中的消息。这也是QoS 0消息最主要的丢失场景。

3、QoS 1-至少交付一次

为了保证消息到达,QoS 1加入了应答与重传机制,发送方只有在收到接收方的PUBACK报文以后,才能认为消息投递成功,在此之前,发送方需要存储该PUBLISH报文以便下次重传。

QoS 1需要在PUBLISH报文中设置Packet ID,而作为响应的PUBACK报文,则会使用与PUBLISH报文相同的Packet ID,以便发送方收到后删除正确的PUBLISH报文缓存。

image-20230825091410865

4、为什么QoS 1消息会重复?

对于发送方来说,没收到PUBACK报文分为以下两种情况:

  • PUBLISH未到达接收方
  • PUBLISH已经到达接收方,接收方的PUBACK报文还未到达发送方

在第一种情况下,发送方虽然重传了PUBLISH报文,但是对于接收方来说,实际上仍然仅收到了一次消息。

但是在第二种情况下,在发送方重传时,接收方已经收到过了这个PUBLISH报文,这就导致接收方将收到重复的消息。

image-20230825091546751

虽然重传时PUBLISH报文中的DUP标志会被设置为1,用以表示这是一个重传的报文。但是接收方并不能因此假定自己曾经接收过这个消息,仍然需要将其视作一个全新的消息。

这是因为对于接收方来说,可能存在以下两种情况:

image-20230825091649030

第一种情况,发送方由于没有收到PUBACK报文而重传了PUBLISH报文。此时,接收方收到的前后两个PUBLISH报文使用了相同的Packet ID,并且第二个PUBLISH报文的DUP标志为1,此时它确实是一个重复的消息。

第二种情况,第一个PUBLISH报文已经完成了投递,1024这个Packet ID重新变为可用状态。发送方使用这个Packet ID发送了一个全新的PUBLISH报文,但这一次报文未能到达对端,所以发送方后续重传了这个PUBLISH报文。这就使得虽然接收方收到的第二个PUBLISH报文同样是相同的Packet ID,并且DUP为1,但确实是一个全新的消息。

由于我们无法区分这两种情况,所以只能让接收方将这些PUBLISH报文都当作全新的消息来处理。因此当我们使用QoS 1时,消息的重复在协议层面上是无法避免的。

甚至在比较极端的情况下,例如Broker从发布方收到了重复的PUBLISH报文,而在将这些报文转发给订阅方的过程中,再次发生重传,这将导致订阅方最终收到更多的重复消息。

在下图表示的例子中,虽然发布者的本意只是发布一条消息,但对接收方来说,最终却收到了三条相同的消息:

image-20230825091832917

以上,就是QoS 1保证消息到达带来的副作用。

5、QoS 2-只交付一次

QoS 2解决了QoS 0、1消息可能丢失或者重复的问题,但相应地,它也带来了最复杂的交互流程和最高的开销。每一次的QoS 2消息投递,都要求发送方与接收方进行至少两次请求/响应流程。

image-20230825092044201

  1. 首先,发送方存储并发送QoS为2的PUBLISH报文以启动一次QoS 2消息的传输,然后等待接收方回复PUBREC报文。这一部分与QoS 1基本一致,只是响应报文从PUBACK变成了PUBREC。
  2. 当发送方收到PUBREC报文,即可确认对端已经收到了PUBLISH报文,发送方将不再需要重传这个报文,并且也不能再重传这个报文。所以此时发送方可以删除本地存储的PUBLISH报文,然后发送一个PUBREL报文,通知对端自己准备将本次使用的Packet ID标记为可用了。与PUBLISH报文一样,我们需要确保PUBREL报文到达对端,所以也需要一个响应报文,并且这个PUBREL报文需要被存储下来以便后续重传。
  3. 当接收方收到PUBREL报文,也可以确认在这一次的传输流程中不会再有重传的PUBLISH报文到达,因此回复PUBCOMP报文表示自己也准备好将当前的Packet ID用于新的消息了。
  4. 当发送方收到PUBCOMP报文,这一次的QoS 2消息传输就算正式完成了。在这之后,发送方可以再次使用当前的Packet ID发送新的消息,而接收方再次收到使用这个Packet ID的PUBLISH报文时,也会将它视为一个全新的消息。
6、为什么QoS 2消息不会重复?

QoS 2消息保证不会丢失的逻辑与QoS 1相同,所以这里我们就不再重复了。

与QoS 1相比,QoS 2新增了PUBREL报文和PUBCOMP报文的流程,也正是这个新增的流程带来了消息不会重复的保证。

在我们更进一步之前,我们先快速回顾一下QoS 1消息无法避免重复的原因。

当我们使用QoS 1消息时,对接收方来说,回复完PUBACK这个响应报文以后Packet ID就重新可用了,也不管响应是否确实已经到达了发送方。所以就无法得知之后到达的,携带了相同Packet ID的PUBLISH报文,到底是发送方因为没有收到响应而重传的,还是发送方因为收到了响应所以重新使用了这个Packet ID发送了一个全新的消息。

image-20230825092611544

所以,消息去重的关键就在于,通信双方如何正确地同步释放Packet ID,换句话说,不管发送方是重传消息还是发布新消息,一定是和对端达成共识了的。

而QoS 2中增加的PUBREL流程,正是提供了帮助通信双方协商Packet ID何时可以重用的能力。

image-20230825092846122

QoS 2规定,发送方只有在收到PUBREC报文之前可以重传PUBLISH报文。一旦收到PUBREC报文并发出PUBREL报文,发送方就进入了Packet ID释放流程,不可以再使用当前Packet ID重传PUBLISH报文。同时,在收到对端回复的PUBCOMP报文确认双方都完成Packet ID释放之前,也不可以使用当前Packet ID发送新的消息。

image-20230825092937368

因此,对于接收方来说,能够以PUBREL报文为界限,凡是在PUBREL报文之前到达的PUBLISH报文,都必然是重复的消息;而凡是在PUBREL报文之后到达的PUBLISH报文,都必然是全新的消息。

一旦有了这个前提,我们就能够在协议层面完成QoS 2消息的去重。

7、不同QoS的适用场景和注意事项

QoS 0

QoS 0的缺点是可能会丢失消息,消息丢失的频率依赖于你所处的网络环境,并且可能使你错过断开连接期间的消息,不过优点是投递的效率较高。

所以我们通常选择使用QoS 0传输一些高频且不那么重要的数据,比如传感器数据,周期性更新,即使遗漏几个周期的数据也可以接受。

QoS 1

QoS 1可以保证消息到达,所以适合传输一些较为重要的数据,比如下达关键指令、更新重要的有实时性要求的状态等。

但因为QoS 1还可能会导致消息重复,所以当我们选择使用QoS 1时,还需要能够处理消息的重复,或者能够允许消息的重复。

在我们决定使用QoS 1并且不对其进行去重处理之前,我们需要先了解,允许消息的重复,可能意味着什么。

如果我们不对QoS 1进行去重处理,我们可能会遭遇这种情况,发布方以1、2的顺序发布消息,但最终订阅方接收到的消息顺序可能是1、2、1、2。如果1表示开灯指令,2表示关灯指令,我想大部分用户都不会接受自己仅仅进行了开灯然后关灯的操作,结果灯在开和关的状态来回变化。

QoS 2

QoS 2既可以保证消息到达,也可以保证消息不会重复,但传输成本最高。如果我们不愿意自行实现去重方案,并且能够接受QoS 2带来的额外开销,那么QoS 2将是一个合适的选择。通常我们会在金融、航空等行业场景下会更多地见到QoS 2的使用。

不同QoS的性能有差距么?

以EMQX为例,在相同的硬件配置下进行点对点通信,通常QoS 0与QoS 1能够达到的吞吐比较接近,不过QoS 1的CPU占用会略高于QoS 0,负载较高时,QoS 1的消息延迟也会进一步增加。而QoS 2能够达到的吞吐一般仅为QoS 0、1的一半左右。

5、MQTT协议数据包结构

在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。MQTT数据包结构如下:

  1. 固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。
  2. 可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。
  3. 消息体(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。

整体MQTT的消息格式如下图所示;

image-20230825094253984
1、固定头

固定头存在于所有MQTT数据包中,其结构如下:

image-20230825094431564

固定报头的第一个字节分为控制报文的类型(4bit),以及控制报文类型的标志位,控制类型共有14种,其中0与15被系统保留出来,其他的类型具体见:

img

固定报头的bit0-bit3为标志位,依照报文类型有不同的含义,事实上,除了PUBLISH类型报文以外,其他报文的标志位均为系统保留,PUBLISH报文的第一字节bit3是控制报文的重复分发标志(DUP),bit1-bit2是服务质量等级,bit0是PUBLISH报文的保留标志,用于标识PUBLISH是否保留,当客户端发送一个PUBLISH消息到服务器,如果保留标识位置1,那么服务器应该保留这条消息,当一个新的订阅者订阅这个主题的时候,最后保留的主题消息应被发送到新订阅的用户。

固定报头的第二个字节开始是剩余长度字段,是用于记录剩余报文长度的,表示当前的消息剩余的字节数,包括可变报头和有效载荷区域(如果存在),但剩余长度不包括用于编码剩余长度字段本身的字节数。

剩余长度字段使用一个变长度编码方案,对小于128的值它使用单字节编码,而对于更大的数值则按下面的方式处理:每个字节的低7位用于编码数据长度,最高位(bit7)用于标识剩余长度字段是否有更多的字节,且按照大端模式进行编码,因此每个字节可以编码128个数值和一个延续位,剩余长度字段最大可拥有4个字节。

  • 当剩余长度使用1个字节存储时,其取值范围为0(0x00)~127(0x7f)。
  • 当使用2个字节时,其取值范围为128(0x80,0x01)~16383(0Xff,0x7f)。
  • 当使用3个字节时,其取值范围为16384(0x80,0x80,0x01)~2097151(0xFF,0xFF,0x7F)。
  • 当使用4个字节时,其取值范围为2097152(0x80,0x80,0x80,0x01)~268435455(0xFF,0xFF,0xFF,0x7F)。

总的来说,MQTT报文理论上可以发送最大256M的报文,当然,这种情况是非常少的。

固定头存在于所有MQTT数据包中,下面简单分析一下固定头的消息格式:

1、MQTT消息类型 (message type)

位置:Byte1 中 bits 7-4 位。

相于一个4位的无符号值,类型、取值及描述如下:

img

2、标识位 (DUP)

位置:Byte1 中 bits 3-0位。

在不使用标识位的消息类型中,标识位被作为保留位。如果收到无效的标志时,接收端必须关闭网络连接:

img

(1)DUP:发布消息的副本。用来在保证消息的可靠传输,如果设置为1,则在下面的变长中增加MessageId,并且需要回复确认,以保证消息传输完成,但不能用于检测消息重复发送。

(2)QoS:发布消息的服务质量,即:保证消息传递的次数

(3)RETAIN: 发布保留标识,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它,如果设有那么推送至当前订阅者后释放。

2、剩余长度(Remaining Length)

地址:Byte 2。

固定头的第二字节用来保存变长头部和消息体的总大小的,但不是直接保存的。这一字节是可以扩展,其保存机制,前7位用于保存长度,后一部用做标识。当最后一位为1时,表示长度不足,需要使用二个字节继续保存。例如:计算出后面的大小为0

3、MQTT可变头(Variable header)

MQTT数据包中包含一个可变头,它驻位于固定的头和负载之间。可变头的内容因数据包类型而不同,较常的应用是作为包的标识:

image-20230825100035847

很多类型数据包中都包括一个2字节的数据包标识字段,这些类型的包有:

PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK。

只有某些报文才拥有可变报头,它在固定报头和有效负载之间,可变报头的内容会根据报文类型的不同而有所不同,但可变报头的报文标识符(Packet Identifier)字段存在于在多个类型的报文里,而有一些报文又没有报文标识符字段,具体见表格,报文标识符结构具体见图:

image-20240730191010857

4、Payload消息体

Payload消息体位MQTT数据包的第三部分,包含CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四种类型的消息:

  1. CONNECT,消息体内容主要是:客户端的ClientID、订阅的Topic、Message以及用户名和密码。
  2. SUBSCRIBE,消息体内容是一系列的要订阅的主题以及QoS。
  3. SUBACK,消息体内容是服务器对于SUBSCRIBE所申请的主题及QoS进行确认和回复。
  4. UNSUBSCRIBE,消息体内容是要订阅的主题。

3、如何使用SpringBoot整合MQTT进行物联网企业级开发?

1、引入POM依赖
xml
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
2、配置文件
java
#MQTT配置信息
mqtt:
  # 设备连接
  channel:
    username: admin 					#账号
    password: xiaoxueboke				#密码
    host: tcp://{ip}:1833		#mqtt连接tcp地址
    clientid: xx-service-dev202308	#客户端Id,不能相同
    timeout: 1000						# 超时时间
    keepalive: 10						# 保持连接
    cleanSession: false					# 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息)
3、MQTT客户端配置
java
package com.xx.reservoir.mqtt;


import lombok.Data;

/**
 * @Author: xueqimiao
 * @Date: 2024/7/24 14:37
 * @Description: MQTT客户端配置
 */
@Data
public class MqttConfiguration {

    private String host;
    private String clientId;
    private String userName;
    private String password;
    private String topic;
    private int timeout;
    private int keepAlive;
    private boolean cleanSession;
}
4、MQTT配置
java
package com.xx.reservoir.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;


/**
 * @Author: xueqimiao
 * @Date: 2024/7/24 14:37
 * @Description: MQTT配置
 */
@Slf4j
@Component
@Configuration
@ConfigurationProperties(MqttConfiguration.PREFIX)
public class MqttConfiguration1 extends MqttConfiguration {

    //指定配置文件application-local.properties中的属性名前缀
    public static final String PREFIX = "mqtt.channel";

}
5、MQTT消息回调
java
package com.xx.reservoir.mqtt;


import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;


/**
 * @Author: xueqimiao
 * @Date: 2024/7/24 14:37
 * @Description: MQTT客户端回调
 */
@Slf4j
@Component
public class MqttClientCallback implements MqttCallback {

    /**
     * 系统的mqtt客户端id
     */
    private static String mqttClientId;

    public static MqttClientCallback myMQTTCallback;

    public MqttClientConnection client;


    @Autowired
    private MqttConfiguration1 mqttConfiguration;


    @PostConstruct
    private void init() {
        myMQTTCallback = this;
        myMQTTCallback.mqttConfiguration1 = this.mqttConfiguration1;
    }


    public MqttClientCallback() {
    }

    public MqttClientCallback(String mqttClientId) {
        this.mqttClientId = mqttClientId;
    }

    /**
     * MQTT 断开连接会执行此方法
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("断开了MQTT连接", throwable);
    }

    /**
     * publish发布成功后会执行到这里
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("发布消息成功");
    }

    /**
     * subscribe订阅后得到的消息会执行到这里
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        //  TODO    此处可以将订阅得到的消息进行业务处理、数据存储
        String payload = new String(mqttMessage.getPayload());
        try {
            if (topic.contains("xxbk/")) {
                log.info("接收到主题" + topic + " 消息:" + payload);
            }

        } catch (Exception e) {
            e.printStackTrace();
            log.error("消息处理失败:" + e);
        }
    }
}
6、MQTT客户端连接
java
package com.xx.reservoir.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;

/**
 * @Author: xueqimiao
 * @Date: 2024/7/24 14:37
 * @Description: MQTT客户端连接
 */
@Slf4j
@Component
public class MqttClientConnection {

    private MqttClient mqttClient;

    /**
     * 系统的mqtt客户端id
     */
    private String mqttClientId;


    /**
     * 客户端
     */
    public static ConcurrentHashMap<String, MqttClientConnection> mqttClients = new ConcurrentHashMap();

    /**
     * 客户端connect连接mqtt服务器
     *
     * @param userName     用户名
     * @param passWord     密码
     * @param mqttCallback 回调函数
     **/
    public MqttClient setMqttClient(String host, String clientId, String userName, String passWord, boolean cleanSession, MqttCallback mqttCallback) throws MqttException {
        MqttConnectOptions options = mqttConnectOptions(host, clientId, userName, passWord, cleanSession);
        if (mqttCallback == null) {
            mqttClient.setCallback(new MqttClientCallback(mqttClientId));
        } else {
            mqttClient.setCallback(mqttCallback);
        }
        mqttClient.connect(options);
        return mqttClient;
    }

    /**
     * MQTT连接参数设置
     */
    private MqttConnectOptions mqttConnectOptions(String host, String clientId, String userName, String passWord, boolean cleanSession) throws MqttException {
        mqttClient = new MqttClient(host, clientId, new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        options.setConnectionTimeout(60);///默认:30
        options.setAutomaticReconnect(true);//默认:false
        options.setCleanSession(cleanSession);//默认:true
        options.setKeepAliveInterval(60);
        return options;
    }

    /**
     * 关闭MQTT连接
     */
    public void close() throws MqttException {
        mqttClient.close();
        mqttClient.disconnect();
    }

    /**
     * 向某个主题发布消息 默认qos:1
     *
     * @param topic:发布的主题
     * @param msg:发布的消息
     */
    public void pub(String topic, String msg) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        //mqttMessage.setQos(2);
        mqttMessage.setPayload(msg.getBytes());
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }

    /**
     * 向某个主题发布消息
     *
     * @param topic: 发布的主题
     * @param msg:   发布的消息
     * @param qos:   消息质量    Qos:0、1、2
     */
    public void pub(String topic, String msg, int qos) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setPayload(msg.getBytes());
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }

    /**
     * 订阅多个主题 ,此方法默认的的Qos等级为:1
     *
     * @param topic 主题
     */
    public void sub(String[] topic) throws MqttException {
        mqttClient.subscribe(topic);
    }

    /**
     * 订阅某一个主题,可携带Qos
     *
     * @param topic 所要订阅的主题
     * @param qos   消息质量:0、1、2
     */
    public void sub(String topic, int qos) throws MqttException {
        mqttClient.subscribe(topic, qos);
    }

    public String getMqttClientId() {
        return mqttClientId;
    }

    public void setMqttClientId(String mqttClientId) {
        this.mqttClientId = mqttClientId;
    }

    public MqttClient getMqttClient() {
        return mqttClient;
    }


    /**
     * 发布消息
     *
     * @param pushMessage
     * @param topic
     * @param qos
     * @param retained:留存
     */
    public void publish(String pushMessage, String topic, int qos, boolean retained, String ClientId) {
        MqttClientConnection mqttClient = MqttClientConnection.mqttClients.get(ClientId);
        MqttClient client = mqttClient.getMqttClient();
        MqttMessage message = new MqttMessage();
        message.setPayload(pushMessage.getBytes());
        message.setQos(qos);
        message.setRetained(retained);
        MqttTopic mqttTopic = client.getTopic(topic);
        if (null == mqttTopic) {
            log.error("主题不存在");
        }
        MqttDeliveryToken token;//Delivery:配送
        synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁。
            try {
                token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件
                //token.waitForCompletion(1000L);
            } catch (MqttPersistenceException e) {
                e.printStackTrace();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

}
7、启动连接并订阅主题
java
package com.xx.reservoir.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * @Author: xueqimiao
 * @Date: 2024/7/24 14:37
 * @Description: 启动连接并订阅主题
 */
@Slf4j
@Component
public class MqttClientListener implements ApplicationListener<ContextRefreshedEvent> {

    @Autowired
    private MqttConfiguration mqttConfiguration;

    private static String topic = "xxbk/#";

    private volatile AtomicBoolean isInit = new AtomicBoolean(false);


    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        //防止重复触发
        if (!isInit.compareAndSet(false, true)) {
            return;
        }
        try {
            MqttClientConnection mqttClientConnect = new MqttClientConnection();
            mqttClientConnect.setMqttClientId(mqttConfiguration.getClientId());
            mqttClientConnect.setMqttClient(mqttConfiguration.getHost(), mqttConfiguration.getClientId(), mqttConfiguration1.getUserName(), mqttConfiguration1.getPassword(), false, new MqttClientCallback(mqttConfiguration1.getClientId()));
            MqttClientConnection.mqttClients.put(mqttConfiguration.getClientId(), mqttClientConnect);
            this.subscribe(topic, 1, mqttConfiguration.getClientId());
            this.cleanTopic("#", mqttConfiguration.getClientId());
            log.info("{}--连接成功!!订阅主题--{}", mqttConfiguration.getClientId(), mqttConfiguration.getTopic(), mqttConfiguration1.getKeepAlive(), mqttConfiguration.getTimeout());
        } catch (Exception e) {
            log.error("连接设备mqtt失败", e);
        }
    }

    /**
     * 订阅某个主题
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String topic, int qos, String ClientId) {
        MqttClientConnection mqttClientConnection = MqttClientConnection.mqttClients.get(ClientId);
        MqttClient client = mqttClientConnection.getMqttClient();
        log.info("====订阅" + topic + "主题===");
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }


    /**
     * 取消订阅主题
     *
     * @param topic 主题名称
     */
    public void cleanTopic(String topic, String ClientId) {
        MqttClientConnection mqttClientConnection = MqttClientConnection.mqttClients.get(ClientId);
        MqttClient client = mqttClientConnection.getMqttClient();
        if (client != null && client.isConnected()) {
            try {
                client.unsubscribe(topic);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        } else {
            log.error("取消订阅失败!");
        }
    }

}

物联网面试题详解:

一、什么是物联网?

物联网即“万物相连的互联网”,是互联网基础上的延伸和扩展的网络,将各种信息传感设备与互联网相结合而形成的一个巨大网络,实现在任何时间、任何地点,人、机、物的互联互通。

物联网是新一代信息技术的重要组成部分,IT行业又叫:泛互联,意指物物相连,万物万联。由此,“物联网就是物物相连的互联网”。

这有两层意思:

第一,物联网的核心和基础仍然是互联网,是在互联网基础上的延伸和扩展的网络;

第二,其用户端延伸和扩展到了任何物品与物品之间,进行信息交换和通信。

二、物联网产品架构?

对于物联网,一般可分为四层,感知层、网络层、平台层、应用层

1)感知层

通过传感技术,采集物理世界的数据。

包含RFID(射频识别技术),如高速公路的我们车上的ETC和我们手机常用的NFC;

还有传感器,比如我们监测行业常用裂缝传感器,监测裂缝的大小,如雨量传感器,监测降雨量。

2)网络层

作为传输数据的管道,把物与互联网联通,实现人、机、物互通。

网络层,实现数据从感知层传输至平台层,分为物接入互联网、互联网传输两部分。

物接入物联网:以太网/光纤、串口通讯、ZigBee、WIFI、Bluetooth(蓝牙)、LORA、NB-LOT、4G/5G。

互联网传输:主流的协议是MQTT、CoAP。

3)平台层

数据接入平台后,对数据进行解析、分析、处理后,提供丰富的服务与功能。

平台层其实指的就是物联网平台,它作为承接设备与行业的中间服务,承载了抽象化的业务逻辑及标准化的核心数据模型,实现设备的快速接入,同时提供强大的模块化能力,支撑行业应用场景下的各类需求。

4)应用层

物联网的数据最终会应用到各类行业,如地质灾害监测、智能家居、智慧城市等。

三、你们公司有自己的物联网平台吗?具体介绍一下吧

这个是面试物联网相关企业一定会问的问题,因为物联网作为数据的中台,上接受着感知层设备的数据,下对接着应用层各个业务平台,承载着至关重要的作用。

有自己的物联网平台,但是期初是没有的,因为当时公司业务规模还不大,涉及的业务类型项目、管理的物联网设备还不多,因此公司初期也是直接设备对接到业务平台。

但是随着企业的业务规模持续发展,也遇到了以下痛点:

① 设备数据与业务平台耦合太深,不利于数据的统一管理和分发。

② 设备种类繁杂、数量众多,难以统一接入和管理。

③ 信息滞后,设备产生问题后,一般都是业主或者相关负责人被动的告知,导致信息相对比较滞后,造成数据的中断。

④ 运维成本高,有些设备比偏远,导致较高的人力、物力、财力的投入。

基于以上原因,我们从0到1开发了物联网智能远程管理平台。

物联网远程管理平台主要功能有:

① 首页;总览大屏,统计设备在地图上的分布,产品、设备、在线、离线、在线率情况,常用功能等信息,主要满足用户对平台信息总览的需求。

② 设备接入与管理;主要有产品管理、设备管理、固件管理、物联网卡管理、指令管理等功能,主要满足用户对产品的定义、数据的解析、设备的远程管理和操作、历史数据查看、数据过滤等需求。

③ 数据管理;主要有数据分组、数据推送、规则引擎、数据分析,主要满足用户对数据的分析、分发、联动的需求。

④ 运维管理;设备预警、设备任务。主要满足运维人员接受设备预警、批量查看和处理预警信息的需求。

四、怎么理解物模型?

传统的物联网业务开发包括终端设备研发、设备与云端联调、基于设备和云端进行应用开发三个步骤。

如图1所示。三个业务开发步骤是串行的,且每一步都需要一定的资源投入和开发周期,从而导致物联网业务开发周期冗长,资源投入大。

传统的物联业务开发流程:

img

基于物模型,可将终端设备实体进行数字化描述,在云端实现设备虚拟化。基于云端虚拟设备可以直接进行物联网的应用开发,终端设备的研发也可以同步进行。

如图2所示。这样使得原本的串行研发流程变为并行的研发流程,缩短研发周期,节省人力和资源成本。

基于物模型的物联网业务开发流程:

img

物联网中的物模型,可以通俗地解释为物联网平台中用来描述和表示物理世界中物体或设备的数字化“模板”或“模型”。

这个模型将物理实体的各种属性和功能转化为数字世界中的信息和操作,使得这些实体能够在物联网平台上进行交互、通信和管理。

具体来说,物模型就像是为物理世界中的物体或设备在数字世界中创建的一个“身份证”或“说明书”。它包含了物体的基本信息(如名称、类型、位置等)、属性(如温度、湿度、电量等)、服务(如控制开关、调节亮度等)以及事件(如传感器检测到异常时触发的报警等)。

通过这些信息,物联网平台可以了解物体的状态,控制物体的行为,并与其他物体或系统进行交互。

在物联网应用中,物模型是连接物理世界和数字世界的桥梁。通过定义物模型,我们可以将各种不同类型的物体或设备集成到物联网平台中,实现智能化管理和控制。

引入物模型,将对物联网业务带来巨大的价值。

一方面,物模型将实体设备在云端数字化为服务或资源,为垂直行业IoT应用开发提供统一的访问接口、开发工具、增值服务,实现业务应用的高效复制。

另一方面,物模型统一化、数字化地描述了实体设备是什么、能做什么,将设备和应用解耦,实现信息在设备和平台间的横向流动,消除产业链间的服务壁垒。

img

设备的状态和档案信息(基础信息)就不多说了,主要说一下功能定义。

我们在设计物模型的时,也会对应,有传感器物模型、属性物模型、服务物模型和事件物模型。

传感器物模型,设备下会挂载多个传感器,传感器是数据的载体,在平台当中,我们需要定义传感器物模型的数据格式,只要传感器的数据和传感器物模型的格式对应,即可上传对应数据,否则上传失败。

属性物模型,我们也分为基础属性、参数集属性、能力集属性,基础属性,比如运行时间、CPU、温湿度、经纬度等等;能力集属性,比如是否支持北斗、北斗卡号等;参数集属性,比如采集频率。

服务物模型,我们可以指定设备输入什么参数,返回什么参数。

事件物模型,可配置设备相关阈值或者定义故障类型,设备数据超过阈值或发生故障自动上报,这种行为属于设备侧的报警,边缘计算,和我们常见的平台设置阈值,数据上传过来之后和阈值比较是不同的触发逻辑。

其实总结一句话,因为设备参数各种各样,引入物模型,让设备的集成更加灵活可配,无需设备增加相关功能和字段,使得平台也需要跟着开发新增新的功能和字段。

五、数据是怎么从设备流转到业务平台的?

主要分为五步:

① 设备数据采集:物联网设备通过内置的传感器等硬件,实时采集各种数据,如温度、湿度、压力、位置等。

② 数据预处理:采集到的原始数据可能包含噪声、冗余或格式不统一等问题,需要通过预处理技术(如数据清洗、格式转换等)将其转换为标准、可用的数据格式。

③ 数据传输:经过预处理的数据通过物联网通信协议(如MQTT、CoAP等)传输到物联网平台。在这一过程中,数据可能会经过边缘计算设备或网关的初步处理和分析。

④ 数据存储与处理:物联网平台接收到的数据会被存储在云端数据库中,并可以通过规则引擎等工具进行实时处理和分析。

⑤ 数据流转到业务平台:经过处理和分析的数据可以通过API接口、消息队列(如AMQP)等方式流转到业务平台。业务平台可以根据这些数据进行业务逻辑的处理和展示,如实时监控、预警通知、数据分析等。

六、平台接入不同厂商设备有哪几种方式,如何完成对接?

设备接入,主要有两种接入方式:

  1. 是满足物联网协议,直接对接物联网平台;
  2. 另外一种是不满足,开发一个中间件(设备数据过来后先过这个服务,将其转换成物联网或者用户所接受的格式),来满足接入物联网平台。

比如我们公司,开发物联网平台,并且拥有自己的物联网协议,对于我们自有产品,我们有很大的话语权,会要求设备厂商按照我们的物联网协议去改造设备,来满足物联网平台的对接。

有时候对于项目上不是自己公司的设备或设备厂商不愿意改造,这个时候就需要自己公司开发中间件来完成设备的对接。

七、设备主要的通讯协议有哪些?有什么区别?

目前有两种主流的通讯协议:MQTT、CoAP。

  1. MQTT(消息队列遥测传输):是一个基于客户端-服务器的消息发布/订阅传输协议,可保持长连接,实现多对多异步通信;
  2. CoAP(受限应用协议):是一种客户端-服务器单对单的协议,具备轻量低功耗的特点。

MQTT由于其轻量级、可靠性和安全性,特别适用于低带宽和不可靠的网络环境。经常用于远程监控、家庭自动化、传感器网络、工业控制、车联网等领域有广泛的应用。

CoAP则是一个专为小型设备设计的简化HTTP版本,它具有简单、低功耗的特点,由于它使用UDP协议进行通信,传输效率高,占用带宽小,适合传输小数据量。因此,它在智能家居、低功耗设备等领域有广泛的应用。

八、物联网和工业物联网有什么区别?

​ 工业物联网 ( IIoT ) 通常被定义为 IoT 的一个子集,专门针对工业环境,例如制造业、农业或石油和天然气。然而,一些业内人士将 IoT 和 IIoT 定义为两个独立的工作,IoT 侧重于设备连接的消费者方面。无论哪种情况,IIoT 都完全属于等式的工业方面,并且主要关注使用智能传感器和执行器来增强和自动化工业操作。

​ IIoT也称为 工业 4.0,它使用支持机器对机器 ( M2M ) 技术或认知计算技术(例如 AI、 机器学习 或 深度学习)的智能机器。有些机器甚至结合了这两种技术。智能机器实时捕获和分析数据,并传达可用于推动业务决策的信息。与一般物联网相比,工业物联网往往在兼容性、安全性、弹性和精度等方面有更严格的要求。最终,IIoT 旨在简化操作、改进工作流程、提高生产力并最大限度地提高自动化程度。

九、物联网项目最大痛点是什么?

随着制造业数字化的发展以及物联网应用的普及,越来越非互联网、业务系统的数据被采集、记录和存储。很多系统开发者熟悉的是 Oracle、MySQL等关系型数据库,以及像 Redis 这样的键值数据库,于是在物联网应用、制造业数字化应用中也延续了相似的数据库选型。这样做的好处很直接,产品熟悉,技术难度可控,开发工期可控。但随着系统运行时间的推移,该方案面临的性能挑战越来越大,而且在业务开发中会面临很多相似的查询、统计需求需要实现。

为什么呢?我们再审视一下物联网数据、制造业数据的特点。首先,数据结构相对简单,主要包含三列,时间、标签、值;

第二,数据生成大多具有稳定的节奏,不存在或者与一般互联网应用那样的波峰波谷;

第三,数据很少更新,更多的是一次写,多次查询;第四,数据量极其巨大,同时对于存储成本又很敏感;

第五,数据分析统计中很重要的维度之一是时间;

以上这些特点和我们熟悉的交易类数据有很明显不同。虽然用关系型数据库可以存储和管理,但没有很好的利用对于数据的理解。

如果能选择针对这类数据特点的专用数据库,则会让很多技术难点得到化解,而且会提高系统的稳定性。在数据库行业中,这类产品叫时序数据库。

常见时序数据库

1、InfluxDB

时序数据库 InfluxDB®版是一款专门处理高写入和查询负载的时序数据库,用于存储大规模的时序数据并进行实时分析,包括来自DevOps监控、应用指标和IoT传感器上的数据。目前有以下特点:

  • 专为时间序列数据量身打造的高性能数据存储。TSM引擎提供数据高速读写和压缩等功能。
  • 简单高效的HTTP API写入和查询接口。
  • 针对时序数据,量身打造类似SQL的查询语言,轻松查询聚合数据。
  • 允许对tag建索引,实现快速有效的查询。
  • 数据保留策略(Retention policies)能够有效地使旧数据自动失效。

2、OpenTSDB

OpenTSDB是可扩展的分布式时序数据库,底层依赖HBase。作为基于通用存储开发的时序数据库典型代表,起步比较早,在时序市场的认可度相对较高。

OpenTSDB的自我定位很清晰:The Scalable Time Series Database。

如果应用场景很看中扩展性,可以选择 OpenTSDB,否则就要考虑一下是否需要接受将 HBase 也纳入到技术栈和系统中了。

10、什么是物联网设备上的嵌入式系统?

嵌入式 系统是 为特定目的配置的硬件、软件和固件 的组合 。它本质上是一台可以嵌入机械或电气系统中的小型计算机,例如汽车、工业设备、医疗设备、智能扬声器或数字手表。嵌入式系统可能是可编程的或具有固定功能。

它通常由处理器、内存、电源和通信端口组成,并包括执行操作所需的软件。一些嵌入式系统也可能运行 轻量级操作系统,例如 Linux 的精简版。

嵌入式系统使用通信端口将数据从其处理器传输到外围设备,外围设备可能是网关、中央数据处理平台或其他嵌入式系统。处理器可能是 微处理器 或 微控制器,后者是包括集成存储器和外围接口的微处理器。为了解释收集到的数据,处理器使用存储在内存中的专用软件。

就复杂性和功能而言,嵌入式系统在物联网设备之间可能有很大差异,但它们都提供处理和传输数据的能力。