package ru.ritm.idp.controllers;

import java.util.Iterator;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import ru.ritm.idp.connector.IDPConnector;
import ru.ritm.idp.connector.IDPConnectorException;
import ru.ritm.idp.connector.IDPMessage;
import ru.ritm.idp.connector.db.IDPDbOutConnector;
import ru.ritm.idp.connector.routing.RoutingDestination;
import ru.ritm.idp.connector.routing.RoutingProperties;
import ru.ritm.idp.connector.translation.IDPTranslationTable;
import ru.ritm.idp.entities.PacketDump;
import ru.ritm.idp.entities.RouteTablePK;

/* loaded from: input_file:idpsrv-ejb-2.45.1.jar:ru/ritm/idp/controllers/BaseMessageConsumptionController.class */
/**
 * Default {@link MessageConsumptionController}: forwards live messages to a
 * single {@link IDPConnector} while consumption is enabled and, after a
 * failure, replays packet dumps obtained from the helper bean before
 * re-enabling live consumption.
 *
 * <p>State is kept in two flags:
 * <ul>
 *   <li>{@code canConsume} - live messages are forwarded only while this is {@code true};</li>
 *   <li>{@code afterburning} - {@code true} while a replay from the database is in progress.</li>
 * </ul>
 */
public class BaseMessageConsumptionController implements MessageConsumptionController {
    private static final Logger LOGGER = Logger.getLogger("ru.ritm.idp.server.BaseMessageConsumptionController");

    /** Batch size requested from the helper; a smaller batch ends the replay loop. */
    private static final int MAX_RESULTS = 1000;

    /** Pause between two full batches of a replay, in milliseconds. */
    private static final long BATCH_PAUSE_MILLIS = 100L;

    protected final IDPConnector connector;
    protected AtomicBoolean canConsume = new AtomicBoolean(false);
    protected AtomicBoolean afterburning = new AtomicBoolean(false);
    protected final ConsumptionControllerHelperBean helper;

    /**
     * @param consumptionControllerHelperBean helper used to fetch packet dumps and resolve routing
     * @param iDPConnector connector that receives the messages
     */
    public BaseMessageConsumptionController(ConsumptionControllerHelperBean consumptionControllerHelperBean, IDPConnector iDPConnector) {
        this.helper = consumptionControllerHelperBean;
        this.connector = iDPConnector;
    }

    /**
     * Forwards the messages to the connector if consumption is enabled and the
     * connector is started. If the connector accepts fewer messages than were
     * offered, consumption is disabled.
     *
     * @return the number of messages the connector reported as consumed, or
     *         {@code 0} when nothing was forwarded
     * @throws IDPConnectorException propagated from the connector
     */
    @Override // ru.ritm.idp.controllers.MessageConsumptionController
    public int consumeMessages(IDPMessage[] iDPMessageArr, RoutingProperties routingProperties, IDPTranslationTable iDPTranslationTable) throws IDPConnectorException {
        if (this.canConsume.get() && this.connector.isStarted()) {
            int consumed = this.connector.consumeMessages(iDPMessageArr, routingProperties, iDPTranslationTable);
            // Decide on a local value: re-reading the flag could observe a
            // concurrent onFail()/onRestore() instead of this call's outcome.
            boolean allConsumed = consumed == iDPMessageArr.length;
            this.canConsume.set(allConsumed);
            if (!allConsumed) {
                LOGGER.log(Level.INFO, "{0} CAN_CONSUME set to false. Consumed: {1}; length: {2}", new Object[]{toString(), Integer.valueOf(consumed), Integer.valueOf(iDPMessageArr.length)});
            }
            return consumed;
        }
        if (!this.canConsume.get()) {
            LOGGER.log(Level.FINE, "{0} DEBUG consumeMessages canConsume setted to false", new Object[]{toString()});
        }
        if (!this.connector.isStarted()) {
            LOGGER.log(Level.FINE, "{0} DEBUG consumeMessages connector does not started", new Object[]{toString()});
        }
        return 0;
    }

    /**
     * Runs a replay from the database unless consumption is already enabled or
     * another replay is in progress, then enables live consumption.
     *
     * <p>On any exception from the replay the connector is stopped and the
     * failure is rethrown wrapped in an {@link IDPConnectorException}.
     *
     * @throws IDPConnectorException if the replay failed
     */
    @Override // ru.ritm.idp.controllers.MessageConsumptionController
    public void tryToConsumeFromDb() throws IDPConnectorException {
        if (this.canConsume.get()) {
            return;
        }
        // Atomic claim: only one caller may run the replay at a time.
        if (!this.afterburning.compareAndSet(false, true)) {
            return;
        }
        try {
            tryToConsumeFromDbInternal();
            // Enable consumption before releasing the replay flag so that a
            // concurrent caller cannot slip in and start a second replay.
            this.canConsume.set(true);
        } catch (Exception e) {
            this.connector.stop();
            throw new IDPConnectorException("can not consume from DB", e);
        } finally {
            // Always release the flag; leaving it set after a failure would
            // make every later call return at the guard above.
            this.afterburning.set(false);
        }
    }

    /** Replay hook invoked by {@link #tryToConsumeFromDb()}; subclasses may override. */
    protected void tryToConsumeFromDbInternal() {
        processPacketDumps();
    }

    /** @return the connector this controller feeds */
    public IDPConnector getConnector() {
        return this.connector;
    }

    /**
     * Disables live consumption and attempts a replay from the database.
     * A replay failure is logged, not propagated.
     */
    @Override // ru.ritm.idp.controllers.MessageConsumptionController
    public void onRestore() {
        try {
            this.canConsume.set(false);
            LOGGER.log(Level.FINE, "{0} CAN_CONSUME set to false. OnRestore.", new Object[]{toString()});
            tryToConsumeFromDb();
        } catch (IDPConnectorException e) {
            LOGGER.log(Level.SEVERE, (String) null, (Throwable) e);
        }
    }

    /** Disables live consumption. */
    @Override // ru.ritm.idp.controllers.MessageConsumptionController
    public void onFail() {
        this.canConsume.set(false);
        LOGGER.log(Level.FINE, "{0} CAN_CONSUME set to false. OnFail.", new Object[]{toString()});
    }

    /** Delegates the restore handling to the helper bean. */
    @Override // ru.ritm.idp.controllers.MessageConsumptionController
    public void asyncOnRestore() {
        this.helper.asyncOnRestore(this);
    }

    /**
     * Fetches packet dumps from the helper in batches of up to
     * {@link #MAX_RESULTS} and sends them to the connector one at a time.
     *
     * <p>The loop gives up (returns) when: a batch is empty, the connector is
     * not started, a DB-out connector reports it is stopping, the connector
     * does not consume a packet, an exception occurs while sending, the batch
     * was smaller than {@link #MAX_RESULTS}, or the thread is interrupted
     * while pausing between batches.
     */
    protected void processPacketDumps() {
        // Passed to the helper on every fetch. Left as a raw type because the
        // helper's parameter type is not visible here.
        // NOTE(review): presumably carries state between batches - confirm against the helper.
        ConcurrentHashMap fetchState = new ConcurrentHashMap();
        while (true) {
            TreeSet<PacketDump> batch = this.helper.mustConsumeOnRestore(this.connector.getId(), fetchState, MAX_RESULTS);
            int batchSize = batch.size();
            if (batchSize < 1) {
                return;
            }
            LOGGER.log(Level.INFO, "{0}: packets fetched from db: {1}", new Object[]{getConnector(), Integer.valueOf(batchSize)});
            if (!getConnector().isStarted()) {
                LOGGER.log(Level.WARNING, "{0}: getConnector().isStarted() -- FALSE. Give up. ", new Object[]{getConnector()});
                return;
            }
            if ((getConnector() instanceof IDPDbOutConnector) && ((IDPDbOutConnector) getConnector()).isStopping()) {
                return;
            }
            // Id of the packet currently being sent, kept for the error log.
            Long currentId = null;
            try {
                for (PacketDump dump : batch) {
                    LOGGER.log(Level.FINE, "{0}: >>>>>>>>>>>>>> {1}", new Object[]{getConnector(), dump});
                    RouteTablePK routeKey = dump.getRouteTablePK();
                    currentId = dump.getId();
                    RoutingDestination destination = this.helper.findRoutingDestination(routeKey.getFromId(), routeKey.getToId());
                    IDPMessage message = new IDPMessage(dump, this.helper.findConnector(dump.getSession().getConnectorId().getId().intValue()));
                    int consumed = this.connector.consumeMessages(new IDPMessage[]{message}, destination.getProperties(), destination.getTranslationTable());
                    if (consumed != 1) {
                        // Connector refused the packet: stop replaying.
                        return;
                    }
                }
                if (batchSize < MAX_RESULTS) {
                    LOGGER.log(Level.INFO, "{0}: break fetching packets. fetched size: {1} less then MAX_RESULTS: {2}", new Object[]{getConnector(), Integer.valueOf(batchSize), Integer.valueOf(MAX_RESULTS)});
                    return;
                }
                try {
                    Thread.sleep(BATCH_PAUSE_MILLIS);
                } catch (InterruptedException interrupted) {
                    // Restore the interrupt status for the caller and stop the
                    // replay instead of silently ignoring the request.
                    Thread.currentThread().interrupt();
                    return;
                }
            } catch (Exception e) {
                LOGGER.log(Level.SEVERE, getConnector() + ": can not send message: " + currentId, (Throwable) e);
                return;
            }
        }
    }
}
